How to read and write versioned layer data using the DataEngine
Project dependencies
To run an application that uses the Data Client Library's DataEngine to read
and write versioned data within a batch pipeline (Spark), add the spark-support
module as a dependency to your project.
libraryDependencies ++= Seq(
"com.here.platform.data.client" %% "spark-support" % "1.22.33"
)
<dependencies>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>spark-support_2.13</artifactId>
<version>1.22.33</version>
</dependency>
</dependencies>
Reading versioned data
This snippet shows how to use the query API within a running Spark context to consume partitions:
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.data.client.spark.SparkSupport._
def countDataBytesUsingMultiplesWorkers(sc: SparkContext, catalog: HRN, layer: String): Long = {
  // Resolve the latest catalog version on the Spark master.
  val masterActorSystem = DataClientSparkContextUtils.context.actorSystem
  val masterQueryApi = DataClient(masterActorSystem).queryApi(catalog)
  masterQueryApi.getLatestVersion().awaitResult() match {
    case None => 0L
    case Some(version) =>
      // `parallelize` needs an in-memory sequence that fits into spark.memory.
      // A flatMap followed by repartition distributes the partition metadata
      // across the workers.
      val partitionMetadata: RDD[Partition] =
        sc.parallelize(Seq(layer))
          .flatMap { layerId =>
            // Each worker builds its own client from its own actor system.
            val workerActorSystem = DataClientSparkContextUtils.context.actorSystem
            val workerQueryApi = DataClient(workerActorSystem).queryApi(catalog)
            workerQueryApi.getPartitionsAsIterator(version, layerId).awaitResult()
          }
          .repartition(100)

      // Fetch the payload of every partition on the workers.
      val withPayloads: RDD[(Partition, Array[Byte])] =
        partitionMetadata.mapPartitions { metadataIterator =>
          val workerActorSystem = DataClientSparkContextUtils.context.actorSystem
          val readEngine = DataEngine(workerActorSystem).readEngine(catalog)
          metadataIterator.map(p => p -> readEngine.getDataAsBytes(p).awaitResult())
        }

      // Sum the byte counts of all downloaded payloads.
      withPayloads
        .map { case (_, bytes) => bytes.length.toLong }
        .reduce(_ + _)
  }
}
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
private static long countDataBytesUsingMultiplesWorkers(
    JavaSparkContext jsc, HRN catalog, List<String> layerIds) {
  // Resolve the latest catalog version on the Spark master.
  ActorSystem masterActorSystem = DataClientSparkContextUtils.context().actorSystem();
  QueryApi masterQueryApi = DataClient.get(masterActorSystem).queryApi(catalog);
  OptionalLong latestVersion =
      masterQueryApi.getLatestVersion(OptionalLong.empty()).toCompletableFuture().join();
  // An empty catalog has nothing to count.
  if (!latestVersion.isPresent()) return 0L;
  long version = latestVersion.getAsLong();
  // `parallelize` needs an in-memory sequence that fits into spark.memory. A flatMap
  // followed by repartition distributes the partition metadata across the workers.
  JavaRDD<Partition> partitions =
      jsc.parallelize(layerIds)
          .flatMap(
              layerId -> {
                // Each worker builds its own client from its own actor system.
                ActorSystem workerActorSystem =
                    DataClientSparkContextUtils.context().actorSystem();
                QueryApi workerQueryApi = DataClient.get(workerActorSystem).queryApi(catalog);
                return workerQueryApi
                    .getPartitionsAsIterator(version, layerId, AdditionalFields.AllFields())
                    .toCompletableFuture()
                    .join();
              })
          .repartition(100);
  // Fetch the payload of every partition on the workers.
  JavaRDD<Map.Entry<Partition, byte[]>> partitionsAndData =
      partitions.mapPartitions(
          innerPartitions -> {
            ActorSystem workerActorSystem = DataClientSparkContextUtils.context().actorSystem();
            ReadEngine readEngine = DataEngine.get(workerActorSystem).readEngine(catalog);
            Map<Partition, byte[]> partitionsToBytesMap = new HashMap<>();
            while (innerPartitions.hasNext()) {
              Partition partition = innerPartitions.next();
              partitionsToBytesMap.put(
                  partition, readEngine.getDataAsBytes(partition).toCompletableFuture().join());
            }
            return partitionsToBytesMap.entrySet().iterator();
          });
  // Sum the byte counts of all downloaded payloads.
  return partitionsAndData.map(entry -> (long) entry.getValue().length).reduce(Long::sum);
}
Writing versioned data
The following snippets demonstrate how to use the publish API within a running spark context.
Note: Sharing
The DataClientSparkContext object must not be shared between the master and the workers; each one needs its own instance.
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.data.client.spark.SparkSupport._
// Payload descriptor for the publishing example: the target partition id,
// the target layer id, and the raw bytes to upload.
case class CustomData(partition: String, layer: String, data: Array[Byte])
def publishUsingMultipleWorkers(sc: SparkContext,
                                catalog: HRN,
                                layerIds: Seq[String],
                                partitions: Seq[CustomData]): Unit = {
  val masterActorSystem = DataClientSparkContextUtils.context.actorSystem
  // Open the publication batch on the Spark master.
  val masterPublishApi = DataClient(masterActorSystem).publishApi(catalog)
  val baseVersion = masterPublishApi.getBaseVersion().awaitResult()
  // Only one publication can be active at a time per versioned layer.
  val token: BatchToken =
    masterPublishApi.startBatch2(baseVersion, Some(layerIds), dependencies = Nil).awaitResult()

  // Upload data and metadata from the workers.
  val uploads: RDD[Done] =
    sc.parallelize(partitions)
      .mapPartitions { customData =>
        // Runs once per worker task and handles several partitions.
        val workerActorSystem = DataClientSparkContextUtils.context.actorSystem
        val workerPublishApi = DataClient(workerActorSystem).publishApi(catalog)
        val workerWriteEngine = DataEngine(workerActorSystem).writeEngine(catalog)
        val committed: Iterator[CommitPartition] =
          customData.map { cd =>
            val newPartition =
              NewPartition(
                partition = cd.partition,
                layer = cd.layer,
                data = NewPartition.ByteArrayData(cd.data)
              )
            workerWriteEngine.put(newPartition).awaitResult()
          }
        workerPublishApi.publishToBatch(token, committed).awaitResult()
        Iterator.single(Done)
      }

  // Force evaluation of the RDD so every upload actually happens.
  uploads.collect()
  // Finalize the publication on the master.
  masterPublishApi.completeBatch(token).awaitResult()
}
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
// Payload descriptor for the publishing example: the target partition id,
// the target layer id, and the raw bytes to upload.
@SuppressWarnings("serial")
public static class CustomData implements Serializable {
String partition;
String layerId;
byte[] data;
CustomData(String partition, String layerId, byte[] data) {
this.partition = partition;
this.layerId = layerId;
this.data = data;
}
}
// Number of Spark partitions used when distributing the upload work.
private static final int PARALLELISM = 3;
public static void publishUsingMultipleWorkers(
JavaSparkContext jsc, HRN catalog, List<String> layerIds, List<CustomData> partitions) {
ActorSystem masterActorSystem = DataClientSparkContextUtils.context().actorSystem();
// start commit on master
PublishApi masterPublishApi = DataClient.get(masterActorSystem).publishApi(catalog);
// Get the latest catalog version of the catalog
OptionalLong latestVersion = masterPublishApi.getBaseVersion().toCompletableFuture().join();
// Start a publication batch on top of most recent catalog version. This commit has no upstream
// catalog dependencies thus the second parameter is an empty collection.
BatchToken token =
masterPublishApi
.startBatch2(latestVersion, Optional.of(layerIds), Collections.emptyList())
.toCompletableFuture()
.join();
// send partitions to workers and upload data and metadata
JavaRDD<Done> commitParts =
jsc.parallelize(partitions, PARALLELISM)
.mapPartitions(
cdPartitions -> {
// this code will run for each worker to process multiples partitions
ActorSystem workerActorSystem =
DataClientSparkContextUtils.context().actorSystem();
PublishApi workerPublishApi =
DataClient.get(workerActorSystem).publishApi(catalog);
WriteEngine workerWriteEngine =
DataEngine.get(workerActorSystem).writeEngine(catalog);
List<CommitPartition> committedPartitions = new ArrayList<>();
while (cdPartitions.hasNext()) {
CustomData cdPartition = cdPartitions.next();
com.here.platform.data.client.javadsl.NewPartition newPartition =
new com.here.platform.data.client.javadsl.NewPartition.Builder()
.withPartition(cdPartition.partition)
.withData(cdPartition.data)
.withLayer(cdPartition.layerId)
.build();
workerWriteEngine
.put(newPartition)
.thenAccept(committedPartitions::add)
.toCompletableFuture()
.join();
}
Done done =
workerPublishApi
.publishToBatch(token, committedPartitions.iterator())
.toCompletableFuture()
.join();
return Collections.singletonList(done).iterator();
});
// execute the RDD
commitParts.collect();
// complete the commit
masterPublishApi.completeBatch(token).toCompletableFuture().join();
}Updated 18 days ago