How to write index layer data
The Data Client Library provides the class LayerDataFrameWriter, a custom Spark DataFrameWriter for writing DataFrames to index layers.

For index layers, data is grouped by the values of the index attributes defined for the layer. When writing to a protobuf-encoded layer, there must be exactly one Row for each combination of index attribute values.
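The one-row-per-index-key requirement for protobuf-encoded layers can be validated up front. The following is a minimal sketch in plain Java, not part of the Data Client Library: rows are modeled as maps and the attribute names are made up for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch only: checks that no two rows share the same combination of
// index-attribute values, as required for protobuf-encoded index layers.
public class ProtobufRowCheck {
    public static boolean oneRowPerIndexKey(List<Map<String, Object>> rows,
                                            List<String> indexAttributes) {
        Set<List<Object>> seen = new HashSet<>();
        for (Map<String, Object> row : rows) {
            // The index key is the tuple of this row's index-attribute values.
            List<Object> key = indexAttributes.stream()
                .map(row::get)
                .collect(Collectors.toList());
            if (!seen.add(key)) {
                return false; // duplicate index key found
            }
        }
        return true;
    }
}
```

Running such a check before calling save() makes the protobuf constraint fail fast in your own code rather than at write time.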
Project dependencies
If you want to create an application that uses the HERE platform Spark Connector to write data to an index layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
Write process
For index layers, DataFrame rows are grouped by the index attributes to create the index data. The data is uploaded to the index layer using the write engine and then published using the Publish API. The DataFrame can also include additional metadata columns.
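Conceptually, the grouping step can be pictured with plain Java collections. This is a sketch of the idea, not the connector's implementation; the attribute names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of the grouping step: rows that share the same values for the
// layer's index attributes are collected into one group, and each group
// becomes one partition uploaded to the index layer.
public class IndexGrouping {
    public static Map<List<Object>, List<Map<String, Object>>> groupByIndexAttributes(
            List<Map<String, Object>> rows, List<String> indexAttributes) {
        return rows.stream().collect(Collectors.groupingBy(
            row -> indexAttributes.stream()
                .map(row::get)
                .collect(Collectors.toList())));
    }
}
```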
Metadata fields
The following metadata columns are supported for index layers:

| Column name | Data type | Meaning | Required |
|---|---|---|---|
| mt_metadata | Map[String, String] | Metadata of the partition | No |
| mt_timestamp | Long | Timestamp of creation (UTC) | No |
| mt_checksum | String | Checksum of the payload | No |
| mt_crc | String | CRC of the payload | No |
| mt_dataSize | Long | Size of the payload | No |
| mt_compressedDataSize | Long | Compressed size of the payload | No |
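For illustration, the payload-derived fields could be computed as follows. This is a sketch only: SHA-1 and CRC32 are assumptions chosen for the example, and the actual algorithms depend on the layer's digest and CRC configuration.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.zip.CRC32;

// Hypothetical helpers for the mt_checksum, mt_crc, and mt_dataSize columns.
public class PayloadMetadata {
    // Hex-encoded SHA-1 digest of the payload (SHA-1 is an assumption here;
    // the real algorithm follows the layer's digest configuration).
    public static String checksum(byte[] payload) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-1").digest(payload);
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // CRC32 of the payload (again, the real CRC variant depends on the layer).
    public static long crc(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue();
    }

    // Uncompressed payload size in bytes.
    public static long dataSize(byte[] payload) {
        return payload.length;
    }
}
```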
Write data as Avro, Parquet, or Protobuf-encoded files
The following snippet demonstrates how to write a DataFrame as an Avro,
Parquet, or Protobuf-Encoded data file to an index layer of a catalog:
Scala:

```scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, SparkSession}

// val inputDF: DataFrame (input data stored as a DataFrame)
// val outputCatalogHrn: HRN (HRN of the output catalog that contains the layer $outputLayerId)
// val outputLayerId: String (ID of the output index layer. If protobuf, the schema should match
// the non-indexed columns of $inputDF)
inputDF
  .writeLayer(outputCatalogHrn, outputLayerId)
  .save()
```

Java:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Dataset<Row> inputDF (input data stored as a Dataset<Row>)
// HRN outputCatalogHrn (HRN of a catalog that contains the layer $outputLayerId)
// String outputLayerId (ID of the output index layer. If protobuf, the schema should match the
// non-indexed columns of $inputDF)
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(outputCatalogHrn, outputLayerId)
    .option("olp.connector.metadata-columns", true)
    .save();
```

Write data in CSV format
The following snippet demonstrates how to write a DataFrame to an index layer in CSV format. In this example, the input DataFrame contains the columns field1 as integer and field2 as string.
Scala:

```scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-csv").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()
sparkSession.stop()
```

Java:

```java
import com.here.platform.data.client.spark.scaladsl.DataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.SerializableField;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-csv").getOrCreate();

// catalogHrn: HRN of a catalog that contains the index layer layerId
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-csv-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();
```

Write data in text format
The following snippet demonstrates how to write a DataFrame to an index layer in text format. In this example, the input DataFrame contains a column data with messages as strings.
Scala:

```scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-text").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()
sparkSession.stop()
```

Java:

```java
import com.here.platform.data.client.spark.scaladsl.DataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.SerializableField;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-text").getOrCreate();

// catalogHrn: HRN of a catalog that contains the index layer layerId
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-text-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();
```

Write data in JSON format
The following snippet demonstrates how to write a DataFrame to an index layer in JSON format. In this example, the input DataFrame contains the columns intVal as integer and strVal as string.
Scala:

```scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-json").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()
sparkSession.stop()
```

Java:

```java
import com.here.platform.data.client.spark.scaladsl.DataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.SerializableField;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-json").getOrCreate();

// catalogHrn: HRN of a catalog that contains the index layer layerId
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-json-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();
```

Write data in other formats
The following snippet demonstrates how to write a DataFrame in an arbitrary custom format to an index layer of a catalog by providing a custom data converter. In this example, the input DataFrame contains a column data with messages as strings, and the data of multiple rows is simply concatenated together:
Scala:

```scala
import com.here.examples.platform.data.client.spark.ExampleUtil._
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.{
  GroupedData,
  IndexDataConverter,
  IndexRowMetadata
}
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Encoders, Row, SparkSession}

// val inputDF: DataFrame (input data stored as a DataFrame)
// val catalogHrn: HRN (HRN of the output catalog that contains the layer $layerId)
// val layerId: String (ID of the output index layer)
inputDF
  .writeLayer(catalogHrn, layerId)
  .withDataConverter(new IndexDataConverter {
    override def serializeGroup(
        rowMetadata: IndexRowMetadata,
        rows: Iterator[Row]
    ): GroupedData[IndexRowMetadata] = {
      val joinedText = rows
        .map(_.getAs[Array[Byte]]("data").map(_.toChar).mkString)
        .mkString
      GroupedData(rowMetadata, joinedText.getBytes())
    }
  })
  .save()
```

Java:

```java
import com.here.platform.data.client.spark.scaladsl.DataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.SerializableField;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Dataset<Row> inputDF (input data stored as a Dataset<Row>)
// HRN catalogHrn (HRN of the output catalog that contains the layer $layerId)
// String layerId (ID of the output index layer)
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(catalogHrn, layerId)
    .withDataConverter(
        new IndexDataConverter() {
          @Override
          public GroupedData<IndexRowMetadata> serializeGroup(
              IndexRowMetadata rowMetadata, Iterator<Row> rows) {
            StringBuilder builder = new StringBuilder();
            rows.forEachRemaining(row -> builder.append(new String(row.<byte[]>getAs("data"))));
            String joinedText = builder.toString();
            return new GroupedData<>(rowMetadata, joinedText.getBytes());
          }
        })
    .save();
```
Note: For information on RSQL, see RSQL.