
How to read and write volatile data

Read partitions from a volatile layer into a DataStream

The snippets below show how to retrieve the partitions of a volatile layer in a catalog, first in Scala and then in Java.

Scala

// create the Flink data client
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// import the filter DSL used to subscribe to a volatile layer
import com.here.platform.data.client.common.VolatilePartitionsFilter._

// filter by a timestamp and a set of partition ids
val timestamp = 0L
val partition1 = "somePartition1"
val partition2 = "somePartition2"

val volatilePartitions: DataStream[Partition] =
  env.fromSource(
    queryApi.getVolatilePartitions(volatileLayer,
                                   since(timestamp) and byIds(Set(partition1, partition2))),
    WatermarkStrategy.noWatermarks(),
    "volatile-partitions"
  )

Java

FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

Long timestamp = 0L;
String partition1 = "somePartition1";
String partition2 = "somePartition2";

Set<String> ids = new HashSet<>(Arrays.asList(partition1, partition2));
VolatilePartitionsFilter filter =
    new VolatilePartitionsFilter.Builder().withIds(ids).withSinceTimestamp(timestamp).build();

// retrieve partitions of a volatile layer
DataStream<Partition> partitions =
    env.fromSource(
        queryApi.getVolatilePartitions(volatileLayer, filter, Collections.emptySet()),
        WatermarkStrategy.noWatermarks(),
        "volatile-partitions");

Notes:

  • If no VolatilePartitionsFilter is set, VolatilePartitionsFilter.empty is used by default, which means that all partitions of the layer are read (see the sketch below).
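
For example, to read every partition of the layer, the empty filter can be passed explicitly. This is a minimal Scala sketch and assumes that VolatilePartitionsFilter.empty is accepted in the same position as the filter shown above.

// sketch: read all partitions of the volatile layer with an empty filter
val allPartitions: DataStream[Partition] =
  env.fromSource(
    queryApi.getVolatilePartitions(volatileLayer, VolatilePartitionsFilter.empty),
    WatermarkStrategy.noWatermarks(),
    "all-volatile-partitions"
  )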

Publish volatile data

Use a BlobIdGenerator with a deterministic function so that the same partition always resolves to the same dataHandle and newly published data overwrites the previous payload. The Scala and Java examples below use the StableBlobIdGenerator.

Scala

// deterministic function that always generates the same dataHandle for the same partition
val idGenerator: BlobIdGenerator =
  new StableBlobIdGenerator(version = 0L)

// given a stream of PendingPartitions to be uploaded
val pendingPartitions: DataStream[PendingPartition] =
  getPendingPartitionsStream()

// add our publish sink to upload data
pendingPartitions.sinkTo(new FlinkDataClient().writeEngine(hrn, idGenerator).publish())

Java

// deterministic function that always generates the same dataHandle for the same partition
BlobIdGenerator idGenerator = new StableBlobIdGenerator.Builder().withVersion(0L).build();

// given a stream of PendingPartitions to be uploaded
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();

// add our publish sink to upload data
pendingPartitions.sinkTo(new FlinkDataClient().writeEngine(hrn, idGenerator).publish());
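
Each PendingPartition element describes the partition id, target layer, and payload to upload. As an illustration of what getPendingPartitionsStream() might produce, the Scala sketch below builds one element from raw bytes; it assumes that NewPartition with a ByteArrayData payload is available in the Scala API of the data client.

// sketch: build a PendingPartition from raw bytes
// (assumes NewPartition with a ByteArrayData payload is available)
val payload: Array[Byte] = "volatile content".getBytes("UTF-8")

val pendingPartition: PendingPartition =
  NewPartition(
    partition = partition1,   // partition id to (re)write
    layer = volatileLayer,    // name of the target volatile layer
    data = NewPartition.ByteArrayData(payload)
  )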