Kafka ConnectでMirror Maker2.0を動かす
May 25, 2020
Mirror Maker2.0 とは
Mirror Makerとは, Kafka の機能の 1 つで
クラスタ間でのトピックのミラーリングを実現する機能です。
地理的に離れた場所にクラスタを用意して、トピックをミラーリングすることで地理的災害が発生した場合でも可用性を担保することができます。
Mirror Maker2.0 はKafka 2.4.0から導入された機能で従来の Mirror Maker で問題だった部分をいくつか解決したものになります。
主な更新内容は以下のリンクに記載されています。
Mirror Maker2.0 を動かす方法としては、以下があります。
- Mirror Maker 専用クラスタの立ち上げ
- Kafka Connect の Worker として動作させる
Kafka Connect で動作させるのについて、あまりドキュメントがなかったので、これの動作方法を以下でまとめてきます。
Mirror Maker 専用クラスタについては、ドキュメント通りにやれば動きます。
Kafka Connect とは
Kafka Connect とは Kafka とほかデータストアとの間でデータを移動するために、スケーラブルかつ信頼性の高い方法を提供するものです。
Kafka Connect を起動後に、task と呼ばれるデータの移動について記述したものを渡し実行することで、実現されます。
task には 2 種類(source, sink)があり、
- source は他のデータストア->kafka への移動(ex. ローカルのファイルの中身を kafka の topic へコピー)
- sink は kafka->他のデータストアへの移動(ex. kafka の topic の中身を ElasticSearch や MySQL などにコピー)
のようになってます。
Kafka Connect での Mirror Maker は source, sink のどちらもサポートしているようですが、今回は soruce を使って実現します。
イメージ的には以下のような感じになります。
Kafka Connect を使った Mirror Maker の実現
実際に動作確認をしていきます
各種バージョンは以下を利用します
ミドルウェア | バージョン |
---|---|
zookeeper | 3.5.7 |
kafka | 2.5.0 |
Mirror Maker2.0 が動くバージョン以降での最新安定版を選んでます
Docker で Kafka のマルチクラスタ
Kafka の用意は少し面倒なので、Docker で簡易なマルチクラスタを用意します。
以下のような docker-compose.yml でマルチクラスタ Kafka を立ち上げれます。
version: '3'
services:
# Cluster A
zookeeper-source:
image: 'bitnami/zookeeper:3.5.7'
expose:
- '2181'
volumes:
- 'zookeeper_data_source:/bitnami'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka-source:
image: 'bitnami/kafka:2.5.0'
expose:
- '9092'
volumes:
- 'kafka_data_source:/bitnami'
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-source:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper-source
# Cluster B
zookeeper-dist:
image: 'bitnami/zookeeper:3.5.7'
expose:
- '2181'
volumes:
- 'zookeeper_data_dist:/bitnami'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka-dist:
image: 'bitnami/kafka:2.5.0'
expose:
- '9092'
volumes:
- 'kafka_data_dist:/bitnami'
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-dist:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper-dist
volumes:
zookeeper_data_source:
driver: local
kafka_data_source:
driver: local
zookeeper_data_dist:
driver: local
kafka_data_dist:
driver: local
立ち上げておきます
$ docker-compose up -d
Kafka Connect のセットアップ
Kafka Connect をセットアップしていきます。
今回はkafka-source
クラスタで生成したメッセージをkafka-dist
クラスタに送りたいので、kafka-dist
側で Kafka Connect を起動して、kafka-source
からデータを mirror する設定を行います。
kafka-dist
に入りましょう
$ docker-compose ps
Name Command State Ports
---------------------------------------------------------------------------------------------------------------------------
kafka-mirror-marker-20_kafka-dist_1 /opt/bitnami/scripts/kafka ... Up 9092/tcp
kafka-mirror-marker-20_kafka-source_1 /opt/bitnami/scripts/kafka ... Up 9092/tcp
kafka-mirror-marker-20_zookeeper-dist_1 /entrypoint.sh /run.sh Up 2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
kafka-mirror-marker-20_zookeeper-source_1 /entrypoint.sh /run.sh Up 2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
$ docker exec -u root -it kafka-mirror-marker-20_kafka-dist_1 bash
// コンフィグの編集用にvimを入れておく
# apt-get update && apt-get install -y jq vim
// kafkaの起動スクリプトや設定はいかにある
# cd /opt/bitnami/kafka/
// kafka connectは特に設定変える必要がないのでデフォの設定で起動する
# bin/connect-distributed.sh config/connect-distributed.properties
別ターミナルを開いて、またkafka-dist
コンテナに入って、正常に動いているか確認します
$ docker exec -u root -it kafka-mirror-marker-20_kafka-dist_1 bash
// kafka connectが動いてることを確認する
curl -s localhost:8083 | jq .
{
"version": "2.5.0",
"commit": "66563e712b0b9f84",
"kafka_cluster_id": "J-fS0SZ6QfyFQ-le4NuJEg"
}
Mirror Maker の Connector を動かす
task は REST で登録できます
// taskの登録
# curl -s --noproxy "*" -XPOST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors -d'{
"name": "mirror-maker-test",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "kafka-source",
"target.cluster.alias": "kafka-dist",
"source.cluster.bootstrap.servers": "kafka-source:9092",
"topics": ".*",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"schemas.enable": "false",
"admin.bootstrap.servers": "kafka-dist:9092"
}
}
' | jq .
公式ドキュメントとの変更点は以下です
{
"name": "mirror-maker-test",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "kafka-source",
"target.cluster.alias": "kafka-dist",
"source.cluster.bootstrap.servers": "kafka-source:9092",
"topics": ".*",
+ "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
+ "schemas.enable": "false",
+ "admin.bootstrap.servers": "kafka-dist:9092"
}
}
各種設定
config 名 | 概要 |
---|---|
connector.class | ここで、connector の種類を指定します。今回は mirrormaker を使うので、MirrorSourceConnector を指定します |
source.cluster.alias | source は mirror 元になるクラスタとなります。ここで指定した値がミラーされたトピック名になります..XXX.topic (ここのXXX がここの値になります) |
target.cluster.alias | target は mirror 先(つまりこのクラスタ自身)になります |
source.cluster.bootstrap.servers | source なので、mirror 元のクラスタを指定します. |
topics | mirror 対象となるトピック名を正規表現で指定します。.* だと全ての topic が mirror されます |
value.converter | ここで value を mirror するときに値を変換できます。Mirror するときは Base64 でエンコードされてるので、ByteArrayConverter でデコードしておきます |
schemas.enable | Mirror すると右のような schema に入ってます{"schema":{"type":"bytes","optional":false},"payload":"VfasagadfaSSdasgda=="} ここでは、value だけ取りたいので, false に設定しておきます |
admin.bootstrap.servers | これを設定しないとエラーが出ます、自身のクラスタの kafka サーバを指定します |
kafka-source で topic を生成して、mirror されるかチェックする
次は kafka-source に入って, topic を設定します
$ docker exec -u root -it kafka-mirror-marker-20_kafka-source_1 bash
# cd /opt/bitnami/kafka/
// topicを生成する
# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mirror-maker-test
// topicが生成されたことを確認する
# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
connect-configs
connect-offsets
connect-status
mirror-maker-test
// topicにメッセージを入れておきます
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mirror-maker-test
>this is from source cluster
>foo
>test
>moge
...
一度 kafka connect プロセスを再起動しておきます
その後kafka-dist
コンテナに入って, 同様にトピックを生成します
$ docker exec -u root -it kafka-mirror-marker-20_kafka-dist_1 bash
# cd /opt/bitnami/kafka/
// topicを生成する
# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mirror-maker-test
// 今作成したトピックと、sourceからmirrorされたトピックが生成されていることを確認する
# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
kafka-source.mirror-maker-test
mirror-maker-test
kafka-source.mirror-maker-test
というトピックが自動的に生成されています。
これが、source クラスタから mirror されたトピックです。
これを consume してみます
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-source.mirror-maker-test --from-beginning
test
this is from source cluster
foo
test
moge
さっき生成したメッセージが格納されてます。
mirror された topic と, 自分自身の topic を一緒に consume する
上で紹介したやり方だと 1 つの topic しか consume しかできません。
運用方法によってはkafka-source
とkafka-dist
で同じトピック名を使って、それぞれのクラスタでデータを集めて、両方のクラスタに回収したデータを consume したいということもあるでしょう
その場合は--whitelist
オプションで正規表現を使えば、自分自身の topic のデータと mirror された topic の両方を consume できます
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*mirror-maker-test' --from-beginning
上でも紹介したように、mirror-maker-test
という topic を mirror すると<mirror元clusterのalias>.<mirror対象のtopic名>
という名前で topic が生成されます。
ので.*<mirror対象のtopic名>
という正規表現で回収すればお k です。
これで回収すると以下のように, kafka-source
とkafka-dist
どちらで生成したメッセージも回収できます
まとめ
Kafka Connect で Mirror Maker2.0 を動作させました