Usual Software Engineer

よくあるソフトウェアエンジニアのブログ

ThingsboardのKafka pluginを試してみた

普段IoTなクラウドプラットフォームを運営しているエンジニアをやっております。
たまには競合プロダクトも触らないとね、ということで今回はThingsboardを触ってみました。
ついでにデフォルトで組み込まれているKafka pluginも試してみました。
今回のサンプルのリポジトリはこちらです。

github.com

まずThingsboardについてですが、OSSでIoTのためのデバイスのデータ管理や状態の可視化を行うことができるプロダクト、という感じです。
Java製でAkkaを使用しています。WebのUIとREST APIが用意されていて、IoTなら当然のMQTT通信もできます。
Iot Gatewayの管理機能もありますし、Ruleを定義してアラートを発生させるなど一通り揃っていて良いプロダクトだと思います。

https://thingsboard.io/images/demo/fleet-monitoring-dashboard.png

Thingsboard自体はチュートリアルもちゃんと用意されていてあまり細かい話をしても意味が無いので、今回僕が用意した docker-compose.yml を説明します。

thingsboard-kafka-example/docker-compose.yml at 6dbf90e54641743ab02dc76792714d2e068dba47 · innossh/thingsboard-kafka-example · GitHub

version: '2'

services:
  thingsboard:
    image: "thingsboard/application:1.1.0"
    ports:
      - "8080:8080"
      - "1883:1883"
      - "5683:5683"
    env_file:
      - thingsboard.env
    entrypoint: ./run_thingsboard.sh
  thingsboard-db-schema:
    image: "thingsboard/thingsboard-db-schema:1.1.0"
    env_file:
      - thingsboard-db-schema.env
    entrypoint: ./install_schema.sh
  db:
    image: "cassandra:3.9"
    volumes:
      - "${CASSANDRA_DATA_DIR}:/var/lib/cassandra"
  zk:
    image: "zookeeper:3.4.9"
    restart: always
  kafka_zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
      KAFKA_CREATE_TOPICS: "test-topic:1:1"
      KAFKA_ZOOKEEPER_CONNECT: kafka_zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

長々と書いてありますが、ThingsboardのDocker版インストールガイドの docker-compose.yml と、Dockerでkafkaを使いたい人のためのkafka-dockerのリポジトリdocker-compose.yml を合体させただけです。
余談ですがdocker composeのymlは簡潔で、ひと目で何がどういうコンテナか判別できるのでとてもありがたいですね。
ThingsboardにはCassandraとZookeeperが必要で、それに加えてKafkaとKafka用のZookeeperのコンテナを起動するようになっています。 .env ファイル内に CASSANDRA_DATA_DIR を定義することをお忘れなく。

Thingsboardが起動したら、Webからデフォルトのアカウントでログインします。いくつかのデバイスがすでにサンプルとして登録されているので、今回はそれを使っちゃいます。自由に新規デバイスを登録できるので増やしても良いでしょう。

f:id:innossh:20170228225136p:plain f:id:innossh:20170228225148p:plain

さっそくKafka pluginを使ってみます。どのように使うのかというと、

  1. バイスのtelemetryを更新する(今回はREST APIで)
  2. Thingsboardに登録したRuleが条件の一致により実行される
  3. Kafka pluginに設定された内容が実行される
  4. Kafka consumerで流れてきたデータストリームをconsumeする

ざっくりこんな形でいきます。 この小さいサンプルがうまくいけば、将来的に 全デバイスの更新データをストリームにして、consumerでよしなにそのデータを処理する ことが可能になります。ストリームデータを別のストレージに突っ込んで解析するもよし、集約して可視化するもよし、準リアルタイムで何でもできるわけです。

Kafka pluginは以下のように登録します。Bootstrap Serversの設定は、 172.17.0.1:9092 としました。Docker for Macを使用しているのでホストマシンのMac経由でKafkaにアクセスさせてます。たぶん kafka:9092 でもOKだと思います。

f:id:innossh:20170228225230p:plain f:id:innossh:20170228225245p:plain

登録したら忘れずにActivateボタンを押しましょう。

次にKafka pluginを利用するRuleを以下のように登録します。Ruleも同じくActivateボタンを忘れずに。

f:id:innossh:20170228225326p:plain f:id:innossh:20170228225344p:plain f:id:innossh:20170228225336p:plain

さて、いよいよconsumeを行います。
Kafka consumerはdocker composeで起動したコンテナとは別のコンテナを一時的に起動して、そこで動かします。

$ ./start-kafka-shell.sh 172.17.0.1 172.17.0.1:2181
bash-4.3# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=test-topic --zookeeper=$ZK

Kafka用のZookeeperにセッション確立のログが表示されたらOKです。ターミナルはそのままで。
最後にTest Device A1にtelemetryをPOSTします。

$ THINGSBOARD_HOST=localhost
  THINGSBOARD_PORT=8080
  ACCESS_TOKEN=A1_TEST_TOKEN
  curl -s -X POST http://$THINGSBOARD_HOST:$THINGSBOARD_PORT/api/v1/$ACCESS_TOKEN/telemetry -H "Content-Type:application/json" -d '{"temp":73.4}'

するとconsumerの方のターミナルに以下が出力されます。

73.4

うっ…地味wwですwwしかし大きな一歩です。
Kafka Pluginのactionがtempの値を出力するようになっており、consumerではその値をそのまま標準出力に吐き出す仕組みになっているので地味な感じになりました。
本来はconsumerを自前で実装して好きな処理を行う形になります。

今回はThingsboardに触れて、Kafka pluginでデバイスのtelemetryをproduceしKafka consumerでconsumeするサンプルを紹介しました。今後も期待できそうなOSSプロダクトですね。
うちのプラットフォームも頑張らなくては。