How to read and write indexed data
Read partitions from index layers into DataStream
To retrieve data from the index layer, you must first call the method
QueryApi.queryIndex. QueryApi.queryIndex returns the IndexPartitions that
match a given query. To query indexed data, you must provide some search
criteria, as described in Query an Index Layer.
The snippet below shows how to query an index layer in a catalog.
// create the data client and a query API bound to the catalog HRN
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// RSQL expression selecting the partitions to read
// (key name matches the Java snippet below)
val queryString = "intKey==42"

// query an index layer
val indexPartitions: DataStream[IndexPartition] = {
  // with some query string and all parts
  env.fromSource(queryApi.queryIndex(indexLayer, Some(queryString)),
                 WatermarkStrategy.noWatermarks(),
                 "Source")
  // -- or -- with default query ("timestamp=ge=0") and all parts
  env.fromSource(queryApi.queryIndex(indexLayer), WatermarkStrategy.noWatermarks(), "Source")
  // -- or -- with some query string and specific part
  env.fromSource(queryApi.queryIndex(indexLayer, Some(queryString), partId = Some("123")),
                 WatermarkStrategy.noWatermarks(),
                 "Source")
}
FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

// RSQL expression selecting the partitions to read
String queryString = "intKey==42";

// query an index layer
DataStream<IndexPartition> partitions;

// with some query string and all parts
partitions =
    env.fromSource(
        queryApi.queryIndex(indexLayer, Optional.of(queryString)),
        WatermarkStrategy.noWatermarks(),
        "query-index");

// -- or -- with default query ("timestamp=ge=0") and all parts
partitions =
    env.fromSource(
        queryApi.queryIndex(indexLayer, Optional.empty()),
        WatermarkStrategy.noWatermarks(),
        "query-index-default");

// -- or -- with some query string and specific part
partitions =
    env.fromSource(
        queryApi.queryIndex(indexLayer, Optional.of(queryString), "123"),
        WatermarkStrategy.noWatermarks(),
        "query-index-part");
Notes:
- If the query string is not defined, the default value "timestamp=ge=0" is used, which means that all partitions will be read.
Query index partitions as stream:
// create the data client and a query API bound to the catalog HRN
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// RSQL expression selecting the partitions to read
// (key name matches the Java snippet below)
val queryString = "intKey==42"

// query an index layer
val parts = Some(100)
val indexPartitions: DataStream[IndexPartition] =
  queryApi.queryIndexAsStream(indexLayer, Some(queryString), parts)
FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

// RSQL expression selecting the partitions to read
String queryString = "intKey==42";

// query an index layer as a stream
DataStream<IndexPartition> partitions =
    queryApi.queryIndexAsStream(env, indexLayer, Optional.of(queryString));
Write partitions to index layer
Data published to an index layer is not versioned but is indexed. To publish and
index the data, you must separately write and index the data by calling the
methods WriteEngine.put and PublishApi.publishIndex.
The snippet below uses Flink Window to publish a new partition of an index layer every minute.
/** Map function that uploads the data of each pending partition and returns its metadata. */
class UploadDataFunction(hrn: HRN)
    extends RichMapFunction[PendingPartition, CommitPartition]
    with Serializable {

  // Transient and lazy: the client is created on the task manager when first
  // used, instead of being serialized and shipped with the function.
  @transient
  private lazy val client: FlinkDataClient = new FlinkDataClient()

  @transient
  private lazy val engine: FlinkWriteEngine = client.writeEngine(hrn)

  /** Uploads the partition data and returns the resulting commit metadata. */
  override def map(pendingPartition: PendingPartition): CommitPartition =
    engine.put(pendingPartition)

  /** Terminates the data client when the task shuts down. */
  override def close(): Unit =
    client.terminate()
}
/** Window function that publishes an index over every CommitPartition in the window. */
class PublishIndexWindowFunction(hrn: HRN, indexLayer: String)
    extends RichAllWindowFunction[CommitPartition, String, TimeWindow]
    with Serializable {

  // Transient and lazy: the client is created on the task manager when first
  // used, instead of being serialized and shipped with the function.
  @transient
  private lazy val client: FlinkDataClient = new FlinkDataClient()

  @transient
  private lazy val publisher: FlinkPublishApi = client.publishApi(hrn)

  /** Publishes the index for all partitions collected in the window and emits a status line. */
  override def apply(window: TimeWindow,
                     partitions: JIterable[CommitPartition],
                     out: Collector[String]): Unit = {
    publisher.publishIndex(indexLayer, partitions.iterator.asScala)
    out.collect(s"indexing a new partition of $indexLayer layer is a success")
  }

  /** Terminates the data client when the task shuts down. */
  override def close(): Unit =
    client.terminate()
}
// given a stream of new partitions
val pendingPartitions: DataStream[PendingPartition] =
  getPendingPartitionsStream()

