Skip to content

Commit b59ab80

Browse files
committed
Addressed comments
1 parent 8938992 commit b59ab80

File tree

2 files changed

+12
-5
lines changed

2 files changed

+12
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ case class EventTimeStats(var max: Long, var min: Long, var sum: Long, var count
4242
this.count += that.count
4343
}
4444

45-
def avg: Long = (sum.toDouble / count).toLong
45+
def avg: Long = sum / count
4646
}
4747

4848
object EventTimeStats {
@@ -76,7 +76,7 @@ class EventTimeStatsAccum(protected var currentStats: EventTimeStats = EventTime
7676
* adding appropriate metadata to this column, this operator also tracks the maximum observed event
7777
* time. Based on the maximum observed time and a user specified delay, we can calculate the
7878
* `watermark` after which we assume we will no longer see late records for a particular time
79-
* period.
79+
* period. Note that event time is measured in milliseconds.
8080
*/
8181
case class EventTimeWatermarkExec(
8282
eventTime: Attribute,
@@ -90,7 +90,7 @@ case class EventTimeWatermarkExec(
9090
child.execute().mapPartitions { iter =>
9191
val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
9292
iter.map { row =>
93-
eventTimeStats.add((getEventTime(row).getLong(0).toDouble / 1000).toLong)
93+
eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
9494
row
9595
}
9696
}

sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala

+9-2
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,20 @@ class StateOperatorProgress private[sql](
6161
* @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
6262
* @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
6363
* @param name User-specified name of the query, null if not specified.
64-
* @param timestamp Timestamp (ms) of the beginning of the trigger.
64+
* @param timestamp Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
6565
* @param batchId A unique id for the current batch of data being processed. Note that in the
6666
* case of retries after a failure a given batchId may be executed more than once.
6767
* Similarly, when there is no data to be processed, the batchId will not be
6868
* incremented.
6969
* @param durationMs The amount of time taken to perform various operations in milliseconds.
70-
* @param eventTime Statistics of event time seen in this batch
70+
* @param eventTime Statistics of event time seen in this batch. It may contain the following keys:
71+
* {
72+
* "max" -> "2016-12-05T20:54:20.827Z" // maximum event time seen in this trigger
73+
* "min" -> "2016-12-05T20:54:20.827Z" // minimum event time seen in this trigger
74+
* "avg" -> "2016-12-05T20:54:20.827Z" // average event time seen in this trigger
75+
* "watermark" -> "2016-12-05T20:54:20.827Z" // watermark used in this trigger
76+
* }
77+
* All timestamps are in ISO8601 format, i.e. UTC timestamps.
7178
* @param stateOperators Information about operators in the query that store state.
7279
* @param sources Detailed statistics on data being read from each of the streaming sources.
7380
* @since 2.1.0

0 commit comments

Comments
 (0)