18
18
package org .apache .spark .sql .streaming
19
19
20
20
import java .{util => ju }
21
+ import java .lang .{Long => JLong }
21
22
import java .util .UUID
22
23
23
24
import scala .collection .JavaConverters ._
@@ -29,7 +30,6 @@ import org.json4s.JsonDSL._
29
30
import org .json4s .jackson .JsonMethods ._
30
31
31
32
import org .apache .spark .annotation .Experimental
32
- import org .apache .spark .sql .catalyst .util .DateTimeUtils
33
33
34
34
/**
35
35
* :: Experimental ::
@@ -61,13 +61,20 @@ class StateOperatorProgress private[sql](
61
61
* @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
62
62
* @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
63
63
* @param name User-specified name of the query, null if not specified.
64
- * @param timestamp Timestamp (ms) of the beginning of the trigger .
64
+ * @param timestamp Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps .
65
65
* @param batchId A unique id for the current batch of data being processed. Note that in the
66
66
* case of retries after a failure a given batchId my be executed more than once.
67
67
* Similarly, when there is no data to be processed, the batchId will not be
68
68
* incremented.
69
69
* @param durationMs The amount of time taken to perform various operations in milliseconds.
70
- * @param currentWatermark The current event time watermark in milliseconds
70
+ * @param eventTime Statistics of event time seen in this batch. It may contain the following keys:
71
+ * {
72
+ * "max" -> "2016-12-05T20:54:20.827Z" // maximum event time seen in this trigger
73
+ * "min" -> "2016-12-05T20:54:20.827Z" // minimum event time seen in this trigger
74
+ * "avg" -> "2016-12-05T20:54:20.827Z" // average event time seen in this trigger
75
+ * "watermark" -> "2016-12-05T20:54:20.827Z" // watermark used in this trigger
76
+ * }
77
+ * All timestamps are in ISO8601 format, i.e. UTC timestamps.
71
78
* @param stateOperators Information about operators in the query that store state.
72
79
* @param sources detailed statistics on data being read from each of the streaming sources.
73
80
* @since 2.1.0
@@ -79,8 +86,8 @@ class StreamingQueryProgress private[sql](
79
86
val name : String ,
80
87
val timestamp : String ,
81
88
val batchId : Long ,
82
- val durationMs : ju.Map [String , java.lang. Long ],
83
- val currentWatermark : Long ,
89
+ val durationMs : ju.Map [String , JLong ],
90
+ val eventTime : ju. Map [ String , String ] ,
84
91
val stateOperators : Array [StateOperatorProgress ],
85
92
val sources : Array [SourceProgress ],
86
93
val sink : SinkProgress ) {
@@ -107,18 +114,22 @@ class StreamingQueryProgress private[sql](
107
114
if (value.isNaN || value.isInfinity) JNothing else JDouble (value)
108
115
}
109
116
117
+ /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
118
+ def safeMapToJValue [T ](map : ju.Map [String , T ], valueToJValue : T => JValue ): JValue = {
119
+ if (map.isEmpty) return JNothing
120
+ val keys = map.asScala.keySet.toSeq.sorted
121
+ keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
122
+ }
123
+
110
124
(" id" -> JString (id.toString)) ~
111
125
(" runId" -> JString (runId.toString)) ~
112
126
(" name" -> JString (name)) ~
113
127
(" timestamp" -> JString (timestamp)) ~
114
128
(" numInputRows" -> JInt (numInputRows)) ~
115
129
(" inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
116
130
(" processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
117
- (" durationMs" -> durationMs
118
- .asScala
119
- .map { case (k, v) => k -> JInt (v.toLong): JObject }
120
- .reduce(_ ~ _)) ~
121
- (" currentWatermark" -> JInt (currentWatermark)) ~
131
+ (" durationMs" -> safeMapToJValue[JLong ](durationMs, v => JInt (v.toLong))) ~
132
+ (" eventTime" -> safeMapToJValue[String ](eventTime, s => JString (s))) ~
122
133
(" stateOperators" -> JArray (stateOperators.map(_.jsonValue).toList)) ~
123
134
(" sources" -> JArray (sources.map(_.jsonValue).toList)) ~
124
135
(" sink" -> sink.jsonValue)
0 commit comments