下载安装包 http://kafka.apache.org/downloads.html
我下载的版本是 kafka_2.12-2.0.0.tgz
解压安装包 1 2 3 $ tar -zxvf /export/software/ kafka_2.12-2.0.0.tgz -C /export/servers/ $ cd /export/servers/$ ln -s kafka_2.12-2.0.0 kafka
修改配置文件 1 2 $ cp /export/servers/kafka/config/server.properties /export/servers/kafka/config/server.properties.bak$ vi /export/servers/kafka/config/server.properties
参考的配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 // broker的全局唯一标识 broker.id=0 // 用来监听链接的端口,producer或consumer将在此端口建立链接 port=9092 // 如果不配 使用终端broker list 发送数据会报错 listeners=PLAINTEXT://192.168.1.10:9092 // 处理网络请求的线程数量 num.network.threads=3 // 处理磁盘io的线程数量 num.io.threads=8 // 发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 // 接收套接字的缓冲区域大小 socket.receive.buffer.bytes=102400 // 请求套接字的缓冲区大小 socket.request.max.bytes=104857600 // 日志存放目录 log.dirs=/export/servers/logs/kafka // topic在当前broker上的分片数 num.partitions=2 // 用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 // segment文件保留的最长时间,超时将会被删除 offsets.topic.replication.factor=1 // 滚动生成新的segment文件的最大时间 log.retention.hours=168 log.roll.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 // zokeeper的链接端口 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000# false 时执行topic delete 时进行逻辑删除 delete.topic.enable=true
分发安装包 1 2 3 4 5 6 7 8 $ scp -r /export/servers/ kafka_2.12-2.0.0 kafka02:/export/servers // 然后分别在各机器上创建软连$ cd /export/servers/$ ln -s kafka_2.12-2.0.0 kafka
再次修改配置文件依次修改各服务器上配置文件的的broker.id,分别是0,1,2不得重复。
运行下面指令启动kafka集群
1 $ bin/kafka-server-start.sh config/server.properties
server-start.sh config/server.properties
1 2 3 4 5 6 7 # 查看当前服务器中的所有topic ```shell$ bin/kafka-topics.sh --list --zookeeper zk01:2181
创建topic
1 $ ./kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 1 --partitions 3 --topic first
删除topic
1 $ sh bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
通过shell命令发送消息
1 $ kafka-console-producer.sh --broker-list 192.168.1.12:9092 --topic first
如果报错参考文章最下面的报错信息
通过shell消费消息
1 $ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --from-beginning --topic first
查看消费位置
1 $ sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
查看某个Topic的详情
1 $ sh kafka-topics.sh --topic test --describe --zookeeper zk01:2181
困难记录 运行bin/kafka-console-producer.sh –broker-list cor3:9092 –topic first发送消息时
错误:ERROR Error when sending message to topic first with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for first-0: 1625 ms has passed since batch creation plus linger time
解决办法:
修改配置文件中
1 listeners=PLAINTEXT://192.168.1.11:9092
运行
1 bin/kafka-console-producer.sh --broker-list 192.168.1.11:9092 --topic first
参考blog https://blog.csdn.net/lvbinibnsb/article/details/81542893