# Stream processing best practices

The HERE platform pipelines use the [Apache Flink](https://flink.apache.org/) framework to implement stream data processing. The following section examines best practices for developers creating stream processing pipelines for the HERE platform using Flink.

## Flink application development

To use the Flink framework during application development, a [Maven Archetype](https://maven.apache.org/guides/introduction/introduction-to-archetypes.html) is available that provides a pre-configured project structure for creating a new stream data processing application. The archetype itself is provided by the [HERE Data SDK for Java & Scala](https://docs.here.com/workspace/docs/tutorials-readme). For more information, see the following guides:

* [Develop a Flink application](https://docs.here.com/workspace/docs/tutorials-develop-flink-application-readme)
* [Creating pipeline applications](https://docs.here.com/workspace/docs/develop-pipelines#creating-pipeline-applications)

The following libraries provided by the [HERE Data SDK for Java & Scala](https://docs.here.com/workspace/docs/tutorials-readme) are designed for use with Flink applications:

* **Data Client Library**

  This multi-module library offers both lower-level building blocks and higher-level APIs, providing streaming, backpressure, parallelism support, built-in retries and resumes, asynchronous APIs, and [Akka Streams](https://doc.akka.io/libraries/akka-core/current/stream/index.html) connectors. To run an application that uses the Data Client Library within a stream pipeline, use the `flink-support` module as a dependency in your project. Don't add any other Data Client Library modules to your project, as the resulting dependency conflicts will cause your stream pipeline to fail. For additional information, see the [Data Client Library Developer Guide](https://docs.here.com/workspace/docs/dcl-readme).
* **Data Client Base Library**

  This library provides low-level, stateless, and thread-safe programmatic access to the platform Data and OLS (Open Location Services) APIs from Scala and Java. For additional information, see the [Data Client Base Library Developer Guide](https://docs.here.com/workspace/docs/dcbl-readme).

* **Data Archiving Library**

  This library assists with archiving messages ingested via a stream layer. It provides a data-agnostic workflow that you can customize to produce and archive stream data in a variety of formats, including [Google Protobuf](https://protobuf.dev/), [Apache Avro](https://avro.apache.org/docs/), and [Apache Parquet](https://parquet.apache.org/docs/). For more information, see the [Data Archiving Library Developer Guide](https://docs.here.com/workspace/docs/dal-readme).

* **Location Library**

  This is a set of algorithms for location-based analysis, such as navigating a road network, accessing road attributes, and geospatial queries. It is used to create location-based programs that run in batch or stream pipelines. For more information, see the [Location Library Developer Guide](https://docs.here.com/workspace/docs/ll-readme).

* **Sensor Data Ingestion Platform**

  Although not a library, the Sensor Data Ingestion Platform is a multi-functional web service for collecting and validating data messages from various sensors. The platform's API allows you to post messages with sensor data to the Sensor Data Ingestion Platform; this data is then available for processing through a stream pipeline.

## Flink checkpointing

Checkpointing is required to recover the state of a stream (Flink) pipeline. When enabled, Flink takes consistent snapshots of the pipeline (or, in Flink terms, the job graph) at specified intervals. This feature is disabled by default.
To enable checkpointing, use the [`enableCheckpointing(interval)`](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing) method of the [`StreamExecutionEnvironment`](https://nightlies.apache.org/flink/flink-docs-release-1.19/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html) class, where `interval` is the checkpoint interval in milliseconds.

> #### Info
>
> **Additional Cost**\
> Stream pipelines that use checkpointing incur additional cost for reading/writing data from/to the underlying storage.

It is important to enable checkpointing so that Flink can create savepoints when [pausing](running-a-pipeline#pause-pipeline-version) or [upgrading](managing-pipelines#upgrade-a-pipeline-version) the stream pipeline. A savepoint is a collection of related checkpoints; if there are no checkpoints, the savepoint is empty.

In addition, it is important to set IDs for each operator. Flink uses these IDs to match the stored state with the operators upon restart. If the operators do not match, Flink will not be able to restore the state.

For more information on how checkpoints work, refer to the following guides:

* [Apache Flink documentation on checkpoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpoints/)
* [Apache Flink documentation on savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/)
* [Apache Flink documentation on data streaming fault tolerance](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/checkpointing/)

### Activate a stream pipeline from a snapshot

If you are looking for more advanced functionality of the `activate` operation for stream pipeline versions, it is available with the [OLP CLI](https://docs.here.com/workspace/docs/olp-cli-topics-pipeline-workflows).
Stream pipelines that use the [Stream 7.0](https://docs.here.com/workspace/docs/tutorials-pipeline-environments#environment-for-stream-processing-70) runtime environment can be activated from a previously taken snapshot (or savepoint, in Flink terminology). As mentioned above, snapshots are taken by the system for stream pipelines during [`pause`](https://docs.here.com/workspace/docs/running-a-pipeline#pause-pipeline-version) and [`upgrade`](managing-pipelines#upgrade-a-pipeline-version) operations.

The OLP CLI provides the capability to list the snapshots available for a particular pipeline using the [`olp pipeline snapshot list`](https://docs.here.com/workspace/docs/olp-cli-topics-pipeline-snapshot-commands#pipeline-snapshot-list) command. During pipeline version activation, a snapshot ID can be provided to run the pipeline from the state captured by that snapshot. To do this, use the `--snapshot-id` parameter of the [`olp pipeline version activate`](https://docs.here.com/workspace/docs/olp-cli-topics-pipeline-version-commands#pipeline-version-activate) command.

### Externalized checkpoint feature

When you develop an application for your [Stream 7.0](https://docs.here.com/workspace/docs/tutorials-pipeline-environments#environment-for-stream-processing-70) pipeline, you can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their metadata out to persistent storage and are not automatically cleaned up when the job fails. This way, you will have a checkpoint to resume from if your job fails.
To enable externalized checkpoints that are retained after a job cancellation, add the following code to your pipeline application:

```java
CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointRetention(
    ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
```

Once you have configured your application in this way, externalized checkpoints are stored in the underlying storage by the system; no further configuration is required. You can use such a checkpoint with the `--with-latest-checkpoint` parameter of the [`olp pipeline version activate`](https://docs.here.com/workspace/docs/olp-cli-topics-pipeline-version-commands#pipeline-version-activate) command.

For more detail, refer to the [Apache Flink retained checkpoints documentation](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints).

### Incremental checkpointing feature

Flink is purpose-built for stateful processing. To provide fault tolerance, the state must be preserved during checkpointing. For applications with a large state, uploading the entire state on every checkpoint can be slow and resource-intensive. Flink introduced the [incremental checkpointing feature](https://flink.apache.org/2018/01/30/managing-large-state-in-apache-flink-an-intro-to-incremental-checkpointing/) to alleviate this problem. With incremental checkpointing enabled, Flink stores only the differences (or 'delta') between the last checkpoint and the current state.

Incremental checkpointing is recommended for the majority of HERE platform stream pipelines and is enabled by default. However, incremental checkpointing has an overhead cost, which sometimes outweighs the benefits provided by the feature.
For example, in pipelines with a rapidly changing state, the 'delta' between the state at checkpoint `n` and checkpoint `n-1` can be as large as the state at `n`, which erases the benefits of incremental checkpointing altogether. In this case, incremental checkpointing leads to slower checkpointing and recovery times, as well as additional data transfer costs, when compared to full checkpointing.

Beginning with Stream 6.0, the Flink UI's [checkpoint history tab](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/monitoring/checkpoint_monitoring/#history-tab) presents both `Checkpointed Data Size` (the data persisted during a given checkpoint) and `Full Checkpoint Data Size` (the total size of all data that makes up the checkpoint, including data persisted during previous checkpoints). If the two values are consistently close to each other for a given pipeline, we recommend turning off incremental checkpointing for that pipeline via the [stream configuration](https://docs.here.com/workspace/docs/stream-processing#stream-configuration):

```properties
state.backend.incremental=false
```

## Flink parallelism and cluster configuration

A Flink application consists of several tasks: operators, data sources, data sinks, and so on. For execution, a task is split into multiple parallel instances, and each parallel instance processes a subset of the task's input data. The number of parallel instances of a task is called its parallelism. This chapter describes the dependency between the configuration of the stream pipeline cluster and the parallelism of the application.

During the [pipeline deployment](https://docs.here.com/workspace/docs/portal-deployment), you have to configure enough cluster resources for your stream pipeline to support the [maximum parallelism](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism) set in the pipeline application code.
At the same time, this maximum parallelism must correspond to the total number of task slots in the pipeline cluster, which is determined by the worker parameters (for stream pipelines, workers are also called TaskManagers). By default, in a HERE platform pipeline, each worker gets only one task slot. The number of task slots per worker can be set in the [`Runtime parameters`](https://docs.here.com/workspace/docs/portal-deployment#runtime-parameters) section during [pipeline version creation](https://docs.here.com/workspace/docs/olp-cli-topics-pipeline-version-commands#pipeline-version-create) using the `taskmanager.numberOfTaskSlots` parameter.

Consider an example of a Flink pipeline that has 3 operators: a source, a map, and a sink, with parallelism of 3, 8, and 3 respectively. In this example, to support the maximum parallelism of 8, the pipeline cluster should have at least 8 task slots. This can be achieved by specifying a combination of workers and task slots whose product equals 8: for example, 4 workers with 2 task slots each, or 8 workers with 1 task slot per worker.

During pipeline deployment, it is possible to specify the amount of resources allocated to workers. Resources are allocated as units, each containing a pre-defined number of CPUs, RAM size, and disk space. The amount of resources per unit is defined by the chosen [resource profile](portal-deployment#resource-profiles). Our recommendation is to use higher parallelism and fewer resources per worker, rather than fewer workers each with a large amount of resources. It is also recommended to limit the number of task slots per worker to the number of CPUs allocated to the worker.

For more information about TaskManagers and task slots, see the [Apache Flink architecture documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/flink-architecture/#taskmanagers).
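The sizing arithmetic from the example above can be sketched in plain Java. This is an illustration only; `requiredWorkers` is a hypothetical helper, not part of any HERE platform or Flink API:

```java
// Sketch of the cluster-sizing arithmetic described above.
// requiredWorkers() is a hypothetical helper, not a platform API.
public final class ClusterSizing {

    // Minimum number of workers needed so that
    // workers * slotsPerWorker >= maxParallelism (ceiling division).
    static int requiredWorkers(int maxParallelism, int slotsPerWorker) {
        return (maxParallelism + slotsPerWorker - 1) / slotsPerWorker;
    }

    public static void main(String[] args) {
        // Operators with parallelism 3, 8, and 3 -> maximum parallelism is 8.
        int maxParallelism = Math.max(3, Math.max(8, 3));

        System.out.println(requiredWorkers(maxParallelism, 2)); // 4 workers with 2 slots each
        System.out.println(requiredWorkers(maxParallelism, 1)); // 8 workers with 1 slot each
    }
}
```

Any worker/slot combination whose product is at least the maximum parallelism works; the recommendation above simply favors more workers with fewer slots each.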
## Stream configuration

During pipeline deployment, you can set certain Flink configuration properties by specifying them in the [`Runtime parameters`](https://docs.here.com/workspace/docs/portal-deployment#runtime-parameters) section. For example, set `taskmanager.numberOfTaskSlots=2` to configure two task slots per worker.

The following is a list of Flink configuration properties supported by stream pipelines:

```
# JobManager configuration
jobmanager.memory.heap.size
jobmanager.memory.off-heap.size
jobmanager.memory.jvm-metaspace.size
jobmanager.memory.jvm-overhead.min
jobmanager.memory.jvm-overhead.max
jobmanager.memory.jvm-overhead.fraction

# Taskmanager configuration
taskmanager.numberOfTaskSlots
taskmanager.heap.size
taskmanager.memory.flink.size
taskmanager.memory.jvm-metaspace.size
taskmanager.memory.framework.heap.size
taskmanager.memory.task.heap.size
taskmanager.memory.managed.size
taskmanager.memory.managed.fraction
taskmanager.memory.framework.off-heap.size
taskmanager.memory.task.off-heap.size
taskmanager.memory.network.min
taskmanager.memory.network.max
taskmanager.memory.network.fraction
taskmanager.memory.jvm-overhead.min
taskmanager.memory.jvm-overhead.max
taskmanager.memory.jvm-overhead.fraction

# Pekko configuration
pekko.framesize
pekko.ask.timeout
pekko.lookup.timeout
pekko.tcp.timeout
client.timeout

# State backends configuration
state.storage.fs.memory-threshold
state.backend.rocksdb.predefined-options
state.backend.incremental

# Stateful functions configuration
statefun.message.serializer
statefun.feedback.memory.size

# Fault tolerance configuration
heartbeat.interval
heartbeat.timeout

# Scheduling configuration
cluster.evenly-spread-out-slots

# Class loading configuration
classloader.parent-first-patterns.additional
```

For more information, see the [Apache Flink documentation on Configuration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/) and [Stateful Functions configuration](https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/configurations/).

## Enable notifications for stream pipelines

During a planned outage, a stream pipeline can be affected. To prevent unexpected interruptions in data processing, it is recommended to keep the [email address](https://docs.here.com/workspace/docs/portal-deployment#add-a-pipeline) associated with the pipeline up to date, so that a planned outage notification can be sent with clear details about the incident and the expected user actions, such as:

* Incident Summary
* Realm
* Pipeline ID
* Pipeline Name
* Pipeline Version ID
* Requested Action with supporting instructions
* Due Date and Time
* System Operation to be performed by the HERE platform if the Requested Action is not performed

If the Requested Action is not performed by the due date and time, the System Operation listed in the outage email will be performed. Once the System Operation has been initiated on the affected stream pipeline, a second email is sent with the following details:

* Realm
* Pipeline ID
* Pipeline Name
* Pipeline Version ID
* Incident Summary
* System Operation Date and Time
* System Operation Being Performed

If the Requested Action is performed by the due date and time, the System Operation will be dropped. During the System Operation, an attempt is made to save the current state and start a new job with the saved state; the state can be recovered only for stream pipelines that use [checkpointing](https://docs.here.com/workspace/docs/stream-processing#flink-checkpointing). For pipelines without checkpointing, the job is simply canceled and the pipeline is activated again. Therefore, it is recommended to use checkpointing in stream pipelines.

> #### Note
>
> **Time to complete a savepoint**\
> During an attempt to take a savepoint, 120 seconds are allocated to the pipeline to complete it.
> If the pipeline is not able to complete the savepoint within that time, it is canceled and then activated again.

If an affected stream pipeline does not have an email address associated with it, a notification cannot be sent, and the System Operation will be performed as outlined in the planned outage notification.

## Restart strategies

When a task failure happens, Flink needs to restart the failed task and other affected tasks to recover the job to a normal state. Restart strategies and failover strategies control task restarts: restart strategies decide whether and when the failed or affected tasks can be restarted, while failover strategies decide which tasks should be restarted to recover the job.

### Stream 5.0

If checkpointing is enabled, the default restart strategy is [`Fixed Delay`](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy); otherwise, the [`No Restart`](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/task_failure_recovery/#no-restart-strategy) strategy is used.

### Stream 6.0

The default restart strategy is [`Failure Rate`](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/task_failure_recovery/#failure-rate-restart-strategy) with a 30-minute interval, a 15-second delay, and a maximum of 3 failures per interval. This strategy applies to jobs both with and without checkpointing enabled. To use a different strategy, implement it in the pipeline application code.
The following example shows how to set a [`Failure Rate`](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/task_failure_recovery/#failure-rate-restart-strategy) restart strategy:

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
    3,                             // max failures per interval
    Time.of(5, TimeUnit.MINUTES),  // time interval for measuring the failure rate
    Time.of(10, TimeUnit.SECONDS)  // delay between restart attempts
));
```

In the event of a failure, the system attempts to restart the job, but when the failure rate is exceeded (`3` failures per `5` minutes), the job eventually fails.

> #### Note
>
> Choosing a different restart strategy with low thresholds can result in reduced fault tolerance and job stability.

For more information on how Flink supports different restart strategies, see the [Apache Flink documentation on restart strategies](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/task_failure_recovery/#restart-strategies).

## Use unique metric operator naming

In earlier versions of Flink, metric operators were automatically named based on their associated task names. This approach proved failure-prone and has been replaced by explicitly named metric operators that are stored in the respective `streamConfig`. You must make your metric operator names unique; otherwise, naming conflicts will cause the duplicate metrics not to be reported.

It is also a good idea to limit the `operatorName` component of a metric name to no more than 80 characters. This avoids unwieldy metric names being reported in the logs, as well as the known risk of metrics being ignored by the Graphite reporter when their names are too long.
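The 80-character guideline above can be enforced with a small helper before a name is assigned to an operator. This helper is purely illustrative; it is not part of the Flink or HERE platform APIs:

```java
// Hypothetical helper for keeping operator names within a length limit;
// not part of the Flink or HERE platform APIs.
public final class MetricNames {

    static final int MAX_OPERATOR_NAME_LENGTH = 80;

    // Truncates the operator-name component of a metric name so that
    // reporters with name-length limits (such as Graphite) do not drop it.
    static String shortOperatorName(String name) {
        if (name.length() <= MAX_OPERATOR_NAME_LENGTH) {
            return name;
        }
        return name.substring(0, MAX_OPERATOR_NAME_LENGTH);
    }
}
```

A result of this helper would then be passed to the operator's `name()` call, while `uid()` should still receive a stable, unique identifier for state restoration.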
For more information on operator naming and metric reporters, refer to the following guides:

* [Apache Flink documentation on assigning operator IDs](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/#assigning-operator-ids)
* [Apache Flink documentation on metric reporters](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/metric_reporters/)

## Use same catalog for input and output

Unlike batch pipelines, stream pipelines can use the same catalog as both data source and data sink. However, such a catalog must be configured with stream layers. This is made possible by Flink's end-to-end exactly-once processing support, which extracts the common logic of the two-phase commit protocol and makes it possible to build end-to-end exactly-once applications with Flink and a selection of data sources and sinks, including Apache Kafka versions `0.11` and beyond. It provides a layer of abstraction and requires the user to implement only a handful of methods to achieve end-to-end exactly-once semantics.

For a complete overview of this feature, see [An overview of End-to-End Exactly-Once Processing in Apache Flink](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html). To learn more about how to put the `TwoPhaseCommitSinkFunction` to use, see the relevant [Apache Flink API documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html).
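The "handful of methods" mentioned above can be sketched as follows. This is an illustrative skeleton only, assuming the `TwoPhaseCommitSinkFunction` API from the Flink documentation linked above; a real sink would stage records in an actual transactional store (a temporary file, a Kafka transaction, and so on), and the class and transaction-handle choices here are invented for the example:

```java
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Illustrative skeleton of a two-phase-commit sink: each checkpoint
// interval gets its own transaction (here just a string handle), and the
// staged data becomes visible only when the checkpoint completes.
public class ExampleTwoPhaseSink
        extends TwoPhaseCommitSinkFunction<String, String, Void> {

    public ExampleTwoPhaseSink() {
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
    }

    @Override
    protected String beginTransaction() {
        // Open a staging area and return a handle to it.
        return "txn-" + System.nanoTime();
    }

    @Override
    protected void invoke(String transaction, String value, Context context) {
        // Write the record into the staging area of the open transaction.
    }

    @Override
    protected void preCommit(String transaction) {
        // Flush buffered records; after this point the transaction must be
        // committable even if the process restarts.
    }

    @Override
    protected void commit(String transaction) {
        // Atomically publish the staged data (second phase of the commit).
    }

    @Override
    protected void abort(String transaction) {
        // Discard the staged data for a failed or obsolete transaction.
    }
}
```

The key design point is that `preCommit` runs as part of the checkpoint, so once a checkpoint completes, every participating sink is guaranteed to be able to `commit`; this is what yields exactly-once semantics across a source-and-sink catalog pair.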
## Flink production readiness checklist

The production readiness checklist provides an overview of configuration options that should be carefully considered before bringing a stream data processing application into production:

* Setting an explicit maximum parallelism
* Setting UUIDs for all operators for better savepointing and state restoring
* Choosing the right state backend
* Configuring high availability of the `JobManager`

The full checklist is available in the [Apache Flink documentation on production readiness](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/production_ready/).

## Multiple pipeline use

There are two use cases where it is desirable to use multiple stream pipelines with a single data source. However, care must be taken in configuring these pipelines to achieve the desired results.

> #### Info
>
> Apache Kafka is used by the stream pipelines as a data stream messaging system to enable real-time processing of streams.
> This allows multiple pipelines to share a common input catalog data source, where Kafka functions as a queuing model or as a publish-subscribe model.
> For additional information, see the [Kafka as a messaging system](https://kafka.apache.org/intro/#kafka_mq) guide.

### Use Case 1 - Shared processing

![Use Case 1 - Shared processing](https://files.readme.io/ebcc81ba15ff9481cff9c03ee59447e5515d4e41a3b45301b1b162f3a88537a9-stream-use-case-1.png "Shared processing")

In this use case, the idea is to have multiple pipelines share the processing load of a single streaming data source. Each pipeline processes data passed to it by the Kafka `Stream Processor`, so that each pipeline only has to process a portion of the input catalog and layer data stream. This allows new stream data to be processed more quickly, as each pipeline only has to deal with a fraction of the total input data load.
### Use Case 2 - Shared data source

![Use Case 2 - Shared data source](https://files.readme.io/9ed84d11f8b760f4ceac77895bf577b5bba6466fcbd6c5e0690837872891128f-stream-use-case-2.png "Shared data source")

In this use case, the multiple pipelines each apply a different data processing workflow to the same source data. The HERE platform stream services see the pipelines as unrelated, except that they use the same streaming data source. Each pipeline consumes the entire input catalog data stream.

### Configuration details

Each use case is executable in the HERE platform, but requires a different pipeline configuration.

For the first, [`shared processing`](https://docs.here.com/workspace/docs/stream-processing#use-case-1---shared-processing) use case, both pipelines have to be created in the **same** group or project in order to share the processing load of a single data source.

For the second, [`shared data source`](https://docs.here.com/workspace/docs/stream-processing#use-case-2---shared-data-source) use case, the pipelines have to be created in **different** groups or projects to avoid partial data consumption per pipeline. If they are configured this way, every pipeline will consume all messages from the data source.

It is also not recommended to override the Kafka consumer group ID and use different IDs for each pipeline, because this can result in unexpected behavior. Note that a Kafka consumer group ID is automatically assigned to associate the Kafka cluster with the data consumer (in our case, data processing pipelines).

### See Also

* [Best practices for high availability - Enable high availability option for stream pipelines](https://docs.here.com/workspace/docs/highly-available-pipelines#enable-high-availability-option-for-stream-pipelines)