ITPub博客

首页 > 大数据 > Hadoop > Kafka高可用集群部署与配置指南

Kafka高可用集群部署与配置指南

原创 Hadoop 作者:jaymarco 时间:2020-10-16 00:03:57 0 删除 编辑

一、Kafka   简介

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

Kafka主要设计目标如下:

1.    以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。

2.    高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。

3.    支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。

4.    同时支持离线数据处理和实时数据处理。

5.    Scale out:支持在线水平扩展

2          Kafka 的特性

·          高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。

·          可扩展性:kafka集群支持热扩展

·          持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

·          容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

·          高并发:支持数千个客户端同时读写

3          Kafka 的使用场景

·          日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

·          消息系统:解耦和生产者和消费者、缓存消息等。

·          用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

·          运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

·          流式处理:比如spark streaming和storm

4          专业术语

术语

术语含义

ZooKeeper

开源分布式应用程序协调服务

Broker

Kafka   集群包含一个或多个服务器,服务器节点称为broker

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic

Partition

topic中的数据分割为一个或多个partition

Producer

生产者即数据的发布者,该角色将消息发布到Kafka的topic中

Consumer

消费者可以从broker中读取数据

Consumer   Group

每个Consumer属于一个特定的Consumer Group

Leader

每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition

Follower

Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

5          集群架构

5.1      架构图

Kafka集群中包含若干Producer,若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

5.2      环境准备

5.2.1      虚拟机准备

准备三台虚拟机用于部署zookeeper和Kafka 集群,要求硬件配置标准一样。

主机名

IP 地址

用途

192.168.56.129

Kafkanode1

Zookeeper, Kafka 节点1

192.168.56.130

Kafkanode2

Zookeeper, Kafka 节点2

192.168.56.131

Kafkanode3

Zookeeper, Kafka 节点3

5.2.2      J DK 软件部署规划

JDK安装部署规划如下表:

I P 地址

部署目录

192.168.56.129

/opt/jdk1.8.0_221

192.168.56.130

/opt/jdk1.8.0_221

192.168.56.131

/opt/jdk1.8.0_221

5.2.3      Zookeeper 环境部署规划

Zookeeper集群部署前规划出消息端口、通信端口和部署目录如下:

I P 地址

消息端口

通信端口

ServerID 标识

部署目录

192.168.56.129

2181

2888:3888

1

/opt/zookeeper

192.168.56.130

2181

2888:3888

2

/opt/zookeeper

192.168.56.131

2181

2888:3888

3

/opt/zookeeper

5.2.4      Kafka 环境部署规划

Kafka集群部署前规划出集群通讯端口、协议通信端口、控制台端口、集群名和部署目录如下:

I P 地址

监听端口

节点目录

192.168.56.129

PLAINTEXT://192.168.58.129:9092

/opt/kafka_cluster/node1

192.168.56.130

PLAINTEXT://192.168.58.130:9092

/opt/kafka_cluster/node2

192.168.56.131

PLAINTEXT://192.168.58.131:9092

/opt/kafka_cluster/node3

5.2.5      软件部署分布

Kafka集群部署是各节点需要安装的软件分布如下:

Ip 地址

软件名

192.168.56.129

jdk1.8.0_221    zookeeper-3.4.14    kafka-2.5.0

192.168.56.130

jdk1.8.0_221    zookeeper-3.4.14    kafka -2.5.0

192.168.56.131

jdk1.8.0_221    zookeeper-3.4.14    kafka -2.5.0

5.2.6      软件来源获取

Kafka集群部署所需要软件来源下载地址如下:

软件名

版本号

来源地址

JDK

1.8.0_242

https://download.oracle.com/otn/java/jdk/8u251-b08/3d5a2bb8f8d4428bbe94aed7ec7ae784/jdk-8u221-linux-x64.tar.gz

zookeeper

3.4.14

https://downloads.apache.org/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

kafka

5.15.12

https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz

ZooInspector


https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip

6          集群实施部署

6.1      JDK 软件安装

分别在3台Kafka节点/opt目录下面安装JDK软件,将JDK软件包直接解压到目标安装路径下,配置好环境变量并让其变量生效成功。

cd /opt 目录下解压软件

tar xvf jdk-8u221-linux-x64.tar.gz

 

