How to write interactive map layer data
How to write interactive map layer data
The spark-support module provides the class LayerDataFrameWriter, a custom
Spark
DataFrameWriter
for writing
DataFrames
to interactive map layers.
Project dependencies
If you want to create an application that uses the HERE platform Spark Connector
to write data to an interactive map layer, add the required dependencies to your
project as described in chapter
Dependencies for Spark Connector.
Formats
The spark connector provides write functionality for an interactive map layer
specific format. Note that interactive map layer format/schema is based on
GeoJSON and therefore inherently flexible.
Write process
For interactive map layers, DataFrame rows are published directly to the
interactive map layer using the Publish API. Objects are "upserted" into the
layer, meaning they will be updated if they already exist and created if they
don't.
Data fields
DataFrames to be written to an interactive map layer must follow this
structure:
| Column name | Data Type | Meaning |
|---|---|---|
mt_id | String | OID of the object |
geometry | ROW<STRING, STRING> | Object's geometry, first field will contain type, second field will contain coordinates |
properties | MAP<STRING, STRING> | Object's properties in reduced format |
custom_members | MAP<STRING, STRING> | Non-standard top-level fields in reduced format |
mt_tags | ARRAY | The object's tags |
Note that additional columns can be present, but will be ignored.
Also note
NoteRestrictions for
Stringvalues in properties and custom members maps
If you want to write a String value to either the properties or custom_members
maps, make sure that double quotes are at the beginning and end of the String
value, as part of the value! This is unfortunately necessary in order for
JSON serialization to work correctly during writing.
Write data to interactive map layer
The following snippet demonstrates how to create and write a
DataFrame[Row](Dataset<Row>) to an interactive map layer:
import com.here.platform.data.client.model.geojson.{Feature, Geometry, Point}
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.internal.InteractiveMapPartitionHelper
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{Row, SparkSession}
import scala.jdk.CollectionConverters._
import scala.collection.mutable
val numObjectsInX = 360
val numObjectsInY = 18
val newRows = ListBuffer[Row]()
val schema = InteractiveMapPartitionHelper.writeSchema
log.info("Generating test data, " + (numObjectsInX * numObjectsInY) + " objects")
for (x <- 0 until numObjectsInX)
for (y <- 0 until numObjectsInY) {
val oid = "X" + x + "_Y" + y
val coords = List[Double](
((360.0 / numObjectsInX.toDouble) * x) - 180.0,
((180.0 / numObjectsInY.toDouble) * y) - 90.0 // Stay in Mercator's bounds.
)
val geo: Geometry = new Point.Builder().withCoordinates(coords.map(Double.box).asJava).build
// NOTE: You MUST ensure that a properties Map is set when creating Features this way!
// It may be empty, but it absolutely MUST be there.
val properties = mutable.Map[String, Any]()
properties += ("info" -> ("Testobject #" + (y * numObjectsInY + x).toString))
properties += ("row" -> y)
properties += ("col" -> x)
val feature = new Feature.Builder()
.withId(oid)
.withGeometry(geo)
.withProperties(properties.asJava)
.withCustomMember("foo", "bar")
.build
newRows += InteractiveMapPartitionHelper.toRow(feature, schema)
}
log.info("Creating dataframe from test data.")
val writeDF = sparkSession.createDataFrame(
sparkSession.sparkContext.parallelize(newRows.toSeq),
schema
)
log.info("Writing test data to layer " + layerId)
writeDF
.writeLayer(catalogHrn, layerId)
.option("olp.connector.write-batch-size", 1000)
.save()
import com.here.platform.data.client.model.geojson.Feature;
import com.here.platform.data.client.model.geojson.Geometry;
import com.here.platform.data.client.model.geojson.Point;
import com.here.platform.data.client.spark.internal.InteractiveMapPartitionHelper;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
final int numObjectsInX = 360;
final int numObjectsInY = 18;
List<Row> newRows = new ArrayList<>(numObjectsInX * numObjectsInY);
// Create data.
// It's easier to create the Features and make Rows out of them than to create the rows
// directly.
log.info("Generating test data, " + (numObjectsInX * numObjectsInY) + " objects");
StructType schema = InteractiveMapPartitionHelper.writeSchema();
for (int x = 0; x < numObjectsInX; x++) {
for (int y = 0; y < numObjectsInY; y++) {
String oid = "X" + x + "_Y" + y;
List<Double> coordinates = new ArrayList<>(2);
coordinates.add(((360.0 / (double) numObjectsInX) * x) - 180.0);
coordinates.add(((180.0 / (double) numObjectsInY) * y) - 90.0);
Geometry geo = new Point.Builder().withCoordinates(coordinates).build();
// NOTE: You MUST ensure that a properties Map is set when creating Features this way!
// It may be empty, but it absolutely MUST be there.
Map<String, Object> properties = new HashMap<>();
properties.put("info", new StringBuilder("Testobject #").append(y * numObjectsInY + x));
properties.put("row", y);
properties.put("col", x);
Feature f =
new Feature.Builder().withGeometry(geo).withId(oid).withProperties(properties).build();
Row row = InteractiveMapPartitionHelper.toRow(f, schema);
newRows.add(row);
}
}
// Create a dataframe from the created rows.
log.info("Creating dataframe from test data.");
Dataset<Row> writeDF =
sparkSession
.createDataFrame(
JavaSparkContext.fromSparkContext(sparkSession.sparkContext()).parallelize(newRows),
schema)
.drop("mt_datahub");
// Now do the actual writing.
log.info("Writing test data to layer " + layerId);
JavaLayerDataFrameWriter.create(writeDF)
.writeLayer(catalogHrn, layerId)
.option("olp.connector.write-batch-size", 1000)
.save();
long res = writeDF.count();
Updated 22 days ago