How to use Data Archiving Library metrics
The Data Archiving Library provides the following set of metrics to monitor the archiving process:
| Metric | Description |
|---|---|
| messageCounter | number of messages read by operator (should be same as number of messages consumed by source) |
| getDataSuccessCounter | number of messages successfully downloaded from blob-store |
| getDataFailCounter | number of messages failed to download from blob-store |
| sizeCounter | size of messages (byte array size) |
| getKeysNullCounter | number of times getKeys/getMultipleKeys/getSplittedKeys API returns null |
| getKeysFailCounter | number of times getKeys/getMultipleKeys/getSplittedKeys API results in error (null case is included in this metric) |
| timeWindowMissingCounter | number of times the value of timewindow type index attribute was null |
| timeWindowInvalidCounter | number of times the value of timewindow type index attribute was invalid (expects long value) |
| hereTileInvalidZoomLevelCounter | number of times the value of heretile type index attribute was associated with the wrong zoom level |
| hereTileNonParsablelCounter | number of times the value of heretile type index attribute was invalid (expects long value) |
| invalidColumnValueCounter | number of times the value of any index attribute had unsupported data type (string, int, long, boolean are supported data types) |
| parserIgnoreCounter | number of messages ignored by the library because of validation error |
| parserDeadLetterCounter | number of messages marked by the library to archive in dead-letter storage because of validation error |
| keyPartitionCounter | number of unique index attribute groups in a time-window |
| aggregateSuccessMessageCounter | number of messages successfully processed by aggregate API |
| aggregateSuccessSizeCounter | size of messages successfully processed by aggregate API |
| aggregateNullMessageCounter | number of messages for which aggregate API returns null |
| aggregateNullSizeCounter | size of messages for which aggregate API returns null |
| aggregateFailMessageCounter | number of messages for which aggregate API results in error (null case is included in this metric) |
| aggregateFailSizeCounter | size of messages for which aggregate API results in error (null case is included in this metric) |
| aggregateIgnoreCounter | number of times the result of aggregate API is ignored |
| aggregateDeadLetterCounter | number of times the messages passed to aggregate API are aggregated for dead-letter storage |
| windowDeadLetterMessageCounter | number of messages set up for archival in dead-letter storage |
| windowDeadLetterSizeCounter | size of messages set up for archival in dead-letter storage |
| transformDeadletterSuccessCounter | number of messages successfully transformed using transformForDeadLetter API for dead-letter storage |
| transformDeadletterFailCounter | number of messages failed to transform using transformForDeadLetter API for dead-letter storage |
| blobSuccessRequestCounter | number of files successfully uploaded to blob-store |
| blobSizeCounter | size of files successfully uploaded to blob-store |
| blobTimeCounter | time taken by the library to upload the files to blob-store |
| indexInvokeCounter | number of index records received by the sink operator (should be same as blobSuccessRequestCounter) |
| indexUploadCounter | number of index records successfully published to index-store |
| indexValidCounter | number of index records received by the sink operator for normal archival storage |
| indexDeadLetterCounter | number of index records received by the sink operator for special dead-letter storage |
| indexSuccessRequestCounter | number of successful requests to index-store (one request can contain multiple index records) |
| indexTimeCounter | time taken by the library to publish the index records to index-store |
| indexBatchLimitCounter | number of times the sink operator reached batch limit for number of index records to be published in 1 request |
| indexTimeoutCounter | number of times the request to publish index records was triggered by timeout |
Add user-defined metric
While the Data Archiving Library provides a comprehensive set of archiving process metrics, you can also define your own metrics using the standard Flink APIs. To define a new custom metric, follow the steps below (a consolidated sketch follows the list):
- Add your custom metric as an instance variable in your UDF implementation:

  ```java
  public class MyExampleImplementation implements UDF {
      private Counter customMetric;
      ...
  }
  ```
- Override the `open(Configuration parameters, RuntimeContext runtimeContext)` API and register the custom metric which you added in the previous step with the `MetricGroup` of the provided `RuntimeContext`. For example:
  ```java
  ...
  @Override
  public void open(Configuration parameters, RuntimeContext runtimeContext) {
      customMetric = runtimeContext.getMetricGroup().counter("myCustomMetric");
  }
  ```
- Record the metric in your UDF implementation:
  ```java
  customMetric.inc();
  ```
- The following dependencies should be added with `provided` scope. Note that this example is for the Maven build tool.
  ```xml
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-core</artifactId>
      <scope>provided</scope>
  </dependency>
  ```

Refer to `com/here/platform/data/archive/example/AvroSimpleKeyExample.java` in the HERE Data SDK for Java & Scala Data Archive Avro Example for an example implementation.
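Putting the steps together, the following is a minimal sketch of a UDF with a custom counter. It assumes the `UDF` interface and the `open` signature shown above; `processMessage` is a hypothetical helper used only to illustrate where the counter could be incremented, and the remaining UDF methods are omitted.

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class MyExampleImplementation implements UDF {

    // Step 1: hold the custom metric as an instance variable
    private Counter customMetric;

    // Step 2: register the metric with the MetricGroup of the provided RuntimeContext
    @Override
    public void open(Configuration parameters, RuntimeContext runtimeContext) {
        customMetric = runtimeContext.getMetricGroup().counter("myCustomMetric");
    }

    // Step 3: record the metric wherever your UDF handles a message
    // (processMessage is an illustrative placeholder, not part of the library API)
    private void processMessage(byte[] payload) {
        customMetric.inc();
        // ... actual message handling ...
    }

    // Other UDF methods are omitted here; see the AvroSimpleKeyExample.java
    // referenced above for a complete implementation.
}
```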
Monitoring metrics
Metrics registered with the Flink runtime context will be available in Grafana prefixed with `flink_taskmanager_job_task_operator_`.

For example:

- `flink_taskmanager_job_task_operator_messageCounter`
- `flink_taskmanager_job_task_operator_myCustomMetric`
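Other metric types registered through the same `MetricGroup` appear with the same prefix. As a minimal sketch, assuming a hypothetical `lastProcessedTimestamp` field maintained by your UDF (and an import of `org.apache.flink.metrics.Gauge`), a gauge could be registered in the same `open` method:

```java
// "lastProcessedTimestamp" is an illustrative user-defined field, not a
// Data Archiving Library metric; registered this way it would appear as
// flink_taskmanager_job_task_operator_lastProcessedTimestamp in Grafana.
private volatile long lastProcessedTimestamp;

@Override
public void open(Configuration parameters, RuntimeContext runtimeContext) {
    runtimeContext.getMetricGroup().gauge("lastProcessedTimestamp", new Gauge<Long>() {
        @Override
        public Long getValue() {
            return lastProcessedTimestamp;
        }
    });
}
```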
Refer to the Flink documentation to learn more about the scope and naming of metrics.