Guides

Apache Kafka

The Apache Kafka data connector is implemented for source and sink connections.

This is the Kafka data connector URI scheme:

kafka://broker.host.name:port/topic

Sink connector setup

This is an example of a sink connector URI:

kafka://some.broker.host:9092/topic-to-produce?listener_name=INTERNAL

The sink connector supports the listener_name parameter and all parameters used to configure the SSL security protocol.

listener_name parameter

Sets the name of the listener to connect to.

The value must be a valid name of one of the available Kafka broker config listeners.

This is an optional parameter. If not set, a {BROKER_HOST}:{PORT} name is used.

Source connector setup

Here is an example of a source connector URI:

kafka://some.broker.host:9092/topic-to-consume?partitions=0,1,3&offset=earliest&listener_name=INTERNAL

This example sets up the Kafka consumer to use the INTERNAL://some.broker.host:9092 broker servers and consume data from the topic-to-consume topic starting from the earliest existing offset and reading only from partitions 0, 1, and 3.
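The connector URI follows standard URI syntax, so its parts can be pulled apart with ordinary URI tooling. A minimal Python sketch of that split (illustrative only; this is not the connector's actual implementation, and the parameter names are the ones documented in this guide):

```python
from urllib.parse import urlparse, parse_qs

def parse_connector_uri(uri):
    """Split a kafka:// connector URI into broker address, topic, and parameters."""
    parsed = urlparse(uri)
    assert parsed.scheme == "kafka"
    return {
        "bootstrap": f"{parsed.hostname}:{parsed.port}",
        "topic": parsed.path.lstrip("/"),
        # parse_qs returns a list per key; keep the first value of each parameter
        "params": {k: v[0] for k, v in parse_qs(parsed.query).items()},
    }

cfg = parse_connector_uri(
    "kafka://some.broker.host:9092/topic-to-consume"
    "?partitions=0,1,3&offset=earliest&listener_name=INTERNAL"
)
# cfg["bootstrap"] is "some.broker.host:9092", cfg["topic"] is "topic-to-consume"
```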

If the source topic doesn't exist, the source connector fails with the following error:

org.apache.kafka.common.errors.UnknownTopicOrPartitionException:
This server does not host this topic-partition

The source connector also supports all parameters used to configure the SSL security protocol.

listener_name parameter

This parameter has the same meaning as in the Sink connector setup section.

offset parameter

Configures the auto.offset.reset parameter of the Kafka consumer configuration.

The only supported values are earliest and latest. The default value is latest.

partitions parameter

Configures a list of partitions. Kafka partition IDs are simple integers starting from 0. For a topic with four partitions, partition IDs 0, 1, 2, and 3 are valid.

By default, the connector consumes all partitions of the topic.
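Following the rules above, a partitions value such as 0,1,3 can be validated as a comma-separated list of partition IDs before a consumer is assigned to them. A hedged sketch (the helper name is made up for illustration):

```python
def parse_partitions(value, num_topic_partitions):
    """Parse a partitions URI parameter like "0,1,3" and check that every
    ID exists for a topic with the given partition count."""
    ids = sorted({int(p) for p in value.split(",")})
    for pid in ids:
        if not 0 <= pid < num_topic_partitions:
            raise ValueError(
                f"partition {pid} does not exist "
                f"(valid IDs: 0..{num_topic_partitions - 1})"
            )
    return ids

print(parse_partitions("0,1,3", num_topic_partitions=4))  # [0, 1, 3]
```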

Kafka consumer group ID

The Kafka consumer group ID parameter can't be changed. It's a randomly generated name that follows this format: HERE-Anonymizer-KafkaSourceConnector-{RANDOM_STRING}.

Every instance has its own unique consumer group ID.
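For reference, a group ID of the documented shape can be reproduced as follows. Only the HERE-Anonymizer-KafkaSourceConnector- prefix and the randomness of the suffix are stated by this guide; the suffix length and character set here are assumptions:

```python
import secrets

def make_group_id():
    # The random suffix makes each instance's consumer group unique,
    # so separate instances never share committed offsets.
    return f"HERE-Anonymizer-KafkaSourceConnector-{secrets.token_hex(8)}"
```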

Using multi-partitioned sources

HERE Anonymizer Self-Hosted allows all probe data of a single vehicle to be placed into a single dedicated partition of a multi-partitioned topic. This means that there's always one partition that gets data from a specific vehicle.

If the data can be categorized by a vehicle/trajectory/trace identifier or by a geospatial identifier, such as a state or a big enough map tile, all Kafka messages must be directed into a single partition that is mapped to this specific identifier.

The easiest way to fulfill this requirement is to have all messages keyed using keys that are equal to category identifiers. For example, all messages for vehicle A must have the key A and all messages from the region California must have the California key.
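Why keying works: Kafka's default partitioner hashes the message key (with murmur2) and takes the result modulo the partition count, so equal keys always land in the same partition. The sketch below illustrates the principle with a plain MD5 hash rather than Kafka's actual murmur2 implementation:

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Deterministically map a key to a partition ID (illustrative only;
    Kafka's default partitioner uses murmur2, not MD5)."""
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# All messages keyed "vehicle-A" map to the same partition:
assert partition_for("vehicle-A", 4) == partition_for("vehicle-A", 4)
```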

To learn more, see the Apache Kafka documentation.

If the sink topic doesn't exist, it is created automatically as a single-partition topic.

kafka_props_file parameter

Specifies the path to a Java properties file containing advanced Kafka Consumer Config or Producer Config.

Parameters in this file override those set in the connector URI.

For example, with the URI kafka://broker:9092/topic?listener_name=PLAINTEXT&kafka_props_file=/path/to/kafka.properties and a kafka.properties file:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
bootstrap.servers=SASL_PLAINTEXT://broker1:9092,SASL_PLAINTEXT://broker2:9092
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="kafka" password\="kafka";

the connector uses the configuration from kafka.properties and ignores listener_name and broker:9092 from the URI. If you want to connect to multiple brokers, you must define the bootstrap.servers parameter in the kafka.properties file as a comma-separated list of broker URIs, as shown in the example above.
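This precedence amounts to a dictionary merge with the properties file applied last. A sketch under the assumption that the file uses simple key=value lines (full Java properties escaping is not handled here, and the helper names are made up for illustration):

```python
def load_properties(path):
    """Minimal key=value reader; real Java properties files also support
    escape sequences, line continuations, and ':' separators."""
    props = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if line and not line.startswith(("#", "!")) and "=" in line:
                key, _, value = line.partition("=")
                props[key.strip()] = value.strip()
    return props

def effective_config(uri_config, props_path):
    # File entries are merged last, so they win over URI-derived settings.
    merged = dict(uri_config)
    merged.update(load_properties(props_path))
    return merged
```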

Note

When using Java properties files, you must escape certain characters in values to ensure they are interpreted correctly.

These include:

  • Line terminators: Newline (\n), carriage return (\r), and form feed (\f) characters are escaped with a preceding backslash (e.g., \n for newline).

  • Special characters for keys/separators:

    The characters #, !, =, and : have special meaning in properties files (comments, key-value separators). If these characters are intended to be part of a value, they must be escaped with a preceding backslash (e.g., \#, \!, \=, \:).

  • Backslash:

    The backslash character (\) itself needs to be escaped with another backslash when it is part of a value (e.g., \\ to represent a single backslash).

  • Unicode characters:

    Characters outside of the ISO-8859-1 (Latin-1) character set should be escaped using Unicode escape sequences (e.g., \uXXXX, where XXXX is the four-digit hexadecimal representation of the Unicode character).

  • Leading spaces in values:

    Leading space characters in a value must be escaped with a preceding backslash to prevent them from being trimmed during loading (e.g., \ value). Embedded or trailing spaces typically do not require escaping.
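These escaping rules can be captured in a small helper that prepares a value before it is written to a properties file. This is a simplified sketch (it escapes only the first leading space and covers only the cases listed above); for production use, Java's Properties.store or an equivalent library is the safer choice:

```python
def escape_property_value(value: str) -> str:
    """Escape a value per the Java properties rules described above."""
    out = []
    for i, ch in enumerate(value):
        if ch == "\\":
            out.append("\\\\")          # backslash -> double backslash
        elif ch in "#!=:":
            out.append("\\" + ch)       # special separator/comment chars
        elif ch == "\n":
            out.append("\\n")
        elif ch == "\r":
            out.append("\\r")
        elif ch == "\f":
            out.append("\\f")
        elif ch == " " and i == 0:
            out.append("\\ ")           # leading space must be escaped
        elif ord(ch) > 0xFF:
            out.append("\\u%04x" % ord(ch))  # outside ISO-8859-1
        else:
            out.append(ch)
    return "".join(out)

print(escape_property_value("pass=word"))  # pass\=word
```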

Security protocols

This connector currently supports PLAINTEXT, SSL, and SASL security protocols.

To learn more, see the Apache Kafka documentation.

PLAINTEXT protocol configuration

By default, sink and source connectors use the PLAINTEXT protocol. No additional URI parameters are required.

SSL protocol configuration

To use the SSL protocol for sink or source connectors, you must define the required ssl parameters in the kafka.properties file. To learn more, see the Apache Kafka SSL Configuration.

Sink URI example:

kafka://host:9092/topic-to-produce?listener_name=SSL_SECURED&kafka_props_file=/path/to/kafka.properties

This writes to the topic-to-produce topic through the SSL_SECURED://host:9092 broker listener, using the SSL context from the given files.

Source URI example:

kafka://host:9092/topic-to-read?partitions=0,2&offset=earliest&listener_name=SSL_SECURED&kafka_props_file=/path/to/kafka.properties

This reads from the topic-to-read topic through the SSL_SECURED://host:9092 broker listener, consuming only partitions 0 and 2 from the earliest offset and using the SSL context from the given files.

kafka.properties file example:

security.protocol=SSL
ssl.keystore.type=PEM
ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE-----\n....\n...\n...\n-----END CERTIFICATE-----\n
ssl.keystore.key=-----BEGIN PRIVATE KEY-----\n...\n....\n...\n-----END PRIVATE KEY-----\n
ssl.truststore.certificates=-----BEGIN CERTIFICATE-----\n...\n...\n...\n-----END CERTIFICATE-----\n
ssl.truststore.type=PEM

SASL protocol configuration

To use the SASL protocol for sink or source connectors, you must define the required SASL parameters in the kafka.properties file. To learn more, see the Apache Kafka SASL Configuration.

Sink URI example:

kafka://host:9092/topic-to-produce?kafka_props_file=/path/to/kafka.properties

Source URI example:

kafka://host:9092/topic-to-read?kafka_props_file=/path/to/kafka.properties

kafka.properties file example:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="kafka" password\="kafka";

Using Kafka connectors with Azure Event Hubs

To use the Kafka connectors with Azure Event Hubs, you must:

  • Authenticate using a connection string. To learn more, see Get an Event Hubs connection string.

  • Configure the connector URI as follows:

    • host

      Must be the fully qualified namespace which is <NamespaceName>.servicebus.windows.net

    • port

      Must be set to 9093. To learn more, see Authentication using SAS in the Azure Event Hubs documentation.
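Putting the host and port rules together, the connector URI for an Event Hubs namespace can be assembled as follows (the namespace and event hub names are placeholders, and the helper is illustrative):

```python
def event_hubs_uri(namespace: str, event_hub: str, props_file: str) -> str:
    # Event Hubs' Kafka-compatible endpoint is the fully qualified
    # namespace, and it always listens on port 9093.
    host = f"{namespace}.servicebus.windows.net"
    return f"kafka://{host}:9093/{event_hub}?kafka_props_file={props_file}"

print(event_hubs_uri("mynamespace", "event-hub-to-produce",
                     "/path/to/kafka.properties"))
```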

Sink connector URI example for connecting to Azure Event Hubs:

kafka://mynamespace.servicebus.windows.net:9093/event-hub-to-produce?kafka_props_file=/path/to/kafka.properties

Source connector URI example for connecting to Azure Event Hubs:

kafka://mynamespace.servicebus.windows.net:9093/event-hub-to-read?partitions=0,2&offset=earliest&kafka_props_file=/path/to/kafka.properties

kafka.properties file example for connecting to Azure Event Hubs:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="$ConnectionString" password\="Endpoint\=sb\://mynamespace.servicebus.windows.net/;SharedAccessKeyName\=SharedAccessKeyName;SharedAccessKey\=SharedAccessKey";

Using Kafka connectors with Amazon Managed Streaming for Apache Kafka (MSK)

To use the Kafka connectors with Amazon MSK, you must authenticate using one of the following methods:

  • IAM access control authentication method.

    Sink URI example:

    kafka://host:9098/topic-to-produce?kafka_props_file=/path/to/kafka.properties

    Source URI example:

    kafka://host:9098/topic-to-read?partitions=0,2&offset=earliest&kafka_props_file=/path/to/kafka.properties

    kafka.properties file example:

    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  • SASL/SCRAM authentication method.

    Sink URI example:

    kafka://host:9096/topic-to-produce?kafka_props_file=/path/to/kafka.properties

    Source URI example:

    kafka://host:9096/topic-to-read?partitions=0,2&offset=earliest&kafka_props_file=/path/to/kafka.properties

    kafka.properties file example:

    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username\="<USERNAME>" password\="<PASSWORD>";