最后需要修改环境变量,通过命令vim /etc/profile vim 编辑器来编辑profile 文件,在文件末尾添加一下内容:

export JAVA_HOME=/opt/jdk1.8.0_221

export JRE_HOME=${JAVA_HOME}/jre

export   CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH

export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin

export PATH=$PATH:${JAVA_PATH}

 

执行如下命令生效jdk 环境变量

source /etc/profile

 

[root@node1 opt]# java -version

openjdk version "1.8.0_242"

OpenJDK Runtime Environment (build   1.8.0_242-b08)

OpenJDK 64-Bit Server VM (build   25.242-b08, mixed mode)

 

到此JDK 软件安装成功!

6.2      ZooKeeper 集群搭建

6.2.1      ZK 软件安装

分别在3台Kafka节点/opt目录下面安装zookeeper软件,将zk软件包直接解压到目标安装路径下,配置好环境变量并让其变量生效成功。

tar zxvf zookeeper-3.4.14.tar.gz   -C /opt

cd /opt && mv zookeeper-3.4.8   zookeeper

cd zookeeper

cp conf/zoo_sample.cfg conf/zoo.cfg

 

# zookeeper 加入到环境变量,3 台主机都需要执行

echo -e "# append zk_env\nexport   PATH=$PATH:/opt/zookeeper/bin" >> /etc/profile

 

3 台主机上面执行生效命令

Source /etc/profile

6.2.2      ZK 集群配置

1、 配置文件修改

修改节点1上面的zk配置文件zoo.cfg,详细配置参数内容如下:

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/opt/zookeeper/data

dataLogDir=/opt/zookeeper/logs

clientPort=2181

autopurge.purgeInterval=24

autopurge.snapRetainCount=500

server.1= 192.168.58.129:2888:3888

server.2= 192.168.58.130:2888:3888

server.3= 192.168.58.131:2888:3888

 

并将节点1配置文件zoo.cfg拷贝到节点2和节点3上面。

在三台节点上面创建如下目录

mkdir -p /opt/zookeeper/{logs,data}

 

2、 ZK 配置参数说明

n   tickTime:Client-Server通信心跳时间

Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。tickTime以毫秒为单位。

n   initLimit:Leader-Follower初始通信时限
集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)。
initLimit=5

n   syncLimit:Leader-Follower同步通信时限

集群中的 follower 服务器与 leader 服务器之间请求和应答之间能容忍的最多心跳数( tickTime 的数量)。
syncLimit=2

n   dataDir:数据文件目录

Zookeeper 保存数据的目录,默认情况下, Zookeeper 将写数据的日志文件也保存在这个目录里。 dataDir= /opt/zookeeper/data

n   clientPort:客户端连接端口

客户端连接 Zookeeper 服务器的端口, Zookeeper 会监听这个端口,接受客户端的访问请求。 clientPort=2181

n   6.服务器名称与地址:集群信息(服务器编号,服务器地址,LF通信端口,选举端口)

这个配置项的书写格式比较特殊,规则如下:
server.N=YYY:A:B

server.1=zknode01:2888:3888
server.2= zknode02:2888:3888
server.3= zknode03:2888:3888

 

3、 ZK 服务创建 ServerID 标识

在三个虚拟机下的data文件夹下创建三个myid文件,并且三个文件里面分别写入1,2,3

#192.168.58.129 节点1 上面创建myid 1

echo "1" >   /opt/zookeeper/data/myid

#192.168.58.130 节点1 上面创建myid 2

echo "2" >   /opt/zookeeper/data/myid

#192.168.58.131 节点1 上面创建myid 3

echo "3" > /opt/zookeeper/data/myid

6.2.3      ZK 集群启动

l   启动 ZK 节点

/opt/zookeeper/bin/zkServer.sh start

l   停止 ZK 节点

/opt/zookeeper/bin/zkServer.sh stop

l   查看 ZK 状态

/opt/zookeeper/bin/zkServer.sh status

l   日志路径

/opt/zookeeper/logs

6.2.4      ZK 集群测试

模拟ZK集群切换,停掉Leader节点让ZK自动发生选举到其它节点。

上图Leader节点在192.168.58.130,其它两个节点都是follower状态。

