
How to read and write versioned layer data using the DataEngine

Project dependencies

To run an application that uses the Data Client Library's DataEngine to read and write versioned data within a batch (Spark) pipeline, add the spark-support module as a dependency to your project.

sbt:

libraryDependencies ++= Seq(
  "com.here.platform.data.client" %% "spark-support" % "1.22.33"
)

Maven:

<dependencies>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>spark-support_2.13</artifactId>
        <version>1.22.33</version>
    </dependency>
</dependencies>

Reading versioned data

The following snippets show how to use the query API within a running Spark context to consume partitions, first in Scala and then in Java:

// NOTE: exact import paths may vary between Data Client Library versions.
import com.here.hrn.HRN
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.scaladsl.{DataClient, Partition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.data.client.spark.SparkSupport._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
def countDataBytesUsingMultiplesWorkers(sc: SparkContext, catalog: HRN, layer: String): Long = {
  val masterActorSystem = DataClientSparkContextUtils.context.actorSystem

  // query current catalog for latest version
  val masterQueryApi = DataClient(masterActorSystem).queryApi(catalog)

  masterQueryApi.getLatestVersion().awaitResult() match {
    case Some(version) =>
      // parallelize requires an in-memory sequence that must fit entirely in Spark memory.
      // A flatMap followed by a repartition distributes the data across the workers.
      val partitions: RDD[Partition] =
        sc.parallelize(Seq(layer))
          .flatMap { layer =>
            val workerActorSystem = DataClientSparkContextUtils.context.actorSystem
            val workerQueryApi = DataClient(workerActorSystem).queryApi(catalog)

            workerQueryApi.getPartitionsAsIterator(version, layer).awaitResult()
          }
          .repartition(100)

      val partitionsAndData: RDD[(Partition, Array[Byte])] =
        partitions.mapPartitions({ partitions =>
          val workerActorSystem = DataClientSparkContextUtils.context.actorSystem
          val readEngine = DataEngine(workerActorSystem).readEngine(catalog)

          partitions.map { partition =>
            partition -> readEngine.getDataAsBytes(partition).awaitResult()
          }
        })

      // process each partition and data
      val total: Long =
        partitionsAndData
          .map { case (_, data) => data.length.toLong }
          .reduce(_ + _)

      total

    case None => 0L
  }
}
// NOTE: exact import paths may vary between Data Client Library versions.
import akka.actor.ActorSystem;
import com.here.hrn.HRN;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.ReadEngine;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.javadsl.QueryApi;
import com.here.platform.data.client.model.AdditionalFields;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
private static long countDataBytesUsingMultiplesWorkers(
    JavaSparkContext jsc, HRN catalog, List<String> layerIds) {
  ActorSystem masterActorSystem = DataClientSparkContextUtils.context().actorSystem();

  // query current catalog for latest version
  QueryApi masterQueryApi = DataClient.get(masterActorSystem).queryApi(catalog);

  OptionalLong latestVersion =
      masterQueryApi.getLatestVersion(OptionalLong.empty()).toCompletableFuture().join();
  if (!latestVersion.isPresent()) return 0L;

  long version = latestVersion.getAsLong();

  // parallelize requires an in-memory sequence that must fit entirely in Spark memory.
  // A flatMap followed by a repartition distributes the data across the workers.

  JavaRDD<Partition> partitions =
      jsc.parallelize(layerIds)
          .flatMap(
              layerId -> {
                ActorSystem workerActorSystem =
                    DataClientSparkContextUtils.context().actorSystem();
                QueryApi workerQueryApi = DataClient.get(workerActorSystem).queryApi(catalog);

                return workerQueryApi
                    .getPartitionsAsIterator(version, layerId, AdditionalFields.AllFields())
                    .toCompletableFuture()
                    .join();
              })
          .repartition(100);

  JavaRDD<Map.Entry<Partition, byte[]>> partitionsAndData =
      partitions.mapPartitions(
          innerPartitions -> {
            ActorSystem workerActorSystem = DataClientSparkContextUtils.context().actorSystem();
            ReadEngine readEngine = DataEngine.get(workerActorSystem).readEngine(catalog);

            Map<Partition, byte[]> partitionsToBytesMap = new HashMap<>();
            while (innerPartitions.hasNext()) {
              Partition partition = innerPartitions.next();
              partitionsToBytesMap.put(
                  partition, readEngine.getDataAsBytes(partition).toCompletableFuture().join());
            }
            return partitionsToBytesMap.entrySet().iterator();
          });
  return partitionsAndData.map(entry -> (long) entry.getValue().length).reduce(Long::sum);
}
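
For reference, here is how the Scala method above might be wired into a minimal Spark driver. This is a sketch only; the application name, catalog HRN, and layer ID are placeholders to replace with your own values.

import com.here.hrn.HRN
import org.apache.spark.{SparkConf, SparkContext}

object CountBytesApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("count-bytes-example"))
    // Placeholder values -- substitute your own catalog HRN and layer ID.
    val catalog = HRN("hrn:here:data::my-realm:my-catalog")
    val total = countDataBytesUsingMultiplesWorkers(sc, catalog, "my-layer")
    println(s"Total bytes across all partitions: $total")
    sc.stop()
  }
}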

Writing versioned data

The following snippets demonstrate how to use the publish API within a running Spark context.

Note

Sharing
The DataClientSparkContext object must not be shared between the master and the workers; each one needs its own instance.
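
For example, each Spark task should resolve its own context inside the closure it executes, rather than closing over an instance created on the master. The sketch below illustrates the pattern only; rdd and catalog stand in for values from your application.

// Wrong: this actor system lives on the master; capturing it in the closure
// forces Spark to try to serialize it to the workers.
// val masterSystem = DataClientSparkContextUtils.context.actorSystem
// rdd.map(partition => DataEngine(masterSystem).readEngine(catalog).getDataAsBytes(partition))

// Right: each worker obtains its own instance inside the closure.
rdd.mapPartitions { iterator =>
  val workerSystem = DataClientSparkContextUtils.context.actorSystem
  val readEngine = DataEngine(workerSystem).readEngine(catalog)
  iterator.map(partition => readEngine.getDataAsBytes(partition).awaitResult())
}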

// NOTE: exact import paths may vary between Data Client Library versions.
import akka.Done
import com.here.hrn.HRN
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model.BatchToken
import com.here.platform.data.client.scaladsl.{CommitPartition, DataClient, NewPartition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.data.client.spark.SparkSupport._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
case class CustomData(partition: String, layer: String, data: Array[Byte])
def publishUsingMultipleWorkers(sc: SparkContext,
                                catalog: HRN,
                                layerIds: Seq[String],
                                partitions: Seq[CustomData]): Unit = {
  val masterActorSystem = DataClientSparkContextUtils.context.actorSystem

  // start commit on master
  val masterPublishApi = DataClient(masterActorSystem).publishApi(catalog)
  val latestVersion = masterPublishApi.getBaseVersion().awaitResult()
  // Please note that there can only be one active publication at a time per versioned layer.
  val token: BatchToken =
    masterPublishApi.startBatch2(latestVersion, Some(layerIds), dependencies = Nil).awaitResult()

  // send partitions to workers and upload data and metadata
  val commitParts: RDD[Done] =
    sc.parallelize(partitions)
      .mapPartitions({ partitions =>
        // this code runs on each worker and processes multiple partitions
        val workerActorSystem = DataClientSparkContextUtils.context.actorSystem

        val workerPublishApi = DataClient(workerActorSystem).publishApi(catalog)
        val workerWriteEngine = DataEngine(workerActorSystem).writeEngine(catalog)

        val committedPartitions: Iterator[CommitPartition] =
          partitions.map { partition =>
            val newPartition =
              NewPartition(
                partition = partition.partition,
                layer = partition.layer,
                data = NewPartition.ByteArrayData(partition.data)
              )

            workerWriteEngine.put(newPartition).awaitResult()
          }

        workerPublishApi.publishToBatch(token, committedPartitions).awaitResult()

        Seq(Done).iterator
      })

  // execute the RDD
  commitParts.collect()

  // complete the commit
  masterPublishApi.completeBatch(token).awaitResult()
}
// NOTE: exact import paths may vary between Data Client Library versions.
import akka.Done;
import akka.actor.ActorSystem;
import com.here.hrn.HRN;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.WriteEngine;
import com.here.platform.data.client.javadsl.CommitPartition;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.PublishApi;
import com.here.platform.data.client.model.BatchToken;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
@SuppressWarnings("serial")
public static class CustomData implements Serializable {
  String partition;
  String layerId;
  byte[] data;

  CustomData(String partition, String layerId, byte[] data) {
    this.partition = partition;
    this.layerId = layerId;
    this.data = data;
  }
}
private static final int PARALLELISM = 3;

public static void publishUsingMultipleWorkers(
    JavaSparkContext jsc, HRN catalog, List<String> layerIds, List<CustomData> partitions) {
  ActorSystem masterActorSystem = DataClientSparkContextUtils.context().actorSystem();

  // start commit on master
  PublishApi masterPublishApi = DataClient.get(masterActorSystem).publishApi(catalog);

  // Get the latest catalog version of the catalog
  OptionalLong latestVersion = masterPublishApi.getBaseVersion().toCompletableFuture().join();

  // Start a publication batch on top of the most recent catalog version. This commit has no
  // upstream catalog dependencies, so the dependencies argument is an empty list.
  BatchToken token =
      masterPublishApi
          .startBatch2(latestVersion, Optional.of(layerIds), Collections.emptyList())
          .toCompletableFuture()
          .join();

  // send partitions to workers and upload data and metadata
  JavaRDD<Done> commitParts =
      jsc.parallelize(partitions, PARALLELISM)
          .mapPartitions(
              cdPartitions -> {
                // this code runs on each worker and processes multiple partitions
                ActorSystem workerActorSystem =
                    DataClientSparkContextUtils.context().actorSystem();
                PublishApi workerPublishApi =
                    DataClient.get(workerActorSystem).publishApi(catalog);
                WriteEngine workerWriteEngine =
                    DataEngine.get(workerActorSystem).writeEngine(catalog);

                List<CommitPartition> committedPartitions = new ArrayList<>();
                while (cdPartitions.hasNext()) {
                  CustomData cdPartition = cdPartitions.next();
                  com.here.platform.data.client.javadsl.NewPartition newPartition =
                      new com.here.platform.data.client.javadsl.NewPartition.Builder()
                          .withPartition(cdPartition.partition)
                          .withData(cdPartition.data)
                          .withLayer(cdPartition.layerId)
                          .build();

                  workerWriteEngine
                      .put(newPartition)
                      .thenAccept(committedPartitions::add)
                      .toCompletableFuture()
                      .join();
                }
                Done done =
                    workerPublishApi
                        .publishToBatch(token, committedPartitions.iterator())
                        .toCompletableFuture()
                        .join();
                return Collections.singletonList(done).iterator();
              });

  // execute the RDD
  commitParts.collect();

  // complete the commit
  masterPublishApi.completeBatch(token).toCompletableFuture().join();
}
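
To tie the pieces together, the Scala publishUsingMultipleWorkers method above could be invoked as follows. This is a sketch; the catalog HRN, layer ID, and payloads are placeholders.

// Placeholder catalog and layer -- substitute your own values.
val catalog = HRN("hrn:here:data::my-realm:my-catalog")
val layerId = "my-layer"

val sampleData = Seq(
  CustomData("partition-1", layerId, "first payload".getBytes("UTF-8")),
  CustomData("partition-2", layerId, "second payload".getBytes("UTF-8"))
)

publishUsingMultipleWorkers(sc, catalog, Seq(layerId), sampleData)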