Fixes: #4592; Flesh Out Apache Spark Examples Documentation #4631

Status: Open · wants to merge 2 commits into base: main
63 changes: 63 additions & 0 deletions example/scalalib/spark/4-rdd-spark/build.mill
@@ -0,0 +1,63 @@
package build
import mill._, scalalib._

object `package` extends RootModule with ScalaModule {
  def scalaVersion = "2.12.15"
  def ivyDeps = Seq(
    ivy"org.apache.spark::spark-core:3.5.4"
  )

  // Spark reflects into JDK internals, so these modules must be opened
  // when running on Java 17+
  def forkArgs = Seq(
    "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
    "--add-opens=java.base/java.nio=ALL-UNNAMED",
    "--add-opens=java.base/java.util=ALL-UNNAMED",
    "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
  )

  // Keep the assembly a plain jar (no self-executing launcher script prepended)
  def prependShellScript = ""

  object test extends ScalaTests {
    def ivyDeps = Seq(ivy"com.lihaoyi::utest:0.8.5")
    def testFramework = "utest.runner.Framework"

    def forkArgs = Seq(
      "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
      "--add-opens=java.base/java.nio=ALL-UNNAMED",
      "--add-opens=java.base/java.util=ALL-UNNAMED",
      "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
    )
  }
}

/** Usage

> ./mill run
...
Basic Transformations:
Squares: 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400
Even Squares: 4, 16, 36, 64, 100, 144, 196, 256, 324, 400
...
Key-Value Operations:
ReduceByKey: (0,63), (1,70), (2,77)
GroupByKey: (0,List(3, 6, 9, 12, 15, 18)), (1,List(1, 4, 7, 10, 13, 16, 19)), (2,List(2, 5, 8, 11, 14, 17, 20))
...
Advanced Operations:
Consistent Sample: 5, 8, 17
Distinct: 4, 1, 2, 3
Union: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30
...
Final Counts:
Total numbers: 20
Total squares: 20
Total pairs: 20
...

> ./mill test
...foo.FooTests.Basic Transformations...
...foo.FooTests.Key-Value Operations...
...foo.FooTests.Advanced Transformations...
...foo.FooTests.Count Operations...
...Tests: 4, Passed: 4, Failed: 0...

*/
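Note: the four --add-opens flags above are duplicated between the root module and its test module, and again in the 5-sql-analytics build further down. One way to factor them out, sketched here as a suggestion rather than something this PR does, and assuming only Mill's standard ScalaModule API, is a shared trait:

trait SparkJvmArgs extends ScalaModule {
  // Shared JPMS flags that Spark needs on Java 17+
  def forkArgs = Seq(
    "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
    "--add-opens=java.base/java.nio=ALL-UNNAMED",
    "--add-opens=java.base/java.util=ALL-UNNAMED",
    "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
  )
}

object `package` extends RootModule with ScalaModule with SparkJvmArgs {
  // ... module config as above, minus the inline forkArgs
  object test extends ScalaTests with SparkJvmArgs {
    // ... test config as above, minus the inline forkArgs
  }
}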
52 changes: 52 additions & 0 deletions example/scalalib/spark/4-rdd-spark/src/foo/Foo.scala
@@ -0,0 +1,52 @@
package foo

import org.apache.spark.{SparkConf, SparkContext}

object Foo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Spark RDD Example")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    try {
      // 1. Basic RDD Creation & Transformations
      val numbers = sc.parallelize(1 to 20)
      val squares = numbers.map(x => x * x)
      val evenSquares = squares.filter(_ % 2 == 0)

      // 2. Key-Value Pair Operations
      val pairs = numbers.map(x => (x % 3, x))
      val reduced = pairs.reduceByKey(_ + _)
      val grouped = pairs.groupByKey()

      // 3. Advanced Transformations
      val consistentSample = numbers.sample(withReplacement = false, fraction = 0.25, seed = 42L)
      val distinct = sc.parallelize(Seq(1, 1, 2, 3, 4, 4)).distinct()
      val union = numbers.union(sc.parallelize(15 to 30))

      // 4. Actions & Results
      println("Basic Transformations:")
      println(s"Squares: ${squares.collect().mkString(", ")}")
      println(s"Even Squares: ${evenSquares.collect().mkString(", ")}")

      println("\nKey-Value Operations:")
      println(s"ReduceByKey: ${reduced.collect().mkString(", ")}")
      println(s"GroupByKey: ${grouped.mapValues(_.toList).collect().mkString(", ")}")

      println("\nAdvanced Operations:")
      println(s"Consistent Sample: ${consistentSample.collect().mkString(", ")}")
      println(s"Distinct: ${distinct.collect().mkString(", ")}")
      println(s"Union: ${union.collect().mkString(", ")}")

      println("\nFinal Counts:")
      println(s"Total numbers: ${numbers.count()}")
      println(s"Total squares: ${squares.count()}")
      println(s"Total pairs: ${pairs.count()}")

    } finally {
      sc.stop()
    }
  }
}
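The key-value section above prints both reduceByKey and groupByKey results, and the difference is worth spelling out: reduceByKey combines values on each partition before the shuffle, while groupByKey ships every value across the network and only then lets you aggregate. A side-by-side sketch, not part of this PR, of the same per-key sum computed three equivalent ways:

val pairs = sc.parallelize(1 to 20).map(x => (x % 3, x))

// reduceByKey: map-side combine before the shuffle (usually preferred)
val viaReduce = pairs.reduceByKey(_ + _)

// groupByKey: shuffles all values, then sums them on the reduce side
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// aggregateByKey: like reduceByKey, but with an explicit zero value
val viaAggregate = pairs.aggregateByKey(0)(_ + _, _ + _)

// All three produce Map(0 -> 63, 1 -> 70, 2 -> 77), matching the
// ReduceByKey output shown in the Usage block above.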
85 changes: 85 additions & 0 deletions example/scalalib/spark/4-rdd-spark/test/src/FooTests.scala
@@ -0,0 +1,85 @@
package foo

import org.apache.spark.{SparkConf, SparkContext}
import utest._

object FooTests extends TestSuite {
  // One SparkContext is shared by every test in this suite
  val conf = new SparkConf()
    .setAppName("Spark RDD Tests")
    .setMaster("local[*]")
  val sc = new SparkContext(conf)

  // Stop the shared SparkContext once all tests in the suite have run
  override def utestAfterAll(): Unit = {
    sc.stop()
  }
  val tests = Tests {
    test("Basic Transformations") {
      val numbers = sc.parallelize(1 to 20)
      val squares = numbers.map(x => x * x)
      val evenSquares = squares.filter(_ % 2 == 0)

      // Test squares
      assert(squares.collect().toList == List(
        1, 4, 9, 16, 25, 36, 49, 64, 81, 100,
        121, 144, 169, 196, 225, 256, 289, 324, 361, 400
      ))

      // Test even squares
      assert(evenSquares.collect().toList == List(
        4, 16, 36, 64, 100, 144, 196, 256, 324, 400
      ))
    }

    test("Key-Value Operations") {
      val numbers = sc.parallelize(1 to 20)
      val pairs = numbers.map(x => (x % 3, x))
      val reduced = pairs.reduceByKey(_ + _)
      val grouped = pairs.groupByKey()

      // Test reduceByKey
      assert(reduced.collect().toMap == Map(
        0 -> 63, // 3+6+9+12+15+18
        1 -> 70, // 1+4+7+10+13+16+19
        2 -> 77  // 2+5+8+11+14+17+20
      ))

      // Test groupByKey
      val groupedResult = grouped.mapValues(_.toList).collect().toMap
      assert(groupedResult(0).sorted == List(3, 6, 9, 12, 15, 18))
      assert(groupedResult(1).sorted == List(1, 4, 7, 10, 13, 16, 19))
      assert(groupedResult(2).sorted == List(2, 5, 8, 11, 14, 17, 20))
    }

    test("Advanced Transformations") {
      val numbers = sc.parallelize(1 to 20)

      // Test consistent sampling
      val sample1 = numbers.sample(withReplacement = false, 0.25, 42L)
      val sample2 = numbers.sample(withReplacement = false, 0.25, 42L)
      assert(sample1.collect().toList == sample2.collect().toList)

      // Test distinct
      val duplicates = sc.parallelize(Seq(1, 1, 2, 3, 4, 4))
      assert(duplicates.distinct().collect().toSet == Set(1, 2, 3, 4))

      // Test union
      val rdd1 = sc.parallelize(1 to 5)
      val rdd2 = sc.parallelize(3 to 7)
      assert(rdd1.union(rdd2).collect().toSet == Set(1, 2, 3, 4, 5, 6, 7))
    }

    test("Count Operations") {
      val numbers = sc.parallelize(1 to 20)
      val pairs = numbers.map(x => (x % 3, x))

      assert(numbers.count() == 20)
      assert(pairs.count() == 20)

      val emptyRDD = sc.parallelize(Seq.empty[Int])
      assert(emptyRDD.count() == 0)
    }
  }
}
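A small tuning note, offered as a suggestion rather than something this PR does: for test runs, Spark's web UI is dead weight, and "local[*]" can oversubscribe CI machines. Both are adjustable on the same SparkConf:

val conf = new SparkConf()
  .setAppName("Spark RDD Tests")
  .setMaster("local[2]")              // cap at two worker threads for CI stability
  .set("spark.ui.enabled", "false")   // skip binding the web UI port during tests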
105 changes: 105 additions & 0 deletions example/scalalib/spark/5-sql-analytics/build.mill
@@ -0,0 +1,105 @@
package build
import mill._, scalalib._

object `package` extends RootModule with ScalaModule {
  def scalaVersion = "2.12.15"
  def ivyDeps = Seq(
    ivy"org.apache.spark::spark-core:3.5.4",
    ivy"org.apache.spark::spark-sql:3.5.4"
  )

  // Spark reflects into JDK internals, so these modules must be opened
  // when running on Java 17+
  def forkArgs = Seq(
    "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
    "--add-opens=java.base/java.nio=ALL-UNNAMED",
    "--add-opens=java.base/java.util=ALL-UNNAMED",
    "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
  )

  // Keep the assembly a plain jar (no self-executing launcher script prepended)
  def prependShellScript = ""

  object test extends ScalaTests {
    def ivyDeps = Seq(ivy"com.lihaoyi::utest:0.8.5")
    def testFramework = "utest.runner.Framework"

    def forkArgs = Seq(
      "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
      "--add-opens=java.base/java.nio=ALL-UNNAMED",
      "--add-opens=java.base/java.util=ALL-UNNAMED",
      "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
    )
  }
}

/** Usage

> ./mill run
...
Running analysis: top_products
Results for top_products:
+----------+------------+-----------+----------+
|product_id|product_name|total_sales|sales_rank|
+----------+------------+-----------+----------+
|P00008 |Product 8 |4912 |1 |
|P00016 |Product 16 |4312 |2 |
|P00019 |Product 19 |4203 |3 |
|P00017 |Product 17 |3995 |4 |
|P00020 |Product 20 |3158 |5 |
|P00013 |Product 13 |2946 |6 |
|P00007 |Product 7 |2845 |7 |
|P00003 |Product 3 |2744 |8 |
|P00006 |Product 6 |2725 |9 |
|P00005 |Product 5 |2543 |10 |
|P00018 |Product 18 |2394 |11 |
|P00004 |Product 4 |2361 |12 |
|P00009 |Product 9 |2171 |13 |
|P00002 |Product 2 |2088 |14 |
|P00012 |Product 12 |1784 |15 |
|P00011 |Product 11 |1605 |16 |
|P00014 |Product 14 |1144 |17 |
|P00010 |Product 10 |1010 |18 |
|P00001 |Product 1 |929 |19 |
|P00015 |Product 15 |887 |20 |
+----------+------------+-----------+----------+
...
Running analysis: monthly_sales_trend
Results for monthly_sales_trend:
+----+-----+-------------+----------------+
|year|month|monthly_sales|cumulative_sales|
+----+-----+-------------+----------------+
|2024|2 |703 |703 |
|2024|3 |5241 |5944 |
|2024|4 |4741 |10685 |
|2024|5 |6162 |16847 |
|2024|6 |2136 |18983 |
|2024|7 |4009 |22992 |
|2024|8 |2527 |25519 |
|2024|9 |2886 |28405 |
|2024|10 |4997 |33402 |
|2024|11 |3487 |36889 |
|2024|12 |3219 |40108 |
|2025|1 |3597 |43705 |
|2025|2 |7051 |50756 |
+----+-----+-------------+----------------+
...
Running analysis: sales_by_region
Results for sales_by_region:
+-------------+-----------+
|region |total_sales|
+-------------+-----------+
|International|15378 |
|West |10796 |
|North |9466 |
|East |8147 |
|South |6969 |
+-------------+-----------+
...

> ./mill test
...foo.FooTests.DataLoadingTest...
...foo.FooTests.SalesByRegionAnalysis...
...foo.FooTests.TopProductsAnalysis...
...foo.FooTests.MonthlyTrendAnalysis...
...Tests: 4, Passed: 4, Failed: 0...

*/
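The analysis source for 5-sql-analytics is not among the hunks shown here, but the top_products and monthly_sales_trend outputs imply window functions over a sales dataset. A purely illustrative sketch, with every table, column, and path name assumed rather than taken from this PR, of the kind of query that yields a sales_rank column like the one above:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object TopProductsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SQL Analytics Sketch")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical input: one row per sale, with product_id, product_name, amount
    val sales = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("resources/sales.csv") // path is an assumption, not from this PR

    val topProducts = sales
      .groupBy("product_id", "product_name")
      .agg(sum("amount").as("total_sales"))
      // rank() over total_sales descending produces the sales_rank column
      .withColumn("sales_rank", rank().over(Window.orderBy(col("total_sales").desc)))

    topProducts.show(20, truncate = false)
    spark.stop()
  }
}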