How to read and write volatile data
Read partitions from a volatile layer into a DataStream
The snippet below shows how to retrieve partitions of a volatile layer in a catalog.
Scala

```scala
// create the data client
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// subscribe to a volatile layer
import com.here.platform.data.client.common.VolatilePartitionsFilter._

// read only the given partitions, updated since a specific timestamp
val timestamp = 0L
val partition1 = "somePartition1"
val partition2 = "somePartition2"

val volatilePartitions: DataStream[Partition] =
  env.fromSource(
    queryApi.getVolatilePartitions(
      volatileLayer,
      since(timestamp) and byIds(Set(partition1, partition2))),
    WatermarkStrategy.noWatermarks(),
    "volatile-partitions"
  )
```

Java

```java
// create the data client
FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

// read only the given partitions, updated since a specific timestamp
Long timestamp = 0L;
String partition1 = "somePartition1";
String partition2 = "somePartition2";
Set<String> ids = new HashSet<>(Arrays.asList(partition1, partition2));
VolatilePartitionsFilter filter =
    new VolatilePartitionsFilter.Builder()
        .withIds(ids)
        .withSinceTimestamp(timestamp)
        .build();

// retrieve partitions of a volatile layer
DataStream<Partition> partitions =
    env.fromSource(
        queryApi.getVolatilePartitions(volatileLayer, filter, Collections.emptySet()),
        WatermarkStrategy.noWatermarks(),
        "volatile-partitions");
```

Notes:
- If no VolatilePartitionsFilter is set, VolatilePartitionsFilter.empty is used by default, which means that all partitions of the layer are read.
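Conceptually, the filter above combines two predicates: the partition id must be in the requested set, and the partition must have been updated since the given timestamp. A minimal sketch in plain Java (the PartitionRef record and the >= boundary semantics are illustrative assumptions, not the SDK's actual types or behavior):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FilterSketch {
    // hypothetical stand-in for a volatile partition's metadata
    record PartitionRef(String id, long timestamp) {}

    // mimics combining byIds(...) and since(...): keep partitions whose id is
    // in the requested set and whose timestamp is at or after the given one
    // (the exact boundary semantics here are an assumption)
    static List<PartitionRef> filter(List<PartitionRef> all, Set<String> ids, long since) {
        return all.stream()
                .filter(p -> ids.contains(p.id()) && p.timestamp() >= since)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<PartitionRef> all = List.of(
                new PartitionRef("somePartition1", 100L),
                new PartitionRef("somePartition2", 5L),   // too old, filtered out
                new PartitionRef("otherPartition", 100L)); // id not requested
        List<PartitionRef> kept = filter(all, Set.of("somePartition1", "somePartition2"), 10L);
        System.out.println(kept); // only somePartition1 passes both predicates
    }
}
```

The empty filter corresponds to both predicates being absent, so every partition passes.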
Publish volatile data
Use a BlobIdGenerator with a deterministic function, so that re-publishing a partition overwrites the existing data.
Scala

```scala
// deterministic function that always generates the same dataHandle for the same partition
val idGenerator: BlobIdGenerator =
  new StableBlobIdGenerator(version = 0L)

// given a stream of PendingPartitions to be uploaded
val pendingPartitions: DataStream[PendingPartition] =
  getPendingPartitionsStream()

// add the publish sink to upload the data
pendingPartitions.sinkTo(new FlinkDataClient().writeEngine(hrn, idGenerator).publish())
```

Java

```java
// deterministic function that always generates the same dataHandle for the same partition
BlobIdGenerator idGenerator = new StableBlobIdGenerator.Builder().withVersion(0L).build();

// given a stream of PendingPartitions to be uploaded
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();

// add the publish sink to upload the data
pendingPartitions.sinkTo(new FlinkDataClient().writeEngine(hrn, idGenerator).publish());
```
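The point of a deterministic generator is that publishing new data for a partition reuses the same dataHandle, so the new blob overwrites the old one instead of accumulating stale copies. The idea can be sketched in plain Java with a content-independent hash (this hashing scheme is illustrative only, not the actual StableBlobIdGenerator algorithm):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

public class StableIdSketch {
    // deterministic: the same (layer, partition, version) always yields the
    // same handle, so re-publishing a partition overwrites the previous blob
    static String dataHandle(String layer, String partition, long version) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            md.update((layer + "/" + partition + "/" + version)
                    .getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(md.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available
        }
    }

    public static void main(String[] args) {
        // same inputs -> same handle; different partition -> different handle
        System.out.println(dataHandle("volatileLayer", "somePartition1", 0L)
                .equals(dataHandle("volatileLayer", "somePartition1", 0L)));
    }
}
```

Note that the handle depends only on the partition identity and version, never on the payload, which is what makes overwriting possible.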