How to write stream data through FlinkKafkaPublisher
Use as a one-liner

The snippet below shows how to publish to a stream layer with a single call.
Scala:

```scala
// create an iterator of PendingPartitions
val pendingPartitions: Iterator[NewPartition] =
  Iterator.single(
    NewPartition("partition", "layer-id", NewPartition.ByteArrayData(Array(1.toByte))))
// publish the partitions into the stream layer
FlinkKafkaPublisher(hrn, pendingPartitions)
```

Java:

```java
// get an iterator of PendingPartitions
Iterator<PendingPartition> pendingPartitions = getPendingPartitions();
// publish the partitions into the stream layer
FlinkKafkaPublisher.apply(hrn, pendingPartitions);
```
Note: Usage specifics

The `FlinkKafkaPublisher` class is a non-standard Flink solution and reduces the performance of the processing stream. Prefer the general approach for writing partitions to a stream layer, `writeEngine.publish()`, whenever possible.
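For comparison, the recommended `writeEngine.publish()` path might look like the sketch below. The `DataEngine()` factory, the import paths, and the exact `publish` signature are assumptions, not verified API; consult the Data Client Library documentation for the authoritative form.

```scala
// Sketch only: factory and method names below are assumptions.
import akka.stream.scaladsl.Source

// obtain a write engine for the catalog (hypothetical factory call)
val writeEngine = DataEngine().writeEngine(hrn)

val pendingPartitions: Iterator[NewPartition] =
  Iterator.single(
    NewPartition("partition", "layer-id", NewPartition.ByteArrayData(Array(1.toByte))))

// publish the partitions through the standard write engine
writeEngine.publish(Source.fromIterator(() => pendingPartitions))
```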
Use when you need to publish multiple times with better performance

The snippet below shows how to publish to a stream layer with better performance when you need to publish multiple times, terminating the publisher once you are finished.
Scala:

```scala
new RichMapFunction[String, String]() {
  lazy val publisher = new FlinkKafkaPublisher(hrn)

  override def map(value: String): String = {
    // create an iterator of PendingPartitions
    val pendingPartitions: Iterator[NewPartition] =
      Iterator.single(
        NewPartition("partition", "layer-id", NewPartition.ByteArrayData(Array(1.toByte))))
    // publish the partitions into the stream layer
    publisher.publish(pendingPartitions)
    "some data"
  }

  override def close(): Unit = publisher.terminate()
}
```

Java:

```java
class SomeMapFunction extends RichMapFunction<Partition, Tuple2<Partition, byte[]>>
    implements Serializable {

  private final HRN hrn;
  private transient FlinkKafkaPublisher publisher;

  public SomeMapFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(OpenContext openContext) throws Exception {
    // initialize the publisher once per task
    publisher = new FlinkKafkaPublisher(hrn);
  }

  @Override
  public void close() throws Exception {
    // terminate the publisher when the task finishes
    publisher.terminate();
  }

  @Override
  public Tuple2<Partition, byte[]> map(Partition partition) throws Exception {
    byte[] data = "some data".getBytes();
    Iterator<PendingPartition> pendingPartitions = getPendingPartitions();
    // publish the partitions through the reused publisher instance
    publisher.publish(pendingPartitions);
    return new Tuple2<>(partition, data);
  }
}
```