ガイドAPIリファレンス
ガイド

出力ストリームからイベントを受け取る

イベントは特定の条件が満たされたときに発生します。ほとんどの場合、条件はルールによって定義されます。ルールに関連付けられたデバイスでは、ルールで定義されたしきい値を超えるとイベントが発生します。

はじめに

このチュートリアルでは、HEREプラットフォームカタログストリームレイヤーを使用して、Data API経由でトラッキングイベントを受信する方法を説明します。 このチュートリアルでは、イベントをリッスンして表示するクライアントプログラムを作成します。その後、テスト用デバイスとそのデバイス用のセンサールールを作成してからテレメトリーを取り込むことで、センサールールがトリガーされ、イベントが発生します。

必要なもの

ストリームへのアクセスを許可する

アプリケーションがストリームにアクセスできるようにするには、ストリームのカタログへのアクセス権をアプリケーションに付与する必要があります。詳細については、プロジェクトメンバーを追加する方法を参照してください。

ストリームからのイベントを消費する

ストリームからのイベントを消費するには、HERE Data SDK for TypeScriptを使用できます。イベントにアクセスするには、次の2つの方法があります。

このドキュメントでは、次に両方のオプションを紹介します。ご自身に合う方法を選んでください。Kafkaを直接使用する方が効率的ですが、REST APIを使用するとオーバーヘッドが増加します。

オプション1:Data APIを使用してイベントをポーリングする

まず、新規ディレクトリを作成してそれに切り替えます。

mkdir events-stream-app-poll && cd events-stream-app-poll

以下のコードを新規ディレクトリのindex.tsファイルに保存します。

import { HRN, OlpClientSettings } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import {
  PollRequest,
  StreamLayerClient,
  SubscribeRequest,
  UnsubscribeRequest,
} from "@here/olp-sdk-dataservice-read";

// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";

// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";

// Catalog HRN format: "hrn:here:data::<organization>:tc-output-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";

const layerId = "tc-events";

async function main(): Promise<void> {
  // Setup client settings.
  const userAuth = new UserAuth({
    tokenRequester: requestToken,
    credentials: { accessKeyId, accessKeySecret },
    scope: projectHrn
  });
  const olpClientSettings = new OlpClientSettings({
    environment: "here", getToken: () => userAuth.getToken()
  });

  // Create StreamLayerClient.
  const streamLayerClient = new StreamLayerClient({
    catalogHrn: HRN.fromString(catalogHrn),
    layerId,
    settings: olpClientSettings,
  });

  // Create a subscription for the stream topic.
  const subscribeRequest = new SubscribeRequest().withMode("serial");
  const subscriptionId = await streamLayerClient.subscribe(subscribeRequest);

  // Consume data using poll method.
  const pollRequest = new PollRequest()
    .withMode("serial")
    .withSubscriptionId(subscriptionId);

  let interrupted = false;
  process.on('SIGINT', () => { interrupted = true; });

  console.log('Consuming events (press Ctrl+C to exit)...');

  try {
    while (!interrupted) {
      const messages = await streamLayerClient.poll(pollRequest);
      for (const message of messages) {
        // Base64 decode the message data.
        const data = JSON.parse(
          Buffer.from(message.metaData.data as string, 'base64').toString('utf-8')
        );
        console.log(data);
      }
    }
  } finally {
    // Delete subscription from the stream topic.
    await streamLayerClient.unsubscribe(
      new UnsubscribeRequest()
        .withMode("serial")
        .withSubscriptionId(subscriptionId)
    );
  }
}

main();

次に、変数accessKeyIdaccessKeySecretを先ほど作成したOAuth 2.0認証情報に置き換える必要があります。その後、projectHrnをプロジェクトHRNに、catalogHrnをカタログHRNに置き換えます。 サンプルプログラムをビルドして実行するには、次のシェルコマンドを使用します。 TypeScriptプロジェクトを作成し、その依存関係をインストールします。

npm init --yes \
  && npm install --save-dev typescript \
  && npx tsc --init \
  && npm install --save-dev @types/node \
  && npm install --save \
    @here/olp-sdk-authentication \
    @here/olp-sdk-core \
    @here/olp-sdk-dataservice-read

プログラムをコンパイルして実行します。

npx tsx index.ts

プログラムが実行されたので、イベントをトリガーする条件を設定する必要があります。プログラムを実行したままにして、新しいターミナルを開きます。 以下はKafkaを使用する2番目の方法です。スキップしてもかまいません。 次の手順はユーザートークンの取得です。

オプション2:Kafkaで処理する

まず、新規ディレクトリを作成してそれに切り替えます。

mkdir events-stream-app-consume && cd events-stream-app-consume

以下のコードを新規ディレクトリのindex.tsファイルに保存します。

import { HRN, OlpClientSettings, RequestFactory } from "@here/olp-sdk-core";
import { UserAuth, requestToken } from "@here/olp-sdk-authentication";
import { StreamApi } from "@here/olp-sdk-dataservice-api";
import { Kafka, logLevel } from "kafkajs";
import { v4 as uuidv4 } from "uuid";


// Access credentials and project information.
const accessKeyId = "replace-with-your-access-key-id";
const accessKeySecret = "replace-with-your-access-key-secret";

// Project HRN format: "hrn:here:authorization::<organization>:project/<projectId>"
const projectHrn = "replace-with-your-project-hrn";

// Catalog HRN format: "hrn:here:data::<organization>:tc-output-<projectId>"
const catalogHrn = "replace-with-your-catalog-hrn";

const layerId = "tc-events";

async function main(): Promise<void> {
  // Setup client settings.
  const userAuth = new UserAuth({
    tokenRequester: requestToken,
    credentials: { accessKeyId, accessKeySecret },
    scope: projectHrn
  });
  const olpClientSettings = new OlpClientSettings({
    environment: "here", getToken: () => userAuth.getToken()
  });

  // Use API lookup API to find a Stream v2 API endpoint.
  const requestBuilder = await RequestFactory.create(
    "stream", "v2", olpClientSettings, HRN.fromString(catalogHrn)
  );
  // Get kafka connection parameters from the Stream v2 API.
  const streamLayerEndpoint = await StreamApi.endpointByConsumer(
    requestBuilder, { layerId, type: "consumer" }
  );

  // Create kafka consumer.
  const kafka = new Kafka({
    brokers: streamLayerEndpoint.bootstrapServers.map(s => `${s.hostname}:${s.port}`),
    clientId: streamLayerEndpoint.clientId,
    logLevel: logLevel.WARN,
    sasl: {
      mechanism: "oauthbearer",
      oauthBearerProvider: async () => ({ value: await userAuth.getToken() })
    },
    ssl: true,
  });
  const consumer = kafka.consumer({
    groupId: `${streamLayerEndpoint.consumerGroupPrefix}test-group`
  });

  // Subscribe and listen for events.
  await consumer.connect();
  await consumer.subscribe({ topic: streamLayerEndpoint.topic });
  await consumer.run({
    eachMessage: async ({ message }) => {
      const data = JSON.parse((message.value || "{}").toString());
      console.log(data);
    },
  });

  // Disconnect and exit on Ctrl+C.
  console.log('Consuming events (press Ctrl+C to exit)...');
  process.on('SIGINT', async () => {
    console.log('Disconnecting...');
    await consumer.disconnect();
  });
}

main();

次に、変数accessKeyIdaccessKeySecretを先ほど作成したOAuth 2.0認証情報に置き換える必要があります。その後、projectHrnをプロジェクトHRNに、catalogHrnをカタログHRNに置き換えます。 サンプルプログラムをビルドして実行するには、次のシェルコマンドを使用します。 TypeScriptプロジェクトを作成し、その依存関係をインストールします。

npm init --yes \
  && npm install --save-dev typescript \
  && npx tsc --init \
  && npm install --save-dev @types/node @types/uuid \
  && npm install --save \
    @here/olp-sdk-authentication \
    @here/olp-sdk-core \
    @here/olp-sdk-dataservice-api \
    kafkajs \
    uuid

プログラムをコンパイルして実行します。

npx tsx index.ts

プログラムが実行されたので、イベントをトリガーする条件を設定する必要があります。プログラムを実行したままにして、新しいターミナルを開きます。

ユーザートークンを取得する

次の手順を実行するには、プロジェクトスコープユーザーアクセストークンが必要になります。トークンを取得するには、プラットフォームプロジェクトへのサインイン方法の手順に従います。スコープアクセストークンは以降、{accessToken}として表記します。

テストデバイスを入手する

プロジェクトに登録済みのデバイスが必要です。すでに持っているデバイスを使用するか、次の手順に従います。

テストデバイスが用意できたら、トラッキングIDを控えておきます。この後のステップで{trackingId}として必要になります。

センサールールを作成する

curl -X POST \
  'https://tracking.hereapi.com/sensors/v3' \
  -H 'authorization: Bearer {accessToken}' \
  -H 'content-type: application/json' \
  -d '{
  "type": "temperature",
  "range":  {
    "begin": 5,
    "end": 40
  },
  "name": "My temperature rule"
}'

このコマンドにより温度ルールが作成され、一意のIDが返されます。ID値を控えておきます。この値は次のステップで{ruleId}として参照します。このルールでは、取り込まれた温度値が摂取5〜40度であれば正常範囲内で、この範囲外の値は異常です。報告された温度値が正常値範囲から異常値範囲に入った場合、またはその逆の場合、イベントがトリガーされます。

ルールをデバイスに関連付ける

デバイスに対してルールがトリガーされる前に、デバイスとルールを関連付ける必要があります。

curl -X PUT \
  'https://tracking.hereapi.com/associations/v3/{trackingId}/sensors/{ruleId}' \
  -H 'authorization: Bearer {accessToken}' \
  -H 'content-type: application/json'

テレメトリーを取り込み、イベントを観察する

テレメトリーを取り込み、温度範囲内および温度範囲外の両方の温度値を送信します。

timestamp=$(date +%s)
curl -X POST \
  https://tracking.hereapi.com/v3/ \
  -H 'authorization: Bearer {accessToken}' \
  -H 'content-type: application/json' \
  -d '{
  "id": "{trackingId}",
  "data": [
    {
      "timestamp": '$timestamp'000,
      "system": {
        "reportedSensorData": {
          "temperatureC": 5
        }
      }
    },
    {
      "timestamp": '$timestamp'001,
      "system": {
        "reportedSensorData": {
          "temperatureC": 4
        }
      }
    }
  ]
}'

このコマンドはタイプIN_RANGEおよびBELOW_RANGEtemperatureイベントをトリガーし、イベントを消費するプログラムにそれらのイベントが表示されます。以下がその表示例です。

Consuming events (press Ctrl+C to exit)...
{
  type: 'events',
  version: '1.0',
  events: [
    {
      trackingId: 'HERE-84fe3e53-5988-4b4a-91fd-47611bb71a7e',
      timestamp: 1726673335000,
      ruleId: '6e627562-196d-4a90-838c-fc39372fa98f',
      eventSource: 'temperature',
      eventType: 'IN_RANGE',
      initialState: true
    },
    {
      trackingId: 'HERE-84fe3e53-5988-4b4a-91fd-47611bb71a7e',
      timestamp: 1726673335001,
      ruleId: '6e627562-196d-4a90-838c-fc39372fa98f',
      eventSource: 'temperature',
      eventType: 'BELOW_RANGE',
      initialState: false
    }
  ],
  correlationId: 'feb768c9-b0c4-4bbf-bd68-63f99e238b37'
}

メッセージ形式の詳細については、「イベント出力ストリーム」を参照してください。