侧边栏壁纸
  • 累计撰写 208 篇文章
  • 累计创建 16 个标签
  • 累计收到 5 条评论

目 录CONTENT

文章目录

kafka集群的搭建和维护

Wake
2022-08-07 / 0 评论 / 0 点赞 / 580 阅读 / 2,225 字

目标:

image-1659859050967
服务器环境 node1
192.168.2.43 node2 192.168.2.44 node3 192.168.2.45

软件环境: java openjdk version “1.8.0_222” (自行提前安装)

一. kafka原理 1.1
Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。

topic:消息存放的目录即主题 Producer:生产消息到topic的一方 Consumer:订阅topic消费消息的一方 Broker:Kafka的服务实例就是一个broker
image-1659859074127

二. 安装zookeeper node1,node2,node3都要下载和安装

下载版本: https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/

cd/opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
tar -zxvf  zookeeper-3.4.12.tar.gz
修改配置:
vim conf/zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper-3.4.12/data    ###根据实际情况修改
dataLogDir=/opt/zookeeper-3.4.12/log  ###根据实际情况修改
clientPort=2181
server.1=192.168.2.43:2888:3888
server.2=192.168.2.44:2888:3888
server.3=192.168.2.45:2888:3888
#server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
#192.168.2.43-45为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888

配置文件解释:

  1. tickTime: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
  2. initLimit: 这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 52000=10 秒
  3. syncLimit: 这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是52000=10秒 dataDir: 快照日志的存储路径 dataLogDir: 事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多 clientPort: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点

创建myid文件

#node1
echo "1" > /opt/zookeeper-3.4.12/data/myid

#node2
echo "2" > /opt/zookeeper-3.4.12/data/myid

#node3
echo "3" > /opt/zookeeper-3.4.12/data/myid

创建zookerper.service的systemd系统守护进程 node1,node2,node3都需要操作(这里也可以用supervisor)

tee zookeeper.service > /dev/null <<EOF
[Unit]
Description=ZooKeeper Service
Documentation=http://zookeeper.apache.org
Requires=network.target
After=network.target
[Service]
Type=forking
User=ubuntu
Group=ubuntu
ExecStart=/opt/zookeeper-3.4.12/bin/zkServer.sh start 
ExecStop=/opt/zookeeper-3.4.12/bin/zkServer.sh stop 
ExecReload=/opt/zookeeper-3.4.12/bin/zkServer.sh restart
WorkingDirectory=/var/log/zookeeper
EOF
cp zookeeper.service  /etc/systemd/system/zookeeper.service
systemctl enable zookeeper.service
systemctl start zookeeper.service 
systemctl status zookeeper.service

进入到Zookeeper的bin目录下 (3台都需要操作) cd /opt/zookeeper-3.4.12/bin 检查服务状态 ./zkServer.sh status

三. 安装kafka 3.1 创建目录并下载安装软件

#安装目录
cd /opt
#下载软件
wget http://mirror.bit.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz #解压软件
tar -zxvf kafka_2.11-2.1.1.tgz

3.2 修改配置文件 进入到config目录

cd /opt/kafka_2.11-2.1.1/config
vim server.properties   ###详细可以参考当前环境的配置
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样,建议node1为0,node2为1,node3为2
listeners=PLAINTEXT://192.168.2.43:9092 #kafka监听地址,根据本机ip填写
num.network.threads=5 #这个是borker进行网络处理的线程数,建议不大于CPU的2倍
num.io.threads=10  #这个是borker进行I/O处理的线程数
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
log.dirs=/data/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
num.partitions=6 #默认的分区数,一个topic默认1个分区数
num.recovery.threads.per.data.dir=1
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
zookeeper.connect=192.168.2.43:2181,192.168.2.44:2181,192.168.2.45:2181#设置zookeeper的连接端口
message.max.byte=5242880
default.replication.factor=3
replica.fetch.max.bytes=5242880
zookeeper.connection.timeout.ms=6000 #连接zookeeper的超时时间 
auto.leader.rebalance.enable=true

创建zookerper.service的systemd系统守护进程 node1,node2,node3都需要操作

tee kafkals.service > /dev/null <<EOF
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service
[Service]
Type=simple
User=ubuntu
Group=ubuntu
ExecStart=/opt/kafka_2.11-2.1.1/bin/kafka-server-start.sh /opt/kafka_2.11-2.1.1/config/server.properties
ExecStop=/opt/kafka_2.11-2.1.1/bin/kafka-server-stop.sh
WorkingDirectory=/var/log/kafka
[Install]
WantedBy=multi-user.target
EOF
cp kafka.service  /etc/systemd/system/kafka.service
systemctl enable kafka.service
systemctl start kafka.service 
systemctl status kafka.service

3.3 创建Topic来验证是否创建成功

#创建Topic
cd /opt/kafka_2.11-2.1.1/bin
kafka-topics.sh --create --zookeeper 192.168.2.43:2181 --replication-factor 3 --partitions 6 --topic test
#解释 --replication-factor 3 #复制两份 --partitions 6 #创建6个分区 --topic 主题为test 
### 在一台服务器上创建发布者
kafka-console-producer.sh --broker-list 192.168.2.43:9092 --topic test   ###从这里输入发送的消息

###新开一个终端,创建一个消费者,接收消息
kafka-console-consumer.sh --zookeeper 192.168.2.43:2181 --topic test --from-beginning

四、其他说明标注

4.1、日志说明

默认kafka的日志是保存在/opt/kafka/kafka_2.12-0.10.2.1/logs目录下的,这里说几个需要注意的日志

server.log #kafka的运行日志
state-change.log  #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里
controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.

4.2上面的大家你完成之后可以登录zk来查看zk的目录情况

#使用客户端进入
cd /opt/zookeeper-3.4.12/bin ./zkCli.sh 
#查看目录情况 执行
[zk: 192.168.2.43:2181(CONNECTED) 1] ls /  #查看目录
[zk: 192.168.2.43:2181(CONNECTED) 1] get /brokers/ids/0 #查看broker
0

评论区