Send data using input stream
Device telemetry is ingested into the Tracking service either with the ingestion REST API or through a HERE platform catalog stream layer. Ingesting telemetry through an input stream layer is a more efficient way to ingest large amounts of telemetry data.
In addition to device telemetry, external location events can also be ingested to the input stream.
Prerequisites
The prerequisites for making ingestions using input streams are:
- A Tracking project in a HERE platform organization.
- The input stream feature enabled for the project. To get the input stream feature enabled for your project, contact us.
- A HERE platform application and application credentials used for accessing the HERE platform APIs. To find out how to get access credentials and an OAuth token for your application, see the HERE platform authentication instructions.
- The project HRN and the catalog HRN for input streams. For more information, see HRN.
The layer ID for the input stream layer is tc-ingest.
The input stream catalog and its tc-ingest layer are automatically created when the input stream feature is enabled for the project.
Before your application can use the stream, you need to grant the application access to the stream's catalog. For more information, see how to add project members.
The input stream catalog is a resource in the project, and project members have access to the project's resources.
Input stream data format
The message data published to tc-ingest input stream layer has to be in JSON format and contain the following properties:
- `requestId`: `<uuid>`, ID used for identifying requests. Used for logging and error reporting.
- `type`: `<string>`, Type of the input data; identifies the used data format. Only the values `event` and `telemetry` are supported.
- `version`: `<string>`, Data format version. Only the value `1.0` is supported.
- `data`: `<object>`, Input stream telemetry ingestion uses the same data objects as /v3 ingestion API requests, with the difference that `id` is a mandatory property here. Input stream external location event ingestion uses an array of external location events; their definition can be found by downloading the input stream ingestion schema.
There is an example of an input stream data object in the Usage with Kafka section below.
Note
When developing a program which ingests data to the input stream, it is good practice to verify the validity of the request data format using the /v3 ingestion API, which returns descriptive error messages for invalid requests.
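When a full round trip against the /v3 ingestion API is not convenient, a lightweight local pre-check of the message envelope can catch obvious mistakes before publishing. The helper below is a hypothetical sketch based only on the envelope rules listed above (`validateEnvelope` is not part of any HERE SDK); the /v3 ingestion API remains the authoritative validator for the data payload itself:

```typescript
// Hypothetical pre-check for the input stream message envelope.
// It checks only the top-level rules from the data format section;
// the /v3 ingestion API remains the authoritative validator.
interface StreamMessage {
  requestId: string;
  type: string;
  version: string;
  data: unknown;
}

function validateEnvelope(msg: StreamMessage): string[] {
  const errors: string[] = [];
  const uuidPattern =
    /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
  if (!uuidPattern.test(msg.requestId)) {
    errors.push("requestId must be a UUID");
  }
  if (msg.type !== "event" && msg.type !== "telemetry") {
    errors.push("type must be 'event' or 'telemetry'");
  }
  if (msg.version !== "1.0") {
    errors.push("version must be '1.0'");
  }
  if (msg.data === null || typeof msg.data !== "object") {
    errors.push("data must be an object (telemetry) or an array (events)");
  }
  return errors;
}
```

Such a check could run just before `producer.send`, logging and skipping messages whose envelope is malformed.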
Usage with Kafka
In principle, the steps for using input streams with Kafka are as follows:
- Use the API lookup API to find a Stream v2 API endpoint.
- Use the Stream v2 API connection parameters for your input stream catalog layer `tc-ingest`.
- Ingest messages to the input stream layer using a Kafka producer.
The following code snippet uses the HERE Data SDK for TypeScript to demonstrate a minimal program that implements these steps.
Start by creating a new directory and switching to it:
mkdir input-stream-app-consume && cd input-stream-app-consume

Then save this code to an index.ts file in the new directory:
import { HRN, OlpClientSettings, RequestFactory } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import { StreamApi } from "@here/olp-sdk-dataservice-api";
import { Kafka } from "kafkajs";
import { v4 as uuidv4 } from "uuid";
// Example telemetry data to be published.
const trackingId = "replace-with-your-tracking-id";
const streamIngestionData = {
requestId: uuidv4(),
type: "telemetry",
version: "1.0",
data: {
id: trackingId,
data: [
{
timestamp: Date.now(),
position: {
accuracy: 50,
lat: 60.171318,
lng: 24.941463
}
}
]
}
};
// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";
// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";
// Catalog HRN format: "hrn:here:data::<organization>:tc-input-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-ingest";
async function main(): Promise<void> {
// Setup client settings.
const userAuth = new UserAuth({
tokenRequester: requestToken,
credentials: { accessKeyId, accessKeySecret },
scope: projectHrn
});
const olpClientSettings = new OlpClientSettings({
environment: "here", getToken: () => userAuth.getToken()
});
// Use API lookup API to find a Stream v2 API endpoint.
const requestBuilder = await RequestFactory.create(
"stream", "v2", olpClientSettings, HRN.fromString(catalogHrn)
);
// Get kafka connection parameters from the Stream v2 API.
const streamLayerEndpoint = await StreamApi.endpointByConsumer(
requestBuilder, { layerId, type: "producer" }
);
// Create kafka producer.
const kafka = new Kafka({
clientId: streamLayerEndpoint.clientId,
brokers: streamLayerEndpoint.bootstrapServers.map(s => `${s.hostname}:${s.port}`),
ssl: true,
sasl: {
mechanism: "oauthbearer",
oauthBearerProvider: async () => ({ value: await userAuth.getToken() })
}
});
const producer = kafka.producer();
// Publish telemetry.
await producer.connect();
await producer.send({
topic: streamLayerEndpoint.topic,
messages: [{
key: trackingId, value: JSON.stringify(streamIngestionData)
}]
});
await producer.disconnect();
}
main();

You must now replace the variables `accessKeyId` and `accessKeySecret` with the OAuth 2.0 credentials you created earlier. Also replace `projectHrn` with your project HRN, `catalogHrn` with your catalog HRN, and `trackingId` with the ID of an existing tracker.
To build and run the example program, use the following shell commands.
Create a TypeScript project and install its dependencies:
npm init --yes \
&& npm install --save-dev typescript \
&& npx tsc --init \
&& npm install --save-dev @types/node @types/uuid \
&& npm install --save \
@here/olp-sdk-authentication \
@here/olp-sdk-core \
@here/olp-sdk-dataservice-api \
kafkajs \
  uuid

Compile and run the program:

npx tsx index.ts

Note
It is recommended to use the device `trackingId` as the Kafka partition `key` for ingestions made for the device.
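The reasoning behind this recommendation: Kafka chooses a partition by hashing the message key, so all messages with the same key land in the same partition and are consumed in order. The toy partitioner below illustrates that property only; it is not Kafka's actual algorithm (kafkajs uses a murmur2-based default partitioner):

```typescript
// Toy key-based partitioner. This is NOT Kafka's real algorithm;
// it only demonstrates the property the recommendation relies on:
// a given key always maps to the same partition, so messages for
// one device keep their relative order.
function toyPartition(key: string, numPartitions: number): number {
  let hash = 0;
  for (const ch of key) {
    // Simple 31-based rolling hash, kept in unsigned 32-bit range.
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0;
  }
  return hash % numPartitions;
}

// The same trackingId always selects the same partition.
const first = toyPartition("tracker-123", 8);
const second = toyPartition("tracker-123", 8);
console.log(first === second); // true
```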
You can verify that the ingestions have been processed by getting traces for your device and checking that the ingested telemetry is present.
Usage with REST API
The HERE platform offers the Ingest v1 API for ingesting data to a stream using REST API calls. The steps for using it are:
- Use the API lookup API to find the Ingest v1 API endpoint.
- Ingest messages to the input stream layer using the Ingest v1 API.
The following curl commands demonstrate how to ingest input stream data from a telemetry.json file using an application access token and the Ingest v1 API.
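For illustration, the telemetry.json file could contain a message like the following. The structure mirrors the Kafka example above; the requestId is an example UUID and the tracker ID is a placeholder you must replace:

```json
{
  "requestId": "b4a0a5c1-1234-4abc-8def-000000000000",
  "type": "telemetry",
  "version": "1.0",
  "data": {
    "id": "replace-with-your-tracking-id",
    "data": [
      {
        "timestamp": 1700000000000,
        "position": {
          "accuracy": 50,
          "lat": 60.171318,
          "lng": 24.941463
        }
      }
    ]
  }
}
```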
# Use API lookup API to get the base URL for the Ingest v1 API.
accessToken="replace-with-your-access-token"
catalogHrn="replace-with-your-catalog-hrn"
curl -X GET "https://api-lookup.data.api.platform.here.com/lookup/v1/resources/${catalogHrn}/apis/ingest/v1" \
-H 'Authorization: Bearer '"${accessToken}"
# Post input stream telemetry object from telemetry.json file.
ingestV1BaseURL="replace-with-baseURL-from-api-lookup"
curl -X POST "${ingestV1BaseURL}/layers/tc-ingest" \
-H 'Authorization: Bearer '"${accessToken}" \
-H 'Content-Type: application/json' \
  -d @telemetry.json

Dead letter queue
In addition to the tc-ingest layer, the input stream catalog has a tc-dead-letter layer (a dead letter queue). The Tracking service publishes an error message to the dead letter queue for any invalid telemetry ingestion request sent to the input stream.
The error message data published to tc-dead-letter output stream layer is in JSON format and contains the following properties:
- `requestId`: `<uuid>`, ID used for identifying requests. Matches the `requestId` in the failed request.
- `correlationId`: `<uuid>`, ID used for correlating requests within HERE Tracking. Used for logging and error reporting.
- `type`: `<string>`, Type of the input data; identifies the used data format. Only the values `event` and `telemetry` are supported.
- `version`: `<string>`, Data format version. Only the value `1.0` is supported.
- `error`: `<object>`, Error object.
- `error.title`: `<string>`, Human-readable error description.
- `error.status`: `<string>`, HTTP status code.
- `error.code`: `<string>`, Error code.
- `error.cause`: `<string>`, Human-readable explanation for the error.
- `error.action`: `<string>`, Human-readable description of the action that can be taken to correct the error.
- `error.details`: `<object[]>`, Optional array of error details.
- `error.correlationId`: `<uuid>`, ID used for correlating requests within HERE Tracking.
Below is an example of a dead letter queue message for an invalid request.
{
"error": {
"title": "Bad Request",
"status": 400,
"code": "E902400",
"cause": "Input stream ingestion validation failed",
"action": "Correct the request and try again",
"correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
"details": [
{
"name": "TrackingError",
"correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
"code": 400,
"reason": {
"errors": [
{
"instancePath": "/data/data/0/position",
"schemaPath": "#/properties/data/properties/data/items/properties/position/required",
"keyword": "required",
"params": {
"missingProperty": "lng"
},
"message": "must have required property 'lng'"
}
]
}
}
]
},
"type": "telemetry",
"version": "1.0",
"correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
"requestId": "6f23e24a-aca2-4eee-861a-adf95e5902f2"
}

Note

The input stream content type is `application/json` and only JSON format input is processed. If data in any format other than JSON is published, the request is ignored and no error message is sent to the dead letter queue.
The following example, made with the HERE Data SDK for TypeScript, shows how to consume messages from the dead letter queue.
import { HRN, OlpClientSettings } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import {
PollRequest,
StreamLayerClient,
SubscribeRequest,
UnsubscribeRequest,
} from "@here/olp-sdk-dataservice-read";
// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";
// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";
// Catalog HRN format: "hrn:here:data::<organization>:tc-input-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-dead-letter";
async function main(): Promise<void> {
// Setup client settings.
const userAuth = new UserAuth({
tokenRequester: requestToken,
credentials: { accessKeyId, accessKeySecret },
scope: projectHrn
});
const olpClientSettings = new OlpClientSettings({
environment: "here", getToken: () => userAuth.getToken()
});
// Create StreamLayerClient.
const streamLayerClient = new StreamLayerClient({
catalogHrn: HRN.fromString(catalogHrn),
layerId,
settings: olpClientSettings,
});
// Create a subscription for the stream topic.
const subscribeRequest = new SubscribeRequest().withMode("serial");
const subscriptionId = await streamLayerClient.subscribe(subscribeRequest);
// Consume data using poll method.
const pollRequest = new PollRequest()
.withMode("serial")
.withSubscriptionId(subscriptionId);
const messages = await streamLayerClient.poll(pollRequest);
for (const message of messages) {
// Base64 decode the message data.
const data = JSON.parse(
Buffer.from(message.metaData.data as string, 'base64').toString('utf-8')
);
console.log('Received message:', JSON.stringify(data, null, 2));
}
// Delete subscription from the stream topic.
await streamLayerClient.unsubscribe(
new UnsubscribeRequest()
.withMode("serial")
.withSubscriptionId(subscriptionId)
);
}
main();