How to read index layer data
The Data Client Library provides the class LayerDataFrameReader, a custom Spark DataFrameReader for creating DataFrames that contain the data for all supported layer types, including index layers.
All the formats supported by DataFrameReader are also supported by the LayerDataFrameReader. Additionally, it supports formats such as Apache Avro, Apache Parquet, Protobuf, and raw byte arrays (octet-stream).
When you read from an index layer, certain restrictions apply. See Get Data from an Index Layer for information about restrictions and known limitations.
Read process
A read operation proceeds in the following steps:
- The Spark connector first contacts the server to retrieve information about the layer, such as the layer type, layer schema, and layer encoding format.
- Partitions within the layer are filtered using the provided filter query. If no query is provided, the value "timestamp=ge=0" is used by default, which matches all partitions.
- At this stage, the layer format is known, so the connector creates the corresponding Spark file format and, together with the partition data, produces an iterator of rows (records).
- Implicit columns are added to each row depending on the layer type and partition metadata.
- The resulting rows are handed over to the Spark framework, which returns the finalized DataFrame.
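The default-query rule in the filtering step can be sketched in plain Java. This is only an illustration: the class IndexQuerySketch and its methods are hypothetical helpers, not part of the Data Client Library, and the sketch assumes that a missing or blank query is what counts as "not provided".

```java
/** Hypothetical helper for illustration only; not part of the Data Client Library. */
class IndexQuerySketch {
    // Default filter named in the read process: it matches every partition.
    static final String MATCH_ALL = "timestamp=ge=0";

    /** Returns the trimmed user query, or the match-all default when none is given. */
    static String effectiveQuery(String userQuery) {
        if (userQuery == null || userQuery.trim().isEmpty()) {
            return MATCH_ALL;
        }
        return userQuery.trim();
    }

    /** Joins two RSQL conditions with "and", the way the samples on this page do. */
    static String and(String left, String right) {
        return left + " and " + right;
    }

    public static void main(String[] args) {
        // No query supplied: falls back to the match-all default.
        System.out.println(effectiveQuery(null));
        // Two conditions combined into a single filter query.
        System.out.println(effectiveQuery(and("eventId=in=(1,2,3)", "eventType==SignRecognition")));
    }
}
```

The resulting string is what you would pass to the reader's query(...) method; omitting query(...) altogether has the same effect as passing the default.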
Read with pagination
The Spark connector supports querying index data in parallel. To enable this, choose the desired number of parts and set the option olp.connector.query-parallelism on the reader.
val reader = sparkSession
.readLayer(catalogHrn, layerId)
.format("raw")
.query(
"tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
.option("olp.connector.metadata-columns", true)
.option("olp.connector.query-parallelism", 100)
if (compressed)
reader.option("olp.connector.data-decompression-timeout", 1200000)
val df: DataFrame = reader.load()

Dataset<Row> df =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.format("raw")
.query(
"tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
.option("olp.connector.metadata-columns", true)
.option("olp.connector.query-parallelism", 100)
.load();

DataFrame columns
Besides the user-defined columns, which derive from the partition data, the Spark connector provides additional columns that represent the data partitioning information and the partition payload attributes.
Data columns
These columns correspond to the user-defined columns and derive from the partition data.
Layer partitioning columns
These columns correspond to the user-defined index layer partitioning columns. They have the same
names as in the layer definition but with the idx_ prefix, and follow the type
conversions defined below:
| Index type | Data Type |
|---|---|
| bool | Boolean |
| int | Long |
| string | String |
| HERETile | Long |
| HERETime | Long |
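The naming and type rules from the table above can be written down as a small lookup. This is a minimal sketch in plain Java, with no Spark dependency; the class IndexColumnSketch and its methods are hypothetical and only mirror the table, they are not connector API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of the idx_ naming rule and the type table; not connector API. */
class IndexColumnSketch {
    // Index type -> DataFrame data type, copied from the table above.
    static final Map<String, String> DATA_TYPE_BY_INDEX_TYPE = new LinkedHashMap<>();

    static {
        DATA_TYPE_BY_INDEX_TYPE.put("bool", "Boolean");
        DATA_TYPE_BY_INDEX_TYPE.put("int", "Long");
        DATA_TYPE_BY_INDEX_TYPE.put("string", "String");
        DATA_TYPE_BY_INDEX_TYPE.put("HERETile", "Long");
        DATA_TYPE_BY_INDEX_TYPE.put("HERETime", "Long");
    }

    /** Column name of an index attribute: the layer definition name with the idx_ prefix. */
    static String columnName(String indexAttribute) {
        return "idx_" + indexAttribute;
    }

    /** Data type of the column for the given index type; fails for types not in the table. */
    static String columnType(String indexType) {
        String dataType = DATA_TYPE_BY_INDEX_TYPE.get(indexType);
        if (dataType == null) {
            throw new IllegalArgumentException("Unknown index type: " + indexType);
        }
        return dataType;
    }

    public static void main(String[] args) {
        // The attribute names tileId and eventType are taken from the samples on this page.
        System.out.println(columnName("tileId") + ": " + columnType("HERETile"));
        System.out.println(columnName("eventType") + ": " + columnType("string"));
    }
}
```

For a layer indexed by tileId (HERETile) and eventType (string), the sketch yields the column names idx_tileId and idx_eventType, which matches the idx_eventType column selected in the CSV and JSON samples further down.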
Partition payload attribute columns
| Column name | Data Type | Meaning |
|---|---|---|
| mt_metadata | Map[String, String] | Metadata of partition |
| mt_timestamp | Long | Timestamp of creation (UTC) |
| mt_checksum | String | Checksum of payload |
| mt_crc | String | CRC of payload |
| mt_dataSize | Long | Size of payload |
| mt_compressedDataSize | Long | Compressed size of payload |
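Because the three column groups are distinguished by prefix, the columns of a loaded DataFrame can be sorted into them by name alone. The sketch below is plain Java under that assumption; ColumnGroupSketch is a hypothetical helper, and a user-defined data column whose name happens to start with idx_ or mt_ would be misclassified by it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that groups column names by the prefixes described above. */
class ColumnGroupSketch {
    enum Group { DATA, INDEX, METADATA }

    /** Classifies a single column name by its prefix. */
    static Group groupOf(String column) {
        if (column.startsWith("idx_")) {
            return Group.INDEX;
        }
        if (column.startsWith("mt_")) {
            return Group.METADATA;
        }
        return Group.DATA;
    }

    /** Groups all column names, preserving their original order within each group. */
    static Map<Group, List<String>> group(List<String> columns) {
        Map<Group, List<String>> result = new EnumMap<>(Group.class);
        for (Group g : Group.values()) {
            result.put(g, new ArrayList<>());
        }
        for (String column : columns) {
            result.get(groupOf(column)).add(column);
        }
        return result;
    }

    public static void main(String[] args) {
        // Column names as they might be returned by Dataset.columns(); the list is made up.
        List<String> columns =
            Arrays.asList("pathEvents", "idx_tileId", "idx_eventType", "mt_timestamp", "mt_dataSize");
        System.out.println(group(columns));
    }
}
```

With a real DataFrame, the input list would come from Arrays.asList(df.columns()).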
Project Dependencies
If you want to create an application that uses the HERE platform Spark Connector to read data from an index layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
Read Parquet-Encoded Data
The following snippet demonstrates how to access a Parquet-encoded DataFrame
from an index layer of a catalog. Note that the Parquet schema is expected to be
bundled with the data. Therefore, you don't need to specify the format
explicitly.
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer containing parquet-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
val reader = sparkSession
.readLayer(catalogHrn, layerId)
//.format("parquet")
.query(
"tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
.option("olp.connector.metadata-columns", true)
.option("olp.connector.query-parallelism", 100)
if (compressed)
reader.option("olp.connector.data-decompression-timeout", 1200000)
val df = reader.load()
df.printSchema()
df.show()
val messagesWithAtLeastOneSignRecognition = df
.select("pathEvents.signRecognition")
.where("size(pathEvents.signRecognition) > 0")
val count = messagesWithAtLeastOneSignRecognition.count()

import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of an index layer containing parquet-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
Dataset<Row> df =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
// .format("parquet")
.query(
"tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
.option("olp.connector.query-parallelism", 100)
.load();
long messagesWithAtLeastOneSignRecognitionCount =
df.select("pathEvents.signRecognition")
.where("size(pathEvents.signRecognition) > 0")
.count();

Read Avro-Encoded Data
The following snippet demonstrates how to access an Avro-encoded DataFrame
from an index layer of a catalog. Note that the Avro schema is expected to be
bundled with the data. Therefore, you don't need to specify the format
explicitly.
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, SparkSession}
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer containing avro-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
val reader = sparkSession
.readLayer(catalogHrn, layerId)
//.format("avro")
.query(
"tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
.option("olp.connector.query-parallelism", 100)
if (compressed)
reader.option("olp.connector.data-decompression-timeout", 1200000)
val df: DataFrame = reader.load()
val messagesWithAtLeastOneSignRecognition = df
.select("pathEvents.signRecognition")
.where("size(pathEvents.signRecognition) > 0")
val count = messagesWithAtLeastOneSignRecognition.count()

import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of an index layer containing avro-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
Dataset<Row> df =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
// .format("avro")
.query(
"tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
.option("olp.connector.query-parallelism", 100)
.load();
Dataset<Row> messagesWithAtLeastOneSignRecognition =
df.select("pathEvents.signRecognition").where("size(pathEvents.signRecognition) > 0");
long count = messagesWithAtLeastOneSignRecognition.count();

Read Protobuf-Encoded Data
The following snippet demonstrates how to access a Protobuf-encoded DataFrame
from an index layer of a catalog. Note that the protobuf schema is expected to
be referenced from the layer configuration. Therefore, you don't need to specify
the format explicitly.
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.size
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer containing protobuf-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
val reader = sparkSession
.readLayer(catalogHrn, layerId)
//.format("protobuf")
.query(
"tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
.option("olp.connector.metadata-columns", true)
.option("olp.connector.query-parallelism", 100)
if (compressed)
reader.option("olp.connector.data-decompression-timeout", 1200000)
val df = reader.load()
val sqlContext = sparkSession.sqlContext
import sqlContext.implicits._
val messagesWithAtLeastOneSignRecognition = df
.select("mt_dataHandle", "message.pathEvents.signRecognition")
.where("size(message.pathEvents.signRecognition) > 0")
val dataHandle = messagesWithAtLeastOneSignRecognition
.select("mt_dataHandle")
.head()
.getString(0)
// Protobuf schema is of an SDII MessageList, so `size()` is used to compute the
// length of the `WrappedArray` (one array per SDII Message)
// Resulting `DataFrame` is a single row with a count, so `.head().getInt(0)` is used
// to retrieve the value
val count: Int = messagesWithAtLeastOneSignRecognition
.select(size($"signRecognition"))
.head()
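.getInt(0)

import static org.apache.spark.sql.functions.size;

import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;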
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of an index layer containing protobuf-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
Dataset<Row> df =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
// .format("protobuf")
.query(
"tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
.option("olp.connector.metadata-columns", true)
.option("olp.connector.query-parallelism", 100)
.load();
Dataset<Row> messagesWithAtLeastOneSignRecognition =
df.select("mt_dataHandle", "message.pathEvents.signRecognition")
.where("size(message.pathEvents.signRecognition) > 0");
String dataHandle =
messagesWithAtLeastOneSignRecognition.select("mt_dataHandle").head().getString(0);
// Protobuf schema is of an SDII MessageList, so `size()` is used to compute the
// length of the `WrappedArray` (one array per SDII Message)
// Resulting `DataFrame` is a single row with a count, so `.head().getInt(0)` is used
// to retrieve the value
int count =
messagesWithAtLeastOneSignRecognition
.select(size(new Column("signRecognition")))
.head()
.getInt(0);

Note that to read Protobuf data from a layer, the schema must be specified in
the layer configuration and must be available on the Artifact Service.
Furthermore, the schema must have a ds variant. For more information on how to
maintain schemas, see the Archetypes Developer's Guide.
Read CSV-Encoded Data
The following snippet demonstrates how to access a CSV-encoded DataFrame from
an index layer of a catalog. In this example, each CSV row contains the columns
field1 (integer) and field2 (string).
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of index layer)
val df = sparkSession
.readLayer(catalogHrn, layerId)
.query("eventType==SignRecognition")
.load()
df.select("idx_eventType", "field1").where("field1 > 0").show()
df.printSchema()

import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of an index layer)
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query("eventId=in=(1,2,3)")
.load();
dataFrame.select("idx_eventId", "field1").where("field1 > 0").show();
dataFrame.printSchema();

Read Text-Encoded Data
The following snippet demonstrates how to access a text-encoded DataFrame from
an index layer of a catalog. In this example, each row object contains the field
data as a string.
Note: Restrictions
When reading text data, each line becomes a row with a single string column, named value by default. Therefore, the text data source has only a single column, value, per row.
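To illustrate this restriction independently of the connector, the following plain-Java sketch turns a text payload into one string per line, which is the content each row's value column would hold. TextRowsSketch is a hypothetical helper; it assumes a UTF-8 payload and newline-separated lines, and it simplifies how Spark's text source actually splits its input.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration: one text line becomes one row with a single "value" column. */
class TextRowsSketch {
    /** Splits a UTF-8 text payload into lines; each entry stands for one row's value. */
    static List<String> toValueRows(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        List<String> rows = new ArrayList<>();
        if (text.isEmpty()) {
            return rows; // an empty payload yields no rows
        }
        // Accept both Unix and Windows line endings; a trailing newline adds no extra row.
        for (String line : text.split("\r?\n")) {
            rows.add(line);
        }
        return rows;
    }

    public static void main(String[] args) {
        byte[] payload = "first line\nsecond line\n".getBytes(StandardCharsets.UTF_8);
        for (String value : toValueRows(payload)) {
            System.out.println("value = " + value);
        }
    }
}
```

Any further structure inside a line, such as delimiters, has to be parsed from the value column after loading.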
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of index layer)
val df = sparkSession
.readLayer(catalogHrn, layerId)
.query("eventType==SignRecognition")
.load()
df.select("idx_eventType", "value").show()
df.printSchema()

import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of an index layer)
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query("eventId=in=(1,2,3)")
.load();
dataFrame.select("idx_eventId", "value").show();
dataFrame.printSchema();

Read JSON-Encoded Data
The following snippet demonstrates how to access a JSON-encoded DataFrame from
an index layer of a catalog. In this example, each JSON object contains the
properties intVal (integer) and strVal (string).
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of index layer)
val df = sparkSession
.readLayer(catalogHrn, layerId)
.query("eventType==SignRecognition")
.load()
df.select("idx_eventType", "intVal").where("intVal > 0").show()
df.printSchema()

import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of an index layer)
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query("eventId=in=(1,2,3)")
.load();
dataFrame.select("idx_eventId", "intVal").where("intVal > 0").show();
dataFrame.printSchema();

Read Other Formats
The following snippet demonstrates how to access data in any arbitrary format from an index layer of a catalog:
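import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import scala.collection.JavaConverters._
import scala.collection.mutable
// SdiiMessage and SdiiMessageList are the generated Protobuf classes of the SDII schema;
// import them from the schema artifact that your project depends on.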
val schema: StructType = new StructType(
Array[StructField](
StructField("mt_dataHandle", DataTypes.StringType, nullable = false, Metadata.empty),
StructField("signRecognitionCount", DataTypes.IntegerType, nullable = false, Metadata.empty)
))
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer containing protobuf-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
val reader = sparkSession
.readLayer(catalogHrn, layerId)
.format("raw")
.query(
"tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
.option("olp.connector.metadata-columns", true)
.option("olp.connector.query-parallelism", 100)
if (compressed)
reader.option("olp.connector.data-decompression-timeout", 1200000)
val df: DataFrame = reader.load()
val dfSignRecognitionCount: DataFrame = df.flatMap { row: Row =>
val messageList: mutable.Buffer[SdiiMessage.Message] =
SdiiMessageList.MessageList.parseFrom(row.getAs[Array[Byte]]("data")).getMessageList.asScala
messageList.map { message =>
RowFactory.create(row.getAs[Object]("mt_dataHandle"),
message.getPathEvents.getSignRecognitionCount.asInstanceOf[Object])
}
}(ExpressionEncoder(schema))
val messagesWithAtLeastOneSignRecognition = dfSignRecognitionCount
.select("mt_dataHandle", "signRecognitionCount")
.where("signRecognitionCount > 0")
val dataHandles = messagesWithAtLeastOneSignRecognition
.map[String]((r: Row) => r.getAs[String]("mt_dataHandle"))(Encoders.STRING)
.dropDuplicates()
.collectAsList()
val count = messagesWithAtLeastOneSignRecognition.count()

import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
StructType schema =
new StructType(
new StructField[] {
new StructField("mt_dataHandle", DataTypes.StringType, false, Metadata.empty()),
new StructField(
"signRecognitionCount", DataTypes.IntegerType, false, Metadata.empty())
});
// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
Dataset<Row> df =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.format("raw")
.query(
"tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
.option("olp.connector.metadata-columns", true)
.option("olp.connector.query-parallelism", 100)
.load();
Dataset<Row> dfSignRecognitionCount =
df.flatMap(
(FlatMapFunction<Row, Row>)
row ->
SdiiMessageList.MessageList.parseFrom(row.<byte[]>getAs("data"))
.getMessageList().stream()
.map(
m ->
RowFactory.create(
row.getAs("mt_dataHandle"),
m.getPathEvents().getSignRecognitionCount()))
.iterator(),
ExpressionEncoder.apply(schema));
Dataset<Row> messagesWithAtLeastOneSignRecognition =
dfSignRecognitionCount
.select("mt_dataHandle", "signRecognitionCount")
.where("signRecognitionCount > 0");
List<String> dataHandles =
messagesWithAtLeastOneSignRecognition
.map((MapFunction<Row, String>) row -> row.getAs("mt_dataHandle"), Encoders.STRING())
.dropDuplicates()
.collectAsList();
long count = messagesWithAtLeastOneSignRecognition.count();

Known issues
- The DataFrame contains the columns representing the index layer structure definition, but these columns are located at the very end, whereas they should be located before the metadata columns.
- If these column values are not present, defaults are used instead of NULL values.
- These columns should be located at the very end of the row, but they are located right after the payload columns.
Note
- The raw format refers to application/octet-stream in the layer configuration and is not to be confused with the raw layer configuration.
- For information on RSQL, see RSQL.