Skip to main content

Table schemas in Spark data pipelines: how to handle large, nested & growing ones

Picture of Michał Mstowski, Big Data Software Engineer

Michał Mstowski

Big Data Software Engineer
Aug 20, 2021 | 12 min read
Table_schemas_in_data_pipelines_Spark_How_to_handle_large,_nested_&_growing_ones_image-min.jpg
1_uipFZ0sga9fYPVPk1EqFmw

 |-- products: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- fulfilment: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- status: string (nullable = true)
 |    |    |    |    |-- quantity: struct (nullable = true)
 |    |    |    |    |    |-- lineIds: array (nullable = true)
 |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |    |-- number: string (nullable = true)
 |    |    |    |    |    |-- measure: string (nullable = true)
 |    |    |    |    |    |-- size: string (nullable = true)
 |    |    |    |    |-- products: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |    |-- quantity: struct (nullable = true)
 |    |    |    |    |    |    |    |-- ids: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |    |    |    |-- number: string (nullable = true)
 |    |    |    |    |    |    |    |-- measure: string (nullable = true)
 |    |    |    |    |    |    |-- reason: string (nullable = true)
 |    |    |    |    |-- tracking: struct (nullable = true)
 |    |    |    |    |    |-- number: string (nullable = true)
 |    |    |-- gtin: string (nullable = true)
 |    |    |-- lines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- created: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- lastUpdated: string (nullable = true)
// Placeholders: fill in fields mirroring the source data and the target table.
case class DataSource(/* fields mirroring the source data */)
case class DataTarget(/* fields mirroring the target table */)

/** Reads the raw records to be ingested.
  *
  * @param source where to read from, e.g. an external source or an HDFS path
  * @return the records read from `source`
  */
def readData(source: String): Array[DataSource] = {
  // read data from an external source or from HDFS
  ???
}

/** Maps source records onto the target table layout.
  *
  * @param sourceDS records read from the source
  * @return the records reshaped for the target table
  */
def mapData(sourceDS: Dataset[DataSource]): Dataset[DataTarget] = {
  // change column names, add ingestion timestamp etc.
  ???
}

import spark.implicits._
val source: String = ???
val targetTable: String = ???

val sourceData = readData(source)
// spark.implicits provides `toDS` for Seq (not Array), so convert first.
val targetDS = mapData(sourceData.toSeq.toDS())
// Dataset has no `writeTable`; persist through the DataFrameWriter API.
// NOTE(review): the default SaveMode fails if the table already exists —
// pick a `.mode(...)` or use `insertInto` for a pre-existing target table.
targetDS.write.saveAsTable(targetTable)
1_YZIYq3-NmgGjeamzHqLVEAAA

Subscribe to our newsletter and never miss an article