How to use Flink connector to read and write data
Objectives: Understand how to use the Flink Connector to read and write data in different layers and data formats of a catalog.
Complexity: Beginner
Time to complete: 40 min
Prerequisites: Organize your work in projects
Source code: Download
The examples in this tutorial demonstrate how to use the Flink Connector provided by the Data Client Library. The connector supports Flink stream processing workloads, allowing you to use all standard Flink APIs and functions to read, write, and delete data. For batch processing workloads, use the provided Spark Connector instead.
In the main part of the tutorial, we will cover the following use cases:
- Subscribe to a streaming layer in protobuf format
- Transform data from the Table API to the DataStream API and change its structure
- Print data from the stream and use an index layer as the stream destination
- Transfer data from one layer to another layer of a different type
As a preparation step, you will need to create your data destination catalog with the appropriate layers, so that these are in place for the main part of this tutorial. The input dataset is sourced from the HERE Sample SDII Messages catalog and contains simulated streaming sensor data in the form of SDII messages.
Set up the Maven project
Create the following folder structure for the project:
flink-connector
└── src
    └── main
        ├── java
        ├── resources
        └── scala
You can do this with a single bash command:
mkdir -p flink-connector/src/main/{java,resources,scala}
Create the output catalog
Create a file named pipeline-config.conf and populate it with the contents below, replacing {{YOUR_OUTPUT_CATALOG_HRN}} with the HRN of the catalog you created in Organize your work in projects.
pipeline.config {
output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }
input-catalogs {
sensorData { hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2" }
}
}
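At runtime, the applications in this tutorial resolve these entries through the Data Client Library's PipelineContext. The following minimal sketch prints the resolved HRNs (the object name ConfigCheck is hypothetical; the calls are the same ones used by the pipelines later in this tutorial):
import com.here.platform.pipeline.PipelineContext

object ConfigCheck extends App {
  private val pipelineContext = new PipelineContext
  // "sensorData" matches the key under input-catalogs in pipeline-config.conf
  val sensorDataHrn = pipelineContext.config.inputCatalogs("sensorData")
  val outputHrn = pipelineContext.config.outputCatalog
  println(s"Input catalog: $sensorDataHrn, output catalog: $outputHrn")
}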
The Maven POM file is similar to the file in the Verify Maven Settings example. The parent POM and dependencies sections are updated as follows:
Parent POM:
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-stream-bom_2.13</artifactId>
<version>2.85.8</version>
<relativePath/>
</parent>
Dependencies:
<dependencies>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>flink-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.flinkextended</groupId>
<artifactId>flink-scala-api-2_${scala.compat.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.6</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
</exclusion>
<exclusion>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
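Note that ${scala.compat.version} must resolve to 2.13 to match the _2.13 parent BOM. If your POM does not already define this property, add it to the properties section, for example:
<properties>
    <scala.compat.version>2.13</scala.compat.version>
</properties>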
You will need to create the output catalog. You can perform these steps using the OLP Command Line Interface (CLI).
Use a unique identifier for the catalog, for example {{YOUR_USERNAME}}-flink-connector-output.
For the output catalog, you can name the file flink-connector-output.json, with the contents below.
{
"id": "flink-connector-output",
"name": "Simulated sensor data archive (from tutorial) flink-connector-output",
"summary": "Archive of simulated sensor data for the FlinkConnector tutorial",
"description": "Archive of simulated sensor data",
"tags": [
"Tutorial",
"Simulated"
],
"layers": [
{
"id": "volatile-layer-avro-data",
"name": "volatile-layer-avro-data",
"summary": "Simulated sensor data for the FlinkConnector tutorial",
"description": "Simulated sensor data for the FlinkConnector tutorial",
"contentType": "application/x-avro-binary",
"layerType": "volatile",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
},
{
"id": "index-layer-parquet-data",
"name": "index-layer-parquet-data",
"summary": "Simulated sensor data for the FlinkConnector tutorial",
"description": "Simulated sensor data for the FlinkConnector tutorial",
"contentType": "application/x-parquet",
"layerType": "index",
"indexProperties": {
"indexDefinitions": [
{
"name": "tile_id",
"type": "int"
},
{
"name": "time_window",
"duration": 600000,
"type": "timewindow"
}
],
"ttl": "unlimited"
},
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
}
]
}
When writing to this index layer later in the tutorial, each index attribute defined above is addressed as a SQL column prefixed with idx_ (for example, idx_tile_id and idx_time_window).
Replace {{YOUR_CATALOG_ID}} below with your own identifier and then run the following command (on Windows, this works in Cygwin and Git Bash; otherwise, run olp.bat):
olp catalog create {{YOUR_CATALOG_ID}} \
"Simulated sensor data output from tutorial ({{YOUR_USERNAME}})" \
--config flink-connector-output.json \
--scope {{YOUR_PROJECT_HRN}}
Note: If a billing tag is required in your realm, update the config file by adding the billingTags: ["YOUR_BILLING_TAG"] property to the layer section.
Implement the Flink Connector application
This application reads from the stream layer of the public source catalog in protobuf format, performs some transformations on the received data, and writes the results to the volatile layer of the previously created output catalog. The Scala implementation is shown first, followed by the Java version.
/*
* Copyright (c) 2018-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.olp.util.quad.factory.HereQuadFactory
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.data.client.flink.serializers.ScalaNoneSerializer
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SerializerConfigImpl
import org.apache.flink.configuration.{Configuration, RestartStrategyOptions}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.slf4j.LoggerFactory
import java.time.Duration
object StreamToVolatileLayerScalaPipeline extends App {
private val logger = LoggerFactory.getLogger(StreamToVolatileLayerScalaPipeline.getClass)
private val pipelineContext = new PipelineContext
// Source and output catalogs / layers
val sensorDataCatalogHrn = pipelineContext.config.inputCatalogs("sensorData")
val outputCatalogHrn = pipelineContext.config.outputCatalog
val streamingLayer = "sample-streaming-layer"
val outputVolatileLayer = "volatile-layer-avro-data"
// Configure stream execution environment settings
val env = {
val configuration = new Configuration()
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay")
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
Integer.valueOf(2))
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
Duration.ofMillis(10000L))
StreamExecutionEnvironment.getExecutionEnvironment(configuration)
}
env.setParallelism(1)
env.enableCheckpointing(5000)
env.getConfig.getSerializerConfig
.asInstanceOf[SerializerConfigImpl]
.addDefaultKryoSerializer(None.getClass, classOf[ScalaNoneSerializer])
// Define the properties
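// "olp.kafka.group-name" sets the Kafka consumer group used by this source;
// "olp.kafka.offset" -> "earliest" starts reading from the oldest retained record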
val properties = Map(
"olp.kafka.group-name" -> "protobuf-streaming",
"olp.kafka.offset" -> "earliest",
)
// create the Table Connector Descriptor Source
val helper = OlpStreamConnectorHelper(sensorDataCatalogHrn, streamingLayer, properties)
// Register the TableSource
val tEnv = StreamTableEnvironment.create(env)
val schema = helper.prebuiltSchema(tEnv).build()
tEnv.executeSql(s"CREATE TABLE SensorDataTable $schema WITH ${helper.options}")
// Register the user-defined functions to be used in the SQL query
tEnv.createTemporarySystemFunction("computeHereTile", new ComputeTileFunction())
tEnv.createTemporarySystemFunction("assignUuid", new AssignUuidFunction())
// Define a Table containing the fields to be used in computing results
val observationsTable: Table =
tEnv.sqlQuery("""SELECT
| assignUuid(envelope.version) AS eventId,
| timeStampUTC_ms AS timestampUtc,
| computeHereTile(latitude_deg, longitude_deg) AS tile
|FROM
| SensorDataTable CROSS JOIN UNNEST(positionEstimate)
|""".stripMargin)
// Domain case classes
case class PositionEvent(eventId: String, timestampUtc: Long, tile: Long)
case class PositionStatistics(timestampUtc: Long, tile: Long, totalObservations: Int)
// Create watermark strategy
val watermarkStrategy: WatermarkStrategy[PositionEvent] = WatermarkStrategy
.forBoundedOutOfOrderness[PositionEvent](Duration.ofSeconds(10))
.withTimestampAssigner(new SerializableTimestampAssigner[PositionEvent] {
override def extractTimestamp(event: PositionEvent, recordTimestamp: Long): Long =
event.timestampUtc
})
val outputStream: DataStream[PositionStatistics] = tEnv
.toDataStream(observationsTable)
.map(
row =>
PositionEvent(
row.getField(0).asInstanceOf[String], // Adjust field indices accordingly
row.getField(1).asInstanceOf[Long],
row.getField(2).asInstanceOf[Long]
)) // Convert the records to a stream of PositionEvent
.assignTimestampsAndWatermarks(watermarkStrategy) // Define how the event time is assigned
.map(v => (v.timestampUtc, v.tile, 1): (Long, Long, Int))
.keyBy((tuple: (Long, Long, Int)) => tuple._2) // Key by the second element (tile) of the tuple
.window(SlidingEventTimeWindows.of(Duration.ofSeconds(15), Duration.ofSeconds(5))) // Define the time window to use
.sum(2)
.map(PositionStatistics.tupled(_)) // Map the sum result to PositionStatistics
outputStream.print()
tEnv.createTemporaryView("PositionStatsTable", outputStream)
// Define the avro schema in which the output data will be written
val outputAvroSchema =
"""{
| "type" : "record",
| "name" : "Event",
| "namespace" : "my.flink.tutorial",
| "fields" : [
| {"name" : "city", "type" : "string"},
| {"name" : "total", "type" : "int"},
| {"name" : "timestampUtc", "type" : "long"}
| ]
|}
|""".stripMargin
// Create TableSink for the output
val sinkHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(outputCatalogHrn,
outputVolatileLayer,
Map("olp.catalog.layer-schema" -> outputAvroSchema))
tEnv.executeSql(
s"CREATE TABLE OutputIndexTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sinkHelper.options}")
// Write the result into the output table, indexed by timestamp and HERE tile ID
tEnv.executeSql("""INSERT INTO
| OutputIndexTable
|SELECT
| 'Berlin' AS city,
| totalObservations AS total,
| timestampUtc,
| CAST(tile AS STRING) AS mt_partition
|FROM
| PositionStatsTable
|""".stripMargin)
logger.info(s"Stream to $outputCatalogHrn executed")
env.executeAsync()
}
class ComputeTileFunction() extends ScalarFunction {
private val tileLevel = 14
// java.lang.Double is used to avoid DOUBLE NOT NULL semantics of the function
// https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type-extraction
def eval(latitude: java.lang.Double, longitude: java.lang.Double): Long =
HereQuadFactory.INSTANCE
.getMapQuadByLocation(latitude, longitude, tileLevel)
.getLongKey
}
class AssignUuidFunction() extends ScalarFunction {
def eval(input: String): String =
java.util.UUID.randomUUID.toString
}
The same application implemented in Java:
/*
* Copyright (c) 2018-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.hrn.HRN;
import com.here.olp.util.quad.factory.HereQuadFactory;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
import com.here.platform.data.client.flink.serializers.ScalaNoneSerializer;
import com.here.platform.pipeline.PipelineContext;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.None$;
public class StreamToVolatileLayerPipeline {
private static final PipelineContext pipelineContext = new PipelineContext();
// Source for the sensor data to be used as input
private static final HRN sensorDataCatalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("sensorData");
private static final HRN outputCatalogHrn = pipelineContext.getConfig().getOutputCatalog();
private static final String streamingLayer = "sample-streaming-layer";
private static final String outputIndexLayer = "volatile-layer-avro-data";
public static void main(String[] args) throws Exception {
Logger logger = LoggerFactory.getLogger(StreamToVolatileLayerPipeline.class);
// Configure stream execution environment settings
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 2);
configuration.set(
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(10000L));
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);
env.enableCheckpointing(5000);
SerializerConfigImpl serializerConfig =
(SerializerConfigImpl) env.getConfig().getSerializerConfig();
serializerConfig.registerTypeWithKryoSerializer(None$.class, ScalaNoneSerializer.class);
// Define the properties
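// The group name identifies this Kafka consumer; "earliest" starts reading
// from the oldest retained record in the stream layer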
Map<String, String> properties = new HashMap<>();
properties.put("olp.kafka.group-name", "protobuf-streaming");
properties.put("olp.kafka.offset", "earliest");
// create the Table Connector Descriptor Source
OlpStreamConnectorHelper helper =
OlpStreamConnectorHelper.create(sensorDataCatalogHrn, streamingLayer, properties);
// Register the TableSource
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Schema schema = helper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE SensorDataTable %s WITH %s", schema, helper.options()));
// Register the user-defined functions to be used in the SQL query
tEnv.createTemporarySystemFunction("computeHereTile", new ComputeTileFunction());
tEnv.createTemporarySystemFunction("assignUuid", new AssignUuidFunction());
// Define a Table containing the fields to be used in computing results
Table observationsTable =
tEnv.sqlQuery(
"SELECT"
+ " assignUuid(envelope.version) AS eventId, "
+ " timeStampUTC_ms AS timestampUtc, "
+ " computeHereTile(latitude_deg, longitude_deg) AS tile "
+ "FROM SensorDataTable "
+ "CROSS JOIN UNNEST(positionEstimate)");
// Create watermark strategy
WatermarkStrategy<Tuple3<String, Long, Long>> watermarkStrategy =
WatermarkStrategy.<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(
Duration.ofSeconds(10))
.withTimestampAssigner((event, timestamp) -> event.f1);
// Compute values for the number of observations by tile and time window
DataStream<PositionStatistics> outputStream =
tEnv.toAppendStream(
observationsTable,
new TupleTypeInfo<Tuple3<String, Long, Long>>(Types.STRING, Types.LONG, Types.LONG))
.assignTimestampsAndWatermarks(watermarkStrategy)
.map(
new MapFunction<Tuple3<String, Long, Long>, Tuple3<Long, Long, Integer>>() {
@Override
public Tuple3<Long, Long, Integer> map(Tuple3<String, Long, Long> observation) {
return new Tuple3<>(observation.f1, observation.f2, 1);
}
})
.keyBy(selector -> selector.f1)
.window(SlidingEventTimeWindows.of(Duration.ofSeconds(15), Duration.ofSeconds(5)))
.sum(2)
.map(
new MapFunction<Tuple3<Long, Long, Integer>, PositionStatistics>() {
@Override
public PositionStatistics map(Tuple3<Long, Long, Integer> result) {
return new PositionStatistics(result.f0, result.f1, result.f2);
}
});
outputStream.print();
tEnv.createTemporaryView("PositionStatsTable", outputStream);
// Define the avro schema in which the output data will be written
String outputAvroSchema =
" {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"Event\",\n"
+ " \"namespace\" : \"my.flink.tutorial\",\n"
+ " \"fields\" : [\n"
+ " {\"name\" : \"city\", \"type\" : \"string\"},\n"
+ " {\"name\" : \"total\", \"type\" : \"int\"},\n"
+ " {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
+ " ]\n"
+ " }\n";
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputAvroSchema);
// Create TableSink for the output
OlpStreamConnectorHelper sinkHelper =
OlpStreamConnectorHelper.create(outputCatalogHrn, outputIndexLayer, sinkProperties);
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format(
"CREATE TABLE OutputIndexTable %s WITH %s", sinkSchema, sinkHelper.options()));
// Write the result into the output table, indexed by timestamp and HERE tile ID
tEnv.executeSql(
"INSERT INTO"
+ " OutputIndexTable "
+ "SELECT"
+ " 'Berlin' AS city,"
+ " totalObservations AS total,"
+ " timestampUtc,"
+ " CAST(tile AS STRING) AS mt_partition "
+ "FROM"
+ " PositionStatsTable");
logger.info("Stream to {} executed", outputCatalogHrn);
env.execute();
}
public static class ComputeTileFunction extends ScalarFunction {
private final int tileLevel = 14;
public long eval(Double latitude, Double longitude) {
return HereQuadFactory.INSTANCE
.getMapQuadByLocation(latitude, longitude, tileLevel)
.getLongKey();
}
}
public static class AssignUuidFunction extends ScalarFunction {
public String eval(String input) {
return UUID.randomUUID().toString();
}
}
public static class PositionStatistics {
public long tile;
public long timestampUtc;
public int totalObservations;
public PositionStatistics() {}
public PositionStatistics(long timestampUtc, long tile, int totalObservations) {
this.tile = tile;
this.timestampUtc = timestampUtc;
this.totalObservations = totalObservations;
}
@Override
public String toString() {
return tile + " " + timestampUtc + " " + totalObservations;
}
}
}
Another example uses the volatile layer populated in the previous step: it reads the data in Avro format, performs some transformations on the received data, and writes the results in Parquet format to the index layer of the previously created catalog. Again, the Scala implementation is shown first, followed by the Java version.
/*
* Copyright (c) 2018-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.data.client.flink.serializers.ScalaNoneSerializer
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.serialization.SerializerConfigImpl
import org.apache.flink.configuration.{Configuration, RestartStrategyOptions}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.slf4j.LoggerFactory
import java.time.Duration
object VolatileToIndexLayerScalaPipeline extends App {
private val logger = LoggerFactory.getLogger(VolatileToIndexLayerScalaPipeline.getClass)
private val pipelineContext = new PipelineContext
// Source and output catalogs / layers
val catalogHrn = pipelineContext.config.outputCatalog
val inputVolatileLayer = "volatile-layer-avro-data"
val outputIndexLayer = "index-layer-parquet-data"
// Configure stream execution environment settings
val env = {
val configuration = new Configuration()
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay")
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
Integer.valueOf(2))
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
Duration.ofMillis(10000L))
StreamExecutionEnvironment.getExecutionEnvironment(configuration)
}
env.setParallelism(1)
env.enableCheckpointing(5000)
env.getConfig.getSerializerConfig
.asInstanceOf[SerializerConfigImpl]
.addDefaultKryoSerializer(None.getClass, classOf[ScalaNoneSerializer])
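// Avro schema of the input data, matching the schema used by the first
// pipeline when writing to the volatile layer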
val inputSchema =
"""{
| "type" : "record",
| "name" : "Event",
| "namespace" : "my.flink.tutorial",
| "fields" : [
| {"name" : "city", "type" : "string"},
| {"name" : "total", "type" : "int"},
| {"name" : "timestampUtc", "type" : "long"}
| ]
|}
|""".stripMargin
val tiles = List("377894440", "377894441", "377894442")
// Define the properties
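// "olp.layer.query" restricts the source to the listed partitions;
// "olp.connector-refresh-interval" -> "-1" disables polling for updates,
// so the current volatile data is read once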
val properties =
Map(
"olp.layer.query" -> s"mt_partition=in=${tiles.mkString("(", ", ", ")")}",
"olp.catalog.layer-schema" -> inputSchema,
"olp.connector-refresh-interval" -> "-1"
)
// create the Table Connector Descriptor Source
val helper = OlpStreamConnectorHelper(catalogHrn, inputVolatileLayer, properties)
// Register the TableSource
val tEnv = StreamTableEnvironment.create(env)
val schema = helper.prebuiltSchema(tEnv).build()
tEnv.executeSql(s"CREATE TABLE TableSource $schema WITH ${helper.options}")
tEnv.toDataStream(tEnv.from("TableSource")).print()
val outputSchema =
"""{
| "type" : "record",
| "name" : "Event",
| "namespace" : "my.flink.tutorial",
| "fields" : [
| {"name" : "city", "type" : "string"},
| {"name" : "total", "type" : "int"}
| ]
|}
|""".stripMargin
// Create TableSink for the output
val sinkHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(catalogHrn,
outputIndexLayer,
Map("olp.catalog.layer-schema" -> outputSchema))
tEnv.executeSql(
s"CREATE TABLE Sink ${sinkHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sinkHelper.options}")
tEnv.executeSql("""INSERT INTO Sink
|SELECT
| city,
| total,
| CAST(mt_partition as BIGINT) as idx_tile_id,
| timestampUtc as idx_time_window
|FROM TableSource
|""".stripMargin)
logger.info(s"Stream to $catalogHrn executed")
env.executeAsync()
}
The same application implemented in Java:
/*
* Copyright (c) 2018-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.hrn.HRN;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
import com.here.platform.data.client.flink.serializers.ScalaNoneSerializer;
import com.here.platform.pipeline.PipelineContext;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.None$;
public class VolatileToIndexLayerPipeline {
private static final PipelineContext pipelineContext = new PipelineContext();
// Source and output catalogs / layers
private static final HRN catalogHrn = pipelineContext.config().getOutputCatalog();
private static final String inputVolatileLayer = "volatile-layer-avro-data";
private static final String outputIndexLayer = "index-layer-parquet-data";
public static void main(String[] args) throws Exception {
Logger logger = LoggerFactory.getLogger(VolatileToIndexLayerPipeline.class);
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 2);
configuration.set(
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(10000L));
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(1);
env.enableCheckpointing(5000);
SerializerConfigImpl serializerConfig =
(SerializerConfigImpl) env.getConfig().getSerializerConfig();
serializerConfig.registerTypeWithKryoSerializer(None$.class, ScalaNoneSerializer.class);
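// Avro schema of the input data, matching the schema used by the first
// pipeline when writing to the volatile layer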
String inputAvroSchema =
" {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"Event\",\n"
+ " \"namespace\" : \"my.flink.tutorial\",\n"
+ " \"fields\" : [\n"
+ " {\"name\" : \"city\", \"type\" : \"string\"},\n"
+ " {\"name\" : \"total\", \"type\" : \"int\"},\n"
+ " {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
+ " ]\n"
+ " }\n";
String[] tiles = new String[] {"377894440", "377894441", "377894442"};
String tilesQueryString = Arrays.stream(tiles).collect(Collectors.joining(", ", "(", ")"));
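// Restrict the source to the listed partitions; a refresh interval of -1
// disables polling for updates, so the current volatile data is read once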
Map<String, String> properties = new HashMap<>();
properties.put("olp.layer.query", "mt_partition=in=" + tilesQueryString);
properties.put("olp.catalog.layer-schema", inputAvroSchema);
properties.put("olp.connector-refresh-interval", "-1");
// create the Table Connector Descriptor Source
OlpStreamConnectorHelper helper =
OlpStreamConnectorHelper.create(catalogHrn, inputVolatileLayer, properties);
// Register the TableSource
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Schema schema = helper.prebuiltSchema(tEnv).build();
tEnv.executeSql(String.format("CREATE TABLE TableSource %s WITH %s", schema, helper.options()));
tEnv.toDataStream(tEnv.from("TableSource"), Row.class).print();
// Define the parquet schema in which the output data will be written
String outputParquetSchema =
" {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"Event\",\n"
+ " \"namespace\" : \"my.flink.tutorial\",\n"
+ " \"fields\" : [\n"
+ " {\"name\" : \"city\", \"type\" : \"string\"},\n"
+ " {\"name\" : \"total\", \"type\" : \"int\"}\n"
+ " ]\n"
+ " }\n";
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputParquetSchema);
// Create TableSink for the output
OlpStreamConnectorHelper sinkHelper =
OlpStreamConnectorHelper.create(catalogHrn, outputIndexLayer, sinkProperties);
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE Sink %s WITH %s", sinkSchema, sinkHelper.options()));
tEnv.executeSql(
"INSERT INTO Sink "
+ "SELECT"
+ " city,"
+ " total,"
+ " CAST(mt_partition as BIGINT) as idx_tile_id,"
+ " timestampUtc as idx_time_window "
+ "FROM TableSource");
logger.info("Stream to {} executed", catalogHrn);
env.execute();
}
}
Compile and run locally
To run the first application locally, execute the following command:
Scala
mvn compile exec:exec \
    -Dexec.args="-cp %classpath -Dpipeline-config.file=pipeline-config.conf -Dpipeline-job.file=pipeline-job.conf StreamToVolatileLayerScalaPipeline"
Java
mvn compile exec:exec \
    -Dexec.args="-cp %classpath -Dpipeline-config.file=pipeline-config.conf -Dpipeline-job.file=pipeline-job.conf StreamToVolatileLayerPipeline"
To run the second application, execute the following command:
Scala
mvn compile exec:exec \
    -Dexec.args="-cp %classpath -Dpipeline-config.file=pipeline-config.conf -Dpipeline-job.file=pipeline-job.conf VolatileToIndexLayerScalaPipeline"
Java
mvn compile exec:exec \
    -Dexec.args="-cp %classpath -Dpipeline-config.file=pipeline-config.conf -Dpipeline-job.file=pipeline-job.conf VolatileToIndexLayerPipeline"
Note that these applications run continuously and will not exit on their own.
To stop execution, you need to cancel the process manually by pressing Ctrl + C in the terminal.
Further information
For more details on the topics covered in this tutorial, you can refer to the following sources:
- For information on the different layer types and configurations, you can check the Data Service documentation.
- To learn more about interactive querying and manipulation of catalogs, check the OLP CLI documentation and the Data section.
- For details on the Scala and Java APIs to access the services, you can check the Data Client Library Developer Guide.