
Commit a14efdd

Refactored
1 parent e9d34ed commit a14efdd

6 files changed: +44 -48 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

+8 -10
@@ -157,10 +157,10 @@ trait ProgressReporter extends Logging {
       id = id,
       runId = runId,
       name = name,
-      triggerTimestamp = formatTimestamp(currentTriggerStartTimestamp),
+      timestamp = formatTimestamp(currentTriggerStartTimestamp),
       batchId = currentBatchId,
       durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
-      queryTimestamps = executionStats.eventTimeStats.asJava,
+      eventTime = executionStats.eventTimeStats.asJava,
       stateOperators = executionStats.stateOperators.toArray,
       sources = sourceProgress.toArray,
       sink = sinkProgress)
@@ -182,12 +182,10 @@ trait ProgressReporter extends Logging {
 
   /** Extracts statistics from the most recent query execution. */
   private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
-    val basicQueryTimestamps = Map(
-      "watermark" -> offsetSeqMetadata.batchWatermarkMs,
-      "processingTime" -> offsetSeqMetadata.batchTimestampMs).mapValues(formatTimestamp)
+    val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
 
     if (!hasNewData) {
-      return ExecutionStats(Map.empty, Seq.empty, basicQueryTimestamps)
+      return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
     }
 
     // We want to associate execution plan leaves to sources that generate them, so that we match
@@ -250,10 +248,10 @@ trait ProgressReporter extends Logging {
       case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
         val stats = e.eventTimeStats.value
         Map(
-          "eventTime.max" -> stats.max,
-          "eventTime.min" -> stats.min,
-          "eventTime.avg" -> stats.avg).mapValues(formatTimestamp)
-    }.headOption.getOrElse(Map.empty) ++ basicQueryTimestamps
+          "max" -> stats.max,
+          "min" -> stats.min,
+          "avg" -> stats.avg).mapValues(formatTimestamp)
+    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
 
     ExecutionStats(numInputRows, stateOperators, eventTimeStats)
   }

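Note: the net effect of these hunks is that the event-time statistics lose their `eventTime.` key prefix and absorb the `watermark` entry, while `processingTime` is dropped from the map entirely. A minimal standalone sketch of how the new map is assembled (the stats values, the `formatTimestamp` body, and the `batchWatermarkMs` literal below are illustrative stand-ins, not code from this commit):

    // Hypothetical stand-ins for EventTimeWatermarkExec's accumulator value
    // and offsetSeqMetadata.batchWatermarkMs.
    case class FakeEventTimeStats(max: Long, min: Long, avg: Long)
    val stats = FakeEventTimeStats(max = 14000L, min = 10000L, avg = 12000L)
    val batchWatermarkMs = 5000L
    def formatTimestamp(ms: Long): String = java.time.Instant.ofEpochMilli(ms).toString

    val watermarkTimestamp = Map("watermark" -> formatTimestamp(batchWatermarkMs))
    // Unprefixed keys, merged with the watermark entry, as in the new code path.
    val eventTime: Map[String, String] = Map(
      "max" -> stats.max,
      "min" -> stats.min,
      "avg" -> stats.avg).mapValues(formatTimestamp).toMap ++ watermarkTimestamp
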
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala

+6 -6
@@ -41,13 +41,13 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
  * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
  * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
  * @param name User-specified name of the query, null if not specified.
- * @param triggerTimestamp Timestamp (ms) of the beginning of the trigger.
+ * @param timestamp Timestamp (ms) of the beginning of the trigger.
  * @param batchId A unique id for the current batch of data being processed. Note that in the
  *                case of retries after a failure a given batchId my be executed more than once.
  *                Similarly, when there is no data to be processed, the batchId will not be
  *                incremented.
  * @param durationMs The amount of time taken to perform various operations in milliseconds.
- * @param queryTimestamps Statistics of event time seen in this batch
+ * @param eventTime Statistics of event time seen in this batch
  * @param stateOperators Information about operators in the query that store state.
  * @param sources detailed statistics on data being read from each of the streaming sources.
  * @since 2.1.0
@@ -57,10 +57,10 @@ class StreamingQueryProgress private[sql](
     val id: UUID,
     val runId: UUID,
     val name: String,
-    val triggerTimestamp: String,
+    val timestamp: String,
     val batchId: Long,
     val durationMs: ju.Map[String, JLong],
-    val queryTimestamps: ju.Map[String, String],
+    val eventTime: ju.Map[String, String],
     val stateOperators: Array[StateOperatorProgress],
     val sources: Array[SourceProgress],
     val sink: SinkProgress) {
@@ -98,12 +98,12 @@ class StreamingQueryProgress private[sql](
     ("id" -> JString(id.toString)) ~
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
-    ("triggerTimestamp" -> JString(triggerTimestamp)) ~
+    ("timestamp" -> JString(timestamp)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
     ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
     ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
     ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
-    ("queryTimestamps" -> safeMapToJValue[String](queryTimestamps, s => JString(s))) ~
+    ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
     ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
     ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
     ("sink" -> sink.jsonValue)

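For API consumers, the rename is visible in both the Scala accessors and the JSON above: `progress.timestamp` replaces `progress.triggerTimestamp`, and `progress.eventTime` replaces `progress.queryTimestamps` (now also carrying the `watermark` key). A minimal sketch of a listener reading the renamed fields, assuming a `SparkSession` named `spark` is in scope:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val p = event.progress
        // `timestamp` replaces the old `triggerTimestamp` field.
        println(s"batch ${p.batchId} triggered at ${p.timestamp}")
        // `eventTime` replaces `queryTimestamps`; "watermark" now lives here too.
        Option(p.eventTime.get("watermark")).foreach(w => println(s"watermark: $w"))
      }
    })
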
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

+1 -1
@@ -190,7 +190,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent]
       assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality
       assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala)
-      assert(newEvent.progress.queryTimestamps.asScala === event.progress.queryTimestamps.asScala)
+      assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala)
     }
     testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
     testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala

+15 -15
@@ -38,17 +38,17 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
       |  "id" : "${testProgress1.id.toString}",
       |  "runId" : "${testProgress1.runId.toString}",
       |  "name" : "myName",
-      |  "triggerTimestamp" : "2016-12-05T20:54:20.827Z",
+      |  "timestamp" : "2016-12-05T20:54:20.827Z",
       |  "numInputRows" : 678,
       |  "inputRowsPerSecond" : 10.0,
       |  "durationMs" : {
       |    "total" : 0
       |  },
-      |  "queryTimestamps" : {
-      |    "eventTime.avg" : "2016-12-05T20:54:20.827Z",
-      |    "eventTime.max" : "2016-12-05T20:54:20.827Z",
-      |    "eventTime.min" : "2016-12-05T20:54:20.827Z",
-      |    "processingTime" : "2016-12-05T20:54:20.827Z"
+      |  "eventTime" : {
+      |    "avg" : "2016-12-05T20:54:20.827Z",
+      |    "max" : "2016-12-05T20:54:20.827Z",
+      |    "min" : "2016-12-05T20:54:20.827Z",
+      |    "watermark" : "2016-12-05T20:54:20.827Z"
       |  },
       |  "stateOperators" : [ {
       |    "numRowsTotal" : 0,
@@ -76,7 +76,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
       |  "id" : "${testProgress2.id.toString}",
       |  "runId" : "${testProgress2.runId.toString}",
       |  "name" : null,
-      |  "triggerTimestamp" : "2016-12-05T20:54:20.827Z",
+      |  "timestamp" : "2016-12-05T20:54:20.827Z",
       |  "numInputRows" : 678,
       |  "durationMs" : {
       |    "total" : 0
@@ -135,14 +135,14 @@ object StreamingQueryStatusAndProgressSuite {
     id = UUID.randomUUID,
     runId = UUID.randomUUID,
     name = "myName",
-    triggerTimestamp = "2016-12-05T20:54:20.827Z",
+    timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
-    queryTimestamps = Map(
-      "eventTime.max" -> "2016-12-05T20:54:20.827Z",
-      "eventTime.min" -> "2016-12-05T20:54:20.827Z",
-      "eventTime.avg" -> "2016-12-05T20:54:20.827Z",
-      "processingTime" -> "2016-12-05T20:54:20.827Z").asJava,
+    eventTime = Map(
+      "max" -> "2016-12-05T20:54:20.827Z",
+      "min" -> "2016-12-05T20:54:20.827Z",
+      "avg" -> "2016-12-05T20:54:20.827Z",
+      "watermark" -> "2016-12-05T20:54:20.827Z").asJava,
     stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
     sources = Array(
       new SourceProgress(
@@ -161,10 +161,10 @@ object StreamingQueryStatusAndProgressSuite {
     id = UUID.randomUUID,
     runId = UUID.randomUUID,
     name = null, // should not be present in the json
-    triggerTimestamp = "2016-12-05T20:54:20.827Z",
+    timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
-    queryTimestamps = Map.empty[String, String].asJava, // empty maps should be handled correctly
+    eventTime = Map.empty[String, String].asJava, // empty maps should be handled correctly
     stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
     sources = Array(
       new SourceProgress(

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

+1 -3
@@ -245,7 +245,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       assert(progress.id === query.id)
       assert(progress.name === query.name)
       assert(progress.batchId === 0)
-      assert(progress.triggerTimestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC
+      assert(progress.timestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC
       assert(progress.numInputRows === 2)
       assert(progress.processedRowsPerSecond === 2.0)
@@ -255,8 +255,6 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       assert(progress.durationMs.get("walCommit") === 0)
       assert(progress.durationMs.get("triggerExecution") === 1000)
 
-      assert(progress.queryTimestamps.get("processingTime") === "1970-01-01T00:00:00.300Z")
-      assert(!progress.queryTimestamps.keySet.asScala.exists(_.toLowerCase.contains("eventtime")))
       assert(progress.sources.length === 1)
       assert(progress.sources(0).description contains "MemoryStream")
       assert(progress.sources(0).startOffset === null)

sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala

+13 -13
@@ -64,41 +64,41 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
       .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
 
     def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
-      body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.queryTimestamps)
+      body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
       true
     }
 
     testStream(windowedAggregation)(
       AddData(inputData, 15),
       CheckAnswer(),
       assertEventStats { e =>
-        assert(e.get("eventTime.max") === formatTimestamp(15))
-        assert(e.get("eventTime.min") === formatTimestamp(15))
-        assert(e.get("eventTime.avg") === formatTimestamp(15))
+        assert(e.get("max") === formatTimestamp(15))
+        assert(e.get("min") === formatTimestamp(15))
+        assert(e.get("avg") === formatTimestamp(15))
         assert(e.get("watermark") === formatTimestamp(0))
       },
       AddData(inputData, 10, 12, 14),
       CheckAnswer(),
       assertEventStats { e =>
-        assert(e.get("eventTime.max") === formatTimestamp(14))
-        assert(e.get("eventTime.min") === formatTimestamp(10))
-        assert(e.get("eventTime.avg") === formatTimestamp(12))
+        assert(e.get("max") === formatTimestamp(14))
+        assert(e.get("min") === formatTimestamp(10))
+        assert(e.get("avg") === formatTimestamp(12))
         assert(e.get("watermark") === formatTimestamp(5))
       },
       AddData(inputData, 25),
       CheckAnswer(),
       assertEventStats { e =>
-        assert(e.get("eventTime.max") === formatTimestamp(25))
-        assert(e.get("eventTime.min") === formatTimestamp(25))
-        assert(e.get("eventTime.avg") === formatTimestamp(25))
+        assert(e.get("max") === formatTimestamp(25))
+        assert(e.get("min") === formatTimestamp(25))
+        assert(e.get("avg") === formatTimestamp(25))
         assert(e.get("watermark") === formatTimestamp(5))
       },
       AddData(inputData, 25),
       CheckAnswer((10, 3)),
       assertEventStats { e =>
-        assert(e.get("eventTime.max") === formatTimestamp(25))
-        assert(e.get("eventTime.min") === formatTimestamp(25))
-        assert(e.get("eventTime.avg") === formatTimestamp(25))
+        assert(e.get("max") === formatTimestamp(25))
+        assert(e.get("min") === formatTimestamp(25))
+        assert(e.get("avg") === formatTimestamp(25))
         assert(e.get("watermark") === formatTimestamp(15))
       }
     )

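The expected values in this suite are consistent with a 10-second watermark delay in which the watermark seen by a trigger derives from data in earlier triggers. A toy model of that bookkeeping (the helper below is illustrative, not the engine's implementation; the inputs mirror the test):

    // Watermark = max event time seen so far minus the delay, never regressing,
    // and only taking effect on the following trigger.
    val delaySec = 10L
    def nextWatermark(prev: Long, batchMax: Option[Long]): Long =
      batchMax.map(m => math.max(prev, m - delaySec)).getOrElse(prev)

    var wm = 0L
    for (batch <- Seq(Seq(15L), Seq(10L, 12L, 14L), Seq(25L), Seq(25L))) {
      println(s"watermark during batch: $wm") // prints 0, 5, 5, 15 as asserted above
      wm = nextWatermark(wm, batch.reduceOption(_ max _))
    }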