Skip to content

Commit e2bf782

Browse files
tdasuzadude
authored and committed
[SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years
## What changes were proposed in this pull request?

Two changes
- Fix how delays specified in months and years are translated to milliseconds
- Following up on apache#16258, not show watermark when there is no watermarking in the query

## How was this patch tested?

Updated and new unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#16304 from tdas/SPARK-18834-1.
1 parent 1d7753b commit e2bf782

File tree

4 files changed

+73
-18
lines changed

4 files changed

+73
-18
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala

+6-1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ case class EventTimeWatermarkExec(
8484
child: SparkPlan) extends SparkPlan {
8585

8686
val eventTimeStats = new EventTimeStatsAccum()
87+
val delayMs = {
88+
val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
89+
delay.milliseconds + delay.months * millisPerMonth
90+
}
91+
8792
sparkContext.register(eventTimeStats)
8893

8994
override protected def doExecute(): RDD[InternalRow] = {
@@ -101,7 +106,7 @@ case class EventTimeWatermarkExec(
101106
if (a semanticEquals eventTime) {
102107
val updatedMetadata = new MetadataBuilder()
103108
.withMetadata(a.metadata)
104-
.putLong(EventTimeWatermark.delayKey, delay.milliseconds)
109+
.putLong(EventTimeWatermark.delayKey, delayMs)
105110
.build()
106111

107112
a.withMetadata(updatedMetadata)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

+5-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
2525

2626
import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql.{DataFrame, SparkSession}
28-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
28+
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
2929
import org.apache.spark.sql.execution.QueryExecution
3030
import org.apache.spark.sql.streaming._
3131
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -182,7 +182,10 @@ trait ProgressReporter extends Logging {
182182

183183
/** Extracts statistics from the most recent query execution. */
184184
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
185-
val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
185+
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
186+
val watermarkTimestamp =
187+
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
188+
else Map.empty[String, String]
186189

187190
if (!hasNewData) {
188191
return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ class StreamExecution(
387387
lastExecution.executedPlan.collect {
388388
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
389389
logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
390-
e.eventTimeStats.value.max - e.delay.milliseconds
390+
e.eventTimeStats.value.max - e.delayMs
391391
}.headOption.foreach { newWatermarkMs =>
392392
if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
393393
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")

sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala

+61-14
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@ package org.apache.spark.sql.streaming
1919

2020
import java.{util => ju}
2121
import java.text.SimpleDateFormat
22+
import java.util.{Calendar, Date}
2223

2324
import org.scalatest.BeforeAndAfter
2425

2526
import org.apache.spark.internal.Logging
2627
import org.apache.spark.sql.{AnalysisException, Row}
2728
import org.apache.spark.sql.execution.streaming._
2829
import org.apache.spark.sql.functions.{count, window}
30+
import org.apache.spark.sql.InternalOutputModes.Complete
2931

30-
class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
32+
class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
3133

3234
import testImplicits._
3335

@@ -52,48 +54,59 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
5254
assert(e.getMessage contains "int")
5355
}
5456

55-
5657
test("event time and watermark metrics") {
57-
val inputData = MemoryStream[Int]
58+
// No event time metrics when there is no watermarking
59+
val inputData1 = MemoryStream[Int]
60+
val aggWithoutWatermark = inputData1.toDF()
61+
.withColumn("eventTime", $"value".cast("timestamp"))
62+
.groupBy(window($"eventTime", "5 seconds") as 'window)
63+
.agg(count("*") as 'count)
64+
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
5865

59-
val windowedAggregation = inputData.toDF()
66+
testStream(aggWithoutWatermark, outputMode = Complete)(
67+
AddData(inputData1, 15),
68+
CheckAnswer((15, 1)),
69+
assertEventStats { e => assert(e.isEmpty) },
70+
AddData(inputData1, 10, 12, 14),
71+
CheckAnswer((10, 3), (15, 1)),
72+
assertEventStats { e => assert(e.isEmpty) }
73+
)
74+
75+
// All event time metrics where watermarking is set
76+
val inputData2 = MemoryStream[Int]
77+
val aggWithWatermark = inputData2.toDF()
6078
.withColumn("eventTime", $"value".cast("timestamp"))
6179
.withWatermark("eventTime", "10 seconds")
6280
.groupBy(window($"eventTime", "5 seconds") as 'window)
6381
.agg(count("*") as 'count)
6482
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
6583

66-
def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
67-
body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
68-
true
69-
}
70-
71-
testStream(windowedAggregation)(
72-
AddData(inputData, 15),
84+
testStream(aggWithWatermark)(
85+
AddData(inputData2, 15),
7386
CheckAnswer(),
7487
assertEventStats { e =>
7588
assert(e.get("max") === formatTimestamp(15))
7689
assert(e.get("min") === formatTimestamp(15))
7790
assert(e.get("avg") === formatTimestamp(15))
7891
assert(e.get("watermark") === formatTimestamp(0))
7992
},
80-
AddData(inputData, 10, 12, 14),
93+
AddData(inputData2, 10, 12, 14),
8194
CheckAnswer(),
8295
assertEventStats { e =>
8396
assert(e.get("max") === formatTimestamp(14))
8497
assert(e.get("min") === formatTimestamp(10))
8598
assert(e.get("avg") === formatTimestamp(12))
8699
assert(e.get("watermark") === formatTimestamp(5))
87100
},
88-
AddData(inputData, 25),
101+
AddData(inputData2, 25),
89102
CheckAnswer(),
90103
assertEventStats { e =>
91104
assert(e.get("max") === formatTimestamp(25))
92105
assert(e.get("min") === formatTimestamp(25))
93106
assert(e.get("avg") === formatTimestamp(25))
94107
assert(e.get("watermark") === formatTimestamp(5))
95108
},
96-
AddData(inputData, 25),
109+
AddData(inputData2, 25),
97110
CheckAnswer((10, 3)),
98111
assertEventStats { e =>
99112
assert(e.get("max") === formatTimestamp(25))
@@ -124,6 +137,33 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
124137
)
125138
}
126139

140+
test("delay in months and years handled correctly") {
141+
val currentTimeMs = System.currentTimeMillis
142+
val currentTime = new Date(currentTimeMs)
143+
144+
val input = MemoryStream[Long]
145+
val aggWithWatermark = input.toDF()
146+
.withColumn("eventTime", $"value".cast("timestamp"))
147+
.withWatermark("eventTime", "2 years 5 months")
148+
.groupBy(window($"eventTime", "5 seconds") as 'window)
149+
.agg(count("*") as 'count)
150+
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
151+
152+
def monthsSinceEpoch(date: Date): Int = { date.getYear * 12 + date.getMonth }
153+
154+
testStream(aggWithWatermark)(
155+
AddData(input, currentTimeMs / 1000),
156+
CheckAnswer(),
157+
AddData(input, currentTimeMs / 1000),
158+
CheckAnswer(),
159+
assertEventStats { e =>
160+
assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000)
161+
val watermarkTime = timestampFormat.parse(e.get("watermark"))
162+
assert(monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime) === 29)
163+
}
164+
)
165+
}
166+
127167
test("recovery") {
128168
val inputData = MemoryStream[Int]
129169
val df = inputData.toDF()
@@ -231,6 +271,13 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
231271
)
232272
}
233273

274+
private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
275+
AssertOnQuery { q =>
276+
body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
277+
true
278+
}
279+
}
280+
234281
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
235282
timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))
236283

0 commit comments

Comments
 (0)