Data processing with ETL and interactive map layers
Overview
In data processing scenarios, particularly Extract, Transform, and Load (ETL) operations, efficient and reliable methods are essential. This document outlines the recommended approach for handling ETL tasks on interactive map layer data.
Recommended technologies
Spark support for ETL operations:
- Extraction and Loading: Use the spark-support library for efficient read and write operations. This covers both the extraction (Extract) and loading (Load) phases of the ETL process.
- Transformation: Use Apache Spark's DataFrames to perform the necessary data transformations. They provide a robust, scalable framework for complex data manipulation tasks.
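As a minimal illustration of the kind of transformation later expressed as a UDF over a DataFrame column, the core logic is an ordinary pure function over a properties map. This sketch is plain Java with no Spark dependency; the key name `info` and the class name are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

public class UpdatePropertiesSketch {
    // Core of a property-updating transformation: a pure function on the
    // properties map. Spark can later lift such a function into a UDF that
    // runs per row over a DataFrame column.
    static Map<String, String> updateProperties(Map<String, String> props) {
        Map<String, String> updated = new HashMap<>(props); // leave the input untouched
        updated.put("info", "modified info");
        return updated;
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("info", "original");
        before.put("name", "poi-1");
        Map<String, String> after = updateProperties(before);
        System.out.println(after.get("info")); // prints "modified info"
    }
}
```

Keeping the function pure (returning a new map rather than mutating the input) makes it straightforward to test in isolation before wiring it into a Spark job.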
Project dependencies
If you want to create an application that uses the HERE platform Spark Connector to perform ETL on an interactive map layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
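For orientation only, a Maven dependency declaration for the spark-support library might look like the sketch below. The coordinates and version are illustrative placeholders, not authoritative; consult the Dependencies for Spark Connector chapter for the exact artifact names and versions:

```xml
<!-- Illustrative placeholder only; see the chapter Dependencies for
     Spark Connector for the real coordinates and version. -->
<dependency>
    <groupId>com.here.platform.data.client</groupId>
    <artifactId>spark-support_2.12</artifactId>
    <version><!-- version from the SDK documentation --></version>
</dependency>
```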
Formats
The Spark Connector provides write functionality for the interactive map layer specific format. Note that the interactive map layer format/schema is based on GeoJSON and is therefore inherently flexible.
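To make that flexibility concrete, the sketch below shows a hypothetical minimal GeoJSON Feature of the kind an interactive map layer stores. Only `type`, `geometry`, and `properties` are structurally fixed; the attributes inside `properties` are free-form (the values here are invented for illustration):

```java
public class GeoJsonFeatureSketch {
    // A hypothetical minimal GeoJSON Feature. The "properties" object is
    // schemaless, which is what makes the layer format inherently flexible.
    static final String FEATURE =
        "{\n"
            + "  \"type\": \"Feature\",\n"
            + "  \"geometry\": { \"type\": \"Point\", \"coordinates\": [13.405, 52.52] },\n"
            + "  \"properties\": { \"info\": \"any free-form attributes can go here\" }\n"
            + "}";

    public static void main(String[] args) {
        System.out.println(FEATURE);
    }
}
```

In the examples that follow, it is exactly this free-form `properties` object that the transformation step updates.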
ETL with interactive map layer data
The following snippet demonstrates how to read, modify, and write a DataFrame[Row] (Dataset<Row>) with interactive map layer data:
- read interactive map layer data into a DataFrame using spark-support
- update the DataFrame, in this example by applying a user-defined function (UDF) to the properties column; other approaches work as well
- write the DataFrame back with spark-support
import com.here.examples.platform.data.client.spark.ExampleUtil.{
getCatalogHrn,
getLayerId,
getSparkSession
}
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{DataFrame, SparkSession}
def readDF(): DataFrame = {
log.info(s"Reading data from $inCatalogHrn:$inLayerId")
val query = ".."
spark
.readLayer(inCatalogHrn, inLayerId)
.query(query)
.load()
}
def writeDf(df: DataFrame): Unit = {
log.info(s"Writing data to $outCatalogHrn:$outLayerId")
df.writeLayer(outCatalogHrn, outLayerId)
.save()
}
def transform(df: DataFrame): DataFrame = {
  // create a User Defined Function to update features
def updateProperties(m: Map[String, String]): Map[String, String] =
m.updated("info", "\"modified info\"")
val updatePropertiesUDF = udf(updateProperties _)
import spark.implicits._
df.withColumn(
"properties",
updatePropertiesUDF($"properties")
)
}
// Extract
val df = readDF()
// Transform
val updated = transform(df)
// Load
writeDf(updated)

The equivalent Java implementation:

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
static Dataset<Row> readDF() {
log.info("Reading data from {}:{}", inCatalogHrn, inLayerId);
String query = "..";
return JavaLayerDataFrameReader.create(spark)
.readLayer(inCatalogHrn, inLayerId)
.query(query)
.load();
}
static Map<String, String> updateProperties(Map<String, String> props) {
props.put("info", "\"modified info\"");
return props;
}
static Dataset<Row> transform(Dataset<Row> df) {
// Registering the UDF
UserDefinedFunction updatePropertiesUDF =
udf(
(Map<String, String> input) -> updateProperties(input),
DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType));
spark.udf().register("updatePropertiesUDF", updatePropertiesUDF);
return df.withColumn("properties", callUDF("updatePropertiesUDF", col("properties")));
}
static void writeDF(Dataset<Row> df) {
JavaLayerDataFrameWriter.create(df).writeLayer(outCatalogHrn, outLayerId).save();
}
static void runETL() {
Dataset<Row> df = readDF();
Dataset<Row> updated = transform(df);
writeDF(updated);
}

Conclusion
While spark-support takes responsibility for reading layer data into DataFrames and writing DataFrames back to layers, the transformation of DataFrames can be done entirely with pure Spark capabilities.