How to write to versioned data
Write to a versioned layer
To publish a batch publication, follow the steps below:
- upload the data using `WriteEngine.put()`
- upload the metadata
Since batch publication requires version consistency, you need to aggregate the metadata (for instance, in time windows) and publish on a single node.
/** Map function that uploads the data of a [[PendingPartition]] and returns
  * the resulting [[CommitPartition]] metadata.
  */
class UploadDataFunction(hrn: HRN)
    extends RichMapFunction[PendingPartition, CommitPartition]
    with Serializable {

  // The DataClient is marked @transient and created lazily on the task manager,
  // so the function itself stays serializable.
  @transient
  private lazy val client: FlinkDataClient = new FlinkDataClient()

  @transient
  private lazy val engine: FlinkWriteEngine = client.writeEngine(hrn)

  // Release the DataClient when the task shuts down.
  override def close(): Unit = client.terminate()

  /** Uploads the partition payload and emits the commit metadata. */
  override def map(pendingPartition: PendingPartition): CommitPartition =
    engine.put(pendingPartition)
}
/** Window function that publishes a new batch version covering every
  * [[CommitPartition]] collected in the window.
  */
class PublishBatchWindowFunction(hrn: HRN, layerId: String)
    extends RichAllWindowFunction[CommitPartition, String, TimeWindow]
    with Serializable {

  // @transient + lazy: the client is built on the task manager, keeping the
  // function serializable.
  @transient
  private lazy val client: FlinkDataClient = new FlinkDataClient()

  @transient
  private lazy val publish: FlinkPublishApi = client.publishApi(hrn)

  // Release the DataClient when the task shuts down.
  override def close(): Unit = client.terminate()

  override def apply(window: TimeWindow,
                     partitions: JIterable[CommitPartition],
                     out: Collector[String]): Unit = {
    // Batch publication requires version consistency, so this runs on a
    // single node per window.
    val base = publish.getBaseVersion()
    val commits = partitions.iterator.asScala
    publish.publishBatch2(base, Some(Seq(layerId)), dependencies = Nil, commits)
    out.collect(s"commit on $base success")
  }
}
// given a stream of new partitions
val pendingPartitions: DataStream[PendingPartition] =
getPendingPartitionsStream()
// for each input: upload the data and collect the resulting commit metadata
val newPartitions: DataStream[CommitPartition] =
pendingPartitions.map(new UploadDataFunction(hrn))
// group the commit metadata into 1-minute tumbling event-time windows
val newPartitionsWindow: AllWindowedStream[CommitPartition, TimeWindow] =
newPartitions.windowAll(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
// publish a new version on each window trigger, propagating the base version of the commit
val results: DataStream[String] =
newPartitionsWindow.apply(new PublishBatchWindowFunction(hrn, batchLayer))
/** Map function that uploads data and returns metadata */
/**
 * Map function that uploads the data of a {@link PendingPartition} and returns the resulting
 * {@link CommitPartition} metadata.
 */
class UploadDataFunction extends RichMapFunction<PendingPartition, CommitPartition>
    implements Serializable {
  private final HRN hrn;
  // Created in open() on the task manager so the function itself stays serializable.
  private transient FlinkDataClient dataClient;
  private transient FlinkWriteEngine writeEngine;

  public UploadDataFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(OpenContext openContext) throws Exception {
    dataClient = new FlinkDataClient();
    writeEngine = dataClient.writeEngine(hrn);
  }

  @Override
  public void close() throws Exception {
    // Flink may call close() even when open() never ran or failed part-way;
    // guard against a NullPointerException that would mask the original error.
    if (dataClient != null) {
      dataClient.terminate();
    }
  }

  /** Uploads the partition payload and returns the commit metadata. */
  @Override
  public CommitPartition map(PendingPartition pendingPartition) throws Exception {
    return writeEngine.put(pendingPartition);
  }
}
/**
 * Window function that publishes a new batch version covering every {@link CommitPartition}
 * collected in the window.
 */
class PublishBatchWindowFunction
    extends RichAllWindowFunction<CommitPartition, String, TimeWindow> implements Serializable {
  private final HRN hrn;
  // Created in open() on the task manager so the function itself stays serializable.
  private transient FlinkDataClient dataClient;
  private transient FlinkPublishApi publishApi;

  public PublishBatchWindowFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(OpenContext openContext) throws Exception {
    dataClient = new FlinkDataClient();
    publishApi = dataClient.publishApi(hrn);
  }

  @Override
  public void close() throws Exception {
    // Flink may call close() even when open() never ran or failed part-way;
    // guard against a NullPointerException that would mask the original error.
    if (dataClient != null) {
      dataClient.terminate();
    }
  }

  @Override
  public void apply(
      TimeWindow window, Iterable<CommitPartition> commitPartitions, Collector<String> out)
      throws Exception {
    // Batch publication requires version consistency, so the publish happens on a single node.
    OptionalLong baseVersion = publishApi.getBaseVersion();
    publishApi.publishBatch2(
        baseVersion, Optional.empty(), Collections.emptyList(), commitPartitions.iterator());
    out.collect("commit on " + baseVersion + " success");
  }
}
// given a stream of new partitions
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();
// for each input: upload the data and collect the resulting commit metadata
DataStream<CommitPartition> newPartitions = pendingPartitions.map(new UploadDataFunction(hrn));
// group the commit metadata into 1-minute tumbling event-time windows
AllWindowedStream<CommitPartition, TimeWindow> newPartitionsWindow =
newPartitions.windowAll(TumblingEventTimeWindows.of(Duration.ofMinutes(1)));
// publish a new version on each window trigger, propagating the base version of the commit
DataStream<String> results = newPartitionsWindow.apply(new PublishBatchWindowFunction(hrn));