当前位置:职场发展 > Kafka集群部署

Kafka集群部署

  • 发布:2023-10-06 23:56

  Kafka是一个由Apache软件基金会开发、用Scala和Java编写的开源流处理平台。 Kafka是一个高吞吐量的分布式发布订阅消息系统,可以处理网站中消费者的所有动作流数据。由于吞吐量要求,这些数据通常通过处理日志和日志聚合来处理。对于像Hadoop这样的日志数据和离线分析系统来说,这是一个可行的解决方案,但需要实时处理约束。 Kafka的目的是通过Hadoop的并行加载机制来统一线上和线下消息处理,并通过集群提供实时消息。 Kafka是一个高吞吐量的分布式发布订阅消息系统:

  本博客主要讲解:单节点单Broker部署、单节点多Broker部署、集群部署(多节点多Broker)。实际生产环境中常用的第三种方法是集群部署Kafka。 Kafka更多地依赖于zookeeper集群。如果要使用Kafka,必须部署zookeeper集群。 Kafka中的消费偏移信息、kafka集群、主题信息都会存储在ZK中。

在部署集群之前,需要先部署zookeeper集群。直接按照zookeeper3.5.5集群部署进行部署并启动。

1。 Kafka单节点部署

准备:

关闭防火墙:systemctl stopfirewalld.service
禁止防火墙自动启动:systemctldisablefirewalld.service

关闭selinux:setenforce 0
禁用selinux启动:vim /etc/selinux/config

SELINUX=禁用

各节点主机分析:

10.0.0.11node0110.0.0.12节点02
10.0.0.13 节点03

1.Kafka单节点单Broker部署及使用

下载:http://www.sychzs.cn 或

wget http://www.sychzs.cn/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz

安装

焦油
ln -s /usr/local/src/kafka_2.12-2.2.1 /usr/local/kafka

配置kafka

参考官网:http://www.sychzs.cn/quickstart

mkdir -p /data/kafka/logs

添加环境变量

echo -e 'export KAFKA_HOME=/usr/local/kafka\nexport PATH=$KAFKA_HOME/bin:$PATH' >>/etc/profile
来源/etc/profile

进入kafka的config目录,在server.properties文件中添加以下配置

vim /usr/local/kafka/config/server.properties

#broker id全球唯一
www.sychzs.cn=0
#监视器
端口=9092 
#日志目录
log.dirs=/data/kafka/logs
#配置zookeeper连接Zookeeper.connect=node01:2181

启动kafka

www.sychzs.cn $KAFKA_HOME/config/server.properties

打印的日志信息没有错误。您可以看到以下信息

[2019-06-16 17:44:14,757]信息 [KafkaServer id=0]启动(kafka.server.KafkaServer)

但是并不能保证Kafka已经启动成功。输入jps即可查看进程。如果能看到Kafka进程就说明启动成功了

[root@node01 ~]# jps
8577QuorumPeerMain
11000Jps
1899 -- 进程信息不可用
10637卡夫卡
[root@node01 ~]# jps -m
8577QuorumPeerMain /usr/local/zookeeper/bin/../conf/zoo.cfg
1899 -- 进程信息不可用
10637Kafka 配置/server.properties
11022 Jps -m

创建主题

www.sychzs.cn --create --zookeeper node01:2181 --复制因子1 --分区1 --主题测试

参数说明:
  --zookeeper:指定kafka连接zk的连接url。该值与server.properties文件中的配置项{zookeeper.connect}相同
  --replication-factor:指定副本数量
  --partitions:指定分区数量
  - -topic:主题名称

删除主题

www.sychzs.cn --delete --zookeeper node01:2181 --主题测试

查看所有主题信息

www.sychzs.cn --list --zookeeper node01:2181
测试

开始制作人

kafka-console- www.sychzs.cn --broker-list node01:9092 --主题测试

启动消费者

