How to build a batch pipeline with Maven archetypes (Java)
How to build a Batch Pipeline with Maven archetypes (Java)
To build a Batch Pipeline using the Data Processing Library, you use the SDK Maven Archetypes to create a skeleton for the project. The HERE platform is used to manage credentials, to create a catalog and manage access rights. The SDK Maven Archetypes is used to create a skeleton of the project.
This example demonstrates how to create a pipeline that reads the Road Topology & Geometry layer in the HERE Map Content catalog and then writes the number of segment references (cardinality) for every topology node in each input partition.
Credentials
There are two types of credentials you require:
- Platform credentials provide access to the
platform API. First, create a new application at
Create an app.
Once the application is created, click Create a key to download
the credentials. By default, Data Processing Library looks for
credentials in the
$HOME/.here/credentials.propertiesfile. Make sure your credentials file is placed in this location. - Repository credentials enable you to access the
repository where the Data Processing Library is. Go to Access tokens and click Create token. This downloads the
settings.xmlfile. Copy this file to the$HOME/.m2/folder.
Create the output catalog
First, create a new catalog to serve as the output catalog for the
pipeline. The catalog has one layer where, for each partition of the
Road Topology & Geometry layer, there is a partition containing the
cardinalities of the topology nodes in that partition. You also need
one additional layer, state, which is
reserved for the Data Processing Library.
Log into the platform. Select the Data tab and do the following:
- Click Add new catalog.
- Specify a CATALOG NAME, and a CATALOG ID for your catalog, such as batch-processing-quickstart-username.
- Next, add a CATALOG SUMMARY and a CONTACT EMAIL.
- Click Save and wait for the Data API to create your new catalog.
Then, give your application read/write access to the catalog, as follows:
- Select your catalog by searching for its name in the Search for data box.
- Go to Sharing, and in Manage Sharing select SHARE CATALOG WITH App. Insert your application ID, click Grant and check read and write.
- Finally, click Grant to enable your changes.
Add layers to the catalog:
- Click Add new layer and create a layer with
node-cardinalityas its ID. You can usenode-cardinalityas the layer's name too, or choose a different, human readable name. - You need a HERE Tile layer, and the zoom level must be the same as the input Road Topology & Geometry layer, 12. Select Versioned for Layer Type, which you must use for every layer processed by a batch pipeline.
- Keep the default
Content Typeofapplication/x-protobufso you can use Protobuf to encode the partitions. Leave the Schema field set toNone. - Then, click Save to complete the layer creation.
- Proceed with a second layer,
state, and configure it according to the second row of the following table, which lists the configuration of all layers in the catalog.
| Layer ID | Partitioning | Zoom Level | Layer Type | Content Type | Schema |
|---|---|---|---|---|---|
| node-cardinality | HERE Tile | 12 | Versioned | application/x-protobuf | None |
| state | Generic | N/A | Versioned | application/octet-stream | None |
The catalog is now fully configured. Proceed with creating a project.
Create a project
The Data SDK includes Maven Archetypes to simplify the process of creating new batch pipelines. Using the Maven Archetypes, you can build a complete project structure using a few shell commands. The archetype automatically generates POM files that include all of the basic dependencies, sample configuration files, and source files you can edit to implement your own logic. You need to create at least three projects:
- a top-level project, for convenience, to compile all sub-projects with a single POM file
- a nested Schema project, to build Java/Scala bindings for the Protocol Buffers schema
- a Process project, to build the processing logic
The following steps assume that Maven is installed and the mvn executable is
in your PATH variable. You must run all of the commands below from a bash
shell. The tree command is used to show the folder structures.
Alternatively, you can use ls -R as a replacement.
First, create a top-level project named nodecardinality by running the
following command, press ENTER to confirm:
$ pwd
~/projects
$ mvn archetype:generate -DarchetypeGroupId=org.codehaus.mojo.archetypes \
-DarchetypeArtifactId=pom-root \
-DarchetypeVersion=1.1 \
-DgroupId=com.example \
-DartifactId=nodecardinality \
-Dversion=1.0.0 \
-Dpackage=com.example.nodecardinalityThis creates a nodecardinality folder in the current directory, containing
the following files:
$ pwd
~/projects
$ tree
.
`-- nodecardinality
`-- pom.xml
1 directory, 1 fileSub-projects are created from within this folder. Navigate to the
nodecardinality folder to create the sub-projects. First create a
Model project by running the following command, press ENTER to confirm:
$ pwd
~/projects/nodecardinality
$ mvn archetype:generate -DarchetypeGroupId=com.here.platform.schema \
-DarchetypeArtifactId=project_archetype \
-DarchetypeVersion=RELEASE \
-DgroupId=com.example.nodecardinality \
-DartifactId=schema \
-Dversion=1.0.0 \
-Dpackage=com.example.nodecardinality.schema \
-DmajorVersion=0For specific documentation about the latest version of the archetype included in the SDK, see the Archetypes Developer Guide.
This creates a project template in the nodecardinality/schema folder
containing a project to build the schema for the output catalog:
$ pwd
~/projects/nodecardinality
$ tree
.
|-- pom.xml
`-- schema
|-- ds
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| `-- resources
| |-- ResourcesReadMe.txt
| `-- renderers
| `-- ReadMe.txt
|-- java
| `-- pom.xml
|-- pom.xml
|-- proto
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| |-- proto
| | `-- com
| | `-- example
| | `-- nodecardinality
| | `-- schema
| | `-- v0
| | `-- schema.proto
| `-- resources
| `-- description.md
|-- scala
| `-- pom.xml
`-- schema.yml
20 directories, 13 filesFinally, still within the nodecardinality folder, run the following
command to create a processor template including a
Direct1ToNCompiler,
press ENTER to confirm:
$ pwd
~/projects/nodecardinality
$ mvn archetype:generate -DarchetypeGroupId=com.here.platform \
-DarchetypeArtifactId=batch-direct1ton-java-archetype \
-DarchetypeVersion=RELEASE \
-DgroupId=com.example.nodecardinality \
-DartifactId=processor \
-Dversion=1.0.0 \
-Dpackage=com.example.nodecardinality.processorFor specific documentation about the latest version of the archetype included in the SDK, see the Archetypes Developer Guide.
An additional processor folder is now added to the nodecardinality project:
$ pwd
~/projects/nodecardinality
$ tree
.
|-- pom.xml
|-- processor
| |-- config
| | |-- pipeline-config.conf
| | `-- pipeline-job.conf
| |-- pom.xml
| `-- src
| `-- main
| |-- java
| | `-- com
| | `-- example
| | `-- nodecardinality
| | `-- processor
| | |-- Compiler.java
| | |-- CompilerConfig.java
| | |-- IntermediateData.java
| | `-- Main.java
| `-- resources
| |-- application.conf
| `-- log4j.properties
`-- schema
|-- ds
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| `-- resources
| |-- ResourcesReadMe.txt
| `-- renderers
| `-- ReadMe.txt
|-- java
| `-- pom.xml
|-- pom.xml
|-- proto
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| |-- proto
| | `-- com
| | `-- example
| | `-- nodecardinality
| | `-- schema
| | `-- v0
| | `-- schema.proto
| `-- resources
| `-- description.md
|-- scala
| `-- pom.xml
`-- schema.yml
30 directories, 22 filesSchema sub-project
The nodecardinality/schema folder contains the skeleton of a Maven project
that builds Java and Scala libraries (usually referred to as bindings) to
de/serialize partitions encoded as Protos. This is necessary to encode
partitions in the output node-cardinality layer as Protobuf and
to specify a custom partition schema.
In the project folder, there are the following components:
- the main POM file,
pom.xml, used to compile the project - a
javafolder containing a POM file to build the Java bindings for the protocol buffers - a
scalafolder containing a POM file to build the Scala bindings for the protocol buffers - a
dsfolder containing a sub-project to bundle the resulting bindings and Protobuf definitions in a ZIP file, that can be published to the Platform Artifactory repository to enable the decoding of partitions from the platform portal - a
protofolder containing the Protobuf definitions. To specify the output schema, you need to customize this sub-project.
To create a custom Protobuf schema, add .proto files to the
nodecardinality/schema/proto/src folder. For more information on Protobuf, see the
Protocol Buffers Documentation.
The skeleton project you have just created already contains a .proto file
you can edit to quickly define the schema of the output partitions.
Open the
nodecardinality/schema/proto/src/main/proto/com/example/nodecardinality/schema/v0/schema.proto
file and search for main message definition:
syntax = "proto3";
package com.example.nodecardinality.schema.v0;
// Any dependent resources should be declared in the main pom, and the files referenced here:
//import "com/company/dependentGroupId/filename.proto";
// MainProtobufMessage is a placeholder, must match package/messagename in the mainMessage tag of the layer-manifest-plugin in the schema_ds module.
message MainProtobufMessage {
int32 lat = 1;
int32 lon = 2;
}Change the name of the main message from MainProtobufMessage to
NodeCardinalityPartition, remove the sample fields lat and lon, and add a
repeated field named node_cardinality of type NodeCardinality.
Then, define an auxiliary message type NodeCardinality with two fields, the
ID of the node (id) and the cardinality of the node (cardinality). This new
Protobuf definition looks like this:
syntax = "proto3";
package com.example.nodecardinality.schema.v0;
// Any dependent resources should be declared in the main pom, and the files referenced here:
//import "com/company/dependentGroupId/filename.proto";
message NodeCardinalityPartition {
repeated NodeCardinality node_cardinality = 1;
}
message NodeCardinality {
string id = 1;
uint32 cardinality = 2;
}Since you changed the name of the main message, remember to update the
configuration used to build the Schema bundle. Open the POM file of the ds
sub-project (nodecardinality/schema/ds/pom.xml) and locate the
configuration for the layer-manifest-plugin:
<plugin>
<groupId>com.here.platform.schema.maven_plugins</groupId>
<artifactId>layer-manifest-plugin</artifactId>
<version>${here.plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>write-manifest</goal>
</goals>
</execution>
</executions>
<configuration>
<mainMessage>com.example.nodecardinality.schema.v0.MainProtobufMessage</mainMessage>
<inputDir>${project.build.directory}/proto</inputDir>
<writeManifest>true</writeManifest>
</configuration>
</plugin>Change the mainMessage path in the plugin configuration, replacing
com.example.nodecardinality.schema.v0.MainProtobufMessage with
com.example.nodecardinality.schema.v0.NodeCardinalityPartition:
<plugin>
<groupId>com.here.platform.schema.maven_plugins</groupId>
<artifactId>layer-manifest-plugin</artifactId>
<version>${here.plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>write-manifest</goal>
</goals>
</execution>
</executions>
<configuration>
<mainMessage>com.example.nodecardinality.schema.v0.NodeCardinalityPartition</mainMessage>
<inputDir>${project.build.directory}/proto</inputDir>
<writeManifest>true</writeManifest>
</configuration>
</plugin>Now compile the Schema project by running mvn install from
the nodecardinality/schema folder. Alternatively, run
this command from the top-level project.
Two libraries are built, schema_v0_java and schema_v0_scala_${scala.compat.version}, that
provide APIs to create a NodeCardinalityPartition object, serialize it to
a ByteArray, and deserialize it from a ByteArray.
To write the output partitions, use schema_v0_java in your processing logic.
To read the input partitions from the Road Topology &
Geometry Layer, use the corresponding Java bindings, provided by
com.here.schema.rib.topology-geometry_v2_java instead.
In the next section, you add those dependencies to the processor's sub-project.
Processing logic
The nodecardinality/processor folder contains the skeleton project of a batch
processing pipeline. This project builds the final processing application.
The project folder contains the following components::
- the main POM file
pom.xml, used to compile the project; - a
srcfolder containing the Java source files implementing the logic; - a
configfolder containing configuration files that can be used to run the pipeline locally, outside the Pipeline API.
Pipeline configuration
To perform a batch processing job, the Data Processing Library requires the following:
- HERE Resource Names (HRNs) for all the input catalogs and the output catalog.
- Versions for all the input catalogs for the batch job to process.
For pipelines in SCHEDULED state, this information is automatically provided
by the Pipeline API via two configuration files in
HOCON format:
pipeline-config.conf, providing the HRN of the input catalogs and output catalog.pipeline-job.conf, providing the versions of the input catalogs to be processed.
You upload the first file to the HERE Workspace pipeline once the pipeline is deployed; it never changes between compilations. The job configuration, on the other hand, is created on the fly by the Pipeline API, when a new job is deemed to be run. For example, a new version of an input catalog exists, and the output catalog needs to be updated.
During local development, when a batch pipeline is run locally without the Pipeline API, you must provide these configuration files ourselves, by setting two Java System Properties:
pipeline-config.file, containing the path to thepipeline-config.conffilepipeline-job.file, containing the path to thepipeline-job.conffile
The nodecardinality/processor/config folder contains templates for both
files, which you can edit and use for local development.
The pipeline configuration's template file
(nodecardinality/processor/config/pipeline-config.conf) looks like this:
// This configuration file is provided to specify input and
// output catalogs for pipelines during local development.
// In addition, the CLI uses this file when uploading the pipeline
// to the platform to create the relevant configuration.
// For more information on the HERE platform, see the
// platform documentation at https://here.com/docs.
//
// Do not include this file in the pipeline fat JAR you upload to the platform.
// When running your pipeline locally, use the command below.
//
// -Dpipeline-config.file=config/pipeline-config.conf
pipeline.config {
output-catalog { hrn = "hrn:here:data:::myoutput" } // TODO: Specify the output catalog HRN.
input-catalogs {
// The following keys are symbolic IDs for catalogs
// passed to the pipeline that the platform can use to
// bind to and identify specific inputs. Use one line
// for each input catalog in your project. Add and
// delete lines as necessary.
input-catalog-1 { hrn = "hrn:here:data:::myinput1" } // TODO: Specify the input catalog HRN.
input-catalog-2 { hrn = "hrn:here:data:::myinput2" } // TODO: Specify the input catalog HRN.
}
}pipeline.config.output-catalog.hrn indicates the output catalog's HRN. To
read the HRN of the catalog you just created, open the catalog in the platform portal.
If the catalog ID is batch-processing-quickstart, the corresponding HRN is
hrn:here:data:::batch-processing-quickstart.
pipeline.config.input-catalogs contains the HRN of all input
catalogs, indexed by arbitrary symbolic identifiers, used to
identify a specific input catalog in the job configuration and in the
processing logic. The template file contains two input catalogs, with
identifiers input-catalog-1 and input-catalog-2 and their
corresponding sample HRNs. In this project, there is only one input
catalog, HERE Map Content, with the HRN hrn:here:data::olp-here:rib-2.
Delete those two sample input catalogs, add one with the HRN mentioned
above, and then choose rib as its catalog ID:
pipeline.config {
output-catalog { hrn = "hrn:here:data:::batch-processing-quickstart" }
input-catalogs {
rib { hrn = "hrn:here:data::olp-here:rib-2" }
}
}The nodecardinality/processor/config/pipeline-job.conf file is a template for
the job configuration. It contains the following:
// This configuration file is provided to facilitate local development
// of pipelines with the HERE Data SDK and
// for use with pipelines that are not scheduled to run
// in reaction to changes in input catalogs.
// For more information on the HERE platform, see the
// platform documentation at https://here.com/docs.
//
// Do not include this file in the pipeline fat JAR you upload to the platform.
// When running your pipeline locally, use the command below.
//
// -Dpipeline-job.file=config/pipeline-job.conf
pipeline.job.catalog-versions {
input-catalogs {
// The following keys are symbolic IDs for catalogs
// passed to the pipeline that the platform can use to
// bind to and identify specific inputs.
// Specify only one processing-type/version pair per input catalog.
// Add an entry in the appropriate object (for example,
// input-catalog-1, input catalog-2, input-catalog-3, ...)
// for each input catalog in your project.
// For more information on the processing type options, see
// Build a Batch Pipeline with Maven Archetypes in the
// Data Processing Library documentation.
// Use case: Ignore results of previous compilation and fully process catalogs.
// TODO: If you want to use incremental processing, comment out these sections.
input-catalog-1 {
processing-type = "reprocess"
version = 0 // TODO: Specify the version of the catalog to be processed.
}
input-catalog-2 {
processing-type = "reprocess"
version = 2 // TODO: Specify the version of the catalog to be processed.
}
// Use case: Process catalogs incrementally. The processing
// types below are not recommended for local development. They are
// mostly used by platform pipelines to enable incremental compilation
// with the Data Processing Library.
// TODO: Comment out the section above, remove the comments below, and
// specify the versions.
// input-catalog-1 {
// processing-type = "no_changes"
// version = 0 // TODO: Specify the version of the catalog to be processed.
// }
// input-catalog-2 {
// processing-type = "changes"
// since-version = 1 // TODO: Specify the correct starting version of the changeset to be processed.
// version = 4 // TODO: Specify the ending version of the changeset to be processed.
// }
}
}For each input catalog specified in pipeline-config.conf, the
pipeline.job.catalog-versions.input-catalogs contains:
- the version of the catalog to use for processing
- the type of processing that the Data Processing Library uses for incremental compilation
The pipeline.job.catalog-versions.input-catalogs.input-catalog-ID.processing-type
can have three different values to denote three types of processing:
- reprocess: this type indicates that the catalog should be fully processed,
results from a previous compilation are not used to reduce the amount of
data being processed. This is the simplest type of processing to use, when you
are dealing with a manually written job configuration. It effectively
disables incremental compilation, a feature of the Data Processing Library
that allows you to reduce the amount of data processed using the results of
a previous compilation. With this type of processing, you must provide the
versionof the catalog to process. - no_changes: this type indicates that you want to reuse the same version of
the catalog used when the output catalog was last compiled. This type of
processing lets the Data Processing Library skip some compilation steps. You
must provide the
versionof the catalog to process. A valid processing configuration requires the version to be equal to the version used in the last compilation. The Data Processing Library makes sure this condition holds true before the processing starts, otherwise incremental compilation is disabled. - changes: this type indicates that you want to process a new version of the
catalog (
version) given the version processed in the last compilation (since-version). This type of processing may be used by the Data Processing Library to optimize processing, reducing the amount of data actually reprocessed. The processing configuration is valid as long the version of the catalog used in the last compilation is indeedsince-version; otherwise incremental compilation is disabled.
It is important to understand that both no_changes and changes are only used to enable optimizations internally, in the Data Processing Library. Conceptually, for any input catalog, the processing library fully processes a given version, always. For this quick start, and for local development, you should rely exclusively on the reprocess processing type. Once a pipeline is deployed, it is the Pipeline API duty to provide a valid job configuration that takes maximum advantage of the Data Processing Library's optimization capabilities.
At the time of writing, the latest version of the HERE Map Content
available is 1.
Let's configure the rib catalog using reprocess for
processing-type and 1 for version.
This is how pipeline-job.conf should look now:
pipeline.job.catalog-versions {
input-catalogs {
rib {
processing-type = "reprocess"
version = 1
}
}
}Dependencies
The SDK Maven Archetypes provides all basic dependencies in the pom.xml file
in the processor sub-project. You must manually add custom dependencies used
by your processing logic here. For this project two more dependencies are necessary:
com.here.schema.rib.topology-geometry_v2_java, to deserialize the input partitionscom.example.nodecardinality.schema_v0_java, which you have just created, to serialize the output partitions
Open the nodecardinality/processor/pom.xml file. There is already a
placeholder for the Java bindings created by the Schema project. To find it,
search for DATA_MODEL_PROJECT_NAME in the file:
Uncomment that dependency, then fill in the {DATA_MODEL_PROJECT_NAME} and
{DATA_MODEL_PROJECT_VERSION} placeholders with schema_v0 and
1.0.0, respectively:
<dependency>
<groupId>com.example.nodecardinality</groupId>
<artifactId>schema_v0_java</artifactId>
<version>1.0.0</version>
</dependency>Then, add a dependency on com.here.schema.rib.topology-geometry_v2_java:
<dependency>
<groupId>com.here.schema.rib</groupId>
<artifactId>topology-geometry_v2_java</artifactId>
<version>2.8.0</version>
</dependency>Processing logic
Now implement the actual processing logic by editing the Java
source files that the Maven archetypes created. In the
processor/src/main/java/com/example/nodecardinality/processor folder there are
four source files:
Main.java: contains the main entry point of the processing application, as a subclass ofPipelineRunner. TheDriveris configured with a singleDriverTaskcontaining oneDirect1ToNCompiler(implemented inCompiler.java).CompilerConfig.java: contains the compiler configuration, a class you may define to configure the business logic through theapplication.confconfiguration file. However, the business logic does not need to expose any configuration parameters, thus the default implementation is sufficient.IntermediateData.java: defines theIntermediateDataclass used by the compiler defined inCompiler.java;Compiler.java: implements the actual processing logic as aDirect1ToNCompiler.
First, decide which compilation pattern and
intermediate data to use for the task at hand. This quick start focuses on
functional patterns. Compared to the
RDD-based patterns, you get
incremental compilation with no intervention, and
you do not have to deal with Spark caveats such as partitioning, shuffling, or
persistence.
The underlying Spark application is still interesting to better understand how
the functional patterns work. All of the patterns implement different flavors
of the following scheme:
- The input metadata is retrieved and an
RDDofKeyandMetapairs is created.Keyuniquely identifies a partition, and contains the catalog ID, the layer ID, and the partition name.Metacontains information about the payload of the partition, and can be used together with the correspondingKeyto retrieve the content of a partition (payload). - A CompileIn transformation is applied to the input
RDD. The purpose of this step is to define the mapping between input and output partitions and to preprocess the input data into an intermediate representation that you define. In most compilation patterns, this step corresponds to aflatMap, where acompileInFnthat returns a sequence of(Key, IntermediateData)pairs given a single(Key, Meta)pair is applied to all elements of theRDD. This is then followed by agroupBytransformation to group all intermediate representations with the same output key together. - The resulting
RDDofKeyandIterable<IntermediateData>pairs is then processed by applying a CompileOut transformation, where aPayloadis produced for eachKeyfrom the grouped intermediate representations.
In this project, for each input partition of the HERE Map Content's
topology-geometry layer, you create an output partition with
the same Tile ID in the output catalog's node-cardinality
layer. The mapping between input and output does not depend on the
content of the input partitions; you just need the Tile ID that is
part of the partition's Key. For this reason you can use a direct compiler.
You will implement a direct 1:1 compilation since each input partition is used
to produce one output partition. This is a special case of 1:N compilation,
and therefore you only need a Direct1ToNCompiler pattern.
You still have to decide what IntermediateData to use between CompileIn and
CompileOut. Since you are performing a direct 1:1 compilation, you can
implement the processing logic in CompileOut directly. That means you can
forward the Key and Meta objects of the input partition from the CompileIn
transformation to the CompileOut transformation and process the input data.
Notice that more complex IntermediateData classes containing a processed
version of the input partition are usually necessary, especially when an input
partition is used to compile multiple output partitions and you want to avoid
processing the same data multiple times.
This is the IntermediateData class provided by the archetypes:
package com.example.nodecardinality.processor;
/**
* Intermediate compilation result holder used for data exchange between
* {@link Compiler#compileInFn(Pair)} and
* {@link Compiler#compileOutFn(Key, Iterable)} stages.
*/
public class IntermediateData {
// TODO: Configure your data definition, which is used during processing on the cluster.
// For more information, see the Data Processing Library documentation
// at https://here.com/docs.
private String attribute1;
private String attribute2;
public IntermediateData(String attribute1, String attribute2) {
this.attribute1 = attribute1;
this.attribute2 = attribute2;
}
public String getAttribute1() {
return attribute1;
}
public String getAttribute2() {
return attribute2;
}
}Let's rewrite the implementation to wrap a Key and Meta pair:
package com.example.nodecardinality.processor;
import com.here.platform.data.processing.java.catalog.partition.Key;
import com.here.platform.data.processing.java.catalog.partition.Meta;
public class IntermediateData {
private Key key;
private Meta meta;
IntermediateData(Key key, Meta meta) {
this.key = key;
this.meta = meta;
}
Key getKey() {
return key;
}
Meta getMeta() {
return meta;
}
}After that, open the
processor/src/main/java/com/example/nodecardinality/processor/Compiler.java
file.
You need to import the Java bindings for the HERE Map Content
topology-geometry layer and for your output layer:
import com.example.nodecardinality.schema.v0.Schema;
import com.here.platform.pipeline.logging.java.ContextAwareLogger;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass;
import com.here.schema.rib.v2.TopologyGeometry;Now, replace the placeholders for the input and output layers:
/**
* Input catalog ID
* The example code below uses only one input catalog and one input layer.
* If you need to use more than one input catalog and/or one input layer, then
* you need to add additional definitions here and handle these definitions in
* your retriever and Map implementations below.
* private Map<String, Retriever> catalogNameToRetrieverMap = new HashMap<>();
* catalogNameToRetrieverMap.put(IN_CATALOG, retriever);
*/
// TODO: Specify the name of your input catalog, use the same symbolic ID as that
// specified in the pipeline-config.conf file.
final private String IN_CATALOG = "YOUR_INPUT_CATALOG_NAME";
/** Name of input layer */
// TODO: Specify the name of your input layer, use the same symbolic ID as that
// used for the layer in the catalog on the platform portal.
final private String IN_LAYER = "your-input-layer";
/** Name of output layer */
// TODO: Specify the name of your output layer, use the same symbolic ID as you
// used when you created the layer.
final private String OUT_LAYER = "your-output-layer";For IN_CATALOG, use the symbolic ID configured in
pipeline-config.conf (rib). The IN_LAYER is topology-geometry
and the OUT_LAYER is node-cardinality:
/**
* Input catalog ID The example code below uses only one input catalog and one input layer. If you
* need to use more than one input catalog and/or one input layer, then you need to add additional
* definitions here and handle these definitions in your retriever and Map implementations below.
* private Map<String, Retriever> catalogNameToRetrieverMap = new HashMap<>();
* catalogNameToRetrieverMap.put(IN_CATALOG, retriever);
*/
// TODO: Specify the name of your input catalog, use the same symbolic ID as that
// specified in the pipeline-config.conf file.
private final String IN_CATALOG = "rib";
/** Name of input layer */
// TODO: Specify the name of your input layer, use the same symbolic ID as that
// used for the layer in the catalog on the platform portal.
private final String IN_LAYER = "topology-geometry";
/** Name of output layer */
// TODO: Specify the name of your output layer, use the same symbolic ID as you
// used when you created the layer.
private final String OUT_LAYER = "node-cardinality";In a direct compiler, the CompileIn function is split into:
- a
mappingFnfunction that returns a sequence of outputKeys given an inputKey - a
compileInFnfunction that returns anIntermediateDataobject given an inputKeyandMeta
The mapping established by mappingFn is used to send the IntermediateData
object to the corresponding output Key, which is then compiled in the
compileOutFn function.
Search for the mapping function:
public java.lang.Iterable<Key> mappingFn(Key inKey) {
List<Key> outputs = new ArrayList<>();
// TODO: Add code that defines the mapping. Uncomment the line below to add a 1:1 mapping.
// outputs.add(new Key(Default.OutCatalogId(), OUT_LAYER, inKey.partition()));
throw new RuntimeException("NOT Implemented");
// return outputs;
}Each input Key has to be mapped to an output Key with the same partition name,
a catalog ID equal to the output catalog (Default.OutCatalogId()), and
a layer ID equal to the output layer (OUT_LAYER).
First, import the Default object:
import com.here.platform.data.processing.java.driver.Default;Then implement mappingFn as follows:
public Iterable<Key> mappingFn(Key inKey) {
List<Key> outputs = new ArrayList<>();
outputs.add(inKey
.withCatalog(Default.OutCatalogId())
.withLayer(OUT_LAYER));
return outputs;
}In compileInFn, simply return an IntermediateData built out of the Key
and Meta of the input partition.
Replace the existing compileInFn method with the following:
public IntermediateData compileInFn(Pair<Key, Meta> in) {
return new IntermediateData(in.getKey(), in.getValue());
}Then replace compileOutFn with the following:
public Optional<Payload> compileOutFn(Key outKey, IntermediateData intermediate) {
Payload payload = retriever.getPayload(intermediate.getKey(), intermediate.getMeta());
try {
TopologyGeometryPartitionOuterClass.TopologyGeometryPartition partition =
TopologyGeometryPartitionOuterClass.TopologyGeometryPartition.parseFrom(
payload.content());
Schema.NodeCardinalityPartition.Builder builder =
Schema.NodeCardinalityPartition.newBuilder();
// in the output partition node cardinalities will have the same order as the input
// nodes in HERE Map Content.
for (TopologyGeometry.Node node : partition.getNodeList()) {
builder.addNodeCardinality(
Schema.NodeCardinality.newBuilder()
.setId(node.getIdentifier())
.setCardinality(node.getSegmentRefCount())
.build());
}
Schema.NodeCardinalityPartition outputPartition = builder.build();
return Optional.of(new Payload(outputPartition.toByteArray()));
} catch (InvalidProtocolBufferException ex) {
throw new RuntimeException(ex);
}
}Here, the return values of mappingFn and compileInFn from above are passed
compileOutFn as the OutKey of the output partition and the IntermediateData
containing the InKey and InMeta pair from the corresponding input partition.
You retrieve the payload of the input partition using the retriever object
that is initialized when the Compiler object is constructed:
Payload payload = retriever.getPayload(intermediate.getKey(), intermediate.getMeta());Decode the corresponding topology partition using the Java bindings
of the HERE Map Content's topology-geometry layer:
TopologyGeometryPartitionOuterClass.TopologyGeometryPartition partition =
TopologyGeometryPartitionOuterClass.TopologyGeometryPartition.parseFrom(
payload.content());Next, create the corresponding output partition using the Java bindings from
the Schema project. For each Protocol Buffer message, in this case
Schema.NodeCardinalityPartition, you get a builder (newBuilder) that
provides interfaces to set all fields in the message:
Schema.NodeCardinalityPartition.Builder builder =
Schema.NodeCardinalityPartition.newBuilder();You only have a repeated field node_cardinality, for which an
addNodeCardinality method is automatically generated to add elements to the
sequence. For each node in the input partition (partition.getNodeList()), use that method to add a NodeCardinality object with the node's ID
(node.getIdentifier()) and its cardinality (node.getSegmentRefCount()). The
NodeCardinality object is once again constructed with a builder:
// in the output partition node cardinalities will have the same order as the input
// nodes in RIB.
for (TopologyGeometry.Node node : partition.getNodeList()) {
builder.addNodeCardinality(
Schema.NodeCardinality.newBuilder()
.setId(node.getIdentifier())
.setCardinality(node.getSegmentRefCount())
.build());
}Next, use the build method to create a new
Schema.NodeCardinalityPartition:
Schema.NodeCardinalityPartition outputPartition = builder.build();To publish the data, you serialize the output partition to a byte array
(outputPartition.toByteArray()), parse a Payload object from it,
and return the optional payload. If you don't want a specific output
partition in the output catalog, you can use an empty Java.util.Optional
object. But in this case, you want to publish an output partition for
each input partition available:
return Optional.of(new Payload(outputPartition.toByteArray()));Now you can build the entire project from the top-level folder:
$ pwd
~/projects/nodecardinality
$ mvn installRun the processor locally
Processing a global catalog like the HERE Map Content can be time
consuming. However, you can limit the number of partitions to
process during local development by adding one or more partition
filters to the application.conf file.
In this case you use a BoundingBoxFilter to process only the partitions
inside a bounding box containing the city of Berlin.
Open the processor/src/main/resources/application.conf
file and append this partition filter configuration:
here.platform.data-processing.executors.partitionKeyFilters = [
{
className = "BoundingBoxFilter"
param = {
boundingBox {
// Berlin
north = 52.67551
south = 52.338261
east = 13.76116
west = 13.08835
}
}
}
]Make sure to rerun mvn install after making this change. From the processor
module folder, you can then run the compilation job using the configuration files set
up above:
$ pwd
~/projects/nodecardinality/processor
$ mvn exec:java -Dexec.mainClass=com.example.nodecardinality.processor.Main \
-Dpipeline-config.file=config/pipeline-config.conf \
-Dpipeline-job.file=config/pipeline-job.conf \
-Dexec.args="--master local[*]"The Maven exec plugin is used with the main class set to
com.example.nodecardinality.processor.Main and configuration files
config/pipeline-config.conf and config/pipeline-job.conf.
The PipelineRunner main method accepts an optional
--masterspark-master command line argument to set the master URL for
the cluster. Use local[*] to run Spark locally.
Inspect the catalog
To inspect the new catalog, do the following:
- Log into the platform. Search for the catalog from the Data tab and select it to switch to the catalog view.
- Select the
node-cardinalitylayer and select the Inspect tab. - Set the zoom level to 8 and search the map for Berlin.
- The output partitions are highlighted on the map, covering the bounding box of Berlin you specified.
Decode partitions
To decode the output partitions, you need to configure the layer to use
your custom schema. First, upload your schema to the platform.
From the nodecardinality/schema/ folder, run mvn deploy:
$ pwd
~/projects/nodecardinality/schema
$ mvn deployA schema with the same group ID, artifact ID, and version can be only
deployed once. To avoid collisions, ensure that your group ID
(com.example.nodecardinality in this example) is unique.
In the Data view of the portal, click
Browse schemas to display a list of deployed schemas your user can
access, and make sure your schema schema_v0 is there.
From the node-cardinality layer view of the portal, click More and
then select Reconfigure layer to access the layer configuration page.
You have now the possibility to change most of the parameters you set
during the creation of the layer, including the schema configuration.
Locate the Schema configuration, select schema_v0 from the top down
menu and then click Save at the bottom of the page.
From the Inspect tab, you can now select any partition. The decoded partition is displayed on the panel to the right.
Run The processor as the pipeline on the HERE Platform
Using the Pipeline API, you can run the batch pipeline you just created on a cluster. You can deploy your pipeline in one of two ways:
- scheduled pipeline, that automatically runs every time there is a new version of an input catalog
- run-once pipeline, that uses a
pipeline-job.confconfiguration you provide.
To use the Pipeline API, your App ID must belong to a Group. Your administrator must set up a Group ID in your account and assign your application to the group. Finally, you must grant that Group ID Read and Write access to the output catalog.
To deploy the processor to the HERE Workspace pipeline, you need to
package it into a fat JAR. The pom.xml file generated by the
Archetypes contains a platform profile for this purpose:
$ pwd
~/projects/nodecardinality/processor
$ mvn -Pplatform packageThe above command creates a fat JAR of the processor as
processor/target/processor-1.0-SNAPSHOT-platform.jar.
You must use this file to create a Pipeline Template.
Refer to the Pipelines Developer's Guide for detailed instructions on deploying and running pipelines from the platform.
Additionally, you can deploy and run a pipeline with the OLP CLI. Refer to the OLP CLI User Guide for details.
Updated 10 hours ago