Kafka 日志存储的问题

在进行详解之前,我想先声明一下,本次我们进行讲解说明的是 Kafka 消息存储的信息文件内容,不是所谓的 Kafka 服务器运行产生的日志文件,这一点希望大家清楚。

Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。每个主题又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。也就是该文要着重关注的内容。我们根据如下的图进行进一步说明:

001

图中,创建了一个 demo-topic 主题,其存在 7 个 Parition,对应的每个 Parition 下存在一个 [Topic-Parition] 命名的消息日志文件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区日志文件中,你会发现很多类型的文件,比如:.index、.timestamp、.log、.snapshot 等,其中,文件名一致的文件集合就称为 LogSement。我们先留有这样的一个整体的日志结构概念,接下来我们一一的进行详细的说明其中的设计。

LogSegment

我们已经知道分区日志文件中包含很多的 LogSegment ,Kafka 日志追加是顺序写入的,LogSegment 可以减小日志文件的大小,进行日志删除的时候和数据查找的时候可以快速定位。同时,ActiveLogSegment 也就是活跃的日志分段拥有文件拥有写入权限,其余的 LogSegment 只有只读的权限。

日志文件存在多种后缀文件,重点需要关注 .index、.timestamp、.log 三种类型。其他的日志类型功能作用,请查询下面图表:

类别 作用
.index 偏移量索引文件
.timestamp 时间戳索引文件
.log 日志文件
.snaphot 快照文件
.deleted
.cleaned 日志清理时临时文件
.swap Log Compaction 之后的临时文件
Leader-epoch-checkpoint

