How to publish metadata
Use CommitPartition to publish new or deleted partitions. You can mix both
creation and deletion in the same publication.
The HERE platform supports the following types of layers:
- versioned
- volatile
- index
- stream
- objectstore
Stream / volatile layers
Stream and volatile layers may contain data that is smaller than its metadata, so there is no reason to publish the metadata separately from the data. For information on publishing to a stream or volatile layer, see Publish Data.
Versioned layers
Catalogs are empty when you first create them. All versioned layers are at version ∅.
To make your first publication to a versioned layer, add the following:
// create publishApi for a catalog
val publishApi = DataClient().publishApi(catalogHrn, settings)
// prepare a list of partitions to publish
val partitions =
Source(
List(
CommitPartition.newCommitPartition(
partition = partitionId1,
layer = versionedLayerId,
dataHandle = "example-data-handle"
),
CommitPartition.newCommitPartition(
partition = partitionId2,
layer = versionedLayerId,
dataHandle = "example-data-handle"
)
)
)
// publish initial version
val firstPublish: Future[Done] =
publishApi.publishBatch2(
baseVersion = None, // publication to an empty catalog can be done with `baseVersion = None`.
Some(Seq(versionedLayerId)),
dependencies = Seq.empty,
partitions = partitions
)

// create publishApi for a catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
// prepare a list of partitions to publish
CommitPartition newPartition1 =
new CommitPartition.Builder()
.newPartition()
.withPartition(partitionId1)
.withDataHandle("<example-data-handle>")
.withLayer(layer)
.build();
CommitPartition newPartition2 =
new CommitPartition.Builder()
.newPartition()
.withPartition(partitionId2)
.withDataHandle("<example-data-handle>")
.withLayer(layer)
.build();
ArrayList<CommitPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition1);
partitionList.add(newPartition2);
Source<CommitPartition, NotUsed> partitions = Source.from(partitionList);
// publish initial version
CompletableFuture<Done> futurePublish =
publishApi
.publishBatch2(
emptyVersion,
Optional.of(Arrays.asList(layer)),
Collections.emptyList(),
partitions)
.toCompletableFuture();

When you publish updates to a versioned layer, you need to provide a base version, which is the current version of the catalog at the time of publication. The base version is used to ensure that the catalog version has not changed while the publication was being processed, which could lead to data inconsistency.
To publish an update to a versioned layer, add the following:
val nextPublishPartitions =
Source(
List(
CommitPartition.newCommitPartition(
partition = partitionId1,
layer = versionedLayerId,
dataHandle = "example-data-handle"
),
CommitPartition.deletedPartition(
partition = partitionId2,
layer = versionedLayerId
)
)
)
// for subsequent publications catalog base version needs to be provided
val nextPublish: Future[Done] =
for {
baseVersion <- publishApi.getBaseVersion()
_ <- publishApi.publishBatch2(
baseVersion = baseVersion,
Some(Seq(versionedLayerId)),
dependencies = Seq.empty,
partitions = nextPublishPartitions
)
} yield Done

// prepare a list of partitions to publish
CommitPartition nextPublishPartition1 =
new CommitPartition.Builder()
.newPartition()
.withPartition(partitionId1)
.withDataHandle("<example-data-handle>")
.withLayer(layer)
.build();
CommitPartition deletePartition2 =
new CommitPartition.Builder()
.deletePartition()
.withPartition(partitionId2)
.withLayer(layer)
.build();
ArrayList<CommitPartition> nextPublishPartitionList = new ArrayList<>();
nextPublishPartitionList.add(nextPublishPartition1);
nextPublishPartitionList.add(deletePartition2);
Source<CommitPartition, NotUsed> nextPublishPartitions = Source.from(nextPublishPartitionList);
// for subsequent publications catalog base version needs to be provided
CompletableFuture<Done> futureNextPublish =
publishApi
.getBaseVersion()
.thenCompose(
baseVersion ->
publishApi.publishBatch2(
baseVersion,
Optional.of(Arrays.asList(layer)),
Collections.emptyList(),
nextPublishPartitions))
.toCompletableFuture();

Base versions and version dependencies
When you start a batch, you must provide a base version, which is the
catalog version that the batch's publication builds on. The base version must
be the latest version of the catalog. It is used to verify that the
publication is applied on top of the version your batch assumed and that no
concurrent publication to the same layer was applied in the meantime.
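The base version check can be pictured as an optimistic concurrency check. The following is a minimal sketch in plain Java, not the platform's implementation: `ToyCatalog` and its methods are hypothetical names invented for illustration, and numbering versions from 0 is an assumption made only for this toy model. A publication is applied only if the base version it was prepared against is still the latest one; a `null` data handle marks a deleted partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy in-memory stand-in for a catalog; NOT part of the Data Client Library. */
class ToyCatalog {
    // Empty means nothing has been published yet.
    private Optional<Long> latestVersion = Optional.empty();
    // Partition ID -> data handle for the latest version.
    private final Map<String, String> partitions = new HashMap<>();

    synchronized Optional<Long> getBaseVersion() {
        return latestVersion;
    }

    /**
     * Applies the changes only if baseVersion still equals the latest version.
     * A null data handle in `changes` marks a deleted partition.
     */
    synchronized long publish(Optional<Long> baseVersion, Map<String, String> changes) {
        if (!baseVersion.equals(latestVersion)) {
            throw new IllegalStateException(
                "Stale base version " + baseVersion + ", catalog is at " + latestVersion);
        }
        for (Map.Entry<String, String> change : changes.entrySet()) {
            if (change.getValue() == null) {
                partitions.remove(change.getKey());
            } else {
                partitions.put(change.getKey(), change.getValue());
            }
        }
        // Assumption for this sketch: the first published version is 0.
        long next = latestVersion.map(v -> v + 1).orElse(0L);
        latestVersion = Optional.of(next);
        return next;
    }

    synchronized int partitionCount() {
        return partitions.size();
    }
}
```

In this model, a writer that read the base version before another writer published gets an `IllegalStateException` and has to fetch the base version again before retrying.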
Consistency is important not only within a catalog but between all the catalogs
in a data processing pipeline. To ensure consistency between catalogs, each
batch can contain a list of VersionDependency objects that show the catalogs
and versions used to produce the publication.
Note: Version dependencies should include a catalog only if one of its versioned layers was used to produce the new version of the given catalog. For example, if you read from a single stream layer and publish data to a versioned layer, then the dependencies should be empty.
To add a version dependency, add the following:
val dependencies =
Seq(
VersionDependency(
hrn = upstreamCatalogHrn,
version = 5L,
direct = true
)
)
for {
baseVersion <- publishApi.getBaseVersion()
_ <- publishApi.publishBatch2(
baseVersion = baseVersion,
Some(Seq(versionedLayerId)),
dependencies = dependencies,
partitions = partitions
)
} yield Done

ArrayList<VersionDependency> dependencies = new ArrayList<>();
VersionDependency dependency = new VersionDependency(upstreamCatalogHrn, 5L, true);
dependencies.add(dependency);
CompletableFuture<Done> futurePublishWithDeps =
publishApi
.getBaseVersion()
.thenCompose(
baseVersion ->
publishApi.publishBatch2(
baseVersion, Optional.of(Arrays.asList(layer)), dependencies, partitions))
.toCompletableFuture();

Simplified metadata publication process
Every publication to a versioned layer requires a batch token, even if the
batch job is performed in a single publication request. You need to start a
batch, receive a batch token and provide that batch token for every
subsequent publication. Once all publications are submitted, the batch needs to
be finalized, after which the server starts processing the submitted
publications. A batched publication is considered completed once all
publications have been processed and a new version is published to a catalog.
The Data Client Library provides helpers to perform your batched publication
with a single step. The helpers automatically start a batch, publish the data
using that batch, and finalize the batch. The call completes once the data has
been processed and is available in the catalog.
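The batch lifecycle described above (start, publish, finalize) can be sketched as a small state machine. This is an illustrative model in plain Java only: `ToyBatch` and `ToyPublisher` are hypothetical names, not Data Client Library types, and the sketch does no network or server-side processing.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a batch; hypothetical, NOT a Data Client Library type. */
class ToyBatch {
    enum State { OPEN, COMPLETED }

    private State state = State.OPEN;
    private final List<String> submitted = new ArrayList<>();

    /** Attaches a publication to the batch while it is still open. */
    void publishToBatch(List<String> partitionIds) {
        if (state != State.OPEN) {
            throw new IllegalStateException("Batch is already completed");
        }
        submitted.addAll(partitionIds);
    }

    /** Finalizes the batch and returns everything that was submitted to it. */
    List<String> complete() {
        if (state != State.OPEN) {
            throw new IllegalStateException("Batch is already completed");
        }
        state = State.COMPLETED;
        return new ArrayList<>(submitted);
    }
}

/** Toy single-step helper that wraps the three steps in one call. */
class ToyPublisher {
    static List<String> publishBatch(List<String> partitionIds) {
        ToyBatch batch = new ToyBatch();     // step 1: start a batch (stands in for the token)
        batch.publishToBatch(partitionIds);  // step 2: publish using that batch
        return batch.complete();             // step 3: finalize the batch
    }
}
```

Calling `ToyPublisher.publishBatch(Arrays.asList("partition-1", "partition-2"))` runs all three steps in order, which is the convenience that the single-step helpers offer over managing the batch token yourself.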
To publish a batch in a single step, add the following:
// create publishApi for a catalog
val publishApi = DataClient().publishApi(catalogHrn)
// list of dependencies for this publication
val dependencies = Seq.empty[VersionDependency]
val partitions: Source[CommitPartition, NotUsed] =
Source(
List(
CommitPartition.newCommitPartition(
partition = partitionId1,
layer = versionedLayerId,
dataHandle = "example-data-handle"
),
CommitPartition.newCommitPartition(
partition = partitionId2,
layer = versionedLayerId,
dataHandle = "example-data-handle"
)
)
)
publishApi.publishBatch2(
baseVersion = None,
Some(Seq(versionedLayerId)),
dependencies = dependencies,
partitions = partitions
)

// create publishApi for source catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
// list of dependencies for this publication
List<VersionDependency> dependencies = Collections.emptyList();
CommitPartition newPartition1 =
new CommitPartition.Builder()
.newPartition()
.withPartition(partitionId1)
.withDataHandle("<example-data-handle>")
.withLayer(layer)
.build();
CommitPartition newPartition2 =
new CommitPartition.Builder()
.newPartition()
.withPartition(partitionId2)
.withDataHandle("<example-data-handle>")
.withLayer(layer)
.build();
ArrayList<CommitPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition1);
partitionList.add(newPartition2);
Source<CommitPartition, NotUsed> partitions = Source.from(partitionList);
CompletableFuture<Done> futurePublish =
publishApi
.publishBatch2(baseVersion, Optional.of(Arrays.asList(layer)), dependencies, partitions)
.toCompletableFuture();

Distributed publications
The HERE platform enables you to process and publish a large number of partitions in a distributed manner.
For versioned layers, this is a three-step process:
- Initiate the publication process by starting a new batch publication and receiving a batch token. This operation normally happens on the master or driver node in the cluster.
- Different workers upload partitions and attach them to the same batch token.
- Once all data has been sent to the server, complete the batch upload. This operation normally happens on the master or driver node in the cluster.
Upon receiving the complete batch request, the server starts processing publications to create the next catalog version.
To publish multiple requests to a versioned layer, add the following:
// create publishApi and writeEngine for source catalog
val publishApi = DataClient().publishApi(catalogHrn, settings)
val writeEngine = DataEngine().writeEngine(catalogHrn)
// start batch publication
publishApi
.startBatch2(None, Some(Seq(versionedLayerId)), Seq.empty)
.flatMap { batchToken =>
//start worker 1 with upload data and publishing metadata
val worker1 =
publishApi.publishToBatch(batchToken, partitions1.mapAsync(parallelism = 2) {
partition =>
writeEngine.put(partition)
})
//start worker 2 with upload data and publishing metadata
val worker2 =
publishApi.publishToBatch(batchToken, partitions2.mapAsync(parallelism = 2) {
partition =>
writeEngine.put(partition)
})
// wait until workers are done uploading data/metadata
for {
_ <- worker1
_ <- worker2
} yield batchToken
}
.flatMap { batchToken =>
//signal to server complete batch publication
publishApi.completeBatch(batchToken)
}

// create publishApi and writeEngine for source catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);
// start a batch, publish partitions, complete batch
CompletableFuture<Done> futurePublish =
publishApi
.startBatch2(baseVersion, Optional.empty(), Collections.emptyList())
.thenCompose(
batchToken -> {
// start worker 1 with upload data and publishing metadata
Source<PendingPartition, NotUsed> partitionsOnWorker1 =
arbitraryPendingPartitions1;
CompletableFuture<Done> worker1 =
publishApi
.publishToBatch(
batchToken,
partitionsOnWorker1.mapAsync(
2, partition -> writeEngine.put(partition)))
.toCompletableFuture();
// start worker 2 with upload data and publishing metadata
Source<PendingPartition, NotUsed> partitionsOnWorker2 =
arbitraryPendingPartitions2;
CompletableFuture<Done> worker2 =
publishApi
.publishToBatch(
batchToken,
partitionsOnWorker2.mapAsync(
2, partition -> writeEngine.put(partition)))
.toCompletableFuture();
// wait until workers are done upload
return worker1.thenCombine(worker2, (done, done2) -> batchToken);
})
.thenCompose(
batchToken -> {
return publishApi.completeBatch(batchToken);
})
.toCompletableFuture();

For stream and volatile layers, distributed publication is no different from the normal publication process.
Index layer
If your partition data is already uploaded to the Blobstore and you have the dataHandle of the uploaded data, you can index the partition in an index layer with the specified keys.
To index a partition in an index layer, add the following:
// How to index partition with PublishApi.publishIndex
publishApi.publishIndex(indexLayerId, commitPartitions)

// How to index partition with PublishApi.publishIndex
publishApi.publishIndex(indexLayerId, commitPartitions);