模拟将192.168.58.130节点leader服务发生故障宕机,观察ZK集群是否会自动选举到其它两个follower节点之一作为leader节点。当leader服务主节点已经被停止后,ZK最终选举了follower节点192.168.58.131为leader,选举成功,ZK集群正常工作。后面再将宕机的那台主机恢复回来后,此节点ZK的状态就变成了被选举状态follower。至此说明ZK集群切换成功。

6.3      Kafka 集群搭建

Kafka集群安装采集节点克隆安装,先将kafka安装到其中一个节点,然后将配置修改后,直接将节点1上面的kafka以克隆方式COPY到其他两个节点。

6.3.1      kafka 软件安装

选择节点一台kafka主机服务器节点/opt目录下面安装kafka软件,安装完后并配置好环境变量并让其变量生效成功。

选择其中一台kafkanode1 部署

mkdir   /opt/kafka-cluster/

tar zxvf kafka_2.12-2.5.0.tgz -C /opt/kafka-cluster/

cd /opt/ kafka-cluster/ && mv kafka node1

6.3.2      kafka 配置修改

修改kafka配置文件/opt/kafka-cluster/node1/config/server.properties ,注意只需要修改如下三项配置,broker.id listeners zookeeper.connect

broker.id=1

listeners=PLAINTEXT://192.168.58.129:9092

zookeeper.connect=192.168.56.129:2181,192.168.58.130:2181,192.168.58.131:2181

 

注意:以上kafka节点1已经配置完成。

6.3.3      克隆 kafka 节点

1 、克隆 kafka 节点 2

将kafka节点1上面的kafka安装软件目录远程拷贝到远程节点2

#从节点1上面拷贝kafka软件到节点2

scp -r /opt/kafka-cluster/   192.168.58.130:/opt

cd    /opt/kafka-cluster/

mv   node1 node2

 

# 修改配置文件server.properties ,只需要修改broker.id listeners 两个参数

原来如下:

###############################################

broker.id=1

listeners=PLAINTEXT://192.168.58.129:9092

###############################################

 

修改为:

###############################################

broker.id=2

listeners=PLAINTEXT://192.168.58.130:9092

###############################################

 

2 、克隆 kafka 节点 3

将kafka节点1上面的kafka安装软件目录远程拷贝到远程节点3

#从节点1上面拷贝kafka软件到节点3

scp -r /opt/kafka-cluster/   192.168.58.131:/opt

cd    /opt/kafka-cluster/

mv   node1 node3

 

# 修改配置文件server.properties ,只需要修改broker.id listeners 两个参数

原来如下:

###############################################

broker.id=1

listeners=PLAINTEXT://192.168.58.129:9092

###############################################

 

修改为:

###############################################

broker.id=3

listeners=PLAINTEXT://192.168.58.131:9092

###############################################

 

到此kafka 集群已经配置完成!

6.3.4      kafka 集群启动

分别在三台kafka节点上面用命令去启停服务

l   启动kafka服务

./kafka-server-start.sh -daemon  ../config/server.properties

l   停止kafka服务

./kafka-server-stop.sh  -daemon  ../config/server.propertiesp

l   查看kafka日志

/opt/kakfa-cluster/node1/logs/kafkaServer.out

7          高可用集群测试

待补充


8          Kafka 性能调优

8.1      Broker 参数配置

1、网络和io操作线程配置优化

#   broker处理消息的最大线程数(默认为3)

num.network.threads=cpu核数+1

#   broker处理磁盘IO的线程数

num.io.threads=cpu核数*2

2、log数据文件刷盘策略  

#   每当producer写入10000条消息时,刷数据到磁盘

log.flush.interval.messages=10000

#   每间隔1秒钟时间,刷数据到磁盘

log.flush.interval.ms=1000

3、日志保留策略配置

#   保留三天,也可以更短   (log.cleaner.delete.retention.ms)

log.retention.hours=72

#   段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件

log.segment.bytes=1073741824

4、Replica相关配置

default.replication.factor:3

#   这个参数指新创建一个topic时,默认的Replica数量,Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜。

8.2      Java API 调优

1、zookeeper.session.timeout.ms

解释:配置的超时时间太短,Zookeeper没有读完Consumer的数据,连接就被Consumer断开了!

参数:5000

2、zookeeper.sync.time.ms

解释:ZooKeeper集群中leader和follower之间的同步的时间

参数:2000

3、auto.commit.enable=true

解释:注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交  