每个 LogSegment 都有一个基准偏移量,用来表示当前 LogSegment 中第一条消息的 offset。偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该作为文件名命名规则(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。特别说明一下,如果日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是 121,偏移量是从 0 开始的。

如果想要查看相应文件内容可以通过 kafka-run-class.sh 脚本查看 .log :

/data/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log

002

2.0 中可以使用 kafka-dump-log.sh 查 看.index 文件

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index

日志与索引文件

配置项 默认值 说明
log.index.interval.bytes 4096 (4K) 增加索引项字节间隔密度,会影响索引文件中的区间密度和查询效率
log.segment.bytes 1073741824 (1G) 日志文件最大值
log.roll.ms 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,毫秒维度
log.roll.hours 168 (7天) 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,小时维度
log.index.size.max.bytes 10485760 (10MB) 触发偏移量索引文件或时间戳索引文件分段字节限额

偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。

Kafka 中的索引文件是以稀疏索引的方式构造消息的索引,他并不保证每一个消息在索引文件中都有对应的索引项。每当写入一定量的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,通过修改 log.index.interval.bytes 的值,改变索引项的密度。

切分文件

从上文中可知,日志文件和索引文件都会存在多个文件,组成多个 SegmentLog,那么其切分的规则是怎样的呢?

当满足如下几个条件中的其中之一,就会触发文件的切分:

  1. 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。log.segment.bytes 参数的默认值为 1073741824,即 1GB。
  2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms 或 log.roll.hours 参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。
  3. 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 配置的值。log.index.size.max.bytes 的默认值为 10485760,即 10MB。
  4. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量。
为什么是 Integer.MAX_VALUE ?

在偏移量索引文件中,每个索引项共占用 8 个字节,并分为两部分。相对偏移量和物理地址。

相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节

物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节

4 个字节刚好对应 Integer.MAX_VALUE ,如果大于 Integer.MAX_VALUE ,则不能用 4 个字节进行表示了。

索引文件切分过程

索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,即文件创建的时候就是最大值,当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性。

查找消息

offset 查询

偏移量索引由相对偏移量和物理地址组成。

003

可以通过如下命令解析.index 文件

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index
offset:0 position:0
offset:20 position:320
offset:43 position:1220

注意:offset 与 position 没有直接关系哦,由于存在数据删除和日志清理。

004

e.g. 如何查看 偏移量为 23 的消息?

Kafka 中存在一个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在 00000000000000000000.index ,通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即 offset 20 那栏,然后从日志分段文件中的物理位置为320 开始顺序查找偏移量为 23 的消息。

时间戳方式查询

在上文已经有所提及,通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。

时间戳索引索引格式

005

006+

 

e.g. 查找时间戳为 1557554753430 开始的消息?

  • 将 1557554753430 和每个日志分段中最大时间戳 largestTimeStamp 逐一对比,直到找到不小于 1557554753430 所对应的日志分段。日志分段中的 largestTimeStamp 的计算是先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于 0 ,则取该值,否则去该日志分段的最近修改时间。
  • 找到相应日志分段之后,使用二分法进行定位,与偏移量索引方式类似,找到不大于 1557554753430 最大索引项,也就是 [1557554753420 430]。
  • 拿着偏移量为 430 到偏移量索引文件中使用二分法找到不大于 430 最大索引项,即 [20,320] 。
  • 日志文件中从 320 的物理位置开始查找不小于 1557554753430 数据。

注意:timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的哦。因为数据的写入是各自追加。

在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若干的时间戳信息。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增长。反之如果是 CreateTime 则无法保证顺序。

日志清理

日志清理,不是日志删除哦,这还是有所区别的,日志删除会在下文进行说明。

Kafka 提供两种日志清理策略:

日志删除:按照一定的删除策略,将不满足条件的数据进行数据删除

日志压缩:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本。

Kafka 提供 log.cleanup.policy 参数进行相应配置,默认值:delete,还可以选择 compact。

是否支持针对具体的 Topic 进行配置?

答案是肯定的,主题级别的配置项是 cleanup.policy 。

日志删除

配置 默认值 说明
log.retention.check.interval.ms 300000 (5分钟) 检测频率
log.retention.hours 168 (7天) 日志保留时间小时
log.retention.minutes 日志保留时间分钟
log.retention.ms 日志保留时间毫秒
file.delete.delay.ms 60000 (1分钟) 延迟执行删除时间
log.retention.bytes -1 无穷大 运行保留日志文件最大值
log.retention.bytes 1073741824 (1G) 日志文件最大值

Kafka 会周期性根据相应规则进行日志数据删除,保留策略有 3 种:基于时间的保留策略、基于日志大小的保留策略和基于日志其实偏移量的保留策略。

基于时间

日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是 7 天,log.retention.ms 优先级最高。

如何查找日志分段文件中已经过去的数据呢?

Kafka 依据日志分段中最大的时间戳进行定位,首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取该值,否则取最近修改时间。

为什么不直接选最近修改时间呢?

因为日志文件可以有意无意的被修改,并不能真实的反应日志分段的最大时间信息。

删除过程
  1. 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。
  2. 这些日志分段所有文件添加 上 .delete 后缀。
  3. 交由一个以 "delete-file" 命名的延迟任务来删除这些 .delete 为后缀的文件。延迟执行时间可以通过 file.delete.delay.ms 进行设置

如果活跃的日志分段中也存在需要删除的数据时?

Kafka 会先切分出一个新的日志分段作为活跃日志分段,然后执行删除操作。

基于日志大小

日志删除任务会检查当前日志的大小是否超过设定值。设定项为 log.retention.bytes ,单个日志分段的大小由 log.regment.bytes 进行设定。

删除过程

  1. 计算需要被删除的日志总大小 (当前日志文件大小-retention值)。
  2. 从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合。
  3. 执行删除。

基于日志起始偏移量

基于日志起始偏移量的保留策略的判断依据是某日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,则可以删除此日志分段。

注意:日志文件的起始偏移量并不一定等于第一个日志分段的基准偏移量,存在数据删除,可能与之相等的那条数据已经被删除了。

007

删除过程
  • 从头开始遍历每一个日志分段,日志分段 1 的下一个日志分段的起始偏移量为 11,小于 logStartOffset,将 日志分段 1 加入到删除队列中
  • 日志分段 2 的下一个日志分段的起始偏移量为 23,小于 logStartOffset,将 日志分段 2 加入到删除队列中
  • 日志分段 3 的下一个日志分段的起始偏移量为 30,大于 logStartOffset,则不进行删除。

kafka常见问题

1 启动advertised.listeners配置异常:

java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
    at scala.Predef$.require(Predef.scala:277)
    at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1203)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
    at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)

1.1 解决方法:修改server.properties

advertised.listeners=PLAINTEXT://{ip}:9092  # ip可以内网、外网ip、127.0.0.1 或域名

1.2 解析:

server.properties中有两个listeners。 listeners:启动kafka服务监听的ip和端口,可以监听内网ip和0.0.0.0(不能为外网ip),默认为java.net.InetAddress.getCanonicalHostName()获取的ip。advertised.listeners:生产者和消费者连接的地址,kafka会把该地址注册到zookeeper中,所以只能为除0.0.0.0之外的合法ip或域名 ,默认和listeners的配置一致。

