[SPARK-18834][SS] Expose event time stats through StreamingQueryProgress #16258

Closed · wants to merge 6 commits
@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.streaming

import scala.math.max

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
@@ -28,24 +26,48 @@ import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.AccumulatorV2

/** Tracks the maximum positive long seen. */
class MaxLong(protected var currentValue: Long = 0)
  extends AccumulatorV2[Long, Long] {
/** Class for collecting event time stats with an accumulator */
Review comment (Member): nit: please document the time unit.

Reply (Contributor Author): Documented it in EventTimeWatermarkExec.

case class EventTimeStats(var max: Long, var min: Long, var sum: Long, var count: Long) {
  def add(eventTime: Long): Unit = {
    this.max = math.max(this.max, eventTime)
    this.min = math.min(this.min, eventTime)
    this.sum += eventTime
    this.count += 1
  }

  def merge(that: EventTimeStats): Unit = {
    this.max = math.max(this.max, that.max)
    this.min = math.min(this.min, that.min)
    this.sum += that.sum
    this.count += that.count
  }

  def avg: Long = sum / count
}

object EventTimeStats {
  def zero: EventTimeStats = EventTimeStats(
    max = Long.MinValue, min = Long.MaxValue, sum = 0L, count = 0L)
}

/** Accumulator that collects stats on event time in a batch. */
class EventTimeStatsAccum(protected var currentStats: EventTimeStats = EventTimeStats.zero)
  extends AccumulatorV2[Long, EventTimeStats] {

  override def isZero: Boolean = value == 0
  override def value: Long = currentValue
  override def copy(): AccumulatorV2[Long, Long] = new MaxLong(currentValue)
  override def isZero: Boolean = value == EventTimeStats.zero
  override def value: EventTimeStats = currentStats
  override def copy(): AccumulatorV2[Long, EventTimeStats] = new EventTimeStatsAccum(currentStats)

  override def reset(): Unit = {
    currentValue = 0
    currentStats = EventTimeStats.zero
  }

  override def add(v: Long): Unit = {
    currentValue = max(v, value)
    currentStats.add(v)
  }

  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
    currentValue = max(value, other.value)
  override def merge(other: AccumulatorV2[Long, EventTimeStats]): Unit = {
    currentStats.merge(other.value)
  }
}
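Editor's aside: a minimal local sketch (not part of the PR) of how EventTimeStats folds event times, using made-up millisecond values:

// Illustrative only: this is what EventTimeStatsAccum.add/merge delegate to.
val stats = EventTimeStats.zero
Seq(1000L, 4000L, 7000L).foreach(stats.add)    // event times in milliseconds
assert(stats.max == 7000L && stats.min == 1000L)
assert(stats.avg == 4000L)                     // sum / count = 12000 / 3

// Partial aggregates from different tasks combine via merge,
// which is what Spark uses when it gathers accumulator updates.
val other = EventTimeStats.zero
other.add(10000L)
stats.merge(other)
assert(stats.max == 10000L && stats.count == 4L)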

@@ -54,22 +76,21 @@ class MaxLong(protected var currentValue: Long = 0)
 * adding appropriate metadata to this column, this operator also tracks the maximum observed event
 * time. Based on the maximum observed time and a user specified delay, we can calculate the
 * `watermark` after which we assume we will no longer see late records for a particular time
 * period.
 * period. Note that event time is measured in milliseconds.
 */
case class EventTimeWatermarkExec(
    eventTime: Attribute,
    delay: CalendarInterval,
    child: SparkPlan) extends SparkPlan {

  // TODO: Use Spark SQL Metrics?
  val maxEventTime = new MaxLong
  sparkContext.register(maxEventTime)
  val eventTimeStats = new EventTimeStatsAccum()
  sparkContext.register(eventTimeStats)

  override protected def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitions { iter =>
      val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
      iter.map { row =>
        maxEventTime.add(getEventTime(row).getLong(0))
        eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
        row
      }
    }
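Editor's aside: a hedged sketch (not from this diff) of the user-facing API that plans the operator above; the DataFrame, column name, and intervals are invented:

// Assumes a streaming DataFrame `df` with a TimestampType column named "timestamp",
// plus the usual imports (org.apache.spark.sql.functions._, spark.implicits._).
val withEventTime = df.withWatermark("timestamp", "10 seconds")
val counts = withEventTime.groupBy(window($"timestamp", "5 minutes")).count()
// At runtime, each row's event time reaches eventTimeStats.add(getEventTime(row).getLong(0) / 1000):
// TimestampType values are stored internally as microseconds, so dividing by 1000 yields the
// milliseconds that the "event time is measured in milliseconds" note refers to.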
@@ -41,7 +41,9 @@ import org.apache.spark.util.Clock
trait ProgressReporter extends Logging {

  case class ExecutionStats(
    inputRows: Map[Source, Long], stateOperators: Seq[StateOperatorProgress])
    inputRows: Map[Source, Long],
    stateOperators: Seq[StateOperatorProgress],
    eventTimeStats: Map[String, String])

  // Internal state of the stream, required for computing metrics.
  protected def id: UUID
@@ -127,12 +129,7 @@
  protected def finishTrigger(hasNewData: Boolean): Unit = {
    currentTriggerEndTimestamp = triggerClock.getTimeMillis()

    val executionStats: ExecutionStats = if (!hasNewData) {
      ExecutionStats(Map.empty, Seq.empty)
    } else {
      extractExecutionStats
    }

    val executionStats = extractExecutionStats(hasNewData)
    val processingTimeSec =
      (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000

@@ -160,10 +157,10 @@
      id = id,
      runId = runId,
      name = name,
      timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)),
      timestamp = formatTimestamp(currentTriggerStartTimestamp),
      batchId = currentBatchId,
      durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
      currentWatermark = offsetSeqMetadata.batchWatermarkMs,
      eventTime = executionStats.eventTimeStats.asJava,
      stateOperators = executionStats.stateOperators.toArray,
      sources = sourceProgress.toArray,
      sink = sinkProgress)
@@ -184,7 +181,13 @@
}

  /** Extracts statistics from the most recent query execution. */
  private def extractExecutionStats: ExecutionStats = {
  private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
    val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))

    if (!hasNewData) {
      return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
    }

    // We want to associate execution plan leaves to sources that generate them, so that we match
    // their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
    // Consider the translation from the streaming logical plan to the final executed plan.
@@ -241,7 +244,16 @@
        numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
    }

    ExecutionStats(numInputRows, stateOperators)
    val eventTimeStats = lastExecution.executedPlan.collect {
      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
        val stats = e.eventTimeStats.value
        Map(
          "max" -> stats.max,
          "min" -> stats.min,
          "avg" -> stats.avg).mapValues(formatTimestamp)
    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp

    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
  }
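Editor's aside: to make the flow above concrete, the eventTimeStats map looks roughly like this (values invented); "watermark" is always present, while "max"/"min"/"avg" appear only when the trigger saw data:

// Trigger with data:
//   Map("avg" -> "2016-12-05T20:54:18.100Z", "max" -> "2016-12-05T20:54:20.827Z",
//       "min" -> "2016-12-05T20:54:15.000Z", "watermark" -> "2016-12-05T20:54:10.827Z")
// Trigger without data (early return above):
//   Map("watermark" -> "2016-12-05T20:54:10.827Z")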

  /** Records the duration of running `body` for the next query progress update. */
@@ -257,6 +269,10 @@
    result
  }

  private def formatTimestamp(millis: Long): String = {
    timestampFormat.format(new Date(millis))
  }
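Editor's aside: the ISO8601 strings come from `timestampFormat`, which is not shown in this diff; the standalone sketch below assumes it is an ISO8601 SimpleDateFormat pinned to UTC:

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Assumed format of the trait's timestampFormat field.
val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
timestampFormat.format(new Date(1480971260827L))   // "2016-12-05T20:54:20.827Z"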

  /** Updates the message returned in `status`. */
  protected def updateStatusMessage(message: String): Unit = {
    currentStatus = currentStatus.copy(message = message)
@@ -360,6 +360,24 @@ class StreamExecution(
    if (hasNewData) {
      // Current batch timestamp in milliseconds
      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
      // Update the eventTime watermark if we find one in the plan.
Review comment (Contributor Author): I moved this so that the watermark is updated before starting a batch, rather than after finishing one. This keeps batchWatermarkMs consistent with batchTimestampMs (both are set before starting a batch) and reduces complexity in the ProgressReporter.

Reply (Contributor): +1, this makes sense.

      if (lastExecution != null) {
        lastExecution.executedPlan.collect {
          case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
            logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
            e.eventTimeStats.value.max - e.delay.milliseconds
        }.headOption.foreach { newWatermarkMs =>
          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
          } else {
            logDebug(
              s"Event time didn't move: $newWatermarkMs < " +
                s"${offsetSeqMetadata.batchWatermarkMs}")
          }
        }
      }
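Editor's aside: a quick worked example of the update rule above, with invented values:

// Watermark candidate = max event time in the trigger minus the user-specified delay.
val maxEventTimeMs     = 1480971260827L                      // ms
val delayMs            = 10000L                              // "10 seconds"
val candidateMs        = maxEventTimeMs - delayMs            // 1480971250827
val currentWatermarkMs = 1480971255000L
// Only a larger candidate advances the watermark; it never moves backwards.
val newWatermarkMs = math.max(candidateMs, currentWatermarkMs)   // stays 1480971255000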

updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
assert(offsetLog.add(
@@ -462,21 +480,6 @@
      sink.addBatch(currentBatchId, nextBatch)
    }

    // Update the eventTime watermark if we find one in the plan.
    lastExecution.executedPlan.collect {
      case e: EventTimeWatermarkExec =>
        logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
        (e.maxEventTime.value / 1000) - e.delay.milliseconds()
    }.headOption.foreach { newWatermark =>
      if (newWatermark > offsetSeqMetadata.batchWatermarkMs) {
        logInfo(s"Updating eventTime watermark to: $newWatermark ms")
        offsetSeqMetadata.batchWatermarkMs = newWatermark
      } else {
        logTrace(s"Event time didn't move: $newWatermark < " +
          s"$offsetSeqMetadata.currentEventTimeWatermark")
      }
    }

    awaitBatchLock.lock()
    try {
      // Wake up any threads that are waiting for the stream to progress.
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming

import java.{util => ju}
import java.lang.{Long => JLong}
import java.util.UUID

import scala.collection.JavaConverters._
@@ -29,7 +30,6 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.util.DateTimeUtils

/**
* :: Experimental ::
@@ -61,13 +61,20 @@ class StateOperatorProgress private[sql](
 * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
 * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
 * @param name User-specified name of the query, null if not specified.
 * @param timestamp Timestamp (ms) of the beginning of the trigger.
 * @param timestamp Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
 * @param batchId A unique id for the current batch of data being processed. Note that in the
 *                case of retries after a failure a given batchId may be executed more than once.
 *                Similarly, when there is no data to be processed, the batchId will not be
 *                incremented.
 * @param durationMs The amount of time taken to perform various operations in milliseconds.
 * @param currentWatermark The current event time watermark in milliseconds
 * @param eventTime Statistics of event time seen in this batch. It may contain the following keys:
 *                  {
 *                    "max" -> "2016-12-05T20:54:20.827Z" // maximum event time seen in this trigger
Review comment (Member): Hi all, I am just leaving a comment as a gentle reminder that we should probably replace < or > with alternatives such as {@literal <} or {@literal >} in the future. Please refer to #16013 (comment). This causes a javadoc8 break:

[error] .../java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:19: error: bad use of '>'
[error]  *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
[error]                             ^
[error] .../java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:20: error: bad use of '>'
[error]  *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
[error]                             ^
[error] .../java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:21: error: bad use of '>'
[error]  *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
[error]                             ^
[error] .../java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:22: error: bad use of '>'
[error]  *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
[error]                                   ^

* "min" -> "2016-12-05T20:54:20.827Z" // minimum event time seen in this trigger
* "avg" -> "2016-12-05T20:54:20.827Z" // average event time seen in this trigger
* "watermark" -> "2016-12-05T20:54:20.827Z" // watermark used in this trigger
* }
* All timestamps are in ISO8601 format, i.e. UTC timestamps.
* @param stateOperators Information about operators in the query that store state.
* @param sources detailed statistics on data being read from each of the streaming sources.
* @since 2.1.0
@@ -79,8 +86,8 @@ class StreamingQueryProgress private[sql](
    val name: String,
    val timestamp: String,
    val batchId: Long,
    val durationMs: ju.Map[String, java.lang.Long],
    val currentWatermark: Long,
    val durationMs: ju.Map[String, JLong],
    val eventTime: ju.Map[String, String],
    val stateOperators: Array[StateOperatorProgress],
    val sources: Array[SourceProgress],
    val sink: SinkProgress) {
@@ -107,18 +114,22 @@
      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
    }

    /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
    def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
      if (map.isEmpty) return JNothing
      val keys = map.asScala.keySet.toSeq.sorted
      keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
    }

    ("id" -> JString(id.toString)) ~
    ("runId" -> JString(runId.toString)) ~
    ("name" -> JString(name)) ~
    ("timestamp" -> JString(timestamp)) ~
    ("numInputRows" -> JInt(numInputRows)) ~
    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
    ("durationMs" -> durationMs
      .asScala
      .map { case (k, v) => k -> JInt(v.toLong): JObject }
      .reduce(_ ~ _)) ~
    ("currentWatermark" -> JInt(currentWatermark)) ~
    ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
    ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
    ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
    ("sink" -> sink.jsonValue)
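Editor's aside: for API consumers, a hedged usage sketch (not part of this PR) of reading the new eventTime map; `query` is assumed to be a started StreamingQuery:

// lastProgress may be null before the first trigger completes.
val progress = query.lastProgress
if (progress != null) {
  // Values are ISO8601 strings; "max"/"min"/"avg" are absent for triggers that saw no data.
  val watermark    = Option(progress.eventTime.get("watermark"))
  val maxEventTime = Option(progress.eventTime.get("max"))
  println(s"watermark=$watermark maxEventTime=$maxEventTime")
  println(progress.json)   // eventTime replaces the old top-level currentWatermark field
}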
@@ -185,9 +185,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {

  test("QueryProgressEvent serialization") {
    def testSerialization(event: QueryProgressEvent): Unit = {
      import scala.collection.JavaConverters._
      val json = JsonProtocol.sparkEventToJson(event)
      val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent]
      assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality
      assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala)
      assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala)
    }
    testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
    testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))
@@ -44,7 +44,12 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| "durationMs" : {
| "total" : 0
| },
| "currentWatermark" : 3,
| "eventTime" : {
Review comment (Contributor): awesome!

| "avg" : "2016-12-05T20:54:20.827Z",
| "max" : "2016-12-05T20:54:20.827Z",
| "min" : "2016-12-05T20:54:20.827Z",
| "watermark" : "2016-12-05T20:54:20.827Z"
| },
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1
@@ -76,7 +81,6 @@
| "durationMs" : {
| "total" : 0
| },
| "currentWatermark" : 3,
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1
@@ -134,7 +138,11 @@ object StreamingQueryStatusAndProgressSuite {
    timestamp = "2016-12-05T20:54:20.827Z",
    batchId = 2L,
    durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
    currentWatermark = 3L,
    eventTime = Map(
      "max" -> "2016-12-05T20:54:20.827Z",
      "min" -> "2016-12-05T20:54:20.827Z",
      "avg" -> "2016-12-05T20:54:20.827Z",
      "watermark" -> "2016-12-05T20:54:20.827Z").asJava,
    stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
    sources = Array(
      new SourceProgress(
@@ -156,7 +164,7 @@
    timestamp = "2016-12-05T20:54:20.827Z",
    batchId = 2L,
    durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
    currentWatermark = 3L,
    eventTime = Map.empty[String, String].asJava, // empty maps should be handled correctly
    stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
    sources = Array(
      new SourceProgress(
@@ -17,6 +17,8 @@

package org.apache.spark.sql.streaming

import scala.collection.JavaConverters._

import org.apache.commons.lang3.RandomStringUtils
import org.scalactic.TolerantNumerics
import org.scalatest.concurrent.Eventually._