Skip to content

Commit 9a2e941

Browse files
committed
Minor changes
1 parent a14efdd commit 9a2e941

File tree

2 files changed

+3
-4
lines changed

2 files changed

+3
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,11 @@ class StreamingQueryProgress private[sql](
8787
if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
8888
}
8989

90+
/** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
9091
def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
9192
if (map.isEmpty) return JNothing
9293
val keys = map.asScala.keySet.toSeq.sorted
93-
keys
94-
.map { k => k -> valueToJValue(map.get(k)) }
95-
.foldLeft("" -> JNothing: JObject)(_ ~ _)
94+
keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
9695
}
9796

9897
("id" -> JString(id.toString)) ~

sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
5353
}
5454

5555

56-
test("event time and watermark metric") {
56+
test("event time and watermark metrics") {
5757
val inputData = MemoryStream[Int]
5858

5959
val windowedAggregation = inputData.toDF()

0 commit comments

Comments
 (0)