How to use Kafka Metrics for connectivity status
How to use Kafka Metrics for connectivity status
The Data Client Library provides the capability to have programmatic access to Kafka consumer metrics, which can be used to assess connectivity status.
Usage
You can access the underlying Kafka consumer metrics via SubscriptionControl
(which you can either get directly or through a Subscription instance).
// Create QueryApi
val queryApi = DataClient().queryApi(catalogHrn)
// Define Kafka consumer settings assigning an id for the consumer (if no id is specified, a random UUID is assigned)
val consumerSettings =
ConsumerSettings(groupName = "test-consumer", consumerId = "consumer-id")
// Subscribe to a layer
val subscription: Future[Subscription] = queryApi.subscribe("stream-layer", consumerSettings)
// Retrieve Kafka metrics
val kafkaMetrics: Future[Map[String, String]] =
subscription.flatMap(s => s.subscriptionControl.getKafkaMetrics())// Create QueryApi
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
// Define Kafka consumer settings assigning an id for the consumer (if no id is specified, a
// random UUID is assigned)
ConsumerSettings consumerSettings =
new ConsumerSettings.Builder()
.withGroupName("test-consumer")
.withConsumerId("consumer-id")
.build();
// Subscribe to a layer
CompletionStage<Subscription> subscriptionFuture =
queryApi.subscribe("stream-layer", consumerSettings);
// Retrieve Kafka metrics
subscriptionFuture.thenApply(
subscription -> {
scala.collection.immutable.List<String>
stringList = // import scala.collection.JavaConverters;
JavaConverters.asScalaBufferConverter(new ArrayList<String>()).asScala().toList();
return subscription.getSubscriptionControl().getKafkaMetrics(stringList);
});Instead of retrieving all available metrics, you can request only a specific
subset of them, by adding each metric name to the list passed in
getKafkaMetrics (see:
Scala API Reference
or
Java API Reference.
Connectivity status
You can find an overview of all Kafka metrics in the official Apache Kafka Documentation.
In order to assess consumer connectivity status, the following global connectivity metrics can be used:
| Metric name | Description |
|---|---|
| connection-count | The current number of active connections. |
| connection-creation-rate | New connections established per second in the window. |
| connection-close-rate | Total connections closed in the window. |
| io-ratio | The fraction of time the I/O thread spent doing I/O. |
| io-time-ns-avg | The average length of time for I/O per select call in nanoseconds. |
| io-wait-ratio | The fraction of time the I/O thread spent waiting. |
| io-wait-time-ns-avg | The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds. |
| select-rate | Number of times the I/O layer checked for new I/O to perform per second. |
NoteThe metrics indicated above have the following MBean name
kafka.consumer:type=consumer-metrics,client-id=([-.w]+).
Flink Kafka metrics
If your application uses Flink together with Kafka, you can access Kafka producer and consumer metrics exported through the Prometheus reporter, and use them to create dashboards in Grafana. You can find additional information by following the link: Flink Metrics.
NoteIn order to enable reporting Kafka metrics in Flink, you will need to set the configuration value of setting
here.platform.data-client.enable-flink-kafka-metricstotrue(see Configuration).
Updated 18 days ago