Java 17 新特性

Java 17 在 2021 年 9 月 14 日正式发布,Java 17 是一个长期支持(LTS)版本,这次更新共带来 14 个新功能。

OpenJDK Java 17 下载:https://jdk.java.net/archive/

OpenJDK Java 17 文档:https://openjdk.java.net/projects/jdk/17/

1. JEP 306: 恢复始终严格的浮点语义

既然是恢复严格的浮点语义,那么说明在某个时间点之前,是始终严格的浮点语义的。其实在 Java SE 1.2 之前,所有的浮点计算都是严格的,但是以当初的情况来看,过于严格的浮点计算在当初流行的 x86 架构和 x87 浮点协议处理器上运行,需要大量的额外的指令开销,所以在 Java SE 1.2 开始,需要手动使用关键字 strictfp(strict float point) 才能启用严格的浮点计算。

但是在 2021 年的今天,硬件早已发生巨变,当初的问题已经不存在了,所以从 Java 17 开始,恢复了始终严格的浮点语义这一特性。

扩展strictfp 是 Java 中的一个关键字,大多数人可能没有注意过它,它可以用在类、接口或者方法上,被 strictfp 修饰的部分中的 float 和 double 表达式会进行严格浮点计算。

下面是一个示例,其中的 testStrictfp() 被 strictfp 修饰。

package com.wdbyte;

public class Main {
    public static void main(String[] args) {
        testStrictfp();
    }

    public strictfp static void testStrictfp() {
        float aFloat = 0.6666666666666666666f;
        double aDouble = 0.88888888888888888d;
        double sum = aFloat + aDouble;
        System.out.println("sum: " + sum);
    }
}

2. JEP 356:增强的伪随机数生成器

为伪随机数生成器 RPNG(pseudorandom number generator)增加了新的接口类型和实现,让在代码中使用各种 PRNG 算法变得容易许多。

这次增加了 RandomGenerator 接口,为所有的 PRNG 算法提供统一的 API,并且可以获取不同类型的 PRNG 对象流。同时也提供了一个新类 RandomGeneratorFactory 用于构造各种 RandomGenerator 实例,在 RandomGeneratorFactory 中使用 ServiceLoader.provider 来加载各种 PRNG 实现。

下面是一个使用示例:随便选择一个 PRNG 算法生成 5 个 10 以内的随机数。

package com.wdbyte.java17;

import java.util.Date;
import java.util.random.RandomGenerator;
import java.util.random.RandomGeneratorFactory;
import java.util.stream.Stream;

/**
 * @author niulang
 */
public class JEP356 {

    public static void main(String[] args) {
        RandomGeneratorFactory<RandomGenerator> l128X256MixRandom = RandomGeneratorFactory.of("L128X256MixRandom");
        // 使用时间戳作为随机数种子
        RandomGenerator randomGenerator = l128X256MixRandom.create(System.currentTimeMillis());
        for (int i = 0; i < 5; i++) {
            System.out.println(randomGenerator.nextInt(10));
        }
    }
}

得到输出:

7
3
4
4
6

你也可以遍历出所有的 PRNG 算法。

RandomGeneratorFactory.all().forEach(factory -> {
    System.out.println(factory.group() + ":" + factory.name());
});

得到输出:

LXM:L32X64MixRandom
LXM:L128X128MixRandom
LXM:L64X128MixRandom
Legacy:SecureRandom
LXM:L128X1024MixRandom
LXM:L64X128StarStarRandom
Xoshiro:Xoshiro256PlusPlus
LXM:L64X256MixRandom
Legacy:Random
Xoroshiro:Xoroshiro128PlusPlus
LXM:L128X256MixRandom
Legacy:SplittableRandom
LXM:L64X1024MixRandom

可以看到 Legacy:Random 也在其中,新的 API 兼容了老的 Random 方式,所以你也可以使用新的 API 调用 Random 类生成随机数。

// 使用 Random
RandomGeneratorFactory<RandomGenerator> l128X256MixRandom = RandomGeneratorFactory.of("Random");
// 使用时间戳作为随机数种子
RandomGenerator randomGenerator = l128X256MixRandom.create(System.currentTimeMillis());
for (int i = 0; i < 5; i++) {
    System.out.println(randomGenerator.nextInt(10));
}

扩展阅读:增强的伪随机数生成器

3. JEP 382:使用新的 macOS 渲染库

macOS 为了提高图形的渲染性能,在 2018 年 9 月抛弃了之前的 OpenGL 渲染库 ,而使用了 Apple Metal 进行代替。Java 17 这次更新开始支持 Apple Metal,不过对于 API 没有任何改变,这一些都是内部修改。

扩展阅读:macOS Mojave 10.14 Release NotesApple Metal

4. JEP 391:支持 macOS/AArch64 架构

起因是 Apple 在 2020 年 6 月的 WWDC 演讲中宣布,将开启一项长期的将 Macintosh 计算机系列从 x64 过度到 AArch64 的长期计划,因此需要尽快的让 JDK 支持 macOS/AArch64 。

Linux 上的 AArch64 支持以及在 Java 16 时已经支持,可以查看之前的文章了解。

扩展:Java 16 新功能介绍 – JEP 386

5. JEP 398:删除已弃用的 Applet API

Applet 是使用 Java 编写的可以嵌入到 HTML 中的小应用程序,嵌入方式是通过普通的 HTML 标记语法,由于早已过时,几乎没有场景在使用了。

示例:嵌入 Hello.class

<applet code="Hello.class" height=200 width=200></applet>

Applet API 在 Java 9 时已经标记了废弃,现在 Java 17 中将彻底删除。

6. JEP 403:更强的 JDK 内部封装

如 Java 16 的 JEP 396 中描述的一样,为了提高 JDK 的安全性,使 --illegal-access 选项的默认模式从允许更改为拒绝。通过此更改,JDK 的内部包和 API(关键内部 API 除外)将不再默认打开。

但是在 Java 17 中,除了 sun.misc.Unsafe ,使用 --illegal-access 命令也不能打开 JDK 内部的强封装模式了,除了 sun.misc.Unsafe API .

在 Java 17 中使用 --illegal-access 选项将会得到一个命令已经移除的警告。

➜  bin ./java -version
openjdk version "17" 2021-09-14
OpenJDK Runtime Environment (build 17+35-2724)
OpenJDK 64-Bit Server VM (build 17+35-2724, mixed mode, sharing)
➜  bin ./java --illegal-access=warn
OpenJDK 64-Bit Server VM warning: Ignoring option --illegal-access=warn; support was removed in 17.0

扩展阅读:JEP 403:更强的 JDK 内部封装Java 16 新功能介绍

7. JEP 406:switch 的类型匹配(预览)

如 instanceof 一样,为 switch 也增加了类型匹配自动转换功能。

在之前,使用 instanceof 需要如下操作:

if (obj instanceof String) {
    String s = (String) obj;    // grr...
    ...
}

多余的类型强制转换,而现在:

if (obj instanceof String s) {
    // Let pattern matching do the work!
    ...
}

switch 也可以使用类似的方式了。

static String formatterPatternSwitch(Object o) {
    return switch (o) {
        case Integer i -> String.format("int %d", i);
        case Long l    -> String.format("long %d", l);
        case Double d  -> String.format("double %f", d);
        case String s  -> String.format("String %s", s);
        default        -> o.toString();
    };
}

对于 null 值的判断也有了新的方式。

// Java 17 之前
static void testFooBar(String s) {
    if (s == null) {
        System.out.println("oops!");
        return;
    }
    switch (s) {
        case "Foo", "Bar" -> System.out.println("Great");
        default           -> System.out.println("Ok");
    }
}
// Java 17
static void testFooBar(String s) {
    switch (s) {
        case null         -> System.out.println("Oops");
        case "Foo", "Bar" -> System.out.println("Great");
        default           -> System.out.println("Ok");
    }
}

扩展阅读: JEP 406:switch 的类型匹配(预览)

8. JEP 407:移除 RMI Activation

移除了在 JEP 385 中被标记废除的 RMI(Remote Method Invocation)Activation,但是 RMI 其他部分不会受影响。

RMI Activation 在 Java 15 中的 JEP 385 已经被标记为过时废弃,至今没有收到不良反馈,因此决定在 Java 17 中正式移除。

扩展阅读: JEP 407:移除 RMI Activation

9. JEP 409:密封类(Sealed Classes)

Sealed Classes 在 Java 15 中的 JEP 360 中提出,在 Java 16 中的 JEP 397 再次预览,现在 Java 17 中成为正式的功能,相比 Java 16 并没有功能变化,这里不再重复介绍,想了解的可以参考之前文章。

扩展阅读:Java 16 新功能介绍JEP 409: Sealed Classes

10. JEP 401:移除实验性的 AOT 和 JIT 编译器

在 Java 9 的 JEP 295 中,引入了实验性的提前编译 jaotc 工具,但是这个特性自从引入依赖用处都不太大,而且需要大量的维护工作,所以在 Java 17 中决定删除这个特性。

主要移除了三个 JDK 模块:

  1. jdk.aot – jaotc 工具。
  2. Jdk.internal.vm.compiler – Graal 编译器。
  3. jdk.internal.vm.compiler.management

同时也移除了部分与 AOT 编译相关的 HotSpot 代码:

  1. src/hotspot/share/aot — dumps and loads AOT code
  2. Additional code guarded by #if INCLUDE_AOT

11. JEP 411:弃用 Security Manager

Security Manager 在 JDK 1.0 时就已经引入,但是它一直都不是保护服务端以及客户端 Java 代码的主要手段,为了 Java 的继续发展,决定弃用 Security Manager,在不久的未来进行删除。

@Deprecated(since="17", forRemoval=true)
public class SecurityManager {
	// ...
}

12. JEP 412:外部函数和内存 API (孵化)

新的 API 允许 Java 开发者与 JVM 之外的代码和数据进行交互,通过调用外部函数,可以在不使用 JNI 的情况下调用本地库。

这是一个孵化功能;需要添加--add-modules jdk.incubator.foreign来编译和运行 Java 代码。

历史

  • Java 14 JEP 370引入了外部内存访问 API(孵化器)。
  • Java 15 JEP 383引入了外部内存访问 API(第二孵化器)。
  • Java 16 JEP 389引入了外部链接器 API(孵化器)。
  • Java 16 JEP 393引入了外部内存访问 API(第三孵化器)。
  • Java 17 JEP 412引入了外部函数和内存 API(孵化器)。

扩展阅读:JEP 412:外部函数和内存 API (孵化)

13. JEP 414:Vector API(二次孵化)

在 Java 16 中引入一个新的 API 来进行向量计算,它可以在运行时可靠的编译为支持的 CPU 架构,从而实现更优的计算能力。

现在 Java 17 中改进了 Vector API 性能,增强了例如对字符的操作、字节向量与布尔数组之间的相互转换等功能。

14. JEP 415:指定上下文的反序列化过滤器

Java 中的序列化一直都是非常重要的功能,如果没有序列化功能,Java 可能都不会占据开发语言的主导地位,序列化让远程处理变得容易和透明,同时也促进了 Java EE 的成功。

但是 Java 序列化的问题也很多,它几乎会犯下所有的可以想象的错误,为开发者带来持续的维护工作。但是要说明的是序列化的概念是没有错的,把对象转换为可以在 JVM 之间自由传输,并且可以在另一端重新构建的能力是完全合理的想法,问题在于 Java 中的序列化设计存在风险,以至于爆出过很多和序列化相关的漏洞。

反序列化危险的一个原因是,有时候我们不好验证将要进行反序列化的内容是否存在风险,而传入的数据流可以自由引用对象,很有可能这个数据流就是攻击者精心构造的恶意代码。

所以,JEP 415 允许在反序列化时,通过一个过滤配置,来告知本次反序列化允许或者禁止操作的类,反序列化时碰到被禁止的类,则会反序列化失败。

14.1. 反序列化示例

假设 Dog 类中的 Poc 是恶意构造的类,但是正常反序列化是可以成功的。

package com.wdbyte.java17;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
 * @author niulang
 */
public class JEP415 {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Dog dog = new Dog("哈士奇");
        dog.setPoc(new Poc());
        // 序列化 - 对象转字节数组
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);) {
            objectOutputStream.writeObject(dog);
        }
        byte[] bytes = byteArrayOutputStream.toByteArray();
        // 反序列化 - 字节数组转对象
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
        Object object = objectInputStream.readObject();
        System.out.println(object.toString());
    }
}

class Dog implements Serializable {
    private String name;
    private Poc poc;

    public Dog(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Dog{" + "name='" + name + '\'' + '}';
    }
		// get...set...
}

class Poc implements Serializable{

}

输出结果:

Dog{name='哈士奇'}

14.2. 反序列化过滤器

在 Java 17 中可以自定义反序列化过滤器,拦截不允许的类。

package com.wdbyte.java17;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
 * @author niulang
 */
public class JEP415 {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Dog dog = new Dog("哈士奇");
        dog.setPoc(new Poc());
        // 序列化 - 对象转字节数组
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);) {
            objectOutputStream.writeObject(dog);
        }
        byte[] bytes = byteArrayOutputStream.toByteArray();
        // 反序列化 - 字节数组转对象
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
        // 允许 com.wdbyte.java17.Dog 类,允许 java.base 中的所有类,拒绝其他任何类
        ObjectInputFilter filter = ObjectInputFilter.Config.createFilter(
                        "com.wdbyte.java17.Dog;java.base/*;!*");
        objectInputStream.setObjectInputFilter(filter);
        Object object = objectInputStream.readObject();
        System.out.println(object.toString());
    }
}

class Dog implements Serializable {
    private String name;
    private Poc poc;

    public Dog(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Dog{" + "name='" + name + '\'' + '}';
    }
		// get...set...
}

class Poc implements Serializable{
}

这时反序列化会得到异常。

Exception in thread "main" java.io.InvalidClassException: filter status: REJECTED
	at java.base/java.io.ObjectInputStream.filterCheck(ObjectInputStream.java:1412)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2053)
	at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1907)
	....

扩展阅读:JEP 415:指定上下文的反序列化过滤器

 

文章转自:https://www.cnblogs.com/niumoo/p/15522730.html

kafka-configs.sh命令行使用方法

kafka的一个动态配置工具,可以通过命令行对系统进行动态配置,但并不是所有的配置项都支持的。

$ ./kafka-configs.bat
        This tool helps to manipulate and describe entity config for a topic, client, user or broker
        Option                                 Description
        ------                                 -----------
        --add-config <String>                  Key Value pairs of configs to add.
        Square brackets can be used to group
        values which contain commas: 'k1=v1,
        k2=[v1,v2,v2],k3=v3'. The following
        is a list of valid configurations:
        For entity-type 'topics':
        cleanup.policy
        compression.type
        delete.retention.ms
        file.delete.delay.ms
        flush.messages
        flush.ms
        follower.replication.throttled.
        replicas
        index.interval.bytes
        leader.replication.throttled.replicas
        max.compaction.lag.ms
        max.message.bytes
        message.downconversion.enable
        message.format.version
        message.timestamp.difference.max.ms
        message.timestamp.type
        min.cleanable.dirty.ratio
        min.compaction.lag.ms
        min.insync.replicas
        preallocate
        retention.bytes
        retention.ms
        segment.bytes
        segment.index.bytes
        segment.jitter.ms
        segment.ms
        unclean.leader.election.enable
        For entity-type 'brokers':
        log.message.timestamp.type
        ssl.client.auth
        log.retention.ms
        sasl.login.refresh.window.jitter
        sasl.kerberos.ticket.renew.window.
        factor
        log.preallocate
        log.index.size.max.bytes
        sasl.login.refresh.window.factor
        ssl.truststore.type
        ssl.keymanager.algorithm
        log.cleaner.io.buffer.load.factor
        sasl.login.refresh.min.period.seconds
        ssl.key.password
        background.threads
        log.retention.bytes
        ssl.trustmanager.algorithm
        log.segment.bytes
        max.connections.per.ip.overrides
        log.cleaner.delete.retention.ms
        log.segment.delete.delay.ms
        min.insync.replicas
        ssl.keystore.location
        ssl.cipher.suites
        log.roll.jitter.ms
        log.cleaner.backoff.ms
        sasl.jaas.config
        principal.builder.class
log.flush.interval.ms
        log.cleaner.max.compaction.lag.ms
        max.connections
        log.cleaner.dedupe.buffer.size
        log.flush.interval.messages
        advertised.listeners
        num.io.threads
        listener.security.protocol.map
        log.message.downconversion.enable
        sasl.enabled.mechanisms
        sasl.login.refresh.buffer.seconds
        ssl.truststore.password
        listeners
        metric.reporters
        ssl.protocol
        sasl.kerberos.ticket.renew.jitter
        ssl.keystore.password
        sasl.mechanism.inter.broker.protocol
        log.cleanup.policy
        sasl.kerberos.principal.to.local.rules
        sasl.kerberos.min.time.before.relogin
        num.recovery.threads.per.data.dir
        log.cleaner.io.max.bytes.per.second
        log.roll.ms
        ssl.endpoint.identification.algorithm
        unclean.leader.election.enable
        message.max.bytes
        log.cleaner.threads
        log.cleaner.io.buffer.size
        max.connections.per.ip
        sasl.kerberos.service.name
        ssl.provider
        follower.replication.throttled.rate
        log.index.interval.bytes
        log.cleaner.min.compaction.lag.ms
        log.message.timestamp.difference.max.
        ms
        ssl.enabled.protocols
        log.cleaner.min.cleanable.ratio
        replica.alter.log.dirs.io.max.bytes.
        per.second
        ssl.keystore.type
        ssl.secure.random.implementation
        ssl.truststore.location
        sasl.kerberos.kinit.cmd
        leader.replication.throttled.rate
        num.network.threads
        compression.type
        num.replica.fetchers
        For entity-type 'users':
        request_percentage
        producer_byte_rate
        SCRAM-SHA-256
        SCRAM-SHA-512
        consumer_byte_rate
        For entity-type 'clients':
        request_percentage
        producer_byte_rate
        consumer_byte_rate
        Entity types 'users' and 'clients' may
        be specified together to update
        config for clients of a specific
        user.
        --alter                                Alter the configuration for the entity.
        --bootstrap-server <String: server to  The Kafka server to connect to. This
        connect to>                            is required for describing and
        altering broker configs.
        --command-config <String: command      Property file containing configs to be
        config property file>                  passed to Admin Client. This is used
        only with --bootstrap-server option
        for describing and altering broker
        configs.
        --delete-config <String>               config keys to remove 'k1,k2'
        --describe                             List configs for the given entity.
        --entity-default                       Default entity name for
        clients/users/brokers (applies to
        corresponding entity type in command
        line)
        --entity-name <String>                 Name of entity (topic name/client
        id/user principal name/broker id)
        --entity-type <String>                 Type of entity
        (topics/clients/users/brokers)
        --force                                Suppress console prompts
        --help                                 Print usage information.
        --version                              Display Kafka version.
        --zookeeper <String: urls>             REQUIRED: The connection string for
        the zookeeper connection in the form
        host:port. Multiple URLS can be
        given to allow fail-over.

网上找了一张图,翻译解释说明:

001

使用方法(例子):

  • 增加/编辑配置

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type topics –entity-name topicName –add-config ‘k1=v1, k2=v2, k3=v3′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type brokers –entity-name 1 –add-config ‘retention.bytes=1024072′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type brokers –entity-default –add-config ‘retention.bytes=1024072′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type clients –entity-default –add-config ‘retention.bytes=1024072′

  • 删除配置

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type topics –entity-name topicName –delete-config ‘k1,k2,k3’
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type clients –entity-name clientId –delete-config ‘k1,k2,k3’
./kafka-configs.sh –bootstrap-server localhost:9092 –alter –entity-type brokers –entity-name $brokerId –delete-config ‘k1,k2,k3’
./kafka-configs.sh –bootstrap-server localhost:9092 –alter –entity-type brokers –entity-default –delete-config ‘k1,k2,k3’

  • 列出配置项的描述信息

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –entity-type topics –entity-name topicName –describe
./kafka-configs.sh–bootstrap-server localhost:9092 –entity-type brokers –entity-name $brokerId –describe
./kafka-configs.sh –bootstrap-server localhost:9092 –entity-type brokers –entity-default –describe
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –entity-type users –entity-name user1 –entity-type clients –entity-name clientA –describe

 

 

WRONGTYPE Operation against a key holding the wrong kind of value

redis报错:

2022/07/20 18:12:32.277 ERROR [0a2cd65e1bef3f0c][task-304318095] o.s.a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected exception occurred invoking async method: public void com.test.app.system.bus.listener.DeviceUpwardListener.onApplicationEvent(com.test.app.system.bus.e.DeviceEvent)
redis.clients.jedis.exceptions.JedisDataException: WRONGTYPE Operation against a key holding the wrong kind of value
at redis.clients.jedis.Protocol.processError(Protocol.java:132) ~[jedis-3.1.0.jar!/:na]
at redis.clients.jedis.Protocol.process(Protocol.java:166) ~[jedis-3.1.0.jar!/:na]
at redis.clients.jedis.Protocol.read(Protocol.java:220) ~[jedis-3.1.0.jar!/:na]
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:318) ~[jedis-3.1.0.jar!/:na]
at redis.clients.jedis.Connection.getIntegerReply(Connection.java:260) ~[jedis-3.1.0.jar!/:na]
at redis.clients.jedis.BinaryJedis.hexists(BinaryJedis.java:1034) ~[jedis-3.1.0.jar!/:na]

 

原因是写入redis的key存在两个(多个),但是类型不一样;

如:

jedis.set(“test”,1);

jedis.hset(“test”,”key”1);

此时testkey就存在两种类型且两个,则有可能会报这个错误。

解决方法就是删除重复的键值,建议每一种set方法都加入一个redis方法名作为前缀,这样子就不可能会出现重复的键。

如:

jedis.set(“set:test”,1);

jedis.hset(“hset:test”,”key”1);

Kafka 日志存储的问题

在进行详解之前,我想先声明一下,本次我们进行讲解说明的是 Kafka 消息存储的信息文件内容,不是所谓的 Kafka 服务器运行产生的日志文件,这一点希望大家清楚。

Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。每个主题又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。也就是该文要着重关注的内容。我们根据如下的图进行进一步说明:

001

图中,创建了一个 demo-topic 主题,其存在 7 个 Parition,对应的每个 Parition 下存在一个 [Topic-Parition] 命名的消息日志文件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区日志文件中,你会发现很多类型的文件,比如:.index、.timestamp、.log、.snapshot 等,其中,文件名一致的文件集合就称为 LogSement。我们先留有这样的一个整体的日志结构概念,接下来我们一一的进行详细的说明其中的设计。

LogSegment

我们已经知道分区日志文件中包含很多的 LogSegment ,Kafka 日志追加是顺序写入的,LogSegment 可以减小日志文件的大小,进行日志删除的时候和数据查找的时候可以快速定位。同时,ActiveLogSegment 也就是活跃的日志分段拥有文件拥有写入权限,其余的 LogSegment 只有只读的权限。

日志文件存在多种后缀文件,重点需要关注 .index、.timestamp、.log 三种类型。其他的日志类型功能作用,请查询下面图表:

类别 作用
.index 偏移量索引文件
.timestamp 时间戳索引文件
.log 日志文件
.snaphot 快照文件
.deleted
.cleaned 日志清理时临时文件
.swap Log Compaction 之后的临时文件
Leader-epoch-checkpoint

每个 LogSegment 都有一个基准偏移量,用来表示当前 LogSegment 中第一条消息的 offset。偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该作为文件名命名规则(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。特别说明一下,如果日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是 121,偏移量是从 0 开始的。

如果想要查看相应文件内容可以通过 kafka-run-class.sh 脚本查看 .log :

/data/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log

002

2.0 中可以使用 kafka-dump-log.sh 查 看.index 文件

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index

日志与索引文件

配置项 默认值 说明
log.index.interval.bytes 4096 (4K) 增加索引项字节间隔密度,会影响索引文件中的区间密度和查询效率
log.segment.bytes 1073741824 (1G) 日志文件最大值
log.roll.ms 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,毫秒维度
log.roll.hours 168 (7天) 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值允许的最大范围,小时维度
log.index.size.max.bytes 10485760 (10MB) 触发偏移量索引文件或时间戳索引文件分段字节限额

偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。

Kafka 中的索引文件是以稀疏索引的方式构造消息的索引,他并不保证每一个消息在索引文件中都有对应的索引项。每当写入一定量的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,通过修改 log.index.interval.bytes 的值,改变索引项的密度。

切分文件

从上文中可知,日志文件和索引文件都会存在多个文件,组成多个 SegmentLog,那么其切分的规则是怎样的呢?

当满足如下几个条件中的其中之一,就会触发文件的切分:

  1. 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。log.segment.bytes 参数的默认值为 1073741824,即 1GB。
  2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms 或 log.roll.hours 参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。
  3. 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 配置的值。log.index.size.max.bytes 的默认值为 10485760,即 10MB。
  4. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量。
为什么是 Integer.MAX_VALUE ?

在偏移量索引文件中,每个索引项共占用 8 个字节,并分为两部分。相对偏移量和物理地址。

相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节

物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节

4 个字节刚好对应 Integer.MAX_VALUE ,如果大于 Integer.MAX_VALUE ,则不能用 4 个字节进行表示了。

索引文件切分过程

索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,即文件创建的时候就是最大值,当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性。

查找消息

offset 查询

偏移量索引由相对偏移量和物理地址组成。

003

可以通过如下命令解析.index 文件

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index
offset:0 position:0
offset:20 position:320
offset:43 position:1220

注意:offset 与 position 没有直接关系哦,由于存在数据删除和日志清理。

004

e.g. 如何查看 偏移量为 23 的消息?

Kafka 中存在一个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在 00000000000000000000.index ,通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即 offset 20 那栏,然后从日志分段文件中的物理位置为320 开始顺序查找偏移量为 23 的消息。

时间戳方式查询

在上文已经有所提及,通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。

时间戳索引索引格式

005

006+

 

e.g. 查找时间戳为 1557554753430 开始的消息?

  • 将 1557554753430 和每个日志分段中最大时间戳 largestTimeStamp 逐一对比,直到找到不小于 1557554753430 所对应的日志分段。日志分段中的 largestTimeStamp 的计算是先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于 0 ,则取该值,否则去该日志分段的最近修改时间。
  • 找到相应日志分段之后,使用二分法进行定位,与偏移量索引方式类似,找到不大于 1557554753430 最大索引项,也就是 [1557554753420 430]。
  • 拿着偏移量为 430 到偏移量索引文件中使用二分法找到不大于 430 最大索引项,即 [20,320] 。
  • 日志文件中从 320 的物理位置开始查找不小于 1557554753430 数据。

注意:timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的哦。因为数据的写入是各自追加。

在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若干的时间戳信息。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增长。反之如果是 CreateTime 则无法保证顺序。

日志清理

日志清理,不是日志删除哦,这还是有所区别的,日志删除会在下文进行说明。

Kafka 提供两种日志清理策略:

日志删除:按照一定的删除策略,将不满足条件的数据进行数据删除

日志压缩:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本。

Kafka 提供 log.cleanup.policy 参数进行相应配置,默认值:delete,还可以选择 compact。

是否支持针对具体的 Topic 进行配置?

答案是肯定的,主题级别的配置项是 cleanup.policy 。

日志删除

配置 默认值 说明
log.retention.check.interval.ms 300000 (5分钟) 检测频率
log.retention.hours 168 (7天) 日志保留时间小时
log.retention.minutes 日志保留时间分钟
log.retention.ms 日志保留时间毫秒
file.delete.delay.ms 60000 (1分钟) 延迟执行删除时间
log.retention.bytes -1 无穷大 运行保留日志文件最大值
log.retention.bytes 1073741824 (1G) 日志文件最大值

Kafka 会周期性根据相应规则进行日志数据删除,保留策略有 3 种:基于时间的保留策略、基于日志大小的保留策略和基于日志其实偏移量的保留策略。

基于时间

日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是 7 天,log.retention.ms 优先级最高。

如何查找日志分段文件中已经过去的数据呢?

Kafka 依据日志分段中最大的时间戳进行定位,首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取该值,否则取最近修改时间。

为什么不直接选最近修改时间呢?

因为日志文件可以有意无意的被修改,并不能真实的反应日志分段的最大时间信息。

删除过程
  1. 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。
  2. 这些日志分段所有文件添加 上 .delete 后缀。
  3. 交由一个以 "delete-file" 命名的延迟任务来删除这些 .delete 为后缀的文件。延迟执行时间可以通过 file.delete.delay.ms 进行设置

如果活跃的日志分段中也存在需要删除的数据时?

Kafka 会先切分出一个新的日志分段作为活跃日志分段,然后执行删除操作。

基于日志大小

日志删除任务会检查当前日志的大小是否超过设定值。设定项为 log.retention.bytes ,单个日志分段的大小由 log.regment.bytes 进行设定。

删除过程

  1. 计算需要被删除的日志总大小 (当前日志文件大小-retention值)。
  2. 从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合。
  3. 执行删除。

基于日志起始偏移量

基于日志起始偏移量的保留策略的判断依据是某日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,则可以删除此日志分段。

注意:日志文件的起始偏移量并不一定等于第一个日志分段的基准偏移量,存在数据删除,可能与之相等的那条数据已经被删除了。

007

删除过程
  • 从头开始遍历每一个日志分段,日志分段 1 的下一个日志分段的起始偏移量为 11,小于 logStartOffset,将 日志分段 1 加入到删除队列中
  • 日志分段 2 的下一个日志分段的起始偏移量为 23,小于 logStartOffset,将 日志分段 2 加入到删除队列中
  • 日志分段 3 的下一个日志分段的起始偏移量为 30,大于 logStartOffset,则不进行删除。

Elasticsearch教程

索引(动词)
索引一个文档 就是存储一个文档到一个 索引 (名词)中以便它可以被检索和查询到。这非常类似于 SQL 语句中的 INSERT 关键词,除了文档已存在时新chaj文档会替换旧文档情况之外。

倒排索引
倒排索引源于实际应用中需要根据属性的值来查找记录。这种索引表中的每一项都包括一个属性值和具有该属性值的各记录的地址。由于不是由记录来确定属性值,而是由属性值来确定记录的位置,因而称为倒排索引(inverted index)。带有倒排索引的文件我们称为倒排索引文件,简称倒排文件(inverted file)。

文档
存储在ES上的主要实体叫文档

文档类型(废弃)
在ES中,一个索引对象可以存储很多不同用途的对象。

映射
存储有关字段的信息,每一个文档类型都有自己的映射。

面向文档
在应用程序中对象很少只是一个简单的键和值的列表。通常,它们拥有更复杂的数据结构,可能包括日期、地理信息、其他对象或者数组等。

Elasticsearch面向文档的,意味着它存储整个对象或 文档。Elasticsearch不仅存储文档,而且 索引每个文档的内容使之可以被检索。在Elasticsearch 中,你对文档进行索引、检索、排序和过滤而不是对行列数据。这是一种完全不同的思考数据的方式,也是Elasticsearch 能支持复杂全文检索的原因。

12345631
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |