Docker 部署 Kafka
# Create the Docker network that the containers below attach to
docker network create kafka-net

# Pull the Kafka image
docker pull wurstmeister/kafka

# Create and start the ZooKeeper container
docker run -d \
  --name zookeeper \
  -p 2181:2181 \
  --network kafka-net \
  wurstmeister/zookeeper

# Create and start the first Kafka broker (broker id 1, port 9092)
docker run -d \
  --name kafka1 \
  -p 9092:9092 \
  --network kafka-net \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  wurstmeister/kafka

# Create and start the second Kafka broker (broker id 2, port 9093)
docker run -d \
  --name kafka2 \
  -p 9093:9093 \
  --network kafka-net \
  -e KAFKA_BROKER_ID=2 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9093 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 \
  wurstmeister/kafka

# Create and start the third Kafka broker (broker id 3, port 9094)
docker run -d \
  --name kafka3 \
  -p 9094:9094 \
  --network kafka-net \
  -e KAFKA_BROKER_ID=3 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9094 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 \
  wurstmeister/kafka

# Verify that the containers are running
docker ps

# NOTE(review): the /opt/kafka/bin path suggests the commands below are run
# inside one of the Kafka containers (e.g. after `docker exec -it kafka1 bash`)
# -- confirm.

# Create the "test" topic
/opt/kafka/bin/kafka-topics.sh \
  --zookeeper zookeeper:2181 \
  --create \
  --topic test \
  --partitions 3 \
  --replication-factor 1

# List the existing topics
/opt/kafka/bin/kafka-topics.sh \
  --zookeeper zookeeper:2181 \
  --list

# Create the leaderboard topics
/opt/kafka/bin/kafka-topics.sh \
  --zookeeper zookeeper:2181 \
  --create \
  --topic leaderboard_benchmark \
  --partitions 3 \
  --replication-factor 1
/opt/kafka/bin/kafka-topics.sh \
  --zookeeper zookeeper:2181 \
  --create \
  --topic leaderboard_submit \
  --partitions 3 \
  --replication-factor 1
Docker 部署 MySQL
# my.cnf -- MySQL server settings; the docker run command below mounts this
# file into the container as /etc/mysql/my.cnf.
[mysqld]
# Numeric id of this server
server-id=1
# Turn on the binary log, using "mysql-bin" as the log file base name
log-bin=mysql-bin
# Log row-level changes (presumably what Maxwell, deployed below, reads -- confirm)
binlog-format=ROW
# Pull the MySQL image
docker pull mysql:latest

# Start MySQL on kafka-net; host port 3307 is mapped to container port 3306.
# The Windows host paths are single-quoted so the shell keeps the backslashes;
# unquoted, a POSIX shell strips them and the wrong path gets mounted.
docker run -d --name mysql \
  --network kafka-net \
  -p 3307:3306 \
  -v 'G:\mysql\data:/var/lib/mysql' \
  -v 'G:\mysql\conf\my.cnf:/etc/mysql/my.cnf' \
  -e MYSQL_ROOT_PASSWORD=root \
  mysql:latest

# Access MySQL, option 1: connect from the host through the published port
mysql -h 127.0.0.1 -P 3307 -u root -p

# Access MySQL, option 2: open a shell inside the container.
# "mysql" is the container name given to docker run above; a container id
# works as well.
docker exec -it mysql bash
Docker 部署 Maxwell
/**
 * Maxwell JavaScript filter hook (presumably the maxwell.filter.js file that
 * is mounted into the Maxwell container -- confirm).
 *
 * Rows from the databases "leaderboard", "leaderboard_prod" and
 * "leaderboard_k8s" get the Kafka topic "<database>_<table>"; rows from any
 * other database are suppressed, i.e. not put into Kafka. Of the routed rows,
 * only "insert" rows of the "submit" and "job" tables are kept; every other
 * table or row type is suppressed.
 *
 * @param row the change row handed over by Maxwell; this function reads
 *            row.database, row.table and row.type, sets row.kafka_topic and
 *            may call row.suppress()
 */
function process_row(row) {
  // [database name, topic prefix] pairs, checked in order
  var routes = [
    ["leaderboard", "leaderboard_"],
    ["leaderboard_prod", "leaderboard_prod_"],
    ["leaderboard_k8s", "leaderboard_k8s_"]
  ];

  var topicPrefix = null;
  for (var i = 0; i < routes.length; i++) {
    if (row.database == routes[i][0]) {
      topicPrefix = routes[i][1];
      break;
    }
  }

  // Unknown database: suppress the row so it is not put into Kafka
  if (topicPrefix === null) {
    row.suppress()
    return;
  }
  row.kafka_topic = topicPrefix + row.table;

  // For submit and job, only insert statements are put into Kafka;
  // rows of every other table are suppressed.
  var tableName = row.table;
  var insertOnlyTable = tableName === "submit" || tableName === "job";
  if (!insertOnlyTable || row.type != "insert") {
    row.suppress()
  }
}
# The two commands below are alternatives: both use the container name
# "maxwell", so run only one of them, or remove the existing container first
# (docker rm -f maxwell).
# NOTE(review): confirm that the zendesk/maxwell image picks up MAXWELL_*
# environment variables and /etc/maxwell.conf without extra arguments.

# Option A: configure Maxwell through environment variables.
# The container joins kafka-net so the hostname "mysql" resolves, and it uses
# port 3306, the port MySQL listens on inside the network (3307 is only the
# port published on the host).
docker run -d --name maxwell \
  --network kafka-net \
  -e MAXWELL_USER=root \
  -e MAXWELL_PASSWORD=root \
  -e MAXWELL_HOST=mysql \
  -e MAXWELL_PORT=3306 \
  -p 7000:7000 \
  zendesk/maxwell

# Option B: configure Maxwell through a mounted filter script and config file.
# The Windows host paths are single-quoted so the shell keeps the backslashes.
docker run -d --name maxwell \
  --network kafka-net \
  -v 'G:\maxwell\maxwell.filter.js:/etc/maxwell.filter.js' \
  -v 'G:\maxwell\config.properties:/etc/maxwell.conf' \
  -p 7000:7000 \
  zendesk/maxwell