[SPARK-18834][SS] Expose event time stats through StreamingQueryProgress #16258

Status: Closed (wants to merge 6 commits)
Changes from 2 commits
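The change replaces the single max-event-time accumulator (MaxLong) with full event-time statistics (max, min, avg) plus the current watermark, exposed on StreamingQueryProgress as an eventTime map of ISO8601 timestamps. A minimal sketch of how a caller might read the new field, assuming a started streaming query bound to a value named `query` (illustrative only, not part of this PR):

  // Read the event-time stats exposed by this change from the latest progress.
  val progress = query.lastProgress            // StreamingQueryProgress
  val eventTime = progress.eventTime           // java.util.Map[String, String] of ISO8601 timestamps
  // Keys exercised by the tests below: "max", "min", "avg", "watermark".
  println(s"watermark = ${eventTime.get("watermark")}, max event time = ${eventTime.get("max")}")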
@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.streaming

import scala.math.max

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
@@ -28,24 +26,48 @@ import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.AccumulatorV2

/** Tracks the maximum positive long seen. */
class MaxLong(protected var currentValue: Long = 0)
extends AccumulatorV2[Long, Long] {
/** Class for collecting event time stats with an accumulator */
Member: nit: please document the time unit.
Contributor Author: Documented it in EventTimeWatermarkExec.

case class EventTimeStats(var max: Long, var min: Long, var sum: Long, var count: Long) {
def add(eventTime: Long): Unit = {
this.max = math.max(this.max, eventTime)
this.min = math.min(this.min, eventTime)
this.sum += eventTime
this.count += 1
}

def merge(that: EventTimeStats): Unit = {
this.max = math.max(this.max, that.max)
this.min = math.min(this.min, that.min)
this.sum += that.sum
this.count += that.count
}

def avg: Long = (sum.toDouble / count).toLong
Member: nit: why not use sum / count?
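For context on the nit above: the two forms are not always interchangeable. Integer division stays exact, while routing through Double loses precision once the sum exceeds 2^53. A small illustration with hypothetical values:

  val sum = (1L << 60) + 1L            // too large to represent exactly as a Double
  val count = 1L
  sum / count                          // 1152921504606846977 (exact)
  (sum.toDouble / count).toLong        // 1152921504606846976 (low bit lost in the Double conversion)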

}

object EventTimeStats {
def zero: EventTimeStats = EventTimeStats(
max = Long.MinValue, min = Long.MaxValue, sum = 0L, count = 0L)
}
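The zero element uses Long.MinValue for max and Long.MaxValue for min so that it behaves as an identity for merge, meaning partitions that saw no rows do not perturb the result. A quick check using only the class defined above:

  val s = EventTimeStats.zero
  s.add(1000L)
  s.add(3000L)
  val t = EventTimeStats.zero
  t.merge(s)  // merging with zero keeps max = 3000, min = 1000, sum = 4000, count = 2
  assert(t == EventTimeStats(max = 3000L, min = 1000L, sum = 4000L, count = 2L))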

/** Accumulator that collects stats on event time in a batch. */
class EventTimeStatsAccum(protected var currentStats: EventTimeStats = EventTimeStats.zero)
extends AccumulatorV2[Long, EventTimeStats] {

override def isZero: Boolean = value == 0
override def value: Long = currentValue
override def copy(): AccumulatorV2[Long, Long] = new MaxLong(currentValue)
override def isZero: Boolean = value == EventTimeStats.zero
override def value: EventTimeStats = currentStats
override def copy(): AccumulatorV2[Long, EventTimeStats] = new EventTimeStatsAccum(currentStats)

override def reset(): Unit = {
currentValue = 0
currentStats = EventTimeStats.zero
}

override def add(v: Long): Unit = {
currentValue = max(v, value)
currentStats.add(v)
}

override def merge(other: AccumulatorV2[Long, Long]): Unit = {
currentValue = max(value, other.value)
override def merge(other: AccumulatorV2[Long, EventTimeStats]): Unit = {
currentStats.merge(other.value)
}
}

@@ -61,15 +83,14 @@ case class EventTimeWatermarkExec(
delay: CalendarInterval,
child: SparkPlan) extends SparkPlan {

// TODO: Use Spark SQL Metrics?
val maxEventTime = new MaxLong
sparkContext.register(maxEventTime)
val eventTimeStats = new EventTimeStatsAccum()
sparkContext.register(eventTimeStats)

override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitions { iter =>
val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
iter.map { row =>
maxEventTime.add(getEventTime(row).getLong(0))
eventTimeStats.add((getEventTime(row).getLong(0).toDouble / 1000).toLong)
Member: nit: could be getEventTime(row).getLong(0) / 1000
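(For context: the event-time expression evaluates to Catalyst's internal TimestampType representation, a Long of microseconds since the epoch, so the division by 1000 converts it to the milliseconds used by the stats and by the watermark arithmetic.)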

row
}
}
@@ -41,7 +41,9 @@ import org.apache.spark.util.Clock
trait ProgressReporter extends Logging {

case class ExecutionStats(
inputRows: Map[Source, Long], stateOperators: Seq[StateOperatorProgress])
inputRows: Map[Source, Long],
stateOperators: Seq[StateOperatorProgress],
eventTimeStats: Map[String, String])

// Internal state of the stream, required for computing metrics.
protected def id: UUID
@@ -127,12 +129,7 @@ trait ProgressReporter extends Logging {
protected def finishTrigger(hasNewData: Boolean): Unit = {
currentTriggerEndTimestamp = triggerClock.getTimeMillis()

val executionStats: ExecutionStats = if (!hasNewData) {
ExecutionStats(Map.empty, Seq.empty)
} else {
extractExecutionStats
}

val executionStats = extractExecutionStats(hasNewData)
val processingTimeSec =
(currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000

@@ -160,10 +157,10 @@ trait ProgressReporter extends Logging {
id = id,
runId = runId,
name = name,
timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)),
timestamp = formatTimestamp(currentTriggerStartTimestamp),
batchId = currentBatchId,
durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
currentWatermark = offsetSeqMetadata.batchWatermarkMs,
eventTime = executionStats.eventTimeStats.asJava,
stateOperators = executionStats.stateOperators.toArray,
sources = sourceProgress.toArray,
sink = sinkProgress)
@@ -184,7 +181,13 @@
}

/** Extracts statistics from the most recent query execution. */
private def extractExecutionStats: ExecutionStats = {
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))

if (!hasNewData) {
return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
}

// We want to associate execution plan leaves to sources that generate them, so that we match
// their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
// Consider the translation from the streaming logical plan to the final executed plan.
@@ -241,7 +244,16 @@ trait ProgressReporter extends Logging {
numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
}

ExecutionStats(numInputRows, stateOperators)
val eventTimeStats = lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
val stats = e.eventTimeStats.value
Map(
"max" -> stats.max,
"min" -> stats.min,
"avg" -> stats.avg).mapValues(formatTimestamp)
}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp

ExecutionStats(numInputRows, stateOperators, eventTimeStats)
}

/** Records the duration of running `body` for the next query progress update. */
@@ -257,6 +269,10 @@
result
}

private def formatTimestamp(millis: Long): String = {
timestampFormat.format(new Date(millis))
}

/** Updates the message returned in `status`. */
protected def updateStatusMessage(message: String): Unit = {
currentStatus = currentStatus.copy(message = message)
@@ -360,6 +360,24 @@ class StreamExecution(
if (hasNewData) {
// Current batch timestamp in milliseconds
offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
// Update the eventTime watermark if we find one in the plan.
Contributor Author: I moved this so that the watermark is updated before starting a batch, rather than after finishing a batch. This keeps batchWatermarkMs consistent with batchTimestampMs (both are set before starting a batch) and reduces complexity in the ProgressReporter.
Contributor: +1, this makes sense.
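The block below only ever moves the watermark forward: the candidate value is the maximum observed event time minus the configured delay, and it replaces batchWatermarkMs only if it is larger. A standalone sketch of that arithmetic with hypothetical numbers:

  val maxEventTimeMs     = 1480971260827L   // e.g. 2016-12-05T20:54:20.827Z
  val delayMs            = 10000L           // a hypothetical "10 seconds" watermark delay
  val currentWatermarkMs = 1480971200000L   // hypothetical previous watermark
  val candidateMs = maxEventTimeMs - delayMs
  val newWatermarkMs = math.max(candidateMs, currentWatermarkMs)  // never moves backwards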

if (lastExecution != null) {
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
e.eventTimeStats.value.max - e.delay.milliseconds
}.headOption.foreach { newWatermarkMs =>
if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
} else {
logDebug(
s"Event time didn't move: $newWatermarkMs < " +
s"${offsetSeqMetadata.batchWatermarkMs}")
}
}
}

updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
assert(offsetLog.add(
@@ -462,21 +480,6 @@
sink.addBatch(currentBatchId, nextBatch)
}

// Update the eventTime watermark if we find one in the plan.
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec =>
logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
(e.maxEventTime.value / 1000) - e.delay.milliseconds()
}.headOption.foreach { newWatermark =>
if (newWatermark > offsetSeqMetadata.batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermark ms")
offsetSeqMetadata.batchWatermarkMs = newWatermark
} else {
logTrace(s"Event time didn't move: $newWatermark < " +
s"$offsetSeqMetadata.currentEventTimeWatermark")
}
}

awaitBatchLock.lock()
try {
// Wake up any threads that are waiting for the stream to progress.
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming

import java.{util => ju}
import java.lang.{Long => JLong}
import java.util.UUID

import scala.collection.JavaConverters._
@@ -31,27 +32,6 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.util.DateTimeUtils

/**
* :: Experimental ::
* Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
*/
@Experimental
class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long) {

/** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))

/** The pretty (i.e. indented) JSON representation of this progress. */
def prettyJson: String = pretty(render(jsonValue))

private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated))
}
}

/**
* :: Experimental ::
Contributor Author: I moved this below to keep the StreamingQueryProgress class first in the file. More important code first.
Contributor: That's actually the opposite of how most code in SQL is laid out, so I think it would be better to avoid this change. The logic here is that declarations used later should come first (references before declaration make it harder to read), and stuff at the end of the file is kind of hidden.
Contributor Author: Aah, then for consistency SourceProgress and SinkProgress should also be before StreamingQueryProgress. But that's a bigger change that should be done in a different PR.

* Information about progress made in the execution of a [[StreamingQuery]] during
@@ -67,7 +47,7 @@ class StateOperatorProgress private[sql](
* Similarly, when there is no data to be processed, the batchId will not be
* incremented.
* @param durationMs The amount of time taken to perform various operations in milliseconds.
* @param currentWatermark The current event time watermark in milliseconds
* @param eventTime Statistics of event time seen in this batch
Member: nit: since you are touching this file, could you also fix the comment of @param timestamp? It's better to document the format as well, for example: "The beginning time of the trigger in ISO8601 format, e.g. 2016-12-05T20:54:20.827Z".
Member: In addition, could you also add an example in the comment of eventTime?
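One possible shape for the requested scaladoc, reusing the keys and timestamp format exercised by the tests below (illustrative wording only):

  * @param timestamp Beginning time of the trigger in ISO8601 format, e.g. 2016-12-05T20:54:20.827Z.
  * @param eventTime Statistics of event time seen in this batch as ISO8601 timestamps. May contain
  *                  the keys "max", "min", "avg" and "watermark", e.g.
  *                  Map("watermark" -> "2016-12-05T20:54:20.827Z", "max" -> "2016-12-05T20:54:20.827Z").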

* @param stateOperators Information about operators in the query that store state.
* @param sources detailed statistics on data being read from each of the streaming sources.
* @since 2.1.0
@@ -79,8 +59,8 @@ class StreamingQueryProgress private[sql](
val name: String,
val timestamp: String,
val batchId: Long,
val durationMs: ju.Map[String, java.lang.Long],
val currentWatermark: Long,
val durationMs: ju.Map[String, JLong],
val eventTime: ju.Map[String, String],
val stateOperators: Array[StateOperatorProgress],
val sources: Array[SourceProgress],
val sink: SinkProgress) {
@@ -107,18 +87,23 @@ class StreamingQueryProgress private[sql](
if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
}

def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
if (map.isEmpty) return JNothing
val keys = map.asScala.keySet.toSeq.sorted
keys
.map { k => k -> valueToJValue(map.get(k)) }
.foldLeft("" -> JNothing: JObject)(_ ~ _)
}

("id" -> JString(id.toString)) ~
("runId" -> JString(runId.toString)) ~
("name" -> JString(name)) ~
("timestamp" -> JString(timestamp)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
("durationMs" -> durationMs
.asScala
.map { case (k, v) => k -> JInt(v.toLong): JObject }
.reduce(_ ~ _)) ~
("currentWatermark" -> JInt(currentWatermark)) ~
("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
("sink" -> sink.jsonValue)
@@ -200,3 +185,24 @@ class SinkProgress protected[sql](
("description" -> JString(description))
}
}

/**
* :: Experimental ::
* Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
*/
@Experimental
class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long) {

/** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))

/** The pretty (i.e. indented) JSON representation of this progress. */
def prettyJson: String = pretty(render(jsonValue))

private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated))
}
}
@@ -185,9 +185,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {

test("QueryProgressEvent serialization") {
def testSerialization(event: QueryProgressEvent): Unit = {
import scala.collection.JavaConverters._
val json = JsonProtocol.sparkEventToJson(event)
val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent]
assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality
assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala)
assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala)
}
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))
@@ -44,7 +44,12 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| "durationMs" : {
| "total" : 0
| },
| "currentWatermark" : 3,
| "eventTime" : {
Contributor: awesome!

| "avg" : "2016-12-05T20:54:20.827Z",
| "max" : "2016-12-05T20:54:20.827Z",
| "min" : "2016-12-05T20:54:20.827Z",
| "watermark" : "2016-12-05T20:54:20.827Z"
| },
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1
@@ -76,7 +81,6 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| "durationMs" : {
| "total" : 0
| },
| "currentWatermark" : 3,
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1
@@ -134,7 +138,11 @@ object StreamingQueryStatusAndProgressSuite {
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
currentWatermark = 3L,
eventTime = Map(
"max" -> "2016-12-05T20:54:20.827Z",
"min" -> "2016-12-05T20:54:20.827Z",
"avg" -> "2016-12-05T20:54:20.827Z",
"watermark" -> "2016-12-05T20:54:20.827Z").asJava,
stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
sources = Array(
new SourceProgress(
@@ -156,7 +164,7 @@
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
currentWatermark = 3L,
eventTime = Map.empty[String, String].asJava, // empty maps should be handled correctly
stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
sources = Array(
new SourceProgress(
@@ -17,6 +17,8 @@

package org.apache.spark.sql.streaming

import scala.collection.JavaConverters._

import org.apache.commons.lang3.RandomStringUtils
import org.scalactic.TolerantNumerics
import org.scalatest.concurrent.Eventually._