
Send data using input stream

Device telemetry is ingested into the Tracking service either with the ingestion REST API or through a HERE platform catalog stream layer. Ingesting through the input stream layer is a more efficient way to ingest large amounts of telemetry data.

In addition to device telemetry, external location events can also be ingested through the input stream.

Prerequisites

Prerequisites for making ingestions using input streams are:

The layer ID for the input stream layer is tc-ingest.

The input stream catalog and its tc-ingest layer are created automatically when the input stream feature is enabled for the project.

Before your application can use the stream, you need to grant the application access to the stream's catalog. For more information, see how to add project members.

The input stream catalog is a resource in the project, and project members have access to the project's resources.

Input stream data format

The message data published to the tc-ingest input stream layer must be in JSON format and contain the following properties:

  • requestId: <uuid>, ID used for identifying requests. Used for logging and error reporting.
  • type: <string>, Type of the input data, which identifies the data format used. Only the values event and telemetry are supported.
  • version: <string>, Data format version. Only the value 1.0 is supported.
  • data: <object>, Input data. Telemetry ingestion uses the same data objects as /v3 ingestion API requests, except that id is a mandatory property here. External location event ingestion uses an array of external location events; their definition is available in the downloadable input stream ingestion schema.

There is an example of input stream data object in the following Usage with Kafka section.
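
The envelope properties listed above can also be written down as a type with a small local check. This is a minimal sketch, not part of the HERE SDK: the names InputStreamMessage, buildEnvelope and checkEnvelope are hypothetical, the check covers only the envelope level, and it assumes that the telemetry id is a non-empty string. The full definition of the data property is in the ingestion schema.

```typescript
import { randomUUID } from "node:crypto";

// Supported values of the `type` property, as listed above.
type InputStreamType = "telemetry" | "event";

// Envelope of a message published to the tc-ingest layer.
// `data` is left open because its full shape is defined by the ingestion schema.
interface InputStreamMessage {
  requestId: string;
  type: InputStreamType;
  version: "1.0";
  data: unknown;
}

const UUID_PATTERN =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Hypothetical helper: wraps a payload in an envelope with a fresh requestId.
function buildEnvelope(type: InputStreamType, data: unknown): InputStreamMessage {
  return { requestId: randomUUID(), type, version: "1.0", data };
}

// Hypothetical helper: returns the envelope-level problems found in a message.
// An empty array means that these (deliberately shallow) checks passed.
function checkEnvelope(message: unknown): string[] {
  if (typeof message !== "object" || message === null || Array.isArray(message)) {
    return ["message must be a JSON object"];
  }
  const { requestId, type, version, data } = message as Record<string, unknown>;
  const problems: string[] = [];

  if (typeof requestId !== "string" || !UUID_PATTERN.test(requestId)) {
    problems.push("requestId must be a UUID");
  }
  if (type !== "telemetry" && type !== "event") {
    problems.push("type must be telemetry or event");
  }
  if (version !== "1.0") {
    problems.push("version must be 1.0");
  }
  if (data === undefined || data === null) {
    problems.push("data is required");
  } else if (type === "telemetry") {
    // Assumption: the mandatory id is a non-empty string.
    const id =
      typeof data === "object" ? (data as Record<string, unknown>).id : undefined;
    if (typeof id !== "string" || id.length === 0) {
      problems.push("telemetry data.id must be a non-empty string");
    }
  } else if (type === "event" && !Array.isArray(data)) {
    problems.push("event data must be an array");
  }
  return problems;
}
```

A check like this only catches envelope mistakes before publishing; it does not replace validating the request data against the /v3 ingestion API or the ingestion schema.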

📘

Note

When developing a program that ingests data to the input stream, it is good practice to verify the request data format with the /v3 ingestion API, which returns descriptive error messages for invalid requests.

Usage with Kafka

In principle the steps for using input streams with Kafka are as follows:

The following code snippet uses HERE Data SDK for TypeScript to demonstrate a minimal program that implements these steps.

Start by creating a new directory and switching to it:

mkdir input-stream-app-consume && cd input-stream-app-consume

Then save this code to an index.ts file in the new directory:

import { HRN, OlpClientSettings, RequestFactory } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import { StreamApi } from "@here/olp-sdk-dataservice-api";

import { Kafka } from "kafkajs";
import { v4 as uuidv4 } from "uuid";

// Example telemetry data to be published.
const trackingId = "replace-with-your-tracking-id";
const streamIngestionData = {
  requestId: uuidv4(),
  type: "telemetry",
  version: "1.0",
  data: {
    id: trackingId,
    data: [
      {
        timestamp: Date.now(),
        position: {
          accuracy: 50,
          lat: 60.171318,
          lng: 24.941463
        }
      }
    ]
  }
};

// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";

// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";

// Catalog HRN format: "hrn:here:data::<organization>:tc-input-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-ingest";

async function main(): Promise<void> {
  // Setup client settings.
  const userAuth = new UserAuth({
    tokenRequester: requestToken,
    credentials: { accessKeyId, accessKeySecret },
    scope: projectHrn
  });
  const olpClientSettings = new OlpClientSettings({
    environment: "here", getToken: () => userAuth.getToken()
  });
  // Use API lookup API to find a Stream v2 API endpoint.
  const requestBuilder = await RequestFactory.create(
    "stream", "v2", olpClientSettings, HRN.fromString(catalogHrn)
  );
  // Get kafka connection parameters from the Stream v2 API.
  const streamLayerEndpoint = await StreamApi.endpointByConsumer(
    requestBuilder, { layerId, type: "producer" }
  );

  // Create kafka producer.
  const kafka = new Kafka({
    clientId: streamLayerEndpoint.clientId,
    brokers: streamLayerEndpoint.bootstrapServers.map(s => `${s.hostname}:${s.port}`),
    ssl: true,
    sasl: {
      mechanism: "oauthbearer",
      oauthBearerProvider: async () => ({ value: await userAuth.getToken() })
    }
  });
  const producer = kafka.producer();

  // Publish telemetry.
  await producer.connect();
  await producer.send({
    topic: streamLayerEndpoint.topic,
    messages: [{
      key: trackingId, value: JSON.stringify(streamIngestionData)
    }]
  });
  await producer.disconnect();
}

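// Report failures instead of leaving an unhandled promise rejection.
main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});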

Replace the accessKeyId and accessKeySecret variables with the OAuth 2.0 credentials you created earlier. Also replace projectHrn with your project HRN, catalogHrn with the catalog HRN, and trackingId with the ID of an existing tracker.

To build and run the example program, use the following shell commands. Create a TypeScript project and install its dependencies:

npm init --yes \
  && npm install --save-dev typescript \
  && npx tsc --init \
  && npm install --save-dev @types/node @types/uuid \
  && npm install --save \
    @here/olp-sdk-authentication \
    @here/olp-sdk-core \
    @here/olp-sdk-dataservice-api \
    kafkajs \
    uuid

Compile and run the program:

npx tsx index.ts
📘

Note

It is recommended to use the device trackingId as the Kafka partition key for the ingestions made for the device.

You can verify that the ingestions have been processed by getting traces for your device and checking that the ingested telemetry is present.
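
The partition key recommendation in the note above can be sketched as a small mapping function. This is an illustrative sketch only: TelemetryEnvelope, KeyedMessage and toKeyedMessages are hypothetical names, and the returned objects simply follow the key and value fields that the producer.send call in the example program uses. With Kafka's default key-based partitioning, messages that share a key are routed to the same partition, so ingestions for one device stay in order relative to each other.

```typescript
// Telemetry envelope as described in the input stream data format section.
interface TelemetryEnvelope {
  requestId: string;
  type: "telemetry";
  version: "1.0";
  data: { id: string; data: unknown[] };
}

// Minimal key/value shape accepted in the `messages` array of producer.send().
interface KeyedMessage {
  key: string;
  value: string;
}

// Hypothetical helper: keys every message by the device trackingId (data.id)
// and serializes the envelope as the JSON message value.
function toKeyedMessages(envelopes: TelemetryEnvelope[]): KeyedMessage[] {
  return envelopes.map((envelope) => ({
    key: envelope.data.id,
    value: JSON.stringify(envelope)
  }));
}
```

The result could then be passed as the messages property of producer.send in the example program, in place of the single hand-built message.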

Usage with REST API

The HERE platform offers the Ingest v1 API for ingesting data to a stream using REST API calls.

Usage with it:

The following curl commands demonstrate how to ingest input stream data from a telemetry.json file using an application access token and the Ingest v1 API.

# Application access token used to authorize the requests.
accessToken="replace-with-your-access-token"

# Use API lookup API to get base URL for Ingest v1 API.
catalogHrn="replace-with-your-catalog-hrn"
curl -X GET "https://api-lookup.data.api.platform.here.com/lookup/v1/resources/${catalogHrn}/apis/ingest/v1" \
  -H 'Authorization: Bearer '"${accessToken}"

# Post input stream telemetry object from telemetry.json file.
ingestV1BaseURL="replace-with-baseURL-from-api-lookup"
curl -X POST "${ingestV1BaseURL}/layers/tc-ingest" \
  -H 'Authorization: Bearer '"${accessToken}" \
  -H 'Content-Type: application/json' \
  -d @telemetry.json
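
The same two requests can be issued from a program. The sketch below mirrors the curl commands above and is an assumption-laden illustration rather than an SDK feature: lookupUrl, postToInputStream and the HttpFn type are hypothetical names, and the HTTP function is passed in as a parameter so that the logic can be exercised without network access. On Node.js 18 or later the global fetch function should fit the HttpFn shape.

```typescript
// Minimal request and response shapes needed by this sketch.
type HttpInit = { method: string; headers: Record<string, string>; body?: string };
type HttpResponse = { ok: boolean; status: number };
type HttpFn = (url: string, init: HttpInit) => Promise<HttpResponse>;

// API lookup URL for the Ingest v1 API of a catalog, as used in the curl example.
function lookupUrl(catalogHrn: string): string {
  return `https://api-lookup.data.api.platform.here.com/lookup/v1/resources/${catalogHrn}/apis/ingest/v1`;
}

// Hypothetical helper: posts one input stream message to the tc-ingest layer.
// `ingestV1BaseURL` is the base URL returned by the API lookup request.
async function postToInputStream(
  http: HttpFn,
  ingestV1BaseURL: string,
  accessToken: string,
  message: object
): Promise<number> {
  const response = await http(`${ingestV1BaseURL}/layers/tc-ingest`, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${accessToken}`,
      "Content-Type": "application/json"
    },
    body: JSON.stringify(message)
  });
  if (!response.ok) {
    throw new Error(`Ingest request failed with HTTP status ${response.status}`);
  }
  return response.status;
}
```

A call could look like postToInputStream(fetch, ingestV1BaseURL, accessToken, streamIngestionData), where the last argument is an input stream data object such as the one in the Kafka example.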

Dead letter queue

In addition to the tc-ingest layer, the input stream catalog has a tc-dead-letter layer (dead letter queue). The Tracking service publishes an error message to the dead letter queue for every invalid telemetry ingestion request sent to the input stream.

The error message data published to tc-dead-letter output stream layer is in JSON format and contains the following properties:

  • requestId: <uuid>, ID used for identifying requests. Matches the requestId in the failed request.
  • correlationId: <uuid>, ID used for correlating requests within HERE Tracking. Used for logging and error reporting.
  • type: <string>, Type of the input data, identifies the used data format. Only values event and telemetry are supported.
  • version: <string>, Data format version. Only value 1.0 is supported.
  • error: <object>, Error object.
  • error.title: <string>, Human-readable error description.
  • error.status: <number>, HTTP status code.
  • error.code: <string>, Error code.
  • error.cause: <string>, Human-readable explanation for the error.
  • error.action: <string>, Human-readable description of the action that can be taken to correct the error.
  • error.details: <object[]>, Optional array of error details.
  • error.correlationId: <uuid>, ID used for correlating requests within HERE Tracking.

Below is an example of a dead letter queue message for an invalid request.

{
  "error": {
    "title": "Bad Request",
    "status": 400,
    "code": "E902400",
    "cause": "Input stream ingestion validation failed",
    "action": "Correct the request and try again",
    "correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
    "details": [
      {
        "name": "TrackingError",
        "correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
        "code": 400,
        "reason": {
          "errors": [
            {
              "instancePath": "/data/data/0/position",
              "schemaPath": "#/properties/data/properties/data/items/properties/position/required",
              "keyword": "required",
              "params": {
                "missingProperty": "lng"
              },
              "message": "must have required property 'lng'"
            }
          ]
        }
      }
    ]
  },
  "type": "telemetry",
  "version": "1.0",
  "correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
  "requestId": "6f23e24a-aca2-4eee-861a-adf95e5902f2"
}
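
A consumer of the dead letter queue typically wants a short, readable summary of why a request failed. The following sketch is one possible way to do that. It assumes that error.details entries follow the shape shown in the example message above (a reason.errors array with instancePath and message), which is inferred from that single example and not from a published schema. The names DeadLetterMessage and summarizeDeadLetter are hypothetical.

```typescript
// Shapes inferred from the property list and the example message above.
interface ValidationIssue {
  instancePath: string;
  message?: string;
}

interface DeadLetterDetail {
  name?: string;
  reason?: { errors?: ValidationIssue[] };
}

interface DeadLetterMessage {
  requestId: string;
  correlationId: string;
  type: string;
  version: string;
  error: {
    title: string;
    status: number | string;
    code: string;
    cause: string;
    action: string;
    correlationId: string;
    details?: DeadLetterDetail[];
  };
}

// Hypothetical helper: returns one line per validation issue, prefixed with the
// requestId of the failed request. Falls back to error.cause when the message
// carries no detailed validation issues.
function summarizeDeadLetter(message: DeadLetterMessage): string[] {
  const lines: string[] = [];
  for (const detail of message.error.details ?? []) {
    for (const issue of detail.reason?.errors ?? []) {
      lines.push(`${message.requestId}: ${issue.instancePath} ${issue.message ?? ""}`.trim());
    }
  }
  if (lines.length === 0) {
    lines.push(`${message.requestId}: ${message.error.cause}`);
  }
  return lines;
}
```

Because requestId matches the failed request, the summary lines can be joined back to the messages the producer published.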
📘

Note

The input stream content type is application/json, and only JSON input is processed. If data in any other format is published, the request is ignored and no error message is sent to the dead letter queue.

The following example uses the HERE Data SDK for TypeScript to show how to consume messages from the dead letter queue.

import { HRN, OlpClientSettings } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import {
  PollRequest,
  StreamLayerClient,
  SubscribeRequest,
  UnsubscribeRequest,
} from "@here/olp-sdk-dataservice-read";

// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";

// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";

// Catalog HRN format: "hrn:here:data::<organization>:tc-input-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-dead-letter";

async function main(): Promise<void> {
  // Setup client settings.
  const userAuth = new UserAuth({
    tokenRequester: requestToken,
    credentials: { accessKeyId, accessKeySecret },
    scope: projectHrn
  });
  const olpClientSettings = new OlpClientSettings({
    environment: "here", getToken: () => userAuth.getToken()
  });

  // Create StreamLayerClient.
  const streamLayerClient = new StreamLayerClient({
    catalogHrn: HRN.fromString(catalogHrn),
    layerId,
    settings: olpClientSettings,
  });

  // Create a subscription for the stream topic.
  const subscribeRequest = new SubscribeRequest().withMode("serial");
  const subscriptionId = await streamLayerClient.subscribe(subscribeRequest);

  try {
    // Consume data using poll method.
    const pollRequest = new PollRequest()
      .withMode("serial")
      .withSubscriptionId(subscriptionId);
    const messages = await streamLayerClient.poll(pollRequest);
    for (const message of messages) {
      // Base64 decode the message data.
      const data = JSON.parse(
        Buffer.from(message.metaData.data as string, "base64").toString("utf-8")
      );
      console.log("Received message:", JSON.stringify(data, null, 2));
    }
  } finally {
    // Delete the subscription from the stream topic, even if polling fails.
    await streamLayerClient.unsubscribe(
      new UnsubscribeRequest()
        .withMode("serial")
        .withSubscriptionId(subscriptionId)
    );
  }
}

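// Report failures instead of leaving an unhandled promise rejection.
main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});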