AWS Kinesis Data Streams

The Amazon Kinesis Data Streams data connector is implemented for source and sink connections.

This is a minimal example of the Kinesis connector URI scheme:

kinesis://default?arn=arn:aws:kinesis:eu-west-1:123456789123:stream/stream-name

Authentication

  • Default authentication: used when credentials aren't provided in the connector URI, for example: kinesis://default?arn=... In that case, the default AWS SDK credentials provider (which reads environment variables, system properties, and ~/.aws/credentials) is used.
  • Explicit (static) credentials provider: used when the credentials are provided explicitly in the connector URI, for example: kinesis://aws-access-id:aws-secret-key@default?arn=...
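
As an illustration of how the two URI forms differ, the following minimal sketch parses a connector URI and reports which authentication method it would select. The function name describe_kinesis_uri and the returned dictionary layout are hypothetical and not part of the connector; only the URI shape is taken from the examples above.

```python
from urllib.parse import urlsplit, parse_qs


def describe_kinesis_uri(uri: str) -> dict:
    """Split a kinesis:// URI and report the authentication method (illustrative only)."""
    parts = urlsplit(uri)
    if parts.scheme != "kinesis":
        raise ValueError(f"unexpected scheme: {parts.scheme!r}")

    # Keep the first value of each query parameter.
    params = {key: values[0] for key, values in parse_qs(parts.query).items()}
    if "arn" not in params:
        raise ValueError("the arn parameter is mandatory")

    # Credentials in the user-info part select the static provider;
    # otherwise the default AWS SDK provider chain applies.
    has_credentials = parts.username is not None and parts.password is not None
    return {
        "auth": "static" if has_credentials else "default",
        "access_id": parts.username,
        "arn": params["arn"],
        "params": params,
    }


arn = "arn:aws:kinesis:eu-west-1:123456789123:stream/stream-name"
print(describe_kinesis_uri(f"kinesis://default?arn={arn}")["auth"])
print(describe_kinesis_uri(f"kinesis://aws-access-id:aws-secret-key@default?arn={arn}")["auth"])
```

Real secret keys can contain characters such as / that would need escaping inside a URI; how the connector expects them to be encoded is not covered by this sketch.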

Configuration

Use the following parameters to configure the connector.

Parameters supported for both source and sink

The arn parameter

Mandatory parameter. Defines the Amazon Resource Name (ARN) to uniquely identify the Kinesis Stream.

The endpoint parameter

Optional parameter. Enables the use of an alternatively hosted Kinesis API, such as a LocalStack deployment, for example: kinesis://default?arn=arn:aws:kinesis:eu-west-1:123456789123:stream/stream-name&endpoint=http://localhost:4566
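
For illustration, a URI like the one above can be assembled programmatically. This is a minimal sketch: build_kinesis_uri is a hypothetical helper, and it joins values without percent-encoding because the examples in this guide show them unencoded, which is an assumption about what the connector accepts.

```python
from typing import Optional


def build_kinesis_uri(
    arn: str,
    endpoint: Optional[str] = None,
    access_id: Optional[str] = None,
    secret_key: Optional[str] = None,
    **options: object,
) -> str:
    """Assemble a kinesis:// connector URI (hypothetical helper, no escaping applied)."""
    # Static credentials, when given, go into the user-info part of the URI.
    credentials = f"{access_id}:{secret_key}@" if access_id and secret_key else ""

    query = [("arn", arn)]
    if endpoint:
        query.append(("endpoint", endpoint))
    # Any further connector parameters (offset, retry_attempts, ...) are appended as-is.
    query.extend(options.items())

    query_string = "&".join(f"{key}={value}" for key, value in query)
    return f"kinesis://{credentials}default?{query_string}"


print(build_kinesis_uri(
    "arn:aws:kinesis:eu-west-1:123456789123:stream/stream-name",
    endpoint="http://localhost:4566",
))
```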

Parameters supported for the source

The offset parameter

Optional parameter. Defines the position in the shard from which to start reading records.

Only LATEST, TRIM_HORIZON, and AT_TIMESTAMP values are supported.

The default value is LATEST.

To learn more about the offset and shard iterator types, see the Kinesis API reference.

The offset_ts parameter

Mandatory parameter if offset=AT_TIMESTAMP. Defines the timestamp of the message to start reading from.

Supported formats are:

  • a double value representing the epoch timestamp in seconds, for example: 1735603199.000 or 1735603199
  • a W3C timestamp, for example: 2024-12-30T23:59:59.000Z
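
To show that the two formats describe the same kind of value, here is a minimal sketch that normalizes either one to a UTC datetime. The helper parse_offset_ts is hypothetical and only handles the two example shapes listed above; the connector's own parsing may accept more or fewer variants.

```python
from datetime import datetime, timezone


def parse_offset_ts(value: str) -> datetime:
    """Interpret an offset_ts value as epoch seconds or as a W3C timestamp (illustrative)."""
    try:
        seconds = float(value)
    except ValueError:
        # Not a number, so treat it as a W3C timestamp.
        # Replacing the trailing "Z" keeps this working on Python versions before 3.11.
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


# Both example values from the list above refer to the same instant.
print(parse_offset_ts("1735603199") == parse_offset_ts("2024-12-30T23:59:59.000Z"))
```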

The retry_attempts parameter

Optional parameter.

Defines the number of retries to attempt when a recoverable exception occurs while reading (for example, ProvisionedThroughputExceededException).

The default value is 3.

The retry_min_delay parameter

Optional parameter.

Defines the minimum delay in milliseconds before the next attempt to read.

The default value is 300.

The retry_max_delay parameter

Optional parameter.

Defines the maximum delay in milliseconds before the next attempt to read.

The default value is 1000.

Together, the three retry_* parameters define a retry policy with exponential backoff.

With the default values, that means three retry attempts:

  • at 300 ms (equal to retry_min_delay)
  • at 600 ms (equal to retry_min_delay x 2)
  • at 1000 ms (the minimum of the previous delay doubled, 600 ms x 2 = 1200 ms, and retry_max_delay, so 1000 ms)
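
The schedule above can be reproduced with a few lines of code. This is a minimal sketch of a doubling backoff capped at retry_max_delay, inferred from the worked example; the function retry_delays is hypothetical and the connector's actual implementation may differ in details such as jitter.

```python
def retry_delays(retry_attempts: int = 3,
                 retry_min_delay: int = 300,
                 retry_max_delay: int = 1000) -> list:
    """Return the delay in milliseconds before each retry attempt (illustrative)."""
    delays = []
    delay = retry_min_delay
    for _ in range(retry_attempts):
        # Each delay is capped at retry_max_delay.
        delays.append(min(delay, retry_max_delay))
        # The next delay doubles the previous one.
        delay *= 2
    return delays


print(retry_delays())  # [300, 600, 1000]
```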

Parameters supported for sink

The fail_on_error parameter

Optional parameter.

Enables the failure-intolerant mode, where the Flink job will fail immediately upon encountering any write error without attempting to retry.

The default value is false.

You can set it to true while troubleshooting connection issues so that error log messages surface sooner.

The max_batch_size parameter

Optional parameter.

Defines the maximum number of elements that may be passed in a list to be written downstream.

The default value is 500.

The max_in_flight_requests parameter

Optional parameter.

Defines the maximum number of uncompleted calls to submitRequestEntries.

Once this number has been reached, writes and callbacks to add elements to the buffer may block until one or more requests to submitRequestEntries complete.

Consider reducing this value on a weak network connection.

The default value is 50.

The max_buffered_requests parameter

Optional parameter.

Defines the maximum buffer length. Consider reducing this value to increase backpressure from the sink in the Flink job graph.

The default value is 10000.

The max_batch_size_in_bytes parameter

Optional parameter.

Defines the maximum batch size that can be written at once.

Consider reducing this value on a weak network connection to avoid errors such as java.io.IOException: The connection was closed during the request.

However, it should not be lower than max_record_size_in_bytes.

The default value is 5242880: 5 * 1024 * 1024.

The max_time_in_buffer_ms parameter

Optional parameter.

Defines the maximum amount of time an element may remain in the buffer.

The default value is 5000.

The max_record_size_in_bytes parameter

Optional parameter.

Defines the maximum size of each record in bytes. If a record larger than this is passed to the sink, it will throw an IllegalArgumentException.

The default value is 1048576: 1 * 1024 * 1024.
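
The sink defaults and the constraint between max_batch_size_in_bytes and max_record_size_in_bytes can be checked before a URI is assembled. The following is a minimal sketch: the names SINK_DEFAULTS and resolve_sink_settings are hypothetical, and the connector may perform its own validation differently.

```python
# Default values as listed in the sink parameter descriptions above.
SINK_DEFAULTS = {
    "fail_on_error": False,
    "max_batch_size": 500,
    "max_in_flight_requests": 50,
    "max_buffered_requests": 10000,
    "max_batch_size_in_bytes": 5 * 1024 * 1024,   # 5242880
    "max_time_in_buffer_ms": 5000,
    "max_record_size_in_bytes": 1 * 1024 * 1024,  # 1048576
}


def resolve_sink_settings(**overrides) -> dict:
    """Merge overrides into the defaults and check the documented size constraint."""
    unknown = set(overrides) - set(SINK_DEFAULTS)
    if unknown:
        raise ValueError(f"unknown sink parameters: {sorted(unknown)}")

    settings = {**SINK_DEFAULTS, **overrides}
    # A batch must be able to hold at least one record of the maximum size.
    if settings["max_batch_size_in_bytes"] < settings["max_record_size_in_bytes"]:
        raise ValueError(
            "max_batch_size_in_bytes should not be lower than max_record_size_in_bytes"
        )
    return settings


print(resolve_sink_settings(max_batch_size_in_bytes=2 * 1024 * 1024)["max_batch_size_in_bytes"])
```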

Kinesis data stream

Kinesis Data Streams helps collect, process, and analyze large streams of data records in real time.

Kinesis data streams are internally composed of shards. Probe data messages (records) are routed to a shard according to the partition key provided in the data record.

Within a shard, messages that belong to the same partition key are additionally ordered by sequence number.
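
To make the routing by partition key concrete, here is a minimal sketch. Kinesis maps each partition key to a 128-bit integer with an MD5 hash and assigns the record to the shard whose hash key range contains that value. The even split of the hash space across shards, the big-endian interpretation of the digest, and the function names hash_key and shard_for are assumptions for illustration; real streams can have uneven ranges after resharding.

```python
import hashlib


def hash_key(partition_key: str) -> int:
    """Map a partition key to a 128-bit integer using MD5."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_for(partition_key: str, shard_count: int) -> int:
    """Pick a shard index, assuming the hash space is split evenly across shards."""
    range_size = 2**128 // shard_count
    # Clamp so the top of the hash space falls into the last shard.
    return min(hash_key(partition_key) // range_size, shard_count - 1)


# Records with the same partition key always land on the same shard.
print(shard_for("probe-42", 4) == shard_for("probe-42", 4))
```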

📘

Note