How to write versioned layer data
The spark-support module provides the class LayerDataFrameWriter, a custom Spark DataFrameWriter for writing DataFrames to versioned layers.
Project dependencies
If you want to create an application that uses the HERE platform Spark Connector to write data to a versioned layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
Formats
The Spark Connector provides write functionality for the following formats:
- Protobuf
- Avro
- Parquet
- Raw
- JSON
- Text
- CSV
Note: Usage specifics
For the Raw format, a data converter must be implemented from the DataConverter trait/interface and set using the withDataConverter method. Note that the format method is not required.
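The converter contract can be sketched in plain Java. The names below (DataConverterSketch, serializeGroup's simplified signature) are hypothetical simplifications for illustration, not the connector's exact API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

// Hypothetical simplification of the converter contract: turn the rows of
// one partition group into a single byte payload for upload.
interface DataConverterSketch {
    byte[] serializeGroup(String partitionId, Iterator<String> rows);
}

public class ConverterDemo {
    public static void main(String[] args) {
        // A converter that joins the rows of a group with '|'.
        DataConverterSketch joiner = (partitionId, rows) -> {
            StringBuilder sb = new StringBuilder();
            while (rows.hasNext()) {
                sb.append(rows.next());
                if (rows.hasNext()) sb.append('|');
            }
            return sb.toString().getBytes(StandardCharsets.UTF_8);
        };
        byte[] payload = joiner.serializeGroup("partition-1", List.of("a", "b").iterator());
        System.out.println(new String(payload, StandardCharsets.UTF_8)); // a|b
    }
}
```

The real converter (shown in the Raw example below) receives row metadata and Spark Row objects instead of plain strings, but the shape is the same: one group in, one payload out.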
Write process
For versioned layers, DataFrame rows are grouped by the mt_partition column
to create the partition data. The data is uploaded to the versioned layer
using the write engine and afterwards published using the Publish API. The
DataFrame can also include additional metadata columns.
Note that having multiple rows for the same partition is allowed only for the
Avro and Parquet formats. For the other formats, this throws an error.
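Conceptually, the grouping step works like this plain-Java sketch (hypothetical SketchRow type; no Spark involved):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a DataFrame row: partition ID plus payload.
public class GroupingDemo {
    record SketchRow(String mtPartition, byte[] payload) {}

    // Group rows by mt_partition; each group becomes one platform partition.
    static Map<String, List<byte[]>> groupByPartition(List<SketchRow> rows) {
        Map<String, List<byte[]>> grouped = new LinkedHashMap<>();
        for (SketchRow row : rows) {
            grouped.computeIfAbsent(row.mtPartition(), k -> new ArrayList<>())
                   .add(row.payload());
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, List<byte[]>> grouped = groupByPartition(Arrays.asList(
            new SketchRow("partition-1", new byte[] {1}),
            new SketchRow("partition-1", new byte[] {2}), // duplicate: Avro/Parquet only
            new SketchRow("partition-2", new byte[] {3})));
        System.out.println(grouped.get("partition-1").size()); // 2
        System.out.println(grouped.get("partition-2").size()); // 1
    }
}
```

In the real write path the connector performs this grouping on the DataFrame itself before serializing and uploading each group.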
Metadata fields
All supported metadata columns for versioned layers:

| Column name | Data type | Meaning | Required |
|---|---|---|---|
| mt_partition | String | ID of the partition in the HERE platform | Yes |
| mt_timestamp | Long | Timestamp of creation | No |
| mt_checksum | String | Checksum of the payload | No |
| mt_crc | String | CRC of the payload | No |
| mt_dataSize | Long | Size of the payload | No |
| mt_compressedDataSize | Long | Compressed size of the payload | No |
| mt_dataHandle | String | Handle of the data | No |
Note: Restrictions on metadata
Each metadata column must have the type given in the table above; otherwise an IllegalArgumentException is thrown. Extra fields that start with mt_ but are not listed above are ignored on write.
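The type rule can be illustrated with a small sketch. The column-to-type mapping is taken from the table above; the checker itself is hypothetical, not the connector's code:

```java
import java.util.Map;

public class MetadataCheckDemo {
    // Expected Spark types of the metadata columns, from the table above.
    static final Map<String, String> METADATA_TYPES = Map.of(
        "mt_partition", "String",
        "mt_timestamp", "Long",
        "mt_checksum", "String",
        "mt_crc", "String",
        "mt_dataSize", "Long",
        "mt_compressedDataSize", "Long",
        "mt_dataHandle", "String");

    // A known mt_ column with the wrong type fails with IllegalArgumentException;
    // unknown mt_ columns are ignored, mirroring the note above.
    static void checkMetadataColumn(String name, String columnType) {
        String expected = METADATA_TYPES.get(name);
        if (expected != null && !expected.equals(columnType)) {
            throw new IllegalArgumentException(
                "Column " + name + " must be " + expected + ", found " + columnType);
        }
    }

    public static void main(String[] args) {
        checkMetadataColumn("mt_timestamp", "Long"); // correct type: accepted
        checkMetadataColumn("mt_custom", "Double");  // unknown mt_ column: ignored
        try {
            checkMetadataColumn("mt_partition", "Long"); // wrong type
            System.out.println("no error");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected");
        }
    }
}
```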
Write data in protobuf format
The following snippet demonstrates how to manipulate and write a
DataFrame[Row] (Dataset<Row>) as a Protobuf-encoded data file:
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, first, lit}
import org.apache.spark.sql.types.IntegerType
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-protobuf").getOrCreate()
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer containing protobuf schema
// with the required fields 'text' in String Type and 'count' in Integer Type)
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// +-------+--------------+
// | text|partition_name|
// +-------+--------------+
// |value-1| partition-1|
// |value-2| partition-2|
// |value-1| partition-1|
// +-------+--------------+
// Some computations
val computationDataFrame = inputDataFrame
.groupBy("text")
.agg(count("text").as("count"), first("partition_name").as("mt_partition"))
// Casting column type and adding extra mt_timestamp column
val outDataFrame = computationDataFrame
.withColumn("count", col("count").cast(IntegerType))
.withColumn("mt_timestamp", lit(System.currentTimeMillis))
outDataFrame.show()
//+-------+-----+------------+-------------+
//| text|count|mt_partition| mt_timestamp|
//+-------+-----+------------+-------------+
//|value-2|    1| partition-2|1569598607978|
//|value-1|    2| partition-1|1569598607978|
//+-------+-----+------------+-------------+
outDataFrame
.writeLayer(catalogHrn, layerId)
.option("olp.connector.metadata-columns", true)
.save()
sparkSession.stop()

The same example in Java:

import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-protobuf").getOrCreate();
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer containing protobuf schema
// with the required fields 'text' in String Type and 'count' in Integer Type)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-protobuf-layer";
Dataset<Row> inputDataset = loadDataFrame(sparkSession);
inputDataset.show();
// ------------------------
// | text|partition_name|
// ------------------------
// |value-1| partition-1|
// |value-2| partition-2|
// |value-1| partition-1|
// ------------------------
// Some computations
Dataset<Row> computationDataset =
inputDataset
.groupBy("text")
.agg(count("text").as("count"), first("partition_name").as("mt_partition"));
// Casting column type and adding extra mt_timestamp column
Dataset<Row> outDataset =
computationDataset
.withColumn("count", col("count").cast(org.apache.spark.sql.types.DataTypes.IntegerType))
.withColumn("mt_timestamp", lit(System.currentTimeMillis()));
outDataset.show();
// ------------------------------------------
// | text|count|mt_partition| mt_timestamp|
// ------------------------------------------
// |value-2|    1| partition-2|1569598607978|
// |value-1|    2| partition-1|1569598607978|
// ------------------------------------------
JavaLayerDataFrameWriter.create(outDataset)
.writeLayer(catalogHrn, layerId)
.option("olp.connector.metadata-columns", true)
.save();
sparkSession.stop();
Note: Restrictions
The mt_partition metadata column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the application/x-protobuf content type.
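Before calling save(), the uniqueness requirement can be pre-checked. A sketch with a hypothetical helper (not part of the connector's API):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical pre-check: collect duplicated partition IDs before writing,
// since duplicate mt_partition values are rejected for protobuf layers.
public class UniquenessDemo {
    static Set<String> duplicatedPartitions(List<String> ids) {
        Map<String, Integer> counts = new HashMap<>();
        for (String id : ids) counts.merge(id, 1, Integer::sum);
        Set<String> dupes = new HashSet<>();
        counts.forEach((id, n) -> { if (n > 1) dupes.add(id); });
        return dupes;
    }

    public static void main(String[] args) {
        // No duplicates: safe to save
        System.out.println(duplicatedPartitions(Arrays.asList("partition-1", "partition-2")));
        // partition-1 appears twice: the write would throw
        System.out.println(duplicatedPartitions(Arrays.asList("partition-1", "partition-1")));
    }
}
```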
Write data in Avro and Parquet formats
The following snippet demonstrates how to write a
DataFrame[Row] (Dataset<Row>) as an Avro- or Parquet-encoded data file:
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{DataFrame, SparkSession}
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate()
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// +--------------------+------+-----+
// | mt_partition|tileId|index|
// +--------------------+------+-----+
// |6c6a6eb5-e631-467...| 5| 1|
// |6c6a6eb5-e631-467...| 6| 2|
// |4ce8fb63-3cc2-420...| 7| 10|
// |4ce8fb63-3cc2-420...| 8| 20|
// +--------------------+------+-----+
inputDataFrame
.writeLayer(catalogHrn, layerId)
.save()
sparkSession.stop()

The same example in Java:

import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate();
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-parquet-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();
Note: Usage specifics
Avro and Parquet are columnar storage formats, so the DataFrame can contain duplicate values in the mt_partition column. Rows are grouped by mt_partition, and each group is saved as a single platform partition. The layer must have the application/x-parquet or application/x-avro-binary content type, according to the format.
Write data in raw format
The following snippet demonstrates how to write a DataFrame as a file with an
arbitrary format. In this example, the input DataFrame contains a column
data with the payload as a byte array.
Note: Restrictions
The mt_partition metadata column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the application/octet-stream content type.
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import com.here.platform.data.client.spark.scaladsl.{
GroupedData,
VersionedDataConverter,
VersionedRowMetadata
}
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-raw").getOrCreate()
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame = loadDataFrame(sparkSession)
// -------------------
// |mt_partition| raw|
// -------------------
// | partition-1|[31]|
// | partition-2|[32]|
// | partition-3|[33]|
// -------------------
inputDataFrame
.writeLayer(catalogHrn, layerId)
.withDataConverter(new VersionedDataConverter {
override def serializeGroup(
rowMetadata: VersionedRowMetadata,
rows: Iterator[Row]
): GroupedData[VersionedRowMetadata] = {
// One row per partition ID, so we only need to process the first element of the iterator
val bytes = "serializeGroup=>".getBytes ++ rows
.next()
.getAs[Array[Byte]]("data")
GroupedData(rowMetadata, bytes)
}
})
.save()
sparkSession.stop()

The same example in Java:

import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import java.util.Iterator;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-raw").getOrCreate();
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-raw-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
// -------------------
// |mt_partition| raw|
// -------------------
// | partition-1|[31]|
// | partition-2|[32]|
// | partition-3|[33]|
// -------------------
JavaLayerDataFrameWriter.create(inputDataFrame)
.writeLayer(catalogHrn, layerId)
.withDataConverter(
new VersionedDataConverter() {
@Override
public GroupedData<VersionedRowMetadata> serializeGroup(
VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
byte[] bytes =
ArrayUtils.addAll("serializeGroup=>".getBytes(), rows.next().getAs("data"));
return new GroupedData<>(rowMetadata, bytes);
}
})
.save();
sparkSession.stop();

Write data in JSON format
The following snippet demonstrates how to write a
DataFrame[Row] (Dataset<Row>) as JSON data:
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{DataFrame, SparkSession}
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-json").getOrCreate()
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// +--------------------+------+-----+
// | mt_partition|tileId|index|
// +--------------------+------+-----+
// |6c6a6eb5-e631-467...| 5| 1|
// |6c6a6eb5-e631-467...| 6| 2|
// |4ce8fb63-3cc2-420...| 7| 10|
// |4ce8fb63-3cc2-420...| 8| 20|
// +--------------------+------+-----+
inputDataFrame
.writeLayer(catalogHrn, layerId)
.save()
sparkSession.stop()

The same example in Java:

import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-json").getOrCreate();
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-json-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();

Write data in text format
The following snippet demonstrates how to write a DataFrame as text data.
In this example, the input DataFrame contains a column
text with the message as a string.
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{DataFrame, SparkSession}
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-text").getOrCreate()
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// +--------------------+-----+
// | mt_partition| text|
// +--------------------+-----+
// |6c6a6eb5-e631-467...| 5_1|
// |6c6a6eb5-e631-467...| 6_2|
// |4ce8fb63-3cc2-420...| 7_10|
// |4ce8fb63-3cc2-420...| 8_20|
// +--------------------+-----+
inputDataFrame
.writeLayer(catalogHrn, layerId)
.save()
sparkSession.stop()

The same example in Java:

import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-text").getOrCreate();
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-text-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();

Write data in CSV format
The following snippet demonstrates how to write a
DataFrame[Row] (Dataset<Row>) as CSV data:
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{DataFrame, SparkSession}
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-csv").getOrCreate()
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// +--------------------+------+-----+
// | mt_partition|tileId|index|
// +--------------------+------+-----+
// |6c6a6eb5-e631-467...| 5| 1|
// |6c6a6eb5-e631-467...| 6| 2|
// |4ce8fb63-3cc2-420...| 7| 10|
// |4ce8fb63-3cc2-420...| 8| 20|
// +--------------------+------+-----+
inputDataFrame
.writeLayer(catalogHrn, layerId)
.option("header", "true")
.save()
sparkSession.stop()

The same example in Java:

import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-csv").getOrCreate();
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-csv-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame)
.writeLayer(catalogHrn, layerId)
.option("header", "true")
.save();
sparkSession.stop();
Note: Usage specifics
The connector groups rows by the mt_partition column value. In this case, a data converter inherited from the DataConverter trait must be implemented. Also, the versioned layer must have the application/octet-stream content type.
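The row-to-CSV serialization with a header can be sketched as follows. The toCsv helper is hypothetical, mirroring the "header" option used in the example above, not the connector's internal code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CsvDemo {
    // Hypothetical sketch: serialize one group of rows into a CSV payload,
    // optionally prefixed with a header line.
    static String toCsv(List<String> header, List<List<Object>> rows, boolean writeHeader) {
        StringBuilder sb = new StringBuilder();
        if (writeHeader) sb.append(String.join(",", header)).append('\n');
        sb.append(rows.stream()
            .map(r -> r.stream().map(String::valueOf).collect(Collectors.joining(",")))
            .collect(Collectors.joining("\n")));
        return sb.toString();
    }

    public static void main(String[] args) {
        // Rows from the tileId/index example above.
        String csv = toCsv(
            Arrays.asList("tileId", "index"),
            Arrays.asList(Arrays.asList(5, 1), Arrays.asList(6, 2)),
            true);
        System.out.println(csv);
    }
}
```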