How to write index layer data

The Data Client Library provides the class LayerDataFrameWriter, a custom Spark DataFrameWriter for writing DataFrames to index layers.

For index layers, data is grouped by the values of the index attributes defined for the layer. When writing to a protobuf-encoded layer, there must be exactly one Row per combination of index attribute values.
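Conceptually, this grouping behaves like an ordinary group-by on the index attribute values. The plain-Java sketch below (no Spark; the index attributes tileId and eventTime are hypothetical) only illustrates the idea, not the connector's implementation:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class IndexGrouping {
    // A simplified "row": two hypothetical index attributes plus a payload column.
    public record Row(long tileId, long eventTime, String data) {}

    // Group rows by the combination of their index attribute values, which is
    // conceptually how the writer forms one index partition per combination.
    public static Map<List<Long>, List<Row>> groupByIndexAttributes(List<Row> rows) {
        return rows.stream()
                .collect(Collectors.groupingBy(r -> List.of(r.tileId(), r.eventTime())));
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row(1L, 100L, "a"),
                new Row(1L, 100L, "b"),
                new Row(2L, 100L, "c"));
        Map<List<Long>, List<Row>> groups = groupByIndexAttributes(rows);
        System.out.println(groups.size());                        // 2 distinct attribute combinations
        System.out.println(groups.get(List.of(1L, 100L)).size()); // 2 rows share (1, 100)
    }
}
```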

Project dependencies

If you want to create an application that uses the HERE platform Spark Connector to write data to an index layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
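As an illustrative sketch only, an sbt dependency declaration could look like the following; the coordinates and version are placeholders, so take the authoritative ones from the Dependencies for Spark Connector chapter:

```scala
// Placeholder coordinates -- consult the Dependencies for Spark Connector
// chapter for the actual group ID, artifact name, and version.
libraryDependencies += "com.here.platform.data.client" %% "spark-support" % "<version>"
```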

Write process

For index layers, DataFrame rows are grouped by the index attributes to create the index data. The data is uploaded to the index layer using the write engine and then published through the Publish API. The DataFrame can also include additional metadata columns.

Metadata fields

All provided metadata columns for index layers:

Column name             Data type             Meaning                          Required
mt_metadata             Map[String, String]   Metadata of the partition        No
mt_timestamp            Long                  Timestamp of creation (UTC)      No
mt_checksum             String                Checksum of the payload          No
mt_crc                  String                CRC of the payload               No
mt_dataSize             Long                  Size of the payload              No
mt_compressedDataSize   Long                  Compressed size of the payload   No
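If you choose to supply them, mt_checksum and mt_dataSize can be derived from the payload bytes themselves. The plain-Java sketch below shows one way to do that; the digest algorithm (SHA-1) is an assumption here, so use whatever checksum your layer configuration expects:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class PayloadMetadata {
    // Hex-encoded digest of the payload. SHA-1 is an assumption -- substitute
    // the checksum algorithm your layer actually uses.
    public static String checksum(byte[] payload) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-1");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(payload)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // mt_dataSize is the uncompressed payload size in bytes.
    public static long dataSize(byte[] payload) {
        return payload.length;
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(checksum(payload)); // aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d
        System.out.println(dataSize(payload)); // 5
    }
}
```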

Write data as Avro, Parquet, or Protobuf-encoded files

The following snippet demonstrates how to write a DataFrame as an Avro, Parquet, or Protobuf-encoded data file to an index layer of a catalog:

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, SparkSession}
// val inputDF: DataFrame (Input data stored as a DataFrame)
// val outputCatalogHrn: HRN (HRN of the output catalog that contains the layer $outputLayerId)
// val outputLayerId: String (ID of the output index layer. If protobuf, the schema should match the non-indexed columns of $inputDF)
inputDF
  .writeLayer(outputCatalogHrn, outputLayerId)
  .save()
Java
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Dataset<Row> inputDF (Input data stored as a DataFrame)
// HRN outputCatalogHrn (HRN of a catalog that contains the layer $outputLayerId)
// String outputLayerId (ID of the output index layer. If protobuf, the schema should match the
// non-indexed columns of $inputDF)
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(outputCatalogHrn, outputLayerId)
    .option("olp.connector.metadata-columns", true)
    .save();

Write data in CSV format

The following snippet demonstrates how to write a DataFrame in CSV format to an index layer. In this example, the input DataFrame contains the column field1 as integer and field2 as string.

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-csv").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()
sparkSession.stop()
Java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-csv").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-csv-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();
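Under the hood, each group of rows ends up as one CSV payload. As a rough plain-Java illustration (not the connector's code) of how rows with field1 and field2 serialize, and of what the header option toggles:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CsvSketch {
    // Hypothetical row type mirroring the example schema: field1 int, field2 string.
    public record Row(int field1, String field2) {}

    // Serialize rows to CSV text, optionally prefixing the header row --
    // this mimics what the "header" option controls, not the connector's code.
    public static String toCsv(List<Row> rows, boolean header) {
        String body = rows.stream()
                .map(r -> r.field1() + "," + r.field2())
                .collect(Collectors.joining("\n"));
        return header ? "field1,field2\n" + body : body;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(new Row(1, "a"), new Row(2, "b"));
        System.out.println(toCsv(rows, true));
        // field1,field2
        // 1,a
        // 2,b
    }
}
```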

Write data in text format

The following snippet demonstrates how to write a DataFrame in text format to an index layer. In this example, the input DataFrame contains a column data with a message as string.

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-text").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()
sparkSession.stop()
Java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-text").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-text-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();

Write data in JSON format

The following snippet demonstrates how to write a DataFrame in JSON format to an index layer. In this example, the input DataFrame contains the column intVal as integer and strVal as string.

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-json").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()
sparkSession.stop()
Java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-json").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-json-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();

Write data in other formats

The following snippet demonstrates how to write a DataFrame as a file with an arbitrary format to an index layer of a catalog. In this example, the input DataFrame contains a column data with messages as strings and the data of multiple rows is simply concatenated together:

Scala
import com.here.examples.platform.data.client.spark.ExampleUtil._
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.{
  GroupedData,
  IndexDataConverter,
  IndexRowMetadata
}
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Encoders, Row, SparkSession}
// val inputDF: DataFrame (Input data stored as a DataFrame)
// val catalogHrn: HRN (HRN of the output catalog that contains the layer $layerId)
// val layerId: String (ID of the output index layer)
inputDF
  .writeLayer(catalogHrn, layerId)
  .withDataConverter(new IndexDataConverter {
    override def serializeGroup(
        rowMetadata: IndexRowMetadata,
        rows: Iterator[Row]
    ): GroupedData[IndexRowMetadata] = {
      val joinedText = rows
        .map(_.getAs[Array[Byte]]("data").map(_.toChar).mkString)
        .mkString
      GroupedData(rowMetadata, joinedText.getBytes())
    }
  })
  .save()
Java
import com.here.platform.data.client.spark.javadsl.IndexDataConverter;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.IndexRowMetadata;
import java.util.Iterator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Dataset<Row> inputDF (Input data stored as a Dataset<Row>)
// HRN catalogHrn (HRN of the output catalog that contains the layer $layerId)
// String layerId (ID of the output index layer)
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(catalogHrn, layerId)
    .withDataConverter(
        new IndexDataConverter() {
          @Override
          public GroupedData<IndexRowMetadata> serializeGroup(
              IndexRowMetadata rowMetadata, Iterator<Row> rows) {
            StringBuilder builder = new StringBuilder();
            rows.forEachRemaining(row -> builder.append(new String(row.<byte[]>getAs("data"))));
            String joinedText = builder.toString();
            return new GroupedData<>(rowMetadata, joinedText.getBytes());
          }
        })
    .save();

Note

For information on RSQL, see RSQL.