Read and Write Objectstore Data
Read and Write to the Objectstore layer
Read data from the Objectstore Layer into DataStream
The snippet below shows how to retrieve data from the Objectstore layer in a catalog.
// Build the data client and ask it for a read engine bound to `hrn`
val client = new FlinkDataClient()
val readEngine = client.readEngine(hrn)

// Keys of the objects to fetch from the Objectstore layer
val key1 = "someKey1"
val key2 = "someKey2"

// Fetch each object's bytes (applyDecompression = false), in key order,
// and expose the results as a DataStream
val objects: DataStream[Array[Byte]] = {
  val payloads = Seq(key1, key2).map { key =>
    readEngine.getObjectDataAsBytes2(objectStoreLayer, key, applyDecompression = false)
  }
  env.fromData(payloads: _*)
}
FlinkDataClient client = new FlinkDataClient();
FlinkReadEngine readEngine = client.readEngine(hrn);

// Keys of the objects to fetch from the Objectstore layer
String key1 = "someKey1";
String key2 = "someKey2";

// Fetch the bytes of every key, in order, requesting ByteRange.all() for each object
List<byte[]> dataCollection = new ArrayList<>();
for (String key : new String[] {key1, key2}) {
  dataCollection.add(
      readEngine.getObjectDataAsBytes2(objectStoreLayer, key, false, ByteRange.all()));
}

// Expose the collected payloads as a DataStream
DataStream<byte[]> objects = env.fromData(dataCollection.toArray(new byte[0][]));
Upload data to Objectstore layer
The snippet below shows how to upload data to the Objectstore layer.
// Source stream: one ObjectStoreMessage per object that should be uploaded
val objects: DataStream[ObjectStoreMessage] = streamOfObjectStoreMessages()

// Build the custom upload sink for the target layer and attach it to the stream
val uploadSink = new UploadObjectStoreDataSink(hrn, objectStoreLayer)
objects.sinkTo(uploadSink)
/** Flink sink that uploads every incoming [[ObjectStoreMessage]] into an Objectstore layer.
  *
  * Each writer created by this sink holds its own `FlinkDataClient`.
  *
  * @param hrn     HRN handed to `FlinkDataClient.writeEngine` to obtain the write engine
  * @param layerId ID of the layer passed to every upload call
  */
class UploadObjectStoreDataSink(hrn: HRN, layerId: String)
    extends Sink[ObjectStoreMessage]
    with Serializable {

  override def createWriter(context: WriterInitContext): SinkWriter[ObjectStoreMessage] =
    new UploadWriter

  /** Writer that uploads one object per message and terminates its client on close. */
  private final class UploadWriter extends SinkWriter[ObjectStoreMessage] {

    // Both members are lazy: nothing is constructed until the first access.
    @transient
    private lazy val dataClient: FlinkDataClient = new FlinkDataClient()

    @transient
    private lazy val writeEngine: FlinkWriteEngine = dataClient.writeEngine(hrn)

    override def write(value: ObjectStoreMessage, context: SinkWriter.Context): Unit = {
      writeEngine.uploadObject2(layerId, value.key, value.data)
    }

    // Nothing to flush from this writer.
    override def flush(endOfInput: Boolean): Unit = ()

    // Terminate the DataClient when the writer is closed.
    override def close(): Unit = dataClient.terminate()
  }
}
/** Pairs the key of an object with the blob to upload; consumed by UploadObjectStoreDataSink. */
case class ObjectStoreMessage(key: String, data: NewPartition.Blob)
/**
* Sink with access to DataClient.
*
* <p>This example writes an object into the Objectstore layer
*/
class UploadObjectStoreDataSink implements Sink<JObjectStoreMessage>, Serializable {
private final HRN hrn;
private final String layerId;
public UploadObjectStoreDataSink(HRN hrn, String layerId) {
this.hrn = hrn;
this.layerId = layerId;
}
@Override
public SinkWriter<JObjectStoreMessage> createWriter(WriterInitContext context) {
return new SinkWriter<JObjectStoreMessage>() {
private transient FlinkDataClient dataClient = new FlinkDataClient();
private transient FlinkWriteEngine writeEngine = dataClient.writeEngine(hrn);
@Override
public void write(JObjectStoreMessage message, Context context) {
writeEngine.uploadObject2(
layerId, message.getKey(), message.getData(), Optional.empty(), Optional.empty());
}
@Override
public void flush(boolean endOfInput) {}
@Override
public void close() {
// terminate DataClient
dataClient.terminate();
}
};
}
}
// Source stream: one JObjectStoreMessage per object that should be uploaded
DataStream<JObjectStoreMessage> objects = getObjectStreamMessages();

// Build the custom upload sink for the target layer and attach it to the stream
UploadObjectStoreDataSink uploadSink = new UploadObjectStoreDataSink(hrn, objectStoreLayer);
objects.sinkTo(uploadSink);
/** Mutable holder pairing an object key with the blob to upload under that key. */
public class JObjectStoreMessage {
  private String key;
  private NewPartition.Blob data;

  /**
   * @param key key of the object
   * @param data blob to store for the key
   */
  public JObjectStoreMessage(String key, NewPartition.Blob data) {
    this.key = key;
    this.data = data;
  }

  // --- getters ---

  /** @return the object key */
  public String getKey() {
    return this.key;
  }

  /** @return the blob held by this message */
  public NewPartition.Blob getData() {
    return this.data;
  }

  // --- setters ---

  /** @param key new object key */
  public void setKey(String key) {
    this.key = key;
  }

  /** @param data new blob */
  public void setData(NewPartition.Blob data) {
    this.data = data;
  }
}
Updated 22 days ago