Kafka ConnectでMirror Maker2.0を動かす

Kafka ConnectでMirror Maker2.0を動かす

May 25, 2020
Middleware
kafka

Mirror Maker2.0 とは

Mirror Makerとは, Kafka の機能の 1 つで
クラスタ間でのトピックのミラーリングを実現する機能です。

地理的に離れた場所にクラスタを用意して、トピックをミラーリングすることで地理的災害が発生した場合でも可用性を担保することができます。

Mirror Maker2.0 はKafka 2.4.0から導入された機能で従来の Mirror Maker で問題だった部分をいくつか解決したものになります。
主な更新内容は以下のリンクに記載されています。

Mirror Maker2.0 を動かす方法としては、以下があります。

  • Mirror Maker 専用クラスタの立ち上げ
  • Kafka Connect の Worker として動作させる

Kafka Connect で動作させるのについて、あまりドキュメントがなかったので、これの動作方法を以下でまとめてきます。
Mirror Maker 専用クラスタについては、ドキュメント通りにやれば動きます。

Kafka Connect とは

Kafka Connect とは Kafka とほかデータストアとの間でデータを移動するために、スケーラブルかつ信頼性の高い方法を提供するものです。

Kafka Connect を起動後に、task と呼ばれるデータの移動について記述したものを渡し実行することで、実現されます。
task には 2 種類(source, sink)があり、

  • source は他のデータストア->kafka への移動(ex. ローカルのファイルの中身を kafka の topic へコピー)
  • sink は kafka->他のデータストアへの移動(ex. kafka の topic の中身を ElasticSearch や MySQL などにコピー)

のようになってます。

Kafka Connect での Mirror Maker は source, sink のどちらもサポートしているようですが、今回は soruce を使って実現します。

イメージ的には以下のような感じになります。
image

Kafka Connect を使った Mirror Maker の実現

実際に動作確認をしていきます
各種バージョンは以下を利用します

ミドルウェアバージョン
zookeeper3.5.7
kafka2.5.0

Mirror Maker2.0 が動くバージョン以降での最新安定版を選んでます

Docker で Kafka のマルチクラスタ

Kafka の用意は少し面倒なので、Docker で簡易なマルチクラスタを用意します。
以下のような docker-compose.yml でマルチクラスタ Kafka を立ち上げれます。

version: '3'
services:
  # Cluster A
  zookeeper-source:
    image: 'bitnami/zookeeper:3.5.7'
    expose:
      - '2181'
    volumes:
      - 'zookeeper_data_source:/bitnami'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka-source:
    image: 'bitnami/kafka:2.5.0'
    expose:
      - '9092'
    volumes:
      - 'kafka_data_source:/bitnami'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-source:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper-source

  # Cluster B
  zookeeper-dist:
    image: 'bitnami/zookeeper:3.5.7'
    expose:
      - '2181'
    volumes:
      - 'zookeeper_data_dist:/bitnami'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka-dist:
    image: 'bitnami/kafka:2.5.0'
    expose:
      - '9092'
    volumes:
      - 'kafka_data_dist:/bitnami'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-dist:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper-dist

volumes:
  zookeeper_data_source:
    driver: local
  kafka_data_source:
    driver: local
  zookeeper_data_dist:
    driver: local
  kafka_data_dist:
    driver: local

立ち上げておきます

$ docker-compose up -d

Kafka Connect のセットアップ

Kafka Connect をセットアップしていきます。
今回はkafka-sourceクラスタで生成したメッセージをkafka-distクラスタに送りたいので、kafka-dist側で Kafka Connect を起動して、kafka-sourceからデータを mirror する設定を行います。

kafka-distに入りましょう

$ docker-compose ps
                  Name                                 Command               State                   Ports
---------------------------------------------------------------------------------------------------------------------------
kafka-mirror-marker-20_kafka-dist_1         /opt/bitnami/scripts/kafka ...   Up      9092/tcp
kafka-mirror-marker-20_kafka-source_1       /opt/bitnami/scripts/kafka ...   Up      9092/tcp
kafka-mirror-marker-20_zookeeper-dist_1     /entrypoint.sh /run.sh           Up      2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
kafka-mirror-marker-20_zookeeper-source_1   /entrypoint.sh /run.sh           Up      2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp

$ docker exec -u root -it kafka-mirror-marker-20_kafka-dist_1 bash

// コンフィグの編集用にvimを入れておく
# apt-get update && apt-get install -y jq vim


// kafkaの起動スクリプトや設定はいかにある
# cd /opt/bitnami/kafka/

// kafka connectは特に設定変える必要がないのでデフォの設定で起動する
# bin/connect-distributed.sh config/connect-distributed.properties

別ターミナルを開いて、またkafka-distコンテナに入って、正常に動いているか確認します

$ docker exec -u root -it kafka-mirror-marker-20_kafka-dist_1 bash


// kafka connectが動いてることを確認する
 curl -s localhost:8083 | jq .
{
  "version": "2.5.0",
  "commit": "66563e712b0b9f84",
  "kafka_cluster_id": "J-fS0SZ6QfyFQ-le4NuJEg"
}

Mirror Maker の Connector を動かす

task は REST で登録できます

// taskの登録
# curl -s --noproxy "*" -XPOST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors -d'{
     "name": "mirror-maker-test",
     "config": {
     "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
     "source.cluster.alias": "kafka-source",
     "target.cluster.alias": "kafka-dist",
     "source.cluster.bootstrap.servers": "kafka-source:9092",
     "topics": ".*",
     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
     "schemas.enable": "false",
     "admin.bootstrap.servers": "kafka-dist:9092"
     }
}
' | jq .

公式ドキュメントとの変更点は以下です

{
     "name": "mirror-maker-test",
     "config": {
       "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
       "source.cluster.alias": "kafka-source",
       "target.cluster.alias": "kafka-dist",
       "source.cluster.bootstrap.servers": "kafka-source:9092",
       "topics": ".*",
+       "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
+       "schemas.enable": "false",
+       "admin.bootstrap.servers": "kafka-dist:9092"
     }
}

各種設定

config 名概要
connector.classここで、connector の種類を指定します。今回は mirrormaker を使うので、MirrorSourceConnectorを指定します
source.cluster.aliassource は mirror 元になるクラスタとなります。ここで指定した値がミラーされたトピック名になります..XXX.topic(ここのXXXがここの値になります)
target.cluster.aliastarget は mirror 先(つまりこのクラスタ自身)になります
source.cluster.bootstrap.serverssource なので、mirror 元のクラスタを指定します.
topicsmirror 対象となるトピック名を正規表現で指定します。.*だと全ての topic が mirror されます
value.converterここで value を mirror するときに値を変換できます。Mirror するときは Base64 でエンコードされてるので、ByteArrayConverterでデコードしておきます
schemas.enableMirror すると右のような schema に入ってます{"schema":{"type":"bytes","optional":false},"payload":"VfasagadfaSSdasgda=="}ここでは、value だけ取りたいので, falseに設定しておきます
admin.bootstrap.serversこれを設定しないとエラーが出ます、自身のクラスタの kafka サーバを指定します

kafka-source で topic を生成して、mirror されるかチェックする

次は kafka-source に入って, topic を設定します

$ docker exec -u root -it kafka-mirror-marker-20_kafka-source_1 bash

# cd /opt/bitnami/kafka/

// topicを生成する
# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mirror-maker-test

// topicが生成されたことを確認する
# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
connect-configs
connect-offsets
connect-status
mirror-maker-test

// topicにメッセージを入れておきます
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mirror-maker-test
>this is from source cluster
>foo
>test
>moge
...

一度 kafka connect プロセスを再起動しておきます
その後kafka-distコンテナに入って, 同様にトピックを生成します

$ docker exec -u root -it kafka-mirror-marker-20_kafka-dist_1 bash

# cd /opt/bitnami/kafka/

// topicを生成する
# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mirror-maker-test

// 今作成したトピックと、sourceからmirrorされたトピックが生成されていることを確認する
# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
kafka-source.mirror-maker-test
mirror-maker-test

kafka-source.mirror-maker-testというトピックが自動的に生成されています。
これが、source クラスタから mirror されたトピックです。

これを consume してみます

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-source.mirror-maker-test --from-beginning
test
this is from source cluster
foo
test
moge

さっき生成したメッセージが格納されてます。

mirror された topic と, 自分自身の topic を一緒に consume する

上で紹介したやり方だと 1 つの topic しか consume しかできません。

運用方法によっては
kafka-sourcekafka-distで同じトピック名を使って、それぞれのクラスタでデータを集めて、両方のクラスタに回収したデータを consume したいということもあるでしょう

その場合は--whitelistオプションで正規表現を使えば、自分自身の topic のデータと mirror された topic の両方を consume できます

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*mirror-maker-test' --from-beginning

上でも紹介したように、mirror-maker-testという topic を mirror すると<mirror元clusterのalias>.<mirror対象のtopic名>という名前で topic が生成されます。
ので.*<mirror対象のtopic名>という正規表現で回収すればお k です。

これで回収すると以下のように, kafka-sourcekafka-distどちらで生成したメッセージも回収できます

mirrormaker-blog用

まとめ

Kafka Connect で Mirror Maker2.0 を動作させました