www.sychzs.cn --bootstrap-server node01:9092 --来自-开始--主题测试
1 {IMG_1:Ahr0CHM6LY9PBWCYMDE4LMNUYMXVZ3MUY2JSB2JSB2CVMTQ2OTIWMY8YMDE5MDYVMTQ2OTIWMYMDE5MDYXNJMTQ1MTIYMI0XODC4MZMXL nbuzw ==/}

备注

  --from-beginning:如果表示从头开始消费数据,则新旧数据都会被消费。如果没有该参数,则只会消耗新生成的数据。

2.Kafka单节点多Broker部署及使用

配置Kafka

参考官网:http://www.sychzs.cn/quickstart

复制server.properties 3次

cd /usr/local/kafka/config/
cp server.properties 服务器-1.properties
cp server.properties 服务器-2.properties
cp server.properties 服务器-3.properties

修改server-1.properties

vim /usr/local/kafka/config/server-1.properties

#broker id全球唯一
www.sychzs.cn=1
#监视器
端口=9093 
#日志目录
log.dirs=/data/kafka/logs-1
#配置zookeeper连接
Zookeeper.connect=node01:2181

修改server-2.properties

vim /usr/local/kafka/config/server-2.properties

#broker id全球唯一
www.sychzs.cn=2
#监视器
端口=9094 
#日志目录
log.dirs=/data/kafka/logs-2
#配置zookeeper连接
Zookeeper.connect=node01:2181

修改server-3.properties

vim /usr/local/kafka/config/server-3.properties

#broker id全球唯一
www.sychzs.cn=3
#监视器
端口=9095 #日志目录
log.dirs=/data/kafka/logs-3
#配置zookeeper连接
Zookeeper.connect=node01:2181

创建日志文件夹

mkdir -p /data/kafka/{logs-1,logs-2,logs-3}

启动Kafka(分别启动server1、2、3)

www.sychzs.cn $KAFKA_HOME/config/server-1.properties
kafka-www.sychzs.cn $KAFKA_HOME/config/server-2.properties
kafka-www.sychzs.cn $KAFKA_HOME/config/server-3.properties

查看进度

[root@node01 ~]# jps
8577QuorumPeerMain
18353Jps
17606卡夫卡
1899 -- 进程信息不可用
17243卡夫卡
17964卡夫卡
[root@node01 ~]# jps -m
8577QuorumPeerMain /usr/local/zookeeper/bin/../conf/zoo.cfg
17606Kafka /usr/local/kafka/config/server-2.properties
1899 -- 进程信息不可用
17243Kafka /usr/local/kafka/config/server-1.properties
18363 Jps -m17964Kafka /usr/local/kafka/config/server-3.properties

创建主题(指定份数为3)

www.sychzs.cn --create --zookeeper node01:2181 --复制因子 3 --分区 1 --topic wzxmt

查看所有主题信息

www.sychzs.cn --list --zookeeper node01:2181

查看某个主题的详细信息

www.sychzs.cn --describe --zookeeper node01:2181 --topic wzxmt

开始制作人

kafka-console- www.sychzs.cn --broker-list node01:9093,node01:9094,node01:9095 --topic wzxmt

启动消费者

www.sychzs.cn --bootstrap-server node01:2181 --来自-开始--topic wzxmt

单节点多Broker容错测试

Kafka支持容错。以上我们就完成了Kafka单节点和多个Blocker的部署。接下来我们来测试一下Kafka的容错能力。测试步骤如下

(1)。查看主题的详细信息,观察哪个blocker的角色是leader,哪个blocker的角色是follower

www.sychzs.cn --describe --zookeeper node01:2181 --topic wzxmt

(2)。手动杀死任何状态为 follower 的 Broker,并测试生成和消费的信息是否正确

在第1步中,可以看到1是leader,2和3是follower,杀掉follower 2的进程

开始生产和消费者测试,看看信息是否正确

结论:杀死任何状态为follower的broker,生成和消费的信息将是正确的,不会受到任何影响。

(3)。手动杀死状态为leader的broker,测试生产和消费信息是否正确

