Rsyslog+kafka+ELK(单节点)部署

开源产品:Rsyslog、Kafka、ELK
处理流程为:Vm Rsyslog–> Rsyslog Server –omkafka–> Kafka –> Logstash –> Elasticsearch –> Kibana
ps:omkafka模块在rsyslog v8.7.0之后的版本才支持

环境:

ELK SERVER 10.10.27.123
Rsyslog Server 10.10.27.121
Rsyslog client 10.10.27.122

rsyslog日志收集

Rsyslog是高速的日志收集处理服务,它具有高性能、安全可靠和模块化设计的特点,能够接收来自各种来源的日志输入(例如:file,tcp,udp,uxsock等),并通过处理后将结果输出的不同的目的地(例如:mysql,mongodb,elasticsearch,kafka等),每秒处理日志量能够超过百万条。
Rsyslog作为syslog的增强升级版本已经在各linux发行版默认安装了,无需额外安装

1、rsyslog服务端

~]# cat /etc/rsyslog.conf 
# Provides UDP syslog reception
$ModLoad imudp
$UDPServerRun 514

# Provides TCP syslog reception
$ModLoad imtcp
$InputTCPServerRun 514

~]# cat /etc/rsyslog.d/default.conf
#### GLOBAL DIRECTIVES ####
# Use default timestamp format  # 使用自定义的日志格式
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat
$template myFormat,"%timestamp% %fromhost-ip% %syslogtag% %msg%\n"
$ActionFileDefaultTemplate myFormat

# 根据客户端的IP单独存放主机日志在不同目录,rsyslog需要手动创建
$template RemoteLogs,"/data/rsyslog/%fromhost-ip%/%fromhost-ip%_%$YEAR%-%$MONTH%-%$DAY%.log"
# 排除本地主机IP日志记录,只记录远程主机日志
:fromhost-ip, !isequal, "127.0.0.1" ?RemoteLogs
~]# systemctl restart rsyslog

为了把rsyslog server收集的日志数据导入到ELK中,需要在rsyslog server使用到omkafka的模块

~]# yum -y install rsyslog-kafka
~]# cat /etc//rsyslog.d/kafka.conf
# 加载omkafka和imfile模块
module(load="omkafka")
module(load="imfile")
 
# nginx template
template(name="SystemlogTemplate" type="string" string="%hostname%<-+>%syslogtag%<-+>%msg%\n")
 
# ruleset
ruleset(name="systemlog-kafka") {
    #日志转发kafka
    action (
        type="omkafka"
	template="SystemlogTemplate"
        topic="system-log"
        broker="10.10.27.123:9092"
    )
}

input(type="imfile" Tag="Systemlog" File="/data/rsyslog/*/*.log" Ruleset="systemlog-kafka"

~]# systemctl restart rsyslog

2、 Rsyslog客户端

~]# cat /etc/rsyslog.conf #追加一行
*.*	@10.10.27.121:514
#所有日志通过UDP传输给rsyslog server
~]# systemctl restart rsyslog

至此,rsyslog准备完毕,验证/data/rsyslog下是否产生日志文件

kafka搭建

1、搭建kafka依赖的zookeeper

~]# docker login
~]# docker pull wurstmeister/zookeeper
~]# mkdir -p /data/zookeeper
~]# docker run -d \
--name zookeeper \
--net=host \
-p 2181:2181 \
--restart always \
-v /data/zookeeper:/data/zookeeper \
-e ZOO_PORT=2181 \
-e ZOO_DATA_DIR=/data/zookeeper/data \
-e ZOO_DATA_LOG_DIR=/data/zookeeper/logs \
-e ZOO_MY_ID=1 \
-e ZOO_SERVERS="server.1=10.10.27.123:2888:3888" \
wurstmeister/zookeeper:latest

参数说明:
--net=host: 容器网络设置为 host, 能够和宿主机共享网络
-p 2181:2181: 容器的 2181 端口映射到宿主机的 2181 端口
-v /data/zookeeper:/data/zookeeper:容器的/data/zookeeper 目录挂载到宿主机的 /data/zookeeper 目录
ZOO_PORT:zookeeper 的运行端口
ZOO_DATA_DIR:数据存放目录
ZOO_DATA_LOG_DIR: 日志存放目录
ZOO_MY_ID:zk 的节点唯一标识
ZOO_SERVERS:zk 集群服务配置

