Guides · Changelog · Data Inspector Library · API Reference
Guides

How to read and write stream data

How to read and write stream data

Reading partitions from a stream layer

The snippets below (Scala and Java) show how to subscribe to a stream layer in a catalog.

// create dataclient
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// subscribe to a stream layer
// groupName identifies the consumer group for this job so multiple
// parallel subtasks share one subscription.
val partitions: DataStream[Partition] =
  env.fromSource(queryApi.subscribe(streamLayer, ConsumerSettings(groupName = "my-job")),
                 WatermarkStrategy.noWatermarks(),
                 // operator name; kept consistent with the Java example
                 "subscribe-stream")
// create dataclient
FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

// subscribe to a stream layer
// withGroupName identifies the consumer group for this job.
ConsumerSettings consumerSettings =
    new ConsumerSettings.Builder().withGroupName("my-job").build();
DataStream<Partition> partitions =
    env.fromSource(
        queryApi.subscribe(streamLayer, consumerSettings),
        WatermarkStrategy.noWatermarks(),
        "subscribe-stream");

Reading data from partitions

Use FlinkReadEngine.asDataFlow, or a custom map function that calls FlinkReadEngine.getDataAsBytes, to fetch the data for each partition.

// create dataclient
val client = new FlinkDataClient()
val readEngine = client.readEngine(hrn)

// given a stream of partitions
val partitions: DataStream[Partition] =
  getPartitionsStream

// convert into a stream of data
// asDataFlow() returns a map function that fetches each partition's payload
val data: DataStream[Array[Byte]] =
  partitions.map(readEngine.asDataFlow())
// create dataclient
FlinkDataClient client = new FlinkDataClient();
FlinkReadEngine readEngine = client.readEngine(hrn);

// given a stream of partitions
DataStream<Partition> partitions = getPartitionsStream();

// convert into a stream of data
// asDataFlow() returns a map function that fetches each partition's payload
DataStream<byte[]> data = partitions.map(readEngine.asDataFlow());

Custom reading map functions

The snippets below (Scala and Java) illustrate how to implement a map function that returns a tuple of the partition and its data.

/** Map function with access to DataClient.
  *
  * The function instance is serialized and shipped to the task managers, so
  * the client and read engine are marked `@transient` (excluded from the
  * serialized closure) and `lazy` (created on first use on the worker).
  *
  * @param hrn the catalog HRN to read from
  */
class ReadDataMapFunction(hrn: HRN)
    extends RichMapFunction[Partition, (Partition, Array[Byte])]
    with Serializable {
  // initialize DataClient lazily on the worker, after deserialization
  @transient
  private lazy val dataClient: FlinkDataClient =
    new FlinkDataClient()

  @transient
  private lazy val readEngine: FlinkReadEngine =
    dataClient.readEngine(hrn)

  // terminate DataClient when the task shuts down
  override def close(): Unit =
    dataClient.terminate()

  // read data and publish a tuple with partition and data
  override def map(partition: Partition): (Partition, Array[Byte]) =
    partition -> readEngine.getDataAsBytes(partition)
}
// given a stream of partitions
val partitions: DataStream[Partition] =
  getPartitionsStream

// apply our custom map function that uses DataClient
val data: DataStream[(Partition, Array[Byte])] =
  partitions.map(new ReadDataMapFunction(hrn))
/**
 * Map function with access to DataClient.
 *
 * <p>The function instance is serialized and shipped to the task managers, so the client and
 * read engine are {@code transient} and are created in {@link #open}, which runs on the worker
 * after deserialization.
 */
class ReadDataMapFunction extends RichMapFunction<Partition, Tuple2<Partition, byte[]>>
    implements Serializable {
  private final HRN hrn;

  private transient FlinkDataClient dataClient;

  private transient FlinkReadEngine readEngine;

  public ReadDataMapFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(OpenContext openContext) throws Exception {
    // create the client on the worker; it must not be serialized with the function
    dataClient = new FlinkDataClient();
    readEngine = dataClient.readEngine(hrn);
  }

  @Override
  public void close() throws Exception {
    // close() may be invoked even if open() failed or never ran; guard against NPE
    if (dataClient != null) {
      dataClient.terminate();
    }
  }

  @Override
  public Tuple2<Partition, byte[]> map(Partition partition) throws Exception {
    // read data and publish a tuple with partition and data
    byte[] data = readEngine.getDataAsBytes(partition);
    return new Tuple2<Partition, byte[]>(partition, data);
  }
}
// given a stream of partitions
DataStream<Partition> partitions = getPartitionsStream();

// apply our custom map function that uses DataClient
DataStream<Tuple2<Partition, byte[]>> data = partitions.map(new ReadDataMapFunction(hrn));

Writing partitions to a stream layer

The snippets below (Scala and Java) show how to publish to a stream layer in a catalog.

// create dataclient
val client = new FlinkDataClient()
val writeEngine = client.writeEngine(hrn)

// given a stream of new partitions
val pendingPartitions: DataStream[PendingPartition] =
  getPendingPartitionsStream

// add a sink to publish all partitions
pendingPartitions.sinkTo(writeEngine.publish())
// create dataclient
FlinkDataClient client = new FlinkDataClient();
FlinkWriteEngine writeEngine = client.writeEngine(hrn);

// given a stream of new partitions
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();

// add a sink to publish all partitions
pendingPartitions.sinkTo(writeEngine.publish());