社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  docker

用 Docker 快速搭建 Kafka 集群

马哥Linux运维 • 4 年前 • 490 次点击  

版本

JDK 14ZookeeperKafka

安装 Zookeeper 和 Kafka

Kafka 依赖 Zookeeper,所以我们需要在安装 Kafka 之前先拥有 Zookeeper。准备如下的 docker-compose.yaml 文件,将文件中的主机地址 192.168.1.100 替换成你自己的环境中的主机地址即可。

version: "3"services:  zookeeper:    image: zookeeper    build:      context: ./    container_name: zookeeper    ports:      - 2181:2181    volumes:      - ./data/zookeeper/data:/data      - ./data/zookeeper/datalog:/datalog      - ./data/zookeeper/logs:/logs    restart: always  kafka_node_0:    depends_on:      - zookeeper    build:      context: ./


    
    container_name: kafka-node-0    image: wurstmeister/kafka    environment:      KAFKA_BROKER_ID: 0      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9092      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092      KAFKA_NUM_PARTITIONS: 3      KAFKA_DEFAULT_REPLICATION_FACTOR: 2    ports:      - 9092:9092    volumes:      - ./data/kafka/node_0:/kafka    restart: unless-stopped  kafka_node_1:    depends_on:      - kafka_node_0    build:      context: ./    container_name: kafka-node-1    image: wurstmeister/kafka    environment:      KAFKA_BROKER_ID: 1      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9093      KAFKA_LISTENERS


    
: PLAINTEXT://0.0.0.0:9093      KAFKA_NUM_PARTITIONS: 3      KAFKA_DEFAULT_REPLICATION_FACTOR: 2    ports:      - 9093:9093    volumes:      - ./data/kafka/node_1:/kafka    restart: unless-stopped  kafka_node_2:    depends_on:      - kafka_node_1    build:      context: ./    container_name: kafka-node-2    image: wurstmeister/kafka    environment:      KAFKA_BROKER_ID: 2      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9094      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094      KAFKA_NUM_PARTITIONS: 3      KAFKA_DEFAULT_REPLICATION_FACTOR: 2    ports:      - 9094:9094    volumes:      - ./data/kafka/node_2:/kafka


    
    restart: unless-stopped

输入 docker-compose up -d 运行脚本文件进行集群构建。等待一会儿,得到如下结果即为成功。

SpringBoot 集成 Kafka 集群

创建一个全新的 SpringBoot 工程,在 build.gradle 文件中添加下列依赖。

dependencies {    ...    ...    implementation 'org.springframework.kafka:spring-kafka:2.5.2.RELEASE'    implementation 'com.alibaba:fastjson:1.2.71'}

1.在 application.properties 进行 Kafka 相关参数配置。

spring.kafka.bootstrap-servers=192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094spring.kafka.producer.retries=0spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value


    
-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.consumer.auto-offset-reset=latestspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100

2.创建消息体类。

public class Message {    private Long id;    private String message;    private Date sendAt;}

3.创建消息发送者

public class Sender {    @Autowired    private KafkaTemplate<String, String> kafkaTemplate;    public void send() {        Message message = new Message();        message.setId


    
(System.currentTimeMillis());        message.setMessage(UUID.randomUUID().toString());        message.setSendAt(new Date());        log.info("message = {}", JSON.toJSONString(message));        kafkaTemplate.send("test", JSON.toJSONString(message));    }}

4.创建消息接收者

public class Receiver {    @KafkaListener(topics = {"test"}, groupId = "test")    public void listen(ConsumerRecord, ?> record) {        Optional> message = Optional.ofNullable(record.value());        if (message.isPresent()) {            log.info("receiver record = " + record);            log.info("receiver message = " + message.get());        }    }}


    

5.测试消息队列

public class QueueController {    @Autowired    private Sender sender;    @PostMapping("/test")    public void testQueue() {        sender.send();        sender.send();        sender.send();    }}

得到如下日志即为集成成功。

到这里就我们就成功搭建了一个 Kafka 伪集群,并成功与 SpringBoot 进行整合。

如果你想了解Docker容器技术及应用,欢迎参加8月7日晚的公开课。本次公开课主要围绕Docker容器技术入门和应用实战,系统介绍Docker容器技术、在企业中应用、以及网络管理、储存卷管理和部署Nginx、PHP、MySQL、Redis实战等,干货满满。

作者:曾是然
链接:https://segmentfault.com/a/1190000022988499
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/72227
 
490 次点击