GuidesChangelogData Inspector Library API Reference
Guides

Read and Write Objectstore Data

Read and Write to the Objectstore layer

Read data from the Objectstore Layer into DataStream

The snippet below shows how to retrieve data from the Objectstore layer in a catalog.

// create dataclient
val client = new FlinkDataClient()
val readEngine = client.readEngine(hrn)

// keys in Objectstore
val key1 = "someKey1"
val key2 = "someKey2"

// Read objects as DataStream
val objects: DataStream[Array[Byte]] = {
  val seq = Seq(
    readEngine.getObjectDataAsBytes2(objectStoreLayer, key1, applyDecompression = false),
    readEngine.getObjectDataAsBytes2(objectStoreLayer, key2, applyDecompression = false)
  )
  env.fromData(seq: _*)
}
FlinkDataClient client = new FlinkDataClient();
FlinkReadEngine readEngine = client.readEngine(hrn);

// keys in Objectstore
String key1 = "someKey1";
String key2 = "someKey2";

// Read objects as DataStream
List<byte[]> dataCollection = new ArrayList<>();
dataCollection.add(
    readEngine.getObjectDataAsBytes2(objectStoreLayer, key1, false, ByteRange.all()));
dataCollection.add(
    readEngine.getObjectDataAsBytes2(objectStoreLayer, key2, false, ByteRange.all()));
DataStream<byte[]> objects = env.fromData(dataCollection.toArray(new byte[0][]));

Upload data to Objectstore layer

The snipper below shows how to upload data to the Objectstore layer.

// given a stream of objects to be uploaded
val objects: DataStream[ObjectStoreMessage] =
  streamOfObjectStoreMessages()

// add our custom sink to upload data
objects.sinkTo(new UploadObjectStoreDataSink(hrn, objectStoreLayer))
/** Sink with access to DataClient.
  *
  * This example writes an object into the Objectstore layer */
class UploadObjectStoreDataSink(hrn: HRN, layerId: String)
    extends Sink[ObjectStoreMessage]
    with Serializable {

  override def createWriter(context: WriterInitContext): SinkWriter[ObjectStoreMessage] =
    new SinkWriter[ObjectStoreMessage] {
      @transient
      private lazy val dataClient: FlinkDataClient =
        new FlinkDataClient()

      @transient
      private lazy val writeEngine: FlinkWriteEngine =
        dataClient.writeEngine(hrn)

      override def write(value: ObjectStoreMessage, context: SinkWriter.Context): Unit = {
        val key = value.key
        val data = value.data
        writeEngine.uploadObject2(layerId, key, data)
      }

      override def flush(endOfInput: Boolean): Unit = ()

      override def close(): Unit =
        dataClient.terminate()
    }
}
case class ObjectStoreMessage(key: String, data: NewPartition.Blob)
/**
 * Sink with access to DataClient.
 *
 * <p>This example writes an object into the Objectstore layer
 */
class UploadObjectStoreDataSink implements Sink<JObjectStoreMessage>, Serializable {
  private final HRN hrn;
  private final String layerId;

  public UploadObjectStoreDataSink(HRN hrn, String layerId) {
    this.hrn = hrn;
    this.layerId = layerId;
  }

  @Override
  public SinkWriter<JObjectStoreMessage> createWriter(WriterInitContext context) {
    return new SinkWriter<JObjectStoreMessage>() {
      private transient FlinkDataClient dataClient = new FlinkDataClient();
      private transient FlinkWriteEngine writeEngine = dataClient.writeEngine(hrn);

      @Override
      public void write(JObjectStoreMessage message, Context context) {
        writeEngine.uploadObject2(
            layerId, message.getKey(), message.getData(), Optional.empty(), Optional.empty());
      }

      @Override
      public void flush(boolean endOfInput) {}

      @Override
      public void close() {
        // terminate DataClient
        dataClient.terminate();
      }
    };
  }
}
// given a stream of PendingPartitions to be uploaded
DataStream<JObjectStoreMessage> objects = getObjectStreamMessages();

// add our custom sink to upload data
objects.sinkTo(new UploadObjectStoreDataSink(hrn, objectStoreLayer));
public class JObjectStoreMessage {
  private String key;
  private NewPartition.Blob data;

  public JObjectStoreMessage(String key, NewPartition.Blob data) {
    this.key = key;
    this.data = data;
  }

  public String getKey() {
    return key;
  }

  public void setKey(String key) {
    this.key = key;
  }

  public NewPartition.Blob getData() {
    return data;
  }

  public void setData(NewPartition.Blob data) {
    this.data = data;
  }
}