Daprが提供する主要な機能について理解してみる - Publish and subscribeについて

Daprが提供する主要な機能について理解してみる - Publish and subscribeについて

January 12, 2020
Kubernetes
kubernetes, dapr, microservices

環境構築などは、Daprの基本の構成を引き続き利用しています。

Publish and subscribe

この機能はpubsub対応のコンポーネント(kafkaやredisなど)につなげて、Daprを通して対話ができるというのものです この説明を聞くと、pubsub対応のコンポーネントのBindings?と思います。 前の記事で試したKafkaのBindingsと何が違うのでしょうか?

docsのconceptを見ると以下のような記述があります。

Dapr guarantees At-Least-Once semantics for message delivery it can assume the message is delivered at least once to any subscriber

subscriberにメッセージが最低1回以上配信されることが保証されます。

When multiple instances of the same application ID subscribe to a topic, Dapr will make sure to deliver the message to only one instance. If two different applications with different IDs subscribe to a topic, at least one instance in each application receives a copy of the same message.

またsubsciberが複数いる場合も、それぞれに最低1回以上配信されることが保証されます。

まずsubscribeするdapr.io/idのアプリグループそれぞれに最低1回以上は配信されます。 さらにdapr.io/idが同じpodが複数いる場合、これらのいずれか1つ以上のpodに配信されます。

bindingsでpubsubをしようとするとどうなる?

逆にbindingsではどのような挙動になるのでしょうか?(dapr.io/idが同じpodが複数いる場合はすべてのpodに配信される?)

1つ前の章で使ったkafka-bindingsからトリガーされるnode-appのreplica数を3つに増やして、確認してみます。

spec:
-  replicas: 1
+  replicas: 3
  selector:
    matchLabels:
      app: bindingsnodeapp

kafka-bindings-multi-node-app.gif

1つにしか配信されていないけど、負荷分散とかされていないように見えますね。

dapr.io/idが異なる場合はどうなるでしょうか? node.ymlをコピーして、idなどを変えてデプロイしてみます

$ diff node.yaml node2.yaml -u
--- node.yaml	2019-12-13 12:38:46.000000000 +0900
+++ node2.yaml	2019-12-13 12:38:36.000000000 +0900
@@ -1,12 +1,12 @@
 kind: Service
 apiVersion: v1
 metadata:
-  name: bindings-nodeapp
+  name: bindings-nodeapp2
   labels:
-    app: bindingsnodeapp
+    app: bindingsnodeapp2
 spec:
   selector:
-    app: bindingsnodeapp
+    app: bindingsnodeapp2
   ports:
   - protocol: TCP
     port: 80
@@ -16,21 +16,21 @@
 apiVersion: apps/v1
 kind: Deployment
 metadata:
-  name: bindings-nodeapp
+  name: bindings-nodeapp2
   labels:
-    app: bindingsnodeapp
+    app: bindingsnodeapp2
 spec:
   replicas: 2
   selector:
     matchLabels:
-      app: bindingsnodeapp
+      app: bindingsnodeapp2
   template:
     metadata:
       labels:
-        app: bindingsnodeapp
+        app: bindingsnodeapp2
       annotations:
         dapr.io/enabled: "true"
-        dapr.io/id: "bindings-nodeapp"
+        dapr.io/id: "bindings-nodeapp2"
         dapr.io/port: "3000"
     spec:
       containers:

kafka-bindings-multi-dapr-app-id.gif

pubsubと違って、それぞれのdapr.io/idに配布はされず、1つのpodにしか配信されていません。

Publish and subscribeでredisと通信する

では、pubsubを定義した場合はどうなるかを試していきましょう まずpubsubコンポーネントを定義します。 2019/12時点ではまだkafkaをサポートしていないため、redisを使います。

先にredisをhelmで入れておきます。

$ helm install dapr-redis --set password=hogehoge stable/redis
...
$ kubectl get po | grep redis
dapr-redis-master-0                      1/1     Running   0          6h48m
dapr-redis-slave-0                       1/1     Running   0          6h48m
dapr-redis-slave-1                       1/1     Running   0          6h47m

bindingsと同様にredis-pubsubのComponentを定義します。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.redis
  metadata:
  - name: "redisHost"
    value: "dapr-redis-master:6379"
  - name: "redisPassword"
    value: "hogehoge"

redisHostはredisのservice名とport, redisPasswordはhelmで作成時に指定したものです。

$ kubectl get svc | grep redis
dapr-redis-headless      ClusterIP      None             <none>                                                                        6379/TCP           6h52m
dapr-redis-master        ClusterIP      10.100.179.73    <none>                                                                        6379/TCP           6h52m
dapr-redis-slave         ClusterIP      10.100.27.189    <none>                                                                        6379/TCP           6h52m

パスワードを平文で直書きになるので、本番環境などでは利用が難しいですね。 そのためDaprはSecretの読み込みもサポートしてます。

$ kubectl create secret generic redis-password --from-literal=password=hogehoge

secretを作成して、secretKeyRefで参照できます。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.redis
  metadata:
  - name: "redisHost"
    value: "dapr-redis-master:6379"
  - name: "redisPassword"
    secretKeyRef:
      name: redis-password
      key:  password

これは。PubSubだけでなく他の全てのDapr Componentで対応しているそうです。

https://github.com/dapr/docs/blob/master/concepts/components/secrets.md

Components can reference secrets for the spec.metadata section.


戻りまして、次にsubscribeするアプリを2つデプロイします。

こちらもマニフェスト自体は特に変わりなくで、アプリ側でいろいろ設定してます。 nodejsの方で確認してみます。

https://github.com/dapr/samples/blob/master/4.pub-sub/node-subscriber/app.js#L15-L20

app.get('/dapr/subscribe', (_req, res) => {
    res.json([
        'A',
        'B'
    ]);
});

GETエンドポイントで上記のような定義をしています。 Daprはここにアクセスし、このPodがどのトピックをサブスクライブするかを把握します。 (この場合はA, Bトピックをサブスクライブします。)

https://github.com/dapr/samples/blob/master/4.pub-sub/node-subscriber/app.js#L22-L30

app.post('/A', (req, res) => {
    console.log("A: ", req.body);
    res.sendStatus(200);
});

app.post('/B', (req, res) => {
    console.log("B: ", req.body);
    res.sendStatus(200);
});

実際にそれぞれのトピックからデータをsubscirbeしたときの処理は上になります。 A,Bというパスがそのまま受け付けるトピック名になります。

参考

そしてまた同様にcallerからcurlを投げてみます。

pubsub-id.gif

pubsubのエンドポイントは以下のようになっています。

localhost:3500/v1.0/publish/{topic名}

このエンドポイントにjson bodyをPOSTしてデータを送る挙動になります。

上のgifでは、A, B, Cの3つのtopicにデータを送っています。 deployしたnodeとpyhtonは, nodeがA,Bトピックを、 pythonがA,Cトピックをサブスクライブするようになっています。 実際に、Aは両方で、Bはnodeだけ、Cはpythonだけがサブスクライブするのが確認できました。

また、redisには以下のようにデータやキーが登録されていました。

127.0.0.1:6379> keys *
1) "C"
2) "B"
3) "A"
4) ""
127.0.0.1:6379> XRANGE A - + COUNT 1
1) 1) "1576240692974-0"
    2) 1) "data"
       2) "{\"id\":\"c1f507f6-d03d-4083-be1f-a2ae5ffa870e\",\"source\":\"caller\",\"type\":\"com.dapr.event.sent\",\"specversion\":\"0.3\",\"datacontenttype\":\"application/json\",\"data\":{\"sample\":\"aaaaaaaaaaaaa\"}}"

最後に、1つのアプリが複数pod存在するときの挙動も確認しておきます。

nodeappのreplica数を増やして、同様にcallerでpublishしてみます。

metadata:
  name: node-subscriber
  labels:
    app: node-subscriber
spec:
-  replicas: 1
+  replicas: 3
  selector:

pubsub-one-app.gif

bindingsの時と違って、ラウンドロビンのような感じで負荷分散ができてます。 (これだけ見ると、どれか1つのpodだけがサブスクライブするように見えますが、daprが保証しているのはAt-Least-Onceのため、2つのpodが同じデータを重複して受け取る可能性はあります。その対応はアプリ側で実施する必要がありそうです)

まとめ

pubsubによる通信とbindingsによる通信の違いを確認しました。 あるコンポーネント(例えばredis)と通信する際は用途によって、pubsubかbindingsかを使い分ける必要があります。