
Receive events from output stream

Events occur when certain conditions are fulfilled. In most cases, the condition is defined by a rule. When a device is associated with a rule, an event occurs each time a threshold defined by the rule is crossed.

Introduction

This tutorial demonstrates how to use the stream layer of the HERE platform catalog to receive Tracking events through the Data API. In the tutorial, you create a client program that listens for events and prints them. You then create a test device and a sensor rule for the device, and ingest telemetry that triggers the sensor rule and causes an event to occur.

What do you need?

Grant access to your stream

Before your application can access the stream, you need to grant the application access to the stream's catalog. For more information, see how to add project members.

Consume events from stream

To consume events from the stream, you can use the HERE Data SDK for TypeScript. There are two ways to access the events: by polling through the Data API (REST), or by consuming directly with Kafka.

Both options are shown below. Choose the one that suits you best. Using Kafka directly is more efficient, while using the REST API adds extra overhead.

Option 1: Polling events with Data API

Start by creating a new directory and switching to it:

mkdir events-stream-app-poll && cd events-stream-app-poll

Then save this code to an index.ts file in the new directory:

import { HRN, OlpClientSettings } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import {
  PollRequest,
  StreamLayerClient,
  SubscribeRequest,
  UnsubscribeRequest,
} from "@here/olp-sdk-dataservice-read";

// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";

// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";

// Catalog HRN format: "hrn:here:data::<organization>:tc-output-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";

const layerId = "tc-events";

async function main(): Promise<void> {
  // Setup client settings.
  const userAuth = new UserAuth({
    tokenRequester: requestToken,
    credentials: { accessKeyId, accessKeySecret },
    scope: projectHrn
  });
  const olpClientSettings = new OlpClientSettings({
    environment: "here", getToken: () => userAuth.getToken()
  });

  // Create StreamLayerClient.
  const streamLayerClient = new StreamLayerClient({
    catalogHrn: HRN.fromString(catalogHrn),
    layerId,
    settings: olpClientSettings,
  });

  // Create a subscription for the stream topic.
  const subscribeRequest = new SubscribeRequest().withMode("serial");
  const subscriptionId = await streamLayerClient.subscribe(subscribeRequest);

  // Consume data using poll method.
  const pollRequest = new PollRequest()
    .withMode("serial")
    .withSubscriptionId(subscriptionId);

  let interrupted = false;
  process.on('SIGINT', () => { interrupted = true; });

  console.log('Consuming events (press Ctrl+C to exit)...');

  try {
    while (!interrupted) {
      const messages = await streamLayerClient.poll(pollRequest);
      for (const message of messages) {
        // Base64 decode the message data.
        const data = JSON.parse(
          Buffer.from(message.metaData.data as string, 'base64').toString('utf-8')
        );
        console.log(data);
      }
    }
  } finally {
    // Delete subscription from the stream topic.
    await streamLayerClient.unsubscribe(
      new UnsubscribeRequest()
        .withMode("serial")
        .withSubscriptionId(subscriptionId)
    );
  }
}

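// Report startup or runtime failures instead of leaving the promise unhandled.
main().catch((error) => {
  console.error(error);
  process.exit(1);
});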
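
The decoding step inside the polling loop can be pulled out into a small helper, which makes it easier to try out without a live stream. The following is a minimal sketch; the name decodeEventPayload is hypothetical, and returning undefined for payloads that are not valid JSON is an assumption about what you want, not behavior of the SDK.

```typescript
// Hypothetical helper: decodes one base64-encoded stream payload into JSON.
// Returns undefined instead of throwing when the decoded text is not valid JSON.
function decodeEventPayload(base64Data: string): unknown {
  const text = Buffer.from(base64Data, "base64").toString("utf-8");
  try {
    return JSON.parse(text);
  } catch {
    return undefined;
  }
}

// Usage with a locally encoded sample, so no network access is needed.
const samplePayload = Buffer.from(
  JSON.stringify({ type: "events", version: "1.0", events: [] })
).toString("base64");
console.log(decodeEventPayload(samplePayload));
```

In the polling program, the body of the for loop could then call decodeEventPayload(message.metaData.data as string) and skip results that are undefined.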

Now replace the accessKeyId and accessKeySecret variables with the OAuth 2.0 credentials you created earlier. Then replace projectHrn with your project HRN and catalogHrn with your catalog HRN. To build and run the example program, use the following shell commands. First, create a TypeScript project and install its dependencies:

npm init --yes \
  && npm install --save-dev typescript \
  && npx tsc --init \
  && npm install --save-dev @types/node \
  && npm install --save \
    @here/olp-sdk-authentication \
    @here/olp-sdk-core \
    @here/olp-sdk-dataservice-read

Compile and run the program:

npx tsx index.ts

Now that the program is running, you need to set up a condition that triggers an event. Leave the program running and open a new terminal. Option 2 below shows the Kafka alternative, which you can skip for now. The next step is to retrieve a user token.
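
The two HRN formats quoted in the code comments of the example program share the same organization and project ID placeholders. Assuming both placeholders really do refer to the same values, the catalog HRN could be derived from the project HRN as sketched below. The function name catalogHrnFromProjectHrn is hypothetical, and the result should be checked against the catalog shown for your project.

```typescript
// Hypothetical helper based only on the two HRN formats quoted in the example program:
//   project: hrn:here:authorization::<organization>:project/<projectId>
//   catalog: hrn:here:data::<organization>:tc-output-<projectId>
function catalogHrnFromProjectHrn(projectHrn: string): string {
  const match = /^hrn:here:authorization::([^:]+):project\/([^:\/]+)$/.exec(projectHrn);
  if (!match) {
    throw new Error(`Unexpected project HRN format: ${projectHrn}`);
  }
  const [, organization, projectId] = match;
  return `hrn:here:data::${organization}:tc-output-${projectId}`;
}

console.log(catalogHrnFromProjectHrn("hrn:here:authorization::myorg:project/my-project"));
// prints "hrn:here:data::myorg:tc-output-my-project"
```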

Option 2: Consume with Kafka

Start by creating a new directory and switching to it:

mkdir events-stream-app-consume && cd events-stream-app-consume

Then save this code to an index.ts file in the new directory:

import { HRN, OlpClientSettings, RequestFactory } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import { StreamApi } from "@here/olp-sdk-dataservice-api";
import { Kafka, logLevel } from "kafkajs";
import { v4 as uuidv4 } from "uuid";


// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";

// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";

// Catalog HRN format: "hrn:here:data::<organization>:tc-output-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";

const layerId = "tc-events";

async function main(): Promise<void> {
  // Setup client settings.
  const userAuth = new UserAuth({
    tokenRequester: requestToken,
    credentials: { accessKeyId, accessKeySecret },
    scope: projectHrn
  });
  const olpClientSettings = new OlpClientSettings({
    environment: "here", getToken: () => userAuth.getToken()
  });

  // Use API lookup API to find a Stream v2 API endpoint.
  const requestBuilder = await RequestFactory.create(
    "stream", "v2", olpClientSettings, HRN.fromString(catalogHrn)
  );
  // Get kafka connection parameters from the Stream v2 API.
  const streamLayerEndpoint = await StreamApi.endpointByConsumer(
    requestBuilder, { layerId, type: "consumer" }
  );

  // Create kafka consumer.
  const kafka = new Kafka({
    brokers: streamLayerEndpoint.bootstrapServers.map(s => `${s.hostname}:${s.port}`),
    clientId: streamLayerEndpoint.clientId,
    logLevel: logLevel.WARN,
    sasl: {
      mechanism: "oauthbearer",
      oauthBearerProvider: async () => ({ value: await userAuth.getToken() })
    },
    ssl: true,
  });
  const consumer = kafka.consumer({
    groupId: `${streamLayerEndpoint.consumerGroupPrefix}test-group`
  });

  // Subscribe and listen for events.
  await consumer.connect();
  await consumer.subscribe({ topic: streamLayerEndpoint.topic });
  await consumer.run({
    eachMessage: async ({ message }) => {
      const data = JSON.parse((message.value || "{}").toString());
      console.log(data);
    },
  });

  // Disconnect and exit on Ctrl+C.
  console.log('Consuming events (press Ctrl+C to exit)...');
  process.on('SIGINT', async () => {
    console.log('Disconnecting...');
    await consumer.disconnect();
  });
}

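// Report startup or runtime failures instead of leaving the promise unhandled.
main().catch((error) => {
  console.error(error);
  process.exit(1);
});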
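
The program above uses the fixed group ID suffix test-group. In Kafka, consumers that share a group ID split the topic's messages between them, while consumers in different groups each receive every message. The sketch below shows one way to choose between the two behaviors. The helper name buildGroupId is hypothetical, and it assumes that any suffix appended to consumerGroupPrefix is accepted by the service.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical helper: joins the prefix returned by the Stream API with a suffix.
// A fixed suffix lets several instances share the work of one consumer group.
// Omitting the suffix generates a unique group, so this instance sees every event.
function buildGroupId(consumerGroupPrefix: string, suffix?: string): string {
  return `${consumerGroupPrefix}${suffix ?? randomUUID()}`;
}

console.log(buildGroupId("example-prefix.", "test-group"));
// prints "example-prefix.test-group"
console.log(buildGroupId("example-prefix."));
```

The prefix "example-prefix." is only a placeholder; in the program it would be streamLayerEndpoint.consumerGroupPrefix.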

Now replace the accessKeyId and accessKeySecret variables with the OAuth 2.0 credentials you created earlier. Then replace projectHrn with your project HRN and catalogHrn with your catalog HRN. To build and run the example program, use the following shell commands. First, create a TypeScript project and install its dependencies:

npm init --yes \
  && npm install --save-dev typescript \
  && npx tsc --init \
  && npm install --save-dev @types/node @types/uuid \
  && npm install --save \
    @here/olp-sdk-authentication \
    @here/olp-sdk-core \
    @here/olp-sdk-dataservice-api \
    kafkajs \
    uuid

Compile and run the program:

npx tsx index.ts

Now that the program is running, you need to set up a condition that triggers an event. Leave the program running and open a new terminal.
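
Both example programs keep the credentials and HRNs as constants in the source file. If you would rather not store secrets in index.ts, one possible alternative is to read them from environment variables, as in this sketch. The variable names (HERE_ACCESS_KEY_ID and so on) and the loadConfig function are hypothetical and not defined by the SDK.

```typescript
interface StreamAppConfig {
  accessKeyId: string;
  accessKeySecret: string;
  projectHrn: string;
  catalogHrn: string;
}

// Hypothetical loader: reads the four values the example programs need
// and fails early with a clear message when one of them is missing.
function loadConfig(env: Record<string, string | undefined>): StreamAppConfig {
  const read = (name: string): string => {
    const value = env[name];
    if (!value) {
      throw new Error(`Missing environment variable: ${name}`);
    }
    return value;
  };
  return {
    accessKeyId: read("HERE_ACCESS_KEY_ID"),
    accessKeySecret: read("HERE_ACCESS_KEY_SECRET"),
    projectHrn: read("HERE_PROJECT_HRN"),
    catalogHrn: read("HERE_CATALOG_HRN"),
  };
}

// In the example programs you would call loadConfig(process.env).
// A literal object is used here so the sketch runs without any setup.
const exampleConfig = loadConfig({
  HERE_ACCESS_KEY_ID: "example-id",
  HERE_ACCESS_KEY_SECRET: "example-secret",
  HERE_PROJECT_HRN: "hrn:here:authorization::myorg:project/my-project",
  HERE_CATALOG_HRN: "hrn:here:data::myorg:tc-output-my-project",
});
console.log(exampleConfig.projectHrn);
```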

Retrieve user token

To perform the next steps, you need a project-scoped access token. Follow the instructions on how to sign in to a platform project to get the token. The scoped access token is referred to as {accessToken} from now on.

Obtain a test device

You need a device that has been claimed for your project. You can use either a device that you already have or follow instructions to:

When you have the test device, take note of its tracking ID, which is needed in the upcoming steps as {trackingId}.

Create sensor rule

curl -X POST \
  'https://tracking.hereapi.com/sensors/v3' \
  -H 'authorization: Bearer {accessToken}' \
  -H 'content-type: application/json' \
  -d '{
  "type": "temperature",
  "range":  {
    "begin": 5,
    "end": 40
  },
  "name": "My temperature rule"
}'

This command creates a temperature rule and returns a unique ID for it. Take note of the ID value; it is referred to as {ruleId} in the next step. With this rule, ingested temperature values between 5 and 40 degrees Celsius are within the normal range, and values outside that range are abnormal. When a reported temperature value crosses from the normal range to the abnormal range or vice versa, an event is triggered.
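
The crossing behavior can be illustrated with a small local simulation. This is only a sketch of the idea, not the service's implementation. It assumes that both bounds are inclusive (the lower bound is consistent with the example output later in this tutorial, where 5 is reported as IN_RANGE), that the first reading always yields an event, and that ABOVE_RANGE is the name of the third state, which this tutorial does not show.

```typescript
// IN_RANGE and BELOW_RANGE appear in this tutorial; ABOVE_RANGE is an assumed name.
type RangeState = "IN_RANGE" | "BELOW_RANGE" | "ABOVE_RANGE";

interface SensorRange {
  begin: number;
  end: number;
}

// Assumption: both bounds are inclusive.
function classify(value: number, range: SensorRange): RangeState {
  if (value < range.begin) return "BELOW_RANGE";
  if (value > range.end) return "ABOVE_RANGE";
  return "IN_RANGE";
}

// Yields a state for the first reading and for every later change of state.
function detectTransitions(values: number[], range: SensorRange): RangeState[] {
  const events: RangeState[] = [];
  let previous: RangeState | undefined;
  for (const value of values) {
    const state = classify(value, range);
    if (state !== previous) {
      events.push(state);
    }
    previous = state;
  }
  return events;
}

console.log(detectTransitions([5, 4], { begin: 5, end: 40 }));
// prints [ 'IN_RANGE', 'BELOW_RANGE' ]
```

Readings that stay on the same side of the thresholds, such as 20 followed by 25, produce no further events in this model.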

Associate rule with the device

Before the rule can trigger for your device, the device and the rule need to be associated with each other:

curl -X PUT \
  'https://tracking.hereapi.com/associations/v3/{trackingId}/sensors/{ruleId}' \
  -H 'authorization: Bearer {accessToken}' \
  -H 'content-type: application/json'
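
If you prefer to make the same call from TypeScript, the request can be described as plain data first. The sketch below mirrors the curl command above; the names HttpRequestSpec and buildAssociationRequest are hypothetical, and URL-encoding the two IDs is a precaution rather than a documented requirement.

```typescript
interface HttpRequestSpec {
  method: string;
  url: string;
  headers: Record<string, string>;
}

// Hypothetical helper that mirrors the curl command: same method, URL, and headers.
function buildAssociationRequest(
  trackingId: string,
  ruleId: string,
  accessToken: string
): HttpRequestSpec {
  const device = encodeURIComponent(trackingId);
  const rule = encodeURIComponent(ruleId);
  return {
    method: "PUT",
    url: `https://tracking.hereapi.com/associations/v3/${device}/sensors/${rule}`,
    headers: {
      authorization: `Bearer ${accessToken}`,
      "content-type": "application/json",
    },
  };
}

// Placeholder values; replace them with your own tracking ID, rule ID, and token.
const associationRequest = buildAssociationRequest("HERE-example", "example-rule", "example-token");
console.log(associationRequest.method, associationRequest.url);

// To send it with the fetch API built into Node.js 18 or later (not executed here):
// const { url, ...init } = associationRequest;
// const response = await fetch(url, init);
```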

Ingest telemetry and observe events

Now ingest telemetry that contains temperature values both within and outside the temperature range:

timestamp=$(date +%s)
curl -X POST \
  https://tracking.hereapi.com/v3/ \
  -H 'authorization: Bearer {accessToken}' \
  -H 'content-type: application/json' \
  -d '{
  "id": "{trackingId}",
  "data": [
    {
      "timestamp": '$timestamp'000,
      "system": {
        "reportedSensorData": {
          "temperatureC": 5
        }
      }
    },
    {
      "timestamp": '$timestamp'001,
      "system": {
        "reportedSensorData": {
          "temperatureC": 4
        }
      }
    }
  ]
}'

This command triggers temperature events of types IN_RANGE and BELOW_RANGE, which are printed by the event-consuming program. Here is an example of what you might see:

Consuming events (press Ctrl+C to exit)...
{
  type: 'events',
  version: '1.0',
  events: [
    {
      trackingId: 'HERE-84fe3e53-5988-4b4a-91fd-47611bb71a7e',
      timestamp: 1726673335000,
      ruleId: '6e627562-196d-4a90-838c-fc39372fa98f',
      eventSource: 'temperature',
      eventType: 'IN_RANGE',
      initialState: true
    },
    {
      trackingId: 'HERE-84fe3e53-5988-4b4a-91fd-47611bb71a7e',
      timestamp: 1726673335001,
      ruleId: '6e627562-196d-4a90-838c-fc39372fa98f',
      eventSource: 'temperature',
      eventType: 'BELOW_RANGE',
      initialState: false
    }
  ],
  correlationId: 'feb768c9-b0c4-4bbf-bd68-63f99e238b37'
}

For more details on the message format, see Events Output Streams.
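
When processing these messages in TypeScript, it can help to give them a type. The interfaces below are inferred only from the example output above, so treat them as an assumption and rely on Events Output Streams for the authoritative schema. The names TrackingEvent, EventsMessage, isEventsMessage, and describeEvent are hypothetical.

```typescript
// Shapes inferred from the example output in this tutorial, not from the official schema.
interface TrackingEvent {
  trackingId: string;
  timestamp: number; // milliseconds since the Unix epoch, as in the ingested telemetry
  ruleId: string;
  eventSource: string;
  eventType: string;
  initialState: boolean;
}

interface EventsMessage {
  type: string;
  version: string;
  events: TrackingEvent[];
  correlationId: string;
}

// Minimal runtime check before treating parsed JSON as an events message.
function isEventsMessage(value: unknown): value is EventsMessage {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Partial<EventsMessage>;
  return candidate.type === "events" && Array.isArray(candidate.events);
}

// Formats one event as a single log line.
function describeEvent(event: TrackingEvent): string {
  const time = new Date(event.timestamp).toISOString();
  return `${time} ${event.trackingId} ${event.eventSource} ${event.eventType}`;
}

const parsedMessage: unknown = {
  type: "events",
  version: "1.0",
  events: [
    {
      trackingId: "HERE-example",
      timestamp: 1726673335001,
      ruleId: "example-rule",
      eventSource: "temperature",
      eventType: "BELOW_RANGE",
      initialState: false,
    },
  ],
  correlationId: "example-correlation-id",
};

if (isEventsMessage(parsedMessage)) {
  for (const event of parsedMessage.events) {
    console.log(describeEvent(event));
  }
}
```

In either consumer program, the value passed to console.log could be run through isEventsMessage first, so that unexpected payloads are ignored rather than causing errors further down.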