Daprが提供する主要な機能について理解してみる - Resource bindings & triggersについて
January 12, 2020
環境構築などは、Daprの基本の構成を引き続き利用しています。
Resource bindings & triggers
DaprではDaprコンテナが注入されたサービス以外のコンポーネント(Redis, Kafka, AWS DynamoDBなど)ともDaprを通して対話が可能です。
この機能がBindingsと呼ばれています。
あるプログラムでKafkaやRedisと対話しようとすると、クライアントライブラリなどを使って、各コンポーネントに合わせたプログラムを書かないといけません。
DaprのBindingsでは、規格が決まっており、その各コンポーネントのBindingsは規格を満たす実装になっています。そのため共通的なインタフェース(HTTP/gRPC)になっており プログラム側はコンポーネント用のライブラリを用意せずにHTTP/gRPCといった標準的なやりとりで、外部コンポーネントとやりとりできます。
Bindingsには出力(Daprコンテナから呼びにいく)と入力(外部からの入力を受けて、Daprコンテナをトリガする)があり 入力側Bindingsで使われるのがTriggersになります。
動かしてみる
https://github.com/dapr/samples/tree/master/5.bindings
公式でKafka Bindingsを使ったサンプルがあるので、それを動かしてみます。 このサンプルではKafka Bindingsを用意し、そのOutput Bindingsを用いてkafkaのトピックにデータを送るpythonアプリと、Input Bindingsを利用してKafkaのトピックにデータが入ったらトリガされるnodeアプリが作成されます。
まずhelmでkafkaをk8s上にデプロイします。helmはv3.0.0を使います。
$ helm version
version.BuildInfo{Version:"v3.0.0", GitCommit:"e29ce2a54e96cd02ccfce88bee4f58bb6e2a28b6", GitTreeState:"clean", GoVersion:"go1.13.4"}
helmでkafkaを入れます
// 事前にnamespace作っときます
$ kubectl create ns kafka
$ helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
$ helm repo update
$ helm install dapr-kafka --namespace kafka incubator/kafka --set replicas=1
$ kubectl get po -n kafka
NAME READY STATUS RESTARTS AGE
dapr-kafka-0 1/1 Running 2 4m27s
dapr-kafka-zookeeper-0 1/1 Running 0 4m27s
dapr-kafka-zookeeper-1 1/1 Running 0 3m37s
dapr-kafka-zookeeper-2 1/1 Running 0 2m59s
topicを作っておきます。
$ git clone https://github.com/dapr/samples.git
$ cd samples/5.bindings/
$ kubectl apply -f k8s_kafka_testclient.yaml
$ kubectl -n kafka exec testclient -- kafka-topics --zookeeper dapr-kafka-zookeeper:2181 --topic sample --create --partitions 1 --replication-factor 1
Created topic "sample".
次にBindingsをデプロイします
$ kubectl apply -f deploy/kafka_bindings.yaml
$ kubectl get component.dapr.io
NAME AGE
sample-topic 23s
kafka_bindings.yamlの中身は以下のようになっています。
カスタムリソースでComponentというリソースを定義してます。
spec.metadata
でkafkaの接続先や、consumerやpublisherが使うtopicを定義しています。
重要なのがmetadata.name
でこれが、次のnodeアプリで受け付けるパスになります。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
# このnameがInputBindingのパスになる
name: sample-topic
spec:
type: bindings.kafka
metadata:
# Kafka broker connection setting
- name: brokers
value: dapr-kafka.kafka:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: "false"
次に, このBindingsを使ってtopicをconsumeするnodeアプリをデプロイします。
$ kubectl apply -f deploy/node.yaml
https://github.com/dapr/samples/blob/master/5.bindings/deploy/node.yaml
このマニフェスト自体は、これまで紹介したDaprのマニフェストと同じで、ラベルでdapr.io/id
を定義しているくらいです。
重要なのがアプリのリクエストを受け付けるパスで、上記の通りComponentのmetadata.nameがパスになってます。
app.post('/sample-topic', (req, res) => {
console.log("Hello from Kafka!");
console.log(req.body);
res.status(200).send();
});
最後にkafkaにpublishするのは、pythonアプリでもいいんですが手動でいろいろみたいので、service invocationでも使ったcallerを使います。
$ kubectl apply -f caller.yml
$ kubectl get po
NAME READY STATUS RESTARTS AGE
caller-deployment-5887c557cd-srhsk 2/2 Running 0 49s
...
$ kubectl exec -it caller-deployment-5887c557cd-srhsk bash
// 3つほどデータを送ってみる
$ curl -H "Content-Type: application/json" -XPOST http://localhost:3500/v1.0/bindings/sample-topic -d '{"data":{"orderId": 200}}'
$ curl -H "Content-Type: application/json" -XPOST http://localhost:3500/v1.0/bindings/sample-topic -d '{"data":{"orderId": 250}}'
$ curl -H "Content-Type: application/json" -XPOST http://localhost:3500/v1.0/bindings/sample-topic -d '{"data":{"orderId": 150}}'
これで、nodeappのログをみるとちゃんと受信しています
$ kubectl logs bindings-nodeapp-d77548db8-vjn29 node -f
...
Hello from Kafka!
{ orderId: 200 }
Hello from Kafka!
{ orderId: 250 }
Hello from Kafka!
{ orderId: 150 }
まとめ
Resource bindingsの概要を確認して、実際にkafkaのトピックに対する入出力をDaprを利用して確認しました。