How to use Kafka Metrics for connectivity status

The Data Client Library gives you programmatic access to Kafka consumer metrics, which you can use to assess connectivity status.

Usage

You can access the underlying Kafka consumer metrics via SubscriptionControl, which you can obtain either directly or through a Subscription instance.

Scala

// Create QueryApi
val queryApi = DataClient().queryApi(catalogHrn)

// Define Kafka consumer settings assigning an id for the consumer (if no id is specified, a random UUID is assigned)
val consumerSettings =
  ConsumerSettings(groupName = "test-consumer", consumerId = "consumer-id")

// Subscribe to a layer
val subscription: Future[Subscription] = queryApi.subscribe("stream-layer", consumerSettings)

// Retrieve Kafka metrics
val kafkaMetrics: Future[Map[String, String]] =
  subscription.flatMap(s => s.subscriptionControl.getKafkaMetrics())

Java

// Create QueryApi
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// Define Kafka consumer settings assigning an id for the consumer (if no id is specified, a
// random UUID is assigned)
ConsumerSettings consumerSettings =
    new ConsumerSettings.Builder()
        .withGroupName("test-consumer")
        .withConsumerId("consumer-id")
        .build();

// Subscribe to a layer
CompletionStage<Subscription> subscriptionFuture =
    queryApi.subscribe("stream-layer", consumerSettings);

// Retrieve Kafka metrics
// Requires: import java.util.ArrayList; import scala.collection.JavaConverters;
subscriptionFuture.thenApply(
    subscription -> {
      // No metric names are added to the list, so no subset is selected
      scala.collection.immutable.List<String> stringList =
          JavaConverters.asScalaBufferConverter(new ArrayList<String>()).asScala().toList();
      return subscription.getSubscriptionControl().getKafkaMetrics(stringList);
    });

Instead of retrieving all available metrics, you can request a specific subset by adding each metric name to the list passed to getKafkaMetrics (see the Scala API Reference or the Java API Reference).

Connectivity status

You can find an overview of all Kafka metrics in the official Apache Kafka Documentation.

To assess consumer connectivity status, you can use the following global connectivity metrics:

Metric name               Description
connection-count          The current number of active connections.
connection-creation-rate  New connections established per second in the window.
connection-close-rate     Connections closed per second in the window.
io-ratio                  The fraction of time the I/O thread spent doing I/O.
io-time-ns-avg            The average length of time for I/O per select call in nanoseconds.
io-wait-ratio             The fraction of time the I/O thread spent waiting.
io-wait-time-ns-avg       The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.
select-rate               Number of times the I/O layer checked for new I/O to perform per second.
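
As a rough illustration of how the metrics listed above might be turned into a connectivity status, the following sketch classifies a metrics map. It assumes the metrics are available as a java.util.Map<String, String> keyed by metric name (the Scala example returns Map[String, String]; check the Java API Reference for the exact Java return type). The class name ConnectivityCheck, the Status values, and the decision rules are hypothetical and not part of the Data Client Library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Data Client Library.
final class ConnectivityCheck {

  enum Status { CONNECTED, UNSTABLE, DISCONNECTED, UNKNOWN }

  // Parses a metric value; returns NaN when the metric is missing or not numeric.
  static double metric(Map<String, String> metrics, String name) {
    String raw = metrics.get(name);
    if (raw == null) {
      return Double.NaN;
    }
    try {
      return Double.parseDouble(raw.trim());
    } catch (NumberFormatException e) {
      return Double.NaN;
    }
  }

  // Illustrative rules (assumptions, tune for your application):
  // - connection-count missing or unparsable: UNKNOWN
  // - no active connections: DISCONNECTED
  // - connections closing faster than they are created: UNSTABLE
  // Missing rate metrics are ignored, because comparisons with NaN are false.
  static Status assess(Map<String, String> metrics) {
    double count = metric(metrics, "connection-count");
    if (Double.isNaN(count)) {
      return Status.UNKNOWN;
    }
    if (count <= 0) {
      return Status.DISCONNECTED;
    }
    double closeRate = metric(metrics, "connection-close-rate");
    double creationRate = metric(metrics, "connection-creation-rate");
    if (closeRate > creationRate) {
      return Status.UNSTABLE;
    }
    return Status.CONNECTED;
  }

  public static void main(String[] args) {
    Map<String, String> sample = new HashMap<>();
    sample.put("connection-count", "2.0");
    sample.put("connection-creation-rate", "0.1");
    sample.put("connection-close-rate", "0.0");
    System.out.println(assess(sample));
  }
}
```

In practice the map would come from the result of getKafkaMetrics rather than from hand-written sample values, and the rules would likely also take the I/O metrics into account.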

Note

The metrics listed above are exposed under the following MBean name: kafka.consumer:type=consumer-metrics,client-id=([-.\w]+).
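
The MBean name also suggests a second way to read these metrics: through JMX from within the same JVM. The sketch below uses only the standard javax.management API and assumes that Kafka's JMX reporting is active in the process that runs the consumer. The class name ConsumerMBeans is hypothetical. The "*" in the pattern is a JMX wildcard that stands in for the client id.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper for illustration only.
final class ConsumerMBeans {

  // JMX ObjectName pattern; "*" matches any client-id value.
  static final String PATTERN = "kafka.consumer:type=consumer-metrics,client-id=*";

  // Returns the names of all consumer-metrics MBeans registered in this JVM.
  // The set is empty when no Kafka consumer is running in the process.
  static Set<ObjectName> find() {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    try {
      return server.queryNames(new ObjectName(PATTERN), null);
    } catch (MalformedObjectNameException e) {
      throw new IllegalStateException("Invalid MBean name pattern: " + PATTERN, e);
    }
  }

  public static void main(String[] args) throws Exception {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    for (ObjectName name : find()) {
      // Assumption: each metric is exposed as an MBean attribute named after the metric.
      Object count = server.getAttribute(name, "connection-count");
      System.out.println(name.getKeyProperty("client-id") + " connection-count=" + count);
    }
  }
}
```

When no consumer is registered, find() returns an empty set and the program prints nothing.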

Flink Kafka metrics

If your application uses Flink together with Kafka, you can access the Kafka producer and consumer metrics exported through the Prometheus reporter and use them to create dashboards in Grafana. For more information, see Flink Metrics.

Note

To enable reporting of Kafka metrics in Flink, set here.platform.data-client.enable-flink-kafka-metrics to true (see Configuration).
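
As a sketch, and assuming the Data Client Library reads its settings from a Typesafe Config (HOCON) file such as application.conf, the setting could be written as shown below. The file name and location are assumptions; see Configuration for how settings are supplied in your deployment.

```
here.platform.data-client {
  enable-flink-kafka-metrics = true
}
```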