From e9d34edb5386313c580e64fbd6c279d7f79dc381 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 12 Dec 2016 15:32:50 -0800 Subject: [PATCH 1/6] Added timestamps to progress --- .../streaming/EventTimeWatermarkExec.scala | 53 +++++++++----- .../streaming/ProgressReporter.scala | 40 ++++++++--- .../execution/streaming/StreamExecution.scala | 33 +++++---- .../apache/spark/sql/streaming/progress.scala | 70 ++++++++++--------- .../StreamingQueryListenerSuite.scala | 3 + ...StreamingQueryStatusAndProgressSuite.scala | 24 ++++--- .../sql/streaming/StreamingQuerySuite.scala | 6 +- .../spark/sql/streaming/WatermarkSuite.scala | 49 ++++++++++--- 8 files changed, 186 insertions(+), 92 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index 4c8cb069d23a0..ccd4af7cd06f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.streaming -import scala.math.max - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} @@ -28,24 +26,48 @@ import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.util.AccumulatorV2 -/** Tracks the maximum positive long seen. */ -class MaxLong(protected var currentValue: Long = 0) - extends AccumulatorV2[Long, Long] { +/** Class for collecting event time stats with an accumulator */ +case class EventTimeStats(var max: Long, var min: Long, var sum: Long, var count: Long) { + def add(eventTime: Long): Unit = { + this.max = math.max(this.max, eventTime) + this.min = math.min(this.min, eventTime) + this.sum += eventTime + this.count += 1 + } + + def merge(that: EventTimeStats): Unit = { + this.max = math.max(this.max, that.max) + this.min = math.min(this.min, that.min) + this.sum += that.sum + this.count += that.count + } + + def avg: Long = (sum.toDouble / count).toLong +} + +object EventTimeStats { + def zero: EventTimeStats = EventTimeStats( + max = Long.MinValue, min = Long.MaxValue, sum = 0L, count = 0L) +} + +/** Accumulator that collects stats on event time in a batch. */ +class EventTimeStatsAccum(protected var currentStats: EventTimeStats = EventTimeStats.zero) + extends AccumulatorV2[Long, EventTimeStats] { - override def isZero: Boolean = value == 0 - override def value: Long = currentValue - override def copy(): AccumulatorV2[Long, Long] = new MaxLong(currentValue) + override def isZero: Boolean = value == EventTimeStats.zero + override def value: EventTimeStats = currentStats + override def copy(): AccumulatorV2[Long, EventTimeStats] = new EventTimeStatsAccum(currentStats) override def reset(): Unit = { - currentValue = 0 + currentStats = EventTimeStats.zero } override def add(v: Long): Unit = { - currentValue = max(v, value) + currentStats.add(v) } - override def merge(other: AccumulatorV2[Long, Long]): Unit = { - currentValue = max(value, other.value) + override def merge(other: AccumulatorV2[Long, EventTimeStats]): Unit = { + currentStats.merge(other.value) } } @@ -61,15 +83,14 @@ case class EventTimeWatermarkExec( delay: CalendarInterval, child: SparkPlan) extends SparkPlan { - // TODO: Use Spark SQL Metrics? 
- val maxEventTime = new MaxLong - sparkContext.register(maxEventTime) + val eventTimeStats = new EventTimeStatsAccum() + sparkContext.register(eventTimeStats) override protected def doExecute(): RDD[InternalRow] = { child.execute().mapPartitions { iter => val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output) iter.map { row => - maxEventTime.add(getEventTime(row).getLong(0)) + eventTimeStats.add((getEventTime(row).getLong(0).toDouble / 1000).toLong) row } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 40e3151337af6..82da3e07ca91a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -41,7 +41,9 @@ import org.apache.spark.util.Clock trait ProgressReporter extends Logging { case class ExecutionStats( - inputRows: Map[Source, Long], stateOperators: Seq[StateOperatorProgress]) + inputRows: Map[Source, Long], + stateOperators: Seq[StateOperatorProgress], + eventTimeStats: Map[String, String]) // Internal state of the stream, required for computing metrics. protected def id: UUID @@ -127,12 +129,7 @@ trait ProgressReporter extends Logging { protected def finishTrigger(hasNewData: Boolean): Unit = { currentTriggerEndTimestamp = triggerClock.getTimeMillis() - val executionStats: ExecutionStats = if (!hasNewData) { - ExecutionStats(Map.empty, Seq.empty) - } else { - extractExecutionStats - } - + val executionStats = extractExecutionStats(hasNewData) val processingTimeSec = (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000 @@ -160,10 +157,10 @@ trait ProgressReporter extends Logging { id = id, runId = runId, name = name, - timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)), + triggerTimestamp = formatTimestamp(currentTriggerStartTimestamp), batchId = currentBatchId, durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava, - currentWatermark = offsetSeqMetadata.batchWatermarkMs, + queryTimestamps = executionStats.eventTimeStats.asJava, stateOperators = executionStats.stateOperators.toArray, sources = sourceProgress.toArray, sink = sinkProgress) @@ -184,7 +181,15 @@ trait ProgressReporter extends Logging { } /** Extracts statistics from the most recent query execution. */ - private def extractExecutionStats: ExecutionStats = { + private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { + val basicQueryTimestamps = Map( + "watermark" -> offsetSeqMetadata.batchWatermarkMs, + "processingTime" -> offsetSeqMetadata.batchTimestampMs).mapValues(formatTimestamp) + + if (!hasNewData) { + return ExecutionStats(Map.empty, Seq.empty, basicQueryTimestamps) + } + // We want to associate execution plan leaves to sources that generate them, so that we match // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. // Consider the translation from the streaming logical plan to the final executed plan. 
@@ -241,7 +246,16 @@ trait ProgressReporter extends Logging { numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)) } - ExecutionStats(numInputRows, stateOperators) + val eventTimeStats = lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => + val stats = e.eventTimeStats.value + Map( + "eventTime.max" -> stats.max, + "eventTime.min" -> stats.min, + "eventTime.avg" -> stats.avg).mapValues(formatTimestamp) + }.headOption.getOrElse(Map.empty) ++ basicQueryTimestamps + + ExecutionStats(numInputRows, stateOperators, eventTimeStats) } /** Records the duration of running `body` for the next query progress update. */ @@ -257,6 +271,10 @@ trait ProgressReporter extends Logging { result } + private def formatTimestamp(millis: Long): String = { + timestampFormat.format(new Date(millis)) + } + /** Updates the message returned in `status`. */ protected def updateStatusMessage(message: String): Unit = { currentStatus = currentStatus.copy(message = message) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 39be222d05d0f..ead7271566c35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -360,6 +360,24 @@ class StreamExecution( if (hasNewData) { // Current batch timestamp in milliseconds offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis() + // Update the eventTime watermark if we find one in the plan. + if (lastExecution != null) { + lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => + logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") + e.eventTimeStats.value.max - e.delay.milliseconds + }.headOption.foreach { newWatermarkMs => + if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) { + logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") + offsetSeqMetadata.batchWatermarkMs = newWatermarkMs + } else { + logDebug( + s"Event time didn't move: $newWatermarkMs < " + + s"${offsetSeqMetadata.batchWatermarkMs}") + } + } + } + updateStatusMessage("Writing offsets to log") reportTimeTaken("walCommit") { assert(offsetLog.add( @@ -462,21 +480,6 @@ class StreamExecution( sink.addBatch(currentBatchId, nextBatch) } - // Update the eventTime watermark if we find one in the plan. - lastExecution.executedPlan.collect { - case e: EventTimeWatermarkExec => - logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}") - (e.maxEventTime.value / 1000) - e.delay.milliseconds() - }.headOption.foreach { newWatermark => - if (newWatermark > offsetSeqMetadata.batchWatermarkMs) { - logInfo(s"Updating eventTime watermark to: $newWatermark ms") - offsetSeqMetadata.batchWatermarkMs = newWatermark - } else { - logTrace(s"Event time didn't move: $newWatermark < " + - s"$offsetSeqMetadata.currentEventTimeWatermark") - } - } - awaitBatchLock.lock() try { // Wake up any threads that are waiting for the stream to progress. 
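The watermark update moved into StreamExecution above reduces to a simple rule: take the maximum event time observed in the completed batch, subtract the user-specified delay, and advance the watermark only if the result is larger than the current value. A minimal standalone sketch of that rule (all times in milliseconds; nextWatermarkMs is an illustrative helper, not part of this patch):

    def nextWatermarkMs(currentWatermarkMs: Long, maxEventTimeMs: Long, delayMs: Long): Long = {
      // Candidate watermark derived from the latest batch's maximum observed event time.
      val candidate = maxEventTimeMs - delayMs
      // The watermark is monotonic: it never moves backwards.
      math.max(currentWatermarkMs, candidate)
    }

For example, with a 10 second delay, a batch whose maximum event time is 25000 ms advances the watermark to 15000 ms, while a later batch whose maximum event time is only 20000 ms leaves it at 15000 ms (this is the behavior exercised in WatermarkSuite below).
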
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index d1568758b7a43..f52bb88d97960 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.streaming import java.{util => ju} +import java.lang.{Long => JLong} import java.util.UUID import scala.collection.JavaConverters._ @@ -31,27 +32,6 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.util.DateTimeUtils -/** - * :: Experimental :: - * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger. - */ -@Experimental -class StateOperatorProgress private[sql]( - val numRowsTotal: Long, - val numRowsUpdated: Long) { - - /** The compact JSON representation of this progress. */ - def json: String = compact(render(jsonValue)) - - /** The pretty (i.e. indented) JSON representation of this progress. */ - def prettyJson: String = pretty(render(jsonValue)) - - private[sql] def jsonValue: JValue = { - ("numRowsTotal" -> JInt(numRowsTotal)) ~ - ("numRowsUpdated" -> JInt(numRowsUpdated)) - } -} - /** * :: Experimental :: * Information about progress made in the execution of a [[StreamingQuery]] during @@ -61,13 +41,13 @@ class StateOperatorProgress private[sql]( * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`. * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. * @param name User-specified name of the query, null if not specified. - * @param timestamp Timestamp (ms) of the beginning of the trigger. + * @param triggerTimestamp Timestamp (ms) of the beginning of the trigger. * @param batchId A unique id for the current batch of data being processed. Note that in the * case of retries after a failure a given batchId my be executed more than once. * Similarly, when there is no data to be processed, the batchId will not be * incremented. * @param durationMs The amount of time taken to perform various operations in milliseconds. - * @param currentWatermark The current event time watermark in milliseconds + * @param queryTimestamps Statistics of event time seen in this batch * @param stateOperators Information about operators in the query that store state. * @param sources detailed statistics on data being read from each of the streaming sources. 
* @since 2.1.0 @@ -77,10 +57,10 @@ class StreamingQueryProgress private[sql]( val id: UUID, val runId: UUID, val name: String, - val timestamp: String, + val triggerTimestamp: String, val batchId: Long, - val durationMs: ju.Map[String, java.lang.Long], - val currentWatermark: Long, + val durationMs: ju.Map[String, JLong], + val queryTimestamps: ju.Map[String, String], val stateOperators: Array[StateOperatorProgress], val sources: Array[SourceProgress], val sink: SinkProgress) { @@ -107,18 +87,23 @@ class StreamingQueryProgress private[sql]( if (value.isNaN || value.isInfinity) JNothing else JDouble(value) } + def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = { + if (map.isEmpty) return JNothing + val keys = map.asScala.keySet.toSeq.sorted + keys + .map { k => k -> valueToJValue(map.get(k)) } + .foldLeft("" -> JNothing: JObject)(_ ~ _) + } + ("id" -> JString(id.toString)) ~ ("runId" -> JString(runId.toString)) ~ ("name" -> JString(name)) ~ - ("timestamp" -> JString(timestamp)) ~ + ("triggerTimestamp" -> JString(triggerTimestamp)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ - ("durationMs" -> durationMs - .asScala - .map { case (k, v) => k -> JInt(v.toLong): JObject } - .reduce(_ ~ _)) ~ - ("currentWatermark" -> JInt(currentWatermark)) ~ + ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~ + ("queryTimestamps" -> safeMapToJValue[String](queryTimestamps, s => JString(s))) ~ ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~ ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~ ("sink" -> sink.jsonValue) @@ -200,3 +185,24 @@ class SinkProgress protected[sql]( ("description" -> JString(description)) } } + +/** + * :: Experimental :: + * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger. + */ +@Experimental +class StateOperatorProgress private[sql]( + val numRowsTotal: Long, + val numRowsUpdated: Long) { + + /** The compact JSON representation of this progress. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this progress. 
*/ + def prettyJson: String = pretty(render(jsonValue)) + + private[sql] def jsonValue: JValue = { + ("numRowsTotal" -> JInt(numRowsTotal)) ~ + ("numRowsUpdated" -> JInt(numRowsUpdated)) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index f75f5b537e41b..b6a62b42085ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -185,9 +185,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("QueryProgressEvent serialization") { def testSerialization(event: QueryProgressEvent): Unit = { + import scala.collection.JavaConverters._ val json = JsonProtocol.sparkEventToJson(event) val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent] assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality + assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala) + assert(newEvent.progress.queryTimestamps.asScala === event.progress.queryTimestamps.asScala) } testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1)) testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 193c943f83be8..4ff5cdeb30d42 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -38,13 +38,18 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "id" : "${testProgress1.id.toString}", | "runId" : "${testProgress1.runId.toString}", | "name" : "myName", - | "timestamp" : "2016-12-05T20:54:20.827Z", + | "triggerTimestamp" : "2016-12-05T20:54:20.827Z", | "numInputRows" : 678, | "inputRowsPerSecond" : 10.0, | "durationMs" : { | "total" : 0 | }, - | "currentWatermark" : 3, + | "queryTimestamps" : { + | "eventTime.avg" : "2016-12-05T20:54:20.827Z", + | "eventTime.max" : "2016-12-05T20:54:20.827Z", + | "eventTime.min" : "2016-12-05T20:54:20.827Z", + | "processingTime" : "2016-12-05T20:54:20.827Z" + | }, | "stateOperators" : [ { | "numRowsTotal" : 0, | "numRowsUpdated" : 1 @@ -71,12 +76,11 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "id" : "${testProgress2.id.toString}", | "runId" : "${testProgress2.runId.toString}", | "name" : null, - | "timestamp" : "2016-12-05T20:54:20.827Z", + | "triggerTimestamp" : "2016-12-05T20:54:20.827Z", | "numInputRows" : 678, | "durationMs" : { | "total" : 0 | }, - | "currentWatermark" : 3, | "stateOperators" : [ { | "numRowsTotal" : 0, | "numRowsUpdated" : 1 @@ -131,10 +135,14 @@ object StreamingQueryStatusAndProgressSuite { id = UUID.randomUUID, runId = UUID.randomUUID, name = "myName", - timestamp = "2016-12-05T20:54:20.827Z", + triggerTimestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, - currentWatermark = 3L, + queryTimestamps = Map( + "eventTime.max" -> "2016-12-05T20:54:20.827Z", + "eventTime.min" -> "2016-12-05T20:54:20.827Z", + "eventTime.avg" -> 
"2016-12-05T20:54:20.827Z", + "processingTime" -> "2016-12-05T20:54:20.827Z").asJava, stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)), sources = Array( new SourceProgress( @@ -153,10 +161,10 @@ object StreamingQueryStatusAndProgressSuite { id = UUID.randomUUID, runId = UUID.randomUUID, name = null, // should not be present in the json - timestamp = "2016-12-05T20:54:20.827Z", + triggerTimestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, - currentWatermark = 3L, + queryTimestamps = Map.empty[String, String].asJava, // empty maps should be handled correctly stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)), sources = Array( new SourceProgress( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 7be2f216919b0..9edba539aebb6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.streaming +import scala.collection.JavaConverters._ + import org.apache.commons.lang3.RandomStringUtils import org.scalactic.TolerantNumerics import org.scalatest.concurrent.Eventually._ @@ -243,7 +245,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { assert(progress.id === query.id) assert(progress.name === query.name) assert(progress.batchId === 0) - assert(progress.timestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC + assert(progress.triggerTimestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC assert(progress.numInputRows === 2) assert(progress.processedRowsPerSecond === 2.0) @@ -253,6 +255,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { assert(progress.durationMs.get("walCommit") === 0) assert(progress.durationMs.get("triggerExecution") === 1000) + assert(progress.queryTimestamps.get("processingTime") === "1970-01-01T00:00:00.300Z") + assert(!progress.queryTimestamps.keySet.asScala.exists(_.toLowerCase.contains("eventtime"))) assert(progress.sources.length === 1) assert(progress.sources(0).description contains "MemoryStream") assert(progress.sources(0).startOffset === null) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index 12f3c3e5ff3d9..f8374d8b6bac4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.streaming +import java.{util => ju} +import java.text.SimpleDateFormat + import org.scalatest.BeforeAndAfter import org.apache.spark.internal.Logging @@ -50,8 +53,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { } - test("watermark metric") { - + test("event time and watermark metric") { val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() @@ -61,21 +63,43 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q => + 
body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.queryTimestamps) + true + } + testStream(windowedAggregation)( AddData(inputData, 15), CheckAnswer(), - AssertOnQuery { query => - query.lastProgress.currentWatermark === 5000 + assertEventStats { e => + assert(e.get("eventTime.max") === formatTimestamp(15)) + assert(e.get("eventTime.min") === formatTimestamp(15)) + assert(e.get("eventTime.avg") === formatTimestamp(15)) + assert(e.get("watermark") === formatTimestamp(0)) }, - AddData(inputData, 15), + AddData(inputData, 10, 12, 14), CheckAnswer(), - AssertOnQuery { query => - query.lastProgress.currentWatermark === 5000 + assertEventStats { e => + assert(e.get("eventTime.max") === formatTimestamp(14)) + assert(e.get("eventTime.min") === formatTimestamp(10)) + assert(e.get("eventTime.avg") === formatTimestamp(12)) + assert(e.get("watermark") === formatTimestamp(5)) }, AddData(inputData, 25), CheckAnswer(), - AssertOnQuery { query => - query.lastProgress.currentWatermark === 15000 + assertEventStats { e => + assert(e.get("eventTime.max") === formatTimestamp(25)) + assert(e.get("eventTime.min") === formatTimestamp(25)) + assert(e.get("eventTime.avg") === formatTimestamp(25)) + assert(e.get("watermark") === formatTimestamp(5)) + }, + AddData(inputData, 25), + CheckAnswer((10, 3)), + assertEventStats { e => + assert(e.get("eventTime.max") === formatTimestamp(25)) + assert(e.get("eventTime.min") === formatTimestamp(25)) + assert(e.get("eventTime.avg") === formatTimestamp(25)) + assert(e.get("watermark") === formatTimestamp(15)) } ) } @@ -206,4 +230,11 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { CheckAnswer((10, 1)) ) } + + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC")) + + private def formatTimestamp(sec: Long): String = { + timestampFormat.format(new ju.Date(sec * 1000)) + } } From a14efdd71dea7da9c8a7e28a6e7f3463e2af1c1a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 12 Dec 2016 17:00:26 -0800 Subject: [PATCH 2/6] Refactored --- .../streaming/ProgressReporter.scala | 18 +++++------ .../apache/spark/sql/streaming/progress.scala | 12 ++++---- .../StreamingQueryListenerSuite.scala | 2 +- ...StreamingQueryStatusAndProgressSuite.scala | 30 +++++++++---------- .../sql/streaming/StreamingQuerySuite.scala | 4 +-- .../spark/sql/streaming/WatermarkSuite.scala | 26 ++++++++-------- 6 files changed, 44 insertions(+), 48 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 82da3e07ca91a..549b93694d949 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -157,10 +157,10 @@ trait ProgressReporter extends Logging { id = id, runId = runId, name = name, - triggerTimestamp = formatTimestamp(currentTriggerStartTimestamp), + timestamp = formatTimestamp(currentTriggerStartTimestamp), batchId = currentBatchId, durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava, - queryTimestamps = executionStats.eventTimeStats.asJava, + eventTime = executionStats.eventTimeStats.asJava, stateOperators = executionStats.stateOperators.toArray, sources = sourceProgress.toArray, sink = sinkProgress) @@ -182,12 +182,10 @@ trait ProgressReporter extends Logging { /** 
Extracts statistics from the most recent query execution. */ private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { - val basicQueryTimestamps = Map( - "watermark" -> offsetSeqMetadata.batchWatermarkMs, - "processingTime" -> offsetSeqMetadata.batchTimestampMs).mapValues(formatTimestamp) + val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) if (!hasNewData) { - return ExecutionStats(Map.empty, Seq.empty, basicQueryTimestamps) + return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp) } // We want to associate execution plan leaves to sources that generate them, so that we match @@ -250,10 +248,10 @@ trait ProgressReporter extends Logging { case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => val stats = e.eventTimeStats.value Map( - "eventTime.max" -> stats.max, - "eventTime.min" -> stats.min, - "eventTime.avg" -> stats.avg).mapValues(formatTimestamp) - }.headOption.getOrElse(Map.empty) ++ basicQueryTimestamps + "max" -> stats.max, + "min" -> stats.min, + "avg" -> stats.avg).mapValues(formatTimestamp) + }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp ExecutionStats(numInputRows, stateOperators, eventTimeStats) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index f52bb88d97960..7e7ce3ebb1458 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -41,13 +41,13 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`. * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. * @param name User-specified name of the query, null if not specified. - * @param triggerTimestamp Timestamp (ms) of the beginning of the trigger. + * @param timestamp Timestamp (ms) of the beginning of the trigger. * @param batchId A unique id for the current batch of data being processed. Note that in the * case of retries after a failure a given batchId my be executed more than once. * Similarly, when there is no data to be processed, the batchId will not be * incremented. * @param durationMs The amount of time taken to perform various operations in milliseconds. - * @param queryTimestamps Statistics of event time seen in this batch + * @param eventTime Statistics of event time seen in this batch * @param stateOperators Information about operators in the query that store state. * @param sources detailed statistics on data being read from each of the streaming sources. 
* @since 2.1.0 @@ -57,10 +57,10 @@ class StreamingQueryProgress private[sql]( val id: UUID, val runId: UUID, val name: String, - val triggerTimestamp: String, + val timestamp: String, val batchId: Long, val durationMs: ju.Map[String, JLong], - val queryTimestamps: ju.Map[String, String], + val eventTime: ju.Map[String, String], val stateOperators: Array[StateOperatorProgress], val sources: Array[SourceProgress], val sink: SinkProgress) { @@ -98,12 +98,12 @@ class StreamingQueryProgress private[sql]( ("id" -> JString(id.toString)) ~ ("runId" -> JString(runId.toString)) ~ ("name" -> JString(name)) ~ - ("triggerTimestamp" -> JString(triggerTimestamp)) ~ + ("timestamp" -> JString(timestamp)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~ - ("queryTimestamps" -> safeMapToJValue[String](queryTimestamps, s => JString(s))) ~ + ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~ ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~ ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~ ("sink" -> sink.jsonValue) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index b6a62b42085ff..7c6745ac8285a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -190,7 +190,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent] assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala) - assert(newEvent.progress.queryTimestamps.asScala === event.progress.queryTimestamps.asScala) + assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala) } testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1)) testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 4ff5cdeb30d42..c970743a31ad6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -38,17 +38,17 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "id" : "${testProgress1.id.toString}", | "runId" : "${testProgress1.runId.toString}", | "name" : "myName", - | "triggerTimestamp" : "2016-12-05T20:54:20.827Z", + | "timestamp" : "2016-12-05T20:54:20.827Z", | "numInputRows" : 678, | "inputRowsPerSecond" : 10.0, | "durationMs" : { | "total" : 0 | }, - | "queryTimestamps" : { - | "eventTime.avg" : "2016-12-05T20:54:20.827Z", - | "eventTime.max" : "2016-12-05T20:54:20.827Z", - | "eventTime.min" : "2016-12-05T20:54:20.827Z", - | "processingTime" : "2016-12-05T20:54:20.827Z" + | "eventTime" : { + | "avg" : "2016-12-05T20:54:20.827Z", + | 
"max" : "2016-12-05T20:54:20.827Z", + | "min" : "2016-12-05T20:54:20.827Z", + | "watermark" : "2016-12-05T20:54:20.827Z" | }, | "stateOperators" : [ { | "numRowsTotal" : 0, @@ -76,7 +76,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "id" : "${testProgress2.id.toString}", | "runId" : "${testProgress2.runId.toString}", | "name" : null, - | "triggerTimestamp" : "2016-12-05T20:54:20.827Z", + | "timestamp" : "2016-12-05T20:54:20.827Z", | "numInputRows" : 678, | "durationMs" : { | "total" : 0 @@ -135,14 +135,14 @@ object StreamingQueryStatusAndProgressSuite { id = UUID.randomUUID, runId = UUID.randomUUID, name = "myName", - triggerTimestamp = "2016-12-05T20:54:20.827Z", + timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, - queryTimestamps = Map( - "eventTime.max" -> "2016-12-05T20:54:20.827Z", - "eventTime.min" -> "2016-12-05T20:54:20.827Z", - "eventTime.avg" -> "2016-12-05T20:54:20.827Z", - "processingTime" -> "2016-12-05T20:54:20.827Z").asJava, + eventTime = Map( + "max" -> "2016-12-05T20:54:20.827Z", + "min" -> "2016-12-05T20:54:20.827Z", + "avg" -> "2016-12-05T20:54:20.827Z", + "watermark" -> "2016-12-05T20:54:20.827Z").asJava, stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)), sources = Array( new SourceProgress( @@ -161,10 +161,10 @@ object StreamingQueryStatusAndProgressSuite { id = UUID.randomUUID, runId = UUID.randomUUID, name = null, // should not be present in the json - triggerTimestamp = "2016-12-05T20:54:20.827Z", + timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, - queryTimestamps = Map.empty[String, String].asJava, // empty maps should be handled correctly + eventTime = Map.empty[String, String].asJava, // empty maps should be handled correctly stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)), sources = Array( new SourceProgress( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 9edba539aebb6..0541e0bc93520 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -245,7 +245,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { assert(progress.id === query.id) assert(progress.name === query.name) assert(progress.batchId === 0) - assert(progress.triggerTimestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC + assert(progress.timestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC assert(progress.numInputRows === 2) assert(progress.processedRowsPerSecond === 2.0) @@ -255,8 +255,6 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { assert(progress.durationMs.get("walCommit") === 0) assert(progress.durationMs.get("triggerExecution") === 1000) - assert(progress.queryTimestamps.get("processingTime") === "1970-01-01T00:00:00.300Z") - assert(!progress.queryTimestamps.keySet.asScala.exists(_.toLowerCase.contains("eventtime"))) assert(progress.sources.length === 1) assert(progress.sources(0).description contains "MemoryStream") assert(progress.sources(0).startOffset === null) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index f8374d8b6bac4..84f2ffb60a505 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -64,7 +64,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q => - body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.queryTimestamps) + body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) true } @@ -72,33 +72,33 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { AddData(inputData, 15), CheckAnswer(), assertEventStats { e => - assert(e.get("eventTime.max") === formatTimestamp(15)) - assert(e.get("eventTime.min") === formatTimestamp(15)) - assert(e.get("eventTime.avg") === formatTimestamp(15)) + assert(e.get("max") === formatTimestamp(15)) + assert(e.get("min") === formatTimestamp(15)) + assert(e.get("avg") === formatTimestamp(15)) assert(e.get("watermark") === formatTimestamp(0)) }, AddData(inputData, 10, 12, 14), CheckAnswer(), assertEventStats { e => - assert(e.get("eventTime.max") === formatTimestamp(14)) - assert(e.get("eventTime.min") === formatTimestamp(10)) - assert(e.get("eventTime.avg") === formatTimestamp(12)) + assert(e.get("max") === formatTimestamp(14)) + assert(e.get("min") === formatTimestamp(10)) + assert(e.get("avg") === formatTimestamp(12)) assert(e.get("watermark") === formatTimestamp(5)) }, AddData(inputData, 25), CheckAnswer(), assertEventStats { e => - assert(e.get("eventTime.max") === formatTimestamp(25)) - assert(e.get("eventTime.min") === formatTimestamp(25)) - assert(e.get("eventTime.avg") === formatTimestamp(25)) + assert(e.get("max") === formatTimestamp(25)) + assert(e.get("min") === formatTimestamp(25)) + assert(e.get("avg") === formatTimestamp(25)) assert(e.get("watermark") === formatTimestamp(5)) }, AddData(inputData, 25), CheckAnswer((10, 3)), assertEventStats { e => - assert(e.get("eventTime.max") === formatTimestamp(25)) - assert(e.get("eventTime.min") === formatTimestamp(25)) - assert(e.get("eventTime.avg") === formatTimestamp(25)) + assert(e.get("max") === formatTimestamp(25)) + assert(e.get("min") === formatTimestamp(25)) + assert(e.get("avg") === formatTimestamp(25)) assert(e.get("watermark") === formatTimestamp(15)) } ) From 9a2e941a451f212927ee7edcead8c866a619e159 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 12 Dec 2016 17:08:06 -0800 Subject: [PATCH 3/6] Minor changes --- .../main/scala/org/apache/spark/sql/streaming/progress.scala | 5 ++--- .../org/apache/spark/sql/streaming/WatermarkSuite.scala | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 7e7ce3ebb1458..0f263fd1e6433 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -87,12 +87,11 @@ class StreamingQueryProgress private[sql]( if (value.isNaN || value.isInfinity) JNothing else JDouble(value) } + /** Convert map to JValue while handling empty maps. Also, this sorts the keys. 
*/ def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = { if (map.isEmpty) return JNothing val keys = map.asScala.keySet.toSeq.sorted - keys - .map { k => k -> valueToJValue(map.get(k)) } - .foldLeft("" -> JNothing: JObject)(_ ~ _) + keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _) } ("id" -> JString(id.toString)) ~ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index 84f2ffb60a505..f1cc19c6e235d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -53,7 +53,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { } - test("event time and watermark metric") { + test("event time and watermark metrics") { val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() From 8de17d26c947b42382ede2e6d83e7bb24fffc05a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 12 Dec 2016 17:14:03 -0800 Subject: [PATCH 4/6] Minor change --- .../apache/spark/sql/streaming/progress.scala | 42 +++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 0f263fd1e6433..6bc408a6cb2f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -30,8 +30,27 @@ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.catalyst.util.DateTimeUtils +/** + * :: Experimental :: + * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger. + */ +@Experimental +class StateOperatorProgress private[sql]( + val numRowsTotal: Long, + val numRowsUpdated: Long) { + + /** The compact JSON representation of this progress. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this progress. */ + def prettyJson: String = pretty(render(jsonValue)) + + private[sql] def jsonValue: JValue = { + ("numRowsTotal" -> JInt(numRowsTotal)) ~ + ("numRowsUpdated" -> JInt(numRowsUpdated)) + } +} /** * :: Experimental :: * Information about progress made in the execution of a [[StreamingQuery]] during @@ -184,24 +203,3 @@ class SinkProgress protected[sql]( ("description" -> JString(description)) } } - -/** - * :: Experimental :: - * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger. - */ -@Experimental -class StateOperatorProgress private[sql]( - val numRowsTotal: Long, - val numRowsUpdated: Long) { - - /** The compact JSON representation of this progress. */ - def json: String = compact(render(jsonValue)) - - /** The pretty (i.e. indented) JSON representation of this progress. 
*/ - def prettyJson: String = pretty(render(jsonValue)) - - private[sql] def jsonValue: JValue = { - ("numRowsTotal" -> JInt(numRowsTotal)) ~ - ("numRowsUpdated" -> JInt(numRowsUpdated)) - } -} From 89389920c54b8927f397c4643245b75e0ef36740 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 12 Dec 2016 17:14:38 -0800 Subject: [PATCH 5/6] Add line --- .../src/main/scala/org/apache/spark/sql/streaming/progress.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 6bc408a6cb2f1..540166be27a11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -51,6 +51,7 @@ class StateOperatorProgress private[sql]( ("numRowsUpdated" -> JInt(numRowsUpdated)) } } + /** * :: Experimental :: * Information about progress made in the execution of a [[StreamingQuery]] during From b59ab8083de3f2441133fed35658dea39cd4a759 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 12 Dec 2016 20:08:44 -0800 Subject: [PATCH 6/6] Addressed comments --- .../execution/streaming/EventTimeWatermarkExec.scala | 6 +++--- .../org/apache/spark/sql/streaming/progress.scala | 11 +++++++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index ccd4af7cd06f9..e8570d040dbe4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -42,7 +42,7 @@ case class EventTimeStats(var max: Long, var min: Long, var sum: Long, var count this.count += that.count } - def avg: Long = (sum.toDouble / count).toLong + def avg: Long = sum / count } object EventTimeStats { @@ -76,7 +76,7 @@ class EventTimeStatsAccum(protected var currentStats: EventTimeStats = EventTime * adding appropriate metadata to this column, this operator also tracks the maximum observed event * time. Based on the maximum observed time and a user specified delay, we can calculate the * `watermark` after which we assume we will no longer see late records for a particular time - * period. + * period. Note that event time is measured in milliseconds. */ case class EventTimeWatermarkExec( eventTime: Attribute, @@ -90,7 +90,7 @@ case class EventTimeWatermarkExec( child.execute().mapPartitions { iter => val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output) iter.map { row => - eventTimeStats.add((getEventTime(row).getLong(0).toDouble / 1000).toLong) + eventTimeStats.add(getEventTime(row).getLong(0) / 1000) row } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 540166be27a11..e219cfde12656 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -61,13 +61,20 @@ class StateOperatorProgress private[sql]( * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`. * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. 
* @param name User-specified name of the query, null if not specified. - * @param timestamp Timestamp (ms) of the beginning of the trigger. + * @param timestamp Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps. * @param batchId A unique id for the current batch of data being processed. Note that in the * case of retries after a failure a given batchId my be executed more than once. * Similarly, when there is no data to be processed, the batchId will not be * incremented. * @param durationMs The amount of time taken to perform various operations in milliseconds. - * @param eventTime Statistics of event time seen in this batch + * @param eventTime Statistics of event time seen in this batch. It may contain the following keys: + * { + * "max" -> "2016-12-05T20:54:20.827Z" // maximum event time seen in this trigger + * "min" -> "2016-12-05T20:54:20.827Z" // minimum event time seen in this trigger + * "avg" -> "2016-12-05T20:54:20.827Z" // average event time seen in this trigger + * "watermark" -> "2016-12-05T20:54:20.827Z" // watermark used in this trigger + * } + * All timestamps are in ISO8601 format, i.e. UTC timestamps. * @param stateOperators Information about operators in the query that store state. * @param sources detailed statistics on data being read from each of the streaming sources. * @since 2.1.0
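After the final patch in this series, the progress object exposes event-time statistics under a single `eventTime` map (keys `max`, `min`, `avg`, `watermark`, all ISO8601 strings), replacing the earlier `currentWatermark` field. A minimal, hypothetical consumer sketch, assuming a build that includes these patches (`printEventTimeStats` and `query` are illustrative names, not part of the patch):

    import org.apache.spark.sql.streaming.StreamingQueryProgress

    // Illustrative helper: prints whichever event-time statistics the trigger reported.
    def printEventTimeStats(progress: StreamingQueryProgress): Unit = {
      val eventTime = progress.eventTime // java.util.Map[String, String], ISO8601 values
      // Keys may be absent for a given trigger (e.g. no event-time column, or no rows
      // processed), hence the null-safe lookup.
      Seq("max", "min", "avg", "watermark").foreach { key =>
        Option(eventTime.get(key)).foreach(value => println(s"eventTime.$key = $value"))
      }
    }

    // e.g. printEventTimeStats(query.lastProgress) for a started streaming query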