
Commit 25b9758

[SPARK-18834][SS] Expose event time stats through StreamingQueryProgress
## What changes were proposed in this pull request?

- Changed `StreamingQueryProgress.watermark` to `StreamingQueryProgress.queryTimestamps`, which is a `Map[String, String]` containing the following keys: "eventTime.max", "eventTime.min", "eventTime.avg", "processingTime", "watermark". All of them are UTC-formatted strings.
- Renamed `StreamingQuery.timestamp` to `StreamingQueryProgress.triggerTimestamp` to differentiate it from `queryTimestamps`. It holds the timestamp of when the trigger was started.

## How was this patch tested?

Updated tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16258 from tdas/SPARK-18834.

(cherry picked from commit c68fb42)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
1 parent f672bfd commit 25b9758

8 files changed: +161 −66 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala

+38 −17

@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import scala.math.max
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
@@ -28,24 +26,48 @@ import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.AccumulatorV2
 
-/** Tracks the maximum positive long seen. */
-class MaxLong(protected var currentValue: Long = 0)
-  extends AccumulatorV2[Long, Long] {
+/** Class for collecting event time stats with an accumulator */
+case class EventTimeStats(var max: Long, var min: Long, var sum: Long, var count: Long) {
+  def add(eventTime: Long): Unit = {
+    this.max = math.max(this.max, eventTime)
+    this.min = math.min(this.min, eventTime)
+    this.sum += eventTime
+    this.count += 1
+  }
+
+  def merge(that: EventTimeStats): Unit = {
+    this.max = math.max(this.max, that.max)
+    this.min = math.min(this.min, that.min)
+    this.sum += that.sum
+    this.count += that.count
+  }
+
+  def avg: Long = sum / count
+}
+
+object EventTimeStats {
+  def zero: EventTimeStats = EventTimeStats(
+    max = Long.MinValue, min = Long.MaxValue, sum = 0L, count = 0L)
+}
+
+/** Accumulator that collects stats on event time in a batch. */
+class EventTimeStatsAccum(protected var currentStats: EventTimeStats = EventTimeStats.zero)
+  extends AccumulatorV2[Long, EventTimeStats] {
 
-  override def isZero: Boolean = value == 0
-  override def value: Long = currentValue
-  override def copy(): AccumulatorV2[Long, Long] = new MaxLong(currentValue)
+  override def isZero: Boolean = value == EventTimeStats.zero
+  override def value: EventTimeStats = currentStats
+  override def copy(): AccumulatorV2[Long, EventTimeStats] = new EventTimeStatsAccum(currentStats)
 
   override def reset(): Unit = {
-    currentValue = 0
+    currentStats = EventTimeStats.zero
   }
 
   override def add(v: Long): Unit = {
-    currentValue = max(v, value)
+    currentStats.add(v)
   }
 
-  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
-    currentValue = max(value, other.value)
+  override def merge(other: AccumulatorV2[Long, EventTimeStats]): Unit = {
+    currentStats.merge(other.value)
   }
 }
 
@@ -54,22 +76,21 @@ class MaxLong(protected var currentValue: Long = 0)
  * adding appropriate metadata to this column, this operator also tracks the maximum observed event
  * time. Based on the maximum observed time and a user specified delay, we can calculate the
  * `watermark` after which we assume we will no longer see late records for a particular time
- * period.
+ * period. Note that event time is measured in milliseconds.
  */
 case class EventTimeWatermarkExec(
     eventTime: Attribute,
     delay: CalendarInterval,
     child: SparkPlan) extends SparkPlan {
 
-  // TODO: Use Spark SQL Metrics?
-  val maxEventTime = new MaxLong
-  sparkContext.register(maxEventTime)
+  val eventTimeStats = new EventTimeStatsAccum()
+  sparkContext.register(eventTimeStats)
 
   override protected def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitions { iter =>
      val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
      iter.map { row =>
-        maxEventTime.add(getEventTime(row).getLong(0))
+        eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
        row
      }
    }
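
The new accumulator above is self-contained enough to exercise on its own. A minimal sketch of how it aggregates event times across partitions, using made-up epoch-millisecond values (the `/ 1000` in `doExecute` converts the column's microseconds to milliseconds before they reach the accumulator):

val partition1 = new EventTimeStatsAccum()
val partition2 = new EventTimeStatsAccum()

// Two rows observed on one partition, one row on another (sample values).
Seq(1480971260827L, 1480971261827L).foreach(v => partition1.add(v))
Seq(1480971259827L).foreach(v => partition2.add(v))

// Spark's accumulator framework performs this merge when tasks report back.
partition1.merge(partition2)

val stats = partition1.value
println(s"count=${stats.count} max=${stats.max} min=${stats.min} avg=${stats.avg}")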

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

+27 −11

@@ -41,7 +41,9 @@ import org.apache.spark.util.Clock
 trait ProgressReporter extends Logging {
 
   case class ExecutionStats(
-    inputRows: Map[Source, Long], stateOperators: Seq[StateOperatorProgress])
+    inputRows: Map[Source, Long],
+    stateOperators: Seq[StateOperatorProgress],
+    eventTimeStats: Map[String, String])
 
   // Internal state of the stream, required for computing metrics.
   protected def id: UUID
@@ -127,12 +129,7 @@
   protected def finishTrigger(hasNewData: Boolean): Unit = {
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
 
-    val executionStats: ExecutionStats = if (!hasNewData) {
-      ExecutionStats(Map.empty, Seq.empty)
-    } else {
-      extractExecutionStats
-    }
-
+    val executionStats = extractExecutionStats(hasNewData)
     val processingTimeSec =
       (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000
 
@@ -160,10 +157,10 @@
       id = id,
       runId = runId,
       name = name,
-      timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)),
+      timestamp = formatTimestamp(currentTriggerStartTimestamp),
       batchId = currentBatchId,
       durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
-      currentWatermark = offsetSeqMetadata.batchWatermarkMs,
+      eventTime = executionStats.eventTimeStats.asJava,
       stateOperators = executionStats.stateOperators.toArray,
       sources = sourceProgress.toArray,
       sink = sinkProgress)
@@ -184,7 +181,13 @@
   }
 
   /** Extracts statistics from the most recent query execution. */
-  private def extractExecutionStats: ExecutionStats = {
+  private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
+    val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+
+    if (!hasNewData) {
+      return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
+    }
+
     // We want to associate execution plan leaves to sources that generate them, so that we match
     // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
     // Consider the translation from the streaming logical plan to the final executed plan.
@@ -241,7 +244,16 @@
         numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
     }
 
-    ExecutionStats(numInputRows, stateOperators)
+    val eventTimeStats = lastExecution.executedPlan.collect {
+      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+        val stats = e.eventTimeStats.value
+        Map(
+          "max" -> stats.max,
+          "min" -> stats.min,
+          "avg" -> stats.avg).mapValues(formatTimestamp)
+    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
   }
 
   /** Records the duration of running `body` for the next query progress update. */
@@ -257,6 +269,10 @@
     result
   }
 
+  private def formatTimestamp(millis: Long): String = {
+    timestampFormat.format(new Date(millis))
+  }
+
   /** Updates the message returned in `status`. */
   protected def updateStatusMessage(message: String): Unit = {
     currentStatus = currentStatus.copy(message = message)
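
A small sketch of the map assembly above, runnable outside the trait. The `timestampFormat` field is defined elsewhere in `ProgressReporter`; the ISO8601/UTC pattern used here is an assumption that matches the documented output format:

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")  // assumed pattern
timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))

def formatTimestamp(millis: Long): String = timestampFormat.format(new Date(millis))

// One observed row and a watermark 10 seconds behind it (sample values).
val stats = EventTimeStats(max = 1480971260827L, min = 1480971260827L, sum = 1480971260827L, count = 1L)
val watermarkTimestamp = Map("watermark" -> formatTimestamp(stats.max - 10000L))

val eventTime = Map(
  "max" -> stats.max,
  "min" -> stats.min,
  "avg" -> stats.avg).mapValues(formatTimestamp) ++ watermarkTimestamp
// eventTime("max") == "2016-12-05T20:54:20.827Z"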

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

+18 −15

@@ -382,6 +382,24 @@ class StreamExecution(
     if (hasNewData) {
       // Current batch timestamp in milliseconds
       offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+      // Update the eventTime watermark if we find one in the plan.
+      if (lastExecution != null) {
+        lastExecution.executedPlan.collect {
+          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+            logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
+            e.eventTimeStats.value.max - e.delay.milliseconds
+        }.headOption.foreach { newWatermarkMs =>
+          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
+            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
+            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
+          } else {
+            logDebug(
+              s"Event time didn't move: $newWatermarkMs < " +
+                s"${offsetSeqMetadata.batchWatermarkMs}")
+          }
+        }
+      }
+
       updateStatusMessage("Writing offsets to log")
       reportTimeTaken("walCommit") {
         assert(offsetLog.add(
@@ -485,21 +503,6 @@
         sink.addBatch(currentBatchId, nextBatch)
       }
 
-      // Update the eventTime watermark if we find one in the plan.
-      lastExecution.executedPlan.collect {
-        case e: EventTimeWatermarkExec =>
-          logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
-          (e.maxEventTime.value / 1000) - e.delay.milliseconds()
-      }.headOption.foreach { newWatermark =>
-        if (newWatermark > offsetSeqMetadata.batchWatermarkMs) {
-          logInfo(s"Updating eventTime watermark to: $newWatermark ms")
-          offsetSeqMetadata.batchWatermarkMs = newWatermark
-        } else {
-          logTrace(s"Event time didn't move: $newWatermark < " +
-            s"$offsetSeqMetadata.currentEventTimeWatermark")
-        }
-      }
-
       awaitBatchLock.lock()
       try {
         // Wake up any threads that are waiting for the stream to progress.
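
The rule this hunk relocates (out of the batch-execution path and into the section that runs before offsets are written to the log) reduces to a couple of lines. A sketch with illustrative numbers:

// candidate watermark = max observed event time (ms) minus the user-specified delay;
// the stored watermark only ever moves forward.
def updatedWatermarkMs(maxEventTimeMs: Long, delayMs: Long, currentWatermarkMs: Long): Long = {
  val candidate = maxEventTimeMs - delayMs
  if (candidate > currentWatermarkMs) candidate else currentWatermarkMs
}

updatedWatermarkMs(1480971260827L, delayMs = 10000L, currentWatermarkMs = 0L)              // 1480971250827: advances
updatedWatermarkMs(1480971240827L, delayMs = 10000L, currentWatermarkMs = 1480971250827L)  // unchanged: never regresses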

sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala

+21 −10

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.{util => ju}
+import java.lang.{Long => JLong}
 import java.util.UUID
 
 import scala.collection.JavaConverters._
@@ -29,7 +30,6 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 /**
  * :: Experimental ::
@@ -61,13 +61,20 @@ class StateOperatorProgress private[sql](
  * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
  * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
  * @param name User-specified name of the query, null if not specified.
- * @param timestamp Timestamp (ms) of the beginning of the trigger.
+ * @param timestamp Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
  * @param batchId A unique id for the current batch of data being processed. Note that in the
  *                case of retries after a failure a given batchId my be executed more than once.
  *                Similarly, when there is no data to be processed, the batchId will not be
  *                incremented.
  * @param durationMs The amount of time taken to perform various operations in milliseconds.
- * @param currentWatermark The current event time watermark in milliseconds
+ * @param eventTime Statistics of event time seen in this batch. It may contain the following keys:
+ *                 {
+ *                   "max" -> "2016-12-05T20:54:20.827Z"       // maximum event time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"       // minimum event time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"       // average event time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z" // watermark used in this trigger
+ *                 }
+ *                 All timestamps are in ISO8601 format, i.e. UTC timestamps.
 * @param stateOperators Information about operators in the query that store state.
 * @param sources detailed statistics on data being read from each of the streaming sources.
 * @since 2.1.0
@@ -79,8 +86,8 @@ class StreamingQueryProgress private[sql](
     val name: String,
     val timestamp: String,
     val batchId: Long,
-    val durationMs: ju.Map[String, java.lang.Long],
-    val currentWatermark: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
     val stateOperators: Array[StateOperatorProgress],
     val sources: Array[SourceProgress],
     val sink: SinkProgress) {
@@ -107,18 +114,22 @@ class StreamingQueryProgress private[sql](
       if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
     }
 
+    /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
+    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
+      if (map.isEmpty) return JNothing
+      val keys = map.asScala.keySet.toSeq.sorted
+      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
+    }
+
     ("id" -> JString(id.toString)) ~
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
     ("timestamp" -> JString(timestamp)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
     ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
     ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
-    ("durationMs" -> durationMs
-      .asScala
-      .map { case (k, v) => k -> JInt(v.toLong): JObject }
-      .reduce(_ ~ _)) ~
-    ("currentWatermark" -> JInt(currentWatermark)) ~
+    ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
+    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
    ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
    ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
    ("sink" -> sink.jsonValue)
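
For context, a usage sketch (not part of this patch) of how a caller reads the new field; `query` is assumed to be any started `StreamingQuery` whose plan contains a `withWatermark` operator:

import scala.collection.JavaConverters._

val progress = query.lastProgress                 // most recent StreamingQueryProgress, may be null early on
if (progress != null) {
  val eventTime = progress.eventTime.asScala      // ju.Map[String, String] -> Scala Map
  eventTime.get("max").foreach(t => println(s"max event time this trigger: $t"))
  eventTime.get("watermark").foreach(t => println(s"watermark used this trigger: $t"))
  println(progress.json)                          // "eventTime" is omitted entirely when the map is empty
}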

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

+3 −0

@@ -185,9 +185,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   test("QueryProgressEvent serialization") {
     def testSerialization(event: QueryProgressEvent): Unit = {
+      import scala.collection.JavaConverters._
       val json = JsonProtocol.sparkEventToJson(event)
       val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent]
       assert(newEvent.progress.json === event.progress.json)  // json as a proxy for equality
+      assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala)
+      assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala)
     }
     testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
     testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))
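
The same field reaches listeners through `QueryProgressEvent`. A hedged sketch of a listener that logs the watermark; registering it via `spark.streams.addListener(...)` is assumed:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val watermarkLogger = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // java.util.Map#get returns null when the key is absent (e.g. no watermark operator in the plan).
    val watermark = event.progress.eventTime.get("watermark")
    if (watermark != null) println(s"watermark advanced to $watermark")
  }
}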

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala

+12 −4

@@ -44,7 +44,12 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
      |  "durationMs" : {
      |    "total" : 0
      |  },
-      |  "currentWatermark" : 3,
+      |  "eventTime" : {
+      |    "avg" : "2016-12-05T20:54:20.827Z",
+      |    "max" : "2016-12-05T20:54:20.827Z",
+      |    "min" : "2016-12-05T20:54:20.827Z",
+      |    "watermark" : "2016-12-05T20:54:20.827Z"
+      |  },
      |  "stateOperators" : [ {
      |    "numRowsTotal" : 0,
      |    "numRowsUpdated" : 1
@@ -76,7 +81,6 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
      |  "durationMs" : {
      |    "total" : 0
      |  },
-      |  "currentWatermark" : 3,
      |  "stateOperators" : [ {
      |    "numRowsTotal" : 0,
      |    "numRowsUpdated" : 1
@@ -134,7 +138,11 @@ object StreamingQueryStatusAndProgressSuite {
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
-    currentWatermark = 3L,
+    eventTime = Map(
+      "max" -> "2016-12-05T20:54:20.827Z",
+      "min" -> "2016-12-05T20:54:20.827Z",
+      "avg" -> "2016-12-05T20:54:20.827Z",
+      "watermark" -> "2016-12-05T20:54:20.827Z").asJava,
     stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
     sources = Array(
       new SourceProgress(
@@ -156,7 +164,7 @@ object StreamingQueryStatusAndProgressSuite {
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
-    currentWatermark = 3L,
+    eventTime = Map.empty[String, String].asJava, // empty maps should be handled correctly
     stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
     sources = Array(
       new SourceProgress(
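
The expected JSON above can be read back with json4s, which is already on the classpath here. A small sketch (not part of the suite), where `progressJson` stands for `StreamingQueryProgress.json` or one of the expected strings:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

def eventTimeField(progressJson: String, key: String): Option[String] =
  parse(progressJson) \ "eventTime" \ key match {
    case JString(s) => Some(s)
    case _ => None   // key missing, or "eventTime" omitted for the empty-map progress
  }

// Applied to the first expected JSON above, eventTimeField(..., "watermark")
// would yield Some("2016-12-05T20:54:20.827Z").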

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

+2 −0

@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.streaming
 
+import scala.collection.JavaConverters._
+
 import org.apache.commons.lang3.RandomStringUtils
 import org.scalactic.TolerantNumerics
 import org.scalatest.concurrent.Eventually._
