Interface with the Data API via Spark
When you use this processing library, you implement Compilers that mix in
the InputLayers and OutputLayers traits. These interfaces define
data structures for the catalog and layer IDs. By implementing
these interfaces, you tell the processing library which input catalogs and
input layers each compiler requires. The processing library queries the input
catalogs accordingly and provides the metadata to the compiler. In addition,
the processing library requires each compiler to specify the output layers
that it produces.
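The declaration pattern can be illustrated with a plain-Python model. This is a minimal sketch, not the library's Scala API: CompilerSpec, metadata_for, and the catalog and layer IDs are hypothetical names, and metadata is reduced to (catalog, layer, partition) tuples. It shows how the declared input layers determine which metadata a compiler receives.

```python
from dataclasses import dataclass, field


# Hypothetical model of what a compiler declares through the InputLayers and
# OutputLayers traits; this is not the library's Scala API.
@dataclass
class CompilerSpec:
    name: str
    # Input catalog ID -> set of layer IDs the compiler reads.
    input_layers: dict = field(default_factory=dict)
    # Layer IDs of the single output catalog that the compiler produces.
    output_layers: set = field(default_factory=set)


def metadata_for(spec, available):
    """Keep only metadata for catalogs and layers the compiler declared.

    Metadata entries are modeled as (catalog_id, layer_id, partition_id) tuples.
    """
    return [
        (catalog, layer, partition)
        for (catalog, layer, partition) in available
        if layer in spec.input_layers.get(catalog, set())
    ]


roads = CompilerSpec(
    name="roads",
    input_layers={"rib": {"topology"}},  # hypothetical catalog and layer IDs
    output_layers={"road-tiles"},
)
available = [
    ("rib", "topology", "p1"),
    ("rib", "attributes", "p1"),
    ("other", "topology", "p2"),
]
print(metadata_for(roads, available))  # [('rib', 'topology', 'p1')]
```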
The Data Processing Library uses this information to implement incremental
publishing via the Publisher: before publishing, it queries the output layers
specified by each compiler to detect:
- which partitions contain new payloads
- which partitions are unchanged
- which partitions are deleted
Since there is only one output catalog, you do not need to specify its
identifier in application.conf or when you run the application. However,
partition keys contain a catalog identifier, so you must use the
Default.OutCatalogId value when your compiler produces output keys.
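A minimal sketch of output keys, assuming a key is a (catalog, layer, partition) triple. OUT_CATALOG_ID is a hypothetical placeholder standing in for Default.OutCatalogId; its value here is arbitrary and is not the library's. Key, output_key, and check_output_keys are likewise illustrative names.

```python
from typing import NamedTuple

# Placeholder standing in for Default.OutCatalogId; the real value is
# defined by the library, and "output" here is an arbitrary assumption.
OUT_CATALOG_ID = "output"


class Key(NamedTuple):
    """A partition key: catalog ID, layer ID, and partition name."""
    catalog: str
    layer: str
    partition: str


def output_key(layer, partition):
    """Build a key addressed to the (single) output catalog."""
    return Key(OUT_CATALOG_ID, layer, partition)


def check_output_keys(keys):
    """Reject emitted keys whose catalog ID is not the output catalog ID."""
    wrong = [k for k in keys if k.catalog != OUT_CATALOG_ID]
    if wrong:
        raise ValueError(f"keys not addressed to the output catalog: {wrong}")


# Passes: both keys carry the output catalog ID.
check_output_keys([output_key("road-tiles", "23618"), output_key("road-tiles", "23619")])
```

Building keys through one helper keeps the catalog ID out of individual compilers, so the output catalog identifier is set in a single place.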
Input layers and catalogs can be shared across DriverTasks; the processing
library optimizes this by querying the Data API only once for each of them.
However, each layer of the output catalog can be produced by only one task.
This is a requirement for incremental publishing: the Publisher must collect
all the metadata of a layer that is a candidate for publishing in one place
to perform the comparison, the conditional upload, and the multipart commit
correctly.
It is a mistake to have two or more tasks specify the same output layer, as the layer would be fully overwritten by the second task, resulting in an invalid output.
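The sharing and ownership rules above can be checked up front. In this sketch, which is not how the library itself implements them, plan_tasks is a hypothetical helper that takes each task's input (catalog, layer) pairs and output layers, returns the de-duplicated set of inputs to query once, and rejects configurations in which two tasks declare the same output layer.

```python
from collections import defaultdict


def plan_tasks(tasks):
    """Plan input queries and check output-layer ownership across tasks.

    tasks: {task name: (set of input (catalog, layer) pairs, set of output layers)}
    Returns the de-duplicated set of input (catalog, layer) pairs, so each is
    queried once even if several tasks share it. Raises ValueError if two or
    more tasks declare the same output layer.
    """
    to_query = set()
    producers = defaultdict(list)
    for name, (inputs, outputs) in tasks.items():
        to_query |= set(inputs)
        for layer in outputs:
            producers[layer].append(name)
    clashes = {layer: names for layer, names in producers.items() if len(names) > 1}
    if clashes:
        raise ValueError(f"output layers produced by more than one task: {clashes}")
    return to_query


tasks = {  # hypothetical task, catalog, and layer names
    "roads": ({("rib", "topology")}, {"road-tiles"}),
    "speeds": ({("rib", "topology"), ("rib", "attributes")}, {"speed-tiles"}),
}
print(sorted(plan_tasks(tasks)))  # [('rib', 'attributes'), ('rib', 'topology')]
```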
The following sections describe important internals of the Catalog and
Publisher, which the Driver operates. While you do not need to directly
operate these internals, this information is necessary to understand how input
catalogs are accessed and how payloads are pushed to the output catalog.
Query a catalog
The com.here.platform.data.processing.catalog package contains APIs to access
catalogs via Spark RDDs.
The Catalog trait provides convenient access to a Data API catalog via Spark.
The following operations are supported via the Data Client Library:
- Snapshot queries: construct an RDD of all the metadata of the catalog at a given version, optionally restricted to a set of layers.
- Change queries: construct an RDD of the metadata that changed between two versions, optionally restricted to a set of layers.
- Commit: regroup (Spark coalesce) an RDD of (updated) metadata into a fixed number of parts, upload them in parallel, and perform a multipart commit.
- Configuration: easily access the catalog configuration.
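To make snapshot and change queries concrete, the sketch below models a catalog's history as a Python list indexed by version, where each version maps (layer, partition) to a data handle. snapshot, changes, and split_into_parts are hypothetical helpers that run on a single machine; the real Catalog trait builds Spark RDDs instead, and split_into_parts only approximates the regrouping that Spark's coalesce performs. Representing deletions as None is also an assumption of this model.

```python
def snapshot(history, version, layers=None):
    """Metadata at `version`, optionally restricted to a set of layers.

    history: list indexed by version; each item maps (layer, partition) -> data handle.
    """
    return {
        key: handle
        for key, handle in history[version].items()
        if layers is None or key[0] in layers
    }


def changes(history, since, until, layers=None):
    """Metadata changed between two versions; deletions are mapped to None."""
    old = snapshot(history, since, layers)
    new = snapshot(history, until, layers)
    diff = {key: handle for key, handle in new.items() if old.get(key) != handle}
    diff.update({key: None for key in old if key not in new})
    return diff


def split_into_parts(entries, num_parts):
    """Regroup metadata into at most num_parts contiguous groups before upload."""
    items = sorted(entries.items())
    if not items:
        return []
    size = -(-len(items) // num_parts)  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]


history = [
    {("roads", "p1"): "d1", ("roads", "p2"): "d2", ("pois", "p1"): "d3"},   # version 0
    {("roads", "p1"): "d1b", ("roads", "p3"): "d4", ("pois", "p1"): "d3"},  # version 1
]
print(changes(history, 0, 1, layers={"roads"}))
# {('roads', 'p1'): 'd1b', ('roads', 'p3'): 'd4', ('roads', 'p2'): None}
```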
Most queries, except for configuration and queries for the latest version, are
performed in parallel by Spark worker nodes. This ensures that metadata is not
concentrated in the Driver, preventing a bottleneck that would hinder
scalability.
Publish and commit to a catalog
The com.here.platform.data.processing.publisher package provides higher-level
functionality to publish the output payloads at the end of data processing and
then commit the result. The Publisher class provides two methods to publish
the output of compilation: full snapshot publishing and
incremental publishing. In both cases, the Publisher requires the following
input:
- an RDD of output keys and payloads, which are the candidates to be published
- an RDD of metadata for the output catalog, which contains what was already published
The Publisher performs the following steps:
- Joins the candidate payloads with the already published metadata on their keys (partition + layer).
- Calculates the hashes of the payloads.
- Discards payloads whose hashes match those already published, so output data that has not changed is not uploaded again.
- Uploads the payloads that are actually new via an Uploader, generating new metadata entries for them.
- Returns an RDD of metadata to be committed to the Catalog.
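The Publisher steps can be sketched sequentially for a single machine. Assumptions: payloads are bytes, hashes are SHA-256 hex digests (the library's hash function is not specified here), published metadata is reduced to a key-to-hash map, and upload is a hypothetical callback standing in for the Uploader. Deletions are not modeled in this sketch.

```python
import hashlib


def publish(candidates, published, upload):
    """Sequential sketch of the Publisher steps for one layer.

    candidates: {key: payload bytes} to be published.
    published:  {key: hash hex digest} already in the output catalog.
    upload:     hypothetical callback (key, payload) -> data handle.
    Returns {key: (hash, handle)} metadata entries to commit.
    Deletions (empty payloads) are not modeled here.
    """
    to_commit = {}
    for key, payload in candidates.items():
        # Join: look up the already published hash for the same key.
        previous = published.get(key)
        # Hash the candidate payload (SHA-256 is an assumption).
        digest = hashlib.sha256(payload).hexdigest()
        # Discard payloads identical to what is already published.
        if digest == previous:
            continue
        # Upload what is actually new and record its metadata.
        to_commit[key] = (digest, upload(key, payload))
    return to_commit


uploaded = []


def fake_upload(key, payload):
    """Stand-in uploader that records keys and returns a fake handle."""
    uploaded.append(key)
    return f"handle-{len(uploaded)}"


published = {("tiles", "p1"): hashlib.sha256(b"same").hexdigest()}
candidates = {("tiles", "p1"): b"same", ("tiles", "p2"): b"new"}
print(sorted(publish(candidates, published, fake_upload)))  # [('tiles', 'p2')]
```

Only ("tiles", "p2") reaches the uploader: the unchanged payload is dropped after hashing, which is what keeps incremental runs from re-uploading stable output.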
However, the two publishing methods differ as follows:
- Full snapshot publishing deletes, in the resulting commit, every entry of the output catalog that was not explicitly provided in the input.
- Incremental publishing does not modify partitions that were not provided in the input.
Changes provided to the Publisher, either a new partition content or a delete
request, are applied on top of the existing partitions.
The Publisher merely performs incremental publishing based on hash differences; it is not an incremental compiler.
In both cases:
- Payloads of newly-introduced keys are uploaded and committed as new metadata.
- Payloads of keys already present in the output are discarded if not modified, otherwise they are uploaded and committed as changed metadata.
- Empty payloads of keys already present in the output become deleted metadata.
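The rules for both publishing methods can be condensed into a hypothetical plan_commit function that classifies each key as new, changed, or deleted. It is a sketch under stated assumptions: SHA-256 hashes, an empty bytes object as the deletion request, string keys for brevity, and a full_snapshot flag selecting between the two methods. How an empty payload for a key that was never published is treated is not stated above; the sketch simply ignores it.

```python
import hashlib


def digest(payload):
    """Hash a payload (SHA-256 is an assumption of this sketch)."""
    return hashlib.sha256(payload).hexdigest()


def plan_commit(candidates, published, full_snapshot):
    """Classify keys as "new", "changed" or "deleted" for the resulting commit.

    candidates:    {key: payload bytes}; an empty payload requests deletion.
    published:     {key: hash} already present in the output catalog.
    full_snapshot: True for full snapshot publishing, False for incremental.
    Keys whose payload is unchanged get no action and are left out.
    """
    actions = {}
    for key, payload in candidates.items():
        if key not in published:
            if payload:
                actions[key] = "new"
            # Assumption: an empty payload for a never-published key is ignored.
        elif not payload:
            actions[key] = "deleted"
        elif digest(payload) != published[key]:
            actions[key] = "changed"
    if full_snapshot:
        # Full snapshot: anything published but not provided is deleted.
        for key in published:
            if key not in candidates:
                actions[key] = "deleted"
    return actions


published = {"k1": digest(b"x"), "k2": digest(b"y"), "k3": digest(b"z"), "k6": digest(b"v")}
candidates = {"k1": b"x", "k2": b"y2", "k3": b"", "k4": b"w"}
print(plan_commit(candidates, published, full_snapshot=False))
# {'k2': 'changed', 'k3': 'deleted', 'k4': 'new'}
print(plan_commit(candidates, published, full_snapshot=True))
# {'k2': 'changed', 'k3': 'deleted', 'k4': 'new', 'k6': 'deleted'}
```

The only difference between the two calls is k6: incremental publishing leaves an unmentioned partition alone, while full snapshot publishing removes it.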
All of this processing takes place in Spark worker nodes: hash calculation and
comparison, any uploading, and generation of the metadata to be committed. The
RDD with the resulting metadata can then be passed to the Catalog for the
actual commit.
State layer
The library requires the output catalog to have an additional layer,
configured with the generic partitioning scheme, for internal use. Applications
cannot publish data to this layer. The layer ID is configurable, but its
default is "state".
The state layer is used by stateful compilation patterns to persist some RDDs
and retrieve them on the next run.
Typically, these RDDs persist the input-output dependency graph.
This graph specifies which input partition affects which output partition. This
information is stored per-DriverTask and is needed in incremental compilation
to identify which output partitions are candidates to be recompiled and
republished.
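The dependency graph is what makes it possible to narrow recompilation down. As a minimal sketch, assuming the graph is stored as a map from each output partition to the input partitions it was compiled from (the library's actual persisted format is not shown here), inverting it yields the output partitions affected by a set of changed inputs. invert and outputs_to_recompile are hypothetical helpers.

```python
from collections import defaultdict


def invert(deps):
    """Turn {output partition: input partitions} into {input: affected outputs}."""
    affects = defaultdict(set)
    for output, inputs in deps.items():
        for inp in inputs:
            affects[inp].add(output)
    return affects


def outputs_to_recompile(deps, changed_inputs):
    """Output partitions that depend on at least one changed input partition.

    Inputs absent from the graph (for example, newly added partitions) map to
    no outputs here; this sketch does not handle them.
    """
    affects = invert(deps)
    result = set()
    for inp in changed_inputs:
        result |= affects.get(inp, set())
    return result


deps = {"out1": {"inA", "inB"}, "out2": {"inB"}, "out3": {"inC"}}
print(sorted(outputs_to_recompile(deps, {"inB"})))  # ['out1', 'out2']
```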
Fingerprints, which are required to guarantee the correctness of incremental
runs, are also stored in this layer.