Daprが提供する主要な機能について理解してみる - Publish and subscribeについて
January 12, 2020
環境構築などは、Daprの基本の構成を引き続き利用しています。
Publish and subscribe
この機能はpubsub対応のコンポーネント(kafkaやredisなど)につなげて、Daprを通して対話ができるというのものです この説明を聞くと、pubsub対応のコンポーネントのBindings?と思います。 前の記事で試したKafkaのBindingsと何が違うのでしょうか?
docsのconceptを見ると以下のような記述があります。
Dapr guarantees At-Least-Once semantics for message delivery it can assume the message is delivered at least once to any subscriber
subscriberにメッセージが最低1回以上配信されることが保証されます。
When multiple instances of the same application ID subscribe to a topic, Dapr will make sure to deliver the message to only one instance. If two different applications with different IDs subscribe to a topic, at least one instance in each application receives a copy of the same message.
またsubsciberが複数いる場合も、それぞれに最低1回以上配信されることが保証されます。
まずsubscribeするdapr.io/id
のアプリグループそれぞれに最低1回以上は配信されます。
さらにdapr.io/id
が同じpodが複数いる場合、これらのいずれか1つ以上のpodに配信されます。
bindingsでpubsubをしようとするとどうなる?
逆にbindingsではどのような挙動になるのでしょうか?(dapr.io/id
が同じpodが複数いる場合はすべてのpodに配信される?)
1つ前の章で使ったkafka-bindingsからトリガーされるnode-appのreplica数を3つに増やして、確認してみます。
spec:
- replicas: 1
+ replicas: 3
selector:
matchLabels:
app: bindingsnodeapp
1つにしか配信されていないけど、負荷分散とかされていないように見えますね。
dapr.io/idが異なる場合はどうなるでしょうか? node.ymlをコピーして、idなどを変えてデプロイしてみます
$ diff node.yaml node2.yaml -u
--- node.yaml 2019-12-13 12:38:46.000000000 +0900
+++ node2.yaml 2019-12-13 12:38:36.000000000 +0900
@@ -1,12 +1,12 @@
kind: Service
apiVersion: v1
metadata:
- name: bindings-nodeapp
+ name: bindings-nodeapp2
labels:
- app: bindingsnodeapp
+ app: bindingsnodeapp2
spec:
selector:
- app: bindingsnodeapp
+ app: bindingsnodeapp2
ports:
- protocol: TCP
port: 80
@@ -16,21 +16,21 @@
apiVersion: apps/v1
kind: Deployment
metadata:
- name: bindings-nodeapp
+ name: bindings-nodeapp2
labels:
- app: bindingsnodeapp
+ app: bindingsnodeapp2
spec:
replicas: 2
selector:
matchLabels:
- app: bindingsnodeapp
+ app: bindingsnodeapp2
template:
metadata:
labels:
- app: bindingsnodeapp
+ app: bindingsnodeapp2
annotations:
dapr.io/enabled: "true"
- dapr.io/id: "bindings-nodeapp"
+ dapr.io/id: "bindings-nodeapp2"
dapr.io/port: "3000"
spec:
containers:
pubsubと違って、それぞれのdapr.io/idに配布はされず、1つのpodにしか配信されていません。
Publish and subscribeでredisと通信する
では、pubsubを定義した場合はどうなるかを試していきましょう まずpubsubコンポーネントを定義します。 2019/12時点ではまだkafkaをサポートしていないため、redisを使います。
先にredisをhelmで入れておきます。
$ helm install dapr-redis --set password=hogehoge stable/redis
...
$ kubectl get po | grep redis
dapr-redis-master-0 1/1 Running 0 6h48m
dapr-redis-slave-0 1/1 Running 0 6h48m
dapr-redis-slave-1 1/1 Running 0 6h47m
bindingsと同様にredis-pubsubのComponentを定義します。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.redis
metadata:
- name: "redisHost"
value: "dapr-redis-master:6379"
- name: "redisPassword"
value: "hogehoge"
redisHostはredisのservice名とport, redisPasswordはhelmで作成時に指定したものです。
$ kubectl get svc | grep redis
dapr-redis-headless ClusterIP None <none> 6379/TCP 6h52m
dapr-redis-master ClusterIP 10.100.179.73 <none> 6379/TCP 6h52m
dapr-redis-slave ClusterIP 10.100.27.189 <none> 6379/TCP 6h52m
パスワードを平文で直書きになるので、本番環境などでは利用が難しいですね。 そのためDaprはSecretの読み込みもサポートしてます。
$ kubectl create secret generic redis-password --from-literal=password=hogehoge
secretを作成して、secretKeyRefで参照できます。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.redis
metadata:
- name: "redisHost"
value: "dapr-redis-master:6379"
- name: "redisPassword"
secretKeyRef:
name: redis-password
key: password
これは。PubSubだけでなく他の全てのDapr Componentで対応しているそうです。
https://github.com/dapr/docs/blob/master/concepts/components/secrets.md
Components can reference secrets for the spec.metadata section.
戻りまして、次にsubscribeするアプリを2つデプロイします。
- https://github.com/dapr/samples/blob/master/4.pub-sub/deploy/node-subscriber.yaml
- https://github.com/dapr/samples/blob/master/4.pub-sub/deploy/python-subscriber.yaml
こちらもマニフェスト自体は特に変わりなくで、アプリ側でいろいろ設定してます。 nodejsの方で確認してみます。
https://github.com/dapr/samples/blob/master/4.pub-sub/node-subscriber/app.js#L15-L20
app.get('/dapr/subscribe', (_req, res) => {
res.json([
'A',
'B'
]);
});
GETエンドポイントで上記のような定義をしています。
Daprはここにアクセスし、このPodがどのトピックをサブスクライブするかを把握します。
(この場合はA
, B
トピックをサブスクライブします。)
https://github.com/dapr/samples/blob/master/4.pub-sub/node-subscriber/app.js#L22-L30
app.post('/A', (req, res) => {
console.log("A: ", req.body);
res.sendStatus(200);
});
app.post('/B', (req, res) => {
console.log("B: ", req.body);
res.sendStatus(200);
});
実際にそれぞれのトピックからデータをsubscirbeしたときの処理は上になります。
A
,B
というパスがそのまま受け付けるトピック名になります。
そしてまた同様にcallerからcurlを投げてみます。
pubsubのエンドポイントは以下のようになっています。
localhost:3500/v1.0/publish/{topic名}
このエンドポイントにjson bodyをPOSTしてデータを送る挙動になります。
上のgifでは、A, B, Cの3つのtopicにデータを送っています。 deployしたnodeとpyhtonは, nodeがA,Bトピックを、 pythonがA,Cトピックをサブスクライブするようになっています。 実際に、Aは両方で、Bはnodeだけ、Cはpythonだけがサブスクライブするのが確認できました。
また、redisには以下のようにデータやキーが登録されていました。
127.0.0.1:6379> keys *
1) "C"
2) "B"
3) "A"
4) ""
127.0.0.1:6379> XRANGE A - + COUNT 1
1) 1) "1576240692974-0"
2) 1) "data"
2) "{\"id\":\"c1f507f6-d03d-4083-be1f-a2ae5ffa870e\",\"source\":\"caller\",\"type\":\"com.dapr.event.sent\",\"specversion\":\"0.3\",\"datacontenttype\":\"application/json\",\"data\":{\"sample\":\"aaaaaaaaaaaaa\"}}"
最後に、1つのアプリが複数pod存在するときの挙動も確認しておきます。
nodeappのreplica数を増やして、同様にcallerでpublishしてみます。
metadata:
name: node-subscriber
labels:
app: node-subscriber
spec:
- replicas: 1
+ replicas: 3
selector:
bindingsの時と違って、ラウンドロビンのような感じで負荷分散ができてます。 (これだけ見ると、どれか1つのpodだけがサブスクライブするように見えますが、daprが保証しているのはAt-Least-Onceのため、2つのpodが同じデータを重複して受け取る可能性はあります。その対応はアプリ側で実施する必要がありそうです)
まとめ
pubsubによる通信とbindingsによる通信の違いを確認しました。 あるコンポーネント(例えばredis)と通信する際は用途によって、pubsubかbindingsかを使い分ける必要があります。