How to integrate Flink connector with version layers
Create table source for version layer
The main entry point of the Flink Connector API is OlpStreamConnectorHelper.
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
An instance of OlpStreamConnectorHelper is used to create a flink.table.api.Schema
and build the SQL statement. The following code snippet shows how to create an
instance of OlpStreamConnectorHelper, build a flink.table.api.Schema, and create a
table with the given schema and options:
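// assumes `env` (a StreamExecutionEnvironment) and `inputCatalogHrn` are already defined
import com.here.hrn.HRN
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
// define the properties
val properties = Map(
"olp.layer.query" -> "mt_version==LATEST"
)
// create the Table Source
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-versioned-layer", properties)
// register the Table Source
val tEnv = StreamTableEnvironment.create(env)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")

// assumes `env`, `inputCatalogHrn`, `inputLayerId` and `sourceProperties` are already defined
import com.here.hrn.HRN;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
OlpStreamConnectorHelper sourceHelper =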
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
The source factory supports the following properties for Version layers:
- olp.layer.query: a string written in the RSQL query language to query the version layer. If it is not defined, the default value "mt_version==LATEST" is used, which means that all the partitions present in the latest version are read.
- olp.connector.download-parallelism: the maximum number of blobs that are read in parallel in one Flink task. The number of tasks corresponds to the configured parallelism, so the number of blobs that your pipeline can read in parallel equals the parallelism level multiplied by the value of this property. The default value is 10.
- olp.connector.download-timeout: the overall timeout, in milliseconds, applied when reading a blob from the Blob API. The default value is 300000 milliseconds (5 minutes).
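The defaults and the parallelism arithmetic above can be sketched in plain Java. The class VersionLayerSourceDefaults and its methods are hypothetical and not part of the Flink Connector API. The connector applies these defaults itself, so the sketch only illustrates the documented values and the rule that the number of parallel blob reads equals the job parallelism times olp.connector.download-parallelism.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper, not part of the Flink Connector API: fills in the
 * documented defaults for Version layer source properties and computes the
 * upper bound on blobs read in parallel across the whole job.
 */
final class VersionLayerSourceDefaults {

    // Defaults as documented for Version layer sources.
    static final String DEFAULT_QUERY = "mt_version==LATEST";
    static final int DEFAULT_DOWNLOAD_PARALLELISM = 10;
    static final long DEFAULT_DOWNLOAD_TIMEOUT_MS = 300_000L;

    private VersionLayerSourceDefaults() {}

    /** Returns a copy of the given properties with every missing default added. */
    static Map<String, String> withDefaults(Map<String, String> userProperties) {
        Map<String, String> resolved = new HashMap<>(userProperties);
        resolved.putIfAbsent("olp.layer.query", DEFAULT_QUERY);
        resolved.putIfAbsent(
            "olp.connector.download-parallelism", Integer.toString(DEFAULT_DOWNLOAD_PARALLELISM));
        resolved.putIfAbsent(
            "olp.connector.download-timeout", Long.toString(DEFAULT_DOWNLOAD_TIMEOUT_MS));
        return resolved;
    }

    /** Job parallelism multiplied by the per-task olp.connector.download-parallelism value. */
    static int maxParallelBlobReads(int jobParallelism, Map<String, String> resolvedProperties) {
        int perTask = Integer.parseInt(resolvedProperties.get("olp.connector.download-parallelism"));
        return jobParallelism * perTask;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put("olp.connector.download-parallelism", "4");
        Map<String, String> resolved = withDefaults(user);
        System.out.println(resolved.get("olp.layer.query"));   // prints mt_version==LATEST
        System.out.println(maxParallelBlobReads(8, resolved)); // prints 32
    }
}
```

For example, with a job parallelism of 8 and olp.connector.download-parallelism set to 4, the pipeline reads at most 32 blobs in parallel.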
Before creating a table schema, the helper fetches the catalog configuration
using the passed HRN. It then checks the data format and, if one exists, the
schema of the passed layerId. As the last step, the Flink Connector automatically
translates the layer schema into a flink.table.api.Schema.
Data formats
The Flink Connector supports the following data formats for Version layer payload:
- Raw. The decoding and encoding logic is not applied and you get your data payload as an array of bytes. Your Table schema appears as follows:
root
|-- data: Array[Byte]
|-- mt_partition: String
|-- mt_version: Long
|-- mt_timestamp: Long
|-- mt_checksum: String
|-- mt_crc: String
|-- mt_dataSize: Long
|-- mt_compressedDataSize: Long
The column with the payload data is called data. The metadata columns follow
the data column and have the mt_ prefix. In the list of metadata columns,
only mt_partition is a required column. The rest are optional and you can
use null as a value for them.
This format is used if your layer content type is configured as
application/octet-stream.
- Protobuf. Flink uses the attached Protobuf schema (that you specify in your layer configuration) to derive a Flink Table schema.
root
|-- protobuf_field_1: String
|-- protobuf_field_2: String
|-- protobuf_field_3.nested_column: Long
|-- ...
|-- mt_partition: String
|-- mt_version: Long
|-- mt_timestamp: Long
|-- mt_checksum: String
|-- mt_crc: String
|-- mt_dataSize: Long
|-- mt_compressedDataSize: Long
The Flink Connector maps the top-level Protobuf fields to top-level Row
columns, followed by the metadata columns.
This format is used if your layer content type is configured as
application/x-protobuf and you have a specified schema. If the schema is not
specified, an error will be thrown.
Note: Self-referencing Protobuf fields are not supported because there is no way to represent them in the Flink TypeInformation-based schema.
- Avro. Flink uses the passed Avro schema (that you specify in the factory Map) to derive a Flink Table schema.
root
|-- avro_field_1: String
|-- avro_field_2: String
|-- ...
|-- mt_partition: String
|-- mt_version: Long
|-- mt_timestamp: Long
|-- mt_checksum: String
|-- mt_crc: String
|-- mt_dataSize: Long
|-- mt_compressedDataSize: Long
The Flink Connector maps the top-level Avro fields to top-level Row
columns, followed by the metadata columns.
This format is used if your layer content type is configured as
application/x-avro-binary and you have a specified schema. If the schema is
not specified, an error will be thrown.
WARNING: The new version of the connector does not support metadata columns for the Avro data type.
- Parquet. Flink uses the passed Avro schema (that you specify in the factory Map) to derive a Flink Table schema.
root
|-- parquet_field_1: String
|-- parquet_field_2: String
|-- ...
|-- mt_partition: String
|-- mt_version: Long
|-- mt_timestamp: Long
|-- mt_checksum: String
|-- mt_crc: String
|-- mt_dataSize: Long
|-- mt_compressedDataSize: Long
The Flink Connector maps the top-level Parquet fields to top-level Row
columns, followed by the metadata columns.
This format is used if your layer content type is configured as
application/x-parquet and you have a specified schema. If the schema is not
specified, an error will be thrown.
WARNING: The new version of the connector does not support metadata columns for the Parquet data type.
The Apache parquet-avro module expects the Hadoop client to be available on the classpath.
The Hadoop client is currently not provided by the streaming environment. As a result, if you want to use the Parquet format, you have to include the Hadoop client dependency in your fat JAR:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
libraryDependencies ++=
Seq("org.apache.hadoop" % "hadoop-client" % "2.7.3" exclude ("org.apache.htrace", "htrace-core"))
- Other formats. If your layer uses a format other than the described formats, an error will be thrown.
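The content-type rules of the formats above can be summarized as a small dispatch. This is a hypothetical sketch, not the connector's implementation. The class LayerFormatResolver, the PayloadFormat enum and the use of IllegalArgumentException are assumptions, and the connector may report these errors differently. Here, hasSchema stands for either the Protobuf schema attached to the layer or the Avro schema passed in the factory Map.

```java
/**
 * Hypothetical sketch, not the connector's implementation: the layer content
 * type selects the payload format, and every format except Raw needs a schema.
 */
final class LayerFormatResolver {

    enum PayloadFormat { RAW, PROTOBUF, AVRO, PARQUET }

    private LayerFormatResolver() {}

    static PayloadFormat resolve(String contentType, boolean hasSchema) {
        PayloadFormat format;
        switch (contentType) {
            case "application/octet-stream":
                return PayloadFormat.RAW; // payload passed through as bytes, no schema needed
            case "application/x-protobuf":
                format = PayloadFormat.PROTOBUF;
                break;
            case "application/x-avro-binary":
                format = PayloadFormat.AVRO;
                break;
            case "application/x-parquet":
                format = PayloadFormat.PARQUET;
                break;
            default:
                // "Other formats": anything not listed above is rejected
                throw new IllegalArgumentException("Unsupported layer content type: " + contentType);
        }
        if (!hasSchema) {
            // Protobuf, Avro and Parquet cannot be decoded without a schema
            throw new IllegalArgumentException("Content type " + contentType + " requires a schema");
        }
        return format;
    }
}
```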
All Table Sources created for the same layer have the same schema.
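The schema is derived only from the layer's payload fields and a fixed set of metadata columns, so the same layer always yields the same column layout. The following hypothetical Java sketch is not the connector's code. It shows that layout (payload columns first, then the mt_ columns) and the Raw rule that only mt_partition is required. The class VersionLayerColumns and its methods are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical illustration, not the connector's code: payload columns come
 * first, followed by the mt_ metadata columns.
 */
final class VersionLayerColumns {

    // Metadata columns in the order shown in the schemas above.
    static final List<String> METADATA_COLUMNS = List.of(
        "mt_partition", "mt_version", "mt_timestamp", "mt_checksum",
        "mt_crc", "mt_dataSize", "mt_compressedDataSize");

    private VersionLayerColumns() {}

    /**
     * Builds the column list from the top-level payload fields ("data" for Raw).
     * Pass includeMetadata = false to model Avro and Parquet, for which the
     * warnings above say the new connector version does not support metadata columns.
     */
    static List<String> columns(List<String> topLevelPayloadFields, boolean includeMetadata) {
        List<String> result = new ArrayList<>(topLevelPayloadFields);
        if (includeMetadata) {
            result.addAll(METADATA_COLUMNS);
        }
        return result;
    }

    /** Only mt_partition is required in a Raw row; the other mt_ columns may be null. */
    static boolean hasRequiredMetadata(Map<String, Object> rawRow) {
        return rawRow.get("mt_partition") != null;
    }
}
```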
You can always print your Table schema using the standard Flink API:
// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema()

// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema();
Read raw data
Using SQL:
// define the properties
val properties = Map(
"olp.layer.query" -> "mt_version==LATEST"
)
// create the Table Source
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-versioned-layer", properties)
// register the Table Source
val tEnv = StreamTableEnvironment.create(env)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
OlpStreamConnectorHelper sourceHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
// a Raw layer exposes the payload in the data column, followed by the mt_ metadata columns
Table result =
tEnv.sqlQuery(
"SELECT data, mt_partition, mt_timestamp, mt_checksum, mt_dataSize, mt_crc FROM InputTable");
Read protobuf data
Using SQL:
val sourceProperties = Map[String, String]("olp.layer.query" -> "mt_version==LATEST",
"olp.connector.metadata-columns" -> "true")
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-versioned-layer", sourceProperties)
val tEnv = StreamTableEnvironment.create(env)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")
tEnv.from("InputTable").printSchema()
val result: Table = tEnv.sqlQuery("""
SELECT
tileId,
mt_timestamp,
mt_checksum,
mt_dataSize,
mt_crc
FROM InputTable
""")
tEnv
.toDataStream(result)
.print()

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
OlpStreamConnectorHelper sourceHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
Table result =
tEnv.sqlQuery(
"SELECT tileId, messages, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable");
DataStream<Row> stream = tEnv.toDataStream(result);
stream.print();
Read Avro data
Using SQL:
val tEnv = StreamTableEnvironment.create(env)
val layerSchema =
"""
{
"type" : "record",
"name" : "Event",
"namespace" : "my.example",
"fields" : [
{"name" : "city", "type" : "string"},
{"name" : "event_timestamp", "type" : "long"},
{"name" : "latitude", "type" : "double"},
{"name" : "longitude", "type" : "double"}
]
}
"""
val sourceProperties = Map(
"olp.catalog.layer-schema" -> layerSchema,
"olp.layer.query" -> "mt_version==LATEST"
)
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn), "version-layer-avro-output", sourceProperties)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")
val result: Table = tEnv.sqlQuery("""
SELECT
city,
event_timestamp,
latitude,
longitude
FROM InputTable
""")
tEnv
.toDataStream(result)
.print()

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
OlpStreamConnectorHelper sourceHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
Table result = tEnv.sqlQuery("SELECT 'Berlin', refs, messages FROM InputTable");
DataStream<Row> stream = tEnv.toDataStream(result);
stream.print();
Read Parquet data
Using SQL:
val tEnv = StreamTableEnvironment.create(env)
val inputLayerSchema = """
{
"type" : "record",
"name" : "Event",
"namespace" : "my.example",
"fields" : [
{"name" : "event_timestamp", "type" : "long"},
{"name" : "latitude", "type" : "double"},
{"name" : "longitude", "type" : "double"}
]
}
"""
val sourceProperties = Map(
"olp.catalog.layer-schema" -> inputLayerSchema,
"olp.layer.query" -> "mt_version==LATEST"
)
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn),
"version-layer-parquet-input",
sourceProperties)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")
val result: Table = tEnv.sqlQuery("""
SELECT
event_timestamp,
latitude,
longitude
FROM InputTable
""")
tEnv
.toDataStream(result)
.print()

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
OlpStreamConnectorHelper sourceHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
Table result = tEnv.sqlQuery("SELECT 'Berlin', refs, messages FROM InputTable");
DataStream<Row> stream = tEnv.toDataStream(result);
stream.print();
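The Java snippets above pass a sourceProperties map that they do not define. Below is a minimal sketch of building it for the Parquet example. It assumes the Java API accepts a java.util.Map<String, String> with the same keys as the Scala examples (olp.catalog.layer-schema and olp.layer.query). The ParquetSourceProperties class is hypothetical and is not provided by the connector.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper mirroring the Scala Parquet example; not part of the connector.
 * Assumes sourceProperties is a Map<String, String> with the same keys as in Scala.
 */
final class ParquetSourceProperties {

    // Same Avro record schema as the Scala Parquet example.
    static final String INPUT_LAYER_SCHEMA =
        "{"
            + "\"type\": \"record\", \"name\": \"Event\", \"namespace\": \"my.example\","
            + "\"fields\": ["
            + "{\"name\": \"event_timestamp\", \"type\": \"long\"},"
            + "{\"name\": \"latitude\", \"type\": \"double\"},"
            + "{\"name\": \"longitude\", \"type\": \"double\"}"
            + "]}";

    private ParquetSourceProperties() {}

    static Map<String, String> build(String layerSchema) {
        Map<String, String> properties = new HashMap<>();
        properties.put("olp.catalog.layer-schema", layerSchema); // Avro schema describing the Parquet payload
        properties.put("olp.layer.query", "mt_version==LATEST");  // read all partitions of the latest version
        return properties;
    }
}
```

A table registered with these properties exposes the fields of this schema, such as event_timestamp, latitude and longitude. The same pattern would apply to the Avro example with its own schema string.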