How to write volatile layer data
The spark-support module provides the class LayerDataFrameWriter, a custom Spark DataFrameWriter for writing DataFrames to volatile layers.
Project Dependencies
If you want to create an application that uses the HERE platform Spark Connector to write data to a volatile layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
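As a minimal sketch, an sbt declaration could look like the one below; the artifact coordinates and version placeholders are assumptions here, so take the authoritative values from the Dependencies for Spark Connector chapter:

// build.sbt -- hypothetical coordinates; verify them against the
// "Dependencies for Spark Connector" chapter for your SDK version
libraryDependencies ++= Seq(
  "com.here.platform.data.client" %% "spark-support" % "<sdk-version>",
  "org.apache.spark" %% "spark-sql" % "<spark-version>" % Provided
)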
Formats
The Spark Connector provides write functionality for the following formats:
- Protobuf
- Avro
- Parquet
- Raw
- JSON
- Text
- CSV
Note: Usage specifics
For the Raw format, a data converter must be implemented from the DataConverter trait/interface and set using the withDataConverter method (see the raw format example below). Note that the format method is not required.
Write process
For volatile layers, DataFrame rows are grouped by the mt_partition column to create the partition data. The data is uploaded to the volatile layer using the write engine and afterwards published using the Publish API. The DataFrame can also include additional metadata columns.
Note that multiple rows for the same partition are allowed only for the Avro and Parquet formats; for the other formats, duplicate partition IDs throw an error (see the deduplication sketch below).
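If your source data can contain several rows per partition and you target one of the single-row formats (Protobuf, Raw, JSON, Text, CSV), you can collapse the rows before writing. A minimal sketch, assuming a hypothetical DataFrame df where any row of a partition is an acceptable representative:

import org.apache.spark.sql.DataFrame

// Keep one (arbitrary) row per partition ID so that single-row
// formats do not fail on duplicate mt_partition values.
def oneRowPerPartition(df: DataFrame): DataFrame =
  df.dropDuplicates("mt_partition")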
Metadata fields
All supported metadata columns for volatile layers:

| Column name | Data type | Meaning | Required |
|---|---|---|---|
| mt_partition | String | ID of the partition in the HERE platform | Yes |
| mt_timestamp | Long | Timestamp of creation | No |
| mt_checksum | String | Checksum of the payload | No |
| mt_crc | String | CRC of the payload | No |
| mt_dataSize | Long | Size of the payload | No |
| mt_compressedDataSize | Long | Compressed size of the payload | No |
Note: Restrictions of metadata
Metadata columns must have the data types listed in the table above; otherwise an IllegalArgumentException is thrown. Extra columns whose names start with the mt_ prefix but are not listed above are ignored during the write operation.
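The optional columns can be computed from the payload itself. A minimal sketch, assuming a hypothetical DataFrame df whose binary data column is the partition payload (as in the raw format example below); whether the platform expects SHA-1 for mt_checksum depends on your layer configuration, so treat the algorithm as an assumption:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, length, lit, sha1}

// Hypothetical enrichment: derive optional mt_ columns from the
// binary 'data' column before calling writeLayer(...).save().
def withOptionalMetadata(df: DataFrame): DataFrame =
  df.withColumn("mt_timestamp", lit(System.currentTimeMillis))
    .withColumn("mt_dataSize", length(col("data")).cast("long"))
    .withColumn("mt_checksum", sha1(col("data"))) // assuming SHA-1 digests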
Write data in protobuf format
The following snippet demonstrates how to manipulate and write a DataFrame[Row] (Dataset<Row>) as a protobuf-encoded data file:
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, first, lit}
import org.apache.spark.sql.types.IntegerType
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-protobuf").getOrCreate()
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing a protobuf schema
// with the required fields 'text' of String type and 'count' of Integer type)
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// ------------------------
// | text|partition_name|
// ------------------------
// |value-1| partition-1|
// |value-2| partition-2|
// |value-1| partition-1|
// ------------------------
// Some computations
val computationDataFrame = inputDataFrame
.groupBy("text")
.agg(count("text").as("count"), first("partition_name").as("mt_partition"))
// Casting column type and adding extra mt_timestamp column
val outDataFrame = computationDataFrame
.withColumn("count", col("count").cast(IntegerType))
.withColumn("mt_timestamp", lit(System.currentTimeMillis))
outDataFrame.show()
// ------------------------------------------
// | text|count|mt_partition| mt_timestamp|
// ------------------------------------------
// |value-2| 1| partition-1|1569598607978|
// |value-1| 2| partition-2|1569598607978|
// ------------------------------------------
outDataFrame
.writeLayer(catalogHrn, layerId)
.save()
sparkSession.stop()

import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-protobuf").getOrCreate();
// catalogHrn: HRN of a catalog that contains the layer $layerId
// layerId: ID of a volatile layer containing a protobuf schema
// with the required fields 'text' of String type and 'count' of Integer type
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-protobuf-layer";
Dataset<Row> inputDataset = loadDataFrame(sparkSession);
inputDataset.show();
// ------------------------
// | text|partition_name|
// ------------------------
// |value-1| partition-1|
// |value-2| partition-2|
// |value-1| partition-1|
// ------------------------
// Some computations
Dataset<Row> computationDataset =
inputDataset
.groupBy("text")
.agg(count("text").as("count"), first("partition_name").as("mt_partition"));
// Casting column type and adding extra mt_timestamp column
Dataset<Row> outDataset =
    computationDataset
        .withColumn("count", col("count").cast(DataTypes.IntegerType))
        .withColumn("mt_timestamp", lit(System.currentTimeMillis()));
outDataset.show();
// ------------------------------------------
// | text|count|mt_partition| mt_timestamp|
// ------------------------------------------
// |value-2| 1| partition-1|1569598607978|
// |value-1| 2| partition-2|1569598607978|
// ------------------------------------------
JavaLayerDataFrameWriter.create(outDataset).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();
Note: Restrictions
The metadata mt_partition column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the application/x-protobuf content type.
Write data in Avro and Parquet formats
The following snippet demonstrates how to write a DataFrame[Row] (Dataset<Row>) as an Avro- or Parquet-encoded data file:
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{DataFrame, SparkSession}
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate()
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer)
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// ---------------------------
// |tileId|index|mt_partition|
// ---------------------------
// | 5| 1| partition-1|
// | 6| 2| partition-1|
// | 7| 10| partition-2|
// | 8| 20| partition-2|
// ---------------------------
inputDataFrame
.writeLayer(catalogHrn, layerId)
.save()
sparkSession.stop()

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate();
// catalogHrn: HRN of a catalog that contains the layer $layerId
// layerId: ID of a volatile layer
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-parquet-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();
Note: Usage specifics
Avro and Parquet are columnar storage formats, so the DataFrame can contain duplicate values in the mt_partition column. Rows are grouped by mt_partition, and each group is saved as a single platform partition. The layer must have the application/x-parquet or application/x-avro-binary content type, according to the format.
Write data in raw format
The following snippet demonstrates how to write a DataFrame as a file with an arbitrary format. In this example, the input DataFrame contains a column data with the payload as a byte array.
Note: Restrictions
The metadata mt_partition column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the application/octet-stream content type.
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{Row, SparkSession}
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-raw").getOrCreate()
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer)
val inputDataFrame = loadDataFrame(sparkSession)
// -------------------
// |mt_partition|data|
// -------------------
// | partition-1|[31]|
// | partition-2|[32]|
// | partition-3|[33]|
// -------------------
inputDataFrame
.writeLayer(catalogHrn, layerId)
.withDataConverter(new VolatileDataConverter {
override def serializeGroup(rowMetadata: VolatileRowMetadata,
rows: Iterator[Row]): GroupedData[VolatileRowMetadata] = {
// One row per partition ID, so we only need to process the first element of the iterator
val bytes = "serializeGroup=>".getBytes ++ rows.next().getAs[Array[Byte]]("data")
GroupedData(rowMetadata, bytes)
}
})
.save()
sparkSession.stop()

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Iterator;
import org.apache.commons.lang3.ArrayUtils;
// plus the VolatileDataConverter, VolatileRowMetadata and GroupedData
// classes from the spark-support module
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-raw").getOrCreate();
// catalogHrn: HRN of a catalog that contains the layer $layerId
// layerId: ID of a volatile layer
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-raw-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
// -------------------
// |mt_partition|data|
// -------------------
// | partition-1|[31]|
// | partition-2|[32]|
// | partition-3|[33]|
// -------------------
JavaLayerDataFrameWriter.create(inputDataFrame)
.writeLayer(catalogHrn, layerId)
.withDataConverter(
new VolatileDataConverter() {
@Override
public GroupedData<VolatileRowMetadata> serializeGroup(
VolatileRowMetadata rowMetadata, Iterator<Row> rows) {
byte[] bytes =
ArrayUtils.addAll("serializeGroup=>".getBytes(), rows.next().getAs("data"));
return new GroupedData<>(rowMetadata, bytes);
}
})
.save();
sparkSession.stop();

Write data in JSON format
The following snippet demonstrates how to write a DataFrame as a JSON-encoded data file. In this example, the input DataFrame contains the columns intVal as integer and strVal as string.
Note: Restrictions
The metadata mt_partition column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the application/json content type.
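The snippet below calls a helper loadDataFrame that is not shown. As a minimal sketch, under the assumption that it simply builds the example rows inline, it could look like this:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical stand-in for the loadDataFrame helper used below:
// builds the example rows inline.
def loadDataFrame(sparkSession: SparkSession): DataFrame = {
  import sparkSession.implicits._
  Seq(
    ("partition-1", 1, "str1"),
    ("partition-2", 2, "str2"),
    ("partition-3", 3, "str3")
  ).toDF("mt_partition", "intVal", "strVal")
}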
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-json").getOrCreate()
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer)
val inputDataFrame = loadDataFrame(sparkSession)
// ----------------------------
// |mt_partition|intVal|strVal|
// ----------------------------
// | partition-1|     1|  str1|
// | partition-2|     2|  str2|
// | partition-3|     3|  str3|
// ----------------------------
inputDataFrame
.writeLayer(catalogHrn, layerId)
.save()
sparkSession.stop()

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-json").getOrCreate();
// catalogHrn: HRN of a catalog that contains the layer $layerId
// layerId: ID of a volatile layer
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-json-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();

Write data in text format
The following snippet demonstrates how to write a DataFrame as a plain text file. In this example, the input DataFrame contains a column data with the message as a string.
Note: Restrictions
The metadata mt_partition column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the text/plain content type. When writing data, the text data source supports only a single data column per row; a sketch of collapsing several columns into one follows below.
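Because only one data column is written, a DataFrame with several columns has to be collapsed first. A minimal sketch, assuming a hypothetical DataFrame df with columns a and b besides mt_partition:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws}

// Hypothetical preprocessing: merge the 'a' and 'b' columns into the
// single 'data' column expected by the text data source.
def toSingleColumn(df: DataFrame): DataFrame =
  df.select(col("mt_partition"), concat_ws(",", col("a"), col("b")).as("data"))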
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-text").getOrCreate()
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer)
val inputDataFrame = loadDataFrame(sparkSession)
// -------------------
// |mt_partition|data|
// -------------------
// | partition-1|   1|
// | partition-2|   2|
// | partition-3|   3|
// -------------------
inputDataFrame
.writeLayer(catalogHrn, layerId)
.save()
sparkSession.stop()

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-text").getOrCreate();
// catalogHrn: HRN of a catalog that contains the layer $layerId
// layerId: ID of a volatile layer
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-text-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();

Write data in CSV format
The following snippet demonstrates how to write a DataFrame as a CSV file. In this example, the input DataFrame contains the columns field1 as integer and field2 as string.
Note: Restrictions
The metadata mt_partition column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the text/csv content type.
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-csv").getOrCreate()
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer)
val inputDataFrame = loadDataFrame(sparkSession)
// ----------------------------
// |mt_partition|field1|field2|
// ----------------------------
// | partition-1|     1|  str1|
// | partition-2|     2|  str2|
// | partition-3|     3|  str3|
// ----------------------------
inputDataFrame
.writeLayer(catalogHrn, layerId)
.option("header", "true")
.save()
sparkSession.stop()

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-csv").getOrCreate();
// catalogHrn: HRN of a catalog that contains the layer $layerId
// layerId: ID of a volatile layer
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-csv-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame)
.writeLayer(catalogHrn, layerId)
.option("header", "true")
.save();
sparkSession.stop();
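The header option is passed through to Spark's CSV data source, so with header set to true each partition's payload presumably starts with a header line such as field1,field2.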