AWS Kinesis Data Streams
The Amazon Kinesis Data Streams connector supports both source and sink connections.
This is a minimal example of the Kinesis connector URI scheme:
kinesis://default?arn=arn:aws:kinesis:eu-west-1:123456789123:stream/stream-name
Authentication
- Default authentication
This method is used when credentials aren't provided in the connector URI, for example:
kinesis://default?arn=...
In that case, the default AWS SDK credentials provider is used, which reads environment variables, system properties, and ~/.aws/credentials.
- Explicit (static) credentials provider
This method is used when the credentials are provided explicitly in the connector URI, for example:
kinesis://aws-access-id:aws-secret-key@default?arn=...
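As described above, the authentication method depends on whether the URI carries a user-info section (the part before @). The following Python sketch, using only the standard library, shows one way a client could make that decision. The function name resolve_credentials is hypothetical and not part of the connector. Percent-decoding the keys is also an assumption: AWS secret keys can contain characters such as / that cannot appear unescaped in that position of a URI.

```python
from urllib.parse import unquote, urlsplit


def resolve_credentials(connector_uri: str):
    """Return (access_key_id, secret_access_key) when the URI embeds static
    credentials, or None to signal that the default AWS SDK credentials
    provider should be used. Hypothetical helper for illustration only."""
    parts = urlsplit(connector_uri)
    if parts.scheme != "kinesis":
        raise ValueError(f"unexpected scheme: {parts.scheme!r}")
    # urlsplit exposes the "user:password@" section as username/password.
    if parts.username is None:
        return None  # no credentials in the URI: use the default provider
    if parts.password is None:
        raise ValueError("an access key ID was given without a secret key")
    # Assumption: reserved characters in the keys are percent-encoded.
    return unquote(parts.username), unquote(parts.password)


print(resolve_credentials("kinesis://default?arn=..."))
print(resolve_credentials("kinesis://aws-access-id:aws-secret-key@default?arn=..."))
```

The first call prints None (default provider); the second prints the pair of static keys.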
Configuration
Use the following parameters to configure the connector.
Parameters supported for both source and sink
The arn parameter
Mandatory parameter. Defines the Amazon Resource Name (ARN) that uniquely identifies the Kinesis stream.
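A stream ARN follows the standard AWS layout, so the example at the top of this page encodes the partition (aws), the region (eu-west-1), the account ID, and the stream name. The Python sketch below splits an ARN into those fields, which can be handy for sanity-checking a value before putting it into the URI. parse_stream_arn and StreamArn are hypothetical names, not part of the connector.

```python
from typing import NamedTuple


class StreamArn(NamedTuple):
    partition: str
    region: str
    account_id: str
    stream_name: str


def parse_stream_arn(arn: str) -> StreamArn:
    """Split an ARN of the form
    arn:<partition>:kinesis:<region>:<account-id>:stream/<stream-name>."""
    fields = arn.split(":", 5)  # the resource part may not be split further
    if len(fields) != 6 or fields[0] != "arn" or fields[2] != "kinesis":
        raise ValueError(f"not a Kinesis ARN: {arn!r}")
    _, aws_partition, _, region, account_id, resource = fields
    resource_type, _, stream_name = resource.partition("/")
    if resource_type != "stream" or not stream_name:
        raise ValueError(f"not a stream resource: {resource!r}")
    return StreamArn(aws_partition, region, account_id, stream_name)


print(parse_stream_arn("arn:aws:kinesis:eu-west-1:123456789123:stream/stream-name"))
```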
The endpoint parameter
Optional parameter. Overrides the Kinesis API endpoint so that an alternatively hosted Kinesis API, such as a LocalStack deployment, can be used, for example:
kinesis://default?arn=arn:aws:kinesis:eu-west-1:123456789123:stream/stream-name&endpoint=http://localhost:4566
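When the URI is assembled programmatically, a URL encoder avoids mistakes with reserved characters. This Python sketch uses a hypothetical helper, build_connector_uri, covers only the default-authentication form, and keeps : and / unescaped so that the output matches the examples on this page. Whether the connector also accepts fully percent-encoded values is not confirmed here.

```python
from urllib.parse import urlencode


def build_connector_uri(arn: str, **options: object) -> str:
    """Assemble a kinesis:// URI from the mandatory arn and any optional
    parameters (endpoint, offset, ...). Hypothetical helper."""
    params = {"arn": arn, **options}
    # Leave ':' and '/' readable so ARNs and endpoint URLs look like the
    # documented examples; other reserved characters are percent-encoded.
    return "kinesis://default?" + urlencode(params, safe=":/")


arn = "arn:aws:kinesis:eu-west-1:123456789123:stream/stream-name"
print(build_connector_uri(arn, endpoint="http://localhost:4566"))
```

With these arguments the output is identical to the LocalStack example above.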
Parameters supported for the source
The offset parameter
Optional parameter. Defines the position in the shard from which to start reading records.
Only LATEST, TRIM_HORIZON, and AT_TIMESTAMP values are supported.
The default value is LATEST.
To learn more about the offset and shard iterator types, see the Kinesis API reference.
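The rules for offset can be checked before a job is started. The Python sketch below reads the query string of a connector URI, applies the documented default of LATEST, and rejects unsupported values. It also enforces the offset_ts requirement described in the next parameter entry. resolve_offset is a hypothetical helper, and treating the values as case-sensitive is an assumption.

```python
from urllib.parse import parse_qsl, urlsplit

# Values listed as supported; matched case-sensitively (an assumption).
SUPPORTED_OFFSETS = {"LATEST", "TRIM_HORIZON", "AT_TIMESTAMP"}


def resolve_offset(params: dict) -> str:
    """Return the starting position, applying the documented default."""
    offset = params.get("offset", "LATEST")
    if offset not in SUPPORTED_OFFSETS:
        raise ValueError(
            f"unsupported offset {offset!r}; expected one of {sorted(SUPPORTED_OFFSETS)}")
    if offset == "AT_TIMESTAMP" and "offset_ts" not in params:
        raise ValueError("offset=AT_TIMESTAMP requires offset_ts")
    return offset


uri = ("kinesis://default?arn=arn:aws:kinesis:eu-west-1:123456789123:stream/stream-name"
       "&offset=TRIM_HORIZON")
params = dict(parse_qsl(urlsplit(uri).query))
print(resolve_offset(params))  # TRIM_HORIZON
```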
The offset_ts parameter
Mandatory parameter if offset=AT_TIMESTAMP. Defines the timestamp of the message to start reading from.
Supported formats are:
- epoch timestamp in seconds as a double value, for example: 1735603199.000 or 1735603199
- W3C timestamp, for example: 2024-12-30T23:59:59.000Z
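All three example values above denote the same instant. The following Python sketch converts either format to a timezone-aware UTC datetime, which makes that easy to verify. parse_offset_ts is hypothetical and is not the connector's own parser. It assumes a W3C value always carries a time zone designator, and it rewrites a trailing Z because datetime.fromisoformat accepts Z only from Python 3.11.

```python
from datetime import datetime, timezone


def parse_offset_ts(value: str) -> datetime:
    """Convert an offset_ts value in either documented format to an aware
    UTC datetime. Hypothetical helper for illustration."""
    try:
        # Format 1: epoch seconds, with or without a fractional part.
        return datetime.fromtimestamp(float(value), tz=timezone.utc)
    except ValueError:
        pass  # not a number, so try the W3C format
    # Format 2: W3C timestamp; normalize "Z" to an explicit UTC offset.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        raise ValueError(f"offset_ts needs a time zone designator: {value!r}")
    return parsed.astimezone(timezone.utc)


for v in ("1735603199.000", "1735603199", "2024-12-30T23:59:59.000Z"):
    print(v, "->", parse_offset_ts(v).isoformat())
```

Each line of output ends in 2024-12-30T23:59:59+00:00.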
The retry_attempts parameter
Optional parameter.
Defines the number of retries when a recoverable exception occurs while reading
(for example, ProvisionedThroughputExceededException).
The default value is 3.
The retry_min_delay parameter
Optional parameter.
Defines the minimum delay in milliseconds before the next attempt to read.
The default value is 300.
The retry_max_delay parameter
Optional parameter.
Defines the maximum delay in milliseconds before the next attempt to read.
The default value is 1000.
Together, the three retry_* parameters define a retry policy with exponential backoff: each delay is double the previous one, capped at retry_max_delay.
With the default values, there are three retry attempts:
- with a delay of 300 ms (equal to retry_min_delay)
- with a delay of 600 ms (equal to retry_min_delay x 2)
- with a delay of 1000 ms (the smaller of the previous delay doubled, 600 ms x 2 = 1200 ms, and retry_max_delay, so 1000 ms)
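The schedule above can be reproduced with a few lines of Python, which is useful for checking how other parameter values would behave. retry_delays is a hypothetical helper that models only the documented arithmetic (doubling, capped at retry_max_delay). It does not model any jitter or other details the connector may add.

```python
def retry_delays(retry_attempts: int = 3,
                 retry_min_delay: int = 300,
                 retry_max_delay: int = 1000) -> list:
    """Delay in ms before each retry: start at retry_min_delay, double
    after every attempt, and never exceed retry_max_delay."""
    delays = []
    delay = retry_min_delay
    for _ in range(retry_attempts):
        delays.append(min(delay, retry_max_delay))
        delay *= 2
    return delays


print(retry_delays())              # [300, 600, 1000]
print(retry_delays(5, 100, 1000))  # [100, 200, 400, 800, 1000]
```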
Parameters supported for the sink
The fail_on_error parameter
Optional parameter.
Enables the failure-intolerant mode, where the Flink job fails immediately upon encountering any write error without attempting to retry.
The default value is false.
You can set it to true while troubleshooting connection issues
to receive error log messages more quickly.
The max_batch_size parameter
Optional parameter.
Defines the maximum number of records that may be passed in a single batch to be written downstream.
The default value is 500.
The max_in_flight_requests parameter
Optional parameter.
Defines the maximum number of uncompleted calls to submitRequestEntries.
Once this number has been reached, writes and callbacks that add elements
to the buffer may block until one or more requests to submitRequestEntries complete.
Consider reducing this value on a slow or unstable network connection.
The default value is 50.
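The blocking behavior of max_in_flight_requests can be illustrated with a semaphore. The following asyncio sketch is only an analogy, not how the connector or Flink implements it: run_sink and submit are hypothetical, and asyncio.sleep stands in for a submitRequestEntries call. Once the limit is reached, each further submission waits until an earlier one completes, so the observed peak never exceeds the limit.

```python
import asyncio


async def run_sink(batches, max_in_flight_requests: int = 50) -> int:
    """Submit batches concurrently with at most max_in_flight_requests
    uncompleted submissions at a time; return the peak observed."""
    limit = asyncio.Semaphore(max_in_flight_requests)
    in_flight = 0
    peak = 0

    async def submit(batch):
        nonlocal in_flight, peak
        async with limit:  # waits here while the limit is reached
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the write request
            in_flight -= 1

    await asyncio.gather(*(submit(b) for b in batches))
    return peak


print(asyncio.run(run_sink(range(10), max_in_flight_requests=3)))  # 3
```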
The max_buffered_requests parameter
Optional parameter.
Defines the maximum buffer length. Consider reducing this value to make the sink apply backpressure earlier in the Flink job graph.
The default value is 10000.
The max_batch_size_in_bytes parameter
Optional parameter.
Defines the maximum batch size, in bytes, that can be written at once.
Consider reducing this value on a slow or unstable network connection to avoid
errors such as java.io.IOException: The connection was closed during the request.
However, it should not be lower than max_record_size_in_bytes.
The default value is 5242880: 5 * 1024 * 1024.
The max_time_in_buffer_ms parameter
Optional parameter.
Defines the maximum time, in milliseconds, that a record may remain in the buffer before it is flushed.
The default value is 5000.
The max_record_size_in_bytes parameter
Optional parameter.
Defines the maximum size of each record in bytes. If a record larger
than this is passed to the sink, the sink throws an IllegalArgumentException.
The default value is 1048576: 1 * 1024 * 1024.
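The interaction between max_batch_size, max_batch_size_in_bytes, and max_record_size_in_bytes can be sketched as a simple batching loop. This Python version is a simplified model, not the sink's actual algorithm: make_batches is hypothetical, a record's size is taken as its payload length, ValueError stands in for Java's IllegalArgumentException, and time-based flushing (max_time_in_buffer_ms) is not modeled.

```python
def make_batches(records, max_batch_size=500,
                 max_batch_size_in_bytes=5 * 1024 * 1024,
                 max_record_size_in_bytes=1 * 1024 * 1024):
    """Group byte records into batches that respect both the count limit
    and the byte limit. Hypothetical helper with the documented defaults."""
    if max_batch_size_in_bytes < max_record_size_in_bytes:
        raise ValueError(
            "max_batch_size_in_bytes must not be lower than max_record_size_in_bytes")
    batches, current, current_bytes = [], [], 0
    for record in records:
        size = len(record)
        if size > max_record_size_in_bytes:
            # Plays the role of the IllegalArgumentException described above.
            raise ValueError(f"record of {size} bytes exceeds max_record_size_in_bytes")
        # Close the current batch if this record would break either limit.
        if current and (len(current) == max_batch_size
                        or current_bytes + size > max_batch_size_in_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(record)
        current_bytes += size
    if current:
        batches.append(current)
    return batches


# Five 400-byte records with a 1000-byte batch limit: the byte limit decides.
sizes = [len(b) for b in make_batches([b"x" * 400] * 5, max_batch_size=10,
                                      max_batch_size_in_bytes=1000,
                                      max_record_size_in_bytes=500)]
print(sizes)  # [2, 2, 1]
```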
Kinesis data stream
Kinesis Data Streams helps collect, process, and analyze large streams of data records in real time.
A Kinesis data stream is internally made up of shards. Probe data messages (records) are routed to a shard according to the partition key provided in each record.
Within a shard, messages that share the same partition key are additionally ordered by sequence number.
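According to the AWS documentation, Kinesis maps each partition key to a 128-bit integer with an MD5 hash and routes the record to the shard whose hash key range contains that integer. The Python sketch below illustrates the idea under several assumptions: the key is hashed as UTF-8 bytes read as a big-endian unsigned integer, and the 0 to 2**128 - 1 key space is split evenly across shards. shard_for_key is hypothetical. A real stream's ranges should be read from its shard descriptions (for example, with the ListShards API) rather than assumed.

```python
import hashlib


def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Index of the shard whose hash key range contains the key, ASSUMING
    the 128-bit key space is split evenly across shard_count shards."""
    # Assumption: UTF-8 bytes, digest read as a big-endian unsigned integer.
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")
    range_size = 2**128 // shard_count
    # The last shard also absorbs the remainder of the integer division.
    return min(hash_key // range_size, shard_count - 1)


for key in ("device-1", "device-2", "device-1"):
    print(key, "->", shard_for_key(key, 4))
```

The same key always lands on the same shard, which is what preserves per-key ordering within that shard.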
Note
- To learn more about partition key and sequence number, see Kinesis key concepts.
- To learn more about Amazon Kinesis Data Streams, see the related Amazon documentation.