1. 什么是Kafka?
- Apache Kafka是一个开源的消息系统,由Scala写成.
- Kafka是一个分布式消息队列, 生产者消费者的功能,类似与 JMS特性.
- Kafka对消息保存时根据Topic进行归类
- Kafka由多个实例构成,每个实例为一个broker
- 无论Kafka集群,producer,comsumer都依赖zookeeper来保存meta信息,保证系统的可用性.
优点:
- 提供Pub/Sub方式的海量消息处理。
- 以高容错的方式存储海量数据流。
- 保证数据流的顺序
2. JMS(java message system )
- 作用:用来 异构系统,集成通信,缓解系统瓶颈.. 通过生产消费模式(生产者,服务端,消费者)
- 点对点模式
- 发布/订阅模式
3. 消息队列
为什么需要使用消息队列?
- 解耦 异步 并行
- ack 数据发出到得到反馈
例: 当用户在支付宝注册账号时,可能系统需要对当前用户发送一下消息 –>发新手红包–>准备账号–>进行合法验证–>通知SNS.这时候是十分消耗时间的,可以是用消息队列,当用户发送注册请求时,直接反馈注册成功,在用户浏览旗下产品时,异步并行完成上述事情.
4. Kafka核心组件
broker: Kafka集群包含一个或多个服务器,其中服务器被叫做broker,broker不维护数据的消费状态,提高性能.直接使用硬盘进行存储
- Producer: 消息生产者
- consumer: 消息消费者
- Topic : 消息根据Topic进行归类(不同的Topic可以分开存储,逻辑上一个Topic消息虽然保存于一个或多个broker中,但用户只需要指定消息的Topic即可生产或消费数据,且不必担心数据的存储)
- Partition: 分区/分片
- Consumer Group: 每个consumer属于一个 CG
- zookeeper: 依赖集群保存meta信息
5. Kafka集群部署
- 5.1 环境准备
- zk集群
- 关闭防火墙
chkconfig iptables off && setenforce 0 - 创建用户
groupadd realtime && useradd realtime && usermod -a -G realtime realtime - 创建工作目录并赋权
mkdir /export
mkdir /export/servers
chmod 755 -R /export - 切换到 realtime 用户下
su realtime
5.2Kafka集群部署
下载安装包
- 下载地址:http://kafka.apache.org/downloads.html
在 linux 中使用 wget 命令下载安装包
wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
- 下载地址:http://kafka.apache.org/downloads.html
解压
- tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
cd /export/servers/
mv kafka_2.11-1.0.0 kafka
- tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
修改配置文件
cp /export/servers/kafka/config/server.properties
/export/servers/kafka/config/server.properties.bak
vi /export/servers/kafka/config/server.properties输入以下内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62broker 的全局唯一编号,不能重复
broker.id=0
用来监听链接的端口,producer 或 consumer 将在此端口建立连接
port=9092
处理网络请求的线程数量
num.network.threads=3
用来处理磁盘 IO 的线程数量
num.io.threads=8
发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400
请求套接字的缓冲区大小
socket.request.max.bytes=104857600
kafka 运行日志存放的路径
log.dirs=/export/servers/logs/kafka
topic 在当前 broker 上的分片个数
num.partitions=2
用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
滚动生成新的 segment 文件的最大时间
log.roll.hours=168
日志文件中每个 segment 的大小,默认为 1G
log.segment.bytes=1073741824
周期性检查文件大小的时间
log.retention.check.interval.ms=300000
日志清理是否打开
log.cleaner.enable=true
broker 需要使用 zookeeper 保存 meta 数据
zookeeper.connect=192.168.52.106:2181,192.168.52.107:2181,192.168.52.108:2181
zookeeper 链接超时时间
zookeeper.connection.timeout.ms=6000
partion buffer 中,消息的条数达到阈值,将触发 flush 到磁盘
log.flush.interval.messages=10000
消息 buffer 的时间,达到阈值,将触发 flush 到磁盘
log.flush.interval.ms=3000
删除 topic 需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除
delete.topic.enable=true
此处的 host.name 为本机 IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
host.name=kafka01
分发安装包
cp -r /export/servers/kafka kafka02:/export/servers
然后分别在各机器上创建软连 cd /export/servers/
scp -r /export/servers/kafka kafka03:/export/servers 然后分别在各机器上创建软连
cd /export/servers/ 依次修改各服务器上配置文件的的 broker.id ,分别是 0,1,2 不得重复。
启动集群
- 依次在各节点上启动 kafka
nohup /export/servers/kafka/bin/kafka-server-start.sh
/export/servers/kafka/config/server.properties >/dev/null 2>&1 &
输出错误日志到黑洞
command >/dev/null 2>&1 &
- 依次在各节点上启动 kafka
Kafka常用操作命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25- 查看当前服务器中的所有 topic
bin/kafka-topics.sh --list --zookeeper zk01:2181
- 创建 topic
bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
- 删除 topic
bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除或者直接重启。
- 通过 shell 命令发送消息
bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic test
- 通过 shell 消费消息
bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test
- 查看消费位置
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group
testGroup
- 查看某个 Topic 的详情
bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181
- 对分区数进行修改
bin/kafka-topics.sh --zookeeper zk01 --alter --partitions 2 --topic test
6. Kafka JAVA API
略/…
7. Kafka文件存储机制
- 7.1 Kafka文件存储基本结构
- 在kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partition的命名规则为 topic+ 名称 +有序序号,第一个partition序号从0开始,序号的最大值为partitions数量减1
- 每个partiton(目录) 相当于一个巨型文件被平均分配到多个大小相等的segment 数据文件中. 每个段segment file消息数量不一定相等,这种特性方便old segment file被删除,默认保留七天数据.
- 每个partition 只需要支持顺序读写就可以,segment文件的生命周期由服务端配置参数决定.
- 顺序有序性讨论?
- 一个partition的数据是否有序? 间隔有序不连续
- 针对一个topic中的数据,只能做到partition内部有序,不能做到全局有序.特别是在加入到消费者.
- 如何保证消费者消费数据全局有序? 伪命题: 只有一个情况才能保证全局有序,只有一个partition
- 顺序有序性讨论?
- 7.2 Kafka Partition Segment
- 组成: 由两大部分组成 两个文件一一对应,后缀 “.index”,”.log” 分表表示为segment的索引文件和数据文件
- index file
- data file
- 命名规则: partion全局的第一个segment从0开始,,后续每个segment的文件名为上一个segment文件最后一条消息的offset值.数值最大为64为long大小,19位数字字符长度,没有数字用0补充.
- 索引文件存储大量元数据,数据文件存储大量消息,索引文件中的元数据指向对应数据文件中message的物理偏移量
- 组成: 由两大部分组成 两个文件一一对应,后缀 “.index”,”.log” 分表表示为segment的索引文件和数据文件
Kafka常见问题
Kafka是什么?
- 分布式消息队列,类似与JMS,典型的生产者消费者模式
- 生产者生产的消息存储在kafka集群中,集群由多个broker组成
- kafka的元数据保存在zookeeper中
针对topic为什么要进行分区?
- 一般情况下,正对海量数据,会将数据切分成多个块,存储在不同的机器上
- 如果一个topic的数据量特别大,应该提前划分好分区的个数
- producer根据分区数进行数据分发(分发策略 partitioner)
针对一个分区,为什么要添加副本?
- 保证数据完整性(容错)
- 添加副本的个数?
- 添加副本,因为数据要同步到不同的机器上,有大量的网络传输和磁盘占用.
- 根据业务需求对数据容错性,可以将副本数设置为N=2
一个分区在broker是以目录的形式存放的,为什么分区下会设置segment段?
- 消息队列系统,一般都是实时的,只能短时间保存数据.
- broker需要对分片的数据进行删除,按照一定的数据量来存储数据,方便根据数据最后修改时间进行删除.
Producer数据生产不丢失问题
- ACK(Acknowledgement) 确认字符. 数据发出到得到反馈
- 数据生产环节不丢失
- 同步模式
- 数据发送的三种状态
- 0: 只管发送,不需要反馈
- 1: 只要leader接收到数据,就任务成功
- -1: 所有副本收到,任务成功
- 将数据发送模式设置为-1 是最妥当的,但是考虑到所有副本收到所需要的时间过长,面对海量数据,效率会大大降低
- 一般做法将状态设置成1,提高效率,但是会有风险
- 数据发送的三种状态
- 异步模式
- 生产的数据不会立即发送给broker,而是在produer端有个容器(队列)来临时存储缓存数据
- 针对容器,阻塞设置,如果设置为0,则是立即丢弃数据,如果为-1,就永久阻塞
- 当设置为-1时,如果将机器KILL,会立即清空消息队列中的数据,导致数据丢失
- 同步模式