ThingsboardのKafka pluginを試してみた
普段IoTなクラウドプラットフォームを運営しているエンジニアをやっております。
たまには競合プロダクトも触らないとね、ということで今回はThingsboardを触ってみました。
ついでにデフォルトで組み込まれているKafka pluginも試してみました。
今回のサンプルのリポジトリはこちらです。
まずThingsboardについてですが、OSSでIoTのためのデバイスのデータ管理や状態の可視化を行うことができるプロダクト、という感じです。
Java製でAkkaを使用しています。WebのUIとREST APIが用意されていて、IoTなら当然のMQTT通信もできます。
Iot Gatewayの管理機能もありますし、Ruleを定義してアラートを発生させるなど一通り揃っていて良いプロダクトだと思います。
Thingsboard自体はチュートリアルもちゃんと用意されていてあまり細かい話をしても意味が無いので、今回僕が用意した docker-compose.yml
を説明します。
version: '2' services: thingsboard: image: "thingsboard/application:1.1.0" ports: - "8080:8080" - "1883:1883" - "5683:5683" env_file: - thingsboard.env entrypoint: ./run_thingsboard.sh thingsboard-db-schema: image: "thingsboard/thingsboard-db-schema:1.1.0" env_file: - thingsboard-db-schema.env entrypoint: ./install_schema.sh db: image: "cassandra:3.9" volumes: - "${CASSANDRA_DATA_DIR}:/var/lib/cassandra" zk: image: "zookeeper:3.4.9" restart: always kafka_zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: build: kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1 KAFKA_CREATE_TOPICS: "test-topic:1:1" KAFKA_ZOOKEEPER_CONNECT: kafka_zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock
長々と書いてありますが、ThingsboardのDocker版インストールガイドの docker-compose.yml
と、Dockerでkafkaを使いたい人のためのkafka-dockerのリポジトリの docker-compose.yml
を合体させただけです。
余談ですがdocker composeのymlは簡潔で、ひと目で何がどういうコンテナか判別できるのでとてもありがたいですね。
ThingsboardにはCassandraとZookeeperが必要で、それに加えてKafkaとKafka用のZookeeperのコンテナを起動するようになっています。 .env
ファイル内に CASSANDRA_DATA_DIR
を定義することをお忘れなく。
Thingsboardが起動したら、Webからデフォルトのアカウントでログインします。いくつかのデバイスがすでにサンプルとして登録されているので、今回はそれを使っちゃいます。自由に新規デバイスを登録できるので増やしても良いでしょう。
さっそくKafka pluginを使ってみます。どのように使うのかというと、
- デバイスのtelemetryを更新する(今回はREST APIで)
- Thingsboardに登録したRuleが条件の一致により実行される
- Kafka pluginに設定された内容が実行される
- Kafka consumerで流れてきたデータストリームをconsumeする
ざっくりこんな形でいきます。 この小さいサンプルがうまくいけば、将来的に 全デバイスの更新データをストリームにして、consumerでよしなにそのデータを処理する ことが可能になります。ストリームデータを別のストレージに突っ込んで解析するもよし、集約して可視化するもよし、準リアルタイムで何でもできるわけです。
Kafka pluginは以下のように登録します。Bootstrap Serversの設定は、 172.17.0.1:9092
としました。Docker for Macを使用しているのでホストマシンのMac経由でKafkaにアクセスさせてます。たぶん kafka:9092
でもOKだと思います。
登録したら忘れずにActivateボタンを押しましょう。
次にKafka pluginを利用するRuleを以下のように登録します。Ruleも同じくActivateボタンを忘れずに。
さて、いよいよconsumeを行います。
Kafka consumerはdocker composeで起動したコンテナとは別のコンテナを一時的に起動して、そこで動かします。
$ ./start-kafka-shell.sh 172.17.0.1 172.17.0.1:2181 bash-4.3# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=test-topic --zookeeper=$ZK
Kafka用のZookeeperにセッション確立のログが表示されたらOKです。ターミナルはそのままで。
最後にTest Device A1にtelemetryをPOSTします。
$ THINGSBOARD_HOST=localhost THINGSBOARD_PORT=8080 ACCESS_TOKEN=A1_TEST_TOKEN curl -s -X POST http://$THINGSBOARD_HOST:$THINGSBOARD_PORT/api/v1/$ACCESS_TOKEN/telemetry -H "Content-Type:application/json" -d '{"temp":73.4}'
するとconsumerの方のターミナルに以下が出力されます。
73.4
うっ…地味wwですwwしかし大きな一歩です。
Kafka Pluginのactionがtempの値を出力するようになっており、consumerではその値をそのまま標準出力に吐き出す仕組みになっているので地味な感じになりました。
本来はconsumerを自前で実装して好きな処理を行う形になります。
今回はThingsboardに触れて、Kafka pluginでデバイスのtelemetryをproduceしKafka consumerでconsumeするサンプルを紹介しました。今後も期待できそうなOSSプロダクトですね。
うちのプラットフォームも頑張らなくては。