Flink entry point
FlinkDataClient is the main entry point for the API. It is a heavyweight object that should be created once, reused, and terminated when you are done. If you do not terminate it, the job may never finish and can fail with exceptions such as ClassNotFoundException.

In the driver, terminate the client after execution, as shown in the snippets below.
```scala
val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment
val client = new FlinkDataClient()

// create sources
// apply functions
// add sinks
// ...

// block until the job finishes
env.execute()

// terminate the client on finish
client.terminate()
```

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkDataClient client = new FlinkDataClient();

// create sources
// apply functions
// add sinks
// ...

// block until the job finishes
env.execute();

// terminate the client on finish
client.terminate();
```

Use the close() callback provided by the Rich classes in Flink
(RichMapFunction, RichFlatMapFunction, RichFilterFunction, or others) to
terminate the Flink functions. For custom sinks, call terminate() in your
SinkWriter.close() method (see the
Flink 2.2 migration guide for details on
migrating to the Sink V2 API).
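As a sketch of the custom-sink case, a Sink V2 writer can own the client and terminate it in `close()`. This assumes the standard `org.apache.flink.api.connector.sink2.SinkWriter` interface; the `writeRecord(...)` call is a hypothetical helper, not part of the documented FlinkDataClient API.

```java
import java.io.IOException;
import org.apache.flink.api.connector.sink2.SinkWriter;

/**
 * Sketch of a custom Sink V2 writer that owns a FlinkDataClient.
 * writeRecord(...) is a hypothetical example method, not part of
 * the documented FlinkDataClient API.
 */
public class CustomSinkWriter implements SinkWriter<String> {

  // created once per writer instance, on the task manager
  private final FlinkDataClient flinkDataClient = new FlinkDataClient();

  @Override
  public void write(String element, Context context) throws IOException {
    flinkDataClient.writeRecord(element); // hypothetical write call
  }

  @Override
  public void flush(boolean endOfInput) throws IOException {
    // flush any buffered records here if the client buffers writes
  }

  @Override
  public void close() throws Exception {
    // terminate the client when the writer is closed
    flinkDataClient.terminate();
  }
}
```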
```scala
/** Flink function with access to DataClient. */
abstract class CustomFunction extends RichFunction with Serializable {

  // initialize the DataClient lazily on the task manager
  @transient
  private lazy val flinkDataClient: FlinkDataClient =
    new FlinkDataClient()

  // terminate the DataClient when the function is closed
  override def close(): Unit =
    flinkDataClient.terminate()
}
```

```java
/** Flink function with access to DataClient. */
abstract class CustomFunction implements RichFunction, Serializable {

  private transient FlinkDataClient flinkDataClient;

  // initialize the DataClient on the task manager
  @Override
  public void open(OpenContext openContext) throws Exception {
    flinkDataClient = new FlinkDataClient();
  }

  // terminate the DataClient when the function is closed
  @Override
  public void close() throws Exception {
    flinkDataClient.terminate();
  }
}
```