4、auto.commit.interval.ms

解释:自动提交offset到zookeeper的时间间隔

参数:1000

5、zookeeper.connection.timeout.ms

解释:确认zookeeper连接建立操作客户端能等待的最长时间

参数:10000

6、rebalance.backoff.ms

解释:消费均衡两次重试之间的时间间隔

参数:2000

7、rebalance.max.retries

解释:消费均衡的重试次数

参数:10

 

9          客户端配置

9.1      Producer 客户端配置

<!-- 定义producer的参数 -->

    <bean   id="producerProperties" class="java.util.HashMap">  

        <constructor-arg>  

            <map>

                <!-- 配置kafka的broke -->

                <entry   key="bootstrap.servers" value="192.168.172.129:9092"/>  

                <!-- 配置组-->

                <entry   key="group.id" value="group1"/>

                <entry   key="acks" value="all"/>

                 <entry   key="retries" value="10"/>

                <entry   key="batch.size" value="16384"/>

                <entry   key="linger.ms" value="1"/>

                <entry   key="buffer.memory" value="33554432"/>

                <entry key="key.serializer"   value="org.apache.kafka.common.serialization.StringSerializer"/>  

                <entry   key="value.serializer"   value="org.apache.kafka.common.serialization.StringSerializer"/>  

            </map>

        </constructor-arg>  

    </bean>

 

    <!-- 创建kafkatemplate需要使用的producerfactory bean -->

    <bean   id="producerFactory"   class="org.springframework.kafka.core.DefaultKafkaProducerFactory">  

        <constructor-arg>  

            <ref   bean="producerProperties"/>

        </constructor-arg>  

    </bean>

 

    <!-- 创建kafkatemplate   bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->

    <bean   id="kafkaTemplate"   class="org.springframework.kafka.core.KafkaTemplate">

        <constructor-arg   ref="producerFactory"/>

        <constructor-arg   name="autoFlush" value="true"/>

        <property   name="defaultTopic" value="test1"/>

    </bean>

  9.2      Consumer 客户端配置

<!-- 定义consumer的参数 -->

    <bean id="consumerProperties"   class="java.util.HashMap">

        <constructor-arg>  

            <map>

                <!-- 配置kafka的broke -->

                <entry   key="bootstrap.servers" value="192.168.172.129:9092"/>  

                <!-- 配置组-->

                 <entry   key="group.id" value="group1"/>

                <entry   key="enable.auto.commit" value="true"/>

                <entry   key="auto.commit.interval.ms" value="1000"/>

                <entry   key="session.timeout.ms" value="30000"/>

                <entry   key="key.deserializer"   value="org.apache.kafka.common.serialization.StringDeserializer"/>  

                <entry   key="value.deserializer"   value="org.apache.kafka.common.serialization.StringDeserializer"/>  

            </map>

         </constructor-arg>  

    </bean>

 

    <!-- 创建consumerFactory   bean -->

    <bean   id="consumerFactory"   class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">  

        <constructor-arg>  

            <ref   bean="consumerProperties"/>

        </constructor-arg>  

    </bean>

 

    <!-- 实际执行消息消费的类 -->

    <bean   id="messageListernerConsumerService"   class="com.netease.hdone.repay.vo.ConsumerListener"/>

 

    <!-- 消费者容器配置信息 -->

    <bean   id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">  

        <!-- 重要!配置topic   -->

        <constructor-arg   value="test1"/>

        <property   name="messageListener"   ref="messageListernerConsumerService"/>

    </bean>

 

    <!-- 创建kafkatemplate   bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->

    <bean   id="messageListenerContainer"   class="org.springframework.kafka.listener.KafkaMessageListenerContainer"   init-method="doStart">

        <constructor-arg   ref="consumerFactory"/>

        <constructor-arg   ref="containerProperties"/>

    </bean>

 

 

有需要的朋友可以关注我的公众号,文章每日一更



来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/28833846/viewspace-2727299/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论
负责数据库、中间件、大数据等基础软件建设、优化和业务保障工作。具有10年的电信与银行企业一线/二线运维管理经验。目前专注研究云计算、中间件和数据库等领域技术研究。持有Oracle OCP、weblogic OCP、Docker容器、PGCE和阿里云ACP等认证

注册时间:2020-06-22

  • 博文量
    65
  • 访问量
    27666