如果需要部署zookeeper集群:其他 2 个节点同理部署, 只需要修改 ZOO_MY_ID; 节点 2:ZOO_MY_ID=2,节点 3:ZOO_MY_ID=3

验证

~]# docker exec -it zookeeper /bin/bash
bash # cd /opt/zookeeper-3.4.13/bin
bash # zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: standalone

2、搭建kafka

~]# mkdir -p /data/kafka
~]# docker pull wurstmeister/kafka
~]# docker run -d \
--name kafka \
--net=host \
--restart always \
-v /data:/data \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_PORT=9092 \
-e KAFKA_HEAP_OPTS="-Xms1g -Xmx1g" \
-e KAFKA_HOST_NAME=10.10.27.123 \
-e KAFKA_ADVERTISED_HOST_NAME=10.10.27.123 \
-e KAFKA_LOG_DIRS=/data/kafka \
-e KAFKA_ZOOKEEPER_CONNECT="10.10.27.123:2181" \
wurstmeister/kafka:latest

参数说明:
--net=host: 容器网络设置为 host, 能够和宿主机共享网络
-v /data:/data:容器的/data 目录挂载到宿主机的 /data 目录
KAFKA_BROKER_ID:kafka 的 broker 集群标识, 每台节点 broker 不一样
KAFKA_PORT:kafka 运行端口
KAFKA_HEAP_OPTS:kafka 启动时的 jvm 大小
KAFKA_HOST_NAME:kafka 主机名称,这里随便写,但是要与主机 IP 做 dns 映射
KAFKA_LOG_DIRS:kafka 日志存储目录
KAFKA_ZOOKEEPER_CONNECT:kafka 运行在 zk 里面,zk 提供的连接地址,集群的话写多个地址,逗号隔开

如果需要部署zookeeper集群:其他 2 个节点同理部署, 只需要修改 KAFKA_BROKER_ID、KAFKA_HOST_NAME、KAFKA_ADVERTISED_HOST_NAME 对应的值即可

验证

进入 kafka 容器
~]# docker exec -it kafka /bin/bash
bash-4.4# bin/kafka-topics.sh --list --zookeeper 10.10.27.123:2181
__consumer_offsets
log-api
system-log  #rsyslog自动创建的topic

ELK搭建

1、搭建elasticsearch

~]# docker pull elasticsearch:7.7.0
~]# mkdir /data/es/conf -p
~]# cd /data/es/conf
~]# vim elasticsearch.yml
cluster.name: es-cluster
network.host: 10.10.27.123
http.cors.enabled: true
http.cors.allow-origin: "*"
network.publish_host: 10.10.27.123
discovery.zen.minimum_master_nodes: 1
discovery.zen.ping.unicast.hosts: ["10.10.27.123"]
discovery.type: single-node

多节点集群使用以下配置:
~]# vim elasticsearch.yml
#集群名称,多个节点用一个名称
cluster.name: es-cluster
## es 1.0 版本的默认配置是 “0.0.0.0”,所以不绑定 ip 也可访问
network.host: 0.0.0.0
node.name: master
#跨域设置
http.cors.enabled: true
http.cors.allow-origin: "*"
node.master: true
node.data: true
network.publish_host: 192.168.1.140
discovery.zen.ping.unicast.hosts: ["192.168.1.140,192.168.1.142,192.168.1.147"]
discovery.zen.minimum_master_nodes: 1

~]# docker run -d \
--name=elasticsearch -p 9200:9200 -p 9300:9300 -p 5601:5601 \
--net=host \
--restart always \
-e ES_JAVA_OPTS="-Xms1g -Xmx1g" \   #启动内存设置
-v /data/es/conf/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /data/es/data:/usr/share/elasticsearch/data \
elasticsearch:7.7.0

启动过程会有报错,提前做以下操作
~]# mkdir /data/es/data
~]# chmod u+x /data/es/data
~]# vim /etc/sysctl.conf
vm.max_map_count=655350
~]# sysctl -p

如果需要部署zookeeper集群:其他 2 个节点同样的部署思路,只需要修改 elasticsearch 的配置文件中的 node.name 和 network.publish_host

2、部署kibana

~]# docker pull daocloud.io/library/kibana:7.7.0
~]# mkdir -p /data/kibana/conf
~]# vi /data/kibana/conf/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
# ES
elasticsearch.hosts: ["http://10.10.27.123:9200"]  
i18n.locale: "zh-CN"
xpack.security.enabled: false
ps:多集群可忽略此配置文件,使用默认即可

