ガイドAPIリファレンス
ガイド

入力ストリームを使用してデータを送信する

トラッキングサービスへのデバイステレメトリーの取り込みは、取り込みREST APIまたはHEREプラットフォームカタログストリームレイヤーを使用して行うことができます。入力ストリームレイヤーを介してテレメトリーを取り込むことで、大量のテレメトリーデータをより効率的に取り込むことができます。 デバイスのテレメトリーに加えて、外部位置情報イベントを入力ストリームに取り込むことができます。

前提条件

入力ストリームを使用して取り込みを行うための前提条件は次のとおりです。

  • HEREプラットフォーム組織のトラッキングプロジェクト
  • プロジェクトの入力ストリーム機能が有効になっていること。プロジェクトの入力ストリーム機能を有効にするには、お問い合わせください
  • HEREプラットフォームアプリケーションと、HEREプラットフォームAPIへのアクセスに使用するアプリケーション資格情報。アプリケーションのアクセス資格情報とOAuthトークンを取得する方法については、HEREプラットフォームの認証手順を参照してください。
  • 入力ストリームのプロジェクトHRNとカタログHRN。詳細については、「HRN」を参照してください。

入力ストリームレイヤーのレイヤーIDはtc-ingestです。 プロジェクトで入力ストリーム機能が有効になっている場合、プロジェクトの入力ストリームカタログとそのtc-ingestレイヤーが自動的に作成されます。 アプリケーションでストリームを使用できるようにするには、ストリームのカタログへのアクセス権をアプリケーションに付与する必要があります。詳細については、プロジェクトメンバーを追加する方法を参照してください。 入力ストリームカタログはプロジェクトのリソースであり、プロジェクトメンバーはプロジェクトのリソースにアクセスできます。

入力ストリームのデータ形式

tc-ingest入力ストリームレイヤーに公開されるメッセージデータはJSON形式で、次のプロパティが含まれている必要があります。

  • requestId: <uuid>、リクエストの識別に使用されるID。ログ記録およびエラーレポートで使用されます。
  • type: <string>、入力データの型で、使用されるデータ形式を識別します。event値とtelemetry値のみがサポートされています。
  • version: <string>、データ形式のバージョン。値1.0のみがサポートされています。
  • data: <object>、入力ストリームのテレメトリー取り込みでは、/v3 Ingestion APIのリクエストと同じデータオブジェクトを使用しますが、idが必須のプロパティである点が異なります。入力ストリームの外部位置情報イベント取り込みでは、外部位置情報イベントの配列が使用されます。これらの定義は、入力ストリーム取り込みスキーマをダウンロードして確認できます。

入力ストリームデータオブジェクトの例は、以下の「Kafkaでの使用」セクションにあります。

📘

入力ストリームにデータを取り込むプログラムを開発する場合は、無効なリクエストに対して説明的なエラーメッセージを返す、/v3 ingestion APIを使用してリクエストデータ形式の有効性を確認することをお勧めします。

Kafkaでの使用

Kafkaで入力ストリームを使用する手順は、原則として次のとおりです。

  • API lookup APIを使用して、Stream v2 APIエンドポイントを見つけます。
  • 入力ストリームカタログレイヤーtc-ingestには、Stream v2 API接続パラメーターを使用します。
  • Kafka producerを使用して、メッセージを入力ストリームレイヤーに取り込みます。

次のコードスニペットでは、HERE Data SDK for TypeScriptを使用して、これらの手順を実装する最小限のプログラムを示しています。 まず、新規ディレクトリを作成してそれに切り替えます。

mkdir input-stream-app-consume && cd input-stream-app-consume

以下のコードを新規ディレクトリのindex.tsファイルに保存します。

import { HRN, OlpClientSettings, RequestFactory } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import { StreamApi } from "@here/olp-sdk-dataservice-api";

import { Kafka } from "kafkajs";
import { v4 as uuidv4 } from "uuid";

// Example telemetry data to be published.
const trackingId = "replace-with-your-tracking-id";
const streamIngestionData = {
  requestId: uuidv4(),
  type: "telemetry",
  version: "1.0",
  data: {
    id: trackingId,
    data: [
      {
        timestamp: Date.now(),
        position: {
          accuracy: 50,
          lat: 60.171318,
          lng: 24.941463
        }
      }
    ]
  }
};

// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";

// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";

// Catalog HRN format: "hrn:here:data::<organization>:tc-input-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-ingest";

async function main(): Promise<void> {
  // Setup client settings.
  const userAuth = new UserAuth({
    tokenRequester: requestToken,
    credentials: { accessKeyId, accessKeySecret },
    scope: projectHrn
  });
  const olpClientSettings = new OlpClientSettings({
    environment: "here", getToken: () => userAuth.getToken()
  });
  // Use API lookup API to find a Stream v2 API endpoint.
  const requestBuilder = await RequestFactory.create(
    "stream", "v2", olpClientSettings, HRN.fromString(catalogHrn)
  );
  // Get kafka connection parameters from the Stream v2 API.
  const streamLayerEndpoint = await StreamApi.endpointByConsumer(
    requestBuilder, { layerId, type: "producer" }
  );

  // Create kafka producer.
  const kafka = new Kafka({
    clientId: streamLayerEndpoint.clientId,
    brokers: streamLayerEndpoint.bootstrapServers.map(s => `${s.hostname}:${s.port}`),
    ssl: true,
    sasl: {
      mechanism: "oauthbearer",
      oauthBearerProvider: async () => ({ value: await userAuth.getToken() })
    }
  });
  const producer = kafka.producer();

  // Publish telemetry.
  await producer.connect();
  await producer.send({
    topic: streamLayerEndpoint.topic,
    messages: [{
      key: trackingId, value: JSON.stringify(streamIngestionData)
    }]
  });
  await producer.disconnect();
}

main();

次に、変数accessKeyIdaccessKeySecretを先ほど作成したOAuth 2.0認証情報に置き換える必要があります。また、projectHrnをプロジェクトHRNに、catalogHrnをカタログHRNに、trackingIdを既存のトラッカーのIDに置き換えます。 サンプルプログラムをビルドして実行するには、次のシェルコマンドを使用します。 TypeScriptプロジェクトを作成し、その依存関係をインストールします。

npm init --yes \
  && npm install --save-dev typescript \
  && npx tsc --init \
  && npm install --save-dev @types/node @types/uuid \
  && npm install --save \
    @here/olp-sdk-authentication \
    @here/olp-sdk-core \
    @here/olp-sdk-dataservice-api \
    kafkajs \
    uuid

プログラムをコンパイルして実行します。

npx tsx index.ts
📘

デバイスのtrackingIdを、デバイスへの取り込みのKafkaパーティションkeyとして使用することをお勧めします。

デバイスのトレースを取得し、取り込んだテレメトリーが存在することを確認することで、取り込みが処理されたことを確認できます。

REST APIでの使用

HEREプラットフォームはIngest v1 APIを提供しており、REST API呼び出しを使用してストリームにデータを取り込むことができます。次の手順でこれを使用します。

  • API lookup APIを使用して、Ingest v1 APIエンドポイントを見つけます。
  • Ingest v1 APIを使用して、メッセージを入力ストリームレイヤーに取り込みます。

以下のcurlコマンドはアプリケーションのアクセストークンとIngest v1 APIを使用して、telemetry.jsonファイルから入力ストリームデータを取り込む方法を示しています。

# Use API lookup API to get base URL for Ingest v1 API.
catalogHrn="replace-with-your-catalog-hrn";
curl -X GET "https://api-lookup.data.api.platform.here.com/lookup/v1/resources/${catalogHrn}/apis/ingest/v1" \
  -H 'Authorization: Bearer '"${accessToken}"

# Post input stream telemetry object from telemetry.json file.
ingestV1BaseURL="replace-with-baseURL-from-api-lookup"
curl -X POST "${ingestV1BaseURL}/layers/tc-ingest" \
  -H 'Authorization: Bearer '"${accessToken}" \
  -H 'Content-Type: application/json' \
  -d @telemetry.json

デッドレターキュー

入力ストリームカタログにはtc-ingestレイヤーの他に、tc-dead-letterレイヤー (デッドレターキュー) があります。トラッキングサービスは、入力ストリームに送信された無効なテレメトリー取り込みのリクエストに対して、エラーメッセージをデッドレターキューに公開します。 tc-dead-letter出力ストリームレイヤーに公開されるエラーメッセージデータはJSON形式で、次のプロパティが含まれます。

  • requestId: <uuid>、リクエストの識別に使用されるID。失敗したリクエストのrequestIdと一致します。
  • correlationId: <uuid>、HERE Tracking内でリクエストの関連付けに使用されるID。ログ記録およびエラーレポートで使用されます。
  • type: <string>、入力データの型で、使用されるデータ形式を識別します。event値とtelemetry値のみがサポートされています。
  • version: <string>、データ形式のバージョン。値1.0のみがサポートされています。
  • error: <object>、エラーオブジェクト。
  • error.title: <string>、人間が読めるエラーの説明。
  • error.status: <string>、HTTPステータスコード。
  • error.code: <string>、エラーコード。
  • error.cause: <string>、人間が読めるエラーの説明。
  • error.action: <string>、人間が読める、エラーを修正するためのアクションの説明。
  • error.details: <object[]>、エラーの詳細を示すオプションの配列。
  • error.correlationId: <uuid>、HERE Tracking内でリクエストの関連付けに使用されるID。

以下は無効なリクエストに対するデッドレターキューメッセージの例です。

{
  "error": {
    "title": "Bad Request",
    "status": 400,
    "code": "E902400",
    "cause": "Input stream ingestion validation failed",
    "action": "Correct the request and try again",
    "correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
    "details": [
      {
        "name": "TrackingError",
        "correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
        "code": 400,
        "reason": {
          "errors": [
            {
              "instancePath": "/data/data/0/position",
              "schemaPath": "#/properties/data/properties/data/items/properties/position/required",
              "keyword": "required",
              "params": {
                "missingProperty": "lng"
              },
              "message": "must have required property 'lng'"
            }
          ]
        }
      }
    ]
  },
  "type": "telemetry",
  "version": "1.0",
  "correlationId": "6f23e24a-aca2-4eee-861a-adf95e5902f2",
  "requestId": "6f23e24a-aca2-4eee-861a-adf95e5902f2"
}
📘

入力ストリームのコンテンツタイプはapplication/jsonで、JSON形式の入力のみが処理されます。JSON形式以外のデータが公開された場合、リクエストは無視され、デッドレターキューにはエラーメッセージが送信されません。

以下の例はHERE Data SDK for TypeScriptを用いて作成したもので、デッドレターキューからのメッセージを使用する方法を示しています。

import { HRN, OlpClientSettings } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import {
  PollRequest,
  StreamLayerClient,
  SubscribeRequest,
  UnsubscribeRequest,
} from "@here/olp-sdk-dataservice-read";

// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";

// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";

// Catalog HRN format: "hrn:here:data::<organization>:tc-input-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";
const layerId = "tc-dead-letter";

async function main(): Promise<void> {
  // Setup client settings.
  const userAuth = new UserAuth({
    tokenRequester: requestToken,
    credentials: { accessKeyId, accessKeySecret },
    scope: projectHrn
  });
  const olpClientSettings = new OlpClientSettings({
    environment: "here", getToken: () => userAuth.getToken()
  });

  // Create StreamLayerClient.
  const streamLayerClient = new StreamLayerClient({
    catalogHrn: HRN.fromString(catalogHrn),
    layerId,
    settings: olpClientSettings,
  });

  // Create a subscription for the stream topic.
  const subscribeRequest = new SubscribeRequest().withMode("serial");
  const subscribtionId = await streamLayerClient.subscribe(subscribeRequest);

  // Consume data using poll method.
  const pollRequest = new PollRequest()
    .withMode("serial")
    .withSubscriptionId(subscribtionId);
  const messages = await streamLayerClient.poll(pollRequest);
  for (const message of messages) {
    // Base64 decode the message data.
    const data = JSON.parse(
      Buffer.from(message.metaData.data as string, 'base64').toString('utf-8')
    );
    console.log('Received message:', JSON.stringify(data, null, 2));
  }

  // Delete subscription from the stream topic.
  await streamLayerClient.unsubscribe(
    new UnsubscribeRequest()
      .withMode("serial")
      .withSubscriptionId(subscribtionId)
  );
}

main();