
Flink 2.2.x migration guide

Flink Connector Migration Guide (Flink 1.19 → 2.2)

With the upgrade from Flink 1.19 to 2.2, several APIs whose deprecation was announced in the Flink 1.19 migration guide have been removed. This guide covers all breaking changes that affect users of the Data Client Library's Flink Connector and Flink Support modules.

Overview of breaking changes

Change | Impact | Affected API
--- | --- | ---
flink-java artifact removed | Merged into flink-streaming-java | Dependencies
SourceFunction / RichSourceFunction removed | addSource() no longer exists | Flink Support (DataStream API)
SinkFunction / RichSinkFunction removed | addSink() no longer exists | Flink Support (DataStream API)
open(Configuration) signature changed | Rich functions use open(OpenContext) | Both
fromCollection() removed | Use fromData() | Both
RestartStrategies removed | Use Configuration + RestartStrategyOptions | Both
getSerializerConfig returns interface | Cast to SerializerConfigImpl for Kryo registration | Both

Note

Flink Connector (Table API / SQL): If you only use the Flink Connector via OlpStreamConnectorHelper and SQL CREATE TABLE / INSERT INTO statements, most of these changes do not apply to you. The Table API surface is largely unchanged. Review only the Dependencies, open(Configuration), and Removed deprecated APIs sections.

Dependencies

Flink core artifacts

Flink 2.2 continues to be Scala-free. Use a single % in sbt (that is, a Maven artifactId without the _2.13 suffix):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope>
    </dependency>
"org.apache.flink" % "flink-clients" % "2.2.0" % "provided"

Table API planner

The flink-table-planner artifact no longer carries a Scala suffix:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope>
    </dependency>
"org.apache.flink" % "flink-table-planner" % "2.2.0" % "provided"

Flink Java library removed

The flink-java artifact has been removed in Flink 2.0. All its functionality has been merged into flink-streaming-java. If your project previously depended on flink-java, simply remove that dependency — the classes you need are now available in flink-streaming-java (which is typically already included via flink-clients or flink-runtime).

Before (Flink 1.x):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.19.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.19.0</version>
        <scope>provided</scope>
    </dependency>
"org.apache.flink" % "flink-java" % "1.19.0" % "provided",
"org.apache.flink" % "flink-streaming-java" % "1.19.0" % "provided"

After (Flink 2.x):

    
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope>
    </dependency>
// flink-java removed, functionality merged into flink-streaming-java
"org.apache.flink" % "flink-streaming-java" % "2.2.0" % "provided"

Note

In most cases, you don't need to explicitly declare flink-streaming-java as it's transitively included by flink-clients. Only add it if your build specifically requires it.

Scala API support

If your project uses Scala and relies on Flink's implicit conversions (DataStream extensions, createTypeInformation, etc.), add the community flink-scala-api library:

    <dependency>
        <groupId>org.flinkextended</groupId>
        <artifactId>flink-scala-api-2_2.13</artifactId>
        <version>2.2.0</version>
    </dependency>
"org.flinkextended" %% "flink-scala-api-2" % "2.2.0"

Source API changes (SourceFunction → FLIP-27 Source)

In the Flink 1.19 migration guide, we noted that SourceFunction, RichSourceFunction, and ParallelSourceFunction were deprecated and would be removed in Flink 2.0. They are now removed.

The Data Client Library has been updated internally to use the new FLIP-27 Source API. The public API methods (queryApi.subscribe(...), queryApi.queryIndex(...), queryApi.getVolatilePartitions(...), etc.) now return org.apache.flink.api.connector.source.Source instead of SourceFunction.

Replace addSource() with fromSource()

The StreamExecutionEnvironment.addSource() method has been removed. Use fromSource() with a WatermarkStrategy instead.

Before (Flink 1.19):

val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// Stream layer subscription
val partitions: DataStream[Partition] =
  env.addSource(queryApi.subscribe(streamLayer, ConsumerSettings(groupName = "my-job")))

// Index layer query
val indexPartitions: DataStream[IndexPartition] =
  env.addSource(queryApi.queryIndex(indexLayer, Some(queryString)))

// Volatile layer query
val volatilePartitions: DataStream[Partition] =
  env.addSource(queryApi.getVolatilePartitions(volatileLayer, filter))
FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

// Stream layer subscription
ConsumerSettings consumerSettings = new ConsumerSettings.Builder().withGroupName("my-job").build();
DataStream<Partition> partitions =
    env.addSource(queryApi.subscribe(streamLayer, consumerSettings));

// Index layer query
DataStream<IndexPartition> indexPartitions =
    env.addSource(queryApi.queryIndex(indexLayer, Optional.of(queryString)));

// Volatile layer query
DataStream<Partition> volatilePartitions =
    env.addSource(queryApi.getVolatilePartitions(volatileLayer, filter));

After (Flink 2.2):

import org.apache.flink.api.common.eventtime.WatermarkStrategy

val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// Stream layer subscription
val partitions: DataStream[Partition] =
  env.fromSource(
    queryApi.subscribe(streamLayer, ConsumerSettings(groupName = "my-job")),
    WatermarkStrategy.noWatermarks(),
    "StreamLayerSubscription")

// Index layer query
val indexPartitions: DataStream[IndexPartition] =
  env.fromSource(
    queryApi.queryIndex(indexLayer, Some(queryString)),
    WatermarkStrategy.noWatermarks(),
    "IndexLayerQuery")

// Volatile layer query
val volatilePartitions: DataStream[Partition] =
  env.fromSource(
    queryApi.getVolatilePartitions(volatileLayer, filter),
    WatermarkStrategy.noWatermarks(),
    "VolatileLayerQuery")
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

// Stream layer subscription
ConsumerSettings consumerSettings = new ConsumerSettings.Builder().withGroupName("my-job").build();
DataStream<Partition> partitions =
    env.fromSource(
        queryApi.subscribe(streamLayer, consumerSettings),
        WatermarkStrategy.noWatermarks(),
        "StreamLayerSubscription");

// Index layer query
DataStream<IndexPartition> indexPartitions =
    env.fromSource(
        queryApi.queryIndex(indexLayer, Optional.of(queryString)),
        WatermarkStrategy.noWatermarks(),
        "IndexLayerQuery");

// Volatile layer query
DataStream<Partition> volatilePartitions =
    env.fromSource(
        queryApi.getVolatilePartitions(volatileLayer, filter),
        WatermarkStrategy.noWatermarks(),
        "VolatileLayerQuery");

The key differences:

  1. addSource(source) → fromSource(source, watermarkStrategy, operatorName)
  2. WatermarkStrategy.noWatermarks() is the simplest strategy when you don't need event-time processing. Use WatermarkStrategy.forMonotonousTimestamps() or a custom strategy if you need event-time semantics (see the sketch after this list).
  3. An operator name (third parameter) is now required — use a descriptive string.
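
If you do need event-time semantics, a minimal sketch of point 2 could look like this. It reuses the stream layer subscription from above; extractEventTime(...) is a hypothetical helper that derives an epoch-millisecond timestamp from your payload:

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Monotonically increasing watermarks with a custom timestamp assigner (illustrative)
val eventTimePartitions: DataStream[Partition] =
  env.fromSource(
    queryApi.subscribe(streamLayer, ConsumerSettings(groupName = "my-job")),
    WatermarkStrategy
      .forMonotonousTimestamps[Partition]()
      .withTimestampAssigner(new SerializableTimestampAssigner[Partition] {
        override def extractTimestamp(partition: Partition, recordTimestamp: Long): Long =
          extractEventTime(partition) // hypothetical helper, adapt to your data
      }),
    "StreamLayerSubscription")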

Admin API sources

The FlinkAdminApi.listCatalogs() and FlinkAdminApi.listSubscriptions() methods also return the new Source type. Apply the same fromSource() pattern:

Before (Flink 1.19):

val adminApi = client.adminApi()
val catalogs: DataStream[CatalogSummary] =
  env.addSource(adminApi.listCatalogs())
FlinkAdminApi adminApi = client.adminApi();
DataStream<CatalogSummary> catalogs =
    env.addSource(adminApi.listCatalogs());

After (Flink 2.2):

val adminApi = client.adminApi()
val catalogs: DataStream[CatalogSummary] =
  env.fromSource(adminApi.listCatalogs(), WatermarkStrategy.noWatermarks(), "ListCatalogs")
FlinkAdminApi adminApi = client.adminApi();
DataStream<CatalogSummary> catalogs =
    env.fromSource(adminApi.listCatalogs(), WatermarkStrategy.noWatermarks(), "ListCatalogs");

Sink API changes (SinkFunction → Sink V2)

SinkFunction, RichSinkFunction, and DataStream.addSink() have been removed in Flink 2.0. The Data Client Library now uses the Sink V2 API (org.apache.flink.api.connector.sink2.Sink).

Replace addSink() with sinkTo()

Before (Flink 1.19):

val writeEngine = client.writeEngine(hrn)
pendingPartitions.addSink(writeEngine.publish())
FlinkWriteEngine writeEngine = client.writeEngine(hrn);
pendingPartitions.addSink(writeEngine.publish());

After (Flink 2.2):

val writeEngine = client.writeEngine(hrn)
pendingPartitions.sinkTo(writeEngine.publish())
FlinkWriteEngine writeEngine = client.writeEngine(hrn);
pendingPartitions.sinkTo(writeEngine.publish());

Custom RichSinkFunction implementations

If you have custom RichSinkFunction implementations (e.g., for writing to ObjectStore layers), you must migrate them to the Sink V2 API.

Before (Flink 1.19):

class UploadObjectStoreSink(hrn: HRN, layerId: String)
    extends RichSinkFunction[ObjectStoreMessage] {

  @transient
  private lazy val dataClient = new FlinkDataClient()
  @transient
  private lazy val writeEngine = dataClient.writeEngine(hrn)

  override def invoke(message: ObjectStoreMessage, context: SinkFunction.Context): Unit =
    writeEngine.uploadObject2(layerId, message.key, message.data, ...)

  override def close(): Unit =
    dataClient.terminate()
}

// Usage:
stream.addSink(new UploadObjectStoreSink(hrn, layerId))
class UploadObjectStoreSink extends RichSinkFunction<ObjectStoreMessage> {
    private transient FlinkDataClient dataClient;
    private transient FlinkWriteEngine writeEngine;

    @Override
    public void open(Configuration parameters) throws Exception {
        dataClient = new FlinkDataClient();
        writeEngine = dataClient.writeEngine(hrn);
    }

    @Override
    public void invoke(ObjectStoreMessage message, Context context) throws Exception {
        writeEngine.uploadObject2(layerId, message.getKey(), message.getData(), ...);
    }

    @Override
    public void close() throws Exception {
        dataClient.terminate();
    }
}

// Usage:
stream.addSink(new UploadObjectStoreSink());

After (Flink 2.2):

import org.apache.flink.api.connector.sink2.{Sink, SinkWriter, WriterInitContext}

class UploadObjectStoreSink(hrn: HRN, layerId: String)
    extends Sink[ObjectStoreMessage] {

  override def createWriter(context: WriterInitContext): SinkWriter[ObjectStoreMessage] =
    new UploadObjectStoreSinkWriter(hrn, layerId)
}

class UploadObjectStoreSinkWriter(hrn: HRN, layerId: String)
    extends SinkWriter[ObjectStoreMessage] {

  private val dataClient = new FlinkDataClient()
  private val writeEngine = dataClient.writeEngine(hrn)

  override def write(message: ObjectStoreMessage, context: SinkWriter.Context): Unit =
    writeEngine.uploadObject2(layerId, message.key, message.data, ...)

  override def flush(endOfInput: Boolean): Unit = {}

  override def close(): Unit =
    dataClient.terminate()
}

// Usage:
stream.sinkTo(new UploadObjectStoreSink(hrn, layerId))
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;

class UploadObjectStoreSink implements Sink<ObjectStoreMessage> {
    private final HRN hrn;
    private final String layerId;

    public UploadObjectStoreSink(HRN hrn, String layerId) {
        this.hrn = hrn;
        this.layerId = layerId;
    }

    @Override
    public SinkWriter<ObjectStoreMessage> createWriter(WriterInitContext context) {
        return new UploadObjectStoreSinkWriter(hrn, layerId);
    }
}

class UploadObjectStoreSinkWriter implements SinkWriter<ObjectStoreMessage> {
    private final FlinkDataClient dataClient;
    private final FlinkWriteEngine writeEngine;

    public UploadObjectStoreSinkWriter(HRN hrn, String layerId) {
        dataClient = new FlinkDataClient();
        writeEngine = dataClient.writeEngine(hrn);
    }

    @Override
    public void write(ObjectStoreMessage message, Context context) throws Exception {
        writeEngine.uploadObject2(layerId, message.getKey(), message.getData(), ...);
    }

    @Override
    public void flush(boolean endOfInput) throws Exception {}

    @Override
    public void close() throws Exception {
        dataClient.terminate();
    }
}

// Usage:
stream.sinkTo(new UploadObjectStoreSink(hrn, layerId));

Key changes for custom sink migration:

Old (RichSinkFunction) | New (Sink V2)
--- | ---
Single class | Split into Sink (factory) + SinkWriter (logic)
open(Configuration) | Constructor of SinkWriter (or createWriter())
invoke(value, context) | write(value, context)
snapshotState(...) | flush(endOfInput)
close() | SinkWriter.close()
stream.addSink(sink) | stream.sinkTo(sink)

open(Configuration)

The open(Configuration) method on all Rich functions (RichMapFunction, RichFlatMapFunction, RichFilterFunction, RichAsyncFunction, RichAllWindowFunction, etc.) has been replaced by open(OpenContext).

Before (Flink 1.19):

import org.apache.flink.configuration.Configuration

class ReadDataMapFunction(hrn: HRN)
    extends RichMapFunction[Partition, (Partition, Array[Byte])] {

  @transient private lazy val dataClient = new FlinkDataClient()
  @transient private lazy val readEngine = dataClient.readEngine(hrn)

  override def map(partition: Partition): (Partition, Array[Byte]) = ...

  override def close(): Unit = dataClient.terminate()
}
import org.apache.flink.configuration.Configuration;

class ReadDataMapFunction extends RichMapFunction<Partition, Tuple2<Partition, byte[]>> {
    private transient FlinkDataClient dataClient;
    private transient FlinkReadEngine readEngine;

    @Override
    public void open(Configuration parameters) throws Exception {
        dataClient = new FlinkDataClient();
        readEngine = dataClient.readEngine(hrn);
    }

    @Override
    public Tuple2<Partition, byte[]> map(Partition partition) { ... }

    @Override
    public void close() throws Exception { dataClient.terminate(); }
}

After (Flink 2.2):

import org.apache.flink.api.common.functions.OpenContext

class ReadDataMapFunction(hrn: HRN)
    extends RichMapFunction[Partition, (Partition, Array[Byte])] {

  @transient private lazy val dataClient = new FlinkDataClient()
  @transient private lazy val readEngine = dataClient.readEngine(hrn)

  override def map(partition: Partition): (Partition, Array[Byte]) = ...

  override def close(): Unit = dataClient.terminate()
}
import org.apache.flink.api.common.functions.OpenContext;

class ReadDataMapFunction extends RichMapFunction<Partition, Tuple2<Partition, byte[]>> {
    private transient FlinkDataClient dataClient;
    private transient FlinkReadEngine readEngine;

    @Override
    public void open(OpenContext openContext) throws Exception {
        dataClient = new FlinkDataClient();
        readEngine = dataClient.readEngine(hrn);
    }

    @Override
    public Tuple2<Partition, byte[]> map(Partition partition) { ... }

    @Override
    public void close() throws Exception { dataClient.terminate(); }
}

Note

Scala users: If you use @transient private lazy val for initialization (the recommended Scala pattern), you may not have an open() override at all. In that case, no changes are needed for this item.

Removed deprecated APIs

The following APIs, previously deprecated in Flink 1.19, have been removed in Flink 2.0+; update any remaining usages:

fromCollection

StreamExecutionEnvironment.fromCollection() has been removed. Use fromData() instead (see Flink 1.19 migration guide for examples).
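
For example, a minimal Scala sketch against the Java DataStream API:

// Flink 1.x (removed):
// val words = env.fromCollection(java.util.List.of("a", "b", "c"))

// Flink 2.x:
val words: DataStream[String] = env.fromData("a", "b", "c")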

RestartStrategies

org.apache.flink.api.common.restartstrategy.RestartStrategies has been removed. Use Configuration with RestartStrategyOptions instead (see Flink 1.19 migration guide for examples).
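
A minimal sketch of the configuration-based equivalent of a fixed-delay strategy (the attempt count and delay are illustrative):

import java.time.Duration
import org.apache.flink.configuration.{Configuration, RestartStrategyOptions}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// Restart up to 3 times with a 10-second delay between attempts
val config = new Configuration()
config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay")
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Int.box(3))
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10))

val env = StreamExecutionEnvironment.getExecutionEnvironment(config)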

toAppendStream

StreamTableEnvironment.toAppendStream() has been removed. Use toDataStream() instead (see Flink 1.19 migration guide for examples).
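
For example, assuming tableEnv is a StreamTableEnvironment and table an existing Table (a minimal sketch):

import org.apache.flink.types.Row

// Flink 1.x (removed):
// val rows = tableEnv.toAppendStream[Row](table)

// Flink 2.x:
val rows: DataStream[Row] = tableEnv.toDataStream(table)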

SerializerConfig via ExecutionConfig

In Flink 2.x, env.getConfig.getSerializerConfig returns the SerializerConfig interface instead of the concrete SerializerConfigImpl class. Methods such as addDefaultKryoSerializer() and registerTypeWithKryoSerializer() are only available on SerializerConfigImpl, so you must cast the result.

Scala — Before (Flink 1.x):

env.getConfig.addDefaultKryoSerializer(classOf[MyType], classOf[MySerializer])
// or
val serializerConfig = env.getConfig.getSerializerConfig
serializerConfig.addDefaultKryoSerializer(classOf[MyType], classOf[MySerializer])

Scala — After (Flink 2.x):

import org.apache.flink.api.common.serialization.SerializerConfigImpl

val serializerConfig = env.getConfig.getSerializerConfig.asInstanceOf[SerializerConfigImpl]
serializerConfig.addDefaultKryoSerializer(classOf[MyType], classOf[MySerializer])

Java — Before (Flink 1.x):

env.getConfig().addDefaultKryoSerializer(MyType.class, MySerializer.class);
// or
SerializerConfig serializerConfig = env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(MyType.class, MySerializer.class);

Java — After (Flink 2.x):

import org.apache.flink.api.common.serialization.SerializerConfigImpl;

SerializerConfigImpl serializerConfig =
    (SerializerConfigImpl) env.getConfig().getSerializerConfig();
serializerConfig.addDefaultKryoSerializer(MyType.class, MySerializer.class);

The same cast is required for registerTypeWithKryoSerializer(), registerKryoType(), and other Kryo-related configuration methods.
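
For example, continuing the Scala snippet above (MyOtherType is a placeholder type):

val serializerConfig = env.getConfig.getSerializerConfig.asInstanceOf[SerializerConfigImpl]
serializerConfig.registerTypeWithKryoSerializer(classOf[MyType], classOf[MySerializer])
serializerConfig.registerKryoType(classOf[MyOtherType])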

References

For more information, review the Flink release notes: