Kafka

  • 2019-09-19
  • 80
  • 0
  • 1

kafka简介

  • kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。
  • kafka对外使用topic的概念,生产者往topic里写消息,消费者从读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序

kafka功能

支持分布式,支持分区,支持复制的消息系统

kafka的组成

生产者

生产消息并将消息发送给生产者

  • kafka-server : 消息队列,接收生产者发送的消息,并将这些消息推送给消费者
  • topic: 主题,可根据不同消息的类型创建不同的主题
  • partition: 对主题进行分区,同时也是消息的载体,用来存放消息,是一个日志文件,可以对文件中的消息设置保存时间,在该时间内即使消息被消费也不会删除,所以每个数据都会有自己的偏移量,表名自己所在的位置
  • offset偏移量,用来标明消息所在的位置,同时消费者也可以通过偏移量来指定要消费的消息

消费者组

一个消费者组对于kafka节点来说相当于一个逻辑的消费者,消费者组由多个消费者实例组成,要求同一个组中的消费者实例不能消费同一个分区中的消息,但是不同组的消费者实例可以消费同一组中的消息

  • zookeeper: kafka所依赖的协调程序,用来记录kafka节点的信息,以及主题分区信息和消费者使用的偏移量信息

kafka总体数据流程图

大概用法就是,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,然后进行业务处理。

实验环境

角色 kafka1 kafka2 kafka3
IP地址 192.168.1.11 192.168.1.22 192.168.1.33

实验步骤

部署zookeeper集群

1、安装zookeeper

[root@localhost ~]# tar -zxvf zookeeper-3.3.6.tar.gz
[root@localhost ~]# mv zookeeper-3.3.6 /usr/local/zookeeper
[root@localhost ~]# ln -s /usr/local/zookeeper/bin/* /usr/local/bin/
[root@localhost ~]# cd /usr/local/zookeeper/
[root@localhost zookeeper]# cd conf/
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg

2、修改zookeeper配置文件

[root@localhost conf]# vim zoo.cfg 
# The number of milliseconds of each tick
tickTime=2000        # 集群中节点间发送心跳包的间隔时间,单位毫秒,可自定义
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10        # 集群中新添加的节点的初始化时间,为10个心跳包的时间也就是10个2s,可自定义
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5        # 节点间通信的超时等待时间,为5个心跳包的时间,可自定义
# the directory where the snapshot is stored.
dataDir=/usr/local/zookeeper/data            # zookeeper存放数据的目录
dataLogDir=/usr/local/zookeeper/datalog      # zookeeper存放日志的目录
# the port at which the clients will connect
clientPort=2181        # zookeeper对外提供服务的端口
# 在最后添加
server.1=192.168.1.11:2888:3888
server.2=192.168.1.22:2888:3888
server.3=192.168.1.33:2888:3888
  • server.id: 集群中节点的标识符,id可以是任意数值
  • 192.168.1.11:每个节点的IP地址
  • 2888: 节点间通信使用的端口号
  • 3888: 节点间选举主节点使用的端口号

3、创建存放数据目录

[root@localhost conf]# cd /usr/local/zookeeper/
[root@localhost zookeeper]# mkdir data datalog

4、创建myid(分别对应自己的ID)

[root@localhost zookeeper]# vim data/myid
1

其他两台服务器做相同配置!!!

关闭防火墙、沙盒

[root@localhost ~]# systemctl stop firewalld.service 
[root@localhost ~]# setenforce 0

启动zookeeper(三台服务器)

[root@localhost ~]# zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 67238.

查看zookeeper集群状态

[root@localhost ~]# zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader

安装kafka

[root@localhost ~]# tar -zxvf kafka_2.11-1.0.1.tgz
[root@localhost ~]# mv kafka_2.11-1.0.1 /usr/local/kafka
[root@localhost ~]# ln -s /usr/local/kafka/bin/* /usr/local/bin/
[root@localhost ~]# cd /usr/local/kafka/
[root@localhost kafka]# cd config/

编辑kafka配置文件

[root@localhost config]# vim server.properties
 21 broker.id=0        # kafka节点标识ID
 31 listeners=PLAINTEXT://192.168.1.11:9092
 60 log.dirs=/usr/local/kafka/data
 61 message.max.byte=10240000
 62 default.replication.factors=3
 63 replica.fetch.max.bytes=10240000        # 消费者每次消费最大字节
126 zookeeper.connect=192.168.1.11:2181,192.168.1.22:2181,192.168.1.33:2181

创建kafka日志存放目录

[root@localhost config]# cd /usr/local/kafka/
[root@localhost kafka]# mkdir data

启动kafka

[root@localhost kafka]# cd bin/
[root@localhost bin]# ./kafka-server-start.sh  -daemon /usr/local/kafka/config/server.properties

创建主题consume

[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.11:2181 --partitions 1 --replication-factor 1 --topic consume
[2019-09-19 01:31:57,050] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
Created topic "consume".

查看日志

[root@localhost bin]# cd ../data/consume-0/
[root@localhost consume-0]# ls
00000000000000000000.index 00000000000000000000.timeindex
00000000000000000000.log leader-epoch-checkpoint
[root@localhost consume-0]# cat 00000000000000000000.log 
[root@localhost consume-0]#

第二台kafka模拟生产者推送消息

[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.1.11:9092 --topic consume 

>this is my second message
>www.shuaiguoer.com
>pan.shuaiguoer.com
>

第一台kafka查看日志

[root@localhost consume-0]# cat 00000000000000000000.log 
Q`�4�mEp��mEp����������������>2this is my second messageJ���mEsnmEsn��������������0$www.shuaiguoer.comJ�4 mEu՚mEu՚��������������0$pan.shuaiguoer.com

第三台kafka模拟消费者

[root@localhost bin]# ./kafka-console-consumer.sh --zookeeper 192.168.1.11:2181 --topic consume --from-beginning

OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2019-09-19 01:39:54,419] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
[2019-09-19 01:39:54,437] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
[2019-09-19 01:39:54,506] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
[2019-09-19 01:39:54,725] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
this is my second message
www.shuaiguoer.com
pan.shuaiguoer.com

评论

还没有任何评论,你来说两句吧

发表评论

:zhenbang: :yiwen: :yinyue: :yinxian: :yi: :xxyl: :xinsui: :xiaonian: :weiqu: :tushe: :tu: :taiyang: :taikaixin: :shuijiao: :shenli: :shengqi: :ruo: :qianbi: :qian: :pen: :neng: :mianqiang: :meigui: :lu: :liwu: :kuanghan: :ku: :kaixin: :jinya: :jiku: :huaxin: :huaji: :hu: :heixian: :hehe: :han: :haha: :haha2: :guai: :dangao: :damuzhi: :daku: :chabei: :caihong: :bugaoxing: :bishi: :aixin: :a: :OK: :) :(