出力ストリームからトレースを受け取る
このチュートリアルでは、HEREプラットフォームカタログのストリームレイヤーを使用して、Data APIかKafkaを用いてトラッキングトレースを受信する方法を学びます。詳細については、HEREプラットフォームカタログと「Stream layers」(ストリームレイヤー)を参照してください。
学習内容
このチュートリアルでは、トレースをリッスンして表示するクライアントプログラムを作成する手順を説明します。次に、テストデバイスを作成し、それを使用してテレメトリーを取り込み、出力ストリームにトレースとして表示します。
必要なもの
- HEREプラットフォーム組織のトラッキングプロジェクト。
- プロジェクトの出力ストリーム機能が有効になっていること。プロジェクトでこの機能を有効にするには、お問い合わせください。
- HEREプラットフォームアプリケーションと、HEREプラットフォームAPIへのアクセスに使用するアプリケーション資格情報。アプリケーションのアクセス資格情報とOAuthトークンを取得する方法については、HEREプラットフォームの認証手順を参照してください。
- 出力ストリームのプロジェクトHRNとカタログHRN。詳細については、「HRN」を参照してください。
ストリームへのアクセスを許可する
アプリケーションでストリームを使用できるようにするには、ストリームのカタログへのアクセス権をアプリケーションに付与する必要があります。詳細については、プロジェクトメンバーを追加する方法を参照してください。
ストリームからのトレースを消費する
ストリームからのトレースを消費するには、HERE Data SDK for TypeScriptを使用できます。次の2つの方法でトレースを消費できます。
- オプション1:Data APIからポーリングする (RESTインターフェイス経由)
- オプション2:Kafkaで処理する
このドキュメントでは、次に両方のオプションを紹介します。ご自身に合う方法を選んでください。Kafkaを直接使用する方が効率的ですが、REST APIを使用するとオーバーヘッドが増加します。
オプション1:Data APIを使用してトレースをポーリングする
まず、新規ディレクトリを作成してそれに切り替えます。
mkdir traces-stream-app-poll && cd traces-stream-app-poll以下のコードを新規ディレクトリのindex.tsファイルに保存します。
import { HRN, OlpClientSettings } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import {
PollRequest,
StreamLayerClient,
SubscribeRequest,
UnsubscribeRequest,
} from "@here/olp-sdk-dataservice-read";
import util from 'util';
// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";
// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";
// Catalog HRN format: "hrn:here:data::<organization>:tc-output-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-traces";
async function main(): Promise<void> {
// Setup client settings.
const userAuth = new UserAuth({
tokenRequester: requestToken,
credentials: { accessKeyId, accessKeySecret },
scope: projectHrn
});
const olpClientSettings = new OlpClientSettings({
environment: "here", getToken: () => userAuth.getToken()
});
// Create StreamLayerClient.
const streamLayerClient = new StreamLayerClient({
catalogHrn: HRN.fromString(catalogHrn),
layerId,
settings: olpClientSettings,
});
// Create a subscription for the stream topic.
const subscribeRequest = new SubscribeRequest().withMode("serial");
const subscriptionId = await streamLayerClient.subscribe(subscribeRequest);
// Consume data using poll method.
const pollRequest = new PollRequest()
.withMode("serial")
.withSubscriptionId(subscriptionId);
let interrupted = false;
process.on('SIGINT', () => { interrupted = true; });
console.log('Consuming traces (press Ctrl+C to exit)...');
try {
while (!interrupted) {
const messages = await streamLayerClient.poll(pollRequest);
for (const message of messages) {
// Base64 decode the message data.
const data = JSON.parse(
Buffer.from(message.metaData.data as string, 'base64').toString('utf-8')
);
console.log(util.inspect(data, { depth: null, colors: true }));
}
}
} finally {
// Delete subscription from the stream topic.
await streamLayerClient.unsubscribe(
new UnsubscribeRequest()
.withMode("serial")
.withSubscriptionId(subscriptionId)
);
}
}
main();次に、変数accessKeyIdとaccessKeySecretを先ほど作成したOAuth 2.0認証情報に置き換える必要があります。その後、projectHrnをプロジェクトHRNに、catalogHrnをカタログHRNに置き換えます。
サンプルプログラムをビルドして実行するには、次のシェルコマンドを使用します。
TypeScriptプロジェクトを作成し、その依存関係をインストールします。
npm init --yes \
&& npm install --save-dev typescript \
&& npx tsc --init \
&& npm install --save-dev @types/node \
&& npm install --save \
@here/olp-sdk-authentication \
@here/olp-sdk-core \
@here/olp-sdk-dataservice-readプログラムをコンパイルして実行します。
npx tsc && node .プログラムが実行されたら、テレメトリーを取り込んでトレースの作成をトリガーする必要があります。プログラムを実行したままにして、新しいターミナルを開きます。 以下はKafkaを使用する2番目の方法です。スキップしてもかまいません。 次の手順はユーザートークンの取得です。
オプション2:Kafkaで処理する
まず、新規ディレクトリを作成してそれに切り替えます。
mkdir traces-stream-app-consume && cd traces-stream-app-consume以下のコードを新規ディレクトリのindex.tsファイルに保存します。
import { HRN, OlpClientSettings, RequestFactory } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import { StreamApi } from "@here/olp-sdk-dataservice-api";
import { Kafka, logLevel } from "kafkajs";
import util from 'util';
// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";
// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";
// Catalog HRN format: "hrn:here:data::<organization>:tc-output-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-traces";
async function main(): Promise<void> {
// Setup client settings.
const userAuth = new UserAuth({
tokenRequester: requestToken,
credentials: { accessKeyId, accessKeySecret },
scope: projectHrn
});
const olpClientSettings = new OlpClientSettings({
environment: "here", getToken: () => userAuth.getToken()
});
// Use API lookup API to find a Stream v2 API endpoint.
const requestBuilder = await RequestFactory.create(
"stream", "v2", olpClientSettings, HRN.fromString(catalogHrn)
);
// Get kafka connection parameters from the Stream v2 API.
const streamLayerEndpoint = await StreamApi.endpointByConsumer(
requestBuilder, { layerId, type: "consumer" }
);
// Create kafka consumer.
const kafka = new Kafka({
brokers: streamLayerEndpoint.bootstrapServers.map(s => `${s.hostname}:${s.port}`),
clientId: streamLayerEndpoint.clientId,
logLevel: logLevel.WARN,
sasl: {
mechanism: "oauthbearer",
oauthBearerProvider: async () => ({ value: await userAuth.getToken() })
},
ssl: true,
});
const consumer = kafka.consumer({
groupId: `${streamLayerEndpoint.consumerGroupPrefix}test-group`
});
// Subscribe and listen for traces.
await consumer.connect();
await consumer.subscribe({ topic: streamLayerEndpoint.topic });
await consumer.run({
eachMessage: async ({ message }) => {
const data = JSON.parse((message.value || "{}").toString());
console.log(util.inspect(data, { depth: null, colors: true }));
},
});
// Disconnect and exit on Ctrl+C.
console.log('Consuming traces (press Ctrl+C to exit)...');
process.on('SIGINT', async () => {
console.log('Disconnecting...');
await consumer.disconnect();
});
}
main();次に、変数accessKeyIdとaccessKeySecretを先ほど作成したOAuth 2.0認証情報に置き換える必要があります。その後、projectHrnをプロジェクトHRNに、catalogHrnをカタログHRNに置き換えます。
サンプルプログラムをビルドして実行するには、次のシェルコマンドを使用します。
TypeScriptプロジェクトを作成し、その依存関係をインストールします。
npm init --yes \
&& npm install --save-dev typescript \
&& npx tsc --init \
&& npm install --save-dev @types/node \
&& npm install --save \
@here/olp-sdk-authentication \
@here/olp-sdk-core \
@here/olp-sdk-dataservice-api \
kafkajsプログラムをコンパイルして実行します。
npx tsc && node .プログラムが実行されたら、テレメトリーを取り込んでトレースの作成をトリガーする必要があります。プログラムを実行したままにして、新しいターミナルを開きます。
ユーザートークンを取得する
次の手順を実行するには、プロジェクトスコープユーザーアクセストークンが必要になります。トークンを取得するには、プラットフォームプロジェクトへのサインイン方法の手順に従います。スコープアクセストークンは以降、{accessToken}として表記します。
テストデバイスを入手する
プロジェクトに登録済みのデバイスが必要です。すでに持っているデバイスを使用するか、次の手順に従います。
- デバイスを作成する
- プロジェクトに新しいデバイスを登録します。
テストデバイスが用意できたら、トラッキングIDを控えておきます。この後のステップで{trackingId}として必要になります。
テレメトリーを取り込み、トレースを観察する
テレメトリーを取り込み、温度範囲内および温度範囲外の両方の温度値を送信します。
timestamp=$(date +%s)
curl https://tracking.hereapi.com/v3/ \
--request POST \
--header 'authorization: Bearer {accessToken}' \
--header 'content-type: application/json' \
--data '{
"id": "{trackingId}",
"data": [
{
"timestamp": '$timestamp'000,
"position": {
"lat": "52.4988217",
"lng": "13.35170005",
"alt": "86",
"accuracy": "25"
}
}
]
}'このコマンドを実行すると、トレースオブジェクトが作成され、トレースを消費するプログラムに表示されます。以下がその表示例です。
Consuming traces (press Ctrl+C to exit)...
{
type: 'traces',
version: '1.0',
traces: [
{
position: { alt: 86, lat: 52.4988217, lng: 13.35170005, accuracy: 25 },
serverTimestamp: 1736956128828,
system: { computed: { moving: false } },
timestamp: 1736956128000
}
],
correlationId: '8cf668d2-ba58-435a-9caa-603c748cbbfa',
trackingId: 'HERE-626daaf1-cd80-4165-8785-1718cf76bd07'
}
メッセージ形式の詳細については、「トレース出力ストリーム」を参照してください。
先月の更新