~]# docker run -d \
--restart always \
--name kibana \
--network=container:elasticsearch \   #容器网络要和连接的 es 容器共享网络
-v /data/kibana/config:/usr/share/kibana/config \
daocloud.io/library/kibana:7.7.0

3、部署logstash

~]# docker pull daocloud.io/library/logstash:7.7.0
~]# mkdir /data/logstash/conf -p
~]# vim logstash.conf
input{
   kafka{
        topics => ["system-log"]   #必须可前文的topic统一
        bootstrap_servers => ["10.10.27.123:9092"]
    }
}
output{
    elasticsearch {
        hosts => ["10.10.27.123:9200"]
        index => "system-log-%{+YYYY.MM.dd}"
        }
   stdout {
    codec => rubydebug
  }
}

~]# docker run -d \
--restart always \
--name logstash \
--net=host \
--link elasticsearch \
-v /data/logstash/conf/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
daocloud.io/library/logstash:7.7.0

kibana添加索引

 

微信截图_20240118175914 微信截图_20240118175924

镜像下载

由于中国下载docker镜像很慢,配置镜像仓库

--registry-mirror=https://registry.docker-cn.com

建议在本地电脑,比如win10中安装windows版docker,配置好上面镜像仓库地址(setting-Docker Engine),打开powershell进行下载打包镜像:

PS C:\Users\suixin> docker pull kibana:7.7.0
PS C:\Users\suixin> docker save -o kibana.gz kibana:7.7.0

然后上传到服务器导入:

# docker load -i kibana.gz

Kafka删除topic的方法

在运营环境中,删除kafka topic的方法,没有必要当然不要这么干,迫不得已才会去删除topic。

首先配置文件server.properties 增加参数 delete.topic.enable=true

其次呢,使用删除命令行:

./bin/kafka-topics.sh –delete –topic THIS_IS_MY_TOPIC –zookeeper localhost:2181

*红色字体需要替换。

或者使用kafka-manager集群管理工具删除。

如果删除之前,没有配置 delete.topic.enable=true 参数,那么topic只会标记为marked for deletion ,也就是说,只是标记并没有删除;

此时只要加上配置项,然后重启kafka就可以真正删除该topic。

如果重启还是没有被删除,那么就使用zookeeper的命令删除试试;

输入工具行命令: ./zookeeper-3.4.10/bin/zkCli.sh

此时进入了一个shell中端。

执行:ls /brokers/topics 命令可以查看当前brokers下所有topics。

然后执行: rmr /brokers/topics/[topic name] 删除指定的topic,但是可能会提示:

The command ‘rmr’ has been deprecated. Please use ‘deleteall’ instead.

只要把rmr换成deleteall就可以了,如:

deleteall /brokers/topics/[topic name]

再次使用ls /brokers/topics 命令查看topics,如果发现你要删除的topic还在,

执行:ls /admin/delete_topics  这里会显示出你刚刚要删除的topic,然后执行:deleteall /admin/delete_topics/[topic name]  就可以完全删除了。

总结一下:

1.执行:

./bin/kafka-topics.sh –delete –topic THIS_IS_MY_TOPIC –zookeeper localhost:2181

2.如果没有删除:

配置文件server.properties 增加参数 delete.topic.enable=true

3.重启kafka

4.还是没有删除:

执行:./zookeeper-3.4.10/bin/zkCli.sh

执行:ls /brokers/topics 命令可以查看当前brokers下所有topics

执行:deleteall /brokers/topics/[topic name]

 

5.再次使用ls /brokers/topics 命令查看topics,如果发现你要删除的topic还在

执行:ls /admin/delete_topics

执行:deleteall /admin/delete_topics/[topic name] 

结束完成。比较麻烦,但是有效解决在kafka使用过程中如果有需要删除topic而又删除失败。

kafka集群服务broker扩容

原先的3节点的kafka假设为node1、node2、node3

准备2台空闲点的服务器(这里假设为node4和node5)

系统版本:CentOS7

node1  192.168.2.187

node2  192.168.2.188

node3  192.168.2.189

node4  192.168.2.190

node5  192.168.2.191

kafka的扩容操作分为2步:

1、zk 节点扩容

2、kafka 节点扩容

首先在node4 node5上把相关的软件部署好:

