ThingsboardのKafka pluginを試してみた
普段IoTなクラウドプラットフォームを運営しているエンジニアをやっております。
たまには競合プロダクトも触らないとね、ということで今回はThingsboardを触ってみました。
ついでにデフォルトで組み込まれているKafka pluginも試してみました。
今回のサンプルのリポジトリはこちらです。
まずThingsboardについてですが、OSSでIoTのためのデバイスのデータ管理や状態の可視化を行うことができるプロダクト、という感じです。
Java製でAkkaを使用しています。WebのUIとREST APIが用意されていて、IoTなら当然のMQTT通信もできます。
Iot Gatewayの管理機能もありますし、Ruleを定義してアラートを発生させるなど一通り揃っていて良いプロダクトだと思います。
Thingsboard自体はチュートリアルもちゃんと用意されていてあまり細かい話をしても意味が無いので、今回僕が用意した docker-compose.yml を説明します。
version: '2'
services:
thingsboard:
image: "thingsboard/application:1.1.0"
ports:
- "8080:8080"
- "1883:1883"
- "5683:5683"
env_file:
- thingsboard.env
entrypoint: ./run_thingsboard.sh
thingsboard-db-schema:
image: "thingsboard/thingsboard-db-schema:1.1.0"
env_file:
- thingsboard-db-schema.env
entrypoint: ./install_schema.sh
db:
image: "cassandra:3.9"
volumes:
- "${CASSANDRA_DATA_DIR}:/var/lib/cassandra"
zk:
image: "zookeeper:3.4.9"
restart: always
kafka_zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
build: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
KAFKA_CREATE_TOPICS: "test-topic:1:1"
KAFKA_ZOOKEEPER_CONNECT: kafka_zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
長々と書いてありますが、ThingsboardのDocker版インストールガイドの docker-compose.yml と、Dockerでkafkaを使いたい人のためのkafka-dockerのリポジトリの docker-compose.yml を合体させただけです。
余談ですがdocker composeのymlは簡潔で、ひと目で何がどういうコンテナか判別できるのでとてもありがたいですね。
ThingsboardにはCassandraとZookeeperが必要で、それに加えてKafkaとKafka用のZookeeperのコンテナを起動するようになっています。 .env ファイル内に CASSANDRA_DATA_DIR を定義することをお忘れなく。
Thingsboardが起動したら、Webからデフォルトのアカウントでログインします。いくつかのデバイスがすでにサンプルとして登録されているので、今回はそれを使っちゃいます。自由に新規デバイスを登録できるので増やしても良いでしょう。

さっそくKafka pluginを使ってみます。どのように使うのかというと、
- デバイスのtelemetryを更新する(今回はREST APIで)
- Thingsboardに登録したRuleが条件の一致により実行される
- Kafka pluginに設定された内容が実行される
- Kafka consumerで流れてきたデータストリームをconsumeする
ざっくりこんな形でいきます。 この小さいサンプルがうまくいけば、将来的に 全デバイスの更新データをストリームにして、consumerでよしなにそのデータを処理する ことが可能になります。ストリームデータを別のストレージに突っ込んで解析するもよし、集約して可視化するもよし、準リアルタイムで何でもできるわけです。
Kafka pluginは以下のように登録します。Bootstrap Serversの設定は、 172.17.0.1:9092 としました。Docker for Macを使用しているのでホストマシンのMac経由でKafkaにアクセスさせてます。たぶん kafka:9092 でもOKだと思います。

登録したら忘れずにActivateボタンを押しましょう。
次にKafka pluginを利用するRuleを以下のように登録します。Ruleも同じくActivateボタンを忘れずに。

さて、いよいよconsumeを行います。
Kafka consumerはdocker composeで起動したコンテナとは別のコンテナを一時的に起動して、そこで動かします。
$ ./start-kafka-shell.sh 172.17.0.1 172.17.0.1:2181 bash-4.3# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=test-topic --zookeeper=$ZK
Kafka用のZookeeperにセッション確立のログが表示されたらOKです。ターミナルはそのままで。
最後にTest Device A1にtelemetryをPOSTします。
$ THINGSBOARD_HOST=localhost
THINGSBOARD_PORT=8080
ACCESS_TOKEN=A1_TEST_TOKEN
curl -s -X POST http://$THINGSBOARD_HOST:$THINGSBOARD_PORT/api/v1/$ACCESS_TOKEN/telemetry -H "Content-Type:application/json" -d '{"temp":73.4}'
するとconsumerの方のターミナルに以下が出力されます。
73.4
うっ…地味wwですwwしかし大きな一歩です。
Kafka Pluginのactionがtempの値を出力するようになっており、consumerではその値をそのまま標準出力に吐き出す仕組みになっているので地味な感じになりました。
本来はconsumerを自前で実装して好きな処理を行う形になります。
今回はThingsboardに触れて、Kafka pluginでデバイスのtelemetryをproduceしKafka consumerでconsumeするサンプルを紹介しました。今後も期待できそうなOSSプロダクトですね。
うちのプラットフォームも頑張らなくては。
