kafka集群的搭建

 

kafka定义

  1. Java 和 scala都是运行在JVM上的语言。
  2. erlang和最近比较火的和go语言一样是从代码级别就支持高并发的一种语言,所以RabbitMQ天生就有很高的并发性能,但是 有RabbitMQ严格按照AMQP进行实现,受到了很多限制。kafka的设计目标是高吞吐量,所以kafka自己设计了一套高性能但是不通用的协议,他也是仿照AMQP( Advanced Message Queuing Protocol   高级消息队列协议)设计的
  3. 事物的概念:在数据库中,多个操作一起提交,要么操作全部成功,要么全部失败。举个例子, 在转账的时候付款和收款,就是一个事物的例子,你给一个人转账,你转成功,并且对方正常行收到款项后,这个操作才算成功,有一方失败,那么这个操作就是失败的。 
    对应消在息队列中,就是多条消息一起发送,要么全部成功,要么全部失败。3个中只有ActiveMQ支持,这个是因为,RabbitMQ和Kafka为了更高的性能,而放弃了对事物的支持
  4. 集群:多台服务器组成的整体叫做集群,这个整体对生产者和消费者来说,是透明的。其实对消费系统组成的集群添加一台服务器减少一台服务器对生产者和消费者都是无感之的
  5. 负载均衡,对消息系统来说负载均衡是大量的生产者和消费者向消息系统发出请求消息,系统必须均衡这些请求使得每一台服务器的请求达到平衡,而不是大量的请求,落到某一台或几台,使得这几台服务器高负荷或超负荷工作,严重情况下会停止服务或宕机
  6. 动态扩容是很多公司要求的技术之一,不支持动态扩容就意味着停止服务,这对很多公司来说是不可以接受的
  7. kafka支持的客户端语言:Kafka客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript,可以使用以上任何一种语言和kafka服务器进行通信(即辨析自己的consumer从kafka集群订阅消息也可以自己写producer程序)

 

 

 

kafka的基础概念

  1. 消费者:(Consumer):从消息队列中请求消息的客户端应用程序
  2. 生产者:(Producer)  :向broker发布消息的应用程序
  3. AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例

 

 

 

第一步

机器

我这边拿二台虚拟机,centos7的

主节点IP地址:192.168.0.153

副节点IP地址:192.168.0.154

 

JDK8

zookeeper是需要jdk8的

JDK8安装文章:http://www.linux91.cn/c/Liunx__centos7__JDK8%E5%AE%89%E8%A3%85

 

下载

我这边是先下载好的

版本:zookeeper-3.4.12

官方下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.11-2.1.0.tgz

kafka下载,建议下载 kafka_2.11-2.1.0.tgz不要下载kafka-2.1.0-src.tgz

 

 

 

第二步

以下操作二台主机都要操作

 

解压:tar  -zxvf kafka_2.11-2.1.0.tgz

移动:mv kafka_2.11-2.1.0 /usr/local/kafka

 

修改配置文件

cd /usr/local/kafka/config/

vim server.properties

broker.id=1   ##集群的唯一标识,注意这里要改不一样,节点分别1,2 来标识

listeners=PLAINTEXT://192.168.0.153:9092  ##设置kafka的监控地址和端口

log.dirs=/usr/local/kafka/logs  ##设置日志文件地址

num.partitions=6   ##设置新创建的topic有多少分区,按照情况设置

log.retention.hours=60   ##配置kafka中消息保存的时间

zookeeper.connect=192.168.0.153:2181,192.168.0.154:2181  ##指定zookeeper的地址,可以设置多个

auto.create.topics.enable=true  ##设置是否自动创建topic

delete.topic.enable=true   ##设置提供删除topicr的功能
 
 ###以上的配置是要求修改的
以下是收集来的全配置信息,可以按自身情况修改

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口

 

 

 

第三步

启动

 

启动

cd /usr/local/kafka/

nohup bin/kafka-server-start.sh config/server.properties &

这里是将kafka放入后台运行,启动后,会在启动kafka的当前目录下生成一个nohup.out文件

可以通过这文件查看kafka的启动状态和运行状态

 

测试

[root@localhost config]# jps
7013 QuorumPeerMain
7754 Jps
7070 Kafka

通过jps命令,可以看到有个kafka的标识,这就是kafka进程启动成功的标识

 

 

 

转载请注明原文链接:kafka集群的搭建

发表评论:

共有 0 条评论

 Top