cd /root/
tar xf zookeeper-3.4.9.tar.gz
tar xf kafka_2.11-0.10.1.0.tar.gz 
tar xf jdk1.8.0_101.tar.gz 

mv kafka_2.11-0.10.1.0  zookeeper-3.4.9   jdk1.8.0_101   /usr/local/

cd /usr/local/ 
ln -s zookeeper-3.4.9   zookeeper-default
ln -s kafka_2.11-0.10.1.0  kafka-default
ln -s jdk1.8.0_101    jdk-default

第一部分:zk节点的扩容:

1、在node4上执行:

mkdir /usr/local/zookeeper-default/data/ 

vim  /usr/local/zookeeper-default/conf/zoo.cfg  在原有的基础上,增加最后的2行配置代码:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-default/data/
clientPort=2181
maxClientCnxns=2000
maxSessionTimeout=240000
server.1=192.168.2.187:2888:3888
server.2=192.168.2.188:2888:3888
server.3=192.168.2.189:2888:3888
server.4=192.168.2.190:2888:3888
server.5=192.168.2.191:2888:3888

## 清空目录防止有脏数据
rm -fr /usr/local/zookeeper-default/data/*

## 添加对应的myid文件到zk数据目录下
echo 4 > /usr/local/zookeeper-default/data/myid

2、启动node4的zk进程:

/usr/local/zookeeper-default/bin/zkServer.sh start

/usr/local/zookeeper-default/bin/zkServer.sh  status   类似如下效果:
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-default/bin/../conf/zoo.cfg
Mode: follower

/usr/local/zookeeper-default/bin/zkCli.sh

echo stat | nc 127.0.0.1 2181  结果类似如下:
Zookeeper version: 3.4.9-1757313, built on 08/23/2016 06:50 GMT
Clients:
 /127.0.0.1:50072[1](queued=0,recved=6,sent=6)
 /127.0.0.1:50076[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/2/13
Received: 24
Sent: 23
Connections: 2
Outstanding: 0
Zxid: 0x10000009a
Mode: follower
Node count: 63

3、在node5上执行:

vim  /usr/local/zookeeper-default/conf/zoo.cfg  增加最后的2行代码:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-default/data/
clientPort=2181
maxClientCnxns=2000
maxSessionTimeout=240000
server.1=192.168.2.187:2888:3888
server.2=192.168.2.188:2888:3888
server.3=192.168.2.189:2888:3888
server.4=192.168.2.190:2888:3888
server.5=192.168.2.191:2888:3888

## 清空目录防止有脏数据
rm -fr /usr/local/zookeeper-default/data/*

## 添加对应的myid文件到zk数据目录下
echo 5 > /usr/local/zookeeper-default/data/myid

4、启动node5的zk进程:

/usr/local/zookeeper-default/bin/zkServer.sh start

/usr/local/zookeeper-default/bin/zkServer.sh  status
 
echo stat | nc  127.0.0.1 2181  结果类似如下:
Zookeeper version: 3.4.9-1757313, built on 08/23/2016 06:50 GMT
Clients:
 /127.0.0.1:45582[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0/0
Received: 3
Sent: 2
Connections: 1
Outstanding: 0
Zxid: 0x10000009a
Mode: follower
Node count: 63
也可以使用 echo mntr   | nc  127.0.0.1 2181  这个结果更详细,类似如下:
zk_version3.4.9-1757313, built on 08/23/2016 06:50 GMT
zk_avg_latency0
zk_max_latency194
zk_min_latency0
zk_packets_received101436
zk_packets_sent102624
zk_num_alive_connections4
zk_outstanding_requests0
zk_server_statefollower
zk_znode_count141
zk_watch_count190
zk_ephemerals_count7
zk_approximate_data_size10382
zk_open_file_descriptor_count35
zk_max_file_descriptor_count102400

5、当我们确认 新加的2个zk节点没问题后,我们需要去修改之前的老的3台zk的配置,然后重启这3个zk

修改 node1 node2 node3的 zk配置,如下:

vim  /usr/local/zookeeper-default/conf/zoo.cfg  增加最后的2行代码:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-default/data/
clientPort=2181
maxClientCnxns=2000
maxSessionTimeout=240000
server.1=192.168.2.187:2888:3888
server.2=192.168.2.188:2888:3888
server.3=192.168.2.189:2888:3888
server.4=192.168.2.190:2888:3888
server.5=192.168.2.191:2888:3888

注意重启的时候,我们先重启 follower节点(例如我这里follower是 node2、node3,leader是 node1)

/usr/local/zookeeper-default/bin/zkServer.sh stop
/usr/local/zookeeper-default/bin/zkServer.sh status

/usr/local/zookeeper-default/bin/zkServer.sh start
/usr/local/zookeeper-default/bin/zkServer.sh status

第二部分:kafka节点的扩容:

1、node4 (192.168.2.190)上修改:

mkdir -pv /usr/local/kafka-default/kafka-logs

vim /usr/local/kafka-default/config/server.properties  修改后的文件如下:
broker.id=4   # 注意修改这里
listeners=PLAINTEXT://:9094,TRACE://:9194
advertised.listeners=PLAINTEXT://192.168.2.190:9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka-default/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.2.187:2181,192.168.2.188:2181,192.168.2.189:2181,192.168.2.190:2181,192.168.2.191:2181  # 注意修改这里
zookeeper.connection.timeout.ms=6000
default.replication.factor=2
compression.type=gzip
offsets.retention.minutes=2880
controlled.shutdown.enable=true
delete.topic.enable=true

2、启动node4的kafka程序:

/usr/local/kafka-default/bin/kafka-server-start.sh -daemon /usr/local/kafka-default/config/server.properties

3、node5(192.168.2.191)上修改

mkdir -pv /usr/local/kafka-default/kafka-logs

vim /usr/local/kafka-default/config/server.properties  修改后的文件如下:
broker.id=5   # 注意修改这里
listeners=PLAINTEXT://:9094,TRACE://:9194
advertised.listeners=PLAINTEXT://192.168.2.191:9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka-default/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.2.187:2181,192.168.2.188:2181,192.168.2.189:2181,192.168.2.190:2181,192.168.2.191:2181   # 注意修改这里
zookeeper.connection.timeout.ms=6000
default.replication.factor=2
compression.type=gzip
offsets.retention.minutes=2880
controlled.shutdown.enable=true
delete.topic.enable=true

4、启动node5的kafka程序:

/usr/local/kafka-default/bin/kafka-server-start.sh -daemon /usr/local/kafka-default/config/server.properties

5、测试是否有问题

这里我们可以自己先用 kafka-console-producer.sh 和 kafka-console-consumer.sh  自测下是否 正常工作,然后看看 kafka-manager上是否有需要重新均衡的副本。。

第三部分:对存在风险broker节点的数据迁移(我这里需要这么操作,单纯的扩容不需要这个步骤):

这里我们可以使用kafka-manager这个web平台来做 topic的迁移操作,很简单,这里就不截图了。

第四部分: 对node2 node3下线操作

1、关闭node2 node3节点上面的zk进程,让zk leader节点自动选举

2、关闭node2 node3上面的kafka进程,让kafka controller节点自动选举

## 可能遇到的问题: 

在迁移过程中,遇到consumergroup在我们迁移topic的时候发生异常,让业务方重启了consumer后 报错消失。。

 

转自:https://blog.51cto.com/lee90/2423980

kafka-configs.sh命令行使用方法

kafka的一个动态配置工具,可以通过命令行对系统进行动态配置,但并不是所有的配置项都支持的。

$ ./kafka-configs.bat
        This tool helps to manipulate and describe entity config for a topic, client, user or broker
        Option                                 Description
        ------                                 -----------
        --add-config <String>                  Key Value pairs of configs to add.
        Square brackets can be used to group
        values which contain commas: 'k1=v1,
        k2=[v1,v2,v2],k3=v3'. The following
        is a list of valid configurations:
        For entity-type 'topics':
        cleanup.policy
        compression.type
        delete.retention.ms
        file.delete.delay.ms
        flush.messages
        flush.ms
        follower.replication.throttled.
        replicas
        index.interval.bytes
        leader.replication.throttled.replicas
        max.compaction.lag.ms
        max.message.bytes
        message.downconversion.enable
        message.format.version
        message.timestamp.difference.max.ms
        message.timestamp.type
        min.cleanable.dirty.ratio
        min.compaction.lag.ms
        min.insync.replicas
        preallocate
        retention.bytes
        retention.ms
        segment.bytes
        segment.index.bytes
        segment.jitter.ms
        segment.ms
        unclean.leader.election.enable
        For entity-type 'brokers':
        log.message.timestamp.type
        ssl.client.auth
        log.retention.ms
        sasl.login.refresh.window.jitter
        sasl.kerberos.ticket.renew.window.
        factor
        log.preallocate
        log.index.size.max.bytes
        sasl.login.refresh.window.factor
        ssl.truststore.type
        ssl.keymanager.algorithm
        log.cleaner.io.buffer.load.factor
        sasl.login.refresh.min.period.seconds
        ssl.key.password
        background.threads
        log.retention.bytes
        ssl.trustmanager.algorithm
        log.segment.bytes
        max.connections.per.ip.overrides
        log.cleaner.delete.retention.ms
        log.segment.delete.delay.ms
        min.insync.replicas
        ssl.keystore.location
        ssl.cipher.suites
        log.roll.jitter.ms
        log.cleaner.backoff.ms
        sasl.jaas.config
        principal.builder.class
log.flush.interval.ms
        log.cleaner.max.compaction.lag.ms
        max.connections
        log.cleaner.dedupe.buffer.size
        log.flush.interval.messages
        advertised.listeners
        num.io.threads
        listener.security.protocol.map
        log.message.downconversion.enable
        sasl.enabled.mechanisms
        sasl.login.refresh.buffer.seconds
        ssl.truststore.password
        listeners
        metric.reporters
        ssl.protocol
        sasl.kerberos.ticket.renew.jitter
        ssl.keystore.password
        sasl.mechanism.inter.broker.protocol
        log.cleanup.policy
        sasl.kerberos.principal.to.local.rules
        sasl.kerberos.min.time.before.relogin
        num.recovery.threads.per.data.dir
        log.cleaner.io.max.bytes.per.second
        log.roll.ms
        ssl.endpoint.identification.algorithm
        unclean.leader.election.enable
        message.max.bytes
        log.cleaner.threads
        log.cleaner.io.buffer.size
        max.connections.per.ip
        sasl.kerberos.service.name
        ssl.provider
        follower.replication.throttled.rate
        log.index.interval.bytes
        log.cleaner.min.compaction.lag.ms
        log.message.timestamp.difference.max.
        ms
        ssl.enabled.protocols
        log.cleaner.min.cleanable.ratio
        replica.alter.log.dirs.io.max.bytes.
        per.second
        ssl.keystore.type
        ssl.secure.random.implementation
        ssl.truststore.location
        sasl.kerberos.kinit.cmd
        leader.replication.throttled.rate
        num.network.threads
        compression.type
        num.replica.fetchers
        For entity-type 'users':
        request_percentage
        producer_byte_rate
        SCRAM-SHA-256
        SCRAM-SHA-512
        consumer_byte_rate
        For entity-type 'clients':
        request_percentage
        producer_byte_rate
        consumer_byte_rate
        Entity types 'users' and 'clients' may
        be specified together to update
        config for clients of a specific
        user.
        --alter                                Alter the configuration for the entity.
        --bootstrap-server <String: server to  The Kafka server to connect to. This
        connect to>                            is required for describing and
        altering broker configs.
        --command-config <String: command      Property file containing configs to be
        config property file>                  passed to Admin Client. This is used
        only with --bootstrap-server option
        for describing and altering broker
        configs.
        --delete-config <String>               config keys to remove 'k1,k2'
        --describe                             List configs for the given entity.
        --entity-default                       Default entity name for
        clients/users/brokers (applies to
        corresponding entity type in command
        line)
        --entity-name <String>                 Name of entity (topic name/client
        id/user principal name/broker id)
        --entity-type <String>                 Type of entity
        (topics/clients/users/brokers)
        --force                                Suppress console prompts
        --help                                 Print usage information.
        --version                              Display Kafka version.
        --zookeeper <String: urls>             REQUIRED: The connection string for
        the zookeeper connection in the form
        host:port. Multiple URLS can be
        given to allow fail-over.

网上找了一张图,翻译解释说明:

001

使用方法(例子):

  • 增加/编辑配置

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type topics –entity-name topicName –add-config ‘k1=v1, k2=v2, k3=v3′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type brokers –entity-name 1 –add-config ‘retention.bytes=1024072′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type brokers –entity-default –add-config ‘retention.bytes=1024072′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type clients –entity-default –add-config ‘retention.bytes=1024072′

  • 删除配置

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type topics –entity-name topicName –delete-config ‘k1,k2,k3’
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type clients –entity-name clientId –delete-config ‘k1,k2,k3’
./kafka-configs.sh –bootstrap-server localhost:9092 –alter –entity-type brokers –entity-name $brokerId –delete-config ‘k1,k2,k3’
./kafka-configs.sh –bootstrap-server localhost:9092 –alter –entity-type brokers –entity-default –delete-config ‘k1,k2,k3’

  • 列出配置项的描述信息

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –entity-type topics –entity-name topicName –describe
./kafka-configs.sh–bootstrap-server localhost:9092 –entity-type brokers –entity-name $brokerId –describe
./kafka-configs.sh –bootstrap-server localhost:9092 –entity-type brokers –entity-default –describe
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –entity-type users –entity-name user1 –entity-type clients –entity-name clientA –describe

 

 

Kafka 日志存储的问题

在进行详解之前,我想先声明一下,本次我们进行讲解说明的是 Kafka 消息存储的信息文件内容,不是所谓的 Kafka 服务器运行产生的日志文件,这一点希望大家清楚。

Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。每个主题又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。也就是该文要着重关注的内容。我们根据如下的图进行进一步说明:

001

图中,创建了一个 demo-topic 主题,其存在 7 个 Parition,对应的每个 Parition 下存在一个 [Topic-Parition] 命名的消息日志文件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区日志文件中,你会发现很多类型的文件,比如:.index、.timestamp、.log、.snapshot 等,其中,文件名一致的文件集合就称为 LogSement。我们先留有这样的一个整体的日志结构概念,接下来我们一一的进行详细的说明其中的设计。

LogSegment

我们已经知道分区日志文件中包含很多的 LogSegment ,Kafka 日志追加是顺序写入的,LogSegment 可以减小日志文件的大小,进行日志删除的时候和数据查找的时候可以快速定位。同时,ActiveLogSegment 也就是活跃的日志分段拥有文件拥有写入权限,其余的 LogSegment 只有只读的权限。

日志文件存在多种后缀文件,重点需要关注 .index、.timestamp、.log 三种类型。其他的日志类型功能作用,请查询下面图表:

类别 作用
.index 偏移量索引文件
.timestamp 时间戳索引文件
.log 日志文件
.snaphot 快照文件
.deleted
.cleaned 日志清理时临时文件
.swap Log Compaction 之后的临时文件
Leader-epoch-checkpoint

每个 LogSegment 都有一个基准偏移量,用来表示当前 LogSegment 中第一条消息的 offset。偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该作为文件名命名规则(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。特别说明一下,如果日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是 121,偏移量是从 0 开始的。

如果想要查看相应文件内容可以通过 kafka-run-class.sh 脚本查看 .log :

/data/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log

002

2.0 中可以使用 kafka-dump-log.sh 查 看.index 文件

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index

日志与索引文件

配置项 默认值 说明
log.index.interval.bytes 4096 (4K) 增加索引项字节间隔密度,会影响索引文件中的区间密度和查询效率
log.segment.bytes 1073741824 (1G) 日志文件最大值
log.roll.ms 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,毫秒维度
log.roll.hours 168 (7天) 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,小时维度
log.index.size.max.bytes 10485760 (10MB) 触发偏移量索引文件或时间戳索引文件分段字节限额

偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。

Kafka 中的索引文件是以稀疏索引的方式构造消息的索引,他并不保证每一个消息在索引文件中都有对应的索引项。每当写入一定量的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,通过修改 log.index.interval.bytes 的值,改变索引项的密度。

切分文件

从上文中可知,日志文件和索引文件都会存在多个文件,组成多个 SegmentLog,那么其切分的规则是怎样的呢?

当满足如下几个条件中的其中之一,就会触发文件的切分:

  1. 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。log.segment.bytes 参数的默认值为 1073741824,即 1GB。
  2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms 或 log.roll.hours 参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。
  3. 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 配置的值。log.index.size.max.bytes 的默认值为 10485760,即 10MB。
  4. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量。
为什么是 Integer.MAX_VALUE ?

在偏移量索引文件中,每个索引项共占用 8 个字节,并分为两部分。相对偏移量和物理地址。

相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节

物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节

4 个字节刚好对应 Integer.MAX_VALUE ,如果大于 Integer.MAX_VALUE ,则不能用 4 个字节进行表示了。

索引文件切分过程

索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,即文件创建的时候就是最大值,当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性。

查找消息

offset 查询

偏移量索引由相对偏移量和物理地址组成。

003

可以通过如下命令解析.index 文件

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index
offset:0 position:0
offset:20 position:320
offset:43 position:1220

注意:offset 与 position 没有直接关系哦,由于存在数据删除和日志清理。

004

e.g. 如何查看 偏移量为 23 的消息?

Kafka 中存在一个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在 00000000000000000000.index ,通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即 offset 20 那栏,然后从日志分段文件中的物理位置为320 开始顺序查找偏移量为 23 的消息。

时间戳方式查询

在上文已经有所提及,通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。

时间戳索引索引格式

005

006+

 

e.g. 查找时间戳为 1557554753430 开始的消息?

  • 将 1557554753430 和每个日志分段中最大时间戳 largestTimeStamp 逐一对比,直到找到不小于 1557554753430 所对应的日志分段。日志分段中的 largestTimeStamp 的计算是先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于 0 ,则取该值,否则去该日志分段的最近修改时间。
  • 找到相应日志分段之后,使用二分法进行定位,与偏移量索引方式类似,找到不大于 1557554753430 最大索引项,也就是 [1557554753420 430]。
  • 拿着偏移量为 430 到偏移量索引文件中使用二分法找到不大于 430 最大索引项,即 [20,320] 。
  • 日志文件中从 320 的物理位置开始查找不小于 1557554753430 数据。

注意:timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的哦。因为数据的写入是各自追加。

在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若干的时间戳信息。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增长。反之如果是 CreateTime 则无法保证顺序。

日志清理

日志清理,不是日志删除哦,这还是有所区别的,日志删除会在下文进行说明。

Kafka 提供两种日志清理策略:

日志删除:按照一定的删除策略,将不满足条件的数据进行数据删除

日志压缩:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本。

Kafka 提供 log.cleanup.policy 参数进行相应配置,默认值:delete,还可以选择 compact。

是否支持针对具体的 Topic 进行配置?

答案是肯定的,主题级别的配置项是 cleanup.policy 。

日志删除

配置 默认值 说明
log.retention.check.interval.ms 300000 (5分钟) 检测频率
log.retention.hours 168 (7天) 日志保留时间小时
log.retention.minutes 日志保留时间分钟
log.retention.ms 日志保留时间毫秒
file.delete.delay.ms 60000 (1分钟) 延迟执行删除时间
log.retention.bytes -1 无穷大 运行保留日志文件最大值
log.retention.bytes 1073741824 (1G) 日志文件最大值

Kafka 会周期性根据相应规则进行日志数据删除,保留策略有 3 种:基于时间的保留策略、基于日志大小的保留策略和基于日志其实偏移量的保留策略。

基于时间

日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是 7 天,log.retention.ms 优先级最高。

如何查找日志分段文件中已经过去的数据呢?

Kafka 依据日志分段中最大的时间戳进行定位,首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取该值,否则取最近修改时间。

为什么不直接选最近修改时间呢?

因为日志文件可以有意无意的被修改,并不能真实的反应日志分段的最大时间信息。

删除过程
  1. 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。
  2. 这些日志分段所有文件添加 上 .delete 后缀。
  3. 交由一个以 "delete-file" 命名的延迟任务来删除这些 .delete 为后缀的文件。延迟执行时间可以通过 file.delete.delay.ms 进行设置

如果活跃的日志分段中也存在需要删除的数据时?

Kafka 会先切分出一个新的日志分段作为活跃日志分段,然后执行删除操作。

基于日志大小

日志删除任务会检查当前日志的大小是否超过设定值。设定项为 log.retention.bytes ,单个日志分段的大小由 log.regment.bytes 进行设定。

删除过程

  1. 计算需要被删除的日志总大小 (当前日志文件大小-retention值)。
  2. 从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合。
  3. 执行删除。

基于日志起始偏移量

基于日志起始偏移量的保留策略的判断依据是某日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,则可以删除此日志分段。

注意:日志文件的起始偏移量并不一定等于第一个日志分段的基准偏移量,存在数据删除,可能与之相等的那条数据已经被删除了。

007

删除过程
  • 从头开始遍历每一个日志分段,日志分段 1 的下一个日志分段的起始偏移量为 11,小于 logStartOffset,将 日志分段 1 加入到删除队列中
  • 日志分段 2 的下一个日志分段的起始偏移量为 23,小于 logStartOffset,将 日志分段 2 加入到删除队列中
  • 日志分段 3 的下一个日志分段的起始偏移量为 30,大于 logStartOffset,则不进行删除。
12
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |