Daprが提供する主要な機能について理解してみる - Resource bindings & triggersについて

Daprが提供する主要な機能について理解してみる - Resource bindings & triggersについて

January 12, 2020
Kubernetes
kubernetes, dapr, microservices

環境構築などは、Daprの基本の構成を引き続き利用しています。

Resource bindings & triggers

DaprではDaprコンテナが注入されたサービス以外のコンポーネント(Redis, Kafka, AWS DynamoDBなど)ともDaprを通して対話が可能です。

この機能がBindingsと呼ばれています。

あるプログラムでKafkaやRedisと対話しようとすると、クライアントライブラリなどを使って、各コンポーネントに合わせたプログラムを書かないといけません。

image.png

DaprのBindingsでは、規格が決まっており、その各コンポーネントのBindingsは規格を満たす実装になっています。そのため共通的なインタフェース(HTTP/gRPC)になっており プログラム側はコンポーネント用のライブラリを用意せずにHTTP/gRPCといった標準的なやりとりで、外部コンポーネントとやりとりできます。

image.png

Bindingsには出力(Daprコンテナから呼びにいく)と入力(外部からの入力を受けて、Daprコンテナをトリガする)があり 入力側Bindingsで使われるのがTriggersになります。

動かしてみる

https://github.com/dapr/samples/tree/master/5.bindings KafkaBindings

公式でKafka Bindingsを使ったサンプルがあるので、それを動かしてみます。 このサンプルではKafka Bindingsを用意し、そのOutput Bindingsを用いてkafkaのトピックにデータを送るpythonアプリと、Input Bindingsを利用してKafkaのトピックにデータが入ったらトリガされるnodeアプリが作成されます。

まずhelmでkafkaをk8s上にデプロイします。helmはv3.0.0を使います。

$ helm version
version.BuildInfo{Version:"v3.0.0", GitCommit:"e29ce2a54e96cd02ccfce88bee4f58bb6e2a28b6", GitTreeState:"clean", GoVersion:"go1.13.4"}

helmでkafkaを入れます

// 事前にnamespace作っときます
$ kubectl create ns kafka
$ helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
$ helm repo update
$ helm install dapr-kafka --namespace kafka incubator/kafka --set replicas=1

$ kubectl get po -n kafka
NAME                     READY   STATUS    RESTARTS   AGE
dapr-kafka-0             1/1     Running   2          4m27s
dapr-kafka-zookeeper-0   1/1     Running   0          4m27s
dapr-kafka-zookeeper-1   1/1     Running   0          3m37s
dapr-kafka-zookeeper-2   1/1     Running   0          2m59s

topicを作っておきます。

$ git clone https://github.com/dapr/samples.git
$ cd samples/5.bindings/
$ kubectl apply -f k8s_kafka_testclient.yaml
$ kubectl -n kafka exec testclient -- kafka-topics --zookeeper dapr-kafka-zookeeper:2181 --topic sample --create --partitions 1 --replication-factor 1
Created topic "sample".

次にBindingsをデプロイします

$ kubectl apply -f deploy/kafka_bindings.yaml
$ kubectl get component.dapr.io
NAME           AGE
sample-topic   23s

kafka_bindings.yamlの中身は以下のようになっています。 カスタムリソースでComponentというリソースを定義してます。 spec.metadataでkafkaの接続先や、consumerやpublisherが使うtopicを定義しています。

重要なのがmetadata.nameでこれが、次のnodeアプリで受け付けるパスになります。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  # このnameがInputBindingのパスになる
  name: sample-topic
spec:
  type: bindings.kafka
  metadata:
  # Kafka broker connection setting
  - name: brokers
    value: dapr-kafka.kafka:9092
  # consumer configuration: topic and consumer group
  - name: topics
    value: sample
  - name: consumerGroup
    value: group1
  # publisher configuration: topic
  - name: publishTopic
    value: sample
  - name: authRequired
    value: "false"

次に, このBindingsを使ってtopicをconsumeするnodeアプリをデプロイします。

$ kubectl apply -f deploy/node.yaml

https://github.com/dapr/samples/blob/master/5.bindings/deploy/node.yaml このマニフェスト自体は、これまで紹介したDaprのマニフェストと同じで、ラベルでdapr.io/idを定義しているくらいです。

重要なのがアプリのリクエストを受け付けるパスで、上記の通りComponentのmetadata.nameがパスになってます。

app.post('/sample-topic', (req, res) => {
    console.log("Hello from Kafka!");
    console.log(req.body);
    res.status(200).send();
});

最後にkafkaにpublishするのは、pythonアプリでもいいんですが手動でいろいろみたいので、service invocationでも使ったcallerを使います。

$ kubectl apply -f caller.yml
$ kubectl get po
NAME                                 READY   STATUS    RESTARTS   AGE
caller-deployment-5887c557cd-srhsk   2/2     Running   0          49s
...
$ kubectl exec -it caller-deployment-5887c557cd-srhsk bash

// 3つほどデータを送ってみる
$ curl -H "Content-Type: application/json" -XPOST http://localhost:3500/v1.0/bindings/sample-topic -d '{"data":{"orderId": 200}}'
$ curl -H "Content-Type: application/json" -XPOST http://localhost:3500/v1.0/bindings/sample-topic -d '{"data":{"orderId": 250}}'
$ curl -H "Content-Type: application/json" -XPOST http://localhost:3500/v1.0/bindings/sample-topic -d '{"data":{"orderId": 150}}'

これで、nodeappのログをみるとちゃんと受信しています

$ kubectl logs bindings-nodeapp-d77548db8-vjn29 node -f
...
Hello from Kafka!
{ orderId: 200 }
Hello from Kafka!
{ orderId: 250 }
Hello from Kafka!
{ orderId: 150 }

まとめ

Resource bindingsの概要を確認して、実際にkafkaのトピックに対する入出力をDaprを利用して確認しました。