Skip to main content

Reconciling Spark APIs for Scala

Picture of Michał Pałka, Senior Scala Developer

Michał Pałka

Senior Scala Developer
Jul 22, 2022|10 min read
Reconciling_Spark_APIs_for_Scala_image-min.jpg
1​​case class Measurement(
2 stationId: Long,
3 temperature: Int /* in °C */,
4 pressure: Int /* in hPa */,
5 timestamp: Long
6)

Let's connect
1measurements
2 .groupByKey(_.stationId)
3 .mapGroups { (stationId, measurementss) =>
4 val temperatures = measurementss.map(_.temperature)
5 val pressures = measurementss.map(_.pressure)
6 (
7 stationId,
8 temperatures.min,
9 temperatures.max,
10 pressures.sum.toDouble / pressures.length
11 )
12 }
13 .filter(entry => entry._3 - entry._2 < 20)
14 .map(entry => (entry._1, entry._4))
1case class AggregatedMeasurement(
2 stationId: Long,
3 minTemperature: Int,
4 maxTemperature: Int,
5 avgPressure: Double
6)
7
8/* … */
9
10 measurements
11 .groupByKey(_.stationId)
12 .mapGroups { (stationId, measurementss) =>
13 val temperatures = measurementss.map(_.temperature)
14 val pressures = measurementss.map(_.pressure)
15 AggregatedMeasurement(
16 stationId = stationId,
17 minTemperature = temperatures.min,
18 maxTemperature = temperatures.max,
19 avgPressure = pressures.sum.toDouble / pressures.length
20 )
21 }
22 .filter(aggregated => aggregated.maxTemperature - aggregated.minTemperature < 20)
23 .map(aggregated => (aggregated.stationId, aggregated.avgPressure))
1measurements
2 .groupBy($"stationId")
3 .agg(
4 min($"temperature").as("minTemperature"),
5 max($"temperature").as("maxTemperature"),
6 avg($"pressure").as("avgPressure")
7 )
8 .where($"maxTemperature" - $"minTemperture" < lit(20))
9 .select($"stationId", $"avgPressure")
1 measurements
2 .groupBy($.stationId)
3 .agg(
4 min($.temperature).as("minTemperature"),
5 max($.temperature).as("maxTemperature"),
6 avg($.pressure).as("avgPressure")
7 )
8 .where($.maxTemperature - $.minTemperature < lit(20))
9 .select($.stationId, $.avgPressure)
1import scala.language.dynamics
2import org.apache.spark.sql.functions.col
3
4object $ extends Dynamic {
5 def selectDynamic(name: String) = col(name)
6}
1import org.apache.spark.sql.{ Column => UntypedColumn }
2import org.apache.spark.sql.functions.col
3
4class Column[T](val untyped: UntypedColumn) extends AnyVal
5
6trait RowModel extends Selectable {
7 def selectDynamic(name: String) = Column(col(name))
8}
9
10def $: RowModel { /* ... */ } = new RowModel { /* .. */ }
1RowModel {
2 def stationId: Column[Long]
3 def temperature: Column[Int]
4 def pressure: Column[Int],
5 def timestamp: Column[Long]
6}
1def bar(fun: Context => Int) = ???
2def baz(implicit context: Context): Int = ???
3
4bar { implicit context =>
5 baz
6}
1def bar(fun: Context ?=> Int) = ???
2def baz(using context: Context): Int = ???
3
4bar {
5 baz
6}
1def $(using rowModel: RowModel): rowModel.type = rowModel
1//> using scala "3.2.0"
2//> using lib "org.virtuslab::iskra:0.0.2"
3
4import org.virtuslab.iskra.api.*
5
6case class Measurement(
7 stationId: Long,
8 temperature: Int /* in °C */,
9 pressure: Int /* in hPa */,
10 timestamp: Long
11)
12
13@main def run() =
14 given spark: SparkSession = SparkSession.builder()
15 .master("local")
16 .appName("weather-stations")
17 .getOrCreate()
18
19 val measurements = Seq(
20 Measurement(1, 10, 1020, 1641399107),
21 Measurement(2, -5, 1036, 1647015112),
22 Measurement(1, 19, 996, 1649175104),
23 Measurement(2, 25, 1015, 1657030348),
24 /* more data … */
25 ).toTypedDF
26
27 import functions.{avg, min, max, lit}
28
29 measurements
30 .groupBy($.stationId)
31 .agg(
32 min($.temperature).as("minTemperature"),
33 max($.temperature).as("maxTemperature"),
34 avg($.pressure).as("avgPressure")
35 )
36 .where($.maxTemperature - $.minTemperature < lit(20))
37 .select($.stationId, $.avgPressure)
38 .show()
iskra-coding

Let's connect

Curated by Sebastian Synowiec

Subscribe to our newsletter and never miss an article