文章目录
分布式消息队列Kafka
架构
组成架构
* 好处
* Api
命令行操作
* Java Api
* 1.producer
* )异步提交 * )同步提交
* .Consumer
* )自动提交
* )手动提交
* )自定义存储offset
* .自定义Interceptor
* 运行机制
生产者
* .分区的原因 * .分区原则
* .数据的可靠性
* .ISR机制
* .ack应答机制
* .数据的一致性保证(HW、LEO机制)
* .exactly机制()
* .server.properties配置说明
* .producer配置文件说明
* .consumer配置文件说明
* 消费者
* .消费方式
* .分区分配策略
* .offset的维护
* . 消费者组
* kafka高效读写数据
* .顺序写磁盘
* .零复制拷贝
* kafka事务
* .zookeeper在kafka中的作用
* .producer事务
* .customer事务
分布式消息队列Kafka
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
kafka是最初由linkedin公司开发的,使用scala语言编写,kafka是一个分布式,分区的,多副本的,多订阅者的日志系统(分布式MQ系统),可以用于搜索日志,监控日志,访问日志等
Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于的特性,但是在实现上完全不同,此外它并不是规范的实现。对消息保存时根据进行归类,发送消息者成为消息接受者成为此外集群有多个实例组成,每个实例成为。无论是集群,还是和都依赖于来保证系统可用性集群保存一些信息
架构
组成架构
Producer:消息生产者,就是向 kafka broker 发消息的客户端;
2)Consumer:消息消费者,向 kafka broker 取消息的客户端;
Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内一个消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
Broker:一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
Topic:可以理解为一个队列, 生产者和消费者面向的都是一个 topic ;
Partition :为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个partition ,每个 partition 是一个有序的队列;
Replica :副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
leader :每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
follower: 每个分区多个副本中的从,实时从中同步数据,保持和数据的同步。发生故障时,某个follower
好处
可靠性:分布式的,分区,复制和容错的。
可扩展性:kafka消息传递系统轻松缩放,无需停机。
耐用性:kafka使用分布式提交日志,这意味着消息会尽可能快速的保存在磁盘上,因此它是持久的。
性能:kafka对于发布和订阅消息都具有高吞吐量。即使存储了许多TB的消息,他也爆出稳定的性能。
kafka非常快:保证零停机和零数据丢失。
Api
命令行操作
1.查看所有topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
2.创建topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic
first
3.删除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
4.启用produce发送消息
bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
5.消费消息
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first //读取当前的消息
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first //读取所有的消息(包含之前的消息)
6.查看某个topic的信息
bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
7.修改分区信息
bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
Java Api
1.producer
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程—main线程和Sender线程,以及一个线程共享变量—RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafkabroker。
1)异步提交
需要用到的类:
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord 对象
producer的send方法默认是异步,由两个重载方法组成。带回调函数的方法参数为RecordMetadata 和 Exception。如果Exception为null,说明没有产生异常。
异步提交失败会自动重试。
2)同步提交
send方法返回的是一个Future对象,我们可以利用这一点来实现同步提交。如:send().get(),若提交没有返回结果,则会阻塞当前线程。
2.Consumer
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。
1)自动提交
需要用到的类:
KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个ConsumerRecord 对象
我们可以通过设置两个参数来实现自动提交
enable.auto.commit:是否开启自动提交 offset 功能
auto.commit.interval.ms:自动提交 offset 的时间间隔
2)手动提交
手动提交需要先将自动提交关闭。一般使用异步的,因为同步提交会阻塞线程,降低吞吐量
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,
commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
3)自定义存储offset
Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本及之后,默认将 offset 存储在 Kafka的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。
offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。
当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalance。
消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
要实现自定义存储 offset需要用到ConsumerRebalanceListener类。
3.自定义Interceptor
自定义拦截器中的方法
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息从RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。
(4)close:
关闭 interceptor,主要用于执行一些资源清理工作.
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
运行机制
生产者
1.分区的原因
(1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个Partition 组成,因此整个集群就可以适应任意大小的数据了;
(2 )可以提高并发,因为可以以Partition 为单位读写了。
2.分区原则
我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
(1)指明 partition的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到 partition 值;
(3)都未指定,使用partitionround-robin策略
3.数据的可靠性
为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个partition 收到producer
发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer
收到 ack,就会进行下一轮的发送,否则重新发送数据。
4.ISR机制
Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间 未 向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms 参数设定。Leader
发生故障之后,就会从 ISR 中选举新的 leader。
5.ack应答机制
0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到,但是还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
1:等待的leader的落盘成功后返回,如果在同步成功之前故障,那么将会丢失数据
-1(all):producer 等待 broker的 ack,partition 的 leader和 follower 全部落盘成功后才返回 ack。但是如果在 follower同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复。
6.数据的一致性保证(HW、LEO机制)
LEO:指的是每个副本最大的offset;
HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO
(1) follower 故障
follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
(2) leader 故障
leader 发生故障之后,会从中选出一个新的,之后,为保证多个副本之间的数据一致性,其余的会先将各自的文件高于的部分截掉,然后从新的同步数据
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复.
7.exactly机制()
1) kafka的数据消费模型:
exactly once:消费且仅消费一次
at least once:最少消费一次 出现数据重复消费的问题
at most once : 至多消费一次 出现数据丢失的问题
2) 0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的Exactly Once 语义。即:
At Least Once + 幂等性 (开启幂等性需要将 Producer 的参数中 enable.idompotence 设置为 true)
3) 开启幂等性后,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
但是PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
8.server.properties配置说明
broker.id
num.network.threads
num.io.threads
socket.send.buffer.bytes
socket.receive.buffer.bytes
socket.request.max.bytes
log.dirs/export/data/kafka/
num.partitions
num.recovery.threads.per.data.dir
log.retention.hours
log.roll.hours
log.segment.bytes
log.retention.check.interval.ms
log.cleaner.enabletrue
zookeeper.connectzk01:2181,zk02:2181,zk03:2181
zookeeper.connection.timeout.ms
log.flush.interval.messages
log.flush.interval.ms
delete.topic.enabletrue
host.namekafka01
advertised.host.name.140.128
9.producer配置文件说明
metadata.broker.listnode01:9092,node02:9092,node03:9092
compression.codecnone
serializer.classkafka.serializer.DefaultEncoder
request.required.acks
request.timeout.ms
也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
producer.typesync
queue.buffering.max.ms
queue.buffering.max.messages
batch.num.messages
queue.enqueue.timeout.ms-1
message.send.max.retries
topic.metadata.refresh.interval.ms
10.consumer配置文件说明
zookeeper.connectzk01:2181,zk02:2181,zk03:2181
zookeeper.session.timeout.ms
zookeeper.connection.timeout.ms
zookeeper.sync.time.ms
group.iditcast
auto.commit.enabletrue
auto.commit.interval.ms
conusmer.idxxx
client.idxxxx
queued.max.message.chunks
rebalance.max.retries
fetch.min.bytes
fetch.wait.max.ms
socket.receive.buffer.bytes
auto.offset.resetsmallest
derializer.classkafka.serializer.DefaultDecoder
消费者
1.消费方式
kafka支持pull(拉)和push(推)两种方式。但是push有可能会导致customer来不及消费,导致网络拥堵或者拒绝连接。pull由消费者自己选择拉取的数据内容、大小,但如果生产者没有数据,可能会导致死循环,所以我们通常设置一个超时时间来返回。
2.分区分配策略
1.RoundRobin 按照消费者组轮询选择消费的分区
2.Range 按照topic分配分区
3.offset的维护
Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,
consumer 默认将 保存在 一个内置的 中,该 topic 为 consumer_offsets。
1)修改配置文件 consumer.properties
exclude.internal.topics=false
2)读取 offset
0.11.0.0 之前版本:
bin/kafka-console-consumer.sh –topic __consumer_offsets – zookeeper hadoop102:2181 --formatter “kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter” --consumer.config config/consumer.properties --from-beginning
0.11.0.0 之后版本(含):
4. 消费者组
需要修改 consumer.properties 中的group.id
kafka高效读写数据
1.顺序写磁盘
Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
2.零复制拷贝
kafka事务
1.zookeeper在kafka中的作用
Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作。
2.producer事务
为了跨分区、跨会话的事务,我们需要引入一个事务id,并将producer获得的pid与之绑定。我们可以通过Transaction Coordinator来获得事务的状态以及将事务写入一个事务topic。
3.customer事务
customer并没有严格的事务,因为customer可以访问任意信息,无法保证commit的事务被精准消费。同一事务的消息重启后可能会被删除。
还没有评论,来说两句吧...