@@ -19,15 +19,17 @@ package org.apache.spark.sql.streaming
19
19
20
20
import java .{util => ju }
21
21
import java .text .SimpleDateFormat
22
+ import java .util .{Calendar , Date }
22
23
23
24
import org .scalatest .BeforeAndAfter
24
25
25
26
import org .apache .spark .internal .Logging
26
27
import org .apache .spark .sql .{AnalysisException , Row }
27
28
import org .apache .spark .sql .execution .streaming ._
28
29
import org .apache .spark .sql .functions .{count , window }
30
+ import org .apache .spark .sql .InternalOutputModes .Complete
29
31
30
- class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
32
+ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
31
33
32
34
import testImplicits ._
33
35
@@ -52,48 +54,59 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
52
54
assert(e.getMessage contains " int" )
53
55
}
54
56
55
-
56
57
test(" event time and watermark metrics" ) {
57
- val inputData = MemoryStream [Int ]
58
+ // No event time metrics when there is no watermarking
59
+ val inputData1 = MemoryStream [Int ]
60
+ val aggWithoutWatermark = inputData1.toDF()
61
+ .withColumn(" eventTime" , $" value" .cast(" timestamp" ))
62
+ .groupBy(window($" eventTime" , " 5 seconds" ) as ' window )
63
+ .agg(count(" *" ) as ' count )
64
+ .select($" window" .getField(" start" ).cast(" long" ).as[Long ], $" count" .as[Long ])
58
65
59
- val windowedAggregation = inputData.toDF()
66
+ testStream(aggWithoutWatermark, outputMode = Complete )(
67
+ AddData (inputData1, 15 ),
68
+ CheckAnswer ((15 , 1 )),
69
+ assertEventStats { e => assert(e.isEmpty) },
70
+ AddData (inputData1, 10 , 12 , 14 ),
71
+ CheckAnswer ((10 , 3 ), (15 , 1 )),
72
+ assertEventStats { e => assert(e.isEmpty) }
73
+ )
74
+
75
+ // All event time metrics where watermarking is set
76
+ val inputData2 = MemoryStream [Int ]
77
+ val aggWithWatermark = inputData2.toDF()
60
78
.withColumn(" eventTime" , $" value" .cast(" timestamp" ))
61
79
.withWatermark(" eventTime" , " 10 seconds" )
62
80
.groupBy(window($" eventTime" , " 5 seconds" ) as ' window )
63
81
.agg(count(" *" ) as ' count )
64
82
.select($" window" .getField(" start" ).cast(" long" ).as[Long ], $" count" .as[Long ])
65
83
66
- def assertEventStats (body : ju.Map [String , String ] => Unit ): AssertOnQuery = AssertOnQuery { q =>
67
- body(q.recentProgress.filter(_.numInputRows > 0 ).lastOption.get.eventTime)
68
- true
69
- }
70
-
71
- testStream(windowedAggregation)(
72
- AddData (inputData, 15 ),
84
+ testStream(aggWithWatermark)(
85
+ AddData (inputData2, 15 ),
73
86
CheckAnswer (),
74
87
assertEventStats { e =>
75
88
assert(e.get(" max" ) === formatTimestamp(15 ))
76
89
assert(e.get(" min" ) === formatTimestamp(15 ))
77
90
assert(e.get(" avg" ) === formatTimestamp(15 ))
78
91
assert(e.get(" watermark" ) === formatTimestamp(0 ))
79
92
},
80
- AddData (inputData , 10 , 12 , 14 ),
93
+ AddData (inputData2 , 10 , 12 , 14 ),
81
94
CheckAnswer (),
82
95
assertEventStats { e =>
83
96
assert(e.get(" max" ) === formatTimestamp(14 ))
84
97
assert(e.get(" min" ) === formatTimestamp(10 ))
85
98
assert(e.get(" avg" ) === formatTimestamp(12 ))
86
99
assert(e.get(" watermark" ) === formatTimestamp(5 ))
87
100
},
88
- AddData (inputData , 25 ),
101
+ AddData (inputData2 , 25 ),
89
102
CheckAnswer (),
90
103
assertEventStats { e =>
91
104
assert(e.get(" max" ) === formatTimestamp(25 ))
92
105
assert(e.get(" min" ) === formatTimestamp(25 ))
93
106
assert(e.get(" avg" ) === formatTimestamp(25 ))
94
107
assert(e.get(" watermark" ) === formatTimestamp(5 ))
95
108
},
96
- AddData (inputData , 25 ),
109
+ AddData (inputData2 , 25 ),
97
110
CheckAnswer ((10 , 3 )),
98
111
assertEventStats { e =>
99
112
assert(e.get(" max" ) === formatTimestamp(25 ))
@@ -124,6 +137,33 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
124
137
)
125
138
}
126
139
140
+ test(" delay in months and years handled correctly" ) {
141
+ val currentTimeMs = System .currentTimeMillis
142
+ val currentTime = new Date (currentTimeMs)
143
+
144
+ val input = MemoryStream [Long ]
145
+ val aggWithWatermark = input.toDF()
146
+ .withColumn(" eventTime" , $" value" .cast(" timestamp" ))
147
+ .withWatermark(" eventTime" , " 2 years 5 months" )
148
+ .groupBy(window($" eventTime" , " 5 seconds" ) as ' window )
149
+ .agg(count(" *" ) as ' count )
150
+ .select($" window" .getField(" start" ).cast(" long" ).as[Long ], $" count" .as[Long ])
151
+
152
+ def monthsSinceEpoch (date : Date ): Int = { date.getYear * 12 + date.getMonth }
153
+
154
+ testStream(aggWithWatermark)(
155
+ AddData (input, currentTimeMs / 1000 ),
156
+ CheckAnswer (),
157
+ AddData (input, currentTimeMs / 1000 ),
158
+ CheckAnswer (),
159
+ assertEventStats { e =>
160
+ assert(timestampFormat.parse(e.get(" max" )).getTime === (currentTimeMs / 1000 ) * 1000 )
161
+ val watermarkTime = timestampFormat.parse(e.get(" watermark" ))
162
+ assert(monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime) === 29 )
163
+ }
164
+ )
165
+ }
166
+
127
167
test(" recovery" ) {
128
168
val inputData = MemoryStream [Int ]
129
169
val df = inputData.toDF()
@@ -231,6 +271,13 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
231
271
)
232
272
}
233
273
274
+ private def assertEventStats (body : ju.Map [String , String ] => Unit ): AssertOnQuery = {
275
+ AssertOnQuery { q =>
276
+ body(q.recentProgress.filter(_.numInputRows > 0 ).lastOption.get.eventTime)
277
+ true
278
+ }
279
+ }
280
+
234
281
private val timestampFormat = new SimpleDateFormat (" yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" ) // ISO8601
235
282
timestampFormat.setTimeZone(ju.TimeZone .getTimeZone(" UTC" ))
236
283
0 commit comments