Receive traces from output stream
In this tutorial you learn how to use stream layers from HERE platform catalog to receive Tracking traces using either Data API or Kafka. For more information see HERE platform catalog and Stream layers.
What will you learn?
The tutorial walks you through creation of a client program that listens for traces and prints them out. Then you create a test device and use it to ingest telemetry which then appears as a trace in output stream.
What do you need?
- A Tracking project in HERE platform organization.
- Output stream feature enabled for your project. To have the feature enabled for your project, contact us.
- A HERE platform application and application credentials used for accessing the HERE platform APIs. To find out how to get access credentials and OAuth token for your application, see HERE platform authentication instructions.
- Project HRN and catalog HRN for output streams. For more information see HRN.
Grant access to your stream
Before your application can use the stream you need to grant the application access to the stream's catalog. For more information, see how to add project members.
Consume traces from stream
To consume traces from the stream you can use HERE Data SDK for TypeScript. There are two ways to consume the traces, either with:
- Option 1: Polling from Data API (via REST interface) or
- Option 2: Consume with Kafka.
This document shows both options next. Choose the one that suits you best. Using Kafka directly is more efficient, while usage of REST API adds extra overhead.
Option 1: Polling traces with Data API
Start with creating a new directory and switch to it:
mkdir traces-stream-app-poll && cd traces-stream-app-pollThen save this code to index.ts file in the new directory:
import { HRN, OlpClientSettings } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import {
PollRequest,
StreamLayerClient,
SubscribeRequest,
UnsubscribeRequest,
} from "@here/olp-sdk-dataservice-read";
import util from 'util';
// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";
// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";
// Catalog HRN format: "hrn:here:data::<organization>:tc-output-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-traces";
async function main(): Promise<void> {
// Setup client settings.
const userAuth = new UserAuth({
tokenRequester: requestToken,
credentials: { accessKeyId, accessKeySecret },
scope: projectHrn
});
const olpClientSettings = new OlpClientSettings({
environment: "here", getToken: () => userAuth.getToken()
});
// Create StreamLayerClient.
const streamLayerClient = new StreamLayerClient({
catalogHrn: HRN.fromString(catalogHrn),
layerId,
settings: olpClientSettings,
});
// Create a subscription for the stream topic.
const subscribeRequest = new SubscribeRequest().withMode("serial");
const subscriptionId = await streamLayerClient.subscribe(subscribeRequest);
// Consume data using poll method.
const pollRequest = new PollRequest()
.withMode("serial")
.withSubscriptionId(subscriptionId);
let interrupted = false;
process.on('SIGINT', () => { interrupted = true; });
console.log('Consuming traces (press Ctrl+C to exit)...');
try {
while (!interrupted) {
const messages = await streamLayerClient.poll(pollRequest);
for (const message of messages) {
// Base64 decode the message data.
const data = JSON.parse(
Buffer.from(message.metaData.data as string, 'base64').toString('utf-8')
);
console.log(util.inspect(data, { depth: null, colors: true }));
}
}
} finally {
// Delete subscription from the stream topic.
await streamLayerClient.unsubscribe(
new UnsubscribeRequest()
.withMode("serial")
.withSubscriptionId(subscriptionId)
);
}
}
main();You must now replace variables accessKeyId and accessKeySecret with OAuth 2.0 credentials you created earlier. Then replace projectHrn with your project HRN and catalogHrn with catalog HRN.
To build and run the example program use following shell commands.
Create a Typescript project and install its dependencies:
npm init --yes \
&& npm install --save-dev typescript \
&& npx tsc --init \
&& npm install --save-dev @types/node \
&& npm install --save \
@here/olp-sdk-authentication \
@here/olp-sdk-core \
@here/olp-sdk-dataservice-readCompile and run the program:
npx tsc && node .Now that the program is running you need to ingest telemetry to trigger creation of a trace. Leave the program running and open a new terminal. Below is the second option of using Kafka which you can skip for now. Next step is to retrieve user token.
Option 2: Consume with Kafka
Start with creating a new directory and switch to it:
mkdir traces-stream-app-consume && cd traces-stream-app-consumeThen save this code to index.ts file in the new directory:
import { HRN, OlpClientSettings, RequestFactory } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import { StreamApi } from "@here/olp-sdk-dataservice-api";
import { Kafka, logLevel } from "kafkajs";
import util from 'util';
// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";
// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";
// Catalog HRN format: "hrn:here:data::<organization>:tc-output-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-traces";
async function main(): Promise<void> {
// Setup client settings.
const userAuth = new UserAuth({
tokenRequester: requestToken,
credentials: { accessKeyId, accessKeySecret },
scope: projectHrn
});
const olpClientSettings = new OlpClientSettings({
environment: "here", getToken: () => userAuth.getToken()
});
// Use API lookup API to find a Stream v2 API endpoint.
const requestBuilder = await RequestFactory.create(
"stream", "v2", olpClientSettings, HRN.fromString(catalogHrn)
);
// Get kafka connection parameters from the Stream v2 API.
const streamLayerEndpoint = await StreamApi.endpointByConsumer(
requestBuilder, { layerId, type: "consumer" }
);
// Create kafka consumer.
const kafka = new Kafka({
brokers: streamLayerEndpoint.bootstrapServers.map(s => `${s.hostname}:${s.port}`),
clientId: streamLayerEndpoint.clientId,
logLevel: logLevel.WARN,
sasl: {
mechanism: "oauthbearer",
oauthBearerProvider: async () => ({ value: await userAuth.getToken() })
},
ssl: true,
});
const consumer = kafka.consumer({
groupId: `${streamLayerEndpoint.consumerGroupPrefix}test-group`
});
// Subscribe and listen for traces.
await consumer.connect();
await consumer.subscribe({ topic: streamLayerEndpoint.topic });
await consumer.run({
eachMessage: async ({ message }) => {
const data = JSON.parse((message.value || "{}").toString());
console.log(util.inspect(data, { depth: null, colors: true }));
},
});
// Disconnect and exit on Ctrl+C.
console.log('Consuming traces (press Ctrl+C to exit)...');
process.on('SIGINT', async () => {
console.log('Disconnecting...');
await consumer.disconnect();
});
}
main();You must now replace variables accessKeyId and accessKeySecret with OAuth 2.0 credentials you created earlier. Then replace projectHrn with your project HRN and catalogHrn with catalog HRN.
To build and run the example program use following shell commands.
Create a Typescript project and install its dependencies:
npm init --yes \
&& npm install --save-dev typescript \
&& npx tsc --init \
&& npm install --save-dev @types/node \
&& npm install --save \
@here/olp-sdk-authentication \
@here/olp-sdk-core \
@here/olp-sdk-dataservice-api \
kafkajsCompile and run the program:
npx tsc && node .Now that the program is running you need to ingest telemetry to trigger creation of a trace. Leave the program running and open a new terminal.
Retrieve user token
To perform next steps you need a project scoped access token. Follow instructions of how to sign in to platform project to get the token. The scoped access token is referenced as {accessToken} from now on.
Obtain a test device
You need a device that has been claimed for your project. You can use either a device that you already have or follow instructions to:
- Create a device, and then
- Claim the new device to your project.
When you have the test device, take note of its tracking id, which is needed in upcoming steps as {trackingId}.
Ingest telemetry and observe traces
Now ingest telemetry to send temperature values both within, and outside of the temperature range:
timestamp=$(date +%s)
curl https://tracking.hereapi.com/v3/ \
--request POST \
--header 'authorization: Bearer {accessToken}' \
--header 'content-type: application/json' \
--data '{
"id": "{trackingId}",
"data": [
{
"timestamp": '$timestamp'000,
"position": {
"lat": "52.4988217",
"lng": "13.35170005",
"alt": "86",
"accuracy": "25"
}
}
]
}'This command triggers a creation of a trace object and you see it printed out in the traces consuming program. Here is an example of what you might see:
Consuming traces (press Ctrl+C to exit)...
{
type: 'traces',
version: '1.0',
traces: [
{
position: { alt: 86, lat: 52.4988217, lng: 13.35170005, accuracy: 25 },
serverTimestamp: 1736956128828,
system: { computed: { moving: false } },
timestamp: 1736956128000
}
],
correlationId: '8cf668d2-ba58-435a-9caa-603c748cbbfa',
trackingId: 'HERE-626daaf1-cd80-4165-8785-1718cf76bd07'
}
For more details of the message format, see Traces output streams.
Updated last month