// for each input: upload data and collect metadata
val newPartitions: DataStream[CommitPartition] =
  pendingPartitions.map(new UploadDataFunction(hrn))

// group metadata into 1-minute tumbling event-time windows
val newPartitionsWindow: AllWindowedStream[CommitPartition, TimeWindow] =
  newPartitions.windowAll(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))

// index the new partitions on window trigger
val results: DataStream[String] =
  newPartitionsWindow.apply(new PublishIndexWindowFunction(hrn, indexLayer))
/** Map function that uploads data and returns metadata */
class UploadDataFunction extends RichMapFunction<PendingPartition, CommitPartition>
    implements Serializable {

  private final HRN hrn;

  // Initialized in open() on the task manager; transient so neither field is
  // serialized and shipped with the function.
  private transient FlinkDataClient client;
  private transient FlinkWriteEngine engine;

  public UploadDataFunction(HRN hrn) {
    this.hrn = hrn;
  }

  /** Creates the data client and write engine once per task. */
  @Override
  public void open(OpenContext openContext) throws Exception {
    client = new FlinkDataClient();
    engine = client.writeEngine(hrn);
  }

  /** Uploads the partition data and returns the resulting commit metadata. */
  @Override
  public CommitPartition map(PendingPartition pendingPartition) throws Exception {
    return engine.put(pendingPartition);
  }

  /** Terminates the data client when the task shuts down. */
  @Override
  public void close() throws Exception {
    client.terminate();
  }
}
/** Window function that publishes an index over every CommitPartition in the window. */
class PublishIndexWindowFunction
    extends RichAllWindowFunction<CommitPartition, String, TimeWindow> implements Serializable {

  private final HRN hrn;
  private final String indexLayer;

  // Initialized in open() on the task manager; transient so neither field is
  // serialized and shipped with the function.
  private transient FlinkDataClient client;
  private transient FlinkPublishApi publisher;

  public PublishIndexWindowFunction(HRN hrn, String indexLayer) {
    this.hrn = hrn;
    this.indexLayer = indexLayer;
  }

  /** Creates the data client and publish API once per task. */
  @Override
  public void open(OpenContext openContext) throws Exception {
    client = new FlinkDataClient();
    publisher = client.publishApi(hrn);
  }

  /** Publishes the index for all partitions collected in the window and emits a status line. */
  @Override
  public void apply(
      TimeWindow window, Iterable<CommitPartition> commitPartitions, Collector<String> out)
      throws Exception {
    publisher.publishIndex(indexLayer, commitPartitions.iterator());
    out.collect("indexing a new partition of " + indexLayer + " layer is a success");
  }

  /** Terminates the data client when the task shuts down. */
  @Override
  public void close() throws Exception {
    client.terminate();
  }
}
// given a stream of new partitions
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();

// for each input: upload data and collect metadata
DataStream<CommitPartition> newPartitions = pendingPartitions.map(new UploadDataFunction(hrn));

// group metadata into 1-minute tumbling event-time windows
AllWindowedStream<CommitPartition, TimeWindow> newPartitionsWindow =
    newPartitions.windowAll(TumblingEventTimeWindows.of(Duration.ofMinutes(1)));

// index the new partitions on window trigger
DataStream<String> results =
    newPartitionsWindow.apply(new PublishIndexWindowFunction(hrn, indexLayer));
Update and delete partitions of index layer
For updating and deleting partitions of an index layer, the FlinkDataClient
PublishApi supports the same functions as the DataClient PublishApi. Refer to
the related chapters of DataClient PublishApi for
Update Index Data and
Delete Index Data.
Updated 21 days ago