
Data processing with ETL and interactive map layers


Overview

In scenarios involving data processing, particularly with Extract, Transform, and Load (ETL) operations, it is essential to implement efficient and reliable methods. This document outlines the recommended approach for handling ETL tasks, especially when dealing with interactive map layer data.

Recommended technologies

Spark support for ETL operations:

Extraction and Loading:

Utilize the spark-support library for enhanced efficiency in read and write operations. This is critical for both the extraction (Extract) and loading (Load) phases of the ETL process.

Transformation:

To perform necessary data transformations effectively, it is advisable to leverage Apache Spark's Dataframes. These provide a robust and scalable framework for handling complex data manipulation tasks.

Project dependencies

If you want to create an application that uses the HERE platform Spark Connector to do ETL on an interactive map layer, add the required dependencies to your project as described in chapter Dependencies for Spark Connector.
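As a hedged illustration only, a build definition would pull in the Spark Connector alongside Spark SQL; the coordinates and versions below are placeholders, and the authoritative values are those listed in the Dependencies for Spark Connector chapter:

```
// sbt sketch -- artifact coordinates and versions are placeholders;
// see the "Dependencies for Spark Connector" chapter for the real ones.
libraryDependencies ++= Seq(
  "com.here.platform.data.client" %% "spark-support" % "<version>",
  "org.apache.spark" %% "spark-sql" % "<spark.version>" % "provided"
)
```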

Formats

The Spark Connector provides write functionality for the interactive map layer-specific format. Note that the interactive map layer format/schema is based on GeoJSON and is therefore inherently flexible.
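For illustration, a single feature in such a layer follows the standard GeoJSON Feature shape; the property names and values below are hypothetical:

```
{
  "type": "Feature",
  "geometry": { "type": "Point", "coordinates": [13.4050, 52.5200] },
  "properties": { "info": "some value", "name": "feature-1" }
}
```

Because GeoJSON properties are free-form JSON objects, features in the same layer can carry different sets of properties.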

ETL with interactive map layer data

The following snippets demonstrate how to read, modify, and write a DataFrame[Row] (Dataset&lt;Row&gt; in the Java API) containing interactive map layer data:

  1. Read interactive map layer data into a DataFrame using spark-support.
  2. Update the DataFrame; in this example, a user-defined function (UDF) is applied to update the properties column, although other approaches work as well.
  3. Write the DataFrame back with spark-support.
Scala:
import com.here.examples.platform.data.client.spark.ExampleUtil.{
  getCatalogHrn,
  getLayerId,
  getSparkSession
}
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{DataFrame, SparkSession}
// spark, log, and the in/out catalog HRNs and layer IDs are assumed to be
// defined in the enclosing scope, for example via the ExampleUtil helpers above.
def readDF(): DataFrame = {
  log.info(s"Reading data from $inCatalogHrn:$inLayerId")
  val query = ".."
  spark
    .readLayer(inCatalogHrn, inLayerId)
    .query(query)
    .load()
}

def writeDf(df: DataFrame): Unit = {
  log.info(s"Writing data to $outCatalogHrn:$outLayerId")
  df.writeLayer(outCatalogHrn, outLayerId)
    .save()
}

def transform(df: DataFrame): DataFrame = { // create a User Defined Function to update features
  def updateProperties(m: Map[String, String]): Map[String, String] =
    m.updated("info", "\"modified info\"")

  val updatePropertiesUDF = udf(updateProperties _)

  import spark.implicits._

  df.withColumn(
    "properties",
    updatePropertiesUDF($"properties")
  )
}

// Extract
val df = readDF()

// Transform
val updated = transform(df)

// Load
writeDf(updated)

Java:
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
// As in the Scala example, spark, log, and the in/out catalog HRNs and layer
// IDs are assumed to be defined in the enclosing class.
static Dataset<Row> readDF() {
  log.info("Reading data from {}:{}", inCatalogHrn, inLayerId);
  String query = "..";
  return JavaLayerDataFrameReader.create(spark)
      .readLayer(inCatalogHrn, inLayerId)
      .query(query)
      .load();
}

static Map<String, String> updateProperties(Map<String, String> props) {
  props.put("info", "\"modified info\"");
  return props;
}

static Dataset<Row> transform(Dataset<Row> df) {
  // Registering the UDF
  UserDefinedFunction updatePropertiesUDF =
      udf(
          (Map<String, String> input) -> updateProperties(input),
          DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType));

  spark.udf().register("updatePropertiesUDF", updatePropertiesUDF);

  return df.withColumn("properties", callUDF("updatePropertiesUDF", col("properties")));
}

static void writeDF(Dataset<Row> df) {
  JavaLayerDataFrameWriter.create(df).writeLayer(outCatalogHrn, outLayerId).save();
}

static void runETL() {
  Dataset<Row> df = readDF();
  Dataset<Row> updated = transform(df);
  writeDF(updated);
}
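Independent of Spark, the property-update logic inside the UDF can be exercised on its own with plain Java collections. The sketch below mirrors the transformation step; the class name and sample data are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class UpdatePropertiesSketch {
  // Mirrors the UDF body: overwrite the "info" entry with a quoted string.
  // Copying the input map keeps the function free of side effects.
  static Map<String, String> updateProperties(Map<String, String> props) {
    Map<String, String> updated = new HashMap<>(props);
    updated.put("info", "\"modified info\"");
    return updated;
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("info", "original");
    props.put("name", "feature-1");

    Map<String, String> updated = updateProperties(props);
    System.out.println(updated.get("info")); // the new quoted value
    System.out.println(updated.get("name")); // other properties are untouched
  }
}
```

Testing the transformation in isolation like this avoids spinning up a Spark session for a unit test of the business logic.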

Conclusion

While spark-support takes responsibility for reading layer data into DataFrames and writing DataFrames back to layers, the transformation of DataFrames can be done entirely with pure Spark capabilities.