
Commit 65fa7bc

marmbrus authored and rxin committed
[SQL] Simple framework for debugging query execution
Only records number of tuples and unique dataTypes output right now...

Example:
```scala
scala> import org.apache.spark.sql.execution.debug._
scala> hql("SELECT value FROM src WHERE key > 10").debug(sparkContext)

Results returned: 489
== Project [value#1:0] ==
Tuples output: 489
 value StringType: {java.lang.String}
== Filter (key#0:1 > 10) ==
Tuples output: 489
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}
== HiveTableScan [value#1,key#0], (MetastoreRelation default, src, None), None ==
Tuples output: 500
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}
```

Author: Michael Armbrust <michael@databricks.com>

Closes #1005 from marmbrus/debug and squashes the following commits:

dcc3ca6 [Michael Armbrust] Add comments.
c9dded2 [Michael Armbrust] Simple framework for debugging query execution

(cherry picked from commit c6e041d)
Signed-off-by: Reynold Xin <rxin@apache.org>
1 parent 73cd1f8 commit 65fa7bc

File tree

3 files changed: +119 -50 lines changed


sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

-5
@@ -281,11 +281,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
         |== Physical Plan ==
         |${stringOrError(executedPlan)}
       """.stripMargin.trim
-
-    /**
-     * Runs the query after interposing operators that print the result of each intermediate step.
-     */
-    def debugExec() = DebugQuery(executedPlan).execute().collect()
   }
 
   /**
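
The removed `debugExec()` is superseded by the implicit `debug` method introduced in the new `debug` package object below. A minimal usage sketch based on the commit message example (it assumes a Hive-enabled context so that `hql` is available, and a `sparkContext` in scope):

```scala
// Bring the implicit DebugQuery wrapper from the new debug package object into scope.
import org.apache.spark.sql.execution.debug._

// debug() takes a SparkContext (used to create the stat accumulators) and prints,
// for each physical operator, the tuple count and the runtime types seen per column.
hql("SELECT value FROM src WHERE key > 10").debug(sparkContext)
```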

sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala

-45
This file was deleted.
New file (+119): the debug package object in org.apache.spark.sql.execution
@@ -0,0 +1,119 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import scala.collection.mutable.HashSet

import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext._
import org.apache.spark.sql.{SchemaRDD, Row}

/**
 * :: DeveloperApi ::
 * Contains methods for debugging query execution.
 *
 * Usage:
 * {{{
 *   sql("SELECT key FROM src").debug
 * }}}
 */
package object debug {

  /**
   * :: DeveloperApi ::
   * Augments SchemaRDDs with debug methods.
   */
  @DeveloperApi
  implicit class DebugQuery(query: SchemaRDD) {
    def debug(implicit sc: SparkContext): Unit = {
      val plan = query.queryExecution.executedPlan
      val visited = new collection.mutable.HashSet[Long]()
      val debugPlan = plan transform {
        case s: SparkPlan if !visited.contains(s.id) =>
          visited += s.id
          DebugNode(sc, s)
      }
      println(s"Results returned: ${debugPlan.execute().count()}")
      debugPlan.foreach {
        case d: DebugNode => d.dumpStats()
        case _ =>
      }
    }
  }

  private[sql] case class DebugNode(
      @transient sparkContext: SparkContext,
      child: SparkPlan) extends UnaryNode {
    def references = Set.empty

    def output = child.output

    implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
      def zero(initialValue: HashSet[String]): HashSet[String] = {
        initialValue.clear()
        initialValue
      }

      def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
        v1 ++= v2
        v1
      }
    }

    /**
     * A collection of stats for each column of output.
     * @param elementTypes the actual runtime types for the output.  Useful when there are bugs
     *                     causing the wrong data to be projected.
     */
    case class ColumnStat(
      elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
    val tupleCount = sparkContext.accumulator[Int](0)

    val numColumns = child.output.size
    val columnStats = Array.fill(child.output.size)(new ColumnStat())

    def dumpStats(): Unit = {
      println(s"== ${child.simpleString} ==")
      println(s"Tuples output: ${tupleCount.value}")
      child.output.zip(columnStats).foreach { case (attr, stat) =>
        val actualDataTypes = stat.elementTypes.value.mkString("{", ",", "}")
        println(s" ${attr.name} ${attr.dataType}: $actualDataTypes")
      }
    }

    def execute() = {
      child.execute().mapPartitions { iter =>
        new Iterator[Row] {
          def hasNext = iter.hasNext
          def next() = {
            val currentRow = iter.next()
            tupleCount += 1
            var i = 0
            while (i < numColumns) {
              val value = currentRow(i)
              columnStats(i).elementTypes += HashSet(value.getClass.getName)
              i += 1
            }
            currentRow
          }
        }
      }
    }
  }
}
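
The per-operator type statistics above rely on a Spark accumulator whose merge behaviour is supplied by `SetAccumulatorParam`. Below is a standalone sketch of the same accumulator pattern on a plain RDD; `TypeAccumulatorSketch` and its names are illustrative only, and the sketch assumes the Spark 1.x accumulator API that this commit targets:

```scala
import scala.collection.mutable.HashSet

import org.apache.spark.{Accumulator, AccumulatorParam, SparkConf, SparkContext}

object TypeAccumulatorSketch {
  // Same merge semantics as SetAccumulatorParam above: the accumulated value is the
  // union of all HashSets added on the executors.
  implicit object StringSetParam extends AccumulatorParam[HashSet[String]] {
    def zero(initialValue: HashSet[String]): HashSet[String] = {
      initialValue.clear()
      initialValue
    }
    def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
      v1 ++= v2
      v1
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("debug-sketch"))

    // Mirrors ColumnStat.elementTypes: collect the distinct runtime classes observed.
    val seenTypes: Accumulator[HashSet[String]] = sc.accumulator(HashSet.empty[String])
    sc.parallelize(Seq[Any](1, "a", 2.0)).foreach { v =>
      seenTypes += HashSet(v.getClass.getName)
    }

    // e.g. {java.lang.Integer,java.lang.String,java.lang.Double}
    println(seenTypes.value.mkString("{", ",", "}"))
    sc.stop()
  }
}
```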
