
Flink 1.19.x migration guide

Flink Connector Migration Guide (Flink 1.13 -> 1.19)

Note

This guide covers the migration from Flink 1.13 to 1.19. If you are migrating to Flink 2.2, see the Flink 2.2 migration guide — the APIs marked as deprecated below have been removed in Flink 2.0+.

After the Flink upgrade from 1.13 to 1.19, the Flink Connector API has changed in several ways.

Dependencies

  1. Flink 1.19 has removed its dependency on Scala, so rather than:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.5</version>
        <scope>provided</scope>
    </dependency>
"org.apache.flink" %% "flink-clients" % "1.13.5" % "provided"

use:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.19.1</version>
        <scope>provided</scope>
    </dependency>
"org.apache.flink" % "flink-clients" % "1.19.1" % "provided"

Note the single % instead of %%, and that the Scala version suffix (e.g. _2.12) must be removed from the artifact name.
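The difference matters because %% asks sbt to append the project's Scala binary version to the artifact name, while % leaves the name untouched. A standalone illustration in plain Scala (the artifact name and Scala version here are examples only, not project settings):

```scala
// %% in sbt appends the Scala binary version suffix to the artifact name;
// % resolves the artifact name exactly as written.
val scalaBinaryVersion = "2.12"

// What `"org.apache.flink" %% "flink-clients"` effectively resolves to:
def crossVersioned(artifact: String): String = s"${artifact}_$scalaBinaryVersion"

// What `"org.apache.flink" % "flink-clients"` resolves to:
def plain(artifact: String): String = artifact

println(crossVersioned("flink-clients")) // flink-clients_2.12
println(plain("flink-clients"))          // flink-clients
```

Since the Flink 1.19 core artifacts are published without a Scala suffix, only the plain % form finds them.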

  2. Since Flink 1.14, the "Blink" planner became the default and only planner for Table API and SQL queries. Consequently, the separate flink-table-planner-blink module was deprecated, and its contents were merged into flink-table-planner. This change requires updating your project's dependencies to replace flink-table-planner-blink with flink-table-planner.
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.19.1</version>
        <scope>test</scope>
    </dependency>
"org.apache.flink" % "flink-table-planner_2.12" % "1.19.1" % "provided"

Additionally, if your project uses the Table API, ensure you include the appropriate API bridge dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>1.19.1</version>
        <scope>provided</scope>
    </dependency>
"org.apache.flink" % "flink-table-api-java-bridge" % "1.19.1" % "provided"

Upgrade imports

Flink 1.19 introduces more features in its Java classes than in the corresponding Scala ones. Using the Scala classes restricts your functionality, and they are scheduled for removal in Flink 2.0. Therefore, instead of:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment;

you should import

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

from module "org.apache.flink" % "flink-streaming-java" % "1.19.1".

Deprecation updates

org.apache.flink.streaming.api.functions.source.SourceFunction

The FlinkQueryApi and FlinkAdminApi still rely on the deprecated org.apache.flink.streaming.api.functions.source.SourceFunction in their interfaces. While the new API offers improved control over parallelism, updating to it would require significant changes to the Data Client library, which cannot be accommodated within the current time constraints. Additionally, the deprecated classes:

  • org.apache.flink.streaming.api.functions.source.SourceFunction
  • org.apache.flink.streaming.api.functions.source.RichSourceFunction
  • org.apache.flink.streaming.api.functions.source.ParallelSourceFunction

will not be fully removed until Flink 2.0. This issue will be addressed in future releases.

If your project enforces strict policies against using deprecated features, please consider allowing the use of deprecated classes in Flink-related modules with:

    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.13.0</version>
          <configuration>
            <compilerArgs>
              <arg>-Xlint:-deprecation</arg>
            </compilerArgs>
          </configuration>
        </plugin>
      </plugins>
    </build>
    .settings(
      scalacOptions ++= Seq(
        "-Wconf:cat=deprecation:silent"
      )
    )

RestartStrategies

The org.apache.flink.api.common.restartstrategy.RestartStrategies class is deprecated, so rather than:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(RestartStrategies.noRestart())
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

you should use:

val env = {
  val config = new Configuration()
  config.set(RestartStrategyOptions.RESTART_STRATEGY, "none")
  StreamExecutionEnvironment.getExecutionEnvironment(config)
}
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

and rather than:

env.setRestartStrategy(
  RestartStrategies.fixedDelayRestart(10, Time.of(10, TimeUnit.SECONDS))
)
env.setRestartStrategy(
  RestartStrategies.fixedDelayRestart(10, Time.of(10, TimeUnit.SECONDS))
);

you should use:

val env = {
  val config = new Configuration()
  config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay")
  config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.valueOf(10))
  config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10))
  StreamExecutionEnvironment.getExecutionEnvironment(config)
}
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.valueOf(10));
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

the same applies for:

env.setRestartStrategy(
      RestartStrategies
        .failureRateRestart(10, 
                            Time.of(5, TimeUnit.MINUTES),
                            Time.of(10, TimeUnit.SECONDS)
        )
)
env.setRestartStrategy(
      RestartStrategies
        .failureRateRestart(10, 
                            Time.of(5, TimeUnit.MINUTES),
                            Time.of(10, TimeUnit.SECONDS)
        )
);

which should be replaced with:

val env = {
  val config = new Configuration()
  config.set(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate")
  config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL,
             Integer.valueOf(10))
  config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL,
             Duration.ofMinutes(5))
  config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY,
             Duration.ofSeconds(10))
  StreamExecutionEnvironment.getExecutionEnvironment(config)
}
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate");
config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL,
           Integer.valueOf(10));
config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL,
           Duration.ofMinutes(5));
config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY,
           Duration.ofSeconds(10));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

The purpose of this deprecation is to configure the restart strategy through the Configuration object passed at environment creation, rather than by mutating the stream environment afterwards.

StreamExecutionEnvironment.fromCollection

StreamExecutionEnvironment.fromCollection is deprecated, so rather than:

val seq = Seq(1, 2, 3)
val stream = environment.fromCollection(seq)
List<Integer> javaList = Arrays.asList(1, 2, 3);
DataStream<Integer> stream = environment.fromCollection(javaList);

you should use:

val seq = Seq(1, 2, 3)
val stream = environment.fromData(seq: _*)
List<Integer> javaList = Arrays.asList(1, 2, 3);
DataStream<Integer> stream = environment.fromData(javaList);

StreamTableEnvironment.toAppendStream

StreamTableEnvironment.toAppendStream(Table table, Class<T> clazz) is now deprecated, so rather than:

val stream = tEnv.toAppendStream[Row](table)
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);

you should use:

val stream = tEnv.toDataStream(table)
DataStream<Row> stream = tEnv.toDataStream(table);

env.getConfig.addDefaultKryoSerializer(..) / registerTypeWithKryoSerializer(..)

Custom serializers should now be defined using SerializerConfig instead of ExecutionConfig.

so rather than:

env.getConfig.addDefaultKryoSerializer(
  classOf[SomeClass],
  classOf[SomeClassCustomSerializer]
)
env.getConfig().addDefaultKryoSerializer(
  SomeClass.class,
  SomeClassCustomSerializer.class
);

you should use:

val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  classOf[SomeClass],
  classOf[SomeClassCustomSerializer]
)
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(
  SomeClass.class,
  SomeClassCustomSerializer.class
);

The same applies to env.getConfig.registerTypeWithKryoSerializer().

Custom serializers

Kryo does not natively support Scala singletons and some standard collection types (e.g., Map, List) used during partition serialization, so they require explicit configuration. To avoid unexpected serialization errors, register all relevant types and keep the registrations consistent across your jobs.

None serializer

If you encounter the exception scala.MatchError: None (of class scala.None$), resolve it by importing the custom serializer for the singleton None using:

import com.here.platform.data.client.flink.serializers.ScalaNoneSerializer
import com.here.platform.data.client.flink.serializers.ScalaNoneSerializer;

register it with:

val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  None.getClass,
  classOf[ScalaNoneSerializer]
)
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(scala.None$.class, ScalaNoneSerializer.class);

Nil serializer

If Kryo fails to serialize the empty-list singleton Nil, import the custom serializer for it using:
import com.here.platform.data.client.flink.serializers.ScalaNilSerializer
import com.here.platform.data.client.flink.serializers.ScalaNilSerializer;

register it with:

val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  Nil.getClass,
  classOf[ScalaNilSerializer]
)
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(scala.collection.immutable.Nil$.class, ScalaNilSerializer.class);

Some serializer

If you encounter the exception Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: scala.Some, resolve it by importing the custom serializer for scala.Some using:

import com.here.platform.data.client.flink.serializers.ScalaSomeSerializer
import com.here.platform.data.client.flink.serializers.ScalaSomeSerializer;

register it with:

val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  classOf[scala.Some[_]],
  classOf[ScalaSomeSerializer]
)
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(scala.Some.class, ScalaSomeSerializer.class);

Java List serializer

Index partitions include fields with generic types like java.util.List and java.util.Map, which Kryo may serialize incorrectly, particularly when they are empty, sometimes treating them as null. A custom serializer for Lists resolves this issue, and you can import it using:

import com.here.platform.data.client.flink.serializers.JavaListSerializer
import com.here.platform.data.client.flink.serializers.JavaListSerializer;

register it with:

val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  classOf[java.util.List[_]],
  classOf[JavaListSerializer]
)
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(java.util.List.class, JavaListSerializer.class);

Java Map serializer

The same applies to Maps, particularly in the context of Index layers. You can import the custom serializer with:

import com.here.platform.data.client.flink.serializers.JavaMapSerializer
import com.here.platform.data.client.flink.serializers.JavaMapSerializer;

register it with:

val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  classOf[java.util.Map[_, _]],
  classOf[JavaMapSerializer]
)
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(java.util.Map.class, JavaMapSerializer.class);
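Taken together, the registrations above can be grouped into a single helper applied once while building the environment, so every job shares the same Kryo configuration. A minimal Scala sketch (the serializer classes come from com.here.platform.data.client.flink.serializers as shown above; the helper name registerDataClientSerializers is illustrative, not part of the library):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import com.here.platform.data.client.flink.serializers.{
  JavaListSerializer, JavaMapSerializer, ScalaNilSerializer, ScalaNoneSerializer, ScalaSomeSerializer
}

// Hypothetical helper registering every custom serializer in one place.
def registerDataClientSerializers(env: StreamExecutionEnvironment): Unit = {
  val serializerConfig = env.getConfig.getSerializerConfig
  serializerConfig.addDefaultKryoSerializer(None.getClass, classOf[ScalaNoneSerializer])
  serializerConfig.addDefaultKryoSerializer(Nil.getClass, classOf[ScalaNilSerializer])
  serializerConfig.addDefaultKryoSerializer(classOf[scala.Some[_]], classOf[ScalaSomeSerializer])
  serializerConfig.addDefaultKryoSerializer(classOf[java.util.List[_]], classOf[JavaListSerializer])
  serializerConfig.addDefaultKryoSerializer(classOf[java.util.Map[_, _]], classOf[JavaMapSerializer])
}

Calling this helper right after StreamExecutionEnvironment.getExecutionEnvironment(config) keeps serializer setup next to the restart-strategy configuration described earlier.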

References

For more information, review the release notes for each Flink version from 1.14 to 1.19 to understand the incremental changes:

Reasons behind removing Scala classes: