Receive traces from output stream

In this tutorial, you learn how to use stream layers from a HERE platform catalog to receive Tracking traces using either the Data API or Kafka. For more information, see HERE platform catalog and Stream layers.

What will you learn?

The tutorial walks you through creating a client program that listens for traces and prints them. You then create a test device and use it to ingest telemetry, which appears as a trace in the output stream.

What do you need?

Grant access to your stream

Before your application can use the stream, you need to grant the application access to the stream's catalog. For more information, see how to add project members.

Consume traces from stream

To consume traces from the stream, you can use the HERE Data SDK for TypeScript. There are two ways to consume the traces: by polling with the Data API, or by consuming with Kafka directly.

Both options are shown below; choose the one that suits you best. Using Kafka directly is more efficient, because the REST-based Data API adds overhead.

Option 1: Polling traces with Data API

Start by creating a new directory and switching to it:

mkdir traces-stream-app-poll && cd traces-stream-app-poll

Then save this code to an index.ts file in the new directory:

import { HRN, OlpClientSettings } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import {
  PollRequest,
  StreamLayerClient,
  SubscribeRequest,
  UnsubscribeRequest,
} from "@here/olp-sdk-dataservice-read";
import util from 'util';

// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";

// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";

// Catalog HRN format: "hrn:here:data::<organization>:tc-output-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";

const layerId = "tc-traces";

async function main(): Promise<void> {
  // Setup client settings.
  const userAuth = new UserAuth({
    tokenRequester: requestToken,
    credentials: { accessKeyId, accessKeySecret },
    scope: projectHrn
  });
  const olpClientSettings = new OlpClientSettings({
    environment: "here", getToken: () => userAuth.getToken()
  });

  // Create StreamLayerClient.
  const streamLayerClient = new StreamLayerClient({
    catalogHrn: HRN.fromString(catalogHrn),
    layerId,
    settings: olpClientSettings,
  });

  // Create a subscription for the stream topic.
  const subscribeRequest = new SubscribeRequest().withMode("serial");
  const subscriptionId = await streamLayerClient.subscribe(subscribeRequest);

  // Consume data using poll method.
  const pollRequest = new PollRequest()
    .withMode("serial")
    .withSubscriptionId(subscriptionId);

  let interrupted = false;
  process.on('SIGINT', () => { interrupted = true; });

  console.log('Consuming traces (press Ctrl+C to exit)...');

  try {
    while (!interrupted) {
      const messages = await streamLayerClient.poll(pollRequest);
      for (const message of messages) {
        // Base64 decode the message data.
        const data = JSON.parse(
          Buffer.from(message.metaData.data as string, 'base64').toString('utf-8')
        );
        console.log(util.inspect(data, { depth: null, colors: true }));
      }
    }
  } finally {
    // Delete subscription from the stream topic.
    await streamLayerClient.unsubscribe(
      new UnsubscribeRequest()
        .withMode("serial")
        .withSubscriptionId(subscriptionId)
    );
  }
}

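// Report failures instead of leaving an unhandled promise rejection.
main().catch((error) => {
  console.error(error);
  process.exit(1);
});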
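
The decoding step inside the poll loop above can be isolated into a small helper, which makes it easier to test and to guard against malformed messages. The following is a minimal sketch, assuming the payload is base64-encoded UTF-8 JSON as in the program above; the function names decodeMessageData and tryDecodeMessageData are hypothetical and not part of the HERE Data SDK.

```typescript
// Decode the base64-encoded payload of a polled stream message into an object.
// Assumption: the payload is UTF-8 JSON, as in the poll loop above.
function decodeMessageData(base64Data: string): unknown {
  const json = Buffer.from(base64Data, "base64").toString("utf-8");
  return JSON.parse(json);
}

// Variant that returns undefined instead of throwing on malformed JSON,
// so that one bad message does not end the poll loop.
function tryDecodeMessageData(base64Data: string): unknown {
  try {
    return decodeMessageData(base64Data);
  } catch {
    return undefined;
  }
}
```

Inside the loop, a call such as tryDecodeMessageData(message.metaData.data as string) could replace the inline JSON.parse expression, with undefined results skipped.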

Replace the accessKeyId and accessKeySecret variables with the OAuth 2.0 credentials you created earlier. Then replace projectHrn with your project HRN and catalogHrn with your catalog HRN. To build and run the example program, use the following shell commands. First, create a TypeScript project and install its dependencies:

npm init --yes \
  && npm install --save-dev typescript \
  && npx tsc --init \
  && npm install --save-dev @types/node \
  && npm install --save \
    @here/olp-sdk-authentication \
    @here/olp-sdk-core \
    @here/olp-sdk-dataservice-read

Compile and run the program:

npx tsc && node .

Now that the program is running, you need to ingest telemetry to trigger the creation of a trace. Leave the program running and open a new terminal. You can skip Option 2 below, which uses Kafka, and continue with retrieving a user token.

Option 2: Consume with Kafka

Start by creating a new directory and switching to it:

mkdir traces-stream-app-consume && cd traces-stream-app-consume

Then save this code to an index.ts file in the new directory:

import { HRN, OlpClientSettings, RequestFactory } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import { StreamApi } from "@here/olp-sdk-dataservice-api";
import { Kafka, logLevel } from "kafkajs";
import util from 'util';

// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";

// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";

// Catalog HRN format: "hrn:here:data::<organization>:tc-output-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";

const layerId = "tc-traces";

async function main(): Promise<void> {
  // Setup client settings.
  const userAuth = new UserAuth({
    tokenRequester: requestToken,
    credentials: { accessKeyId, accessKeySecret },
    scope: projectHrn
  });
  const olpClientSettings = new OlpClientSettings({
    environment: "here", getToken: () => userAuth.getToken()
  });

  // Use API lookup API to find a Stream v2 API endpoint.
  const requestBuilder = await RequestFactory.create(
    "stream", "v2", olpClientSettings, HRN.fromString(catalogHrn)
  );
  // Get kafka connection parameters from the Stream v2 API.
  const streamLayerEndpoint = await StreamApi.endpointByConsumer(
    requestBuilder, { layerId, type: "consumer" }
  );

  // Create kafka consumer.
  const kafka = new Kafka({
    brokers: streamLayerEndpoint.bootstrapServers.map(s => `${s.hostname}:${s.port}`),
    clientId: streamLayerEndpoint.clientId,
    logLevel: logLevel.WARN,
    sasl: {
      mechanism: "oauthbearer",
      oauthBearerProvider: async () => ({ value: await userAuth.getToken() })
    },
    ssl: true,
  });
  const consumer = kafka.consumer({
    groupId: `${streamLayerEndpoint.consumerGroupPrefix}test-group`
  });

  // Subscribe and listen for traces.
  await consumer.connect();
  await consumer.subscribe({ topic: streamLayerEndpoint.topic });
  await consumer.run({
    eachMessage: async ({ message }) => {
      const data = JSON.parse((message.value || "{}").toString());
      console.log(util.inspect(data, { depth: null, colors: true }));
    },
  });

  // Disconnect and exit on Ctrl+C.
  console.log('Consuming traces (press Ctrl+C to exit)...');
  process.on('SIGINT', async () => {
    console.log('Disconnecting...');
    await consumer.disconnect();
  });
}

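// Report failures instead of leaving an unhandled promise rejection.
main().catch((error) => {
  console.error(error);
  process.exit(1);
});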
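
The four placeholder constants at the top of the program could also be read from the environment rather than edited in the source, which keeps credentials out of the file. The following is a minimal sketch, not part of the HERE Data SDK: the loadConfig function and the environment variable names are hypothetical, and the HRN checks cover only the prefixes shown in the code comments above.

```typescript
// Settings the example programs need; field names match the constants above.
interface TraceConsumerConfig {
  accessKeyId: string;
  accessKeySecret: string;
  projectHrn: string;
  catalogHrn: string;
}

// Read the settings from an environment-like object.
// The variable names (HERE_ACCESS_KEY_ID, ...) are hypothetical, chosen for this sketch.
function loadConfig(env: Record<string, string | undefined>): TraceConsumerConfig {
  const read = (name: string): string => {
    const value = env[name];
    if (!value) {
      throw new Error(`Missing environment variable ${name}`);
    }
    return value;
  };
  const config: TraceConsumerConfig = {
    accessKeyId: read("HERE_ACCESS_KEY_ID"),
    accessKeySecret: read("HERE_ACCESS_KEY_SECRET"),
    projectHrn: read("HERE_PROJECT_HRN"),
    catalogHrn: read("HERE_CATALOG_HRN"),
  };
  // Check only the HRN prefixes documented in the code comments above.
  if (!/^hrn:here:authorization::/.test(config.projectHrn)) {
    throw new Error("HERE_PROJECT_HRN does not look like a project HRN");
  }
  if (!/^hrn:here:data::/.test(config.catalogHrn)) {
    throw new Error("HERE_CATALOG_HRN does not look like a catalog HRN");
  }
  return config;
}

// Usage in the program above:
// const { accessKeyId, accessKeySecret, projectHrn, catalogHrn } = loadConfig(process.env);
```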

Replace the accessKeyId and accessKeySecret variables with the OAuth 2.0 credentials you created earlier. Then replace projectHrn with your project HRN and catalogHrn with your catalog HRN. To build and run the example program, use the following shell commands. First, create a TypeScript project and install its dependencies:

npm init --yes \
  && npm install --save-dev typescript \
  && npx tsc --init \
  && npm install --save-dev @types/node \
  && npm install --save \
    @here/olp-sdk-authentication \
    @here/olp-sdk-core \
    @here/olp-sdk-dataservice-api \
    kafkajs

Compile and run the program:

npx tsc && node .

Now that the program is running, you need to ingest telemetry to trigger the creation of a trace. Leave the program running and open a new terminal.

Retrieve user token

To perform the next steps, you need a project-scoped access token. Follow the instructions on how to sign in to a platform project to get the token. The scoped access token is referred to as {accessToken} from now on.

Obtain a test device

You need a device that has been claimed for your project. You can use a device that you already have, or follow the instructions to create and claim a new test device.

When you have the test device, note its tracking ID, which is needed in the upcoming steps as {trackingId}.

Ingest telemetry and observe traces

Now ingest telemetry that contains a single position sample:

timestamp=$(date +%s)
curl https://tracking.hereapi.com/v3/ \
  --request POST \
  --header 'authorization: Bearer {accessToken}' \
  --header 'content-type: application/json' \
  --data '{
  "id": "{trackingId}",
  "data": [
    {
      "timestamp": '$timestamp'000,
      "position": {
        "lat": "52.4988217",
        "lng": "13.35170005",
        "alt": "86",
        "accuracy": "25"
      }
    }
  ]
}'
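
The request body in the curl command above can also be built in TypeScript, for example when sending telemetry from a script. The following is a minimal sketch that only constructs the body; the names TelemetrySample, IngestRequestBody, and buildIngestBody are hypothetical, and the field values mirror the curl example, including the position values being sent as strings.

```typescript
// One telemetry sample, mirroring the fields used in the curl request above.
interface TelemetrySample {
  timestamp: number; // milliseconds since the Unix epoch
  position: { lat: string; lng: string; alt: string; accuracy: string };
}

interface IngestRequestBody {
  id: string;
  data: TelemetrySample[];
}

// Build the request body. The curl command appends "000" to a value in seconds;
// multiplying by 1000 yields the same millisecond timestamp.
function buildIngestBody(trackingId: string, timestampSeconds: number): IngestRequestBody {
  return {
    id: trackingId,
    data: [
      {
        timestamp: timestampSeconds * 1000,
        position: { lat: "52.4988217", lng: "13.35170005", alt: "86", accuracy: "25" },
      },
    ],
  };
}

// Usage: serialize the body for the POST request shown above.
// const body = JSON.stringify(buildIngestBody("{trackingId}", Math.floor(Date.now() / 1000)));
```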

This command triggers the creation of a trace object, which is then printed by the trace-consuming program. Here is an example of what you might see:

Consuming traces (press Ctrl+C to exit)...
{
  type: 'traces',
  version: '1.0',
  traces: [
    {
      position: { alt: 86, lat: 52.4988217, lng: 13.35170005, accuracy: 25 },
      serverTimestamp: 1736956128828,
      system: { computed: { moving: false } },
      timestamp: 1736956128000
    }
  ],
  correlationId: '8cf668d2-ba58-435a-9caa-603c748cbbfa',
  trackingId: 'HERE-626daaf1-cd80-4165-8785-1718cf76bd07'
}
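
The shape of the example above can be written down as TypeScript types, so that the consuming program can check a decoded payload before using it. The following is a sketch inferred only from this single example: the real schema may contain more fields or different optionality, and the names Trace, TracesMessage, isTracesMessage, and summarize are hypothetical.

```typescript
// Types inferred from the single example above; the real schema may differ.
interface Trace {
  timestamp: number;
  serverTimestamp: number;
  position?: { lat: number; lng: number; alt?: number; accuracy?: number };
  system?: { computed?: { moving?: boolean } };
}

interface TracesMessage {
  type: string;
  version: string;
  trackingId: string;
  correlationId: string;
  traces: Trace[];
}

// Narrow an unknown decoded payload by checking the fields that summarize() uses.
function isTracesMessage(value: unknown): value is TracesMessage {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const candidate = value as Partial<TracesMessage>;
  return (
    candidate.type === "traces" &&
    typeof candidate.trackingId === "string" &&
    Array.isArray(candidate.traces)
  );
}

// Produce one line per trace: tracking ID, ISO timestamp, and position if present.
function summarize(message: TracesMessage): string[] {
  return message.traces.map((trace) => {
    const when = new Date(trace.timestamp).toISOString();
    const where = trace.position
      ? `${trace.position.lat},${trace.position.lng}`
      : "no position";
    return `${message.trackingId} ${when} ${where}`;
  });
}
```

In either consumer program, the decoded data could be passed through isTracesMessage before calling summarize, and any other payload logged as is.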

For more details on the message format, see Traces output streams.