Flink Connector Migration Guide (Flink 1.19 -> 2.2)
After upgrading from Flink 1.19 to 2.2, several deprecated APIs that were announced in the Flink 1.19 migration guide have been removed. This guide covers all breaking changes that affect users of the Data Client Library's Flink Connector and Flink Support modules.
Overview of breaking changes
| Change | Impact | Affected API |
|---|---|---|
| `flink-java` artifact removed | Merged into `flink-streaming-java` | Dependencies |
| `SourceFunction` / `RichSourceFunction` removed | `addSource()` no longer exists | Flink Support (DataStream API) |
| `SinkFunction` / `RichSinkFunction` removed | `addSink()` no longer exists | Flink Support (DataStream API) |
| `open(Configuration)` signature changed | Rich functions use `open(OpenContext)` | Both |
| `fromCollection()` removed | Use `fromData()` | Both |
| `RestartStrategies` removed | Use `Configuration` + `RestartStrategyOptions` | Both |
| `getSerializerConfig` returns interface | Cast to `SerializerConfigImpl` for Kryo registration | Both |
Note: Flink Connector (Table API / SQL). If you only use the Flink Connector via `OlpStreamConnectorHelper` and SQL `CREATE TABLE` / `INSERT INTO` statements, most of these changes do not apply to you. The Table API surface is largely unchanged. Review only the Dependencies, `open(Configuration)`, and Removed deprecated APIs sections.
Dependencies
Flink core artifacts
Flink 2.2 continues to be Scala-free. Use a single `%` in sbt, i.e. a Maven artifactId without the `_2.13` suffix:

Maven:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>
```

sbt:

```scala
"org.apache.flink" % "flink-clients" % "2.2.0" % "provided"
```

Table API planner
The flink-table-planner artifact no longer carries a Scala suffix:
Maven:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>
```

sbt:

```scala
"org.apache.flink" % "flink-table-planner" % "2.2.0" % "provided"
```

Flink Java library removed
The flink-java artifact has been removed in Flink 2.0. All its functionality
has been merged into flink-streaming-java. If your project previously depended
on flink-java, simply remove that dependency — the classes you need are now
available in flink-streaming-java (which is typically already included via
flink-clients or flink-runtime).
Before (Flink 1.x):
Maven:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.19.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.19.0</version>
    <scope>provided</scope>
</dependency>
```

sbt:

```scala
"org.apache.flink" % "flink-java" % "1.19.0" % "provided",
"org.apache.flink" % "flink-streaming-java" % "1.19.0" % "provided"
```

After (Flink 2.x):
Maven:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>
```

sbt:

```scala
// flink-java removed, functionality merged into flink-streaming-java
"org.apache.flink" % "flink-streaming-java" % "2.2.0" % "provided"
```
Note: In most cases, you don't need to explicitly declare `flink-streaming-java`, as it is transitively included by `flink-clients`. Only add it if your build specifically requires it.
Scala API support
If your project uses Scala and relies on Flink's implicit conversions
(DataStream extensions, createTypeInformation, etc.), add the community
flink-scala-api library:
Maven:

```xml
<dependency>
    <groupId>org.flinkextended</groupId>
    <artifactId>flink-scala-api-2_2.13</artifactId>
    <version>2.2.0</version>
</dependency>
```

sbt:

```scala
"org.flinkextended" %% "flink-scala-api-2" % "2.2.0"
```

Source API changes (SourceFunction → FLIP-27 Source)
In the Flink 1.19 migration guide, we noted that SourceFunction, RichSourceFunction, and ParallelSourceFunction were deprecated and would be removed in Flink 2.0. They are now removed. The Data Client Library has been updated internally to use the new FLIP-27 Source API.
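For background on what FLIP-27 changed: a source is split into a SplitEnumerator, which discovers and hands out work units ("splits"), and SourceReaders, which consume them. The following plain-Java sketch models only that division of labor; it has no Flink dependency, and all names (`SplitEnumeratorModel`, `SourceReaderModel`, `nextSplit`) are hypothetical stand-ins, not the Flink interfaces.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical model of the FLIP-27 division of labor:
// the enumerator hands out splits (work units), readers consume them.
class SplitEnumeratorModel {
    private final Queue<String> splits;

    SplitEnumeratorModel(List<String> partitions) {
        this.splits = new ArrayDeque<>(partitions);
    }

    // Called when a reader asks for work; empty means "no more splits"
    Optional<String> nextSplit() {
        return Optional.ofNullable(splits.poll());
    }
}

class SourceReaderModel {
    private final List<String> consumed = new ArrayList<>();

    // Stand-in for the reader's per-split processing loop
    void process(String split) {
        consumed.add(split);
    }

    List<String> consumed() {
        return consumed;
    }
}
```

In the real API this split makes sources naturally rescalable: the enumerator runs once on the JobManager while one reader runs per parallel subtask.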
The public API methods (`queryApi.subscribe(...)`, `queryApi.queryIndex(...)`, `queryApi.getVolatilePartitions(...)`, etc.) now return `org.apache.flink.api.connector.source.Source` instead of `SourceFunction`.
Replace addSource() with fromSource()

The StreamExecutionEnvironment.addSource() method has been removed. Use fromSource() with a WatermarkStrategy instead.
Before (Flink 1.19):
Scala:

```scala
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// Stream layer subscription
val partitions: DataStream[Partition] =
  env.addSource(queryApi.subscribe(streamLayer, ConsumerSettings(groupName = "my-job")))

// Index layer query
val indexPartitions: DataStream[IndexPartition] =
  env.addSource(queryApi.queryIndex(indexLayer, Some(queryString)))

// Volatile layer query
val volatilePartitions: DataStream[Partition] =
  env.addSource(queryApi.getVolatilePartitions(volatileLayer, filter))
```

Java:

```java
FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

// Stream layer subscription
ConsumerSettings consumerSettings = new ConsumerSettings.Builder().withGroupName("my-job").build();
DataStream<Partition> partitions =
    env.addSource(queryApi.subscribe(streamLayer, consumerSettings));

// Index layer query
DataStream<IndexPartition> indexPartitions =
    env.addSource(queryApi.queryIndex(indexLayer, Optional.of(queryString)));

// Volatile layer query
DataStream<Partition> volatilePartitions =
    env.addSource(queryApi.getVolatilePartitions(volatileLayer, filter));
```

After (Flink 2.2):
Scala:

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy

val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// Stream layer subscription
val partitions: DataStream[Partition] =
  env.fromSource(
    queryApi.subscribe(streamLayer, ConsumerSettings(groupName = "my-job")),
    WatermarkStrategy.noWatermarks(),
    "StreamLayerSubscription")

// Index layer query
val indexPartitions: DataStream[IndexPartition] =
  env.fromSource(
    queryApi.queryIndex(indexLayer, Some(queryString)),
    WatermarkStrategy.noWatermarks(),
    "IndexLayerQuery")

// Volatile layer query
val volatilePartitions: DataStream[Partition] =
  env.fromSource(
    queryApi.getVolatilePartitions(volatileLayer, filter),
    WatermarkStrategy.noWatermarks(),
    "VolatileLayerQuery")
```

Java:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

// Stream layer subscription
ConsumerSettings consumerSettings = new ConsumerSettings.Builder().withGroupName("my-job").build();
DataStream<Partition> partitions =
    env.fromSource(
        queryApi.subscribe(streamLayer, consumerSettings),
        WatermarkStrategy.noWatermarks(),
        "StreamLayerSubscription");

// Index layer query
DataStream<IndexPartition> indexPartitions =
    env.fromSource(
        queryApi.queryIndex(indexLayer, Optional.of(queryString)),
        WatermarkStrategy.noWatermarks(),
        "IndexLayerQuery");

// Volatile layer query
DataStream<Partition> volatilePartitions =
    env.fromSource(
        queryApi.getVolatilePartitions(volatileLayer, filter),
        WatermarkStrategy.noWatermarks(),
        "VolatileLayerQuery");
```

The key differences:

- `addSource(source)` → `fromSource(source, watermarkStrategy, operatorName)`
- `WatermarkStrategy.noWatermarks()` is the simplest strategy when you don't need event-time processing. Use `WatermarkStrategy.forMonotonousTimestamps()` or a custom strategy if you need event-time semantics.
- An operator name (third parameter) is now required; use a descriptive string.
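To make the strategy choice concrete, the contract behind ascending-timestamp watermarks can be modeled in plain Java with no Flink dependency: the watermark trails the largest timestamp seen so far by 1 ms, so with monotonically increasing event times no event is ever considered late. The class below is a hypothetical model of that behavior, not the Flink `WatermarkGenerator` API.

```java
// Hypothetical model of the ascending-timestamps watermark contract:
// the watermark promises "no further events with timestamp <= watermark".
class MonotonousWatermarks {
    private long maxTimestamp = Long.MIN_VALUE;

    // Called for every event; only the maximum timestamp matters
    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    // Trails the maximum by 1 ms, so an event equal to the current
    // maximum is still on time
    long currentWatermark() {
        return maxTimestamp - 1;
    }
}
```

Note that an out-of-order event never moves the watermark backwards, which is why `forMonotonousTimestamps()` is only safe when timestamps are genuinely ascending per partition.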
Admin API sources
The FlinkAdminApi.listCatalogs() and FlinkAdminApi.listSubscriptions()
methods also return the new Source type. Apply the same fromSource()
pattern:
Before (Flink 1.19):
Scala:

```scala
val adminApi = client.adminApi()
val catalogs: DataStream[CatalogSummary] =
  env.addSource(adminApi.listCatalogs())
```

Java:

```java
FlinkAdminApi adminApi = client.adminApi();
DataStream<CatalogSummary> catalogs =
    env.addSource(adminApi.listCatalogs());
```

After (Flink 2.2):

Scala:

```scala
val adminApi = client.adminApi()
val catalogs: DataStream[CatalogSummary] =
  env.fromSource(adminApi.listCatalogs(), WatermarkStrategy.noWatermarks(), "ListCatalogs")
```

Java:

```java
FlinkAdminApi adminApi = client.adminApi();
DataStream<CatalogSummary> catalogs =
    env.fromSource(adminApi.listCatalogs(), WatermarkStrategy.noWatermarks(), "ListCatalogs");
```

Sink API changes (SinkFunction → Sink V2)
SinkFunction, RichSinkFunction, and DataStream.addSink() have been removed in Flink 2.0. The Data Client Library now uses the Sink V2 API (org.apache.flink.api.connector.sink2.Sink).
Replace addSink() with sinkTo()

Before (Flink 1.19):

Scala:

```scala
val writeEngine = client.writeEngine(hrn)
pendingPartitions.addSink(writeEngine.publish())
```

Java:

```java
FlinkWriteEngine writeEngine = client.writeEngine(hrn);
pendingPartitions.addSink(writeEngine.publish());
```

After (Flink 2.2):

Scala:

```scala
val writeEngine = client.writeEngine(hrn)
pendingPartitions.sinkTo(writeEngine.publish())
```

Java:

```java
FlinkWriteEngine writeEngine = client.writeEngine(hrn);
pendingPartitions.sinkTo(writeEngine.publish());
```

Custom RichSinkFunction implementations
If you have custom RichSinkFunction implementations (e.g., for writing to ObjectStore layers), you must migrate them to the Sink V2 API.
Before (Flink 1.19):
Scala:

```scala
class UploadObjectStoreSink(hrn: HRN, layerId: String)
    extends RichSinkFunction[ObjectStoreMessage] {

  @transient
  private lazy val dataClient = new FlinkDataClient()
  @transient
  private lazy val writeEngine = dataClient.writeEngine(hrn)

  override def invoke(message: ObjectStoreMessage, context: SinkFunction.Context): Unit =
    writeEngine.uploadObject2(layerId, message.key, message.data, ...)

  override def close(): Unit =
    dataClient.terminate()
}

// Usage:
stream.addSink(new UploadObjectStoreSink(hrn, layerId))
```

Java:

```java
class UploadObjectStoreSink extends RichSinkFunction<ObjectStoreMessage> {

  private transient FlinkDataClient dataClient;
  private transient FlinkWriteEngine writeEngine;

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    writeEngine = dataClient.writeEngine(hrn);
  }

  @Override
  public void invoke(ObjectStoreMessage message, Context context) throws Exception {
    writeEngine.uploadObject2(layerId, message.getKey(), message.getData(), ...);
  }

  @Override
  public void close() throws Exception {
    dataClient.terminate();
  }
}

// Usage:
stream.addSink(new UploadObjectStoreSink());
```

After (Flink 2.2):
Scala:

```scala
import org.apache.flink.api.connector.sink2.{Sink, SinkWriter}

class UploadObjectStoreSink(hrn: HRN, layerId: String)
    extends Sink[ObjectStoreMessage] {

  override def createWriter(context: Sink.InitContext): SinkWriter[ObjectStoreMessage] =
    new UploadObjectStoreSinkWriter(hrn, layerId)
}

class UploadObjectStoreSinkWriter(hrn: HRN, layerId: String)
    extends SinkWriter[ObjectStoreMessage] {

  private val dataClient = new FlinkDataClient()
  private val writeEngine = dataClient.writeEngine(hrn)

  override def write(message: ObjectStoreMessage, context: SinkWriter.Context): Unit =
    writeEngine.uploadObject2(layerId, message.key, message.data, ...)

  override def flush(endOfInput: Boolean): Unit = {}

  override def close(): Unit =
    dataClient.terminate()
}

// Usage:
stream.sinkTo(new UploadObjectStoreSink(hrn, layerId))
```

Java:

```java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

class UploadObjectStoreSink implements Sink<ObjectStoreMessage> {

  private final HRN hrn;
  private final String layerId;

  public UploadObjectStoreSink(HRN hrn, String layerId) {
    this.hrn = hrn;
    this.layerId = layerId;
  }

  @Override
  public SinkWriter<ObjectStoreMessage> createWriter(InitContext context) {
    return new UploadObjectStoreSinkWriter(hrn, layerId);
  }
}

class UploadObjectStoreSinkWriter implements SinkWriter<ObjectStoreMessage> {

  private final FlinkDataClient dataClient;
  private final FlinkWriteEngine writeEngine;
  private final String layerId;

  public UploadObjectStoreSinkWriter(HRN hrn, String layerId) {
    dataClient = new FlinkDataClient();
    writeEngine = dataClient.writeEngine(hrn);
    this.layerId = layerId;
  }

  @Override
  public void write(ObjectStoreMessage message, Context context) throws Exception {
    writeEngine.uploadObject2(layerId, message.getKey(), message.getData(), ...);
  }

  @Override
  public void flush(boolean endOfInput) throws Exception {}

  @Override
  public void close() throws Exception {
    dataClient.terminate();
  }
}

// Usage:
stream.sinkTo(new UploadObjectStoreSink(hrn, layerId));
```

Key changes for custom sink migration:
| Old (RichSinkFunction) | New (Sink V2) |
|---|---|
| Single class | Split into `Sink` (factory) + `SinkWriter` (logic) |
| `open(Configuration)` | Constructor of `SinkWriter` (or `createWriter()`) |
| `invoke(value, context)` | `write(value, context)` |
| `snapshotState(...)` | `flush(endOfInput)` |
| `close()` | `SinkWriter.close()` |
| `stream.addSink(sink)` | `stream.sinkTo(sink)` |
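The lifecycle behind this table can be sketched in plain Java, with no Flink dependency: a factory creates one writer per parallel subtask, the writer holds the resources and per-element logic, and the runtime drives write/flush/close. All names here (`SimpleSink`, `SimpleSinkWriter`, `run`) are hypothetical stand-ins for the Flink interfaces, shown only to illustrate where each old method's responsibility moved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for org.apache.flink.api.connector.sink2.Sink / SinkWriter
interface SimpleSink<T> {
    SimpleSinkWriter<T> createWriter() throws Exception; // factory: one writer per subtask
}

interface SimpleSinkWriter<T> extends AutoCloseable {
    void write(T element) throws Exception;          // per-element logic (was invoke())
    void flush(boolean endOfInput) throws Exception; // called on checkpoints (was snapshotState())
}

// A sink that "uploads" strings by collecting them, standing in for UploadObjectStoreSink
class CollectingSink implements SimpleSink<String> {
    final List<String> uploaded = new ArrayList<>();

    @Override
    public SimpleSinkWriter<String> createWriter() {
        // Resource acquisition belongs here or in the writer's constructor,
        // replacing RichSinkFunction.open(Configuration)
        return new SimpleSinkWriter<String>() {
            @Override public void write(String element) { uploaded.add(element); }
            @Override public void flush(boolean endOfInput) { /* nothing buffered in this sketch */ }
            @Override public void close() { /* release resources, was RichSinkFunction.close() */ }
        };
    }
}

class SinkLifecycle {
    // Mimics how the runtime drives a Sink V2: create writer, write elements, flush, close
    static List<String> run(CollectingSink sink, List<String> input) throws Exception {
        try (SimpleSinkWriter<String> writer = sink.createWriter()) {
            for (String element : input) {
                writer.write(element);
            }
            writer.flush(true); // endOfInput = true on a bounded stream
        }
        return sink.uploaded;
    }
}
```

The factory/writer split is what lets Flink create one writer per parallel subtask and restart writers independently of the (serializable) sink object itself.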
open(Configuration) → open(OpenContext)
The open(Configuration) method on all Rich functions (RichMapFunction,
RichFlatMapFunction, RichFilterFunction, RichAsyncFunction,
RichAllWindowFunction, etc.) has been replaced by open(OpenContext).
Before (Flink 1.19):
Scala:

```scala
import org.apache.flink.configuration.Configuration

class ReadDataMapFunction(hrn: HRN)
    extends RichMapFunction[Partition, (Partition, Array[Byte])] {

  @transient private lazy val dataClient = new FlinkDataClient()
  @transient private lazy val readEngine = dataClient.readEngine(hrn)

  override def map(partition: Partition): (Partition, Array[Byte]) = ...

  override def close(): Unit = dataClient.terminate()
}
```

Java:

```java
import org.apache.flink.configuration.Configuration;

class ReadDataMapFunction extends RichMapFunction<Partition, Tuple2<Partition, byte[]>> {

  private transient FlinkDataClient dataClient;
  private transient FlinkReadEngine readEngine;

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    readEngine = dataClient.readEngine(hrn);
  }

  @Override
  public Tuple2<Partition, byte[]> map(Partition partition) { ... }

  @Override
  public void close() throws Exception { dataClient.terminate(); }
}
```

After (Flink 2.2):
Scala:

```scala
import org.apache.flink.api.common.functions.OpenContext

class ReadDataMapFunction(hrn: HRN)
    extends RichMapFunction[Partition, (Partition, Array[Byte])] {

  @transient private lazy val dataClient = new FlinkDataClient()
  @transient private lazy val readEngine = dataClient.readEngine(hrn)

  override def map(partition: Partition): (Partition, Array[Byte]) = ...

  override def close(): Unit = dataClient.terminate()
}
```

Java:

```java
import org.apache.flink.api.common.functions.OpenContext;

class ReadDataMapFunction extends RichMapFunction<Partition, Tuple2<Partition, byte[]>> {

  private transient FlinkDataClient dataClient;
  private transient FlinkReadEngine readEngine;

  @Override
  public void open(OpenContext openContext) throws Exception {
    dataClient = new FlinkDataClient();
    readEngine = dataClient.readEngine(hrn);
  }

  @Override
  public Tuple2<Partition, byte[]> map(Partition partition) { ... }

  @Override
  public void close() throws Exception { dataClient.terminate(); }
}
```
Note for Scala users: if you use `@transient private lazy val` for initialization (the recommended Scala pattern), you may not have an `open()` override at all. In that case, no changes are needed for this item.
Removed deprecated APIs
The following APIs, previously deprecated in Flink 1.19, have been removed in Flink 2.0+; any remaining usages must be updated:
fromCollection
StreamExecutionEnvironment.fromCollection() has been removed. Use fromData()
instead (see
Flink 1.19 migration guide
for examples).
RestartStrategies
org.apache.flink.api.common.restartstrategy.RestartStrategies has been
removed. Use Configuration with RestartStrategyOptions instead (see
Flink 1.19 migration guide
for examples).
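If you would rather not set the restart strategy programmatically, the same settings can be supplied as configuration keys in Flink's configuration file. A fixed-delay sketch (the specific attempt count and delay are illustrative values, not recommendations):

```yaml
# Fixed-delay restart strategy via configuration keys
restart-strategy.type: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

The key names mirror the constants in RestartStrategyOptions, so a programmatic Configuration and a file-based setup stay interchangeable.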
toAppendStream
StreamTableEnvironment.toAppendStream() has been removed. Use toDataStream()
instead (see
Flink 1.19 migration guide
for examples).
SerializerConfig via ExecutionConfig
In Flink 2.x, env.getConfig.getSerializerConfig returns the SerializerConfig
interface instead of the concrete SerializerConfigImpl class. Methods such
as addDefaultKryoSerializer() and registerTypeWithKryoSerializer() are only
available on SerializerConfigImpl, so you must cast the result.
Scala — Before (Flink 1.x):

```scala
env.getConfig.addDefaultKryoSerializer(classOf[MyType], classOf[MySerializer])
// or
val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(classOf[MyType], classOf[MySerializer])
```

Scala — After (Flink 2.x):

```scala
import org.apache.flink.api.common.serialization.SerializerConfigImpl

val serializerConfig = env.getConfig.getSerializerConfig.asInstanceOf[SerializerConfigImpl]
serializerConfig.addDefaultKryoSerializer(classOf[MyType], classOf[MySerializer])
```

Java — Before (Flink 1.x):

```java
env.getConfig().addDefaultKryoSerializer(MyType.class, MySerializer.class);
// or
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(MyType.class, MySerializer.class);
```

Java — After (Flink 2.x):

```java
import org.apache.flink.api.common.serialization.SerializerConfigImpl;

SerializerConfigImpl serializerConfig =
    (SerializerConfigImpl) env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(MyType.class, MySerializer.class);
```

The same cast is required for registerTypeWithKryoSerializer(), registerKryoType(), and other Kryo-related configuration methods.
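Because the getter is now typed as the interface, a defensive pattern is to guard the downcast so that an unexpected implementation fails with a clear message instead of a bare ClassCastException during job setup. The sketch below uses hypothetical stand-in types (`ConfigView`, `ConfigViewImpl`), not the Flink classes, to show the pattern in isolation.

```java
// Hypothetical stand-ins for the SerializerConfig interface and SerializerConfigImpl class
interface ConfigView {}

class ConfigViewImpl implements ConfigView {
    boolean kryoSerializerRegistered = false;

    // Stand-in for an Impl-only method such as addDefaultKryoSerializer()
    void registerKryoSerializerStub() {
        kryoSerializerRegistered = true;
    }
}

class SerializerConfigCast {
    // Guard the downcast: fail loudly with the actual runtime type in the message
    static ConfigViewImpl asImpl(ConfigView config) {
        if (!(config instanceof ConfigViewImpl)) {
            throw new IllegalStateException(
                "Expected ConfigViewImpl but got: " + config.getClass().getName());
        }
        return (ConfigViewImpl) config;
    }
}
```

An explicit check like this also documents the assumption that the running Flink version still returns the concrete class, which makes future upgrades easier to diagnose.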
References
For more information, review the Apache Flink release notes for the 2.x series.