入力ストリームを使用してデータを送信する
トラッキングサービスへのデバイステレメトリーの取り込みは、取り込みREST APIまたはHEREプラットフォームカタログのストリームレイヤーを使用して行うことができます。入力ストリームレイヤーを介してテレメトリーを取り込むことで、大量のテレメトリーデータをより効率的に取り込むことができます。 デバイスのテレメトリーに加えて、外部位置情報イベントを入力ストリームに取り込むことができます。
前提条件
入力ストリームを使用して取り込みを行うための前提条件は次のとおりです。
- HEREプラットフォーム組織のトラッキングプロジェクト。
- プロジェクトの入力ストリーム機能が有効になっていること。プロジェクトの入力ストリーム機能を有効にするには、お問い合わせください。
- HEREプラットフォームアプリケーションと、HEREプラットフォームAPIへのアクセスに使用するアプリケーション資格情報。アプリケーションのアクセス資格情報とOAuthトークンを取得する方法については、HEREプラットフォームの認証手順を参照してください。
- 入力ストリームのプロジェクトHRNとカタログHRN。詳細については、「HRN」を参照してください。
入力ストリームレイヤーのレイヤーIDはtc-ingestです。
プロジェクトで入力ストリーム機能が有効になっている場合、プロジェクトの入力ストリームカタログとそのtc-ingestレイヤーが自動的に作成されます。
アプリケーションでストリームを使用できるようにするには、ストリームのカタログへのアクセス権をアプリケーションに付与する必要があります。詳細については、プロジェクトメンバーを追加する方法を参照してください。
入力ストリームカタログはプロジェクトのリソースであり、プロジェクトメンバーはプロジェクトのリソースにアクセスできます。
入力ストリームのデータ形式
tc-ingest入力ストリームレイヤーに公開されるメッセージデータはJSON形式で、次のプロパティが含まれている必要があります。
requestId:<uuid>、リクエストの識別に使用されるID。ログ記録およびエラーレポートで使用されます。type:<string>、入力データの型で、使用されるデータ形式を識別します。event値とtelemetry値のみがサポートされています。version:<string>、データ形式のバージョン。値1.0のみがサポートされています。data:<object>、入力ストリームのテレメトリー取り込みでは、/v3 Ingestion APIのリクエストと同じデータオブジェクトを使用しますが、idが必須のプロパティである点が異なります。入力ストリームの外部位置情報イベント取り込みでは、外部位置情報イベントの配列が使用されます。これらの定義は、入力ストリーム取り込みスキーマをダウンロードして確認できます。
入力ストリームデータオブジェクトの例は、以下の「Kafkaでの使用」セクションにあります。
注
入力ストリームにデータを取り込むプログラムを開発する場合は、無効なリクエストに対して説明的なエラーメッセージを返す、
/v3 ingestion APIを使用してリクエストデータ形式の有効性を確認することをお勧めします。
Kafkaでの使用
Kafkaで入力ストリームを使用する手順は、原則として次のとおりです。
- API lookup APIを使用して、Stream v2 APIエンドポイントを見つけます。
- 入力ストリームカタログレイヤー
tc-ingestには、Stream v2 APIの接続パラメーターを使用します。 - Kafka producerを使用して、メッセージを入力ストリームレイヤーに取り込みます。
次のコードスニペットでは、HERE Data SDK for TypeScriptを使用して、これらの手順を実装する最小限のプログラムを示しています。 まず、新規ディレクトリを作成してそれに切り替えます。
mkdir input-stream-app-consume && cd input-stream-app-consume以下のコードを新規ディレクトリのindex.tsファイルに保存します。
import { HRN, OlpClientSettings, RequestFactory } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import { StreamApi } from "@here/olp-sdk-dataservice-api";
import { Kafka } from "kafkajs";
import { v4 as uuidv4 } from "uuid";
// Example telemetry data to be published.
const trackingId = "replace-with-your-tracking-id";
const streamIngestionData = {
requestId: uuidv4(),
type: "telemetry",
version: "1.0",
data: {
id: trackingId,
data: [
{
timestamp: Date.now(),
position: {
accuracy: 50,
lat: 60.171318,
lng: 24.941463
}
}
]
}
};
// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";
// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";
// Catalog HRN format: "hrn:here:data::<organization>:tc-input-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-ingest";
async function main(): Promise<void> {
// Setup client settings.
const userAuth = new UserAuth({
tokenRequester: requestToken,
credentials: { accessKeyId, accessKeySecret },
scope: projectHrn
});
const olpClientSettings = new OlpClientSettings({
environment: "here", getToken: () => userAuth.getToken()
});
// Use API lookup API to find a Stream v2 API endpoint.
const requestBuilder = await RequestFactory.create(
"stream", "v2", olpClientSettings, HRN.fromString(catalogHrn)
);
// Get kafka connection parameters from the Stream v2 API.
const streamLayerEndpoint = await StreamApi.endpointByConsumer(
requestBuilder, { layerId, type: "producer" }
);
// Create kafka producer.
const kafka = new Kafka({
clientId: streamLayerEndpoint.clientId,
brokers: streamLayerEndpoint.bootstrapServers.map(s => `${s.hostname}:${s.port}`),
ssl: true,
sasl: {
mechanism: "oauthbearer",
oauthBearerProvider: async () => ({ value: await userAuth.getToken() })
}
});
const producer = kafka.producer();
// Publish telemetry.
await producer.connect();
await producer.send({
topic: streamLayerEndpoint.topic,
messages: [{
key: trackingId, value: JSON.stringify(streamIngestionData)
}]
});
await producer.disconnect();
}
main();次に、変数accessKeyIdとaccessKeySecretを先ほど作成したOAuth 2.0認証情報に置き換える必要があります。また、projectHrnをプロジェクトHRNに、catalogHrnをカタログHRNに、trackingIdを既存のトラッカーのIDに置き換えます。
サンプルプログラムをビルドして実行するには、次のシェルコマンドを使用します。
TypeScriptプロジェクトを作成し、その依存関係をインストールします。
npm init --yes \
&& npm install --save-dev typescript \
&& npx tsc --init \
&& npm install --save-dev @types/node @types/uuid \
&& npm install --save \
@here/olp-sdk-authentication \
@here/olp-sdk-core \
@here/olp-sdk-dataservice-api \
kafkajs \
uuidプログラムをコンパイルして実行します。
npx tsx index.ts注
デバイスの
trackingIdを、デバイスへの取り込みのKafkaパーティションkeyとして使用することをお勧めします。
デバイスのトレースを取得し、取り込んだテレメトリーが存在することを確認することで、取り込みが処理されたことを確認できます。
REST APIでの使用
HEREプラットフォームはIngest v1 APIを提供しており、REST API呼び出しを使用してストリームにデータを取り込むことができます。次の手順でこれを使用します。
- API lookup APIを使用して、Ingest v1 APIエンドポイントを見つけます。
- Ingest v1 APIを使用して、メッセージを入力ストリームレイヤーに取り込みます。
以下のcurlコマンドはアプリケーションのアクセストークンとIngest v1 APIを使用して、telemetry.jsonファイルから入力ストリームデータを取り込む方法を示しています。
# Use API lookup API to get base URL for Ingest v1 API.
catalogHrn="replace-with-your-catalog-hrn";
curl -X GET "https://api-lookup.data.api.platform.here.com/lookup/v1/resources/${catalogHrn}/apis/ingest/v1" \
-H 'Authorization: Bearer '"${accessToken}"
# Post input stream telemetry object from telemetry.json file.
ingestV1BaseURL="replace-with-baseURL-from-api-lookup"
curl -X POST "${ingestV1BaseURL}/layers/tc-ingest" \
-H 'Authorization: Bearer '"${accessToken}" \
-H 'Content-Type: application/json' \
-d @telemetry.jsonデッドレターキュー
入力ストリームカタログにはtc-ingestレイヤーの他に、tc-dead-letterレイヤー (デッドレターキュー) があります。トラッキングサービスは、入力ストリームに送信された無効なテレメトリー取り込みのリクエストに対して、エラーメッセージをデッドレターキューに公開します。
tc-dead-letter出力ストリームレイヤーに公開されるエラーメッセージデータはJSON形式で、次のプロパティが含まれます。
requestId:<uuid>、リクエストの識別に使用されるID。失敗したリクエストのrequestIdと一致します。correlationId:<uuid>、HERE Tracking内でリクエストの関連付けに使用されるID。ログ記録およびエラーレポートで使用されます。type:<string>、入力データの型で、使用されるデータ形式を識別します。event値とtelemetry値のみがサポートされています。version:<string>、データ形式のバージョン。値1.0のみがサポートされています。error:<object>、エラーオブジェクト。error.title:<string>、人間が読めるエラーの説明。error.status:<string>、HTTPステータスコード。error.code:<string>、エラーコード。error.cause:<string>、人間が読めるエラーの説明。error.action:<string>、人間が読める、エラーを修正するためのアクションの説明。error.details:<object[]>、エラーの詳細を示すオプションの配列。error.correlationId:<uuid>、HERE Tracking内でリクエストの関連付けに使用されるID。
以下は無効なリクエストに対するデッドレターキューメッセージの例です。
{
"error": {
"title": "Bad Request",
"status": 400,
"code": "E902400",
"cause": "Input stream ingestion validation failed",
"action": "Correct the request and try again",
"correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
"details": [
{
"name": "TrackingError",
"correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
"code": 400,
"reason": {
"errors": [
{
"instancePath": "/data/data/0/position",
"schemaPath": "#/properties/data/properties/data/items/properties/position/required",
"keyword": "required",
"params": {
"missingProperty": "lng"
},
"message": "must have required property 'lng'"
}
]
}
}
]
},
"type": "telemetry",
"version": "1.0",
"correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
"requestId": "6f23e24a-aca2-4eee-861a-adf95e5902f2"
}注
入力ストリームのコンテンツタイプは
application/jsonで、JSON形式の入力のみが処理されます。JSON形式以外のデータが公開された場合、リクエストは無視され、デッドレターキューにはエラーメッセージが送信されません。
以下の例はHERE Data SDK for TypeScriptを用いて作成したもので、デッドレターキューからのメッセージを使用する方法を示しています。
import { HRN, OlpClientSettings } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import {
PollRequest,
StreamLayerClient,
SubscribeRequest,
UnsubscribeRequest,
} from "@here/olp-sdk-dataservice-read";
// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";
// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";
// Catalog HRN format: "hrn:here:data::<organization>:tc-input-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-dead-letter";
async function main(): Promise<void> {
// Setup client settings.
const userAuth = new UserAuth({
tokenRequester: requestToken,
credentials: { accessKeyId, accessKeySecret },
scope: projectHrn
});
const olpClientSettings = new OlpClientSettings({
environment: "here", getToken: () => userAuth.getToken()
});
// Create StreamLayerClient.
const streamLayerClient = new StreamLayerClient({
catalogHrn: HRN.fromString(catalogHrn),
layerId,
settings: olpClientSettings,
});
// Create a subscription for the stream topic.
const subscribeRequest = new SubscribeRequest().withMode("serial");
const subscribtionId = await streamLayerClient.subscribe(subscribeRequest);
// Consume data using poll method.
const pollRequest = new PollRequest()
.withMode("serial")
.withSubscriptionId(subscribtionId);
const messages = await streamLayerClient.poll(pollRequest);
for (const message of messages) {
// Base64 decode the message data.
const data = JSON.parse(
Buffer.from(message.metaData.data as string, 'base64').toString('utf-8')
);
console.log('Received message:', JSON.stringify(data, null, 2));
}
// Delete subscription from the stream topic.
await streamLayerClient.unsubscribe(
new UnsubscribeRequest()
.withMode("serial")
.withSubscriptionId(subscribtionId)
);
}
main();先月の更新