2 启动PrintGCDateStamps异常

[0.004s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/data/service/kafka_2.11-0.11.0.2/bin/../logs/kafkaServer-gc.log instead.
Unrecognized VM option 'PrintGCDateStamps'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

2.1 解决方法: 更换jdk1.8.x版本或者使用>=kafka1.0.x的版本。

2.2 解析:

只有在jdk1.9并且kafka版本在1.0.x之前的版本才会出现。

3 生成者发送message失败或消费者不能消费(kafka1.0.1)

#(java)org.apache.kafka警告
Connection to node 0 could not be established. Broker may not be available.


# (nodejs) kafka-node异常 (执行producer.send后的异常)
{ TimeoutError: Request timed out after 30000ms
    at new TimeoutError (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
    at Timeout.setTimeout [as _onTimeout] (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:737:14)
    at ontimeout (timers.js:466:11)
    at tryOnTimeout (timers.js:304:5)
    at Timer.listOnTimeout (timers.js:264:5) message: 'Request timed out after 30000ms' }

3.1 解决方法: 检查advertised.listeners的配置(如果有多个Broker可根据java版本的对应的node号检查配置),判断当前的网络是否可以连接到地址(telnet等)

4 partitions配置的值过小造成错误(kafka1.0.1)

#(java)org.apache.kafka(执行producer.send)
Exception in thread "main" org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).
    at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:908)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:778)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768)
    at com.wenshao.dal.TestProducer.main(TestProducer.java:36)


# (nodejs) kafka-node异常 (执行producer.send后的异常)
{ BrokerNotAvailableError: Could not find the leader
    at new BrokerNotAvailableError (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\errors\BrokerNotAvailableError.js:11:9)
    at refreshMetadata.error (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:831:16)
    at D:\project\node\kafka-test\src\node_modules\kafka-node\lib\client.js:514:9
    at KafkaClient.wrappedFn (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:379:14)
    at KafkaClient.Client.handleReceivedData (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\client.js:770:60)
    at Socket.<anonymous> (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:618:10)
    at Socket.emit (events.js:159:13)
    at addChunk (_stream_readable.js:265:12)
    at readableAddChunk (_stream_readable.js:252:11)
    at Socket.Readable.push (_stream_readable.js:209:10) message: 'Could not find the leader' }

4.1 解决方法: 修改num.partitions的值,partitions在是在创建topic的时候默认创建的partitions节点的个数,只对新创建的topic生效,所有尽量在项目规划时候定一个合理的值。也可以通过命令行动态扩容()

./bin/kafka-topics.sh --zookeeper  localhost:2181 --alter --partitions 2 --topic  foo

 

Apache Kafka

Kafka说明

Apache Kafka® 是一个分布式流媒体平台,这是什么意思呢?

分布式流媒体平台它有三大特性:

  • 发布与订阅流媒体数据,类似于消息队列或企业消息传递系统
  • 能够容错并持久性存储流媒体数据
  • 处理流媒体数据

Kafka通常适用于两大类应用场景:

  • 在系统或应用程序之间构建实时数据流管道,使其可靠地获取数据
  • 在应用程序中对数据流进行实时转换或响应

要了解Kafka如何做这些事情,让我们深入探讨Kafka的能力。

首先是几个概念:

  • Kafka作为一个集群运行在一个或多个可跨多个数据中心的服务器上。
  • Kafka集群以称为topics类别存储记录流。
  • 每条记录流都由一个键、值和时间戳组成。

Kafka有四个核心API:

  • 生产者(Producer API )允许应用程序发布一条数据记录到一个或更多的Kafka topics。
  • 消费者(Consumer API)允许应用程序订阅一个或多个主题,并处理为其生成的记录流。
  • 数据流(Streams API)允许应用程序充当流处理器( stream processor),使用来自一个或多个topics输入流,并将输出流生成到一个或多个输出topics,从而有效地将输入流转换为输出流。
  • 连接器(Connector API)允许构建并运行可重用的生产者或消费者,将Kafka topics连接到现有的应用程序或数据系统。例如,数据库的连接器可以捕获每个表的更改。

1556898600-9189-kafka-apis

在Kafka中,客户机和服务器之间的通信是通过一种简单、高性能、与语言无关的TCP协议来完成。 此协议已经版本化,并保持与旧版本的向后兼容性。我们为Kafka提供Java客户端,客户端可以使用多语言版本。

