Flink 1.19.x migration guide
Flink Connector Migration Guide (Flink 1.13 -> 1.19)
Note: This guide covers the migration from Flink 1.13 to 1.19. If you are migrating to Flink 2.2, see the Flink 2.2 migration guide instead, because the APIs marked as deprecated below have been removed in Flink 2.0+.
Upgrading the Flink Connector from Flink 1.13 to 1.19 brings several API changes, which are described below.
Dependencies
- Flink artifacts that do not depend on Scala no longer carry a Scala version suffix (this changed in Flink 1.15), so rather than:
Maven:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.13</artifactId>
  <version>1.13.5</version>
  <scope>provided</scope>
</dependency>
sbt:
"org.apache.flink" %% "flink-clients" % "1.13.5" % "provided"
use:
Maven:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.19.1</version>
  <scope>provided</scope>
</dependency>
sbt:
"org.apache.flink" % "flink-clients" % "1.19.1" % "provided"
Note that the sbt dependency uses a single % instead of %%, and that the _2.13 suffix must be removed from the Maven artifactId.
- Since Flink 1.14, the "Blink" planner became the default and only planner for Table API and SQL queries. Consequently, the separate flink-table-planner-blink module was deprecated, and its contents were merged into flink-table-planner. This change requires updating your project's dependencies to replace flink-table-planner-blink with flink-table-planner.
Maven:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.13</artifactId>
  <version>1.19.1</version>
  <scope>test</scope>
</dependency>
sbt:
"org.apache.flink" %% "flink-table-planner" % "1.19.1" % "provided"
Additionally, if your project uses the Table API, ensure you include the appropriate API bridge dependency:
Maven:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge</artifactId>
  <version>1.19.1</version>
  <scope>provided</scope>
</dependency>
sbt:
"org.apache.flink" % "flink-table-api-java-bridge" % "1.19.1" % "provided"
Upgrade imports
The Java classes in Flink 1.19 offer more features than the corresponding Scala ones. Using the Scala classes restricts your functionality, and they are set to be removed in Flink 2.0. Therefore, instead of:
Scala:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
Java:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment;
you should import:
Scala:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
Java:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
from the module "org.apache.flink" % "flink-streaming-java" % "1.19.1".
Deprecation updates
org.apache.flink.streaming.api.functions.source.SourceFunction
The FlinkQueryApi and FlinkAdminApi still rely on the deprecated org.apache.flink.streaming.api.functions.source.SourceFunction in their interfaces. While the new API offers improved control over parallelism, adopting it would require significant changes to the Data Client library, which cannot be accommodated within the current time constraints. This issue will be addressed in future releases. In the meantime, the following deprecated classes will not be fully removed until Flink 2.0:
- org.apache.flink.streaming.api.functions.source.SourceFunction
- org.apache.flink.streaming.api.functions.source.RichSourceFunction
- org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
If your project enforces strict policies against using deprecated features, consider allowing the use of deprecated classes in Flink-related modules with:
Maven:
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.13.0</version>
      <configuration>
        <compilerArgs>
          <arg>-Xlint:-deprecation</arg>
        </compilerArgs>
      </configuration>
    </plugin>
  </plugins>
</build>
sbt:
.settings(
  scalacOptions ++= Seq(
    "-Wconf:cat=deprecation:silent"
  )
)
RestartStrategies
The org.apache.flink.api.common.restartstrategy.RestartStrategies class is deprecated, so rather than:
Scala:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(RestartStrategies.noRestart())
Java:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
you should use:
Scala:
val env = {
  val config = new Configuration()
  config.set(RestartStrategyOptions.RESTART_STRATEGY, "none")
  StreamExecutionEnvironment.getExecutionEnvironment(config)
}
Java:
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
and rather than:
Scala:
env.setRestartStrategy(
  RestartStrategies.fixedDelayRestart(
    10, // number of restart attempts
    Time.of(10, TimeUnit.SECONDS) // delay between attempts
  )
)
Java:
env.setRestartStrategy(
  RestartStrategies.fixedDelayRestart(
    10, // number of restart attempts
    Time.of(10, TimeUnit.SECONDS) // delay between attempts
  )
);
you should use:
Scala:
val env = {
  val config = new Configuration()
  config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay")
  config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.valueOf(10))
  config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10))
  StreamExecutionEnvironment.getExecutionEnvironment(config)
}
Java:
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.valueOf(10));
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
the same applies for:
Scala:
env.setRestartStrategy(
  RestartStrategies.failureRateRestart(
    10, // maximum failures per interval
    Time.of(5, TimeUnit.MINUTES), // interval over which failures are counted
    Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
  )
)
Java:
env.setRestartStrategy(
  RestartStrategies.failureRateRestart(
    10, // maximum failures per interval
    Time.of(5, TimeUnit.MINUTES), // interval over which failures are counted
    Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
  )
);
which should be replaced with:
Scala:
val env = {
  val config = new Configuration()
  config.set(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate")
  config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL,
    Integer.valueOf(10))
  config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL,
    Duration.ofMinutes(5))
  config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY,
    Duration.ofSeconds(10))
  StreamExecutionEnvironment.getExecutionEnvironment(config)
}
Java:
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate");
config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL,
  Integer.valueOf(10));
config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL,
  Duration.ofMinutes(5));
config.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY,
  Duration.ofSeconds(10));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
The purpose of this deprecation is to have the restart strategy set through the configuration object rather than by modifying the stream environment directly.
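The same restart settings can also be written as plain key/value pairs, which is the form they take in a Flink configuration file. The sketch below builds those pairs using only the Java standard library. The class RestartStrategyKeys and its methods are hypothetical helpers, not part of Flink or the Data Client library. The key names are the ones we understand Flink 1.19 to use (older releases used restart-strategy instead of restart-strategy.type), so check them against the configuration reference for your version.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: restart strategy settings as configuration key/value pairs. */
public class RestartStrategyKeys {

    /** Key/value form of the "fixed-delay" strategy. */
    public static Map<String, String> fixedDelay(int attempts, Duration delay) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("restart-strategy.type", "fixed-delay");
        conf.put("restart-strategy.fixed-delay.attempts", Integer.toString(attempts));
        conf.put("restart-strategy.fixed-delay.delay", millis(delay));
        return conf;
    }

    /** Key/value form of the "failure-rate" strategy. */
    public static Map<String, String> failureRate(int maxFailures, Duration interval, Duration delay) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("restart-strategy.type", "failure-rate");
        conf.put("restart-strategy.failure-rate.max-failures-per-interval", Integer.toString(maxFailures));
        conf.put("restart-strategy.failure-rate.failure-rate-interval", millis(interval));
        conf.put("restart-strategy.failure-rate.delay", millis(delay));
        return conf;
    }

    /** Formats a duration as "<n> ms", a notation Flink's duration parser accepts. */
    private static String millis(Duration d) {
        return d.toMillis() + " ms";
    }

    public static void main(String[] args) {
        fixedDelay(10, Duration.ofSeconds(10))
            .forEach((k, v) -> System.out.println(k + ": " + v));
        failureRate(10, Duration.ofMinutes(5), Duration.ofSeconds(10))
            .forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```

A map like this can be converted with Configuration.fromMap(...) and passed to StreamExecutionEnvironment.getExecutionEnvironment(config), which should have the same effect as the typed config.set(...) calls shown in this section.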
StreamExecutionEnvironment.fromCollection
StreamExecutionEnvironment.fromCollection is deprecated, so rather than:
Scala:
val seq = Seq(1, 2, 3)
val stream = environment.fromCollection(seq)
Java:
List<Integer> javaList = Arrays.asList(1, 2, 3);
DataStream<Integer> stream = environment.fromCollection(javaList);
you should use:
Scala:
val seq = Seq(1, 2, 3)
val stream = environment.fromData(seq: _*)
Java:
List<Integer> javaList = Arrays.asList(1, 2, 3);
DataStream<Integer> stream = environment.fromData(javaList);
StreamTableEnvironment.toAppendStream
StreamTableEnvironment.toAppendStream(Table table, Class<T> clazz) is now deprecated, so rather than:
Scala:
val stream = tEnv.toAppendStream[Row](table)
Java:
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
you should use:
Scala:
val stream = tEnv.toDataStream(table)
Java:
DataStream<Row> stream = tEnv.toDataStream(table);
env.getConfig.addDefaultKryoSerializer(..) / registerTypeWithKryoSerializer(..)
Custom serializers should now be defined using SerializerConfig instead of ExecutionConfig, so rather than:
Scala:
env.getConfig.addDefaultKryoSerializer(
  classOf[SomeClass],
  classOf[SomeClassCustomSerializer]
)
Java:
env.getConfig().addDefaultKryoSerializer(
  SomeClass.class,
  SomeClassCustomSerializer.class
);
you should use:
Scala:
val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  classOf[SomeClass],
  classOf[SomeClassCustomSerializer]
)
Java:
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(
  SomeClass.class,
  SomeClassCustomSerializer.class
);
The same applies to env.getConfig.registerTypeWithKryoSerializer().
Custom serializers
Kryo does not natively support Scala's singletons and utility types (e.g., Map, List) used during Partitions serialization, requiring explicit configuration. To address this, register all relevant types and ensure consistent registration to prevent unexpected serialization errors.
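One way to see why a singleton needs its own serializer: a generic, field-based serializer typically rebuilds an object by instantiating its class, which produces a second instance that is no longer the original singleton. The sketch below reproduces that effect with plain Java reflection. It does not use Kryo, and the classes SingletonIdentityDemo and Empty are hypothetical stand-ins (Empty plays the role of a Scala singleton such as None). That this is the mechanism behind the errors described in this section is an assumption, not something confirmed by the library.

```java
import java.lang.reflect.Constructor;

/** Hypothetical demo: why rebuilding a singleton by instantiation breaks identity checks. */
public class SingletonIdentityDemo {

    /** Stand-in for a singleton such as Scala's None. */
    public static final class Empty {
        public static final Empty INSTANCE = new Empty();
        private Empty() {}
    }

    /** Mimics a generic serializer that recreates the object through its constructor. */
    public static Empty naiveCopy() {
        try {
            Constructor<Empty> ctor = Empty.class.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Mimics a singleton-aware serializer that always returns the one shared instance. */
    public static Empty singletonAwareCopy() {
        return Empty.INSTANCE;
    }

    /** Identity check, comparable to matching a value against a singleton. */
    public static String describe(Empty value) {
        return value == Empty.INSTANCE ? "matched" : "no match";
    }

    public static void main(String[] args) {
        System.out.println(describe(naiveCopy()));          // prints "no match"
        System.out.println(describe(singletonAwareCopy())); // prints "matched"
    }
}
```

A singleton-aware serializer avoids the problem by returning the existing instance on read instead of constructing a new one.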
None serializer
If you encounter the exception scala.MatchError: None (of class scala.None$), resolve it by importing the custom serializer for the None singleton:
Scala:
import com.here.platform.data.client.flink.serializers.ScalaNoneSerializer
Java:
import com.here.platform.data.client.flink.serializers.ScalaNoneSerializer;
and registering it with:
Scala:
val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  None.getClass,
  classOf[ScalaNoneSerializer]
)
Java:
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(None$.class, ScalaNoneSerializer.class);
Nil serializer
Similarly, import the custom serializer for the Nil singleton:
Scala:
import com.here.platform.data.client.flink.serializers.ScalaNilSerializer
Java:
import com.here.platform.data.client.flink.serializers.ScalaNilSerializer;
and register it with:
Scala:
val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  Nil.getClass,
  classOf[ScalaNilSerializer]
)
Java:
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(Nil$.class, ScalaNilSerializer.class);
Some serializer
If you encounter the exception
Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: scala.Some,
resolve it by importing the custom serializer for scala.Some:
Scala:
import com.here.platform.data.client.flink.serializers.ScalaSomeSerializer
Java:
import com.here.platform.data.client.flink.serializers.ScalaSomeSerializer;
and registering it with:
Scala:
val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  classOf[scala.Some[_]],
  classOf[ScalaSomeSerializer]
)
Java:
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(scala.Some.class, ScalaSomeSerializer.class);
Java List serializer
Index partitions include fields with generic types like java.util.List and java.util.Map, which Kryo may serialize incorrectly, particularly when they are empty, sometimes treating them as null. A custom serializer for Lists resolves this issue, and you can import it using:
Scala:
import com.here.platform.data.client.flink.serializers.JavaListSerializer
Java:
import com.here.platform.data.client.flink.serializers.JavaListSerializer;
register it with:
Scala:
val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  classOf[java.util.List[_]],
  classOf[JavaListSerializer]
)
Java:
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(java.util.List.class, JavaListSerializer.class);
Java Map serializer
The same applies to Maps, particularly in the context of Index layers. You can import the custom serializer with:
Scala:
import com.here.platform.data.client.flink.serializers.JavaMapSerializer
Java:
import com.here.platform.data.client.flink.serializers.JavaMapSerializer;
register it with:
Scala:
val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(
  classOf[java.util.Map[_, _]],
  classOf[JavaMapSerializer]
)
Java:
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(java.util.Map.class, JavaMapSerializer.class);
References
For more information you can review the release notes for each version from 1.14 to 1.19 to understand incremental changes:
Reasons behind removing Scala classes: