Kafka 日志存储的问题

在进行详解之前,我想先声明一下,本次我们进行讲解说明的是 Kafka 消息存储的信息文件内容,不是所谓的 Kafka 服务器运行产生的日志文件,这一点希望大家清楚。

Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。每个主题又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。也就是该文要着重关注的内容。我们根据如下的图进行进一步说明:

001

图中,创建了一个 demo-topic 主题,其存在 7 个 Parition,对应的每个 Parition 下存在一个 [Topic-Parition] 命名的消息日志文件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区日志文件中,你会发现很多类型的文件,比如:.index、.timestamp、.log、.snapshot 等,其中,文件名一致的文件集合就称为 LogSement。我们先留有这样的一个整体的日志结构概念,接下来我们一一的进行详细的说明其中的设计。

LogSegment

我们已经知道分区日志文件中包含很多的 LogSegment ,Kafka 日志追加是顺序写入的,LogSegment 可以减小日志文件的大小,进行日志删除的时候和数据查找的时候可以快速定位。同时,ActiveLogSegment 也就是活跃的日志分段拥有文件拥有写入权限,其余的 LogSegment 只有只读的权限。

日志文件存在多种后缀文件,重点需要关注 .index、.timestamp、.log 三种类型。其他的日志类型功能作用,请查询下面图表:

类别 作用
.index 偏移量索引文件
.timestamp 时间戳索引文件
.log 日志文件
.snaphot 快照文件
.deleted
.cleaned 日志清理时临时文件
.swap Log Compaction 之后的临时文件
Leader-epoch-checkpoint

每个 LogSegment 都有一个基准偏移量,用来表示当前 LogSegment 中第一条消息的 offset。偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该作为文件名命名规则(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。特别说明一下,如果日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是 121,偏移量是从 0 开始的。

如果想要查看相应文件内容可以通过 kafka-run-class.sh 脚本查看 .log :

/data/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log

002

2.0 中可以使用 kafka-dump-log.sh 查 看.index 文件

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index

日志与索引文件

配置项 默认值 说明
log.index.interval.bytes 4096 (4K) 增加索引项字节间隔密度,会影响索引文件中的区间密度和查询效率
log.segment.bytes 1073741824 (1G) 日志文件最大值
log.roll.ms 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,毫秒维度
log.roll.hours 168 (7天) 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,小时维度
log.index.size.max.bytes 10485760 (10MB) 触发偏移量索引文件或时间戳索引文件分段字节限额

偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。

Kafka 中的索引文件是以稀疏索引的方式构造消息的索引,他并不保证每一个消息在索引文件中都有对应的索引项。每当写入一定量的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,通过修改 log.index.interval.bytes 的值,改变索引项的密度。

切分文件

从上文中可知,日志文件和索引文件都会存在多个文件,组成多个 SegmentLog,那么其切分的规则是怎样的呢?

当满足如下几个条件中的其中之一,就会触发文件的切分:

  1. 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。log.segment.bytes 参数的默认值为 1073741824,即 1GB。
  2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms 或 log.roll.hours 参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。
  3. 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 配置的值。log.index.size.max.bytes 的默认值为 10485760,即 10MB。
  4. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量。
为什么是 Integer.MAX_VALUE ?

在偏移量索引文件中,每个索引项共占用 8 个字节,并分为两部分。相对偏移量和物理地址。

相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节

物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节

4 个字节刚好对应 Integer.MAX_VALUE ,如果大于 Integer.MAX_VALUE ,则不能用 4 个字节进行表示了。

索引文件切分过程

索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,即文件创建的时候就是最大值,当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性。

查找消息

offset 查询

偏移量索引由相对偏移量和物理地址组成。

003

可以通过如下命令解析.index 文件

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index
offset:0 position:0
offset:20 position:320
offset:43 position:1220

注意:offset 与 position 没有直接关系哦,由于存在数据删除和日志清理。

004

e.g. 如何查看 偏移量为 23 的消息?

Kafka 中存在一个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在 00000000000000000000.index ,通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即 offset 20 那栏,然后从日志分段文件中的物理位置为320 开始顺序查找偏移量为 23 的消息。

时间戳方式查询

在上文已经有所提及,通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。

时间戳索引索引格式

005

006+

 

e.g. 查找时间戳为 1557554753430 开始的消息?

  • 将 1557554753430 和每个日志分段中最大时间戳 largestTimeStamp 逐一对比,直到找到不小于 1557554753430 所对应的日志分段。日志分段中的 largestTimeStamp 的计算是先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于 0 ,则取该值,否则去该日志分段的最近修改时间。
  • 找到相应日志分段之后,使用二分法进行定位,与偏移量索引方式类似,找到不大于 1557554753430 最大索引项,也就是 [1557554753420 430]。
  • 拿着偏移量为 430 到偏移量索引文件中使用二分法找到不大于 430 最大索引项,即 [20,320] 。
  • 日志文件中从 320 的物理位置开始查找不小于 1557554753430 数据。

注意:timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的哦。因为数据的写入是各自追加。

在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若干的时间戳信息。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增长。反之如果是 CreateTime 则无法保证顺序。

日志清理

日志清理,不是日志删除哦,这还是有所区别的,日志删除会在下文进行说明。

Kafka 提供两种日志清理策略:

日志删除:按照一定的删除策略,将不满足条件的数据进行数据删除

日志压缩:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本。

Kafka 提供 log.cleanup.policy 参数进行相应配置,默认值:delete,还可以选择 compact。

是否支持针对具体的 Topic 进行配置?

答案是肯定的,主题级别的配置项是 cleanup.policy 。

日志删除

配置 默认值 说明
log.retention.check.interval.ms 300000 (5分钟) 检测频率
log.retention.hours 168 (7天) 日志保留时间小时
log.retention.minutes 日志保留时间分钟
log.retention.ms 日志保留时间毫秒
file.delete.delay.ms 60000 (1分钟) 延迟执行删除时间
log.retention.bytes -1 无穷大 运行保留日志文件最大值
log.retention.bytes 1073741824 (1G) 日志文件最大值

Kafka 会周期性根据相应规则进行日志数据删除,保留策略有 3 种:基于时间的保留策略、基于日志大小的保留策略和基于日志其实偏移量的保留策略。

基于时间

日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是 7 天,log.retention.ms 优先级最高。

如何查找日志分段文件中已经过去的数据呢?

Kafka 依据日志分段中最大的时间戳进行定位,首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取该值,否则取最近修改时间。

为什么不直接选最近修改时间呢?

因为日志文件可以有意无意的被修改,并不能真实的反应日志分段的最大时间信息。

删除过程
  1. 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。
  2. 这些日志分段所有文件添加 上 .delete 后缀。
  3. 交由一个以 "delete-file" 命名的延迟任务来删除这些 .delete 为后缀的文件。延迟执行时间可以通过 file.delete.delay.ms 进行设置

如果活跃的日志分段中也存在需要删除的数据时?

Kafka 会先切分出一个新的日志分段作为活跃日志分段,然后执行删除操作。

基于日志大小

日志删除任务会检查当前日志的大小是否超过设定值。设定项为 log.retention.bytes ,单个日志分段的大小由 log.regment.bytes 进行设定。

删除过程

  1. 计算需要被删除的日志总大小 (当前日志文件大小-retention值)。
  2. 从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合。
  3. 执行删除。

基于日志起始偏移量

基于日志起始偏移量的保留策略的判断依据是某日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,则可以删除此日志分段。

注意:日志文件的起始偏移量并不一定等于第一个日志分段的基准偏移量,存在数据删除,可能与之相等的那条数据已经被删除了。

007

删除过程
  • 从头开始遍历每一个日志分段,日志分段 1 的下一个日志分段的起始偏移量为 11,小于 logStartOffset,将 日志分段 1 加入到删除队列中
  • 日志分段 2 的下一个日志分段的起始偏移量为 23,小于 logStartOffset,将 日志分段 2 加入到删除队列中
  • 日志分段 3 的下一个日志分段的起始偏移量为 30,大于 logStartOffset,则不进行删除。

Elasticsearch教程

索引(动词)
索引一个文档 就是存储一个文档到一个 索引 (名词)中以便它可以被检索和查询到。这非常类似于 SQL 语句中的 INSERT 关键词,除了文档已存在时新chaj文档会替换旧文档情况之外。

倒排索引
倒排索引源于实际应用中需要根据属性的值来查找记录。这种索引表中的每一项都包括一个属性值和具有该属性值的各记录的地址。由于不是由记录来确定属性值,而是由属性值来确定记录的位置,因而称为倒排索引(inverted index)。带有倒排索引的文件我们称为倒排索引文件,简称倒排文件(inverted file)。

文档
存储在ES上的主要实体叫文档

文档类型(废弃)
在ES中,一个索引对象可以存储很多不同用途的对象。

映射
存储有关字段的信息,每一个文档类型都有自己的映射。

面向文档
在应用程序中对象很少只是一个简单的键和值的列表。通常,它们拥有更复杂的数据结构,可能包括日期、地理信息、其他对象或者数组等。

Elasticsearch面向文档的,意味着它存储整个对象或 文档。Elasticsearch不仅存储文档,而且 索引每个文档的内容使之可以被检索。在Elasticsearch 中,你对文档进行索引、检索、排序和过滤而不是对行列数据。这是一种完全不同的思考数据的方式,也是Elasticsearch 能支持复杂全文检索的原因。

Spring Cloud Alibaba 简单理解

1、Spring Cloud Alibaba

阿里巴巴提供的微服务开发一站式解决方案,是阿里巴巴开源中间件与 Spring Cloud 体系的融合。

Spring Cloud

SpringCloud 是若干个框架的集合,包括 spring-cloud-config、spring-cloud-bus 等近 20 个子项目,提供了

  • 服务治理
  • 服务网关
  • 智能路由
  • 负载均衡
  • 断路器
  • 监控跟踪
  • 分布式消息队列
  • 配置管理等领域的解决方案。

Spring Cloud 通过 Spring Boot 风格的封装,屏蔽掉了复杂的配置和实现原理,最终给开发者留出了一套简单易懂、容易部署的分布式系统开发工具包。

一般来说,Spring Cloud 包含以下组件,主要以 Netflix 开源为主:

1639101201-6904-e7019b9788478345059a9a3-720w

Spring Cloud Alibaba

与 Spring Cloud 一样,Spring Cloud Alibaba 也是一套微服务解决方案,包含开发分布式应用微服务的必需组件,方便开发者通过 Spring Cloud 编程模型轻松使用这些组件来开发分布式应用服务。

依托 Spring Cloud Alibaba,您只需要添加一些注解和少量配置,就可以将 Spring Cloud 应用接入阿里微服务解决方案,通过阿里中间件来迅速搭建分布式应用系统。

作为 Spring Cloud 体系下的新实现,Spring Cloud Alibaba 跟官方的组件或其它的第三方实现如 Netflix, Consul,Zookeeper 等对比,具备了更多的功能:

1639101201-3196-bb7cde19314acc8a4866c4b-720w

2、Spring Cloud Alibaba 包含组件

这幅图是 Spring Cloud Alibaba 系列组件,其中包含了阿里开源组件,阿里云商业化组件,以及集成Spring Cloud 组件。

1639101201-6677-41c441d222390c79a4cd53b-720w

阿里开源组件

  • Nacos:一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。
  • Sentinel:把流量作为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
  • RocketMQ:开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。
  • Dubbo:这个就不用多说了,在国内应用非常广泛的一款高性能 Java RPC 框架。
  • Seata:阿里巴巴开源产品,一个易于使用的高性能微服务分布式事务解决方案。
  • Arthas:开源的Java动态追踪工具,基于字节码增强技术,功能非常强大。

阿里商业化组件

  • Alibaba Cloud ACM:一款在分布式架构环境中对应用配置进行集中管理和推送的应用配置中心产品。
  • Alibaba Cloud OSS:阿里云对象存储服务(Object Storage Service,简称 OSS),是阿里云提供的云存储服务。
  • Alibaba Cloud SchedulerX:阿里中间件团队开发的一款分布式任务调度产品,提供秒级、精准的定时(基于 Cron 表达式)任务调度服务。

集成 Spring Cloud 组件

Spring Cloud Alibaba 作为整套的微服务解决组件,只依靠目前阿里的开源组件是不够的,更多的是集成当前的社区组件,所以 Spring Cloud Alibaba 可以集成 Zuul,OpenFeign等网关,也支持 Spring Cloud Stream 消息组件。

3、Spring Cloud Alibaba 功能

那么作为微服务解决方案, Spring Cloud Alibaba是如何支持微服务治理的各个功能。

服务注册与发现

Spring Cloud Alibaba 基于 Nacos 提供

spring-cloud-alibaba-starter-nacos-discovery

spring-cloud-alibaba-starter-nacos-config

实现了服务注册 & 配置管理功能。依靠 @EnableDiscoveryClient 进行服务的注册,兼容 RestTemplate & OpenFeign 的客户端进行服务调用。

适配 Spring Cloud 服务注册与发现标准,默认集成了 Ribbon 的支持。

支持多协议的服务调用

Spring Cloud 默认的服务调用依赖 OpenFeign 或 RestTemplate 使用 REST 进行调用。

使用 @DubboTransported 注解可将底层的 Rest 协议无缝切换成 Dubbo RPC 协议,进行 RPC 调用。

@FeignClient("dubbo-provider")
@DubboTransported(protocol = "dubbo")
public interface DubboFeignRestService {
  @GetMapping(value = "/param")
  String param(@RequestParam("param") String param);

  @PostMapping("/saveB")
  String saveB(@RequestParam("a") int a, @RequestParam("b") String b);
}

服务限流降级

作为稳定性的核心要素之一,服务限流和降级是微服务领域特别重要的一环,Spring Cloud Alibaba 基于 Sentinel,对 Spring 体系内基本所有的客户端,网关进行了适配,

默认支持

  1. WebServlet
  2. WebFlux
  3. OpenFeign
  4. RestTemplate
  5. Spring Cloud Gateway
  6. Zuul
  7. Dubbo
  8. RocketMQ

限流降级功能的接入。

Sentinel应用比较简单,只需引入 starter,即可生效,可以在运行时通过控制台实时修改限流降级规则,还支持查看限流降级 Metrics 监控。

微服务消息驱动

支持为微服务应用构建消息驱动能力,基于 Spring Cloud Stream 提供 Binder 的新实现: Spring Cloud Stream RocketMQ Binder,也新增了 Spring Cloud Bus 消息总线的新实现 Spring Cloud Bus RocketMQ。

分布式事务

使用 Seata 解决微服务场景下面临的分布式事务问题。

使用 @GlobalTransactional 注解,在微服务中传递事务上下文,可以对业务零侵入地解决分布式事务问题。

阿里云提供的商业能力

通过上面提到的OSS,schedulerx等组件,开发者可以在阿里云上实现对象存储,分布式任务调度等功能。

Spring Cloud Alibaba 虽然诞生时间不久,赖于阿里巴巴强大的技术影响力,已经成为微服务解决方案的重要选择之一。

为什么MySQL数据库索引选择使用B+树?

学过数据结构的一般对最基础的树都有所认识,因此我们就从与我们主题更为相近的二叉查找树开始。

一、二叉查找树

(1)二叉树简介:二叉查找树也称为有序二叉查找树,满足二叉查找树的一般性质,是指一棵空树具有如下性质:

1、任意节点左子树不为空,则左子树的值均小于根节点的值;

2、任意节点右子树不为空,则右子树的值均大于于根节点的值;

3、任意节点的左右子树也分别是二叉查找树;

4、没有键值相等的节点;
QQ截图20211117173142

上图为一个普通的二叉查找树,按照中序遍历的方式可以从小到大的顺序排序输出:2、3、5、6、7、8。

对上述二叉树进行查找,如查键值为5的记录,先找到根,其键值是6,6大于5,因此查找6的左子树,找到3;而5大于3,再找其右子树;一共找了3次。如果按2、3、5、6、7、8的顺序来找同样需求3次。用同样的方法在查找键值为8的这个记录,这次用了3次查找,而顺序查找需要6次。计算平均查找次数得:顺序查找的平均查找次数为(1+2+3+4+5+6)/ 6 = 3.3次,二叉查找树的平均查找次数为(3+3+3+2+2+1)/6=2.3次。二叉查找树的平均查找速度比顺序查找来得更快。

(2)局限性及应用

一个二叉查找树是由n个节点随机构成,所以,对于某些情况,二叉查找树会退化成一个有n个节点的线性链。如下图:

大家看上图,如果我们的根节点选择是最小或者最大的数,那么二叉查找树就完全退化成了线性结构。上图中的平均查找次数为(1+2+3+4+5+5)/6=3.16次,和顺序查找差不多。显然这个二叉树的查询效率就很低,因此若想最大性能的构造一个二叉查找树,需要这个二叉树是平衡的(这里的平衡从一个显著的特点可以看出这一棵树的高度比上一个输的高度要大,在相同节点的情况下也就是不平衡),从而引出了一个新的定义-平衡二叉树AVL。

二、AVL树

(1)简介

AVL树是带有平衡条件的二叉查找树,一般是用平衡因子差值判断是否平衡并通过旋转来实现平衡,左右子树树高不超过1,和红黑树相比,它是严格的平衡二叉树,平衡条件必须满足(所有节点的左右子树高度差不超过1)。不管我们是执行插入还是删除操作,只要不满足上面的条件,就要通过旋转来保持平衡,而旋转是非常耗时的,由此我们可以知道AVL树适合用于插入删除次数比较少,但查找多的情况。

从上面是一个普通的平衡二叉树,这张图我们可以看出,任意节点的左右子树的平衡因子差值都不会大于1。

(2)局限性

由于维护这种高度平衡所付出的代价比从中获得的效率收益还大,故而实际的应用不多,更多的地方是用追求局部而不是非常严格整体平衡的红黑树。当然,如果应用场景中对插入删除不频繁,只是对查找要求较高,那么AVL还是较优于红黑树。

(3)应用

1、Windows NT内核中广泛存在;

三、红黑树

(1)简介

一种二叉查找树,但在每个节点增加一个存储位表示节点的颜色,可以是red或black。通过对任何一条从根到叶子的路径上各个节点着色的方式的限制,红黑树确保没有一条路径会比其它路径长出两倍。它是一种弱平衡二叉树(由于是若平衡,可以推出,相同的节点情况下,AVL树的高度低于红黑树),相对于要求严格的AVL树来说,它的旋转次数变少,所以对于搜索、插入、删除操作多的情况下,我们就用红黑树。

(2)性质

1、每个节点非红即黑;
2、根节点是黑的;
3、每个叶节点(叶节点即树尾端NULL指针或NULL节点)都是黑的;
4、如果一个节点是红的,那么它的两儿子都是黑的;
5、对于任意节点而言,其到叶子点树NULL指针的每条路径都包含相同数目的黑节点;
6、每条路径都包含相同的黑节点;

QQ截图20211117173340

(3)应用

1、广泛用于C++的STL中,Map和Set都是用红黑树实现的;
2、著名的Linux进程调度Completely Fair Scheduler,用红黑树管理进程控制块,进程的虚拟内存区域都存储在一颗红黑树上,每个虚拟地址区域都对应红黑树的一个节点,左指针指向相邻的地址虚拟存储区域,右指针指向相邻的高地址虚拟地址空间;
3、IO多路复用epoll的实现采用红黑树组织管理sockfd,以支持快速的增删改查;
4、Nginx中用红黑树管理timer,因为红黑树是有序的,可以很快的得到距离当前最小的定时器;
5、Java中TreeMap的实现;

四、B/B+树

说了上述的三种树:二叉查找树、AVL和红黑树,似乎我们还没有摸到MySQL为什么要使用B+树作为索引的实现,不要急,接下来我们就先探讨一下什么是B树。

(1)简介

我们在MySQL中的数据一般是放在磁盘中的,读取数据的时候肯定会有访问磁盘的操作,磁盘中有两个机械运动的部分,分别是盘片旋转和磁臂移动。盘片旋转就是我们市面上所提到的多少转每分钟,而磁盘移动则是在盘片旋转到指定位置以后,移动磁臂后开始进行数据的读写。那么这就存在一个定位到磁盘中的块的过程,而定位是磁盘的存取中花费时间比较大的一块,毕竟机械运动花费的时候要远远大于电子运动的时间。当大规模数据存储到磁盘中的时候,显然定位是一个非常花费时间的过程,但是我们可以通过B树进行优化,提高磁盘读取时定位的效率。

为什么B类树可以进行优化呢?我们可以根据B类树的特点,构造一个多阶的B类树,然后在尽量多的在结点上存储相关的信息,保证层数尽量的少,以便后面我们可以更快的找到信息,磁盘的I/O操作也少一些,而且B类树是平衡树,每个结点到叶子结点的高度都是相同,这也保证了每个查询是稳定的。

总的来说,B/B+树是为了磁盘或其它存储设备而设计的一种平衡多路查找树(相对于二叉,B树每个内节点有多个分支),与红黑树相比,在相同的的节点的情况下,一颗B/B+树的高度远远小于红黑树的高度(在下面B/B+树的性能分析中会提到)。B/B+树上操作的时间通常由存取磁盘的时间和CPU计算时间这两部分构成,而CPU的速度非常快,所以B树的操作效率取决于访问磁盘的次数,关键字总数相同的情况下B树的高度越小,磁盘I/O所花的时间越少。

注意B-树就是B树,-只是一个符号。

(2)B树的性质

1、定义任意非叶子结点最多只有M个儿子,且M>2;
2、根结点的儿子数为[2, M];
3、除根结点以外的非叶子结点的儿子数为[M/2, M];
4、每个结点存放至少M/2-1(取上整)和至多M-1个关键字;(至少2个关键字)
5、非叶子结点的关键字个数=指向儿子的指针个数-1;
6、非叶子结点的关键字:K[1], K[2], …, K[M-1];且K[i] < K[i+1];
7、非叶子结点的指针:P[1], P[2], …, P[M];其中P[1]指向关键字小于K[1]的子树,P[M]指向关键字大于K[M-1]的子树,其它P[i]指向关键字属于(K[i-1], K[i])的子树;
8、所有叶子结点位于同一层;

这里只是一个简单的B树,在实际中B树节点中关键字很多的,上面的图中比如35节点,35代表一个key(索引),而小黑块代表的是这个key所指向的内容在内存中实际的存储位置,是一个指针。

五、B+树

(1)简介

B+树是应文件系统所需而产生的一种B树的变形树(文件的目录一级一级索引,只有最底层的叶子节点(文件)保存数据)非叶子节点只保存索引,不保存实际的数据,数据都保存在叶子节点中,这不就是文件系统文件的查找吗?

我们就举个文件查找的例子:有3个文件夹a、b、c, a包含b,b包含c,一个文件yang.c,a、b、c就是索引(存储在非叶子节点), a、b、c只是要找到的yang.c的key,而实际的数据yang.c存储在叶子节点上。

所有的非叶子节点都可以看成索引部分!

(2)B+树的性质(下面提到的都是和B树不相同的性质)

1、非叶子节点的子树指针与关键字个数相同;
2、非叶子节点的子树指针p[i],指向关键字值属于[k[i],k[i+1]]的子树.(B树是开区间,也就是说B树不允许关键字重复,B+树允许重复);
3、为所有叶子节点增加一个链指针;
4、所有关键字都在叶子节点出现(稠密索引). (且链表中的关键字恰好是有序的);
5、非叶子节点相当于是叶子节点的索引(稀疏索引),叶子节点相当于是存储(关键字)数据的数据层;
6、更适合于文件系统;

QQ截图20211117173414

非叶子节点(比如5,28,65)只是一个key(索引),实际的数据存在叶子节点上(5,8,9)才是真正的数据或指向真实数据的指针。

(3)应用  

1、B和B+树主要用在文件系统以及数据库做索引,比如MySQL;

六、B/B+树性能分析

n个节点的平衡二叉树的高度为H(即logn),而n个节点的B/B+树的高度为logt((n+1)/2)+1;
若要作为内存中的查找表,B树却不一定比平衡二叉树好,尤其当m较大时更是如此。因为查找操作CPU的时间在B-树上是O(mlogtn)=O(lgn(m/lgt)),而m/lgt>1;所以m较大时O(mlogtn)比平衡二叉树的操作时间大得多。因此在内存中使用B树必须取较小的m。(通常取最小值m=3,此时B-树中每个内部结点可以有2或3个孩子,这种3阶的B-树称为2-3树)。

七、为什么说B+树比B树更适合数据库索引?

1、 B+树的磁盘读写代价更低:B+树的内部节点并没有指向关键字具体信息的指针,因此其内部节点相对B树更小,如果把所有同一内部节点的关键字存放在同一盘块中,那么盘块所能容纳的关键字数量也越多,一次性读入内存的需要查找的关键字也就越多,相对IO读写次数就降低了。

2、B+树的查询效率更加稳定:由于非终结点并不是最终指向文件内容的结点,而只是叶子结点中关键字的索引。所以任何关键字的查找必须走一条从根结点到叶子结点的路。所有关键字查询的路径长度相同,导致每一个数据的查询效率相当。

3、由于B+树的数据都存储在叶子结点中,分支结点均为索引,方便扫库,只需要扫一遍叶子结点即可,但是B树因为其分支结点同样存储着数据,我们要找到具体的数据,需要进行一次中序遍历按序来扫,所以B+树更加适合在区间查询的情况,所以通常B+树用于数据库索引。

PS:我在知乎上看到有人是这样说的,我感觉说的也挺有道理的:

他们认为数据库索引采用B+树的主要原因是:

B树在提高了IO性能的同时并没有解决元素遍历的我效率低下的问题,正是为了解决这个问题,B+树应用而生。B+树只需要去遍历叶子节点就可以实现整棵树的遍历。而且在数据库中基于范围的查询是非常频繁的,而B树不支持这样的操作或者说效率太低。

总结

数据库使用B+树肯定是为了提升查找效率。具体如何提升查找效率?

查找数据,最简单的方式是顺序查找。但是对于几十万上百万,甚至上亿的数据库查询就很慢了。

所以要对查找的方式进行优化,熟悉的二分查找,二叉树可以把速度提升到O(log(n,2)),查询的瓶颈在于树的深度,最坏的情况要查找到二叉树的最深层,由于每查找深一层,就要访问更深一层的索引文件。在多达数G的索引文件中,这将是很大的开销。所以尽量把数据结构设计的更为‘矮胖’一点就可以减少访问的层数。

在众多的解决方案中,B-/B+树很好的适合。简而言之就是中间节点可以多余两个子节点,而且中间的元素可以是一个域。

相比B-树,B+树的父节点也必须存在于子节点中,是其中最大或者最小元素,B+树的节点只存储索引key值,具体信息的地址存在于叶子节点的地址中。这就使以页为单位的索引中可以存放更多的节点,减少更多的I/O支出。

因此,B+树成为了数据库比较优秀的数据结构,MySQL中MyIsAM和InnoDB都是采用的B+树结构;不同的是前者是非聚集索引,后者主键是聚集索引,所谓聚集索引是物理地址连续存放的索引,在取区间的时候,查找速度非常快,但同样的,插入的速度也会受到影响而降低。聚集索引的物理位置使用链表来进行存储。

分布式锁的三种实现方式

分布式锁三种实现方式:

  1. 基于数据库实现分布式锁;
  2. 基于缓存(Redis等)实现分布式锁;
  3. 基于Zookeeper实现分布式锁;

一, 基于数据库实现分布式锁

1. 悲观锁

利用select … where … for update 排他锁

*其他附加功能与实现一基本一致,这里需要注意的是“where name=lock ”,name字段必须要走索引,否则会锁表。有些情况下,比如表不大,mysql优化器会不走这个索引,导致锁表问题。

2. 乐观锁

所谓乐观锁与前边最大区别在于基于CAS思想,是不具有互斥性,不会产生锁等待而消耗资源,操作过程中认为不存在并发冲突,只有update version失败后才能觉察到。

我们的抢购、秒杀就是用了这种实现以防止超卖。通过增加递增的版本号字段实现乐观锁

1637136916-5676-e3b761944edd6611defde7ee28-0

二, 基于缓存(Redis等)实现分布式锁

1. 使用命令介绍:

(1)SETNX
SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。
(2)expire
expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。
(3)delete
delete key:删除key

在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。

2. 实现思想:

(1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
(2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
(3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。

3. 分布式锁的简单实现代码:

package com.demo.app;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;

import java.util.List;
import java.util.UUID;

/**
 * 分布式锁的简单实现代码
 */
class DistributedLock {

    private final JedisPool jedisPool;

    public DistributedLock(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /**
     * 加锁
     *
     * @param lockName       锁的key
     * @param acquireTimeout 获取超时时间
     * @param timeout        锁的超时时间
     * @return 锁标识
     */

    public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {

        Jedis conn = null;
        String retIdentifier = null;
        try {
            // 获取连接
            conn = jedisPool.getResource();
            // 随机生成一个value
            String identifier = UUID.randomUUID().toString();
            // 锁名,即key值
            String lockKey = "lock:" + lockName;
            // 超时时间,上锁后超过此时间则自动释放锁
            int lockExpire = (int) (timeout / 1000);
            // 获取锁的超时时间,超过这个时间则放弃获取锁
            long end = System.currentTimeMillis() + acquireTimeout;

            while (System.currentTimeMillis() < end) {
                if (conn.setnx(lockKey, identifier) == 1) {
                    conn.expire(lockKey, lockExpire);
                    // 返回value值,用于释放锁时间确认
                    retIdentifier = identifier;
                    return retIdentifier;
                }

                // 返回-1代表key没有设置超时时间,为key设置一个超时时间
                if (conn.ttl(lockKey) == -1) {
                    conn.expire(lockKey, lockExpire);
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retIdentifier;
    }

    /**
     * 释放锁
     *
     * @param lockName   锁的key
     * @param identifier 释放锁的标识
     * @return
     */
    public boolean releaseLock(String lockName, String identifier) {
        Jedis conn = null;
        String lockKey = "lock:" + lockName;
        boolean retFlag = false;

        try {
            conn = jedisPool.getResource();
            while (true) {
                // 监视lock,准备开始事务
                conn.watch(lockKey);
                // 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
                if (identifier.equals(conn.get(lockKey))) {
                    Transaction transaction = conn.multi();
                    transaction.del(lockKey);
                    List<Object> results = transaction.exec();
                    if (results == null) {
                        continue;
                    }
                    retFlag = true;
                }
                conn.unwatch();
                break;
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retFlag;
    }
}

4. 测试刚才实现的分布式锁

例子中使用50个线程模拟秒杀一个商品,使用–运算符来实现商品减少,从结果有序性就可以看出是否为加锁状态。
模拟秒杀服务,在其中配置了jedis线程池,在初始化的时候传给分布式锁,供其使用。

public class Service {
    private static JedisPool pool = null;
    private DistributedLock lock = new DistributedLock(pool);
    int n = 500;
    
    static {
        JedisPoolConfig config = new JedisPoolConfig();
        // 设置最大连接数
        config.setMaxTotal(200);
        // 设置最大空闲数
        config.setMaxIdle(8);
        // 设置最大等待时间
        config.setMaxWaitMillis(1000 * 100);
        // 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的
        config.setTestOnBorrow(true);
        pool = new JedisPool(config, "127.0.0.1", 6379, 3000);

    }

    public void secKill() {
        // 返回锁的value值,供释放锁时候进行判断
        String identifier = lock.lockWithTimeout("resource", 5000, 1000);
        System.out.println(Thread.currentThread().getName() + "获得了锁");
        System.out.println(--n);
        lock.releaseLock("resource", identifier);

    }
}

模拟线程进行秒杀服务;

public class ThreadA extends Thread {
    private Service service;
    public ThreadA(Service service) {
        this.service = service;
    }
    
    @Override
    public void run() {
        service.secKill();
    }
}

public class Test {
    public static void main(String[] args) {
        Service service = new Service();
        for (int i = 0; i < 50; i++) {
            ThreadA threadA = new ThreadA(service);
            threadA.start();
        }
    }
}

结果如下,结果为有序的:

1637136917-5764-ceea4c8cbfa184228a02822f99-1

若注释掉使用锁的部分:

public void secKill() {
    // 返回锁的value值,供释放锁时候进行判断
    //String indentifier = lock.lockWithTimeout("resource", 5000, 1000);
    System.out.println(Thread.currentThread().getName() + "获得了锁");
    System.out.println(--n);
    //lock.releaseLock("resource", indentifier);
}

从结果可以看出,有一些是异步进行的:

1637136917-6356-ceea4c8cbfa184228a02822f99-2

三, 基于Zookeeper实现分布式锁

ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:

  • (1)创建一个目录mylock;
  • (2)线程A想获取锁就在mylock目录下创建临时顺序节点;
  • (3)获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
  • (4)线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
  • (5)线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。

这里推荐一个Apache的开源库Curator,它是一个ZooKeeper客户端,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。

实现源码如下:

public class ZkLock implements DistributionLock {

    private String zkAddress = "zk_adress";
    private static final String root = "package root";
    private CuratorFramework zkClient;
    private final String LOCK_PREFIX = "/lock_";
    
    @Bean
    public DistributionLock initZkLock() {
        if (StringUtils.isBlank(root)) {
            throw new RuntimeException("zookeeper 'root' can't be null");
        }
        zkClient = CuratorFrameworkFactory
                .builder()
                .connectString(zkAddress)
                .retryPolicy(new RetryNTimes(2000, 20000))
                .namespace(root)
                .build();
        zkClient.start();
        return this;
    }

    public boolean tryLock(String lockName) {
        lockName = LOCK_PREFIX + lockName;
        boolean locked = true;

        try {
            Stat stat = zkClient.checkExists().forPath(lockName);
            if (stat == null) {
                log.info("tryLock:{}", lockName);
                stat = zkClient.checkExists().forPath(lockName);
                if (stat == null) {
                    zkClient
                            .create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.EPHEMERAL)
                            .forPath(lockName, "1".getBytes());
                } else {
                    log.warn("double-check stat.version:{}", stat.getAversion());
                    locked = false;
                }
            } else {
                log.warn("check stat.version:{}", stat.getAversion());
                locked = false;
            }

        } catch (Exception e) {
            locked = false;
        }
        return locked;
    }

    public void release(String lockName) {
        lockName = LOCK_PREFIX + lockName;
        try {
            zkClient
                    .delete()
                    .guaranteed()
                    .deletingChildrenIfNeeded()
                    .forPath(lockName);
            log.info("release:{}", lockName);
        } catch (Exception e) {
            log.error("删除", e);
        }

    }

    public void setZkAddress(String zkAddress) {
        this.zkAddress = zkAddress;
    }

}

优点:具备高可用、可重入、阻塞锁特性,可解决失效死锁问题。

缺点:因为需要频繁的创建和删除节点,性能上不如Redis方式。

四,对比

数据库分布式锁实现

缺点:

1.db操作性能较差,并且有锁表的风险

2.非阻塞操作失败后,需要轮询,占用cpu资源;

3.长时间不commit或者长时间轮询,可能会占用较多连接资源

Redis(缓存)分布式锁实现

缺点:

1.锁删除失败 过期时间不好控制

2.非阻塞,操作失败后,需要轮询,占用cpu资源;

ZK分布式锁实现

缺点:性能不如redis实现,主要原因是写操作(获取锁释放锁)都需要在Leader上执行,然后同步到follower。

ZooKeeper有较好的性能和可靠性。

从理解的难易程度角度(从低到高)数据库 > 缓存 > Zookeeper

从实现的复杂性角度(从低到高)Zookeeper >= 缓存 > 数据库

从性能角度(从高到低)缓存 > Zookeeper >= 数据库

从可靠性角度(从高到低)Zookeeper > 缓存 > 数据库

12328
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |