How to read and write stream data
Reading partitions from a stream layer
The snippet below shows how to subscribe to a stream layer in a catalog.
// Create the FlinkDataClient and obtain a query API bound to the catalog HRN.
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)
// Subscribe to the stream layer; the consumer group name identifies this job
// to the stream broker.
// NOTE(review): `env`, `hrn` and `streamLayer` are assumed to be provided by
// the surrounding job setup — confirm against the full example.
val partitions: DataStream[Partition] =
env.fromSource(queryApi.subscribe(streamLayer, ConsumerSettings(groupName = "my-job")),
WatermarkStrategy.noWatermarks(),
"Source")FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);
// Subscribe to the stream layer; the builder sets the consumer group name
// that identifies this job to the stream broker.
ConsumerSettings consumerSettings =
new ConsumerSettings.Builder().withGroupName("my-job").build();
DataStream<Partition> partitions =
env.fromSource(
queryApi.subscribe(streamLayer, consumerSettings),
WatermarkStrategy.noWatermarks(),
"subscribe-stream");Reading data from partitions
Use FlinkReadEngine.asDataFlow or a custom map function with
FlinkReadEngine.getDataAsBytes to fetch data.
// Create the FlinkDataClient and obtain a read engine bound to the catalog HRN.
val client = new FlinkDataClient()
val readEngine = client.readEngine(hrn)
// Given a stream of partitions
val partitions: DataStream[Partition] =
getPartitionsStream
// Convert it into a stream of raw payload bytes via the read engine's map function.
val data: DataStream[Array[Byte]] =
partitions.map(readEngine.asDataFlow())
FlinkDataClient client = new FlinkDataClient();
FlinkReadEngine readEngine = client.readEngine(hrn);
// Given a stream of partitions
DataStream<Partition> partitions = getPartitionsStream();
// Convert it into a stream of raw payload bytes via the read engine's map function.
DataStream<byte[]> data = partitions.map(readEngine.asDataFlow());
Custom reading map functions
The snippet below illustrates how to implement a function to return a tuple of Partition and Data.
/** Map function with access to DataClient.
  *
  * Creates a [[FlinkDataClient]] lazily on first use inside each task
  * instance and releases it when the operator shuts down.
  *
  * @param hrn catalog HRN the read engine is bound to
  */
class ReadDataMapFunction(hrn: HRN)
extends RichMapFunction[Partition, (Partition, Array[Byte])]
with Serializable {
// The DataClient is not serialized with the function: @transient excludes it
// from the closure, and lazy val defers construction to the task instance.
@transient
private lazy val dataClient: FlinkDataClient =
new FlinkDataClient()
@transient
private lazy val readEngine: FlinkReadEngine =
dataClient.readEngine(hrn)
// Release DataClient resources on task shutdown.
// NOTE(review): if map() never ran, referencing the lazy val here creates a
// client only to terminate it immediately — harmless, but worth confirming.
override def close(): Unit =
dataClient.terminate()
// Fetch the payload bytes and emit them paired with the partition metadata.
override def map(partition: Partition): (Partition, Array[Byte]) =
partition -> readEngine.getDataAsBytes(partition)
}
// Given a stream of partitions
val partitions: DataStream[Partition] =
getPartitionsStream
// Apply the custom map function defined above; it uses DataClient internally.
val data: DataStream[(Partition, Array[Byte])] =
partitions.map(new ReadDataMapFunction(hrn))
/** Map function with access to DataClient. */
class ReadDataMapFunction extends RichMapFunction<Partition, Tuple2<Partition, byte[]>>
    implements Serializable {
  /** Catalog HRN the read engine is bound to; serializable, captured at job-graph build time. */
  private final HRN hrn;
  // Created per task instance in open(); transient so it is never serialized
  // with the function closure.
  private transient FlinkDataClient dataClient;
  private transient FlinkReadEngine readEngine;

  public ReadDataMapFunction(HRN hrn) {
    this.hrn = hrn;
  }

  /** Creates the DataClient and read engine on the task instance. */
  @Override
  public void open(OpenContext openContext) throws Exception {
    dataClient = new FlinkDataClient();
    readEngine = dataClient.readEngine(hrn);
  }

  /** Releases DataClient resources on task shutdown. */
  @Override
  public void close() throws Exception {
    // Flink invokes close() during teardown even when open() failed, in which
    // case dataClient is still null; guard so we don't mask the real failure
    // with a NullPointerException.
    if (dataClient != null) {
      dataClient.terminate();
    }
  }

  /** Fetches the payload bytes and emits them paired with the partition metadata. */
  @Override
  public Tuple2<Partition, byte[]> map(Partition partition) throws Exception {
    byte[] data = readEngine.getDataAsBytes(partition);
    return new Tuple2<>(partition, data);
  }
}
// Given a stream of partitions
DataStream<Partition> partitions = getPartitionsStream();
// Apply the custom map function defined above; it uses DataClient internally.
DataStream<Tuple2<Partition, byte[]>> data = partitions.map(new ReadDataMapFunction(hrn));
Writing partitions to a stream layer
The snippet below shows how to publish to a stream layer in a catalog.
// Create the FlinkDataClient and obtain a write engine bound to the catalog HRN.
val client = new FlinkDataClient()
val writeEngine = client.writeEngine(hrn)
// Given a stream of new partitions
val pendingPartitions: DataStream[PendingPartition] =
getPendingPartitionsStream
// Attach a sink that publishes every pending partition to the stream layer.
pendingPartitions.sinkTo(writeEngine.publish())
FlinkDataClient client = new FlinkDataClient();
FlinkWriteEngine writeEngine = client.writeEngine(hrn);
// Given a stream of new partitions
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();
// Attach a sink that publishes every pending partition to the stream layer.
pendingPartitions.sinkTo(writeEngine.publish());
Updated 22 days ago