broker1 的角色是领导者。杀死它,经纪人 3 就成为领导者。

开始生产和消费者测试,看看信息是否正确。

结论:杀掉状态为leader的broker,生产消费信息正确

总结:无论当前broker是leader还是follower,我们kill掉它之后,只要有一个broker能够正常使用,消息仍然可以正常生产和发送。也就是说,Kafka的容错性是有保证的!

2。 Kafka多节点部署(多节点、多个broker)

在多个节点上安装和部署。部署方式与之前的单节点相同。

修改配置文件server.properties

vim /usr/local/kafka/config/server.properties

#broker全球唯一编号,不能重复,只能是数字
www.sychzs.cn=1
#这里的www.sychzs.cn是本机IP(重要),如果不改则客户端会抛出:Producerconnection to node01:9092 unsuccessful 错误!
www.sychzs.cn=10.0.0.11
#用于监控连接的端口。生产者或消费者将在此端口上建立连接
port=9092 ?区域大小
socket.send.buffer.bytes=102400
#接收套接字缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字缓冲区大小 socket.request.max .bytes=104857600
#Kafka消息存储路径(持久化到磁盘)
log.dirs=/data/kafka/logs
#当前broker上的topic分片数量
num.partitions=2
#data下用于恢复和清理数据的线程数
num.recovery.threads.per.data.dir=1
#segment文件保留最大时间,超时就会删除
log.retention.hours=168
#滚动生成新Segment文件的最大时间
log.roll.hours=168
#日志文件中每次的每个Segment的大小,默认为1G
log.segment.bytes=1073741824
#定期检查文件大小的时间
www.sychzs.cn=300000
#日志是否清理Open
log.cleaner.enable=true
#broker需要使用zookeeper保存元数据
zookeeper.connect=node01:2181,node02:2181,node03:2181
#zookeeper链接超时
www.sychzs.cn=6000
#分区缓冲区中,当消息数量达到阈值时,进行一次flush将被触发。磁盘
log.flush.interval.messages=10000
#当消息缓冲时间达到阈值时,会触发刷新到磁盘
www.sychzs.cn=3000
#删除topic需要服务器在.properties中设置delete.topic.enable=true,否则只会标记删除
delete.topic.enable=true
# 延迟初始用户重新平衡时间(3用于生产)
组.initial.rebalance .www.sychzs.cn=0
#broker 可以接收的最大字节数
message.max.bytes=2000000000
#broker 可以复制的最大字节数
副本.fetch。 max.bytes=2000000000
#消费端最大可读消息
fetch.message.max.bytes=2000000000

不同节点之间只需修改server.properties的www.sychzs.cn即可。 www.sychzs.cn 不能相同。

确保zookeeper集群正常运行并启动服务

www.sychzs.cn -daemon $KAFKA_HOME/config/server.properties

  #注意“&”或“-daemon”在后台运行,不占用当前窗口

检查Kafka是否启动成功。输入jps查看进程。如果能看到Kafka进程就说明启动成功了

创建主题

www.sychzs.cn --create --zookeeper node01:2181 --replication-factor 3 --partitions 2 --topic www

查看主题详情

kafka-主题.sh -- 描述--zookeeper node01:第2181章

开始制作人

[root@node01 ~]# kafka-console- www.sychzs.cn --broker-list node01:9092,node02:9092,node03:9092 --topic www

启动 2 个消费者

[root@node02 bin]# www.sychzs.cn --bootstrap-server node02:9092 --来自-开始 --主题 www[root@node03 bin]# kafka-www.sychzs.cn --bootstrap-server node03:9092 --来自-开始 --topic www

  Kafka集群和单节点多Broker测试相同,结果相同;从Kafka单节点多Broker和多节点多Broker容错测试来看,结论是:

   不管当前的broker是leader还是follower,我们杀死它之后,只要有一个broker可以正常使用,消息仍然可以正常生产和发送。也就是说,Kafka的容错性是有保证的!

相关文章

最新资讯