Flink entry point

FlinkDataClient is the main entry point for the API. It is a heavyweight object: create it once, reuse it, and terminate it when you are done. If you do not terminate it, the job may never finish and may fail with exceptions such as ClassNotFoundException.

In the driver, you need to terminate the client after execution as shown in the snippet below.

val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

val client = new FlinkDataClient()

// create sources
// apply functions
// add sinks
// ...

// block until the job finishes
env.execute()

// terminate the client on finish
client.terminate()

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkDataClient client = new FlinkDataClient();

// create sources
// apply functions
// add sinks
// ...

// block until the job finishes
env.execute();

// terminate the client on finish
client.terminate();
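
The driver snippets above call terminate() only when execute() returns normally. A minimal sketch of one way to terminate the client even when the job fails is to wrap the blocking call in try/finally. DemoClient, DemoJob, and DriverSketch are hypothetical stand-ins so the example compiles without Flink or the Data Client Library; in a real driver, env.execute() and FlinkDataClient would take their place.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for FlinkDataClient; only terminate() mirrors the text above. */
class DemoClient {
    private final AtomicBoolean terminated = new AtomicBoolean(false);

    void terminate() {
        terminated.set(true);
    }

    boolean isTerminated() {
        return terminated.get();
    }
}

/** Hypothetical stand-in for the job; env.execute() would be called here in a real driver. */
interface DemoJob {
    void execute() throws Exception;
}

class DriverSketch {
    /** Runs the job and terminates the client whether or not the job throws. */
    static void runAndTerminate(DemoJob job, DemoClient client) throws Exception {
        try {
            job.execute();       // blocks until the job finishes or fails
        } finally {
            client.terminate();  // reached on success and on failure
        }
    }

    public static void main(String[] args) {
        DemoClient client = new DemoClient();
        try {
            // Simulate a failing job to show that the client is still terminated.
            runAndTerminate(() -> { throw new IllegalStateException("job failed"); }, client);
        } catch (Exception e) {
            System.out.println("job error: " + e.getMessage());
        }
        System.out.println("terminated: " + client.isTerminated());
    }
}
```

The exception from the job still propagates to the caller; the finally block only guarantees that the client is released first.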

Inside Flink functions, use the close() callback provided by Flink's Rich classes (RichMapFunction, RichFlatMapFunction, RichFilterFunction, or others) to terminate the client. For custom sinks, call terminate() in your SinkWriter.close() method (see the Flink 2.2 migration guide for details on migrating to the Sink V2 API).

/** Flink function with access to DataClient. */
abstract class CustomFunction extends RichFunction with Serializable {
  // initialize DataClient
  @transient
  private lazy val flinkDataClient: FlinkDataClient =
    new FlinkDataClient()

  // terminate DataClient
  override def close(): Unit =
    flinkDataClient.terminate()
}

/** Flink function with access to DataClient. */
abstract class CustomFunction implements RichFunction, Serializable {
  private transient FlinkDataClient flinkDataClient;

  @Override
  public void open(OpenContext openContext) throws Exception {
    flinkDataClient = new FlinkDataClient();
  }

  @Override
  public void close() throws Exception {
    // open() may not have completed, so guard against a null client
    if (flinkDataClient != null) {
      flinkDataClient.terminate();
    }
  }
}
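
For the custom sink case mentioned above, the same idea applies to the writer's close() method. The following is a rough sketch only: StubClient, ClientBackedWriter, and WriterSketch are hypothetical names, and the writer implements plain AutoCloseable (which Flink's SinkWriter extends) so that the example runs without Flink on the classpath. A real writer would implement SinkWriter and hold a FlinkDataClient instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for FlinkDataClient; only terminate() mirrors the text above. */
class StubClient {
    private int terminateCalls = 0;

    void terminate() {
        terminateCalls++;
    }

    int terminateCalls() {
        return terminateCalls;
    }
}

/** Writer sketch: a real implementation would implement Flink's SinkWriter interface. */
class ClientBackedWriter implements AutoCloseable {
    private final StubClient client;
    private final List<String> written = new ArrayList<>();
    private boolean closed = false;

    ClientBackedWriter(StubClient client) {
        this.client = client;
    }

    /** Placeholder for the per-record write; a real writer would publish through the client. */
    void write(String element) {
        if (closed) {
            throw new IllegalStateException("writer is closed");
        }
        written.add(element);
    }

    int writtenCount() {
        return written.size();
    }

    /** Terminates the client once, even if close() is called more than once. */
    @Override
    public void close() {
        if (!closed) {
            closed = true;
            client.terminate();
        }
    }
}

class WriterSketch {
    public static void main(String[] args) {
        StubClient client = new StubClient();
        // try-with-resources calls close(), which in turn terminates the client.
        try (ClientBackedWriter writer = new ClientBackedWriter(client)) {
            writer.write("a");
            writer.write("b");
        }
        System.out.println("terminate calls: " + client.terminateCalls());
    }
}
```

Making close() idempotent is a defensive choice in this sketch, not a documented requirement of the client.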