How to integrate Flink connector with interactive map layers
Create a table sink and a table source for an interactive map layer
The main entry point of the Flink Connector API is OlpStreamConnectorHelper.
Scala:

import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper

Java:

import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;

An instance of OlpStreamConnectorHelper is used to create a flink.table.api.Schema
and build the SQL statement. The following code snippet shows how to create an
instance of OlpStreamConnectorHelper, build a flink.table.api.Schema, and create a
table with the given schema and options:
Scala:

// define the properties
val sourceProperties =
  Map(
    "olp.connector.mode" -> "read",
    "olp.connector-refresh-interval" -> "-1", // no continuous reading
    "olp.connector.metadata-columns" -> "true"
  )

// create the Table Connector Descriptor Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), inputLayerId, sourceProperties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)
tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

Java:

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

The source factory supports the following properties for interactive map sources:
- olp.connector.mode: read
- olp.layer.query: specifies an RSQL query that is used to query the interactive map layer. If it is not defined or empty, all features within Web Mercator bounds (latitude between -85.05° and +85.05°) are read.
- olp.connector-refresh-interval: interval in milliseconds for detecting changes in the layer. Use a value of -1 to disable continuous reading. The default value is 60000.
- olp.connector.max-features-per-request: limits the number of features the connector requests from the interactive map layer in a single call. Adjust this if the layer contains very big features. The default value is 10000.
- olp.connector.ignore-invalid-partitions: if set to true, the connector does not try to load features from a spot in the highest zoom level that holds more features than allowed (via the olp.connector.max-features-per-request property). The default value is false.
- olp.connector.download-parallelism: the maximum number of read requests executed in parallel in one Flink task. As the number of tasks corresponds to the configured parallelism, the number of read requests the pipeline can execute in parallel is the value of this property multiplied by the task parallelism. The default value is 5.
- olp.connector.download-timeout: the overall timeout in milliseconds applied for reading from the interactive map layer. The default value is 300000.
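As a sketch of how these options combine, the snippet below builds a property map for a bounded, filtered read in plain Java with no connector dependencies; the RSQL filter string and the tuned limit are invented for illustration, not recommendations:

```java
import java.util.HashMap;
import java.util.Map;

public class SourceProperties {
    // Property map for a one-shot read of features matching a filter.
    public static Map<String, String> buildSourceProperties() {
        Map<String, String> props = new HashMap<>();
        props.put("olp.connector.mode", "read");
        // -1 disables continuous change detection: read once, then finish
        props.put("olp.connector-refresh-interval", "-1");
        // hypothetical RSQL filter on a feature property named "type"
        props.put("olp.layer.query", "p.type==sign");
        // fetch at most 5000 features per call (default is 10000)
        props.put("olp.connector.max-features-per-request", "5000");
        return props;
    }

    public static void main(String[] args) {
        buildSourceProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```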
You create a Table Sink the same way as a Table Source, with
OlpStreamConnectorHelper:
Scala:

// define the properties
val sinkProperties =
  Map(
    "olp.connector.mode" -> "write",
    "olp.connector.metadata-columns" -> "true"
  )

// create the Table Connector Descriptor Sink
val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), outputLayerId, sinkProperties)

// register the Table Sink
tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")

Java:

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

The sink factory supports the following properties for interactive map sinks:
- olp.connector.mode: write
- olp.connector.aggregation-window: defines the time interval in milliseconds for updating the interactive map layer with features modified by the pipeline. The default value is 1000 milliseconds.
- olp.connector.upload-parallelism: the maximum number of write requests executed in parallel in one Flink task. As the number of tasks corresponds to the configured parallelism, the number of write requests the pipeline can execute in parallel is the value of this property multiplied by the task parallelism. The default value is 5.
- olp.connector.upload-timeout: the overall timeout in milliseconds applied for writing to the interactive map layer. The default value is 300000.
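The interplay of upload parallelism and task parallelism described above can be sketched in plain Java, with no connector dependencies; the property values chosen here are illustrative assumptions, not recommendations:

```java
import java.util.Map;

public class SinkTuning {
    // Effective number of concurrent write requests =
    // olp.connector.upload-parallelism (default 5) x Flink task parallelism.
    public static int effectiveWriteParallelism(Map<String, String> props,
                                                int taskParallelism) {
        int perTask = Integer.parseInt(
                props.getOrDefault("olp.connector.upload-parallelism", "5"));
        return perTask * taskParallelism;
    }

    public static void main(String[] args) {
        Map<String, String> sinkProps = Map.of(
                "olp.connector.mode", "write",
                "olp.connector.aggregation-window", "5000", // flush every 5 s
                "olp.connector.upload-parallelism", "10");
        // 10 write requests per task x 4 tasks
        System.out.println(effectiveWriteParallelism(sinkProps, 4)); // 40
    }
}
```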
For a general description of the Flink connector configuration, see here.
Data schemas
Before creating a table schema, the helper fetches the catalog configuration
using the passed HRN. Then it checks the data format of the passed layerId.
As the last step, the Flink Connector automatically derives a flink.table.api.Schema.
Table Source and Table Sink have different schemas for the same interactive map layer.
The table source schema contains all columns:
root
|-- geometry: ROW<`type` STRING, `coordinates` STRING>
|-- properties: MAP<STRING, STRING>
|-- customMembers: MAP<STRING, STRING>
|-- mt_id: STRING
|-- mt_tags: ARRAY<STRING>
|-- mt_datahub: ROW<`mt_updatedAt` BIGINT, `mt_createdAt` BIGINT>
The table sink schema contains only the writable columns:
root
|-- geometry: ROW<`type` STRING, `coordinates` STRING>
|-- properties: MAP<STRING, STRING>
|-- customMembers: MAP<STRING, STRING>
|-- mt_id: STRING
|-- mt_tags: ARRAY<STRING>
The columns are derived from GeoJSON feature fields:
| Column | Description |
|---|---|
| geometry.type | Point, LineString, Polygon, MultiPoint, MultiLineString or MultiPolygon |
| geometry.coordinates | Stringified GeoJSON coordinates. For the meaning of GeoJSON coordinates, see the Geo-coordinates chapter. |
| properties | Map of the GeoJSON feature's properties, each value as stringified JSON |
| customMembers | Map of members not described in the GeoJSON specification, each value as stringified JSON |
| mt_id | Feature id |
| mt_tags | Array of tags associated with the feature |
| mt_datahub.mt_updatedAt | Unix timestamp (in milliseconds) of the last update in the interactive map layer |
| mt_datahub.mt_createdAt | Unix timestamp (in milliseconds) of creation in the interactive map layer |
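As an illustration of this mapping, the sketch below shows how one hypothetical GeoJSON feature would surface in the source columns; the feature and all of its values are invented for the example:

```java
import java.util.List;
import java.util.Map;

// Column values for a hypothetical feature such as:
// {"type":"Feature","id":"id1",
//  "geometry":{"type":"Point","coordinates":[8.0,50.0]},
//  "properties":{"name":"station","capacity":12},
//  "customTopLevel":true}
public class FeatureColumns {
    // geometry.type and geometry.coordinates (coordinates arrive stringified)
    public static final String GEOMETRY_TYPE = "Point";
    public static final String GEOMETRY_COORDINATES = "[8.0, 50.0]";
    // properties: each value is stringified JSON, so the string "station"
    // keeps its quotes and the number 12 does not
    public static final Map<String, String> PROPERTIES =
            Map.of("name", "\"station\"", "capacity", "12");
    // customMembers: top-level members outside the GeoJSON specification
    public static final Map<String, String> CUSTOM_MEMBERS =
            Map.of("customTopLevel", "true");
    // mt_id: the feature id; mt_tags: tags attached in the interactive map layer
    public static final String MT_ID = "id1";
    public static final List<String> MT_TAGS = List.of("tagA", "tagB");

    public static void main(String[] args) {
        System.out.println(GEOMETRY_TYPE + " feature " + MT_ID + " -> " + PROPERTIES);
    }
}
```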
SQL functions to encode or decode JSON text
Stringified JSON can be converted to typed data by user-defined functions:
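The connector ships two such functions, fromJsonString and toJsonString, registered below. As a rough mental model only, and a hypothetical sketch rather than the connector's actual implementation, toJsonString renders a SQL value as its JSON text form (fromJsonString does the reverse):

```java
public class JsonStringSketch {
    // Hypothetical re-implementation covering a few simple cases only:
    // strings gain JSON quotes; numbers, booleans, and null already
    // match their JSON text form.
    public static String toJsonString(Object value) {
        if (value == null) return "null";
        if (value instanceof String) return "\"" + value + "\"";
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        System.out.println(toJsonString(1));        // 1
        System.out.println(toJsonString("value2")); // "value2"
        System.out.println(toJsonString(true));     // true
    }
}
```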
Scala:

tEnv.executeSql(
  "CREATE FUNCTION fromJsonString AS 'com.here.platform.data.client.flink.common.sqlfunctions.FromJsonString'")
tEnv.executeSql(
  "CREATE FUNCTION toJsonString AS 'com.here.platform.data.client.flink.common.sqlfunctions.ToJsonString'")

Java:

tEnv.createTemporaryFunction("fromJsonString", FromJsonString.class);
tEnv.createTemporaryFunction("toJsonString", ToJsonString.class);

Flink SQL examples
Read interactive map data

Scala:

val table = tEnv.sqlQuery("SELECT fromJsonString(geometry.coordinates) FROM InputTable")

Java:

tEnv.sqlQuery("SELECT fromJsonString(geometry.coordinates) FROM InputTable");

Write interactive map data
Scala:

// column order is geometry, properties, customMembers, mt_id, mt_tags
tEnv.executeSql("""INSERT OVERWRITE OutputTable VALUES
  |(('Point','[8.0, 50.0]'),
  | MAP[
  |   'p1', toJsonString(1),
  |   'p2', toJsonString('value2'),
  |   'p3', '{
  |     "a": false,
  |     "b": 3.2,
  |     "c": "p3.3"
  |   }'
  | ],
  | MAP['r1', toJsonString(true)],
  | 'id1',
  | ARRAY['tagA','tagB']
  |)""".stripMargin)

Java:

// column order is geometry, properties, customMembers, mt_id, mt_tags
tEnv.executeSql(
    "INSERT OVERWRITE OutputTable VALUES "
        + "(('Point','[8.0, 50.0]'), "
        + " MAP[ "
        + "   'p1', toJsonString(1), "
        + "   'p2', toJsonString('value2'), "
        + "   'p3', '{ "
        + "     \"a\": false, "
        + "     \"b\": 3.2, "
        + "     \"c\": \"p3.3\" "
        + "   }' "
        + " ], "
        + " MAP['r1', toJsonString(true)], "
        + " 'id1', "
        + " ARRAY['tagA','tagB'] "
        + ")");

Read and write interactive map data
Scala:

tEnv.executeSql(
  "INSERT INTO OutputTable " +
    "SELECT geometry, properties, customMembers, mt_id, mt_tags FROM InputTable")

Java:

tEnv.executeSql(
    "INSERT INTO OutputTable "
        + "SELECT geometry, properties, customMembers, mt_id, mt_tags FROM InputTable");