Topics and Logs

让我们首先深入探讨Kafka为记录流提供的核心抽象 – topic。

topic是发布记录的类别或源名称。Kafka中的Topics总是多个订阅用户;也就是说,一个topic可以有0个、1个或多个订户订阅者。

对于每个topic,Kafka群集都维护一个分区日志,如下所示:

 

1556898598-1342-log-anatomy每个分区都是一个有序的,不可变的记录序列,不断附加到一个结构化的日志中。分区中的记录每个都被分配一个 offset 的顺序ID号,它标识分区中的每个记录。

Kafka集群持久地保留所有已发布的记录,无论它们是否已被消耗,可以使用可配置方式设置的过期时间。例如,如果保留策略设置为两天,则在发布记录后的两天内,它可供使用,之后将被丢弃以释放空间。Kafka的性能在数据大小方面实际上是恒定的,因此长时间存储数据不是问题。

1556898607-4067-log-consumer事实上,在消费者的日志中使用偏移量(offset)或分区来保持唯一的元数据。这种偏移(offset)由消费者控制:通常消费者在读取记录时会线性地提高其偏移量(offset),但事实上,由于消费者控制偏移量(offset)的位置,它可以按照自己喜欢的任何顺序消费记录。例如,消费者可以重置为已处理过或较旧的偏移量(offset)以重新处理过去的数据,或者跳到最近的记录并从“现在”开始消费。

这些功能组合意味着Kafka消费者非常简单,他们可以来来往往对集群或其他消费者没有太大影响。例如,您可以使用我们的命令行工具“tail”任何主题的内容,而无需更改任何现有消费者所消费的内容。

日志中的分区有多种用途。首先,它们允许日志扩展到超出单个服务器的大小。每个单独的分区必须适合托管它的服务器,但topic可能有许多分区,因此它可以处理任意数量的数据。其次,在一点上他们更像是并行的单元。

分配

日志的分区分布在Kafka集群中的服务器上,每个服务器处理数据并请求分区的共享。每个分区都在可配置数量的服务器上进行复制,以实现容错。

每个分区都有一个服务器充当“领导者(leader)”,0个或多个服务器充当“追随者(followers)”。领导者处理分区的所有读取和写入请求,而关注者被动地复制领导者。如果领导者出现故障,其中一个追随者将自动成为新的领导者。每个服务器都充当其某些分区的领导者和其他服务器的追随者,因此负载在群集中很均衡。

地理复制

Kafka MirrorMaker为群集提供地理复制的支持。使用MirrorMaker,消息跨多个数据中心或云区域进行复制。你可以使用它在active/passive方案中进行备份和恢复; 或者在active/active方案中,按地理的方式,使数据更接近用户,或支持数据位置要求。

生产者

生产者将数据发布到他们选择的topics。生产者负责选择要分配给topic中哪个分区的记录。这可以通过循环方式完成,只是为了平衡负载,或者可以根据一些语义分区功能(例如:基于记录中的某些键)来完成。

消费者

消费者使用 consumer group 名称标记自己,每一个记录都会发布到一个topic中,并传递给每一个订阅的 consumer group 中其中一个消费者。消费者实例可以在同一个进程中,也可以在不同的机器

如果所有消费者实例具有相同的 consumer group,那么记录将在消费者实例上进行负载平衡(只有其中一个能收到消息)。

如果所有消费者实例具有不同的 consumer groups,那么每个记录将使用广播的方式,发送到所有 consumer group 进程。

 

1556898599-7054-consumer-groups两个服务器Kafka群集,托管四个分区(P0-P3),包含两个 consumer groups。 group A有两个消费者, group B有四个消费者。

然而,更常见的是,我们发现topics具有少量的 consumer groups,每个“logical subscriber”一个。每个组由许多用于可伸缩性和容错的消费者实例组成。这只不过是发布 – 订阅语义,其中订阅者是消费者群集而不是单个进程。

在Kafka中实现消费的方式是通过在消费者实例上划分日志中的分区,以便每个实例在任何时间点都是分配的“公平份额”的独占消费者。这个维护组成员身份的过程是由kafka协议动态处理的。如果有新的消费者实例加入该组,他们将从该组的其他成员接管一些分区; 如果实例出现故障,那么其分区将分配给其余消费者实例。

Kafka仅提供分区内记录的总订单,而不是主题中不同分区之间的记录。对于大多数应用程序而言,按分区排序与按键分区数据的能力相结合就足够了。但是,如果需要对记录进行总排序,可以使用只有一个分区的主题来实现,但这将意味着每个 consumer group 只有一个消费者进程。

多租户

您可以将Kafka部署为多租户解决方案。通过配置哪些主题可以生成或使用数据来启用多租户。配额也有运营支持。管理员可以定义和强制执行配额,以控制客户端使用的代理资源。有关更多信息,请参阅安全文档

担保

在高级别Kafka提供以下保证:

  • 生产者按顺序将消息发送到特定主题分区。也就是说,如果记录数据M1由与数据M2是由同一个生产者发送,并且首先发送M1,则M1将具有比M2更低的偏移并在日志中更早出现。
  • 消费者实例按照它们存储在日志中的顺序查看记录。
  • 对于具有复制因子N的主题,我们将容忍最多N-1个服务器故障,而不会丢失任何提交到日志的记录。

下一篇,将Kafka集成到Spring中。

有关Kafka提供的API和功能的更多信息,请参阅官方文档

Kafka与Zookeeper常用操作

一、Kafka操作

1.启动kafka命令:

#cd /opt/kafka_2.10-0.10.1.1/bin;

# ./kafka-server-start.sh /opt/kafka_2.10-0.10.1.1/config/server.properties &;

2.停止kafka命令:

# ./kafka-server-stop.sh

3.创建Topic:(创建一个名为test的topic,只有一个副本,一个分区。)

#./kafka-topics.sh –create –zookeeper 127.0.0.1:2181 –replication-factor 1 –partitions 1 –topic test

4.列出所有Topic:

#./kafka-topics.sh -list -zookeeper 127.0.0.1:2181

5.启动Producer并发送消息:

#./kafka-console-producer.sh –broker-list localhost:9092 –topic test

(输入相应的消息,eg:hello kafka;按Ctrl+C结束)

6.启动Consumer并接收消息:

#./kafka-console-consumer.sh –zookeeper 127.0.0.1:2181 –topic test –from-beginning

7.前台启动kafka:

./kafka-server-start.sh ../config/server.properties

8.后台启动kafka:

./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

9.指定监听端口

JMX_PORT=2898

./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

 

二、Zookeeper常用操作

1.Zookeeper服务端启动:

# cd /opt/zookeeper-3.4.10/bin/

#./zkServer.sh start

2.Zookeeper服务端停止:

# cd /opt/zookeeper-3.4.10/bin/

#./zkServer.sh stop

3.Zookeeper服务端重启:

# cd /opt/zookeeper-3.4.10/bin/

#./zkServer.sh restart

4.查看Zookeeper进程:

#ps -ef|grep zookeeper;

5.查看Zookeeper服务端状态:

# cd /opt/zookeeper-3.4.10/bin/

#./zkServer.sh status

6.Zookeeper客户端登陆:

# cd /opt/zookeeper-3.4.10/bin/

#./zkCli.sh -server 127.0.0.1:2181

 

Kafka、SpringMVC整合例子

一、安装zookeeper

1.下载安装包:http://zookeeper.apache.org/releases.html#download;

2.进入Zookeeper设置目录,笔者D:\kafka\zookeeper-3.4.11\conf;

3. 将“zoo_sample.cfg”重命名为“zoo.cfg” ;

3. 编辑zoo.cfg配置文件;

4. 找到并编辑

dataDir=/tmp/zookeeper 并更改成您当前的路径;

5. 系统环境变量:

a. 在系统变量中添加ZOOKEEPER_HOME = D:\kafka\zookeeper-3.4.11

b. 编辑path系统变量,添加为路径%ZOOKEEPER_HOME%\bin;

6. 在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181);

7.打开新的cmd,输入zkServer,运行Zookeeper;

出现如下图片表示成功:

002

 

二、安装并运行Kafka

1.下载Kafka:http://kafka.apache.org/downloads.html

2. 进入Kafka配置目录,D:\kafka\kafka_2.12-1.0.1\config;

3. 编辑文件“server.properties” ;

4. 找到并编辑log.dirs=/tmp/kafka-logs 改成您当前可用的目录;

5. 找到并编辑zookeeper.connect=localhost:2181;

6. Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181。

运行Kafka代码:.\bin\windows\kafka-server-start.bat .\config\server.properties 

003

 

 

注:请确保在启动Kafka服务器前,Zookeeper实例已经准备好并开始运行。

三、Kafka代码的实现

1.生产者配置文件:

@Bean
public Map<String,Object> getDefaultFactoryArg(){
    Map<String,Object> arg = new HashMap<>();
    arg.put("bootstrap.servers",ConstantKafka.KAFKA_SERVER);
    arg.put("group.id","100");
    arg.put("retries","1");
    arg.put("batch.size","16384");
    arg.put("linger.ms","1");
    arg.put("buffer.memory","33554432");
    arg.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    arg.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    arg.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    arg.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    return arg;
}

@Bean
public DefaultKafkaProducerFactory defaultKafkaProducerFactory(){
    DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(this.getDefaultFactoryArg());
    return factory;
}

@Bean
public KafkaTemplate kafkaTemplate(){
    KafkaTemplate template = new KafkaTemplate(defaultKafkaProducerFactory());
    template.setDefaultTopic(ConstantKafka.KAFKA_TOPIC1);
    template.setProducerListener(kafkaProducerListener());
    return template;
}

@Bean
public KafkaProducerListener kafkaProducerListener(){
    KafkaProducerListener listener = new KafkaProducerListener();
    return listener;
}

2.消费者配置文件:

@Bean
public Map<String,Object> getDefaultArgOfConsumer(){
    Map<String,Object> arg = new HashMap<>();
    arg.put("bootstrap.servers",ConstantKafka.KAFKA_SERVER);
    arg.put("group.id","100");
    arg.put("enable.auto.commit","false");
    arg.put("auto.commit.interval.ms","1000");
    arg.put("auto.commit.interval.ms","15000");
    arg.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    arg.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    arg.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    arg.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    return arg;
}

@Bean
public DefaultKafkaConsumerFactory defaultKafkaConsumerFactory(){
    DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory(getDefaultArgOfConsumer());
    return factory;
}

@Bean
public KafkaConsumerMessageListener kafkaConsumerMessageListener(){
    KafkaConsumerMessageListener listener = new KafkaConsumerMessageListener();
    return listener;
}

/**
 * 监听频道-log
 * @return
 */
@Bean
public ContainerProperties containerPropertiesOfLog(){
    ContainerProperties properties = new ContainerProperties(ConstantKafka.KAFKA_TOPIC1);
    properties.setMessageListener(kafkaConsumerMessageListener());
    return properties;
}

/**
 * 监听频道-other
 * @return
 */
@Bean
public ContainerProperties containerPropertiesOfOther(){
    ContainerProperties properties = new ContainerProperties(ConstantKafka.KAFKA_TOPIC2);
    properties.setMessageListener(kafkaConsumerMessageListener());
    return properties;
}

@Bean(initMethod = "doStart")
public KafkaMessageListenerContainer kafkaMessageListenerContainerOfLog(){
    KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(defaultKafkaConsumerFactory(),containerPropertiesOfLog());
    return container;
}

@Bean(initMethod = "doStart")
public KafkaMessageListenerContainer kafkaMessageListenerContainerOfOther(){
    KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(defaultKafkaConsumerFactory(),containerPropertiesOfOther());
    return container;
}

3.生产消息服务

@Service
public class KafkaProducerServer implements IKafkaProducerServer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public static final String ROLE_LOG = "log";
    public static final String ROLE_web = "web";
    public static final String ROLE_APP = "app";

    /**
     * 发送消息
     * @param topic 频道
     * @param msg 消息对象
     * @param isUsePartition 是否使用分区
     * @param partitionNum 分区数,如果isUsePartition为true,此数值必须>0
     * @param role 角色:app,web
     * @return
     * @throws IllegalServiceException
     */
    @Override
    public ResultResp<Void> send(String topic, Object msg, boolean isUsePartition, Integer partitionNum, String role) throws IllegalServiceException {
        if (role == null){
            role = ROLE_LOG;
        }

        String key = role + "_" + msg.hashCode();
        String valueString = JsonUtil.ObjectToJson(msg, true);

        if (isUsePartition) {
            //表示使用分区
            int partitionIndex = getPartitionIndex(key, partitionNum);
            ListenableFuture<SendResult<String, Object>> result = kafkaTemplate.send(topic, partitionIndex, key, valueString);
            return checkProRecord(result);
        } else {
            ListenableFuture<SendResult<String, Object>> result = kafkaTemplate.send(topic, key, valueString);
            return checkProRecord(result);
        }
    }

    /**
     * 根据key值获取分区索引
     *
     * @param key
     * @param num
     * @return
     */
    private int getPartitionIndex(String key, int num) {
        if (key == null) {
            Random random = new Random();
            return random.nextInt(num);
        } else {
            int result = Math.abs(key.hashCode()) % num;
            return result;
        }
    }

    /**
     * 检查发送返回结果record
     *
     * @param res
     * @return
     */

    private ResultResp<Void> checkProRecord(ListenableFuture<SendResult<String, Object>> res) {
        ResultResp<Void> resp = new ResultResp<>();
        resp.setCode(ConstantKafka.KAFKA_NO_RESULT_CODE);
        resp.setInfo(ConstantKafka.KAFKA_NO_RESULT_MES);

        if (res != null) {
            try {
                SendResult r = res.get();//检查result结果集
                /*检查recordMetadata的offset数据,不检查producerRecord*/
                Long offsetIndex = r.getRecordMetadata().offset();
                if (offsetIndex != null && offsetIndex >= 0) {
                    resp.setCode(ConstantKafka.SUCCESS_CODE);
                    resp.setInfo(ConstantKafka.SUCCESS_MSG);
                } else {
                    resp.setCode(ConstantKafka.KAFKA_NO_OFFSET_CODE);
                    resp.setInfo(ConstantKafka.KAFKA_NO_OFFSET_MES);
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
                resp.setCode(ConstantKafka.KAFKA_SEND_ERROR_CODE);
                resp.setInfo(ConstantKafka.KAFKA_SEND_ERROR_MES + ":" + e.getMessage());

            } catch (ExecutionException e) {
                e.printStackTrace();
                resp.setCode(ConstantKafka.KAFKA_SEND_ERROR_CODE);
                resp.setInfo(ConstantKafka.KAFKA_SEND_ERROR_MES + ":" + e.getMessage());
            }
        }

        return resp;
    }

}

4.生产者监听服务

public class KafkaProducerListener implements ProducerListener {

    protected final Logger logger = Logger.getLogger(KafkaProducerListener.class.getName());

    public KafkaProducerListener(){

    }

    @Override
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        logger.info("-----------------kafka发送数据成功");
        logger.info("----------topic:"+topic);
        logger.info("----------partition:"+partition);
        logger.info("----------key:"+key);
        logger.info("----------value:"+value);
        logger.info("----------RecordMetadata:"+recordMetadata);
        logger.info("-----------------kafka发送数据结束");
    }

    @Override
    public void onError(String topic, Integer partition, Object key, Object value, Exception e) {
        logger.info("-----------------kafka发送数据失败");
        logger.info("----------topic:"+topic);
        logger.info("----------partition:"+partition);
        logger.info("----------key:"+key);
        logger.info("----------value:"+value);
        logger.info("-----------------kafka发送数据失败结束");
        e.printStackTrace();
    }

    /**
     * 是否启动Producer监听器
     * @return
     */
    @Override
    public boolean isInterestedInSuccess() {
        return false;
    }
}

5.消费者监听服务

public class KafkaConsumerMessageListener implements MessageListener<String,Object> {

    private Logger logger = Logger.getLogger(KafkaConsumerMessageListener.class.getName());

    public KafkaConsumerMessageListener(){

    }

    /**
     * 消息接收-LOG日志处理
     * @param record
     */
    @Override
    public void onMessage(ConsumerRecord<String, Object> record) {
        logger.info("=============kafka消息订阅=============");

        String topic = record.topic();
        String key = record.key();
        Object value = record.value();
        long offset = record.offset();
        int partition = record.partition();

        if (ConstantKafka.KAFKA_TOPIC1.equals(topic)){
            doSaveLogs(value.toString());
        }

        logger.info("-------------topic:"+topic);
        logger.info("-------------value:"+value);
        logger.info("-------------key:"+key);
        logger.info("-------------offset:"+offset);
        logger.info("-------------partition:"+partition);
        logger.info("=============kafka消息订阅=============");
    }

    private void doSaveLogs(String data){
        SocketIOPojo<BikeLogPojo> logs = JsonUtil.JsonToObject(data.toString(),SocketIOPojo.class);
        /**
         * 写入到数据库中
         */
    }
}

 

测试图片:

004

 

 

 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |