Redisson 分布式锁

从分布式锁到Redisson实现非常详细,适合慢慢咀嚼~

1. Redisson概述

什么是Redisson?

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。

Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

一个基于Redis实现的分布式工具,有基本分布式对象和高级又抽象的分布式服务,为每个试图再造分布式轮子的程序员带来了大部分分布式问题的解决办法。

Redisson和Jedis、Lettuce有什么区别?倒也不是雷锋和雷锋塔

Redisson和它俩的区别就像一个用鼠标操作图形化界面,一个用命令行操作文件。Redisson是更高层的抽象,Jedis和Lettuce是Redis命令的封装。

  • Jedis是Redis官方推出的用于通过Java连接Redis客户端的一个工具包,提供了Redis的各种命令支持
  • Lettuce是一种可扩展的线程安全的 Redis 客户端,通讯框架基于Netty,支持高级的 Redis 特性,比如哨兵,集群,管道,自动重新连接和Redis数据模型。Spring Boot 2.x 开始 Lettuce 已取代 Jedis 成为首选 Redis 的客户端。
  • Redisson是架设在Redis基础上,通讯基于Netty的综合的、新型的中间件,企业级开发中使用Redis的最佳范本

Jedis把Redis命令封装好,Lettuce则进一步有了更丰富的Api,也支持集群等模式。但是两者也都点到为止,只给了你操作Redis数据库的脚手架,而Redisson则是基于Redis、Lua和Netty建立起了成熟的分布式解决方案,甚至redis官方都推荐的一种工具集。

2. 分布式锁

分布式锁怎么实现?

分布式锁是并发业务下的刚需,虽然实现五花八门:ZooKeeper有Znode顺序节点,数据库有表级锁和乐/悲观锁,Redis有setNx,但是殊途同归,最终还是要回到互斥上来,本篇介绍Redisson,那就以redis为例。

怎么写一个简单的Redis分布式锁?

以Spring Data Redis为例,用RedisTemplate来操作Redis(setIfAbsent已经是setNx + expire的合并命令),如下

// 加锁
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
    return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}
// 解锁,防止删错别人的锁,以uuid为value校验是否自己的锁
public void unlock(String lockName, String uuid) {
    if(uuid.equals(redisTemplate.opsForValue().get(lockName)){        redisTemplate.opsForValue().del(lockName);    }
}

// 结构
if(tryLock){
    // todo
}finally{
    unlock;
}

这是锁没错,但get和del操作非原子性,并发一旦大了,无法保证进程安全。

Lua脚本是什么?

Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过redis的eval /evalsha 命令来运行,把操作封装成一个Lua脚本,如论如何都是一次执行的原子操作。

于是2.0版本通过Lua脚本删除

lockDel.lua如下

if redis.call('get', KEYS[1]) == ARGV[1] 
    then 
 -- 执行删除操作
        return redis.call('del', KEYS[1]) 
    else 
 -- 不成功,返回0
        return 0 
end

delete操作时执行Lua命令

// 解锁脚本
DefaultRedisScript<Object> unlockScript = new DefaultRedisScript();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));

// 执行lua脚本解锁
redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);

2.0似乎更像一把锁,但好像又缺少了什么,小张一拍脑袋,synchronized和ReentrantLock都很丝滑,因为他们都是可重入锁,一个线程多次拿锁也不会死锁,我们需要可重入。

怎么保证可重入?

重入就是,同一个线程多次获取同一把锁是允许的,不会造成死锁,这一点synchronized偏向锁提供了很好的思路,synchronized的实现重入是在JVM层面,JAVA对象头MARK WORD中便藏有线程ID和计数器来对当前线程做重入判断,避免每次CAS。

“当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储偏向的线程ID,以后该线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需简单测试一下对象头的Mark Word里是否存储着指向当前线程的偏向锁。如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下Mark Word中偏向锁标志是否设置成1:没有则CAS竞争;设置了,则CAS将对象头偏向锁指向当前线程。

再维护一个计数器,同个线程进入则自增1,离开再减1,直到为0才能释放”

可重入锁

仿造该方案,我们需改造Lua脚本:

“1.需要存储 锁名称lockName 、获得该锁的线程id 和对应线程的进入次数count

2.加锁

每次线程获取锁时,判断是否已存在该锁

  • 不存在
  • 设置hash的key为线程id,value初始化为1
  • 设置过期时间
  • 返回获取锁成功true
  • 存在
  • 继续判断是否存在当前线程id的hash key
  • 存在,线程key的value + 1,重入次数增加1,设置过期时间
  • 不存在,返回加锁失败

3.解锁

每次线程来解锁时,判断是否已存在该锁

  • 存在
  • 是否有该线程的id的hash key,有则减1,无则返回解锁失败
  • 减1后,判断剩余count是否为0,为0则说明不再需要这把锁,执行del命令删除”

1.存储结构

为了方便维护这个对象,我们用Hash结构来存储这些字段。Redis的Hash类似Java的HashMap,适合存储对象。

a001

hset lockname1 threadId 1

设置一个名字为lockname1 的hash结构,该hash结构key为threadId ,值value为1

hget lockname1 threadId

获取lockname1的threadId的值

存储结构为

lockname 锁名称
    key1:   threadId   唯一键,线程id
    value1:  count     计数器,记录该线程获取锁的次数

redis中的结构

a002

2.计数器的加减

当同一个线程获取同一把锁时,我们需要将对应线程的计数器count做加减

判断一个redis key是否存在,可以用exists,而判断一个hash的key是否存在,可以用hexists

a003

而redis也有hash自增的命令hincrby

每次自增1时 hincrby lockname1 threadId 1,自减1时 hincrby lockname1 threadId -1

3.解锁的判断

当一把锁不再被需要了,每次解锁一次,count减1,直到为0时,执行删除

综合上述的存储结构和判断流程,加锁和解锁Lua如下

加锁 lock.lua

local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];

-- lockname不存在
if(redis.call('exists', key) == 0) then
    redis.call('hset', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;

-- 当前线程已id存在
if(redis.call('hexists', key, threadId) == 1) then
    redis.call('hincrby', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;
return 0;

解锁 unlock.lua

local key = KEYS[1];
local threadId = ARGV[1];

-- lockname、threadId不存在
if (redis.call('hexists', key, threadId) == 0) then
    return nil;
end;

-- 计数器-1
local count = redis.call('hincrby', key, threadId, -1);

-- 删除lock
if (count == 0) then
    redis.call('del', key);
    return nil;
end;

代码

/**
 * @description 原生redis实现分布式锁
 **/
@Getter
@Setter
public class RedisLock {

    private RedisTemplate redisTemplate;
    private DefaultRedisScript<Long> lockScript;
    private DefaultRedisScript<Object> unlockScript;

    public RedisLock(RedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        // 加载加锁的脚本
        lockScript = new DefaultRedisScript<>();
        this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));
        this.lockScript.setResultType(Long.class);
        // 加载释放锁的脚本
        unlockScript = new DefaultRedisScript<>();
        this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
    }

    /**
     * 获取锁
     */
    public String tryLock(String lockName, long releaseTime) {
        // 存入的线程信息的前缀
        String key = UUID.randomUUID().toString();

        // 执行脚本
        Long result = (Long) redisTemplate.execute(
                lockScript,
                Collections.singletonList(lockName),
                key + Thread.currentThread().getId(),
                releaseTime);

        if (result != null && result.intValue() == 1) {
            return key;
        } else {
            return null;
        }
    }

    /**
     * 解锁
     * @param lockName
     * @param key
     */
    public void unlock(String lockName, String key) {
        redisTemplate.execute(unlockScript,
                Collections.singletonList(lockName),
                key + Thread.currentThread().getId()
                );
    }
}

至此已经完成了一把分布式锁,符合互斥、可重入、防死锁的基本特点。

当普通互斥锁,已经稳稳够用,但是业务里总是又很多特殊情况的,

比如A进程在获取到锁的时候,因业务操作时间太长,锁释放了但是业务还在执行,而此刻B进程又可以正常拿到锁做业务操作,两个进程操作就会存在依旧有共享资源的问题 。

而且如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态 。

所以我们希望在这种情况时,可以延长锁的releaseTime延迟释放锁来直到完成业务期望结果,这种不断延长锁过期时间来保证业务执行完成的操作就是锁续约。

读写分离也是常见,一个读多写少的业务为了性能,常常是有读锁和写锁的。

3. Redisson分布式锁

号称简单的Redisson分布式锁的使用姿势是什么?

1. 依赖

<!-- 原生,本章使用-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

<!-- 另一种Spring集成starter,本章未使用 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.6</version>
</dependency>

2. 配置

@Configuration
public class RedissionConfig {
    @Value("${spring.redis.host}")
    private String redisHost;

    @Value("${spring.redis.password}")
    private String password;

    private int port = 6379;

    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        config.useSingleServer().
                setAddress("redis://" + redisHost + ":" + port).
                setPassword(password);
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }
}

3. 启用分布式锁

@Resource
private RedissonClient redissonClient;

RLock rLock = redissonClient.getLock(lockName);
try {
    boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);
    if (isLocked) {
        // TODO
    }
} catch (Exception e) {
    rLock.unlock();
}

简洁明了,只需要一个RLock,既然推荐Redisson,就往里面看看他是怎么实现的。

4. RLock

RLock是Redisson分布式锁的最核心接口,继承了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的返回值都是RFuture,是Redisson执行异步实现的核心逻辑,也是Netty发挥的主要阵地。

RLock如何加锁?

从RLock进入,找到RedissonLock类,找到tryLock 方法再递进到干事的tryAcquireOnceAsync 方法,这是加锁的主要代码(版本不一此处实现有差别,和最新3.15.x有一定出入,但是核心逻辑依然未变。此处以3.13.6为例)

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
        RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining) {
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog 的锁续约 (下文),一个注册了加锁事件的续约任务。

我们先来看有过期时间tryLockInnerAsync 部分,evalWriteAsync是eval命令执行lua的入口

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
        RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining) {
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

这里揭开真面目,eval命令执行Lua脚本的地方,此处的Lua脚本展开

-- 不存在该key时
if (redis.call('exists', KEYS[1]) == 0) then 
  -- 新增该锁并且hash中该线程id对应的count置1
  redis.call('hincrby', KEYS[1], ARGV[2], 1); 
  -- 设置过期时间
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return nil;
    end; 

-- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
  -- 线程重入次数++
            redis.call('hincrby', KEYS[1], ARGV[2], 1); 
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return nil;
    end; 
return redis.call('pttl', KEYS[1]);

和前面我们写自定义的分布式锁的脚本几乎一致,看来redisson也是一样的实现,具体参数分析:

// keyName
    KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
    ARGV[1] = this.internalLockLeaseTime
// uuid+threadId组合的唯一值
    ARGV[2] = this.getLockName(threadId)

总共3个参数完成了一段逻辑:

“判断该锁是否已经有对应hash表存在,

• 没有对应的hash表:则set该hash表中一个entry的key为锁名称,value为1,之后设置该hash表失效时间为leaseTime

• 存在对应的hash表:则将该lockName的value执行+1操作,也就是计算进入次数,再设置失效时间leaseTime

• 最后返回这把锁的ttl剩余时间

也和上述自定义锁没有区别

既然如此,那解锁的步骤也肯定有对应的-1操作,再看unlock方法,同样查找方法名,一路到

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}

掏出Lua部分

-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
  return nil;
end;
-- 计数器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
  -- 过期时间重设
  redis.call('pexpire', KEYS[1], ARGV[2]); 
  return 0; 
else
  -- 删除并发布解锁消息
  redis.call('del', KEYS[1]); 
  redis.call('publish', KEYS[2], ARGV[1]); 
  return 1;
end; 
return nil;

该Lua KEYS有2个Arrays.asList(getName(), getChannelName())

name 锁名称
channelName,用于pubSub发布消息的channel名称

ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)

LockPubSub.UNLOCK_MESSAGE,channel发送消息的类别,此处解锁为0
internalLockLeaseTime,watchDog配置的超时时间,默认为30s
lockName 这里的lockName指的是uuid和threadId组合的唯一值

步骤如下:

“1.如果该锁不存在则返回nil;

2.如果该锁存在则将其线程的hash key计数器-1,

3.计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;

其中unLock的时候使用到了Redis发布订阅PubSub完成消息通知。

而订阅的步骤就在RedissonLock的加锁入口的lock方法里

long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
    if (ttl != null) {
    // 订阅
    RFuture<RedissonLockEntry> future = this.subscribe(threadId);
    if (interruptibly) {
        this.commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        this.commandExecutor.syncSubscription(future);
    }
    // 省略

当锁被其他线程占用时,通过监听锁的释放通知(在其他线程通过RedissonLock释放锁时,会通过发布订阅pub/sub功能发起通知),等待锁被其他线程释放,也是为了避免自旋的一种常用效率手段。

1. 解锁消息

为了一探究竟通知了什么,通知后又做了什么,进入LockPubSub。

这里只有一个明显的监听方法onMessage,其订阅和信号量的释放都在父类PublishSubscribe,我们只关注监听事件的实际操作

protected void onMessage(RedissonLockEntry value, Long message) {
    Runnable runnableToExecute;
    if (message.equals(unlockMessage)) {
        // 从监听器队列取监听线程执行监听回调
        runnableToExecute = (Runnable) value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        // getLatch()返回的是Semaphore,信号量,此处是释放信号量
        // 释放信号量后会唤醒等待的entry.getLatch().tryAcquire去再次尝试申请锁
        value.getLatch().release();
    } else if (message.equals(readUnlockMessage)) {
        while (true) {
            runnableToExecute = (Runnable) value.getListeners().poll();
            if (runnableToExecute == null) {
                value.getLatch().release(value.getLatch().getQueueLength());
                break;
            }
            runnableToExecute.run();
        }
    }
}

发现一个是默认解锁消息 ,一个是读锁解锁消息 ,因为redisson是有提供读写锁的,而读写锁读读情况和读写、写写情况互斥情况不同,我们只看上面的默认解锁消息unlockMessage分支

LockPubSub监听最终执行了2件事

  1. runnableToExecute.run() 执行监听回调
  2. value.getLatch().release(); 释放信号量

Redisson通过LockPubSub 监听解锁消息,执行监听回调和释放信号量通知等待线程可以重新抢锁。

这时再回来看tryAcquireOnceAsync另一分支

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    } else {
        RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining) {
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

可以看到,无超时时间时,在执行加锁操作后,还执行了一段费解的逻辑

ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining) {
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });

此处涉及到Netty的Future/Promise-Listener模型,Redisson中几乎全部以这种方式通信(所以说Redisson是基于Netty通信机制实现的),理解这段逻辑可以试着先理解

“在 Java 的 Future 中,业务逻辑为一个 Callable 或 Runnable 实现类,该类的 call()或 run()执行完毕意味着业务逻辑的完结,在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败,这样更加方便的监控自己的业务逻辑。”

这块代码的表面意义就是,在执行异步加锁的操作后,加锁成功则根据加锁完成返回的ttl是否过期来确认是否执行一段定时任务。

这段定时任务的就是watchDog的核心。

2. 锁续约

查看RedissonLock.this.scheduleExpirationRenewal(threadId)

private void scheduleExpirationRenewal(long threadId) {
    RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
    RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry) EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        this.renewExpiration();
    }

}

private void renewExpiration() {
    RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry) EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry) RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            } else {
                                if (res) {
                                    RedissonLock.this.renewExpiration();
                                }

                            }
                        });
                    }
                }
            }
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
}

拆分来看,这段连续嵌套且冗长的代码实际上做了几步

“• 添加一个netty的Timeout回调任务,每(internalLockLeaseTime / 3)毫秒执行一次,执行的方法是renewExpirationAsync• renewExpirationAsync重置了锁超时时间,又注册一个监听器,监听回调又执行了renewExpiration

renewExpirationAsync 的Lua如下

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}

if(redis.call('hexists',KEYS[1],ARGV[2])==1)then 
  redis.call('pexpire',KEYS[1],ARGV[1]); 
  return 1;
    end; 
return 0;

重新设置了超时时间。

Redisson加这段逻辑的目的是什么?

目的是为了某种场景下保证业务不影响,如任务执行超时但未结束,锁已经释放的问题。

当一个线程持有了一把锁,由于并未设置超时时间leaseTime,Redisson默认配置了30S,开启watchDog,每10S对该锁进行一次续约,维持30S的超时时间,直到任务完成再删除锁。

这就是Redisson的锁续约 ,也就是WatchDog 实现的基本思路。

3. 流程概括

通过整体的介绍,流程简单概括:

“A、B线程争抢一把锁,A获取到后,B阻塞

  1. B线程阻塞时并非主动CAS,而是PubSub方式订阅该锁的广播消息
  2. A操作完成释放了锁,B线程收到订阅消息通知
  3. B被唤醒开始继续抢锁,拿到锁”

详细加锁解锁流程总结如下图:

640

5. 公平锁

以上介绍的可重入锁是非公平锁,Redisson还基于Redis的队列(List)和ZSet实现了公平锁

公平的定义是什么?

公平就是按照客户端的请求先来后到排队来获取锁,先到先得,也就是FIFO,所以队列和容器顺序编排必不可少

FairSync

回顾JUC的ReentrantLock公平锁的实现

/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

AQS已经提供了整个实现,是否公平取决于实现类取出节点逻辑是否顺序取

640

AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架,通过内置FIFO队列来完成资源获取线程的排队工作,他自身没有实现同步接口,仅仅定义了若干同步状态获取和释放的方法来供自定义同步组件使用(上图),支持独占和共享获取,这是基于模版方法模式的一种设计,给公平/非公平提供了土壤。

我们用2张图来简单解释AQS的等待流程(出自《JAVA并发编程的艺术》)

一张是同步队列(FIFO双向队列)管理 获取同步状态失败(抢锁失败)的线程引用、等待状态和前驱后继节点的流程图

001

一张是独占式获取同步状态的总流程 ,核心acquire(int arg)方法调用流程

002

可以看出锁的获取流程

“AQS维护一个同步队列,获取状态失败的线程都会加入到队列中进行自旋,移出队列或停止自旋的条件是前驱节点为头节点切成功获取了同步状态。”

而比较另一段非公平锁类NonfairSync可以发现,控制公平和非公平的关键代码,在于hasQueuedPredecessors方法。

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

NonfairSync减少了了hasQueuedPredecessors判断条件,该方法的作用就是

查看同步队列中当前节点是否有前驱节点,如果有比当前线程更早请求获取锁则返回true。

保证每次都取队列的第一个节点(线程)来获取锁,这就是公平规则

为什么JUC以默认非公平锁呢?

“因为当一个线程请求锁时,只要获取来同步状态即成功获取。在此前提下,刚释放的线程再次获取同步状态的几率会非常大,使得其他线程只能在同步队列中等待。但这样带来的好处是,非公平锁大大减少了系统线程上下文的切换开销。”

可见公平的代价是性能与吞吐量。

Redis里没有AQS,但是有List和zSet,看看Redisson是怎么实现公平的。

RedissonFairLock

RedissonFairLock 用法依然很简单

RLock fairLock = redissonClient.getFairLock(lockName);
fairLock.lock();

RedissonFairLock继承自RedissonLock,同样一路向下找到加锁实现方法tryLockInnerAsync 。

这里有2段冗长的Lua,但是Debug发现,公平锁的入口在 command == RedisCommands.EVAL_LONG 之后,此段Lua较长,参数也多,我们着重分析Lua的实现规则

参数

-- lua中的几个参数
KEYS = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
KEYS[1]: lock_name, 锁名称                   
KEYS[2]: "redisson_lock_queue:{xxx}"  线程队列
KEYS[3]: "redisson_lock_timeout:{xxx}"  线程id对应的超时集合

ARGV =  internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime
ARGV[1]: "{leaseTime}" 过期时间
ARGV[2]: "{Redisson.UUID}:{threadId}"   
ARGV[3] = 当前时间 + 线程等待时间:(10:00:00) + 5000毫秒 = 10:00:05
ARGV[4] = 当前时间(10:00:00)  部署服务器时间,非redis-server服务器时间

公平锁实现的Lua脚本

-- 1.死循环清除过期key
while true do 
  -- 获取头节点
    local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
    -- 首次获取必空跳出循环
  if firstThreadId2 == false then 
    break;
  end;
  -- 清除过期key
  local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
  if timeout <= tonumber(ARGV[4]) then
    redis.call('zrem', KEYS[3], firstThreadId2);
    redis.call('lpop', KEYS[2]);
  else
    break;
  end;
end;

-- 2.不存在该锁 && (不存在线程等待队列 || 存在线程等待队列而且第一个节点就是此线程ID),加锁部分主要逻辑
if (redis.call('exists', KEYS[1]) == 0) and 
  ((redis.call('exists', KEYS[2]) == 0)  or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then
  -- 弹出队列中线程id元素,删除Zset中该线程id对应的元素
  redis.call('lpop', KEYS[2]);
  redis.call('zrem', KEYS[3], ARGV[2]);
  local keys = redis.call('zrange', KEYS[3], 0, -1);
  -- 遍历zSet所有key,将key的超时时间(score) - 当前时间ms
  for i = 1, #keys, 1 do 
    redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
  end;
    -- 加锁设置锁过期时间
  redis.call('hset', KEYS[1], ARGV[2], 1);
  redis.call('pexpire', KEYS[1], ARGV[1]);
  return nil;
end;

-- 3.线程存在,重入判断
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
  redis.call('hincrby', KEYS[1], ARGV[2],1);
  redis.call('pexpire', KEYS[1], ARGV[1]);
  return nil;
end;

-- 4.返回当前线程剩余存活时间
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
    if timeout ~= false then
  -- 过期时间timeout的值在下方设置,此处的减法算出的依旧是当前线程的ttl
  return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
end;

-- 5.尾节点剩余存活时间
local lastThreadId = redis.call('lindex', KEYS[2], -1);
local ttl;
-- 尾节点不空 && 尾节点非当前线程
if lastThreadId ~= false and lastThreadId ~= ARGV[2] then
  -- 计算队尾节点剩余存活时间
  ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
else
  -- 获取lock_name剩余存活时间
  ttl = redis.call('pttl', KEYS[1]);
end;

-- 6.末尾排队
-- zSet 超时时间(score),尾节点ttl + 当前时间 + 5000ms + 当前时间,无则新增,有则更新
-- 线程id放入队列尾部排队,无则插入,有则不再插入
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
  redis.call('rpush', KEYS[2], ARGV[2]);
end;
return ttl;

1.公平锁加锁步骤

通过以上Lua,可以发现,lua操作的关键结构是列表(list)和有序集合(zSet)。

其中list维护了一个等待的线程队列redisson_lock_queue:{xxx},zSet维护了一个线程超时情况的有序集合redisson_lock_timeout:{xxx},尽管lua较长,但是可以拆分为6个步骤

1.队列清理

  • 保证队列中只有未过期的等待线程

2.首次加锁

  • hset加锁,pexpire过期时间

3.重入判断

  • 此处同可重入锁lua

4.返回ttl

5.计算尾节点ttl

  • 初始值为锁的剩余过期时间

6.末尾排队

  • ttl + 2 * currentTime + waitTime是score的默认值计算公式

2.模拟

如果模拟以下顺序,就会明了redisson公平锁整个加锁流程

假设 t1 10:00:00 < t2 10:00:10 < t3 10:00:20

t1:当线程1初次获取锁

“1.等待队列无头节点,跳出死循环->22.不存在该锁 && 不存在线程等待队列 成立

2.1 lpop和zerm、zincrby都是无效操作,只有加锁生效,说明是首次加锁,加锁后返回nil

加锁成功,线程1获取到锁,结束”

t2:线程2尝试获取锁(线程1未释放锁)

“1.等待队列无头节点,跳出死循环->22.不存在该锁 不成立->3

3.非重入线程 ->4

4.score无值 ->5

5.尾节点为空,设置ttl初始值为lock_name的ttl -> 6

6.按照ttl + waitTime + currentTime + currentTime 来设置zSet超时时间score,并且加入等待队列,线程2为头节点

score = 20S + 5000ms + 10:00:10 + 10:00:10 = 10:00:35 + 10:00:10”

t3:线程3尝试获取锁(线程1未释放锁)

“1.等待队列有头节点1.1未过期->2

2.不存在该锁不成立->3

3.非重入线程->4

4.score无值 ->5

5.尾节点不为空 && 尾节点线程为2,非当前线程

5.1取出之前设置的score,减去当前时间:ttl = score – currentTime ->6

6.按照ttl + waitTime + currentTime + currentTime 来设置zSet超时时间score,并且加入等待队列

score = 10S + 5000ms + 10:00:20 + 10:00:20 = 10:00:35 + 10:00:20”

如此一来,三个需要抢夺一把锁的线程,完成了一次排队,在list中排列他们等待线程id,在zSet中存放过期时间(便于排列优先级)。其中返回ttl的线程2客户端、线程3客户端将会一直按一定间隔自旋重复执行该段Lua,尝试加锁,如此一来便和AQS有了异曲同工之处。

而当线程1释放锁之后(这里依旧有通过Pub/Sub发布解锁消息,通知其他线程获取)

10:00:30 线程2尝试获取锁(线程1已释放锁)

“1.等待队列有头节点,未过期->22.不存在该锁 & 等待队列头节点是当前线程 成立

2.1删除当前线程的队列信息和zSet信息,超时时间为:

线程2 10:00:35 + 10:00:10 – 10:00:30 = 10:00:15

线程3 10:00:35 + 10:00:20 – 10:00:30 = 10:00:25

2.2线程2获取到锁,重新设置过期时间

加锁成功,线程2获取到锁,结束”

排队结构如图

001

公平锁的释放脚本和重入锁类似,多了一步加锁开头的清理过期key的while true逻辑,在此不再展开篇幅描述。

由上可以看出,Redisson公平锁的玩法类似于延迟队列的玩法,核心都在Redis的List和zSet结构的搭配,但又借鉴了AQS实现,在定时判断头节点上如出一辙(watchDog),保证了锁的竞争公平和互斥。并发场景下,lua脚本里,zSet的score很好地解决了顺序插入的问题,排列好优先级。

并且为了防止因异常而退出的线程无法清理,每次请求都会判断头节点的过期情况给予清理,最后释放时通过CHANNEL通知订阅线程可以来获取锁,重复一开始的步骤,顺利交接到下一个顺序线程。

6. 总结

Redisson整体实现分布式加解锁流程的实现稍显复杂,作者Rui Gu对Netty和JUC、Redis研究深入,利用了很多高级特性和语义,值得深入学习,本次介绍也只是单机Redis下锁实现。

Redisson也提供了多机情况下的联锁MultiLock:

“https://github.com/redisson/redisson/wiki/8.-分布式锁和同步器#81-可重入锁reentrant-lock”

和官方推荐的红锁RedLock:

“https://github.com/redisson/redisson/wiki/8.-分布式锁和同步器#84-红锁redlock”

所以,当你真的需要分布式锁时,不妨先来Redisson里找找。

Spring Reactor 入门

适合阅读的人群:本文适合对 Spring、Netty 等框架,以及 Java 8 的 Lambda、Stream 等特性有基本认识,希望了解 Spring 5 的反应式编程特性的技术人员阅读。

一、前言

最近几年,随着 Node.js、Golang 等新技术、新语言的出现,Java 的服务器端开发语言老大的地位受到了不小的挑战。虽然,Java 的市场份额依旧很大,短时间内也不会改变,但 Java 社区对于挑战也并没有无动于衷。相反,Java 社区积极应对这些挑战,不断提高自身应对高并发服务器端开发场景的能力。

为了应对高并发的服务器端开发,在2009年的时候,微软提出了一个更优雅地实现异步编程的方式 —— Reactive Programming,中文称反应式编程。随后,其它技术也迅速地跟上了脚步,像 ES6 通过 Promise 引入了类似的异步编程方式。Java 社区也没有落后很多,Netflix 和 TypeSafe 公司提供了 RxJava 和 Akka Stream 技术,让 Java 平台也有了能够实现反应式编程的框架。

其实,在更早之前,像 Mina 和 Netty 这样的 NIO 框架其实也能搞定高并发的服务器端开发任务,但这样的技术相对来说只是少数高级开发人员手中的工具。对于更多的普通开发者来说,难度显得大了些,所以不容易普及。

很多年过去了,到了2017年,虽然已经有不少公司在实践反应式编程。但整体来说,应用范围依旧不大。原因在于缺少简单易用的技术将反应式编程推广普及,并同诸如 MVC 框架、HTTP 客户端、数据库技术等整合。

终于,在2017年9月28日,解决上面问题的利器浮出水面 —— Spring 5 正式发布。Spring 5 其最大的意义就是能将反应式编程技术的普及向前推进一大步。而作为在背后支持 Spring 5 反应式编程的框架 Reactor,也相应的发布了 3.1.0 版本。

本文接下来将会向大家介绍 Reactive Programming(反应式编程)、Reactor 的入门以及实践技巧等相关的内容。文章中的实践内容来自作者使用 Spring 5 和 Reactor 等技术改造实际项目的经历。

二、Reactor 简介

先介绍一下 Reactor 技术。Reactor 框架是 Pivotal 公司(开发 Spring 等技术的公司)开发的,实现了 Reactive Programming 思想,符合 Reactive Streams 规范(Reactive Streams 是由 Netflix、TypeSafe、Pivotal 等公司发起的)的一项技术。其名字有反应堆之意,反映了其背后的强大的性能。

Reactive Programming

Reactive Programming,中文称反应式编程,是一种高性能应用的编程方式。其最早是由微软提出并引入到 .NET 平台中,随后 ES6 也引入了类似的技术。在 Java 平台上,较早采用反应式编程技术的是 Netflix 公司开源的 RxJava 框架。现在大家比较熟知的 Hystrix 就是以 RxJava 为基础开发的。

反应式编程其实并不神秘,通过与我们熟悉的迭代器模式对比便可了解其基本思想:

event Iterable (pull) Observable (push)
retrieve data T next() onNext(T)
discover error throws Exception onError(Exception)
complete !hasNext() onCompleted()

上面表格的中的 Observable 那一列便代表反应式编程的 API 使用方式。可见,它就是常见的观察者模式的一种延伸。如果将迭代器看作是拉模式,那观测者模式便是推模式。被订阅者(Publisher)主动的推送数据给订阅者(Subscriber),触发 onNext 方法。异常和完成时触发另外两个方法。如果 Publisher 发布消息太快了,超过了 Subscriber 的处理速度,那怎么办。这就是 Backpressure 的由来,Reactive Programming 框架需要提供机制,使得 Subscriber 能够控制消费消息的速度。

在 Java 平台上,Netflix(开发了 RxJava)、TypeSafe(开发了 Scala、Akka)、Pivatol(开发了 Spring、Reactor)共同制定了一个被称为 Reactive Streams 项目(规范),用于制定反应式编程相关的规范以及接口。其主要的接口有这三个:

  • Publisher
  • Subscriber
  • Subcription

其中,Subcriber 中便包含了上面表格提到的 onNextonErroronCompleted 这三个方法。

对于 Reactive Streams,大家只需要理解其思想就可以,包括基本思想以及 Backpressure 等思想即可。

Imperative vs Reactive

对于上面表格里提到的 Iterable 和 Observale 两种风格,还有另一个称呼,便是 Imperative(指令式编程)和 Reactive(反应式编程)这两种风格。其实就是拉模型和推模型的另一种表述,大家理解其中的思想即可。对于 Imperative,老外写的文章有时会用,直译就是指令式编程,其实就是我们大家平时用 Java、Python 等语言写代码的常见风格,代码执行顺序和编写顺序基本一致(这里不考虑 JVM 指令重排)

Reactor 的主要模块

Reactor 框架主要有两个主要的模块:reactor-core 和 reactor-ipc。前者主要负责 Reactive Programming 相关的核心 API 的实现,后者负责高性能网络通信的实现,目前是基于 Netty 实现的。

Reactor 的主要类

在 Reactor 中,经常使用的类并不是很多,主要有以下两个:

  • Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者。
  • Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发表者。

可能会使用到的类

  • Scheduler 表示背后驱动反应式流的调度器,通常由各种线程池实现。

Web Flux

001

Spring 5 引入的一个基于 Netty 而不是 Servlet 的高性能的 Web 框架,但是使用方式并没有同传统的基于 Servlet 的 Spring MVC 有什么大的不同。

▼ Web Flux 中 MVC 接口的示例

@RequestMapping("/demo")
@RestController
public class DemoController {
    @RequestMapping(value = "/foobar")
    public Mono<Foobar> foobar() {
        return Mono.just(new Foobar());
    }
}

最大的变化就是返回值从 Foobar 所表示的一个对象变为 Mono<Foobar> (或 Flux<T>

当然,实际的程序并不会像示例那样就一行代码。关于如何开发实际的应用,这些正是后面介绍 Reactor 的部分所要详细叙述的。

Reactive Streams、Reactor 和 Web Flux

上面介绍了反应式编程的一些概念,以及 Reactor 和 Web Flux。可能读者看到这里有些乱。这里介绍一下三者的关系。其实很简单:

Reactive Streams 是规范,Reactor 实现了 Reactive Streams。Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。

其实,对于大部分业务开发人员来说,当编写反应式代码时,我们通常只会接触到 Publisher 这个接口,对应到 Reactor 便是 MonoFlux。对于 SubscriberSubcription 这两个接口,Reactor 必然也有相应的实现。但是,这些都是 Web Flux 和 Spring Data Reactive 这样的框架用到的。如果不开发中间件,通常开发人员是不会接触到的。

比如,在 Web Flux,你的方法只需返回 MonoFlux 即可。你的代码基本也只和 MonoFlux 打交道。而 Web Flux 则会实现 SubscriberonNext 时将业务开发人员编写的 MonoFlux 转换为 HTTP Response 返回给客户端。

三、Reactor 入门

接下来介绍一下 Reactor 中 Mono 和 Flux 这两个类中的主要方法的使用。

如同 Java 8 所引入的 Stream 一样,Reactor 的使用方式基本上也是分三步:开始阶段的创建、中间阶段的处理和最终阶段的消费。只不过创建和消费可能是通过像 Spring 5 这样框架完成的(比如通过 Web Flux 中的 WebClient 调用 HTTP 接口,返回值便是一个 Mono)。但我们还是需要基本了解这些阶段的开发方式。

1. 创建 Mono 和 Flux(开始阶段)

使用 Reactor 编程的开始必然是先创建出 Mono 或 Flux。有些时候不需要我们自己创建,而是实现例如 WebFlux 中的 WebClient 或 Spring Data Reactive 得到一个 Mono 或 Flux。

▼ 使用 WebFlux WebClient 调用 HTTP 接口

WebClient webClient = WebClient.create("http://localhost:8080");

public Mono<User> findById(Long userId) {
    return webClient
            .get()
            .uri("/users/" + userId)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMap(cr -> cr.bodyToMono(User.class));
}

▼ 使用 ReactiveMongoRepository 查询 User

public interface UserRepository extends ReactiveMongoRepository<User, Long> {
    Mono<User> findByUsername(String username);
}

但有些时候,我们也需要主动地创建一个 Mono 或 Flux。

“普通”的创建方式

简单的创建方式是主要是使用像 just 这样的方法创建

Mono<String> helloWorld = Mono.just("Hello World");
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(words);

这样的创建方式在什么时候用呢?一般是用在当你在经过一系列非 IO 型的操作之后,得到了一个对象。接下来要基于这个对象运用 Reactor 进行高性能的 IO 操作时,可以用这种方式将你之前得到的对象转换为 Mono 或 Flux。

“文艺”的创建方式

上述是我们通过一个同步调用得到的结果创建出 MonoFlux,但有时我们需要从一个非 Reactive 的异步调用的结果创建出 Mono 或 Flux。那如何实现呢。

如果这个异步方法返回一个 CompletableFuture,那我们可以基于这个 CompletableFuture 创建一个 Mono:

Mono.fromFuture(aCompletableFuture);

如果这个异步调用不会返回 CompletableFuture,是有自己的回调方法,那怎么创建 Mono 呢?我们可以使用 static <T> Mono<T> create(Consumer<MonoSink<T>> callback) 方法:

Mono.create(sink -> {
    ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
    entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
        @Override
        public void onFailure(Throwable ex) {
            sink.error(ex);
        }

        @Override
        public void onSuccess(ResponseEntity<String> result) {
            sink.success(result.getBody());
        }
    });
});

在使用 WebFlux 之后,AsyncRestTemplate 已经不推荐使用,这里只是做演示。

2. 处理 Mono 和 Flux(中间阶段)

中间阶段的 Mono 和 Flux 的方法主要有 filtermapflatMapthenzipreduce 等。这些方法使用方法和 Stream 中的方法类似。对于这些方法的介绍,将会放在下一节“Reactor 进阶”中,主要介绍这些方法不容易理解和使用容易出问题的点。

3. 消费 Mono 和 Flux(结束阶段)

直接消费的 Mono 或 Flux 的方式就是调用 subscribe 方法。如果在 Web Flux 接口中开发,直接返回 Mono 或 Flux 即可。Web Flux 框架会为我们完成最后的 Response 输出工作。

四、Reactor 进阶

接下来我将介绍一下我在使用 Reactor 开发实际项目时遇到的一些稍显复杂的问题,以及解决方法。

问题一:mapflatMapthen 分别在什么时候使用?

本段内容将涉及到如下类和方法:

  • 方法:Mono.map
  • 方法:Mono.flatMap
  • 方法:Mono.then
  • 类:Function

MonoFlux 中间环节处理的处理过程中,有三个有些类似的方法:mapflatMapthen。这三个方法可以说是 Reactor 中使用频率很高的方法。

▼ 传统的命令式编程

Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);

▼ 对应的反应式编程

Mono.just(params)
    .flatMap(v -> doStep1(v))
    .flatMap(v -> doStep2(v))
    .flatMap(v -> doStep3(v));

从上面两段代码的对比就很容易看出来 flatMap 方法在其中起到的作用,mapthen 方法也有类似的作用。但这些方法之间的区别是什么呢?我们先来看看这三个方法的签名(以 Mono 为例):

  • flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
  • map(Function<? super T, ? extends R> mapper)
  • then(Mono<V> other)

可见,最复杂的是 flatMap 方法,map 次之,then 最简单。从方法名字上看,flatMapmap 都是做映射之用。而 then 则是下一步的意思,最适合用于链式调用,但为什么上面的例子使用的是 flatMap 而不是 then

then 表面看上去是下一步的意思,但它只表示执行顺序的下一步,不表示下一步依赖于上一步。这个语义同 ES6 Promise 中的 then 方法是不同的。从 then 方法的参数只是一个 Mono,无从接受上一步的执行结果。flatMapmap 的参数都是一个 Function,入参是上一步的执行结果。

flatMapmap 的区别在于,flatMap 中的入参 Function 的返回值要求是一个 Mono(不明白的复习一下 Function 接口的定义),而 map 的入参 Function 只要求返回一个普通对象。因为我们在业务处理中常需要调用 WebClientReactiveXxxRepository 中的方法,这些方法的返回值都是 Mono(或 Flux)。所以要将这些调用串联为一个整体链式调用,就必须使用 flatMap,而不是 map

所以,我们要正确理解 flatMapmapthen 这三个方法的用法和背后的含义,这样才能正确实践反应式编程。

问题二:如何实现并发执行

本段内容将涉及到如下类和方法:

  • 方法:Mono.zip
  • 类:Tuple2
  • 类:BiFunction

并发执行是常见的一个需求。Reactive Programming 虽然是一种异步编程方式,但是异步不代表就是并发并行的。

在传统的命令式开发方式中,并发执行是通过线程池加 Future 的方式实现的。

Future<Result1> result1Future = doStep1(params);
Future<Result2> result2Future = doStep2(params);
Result1 result1 = result1Future.get();
Result2 result2 = result2Future.get();
// Do merge;
return mergeResult;

因为上面的代码虽然有一些异步效果在里面,但 Future.get() 方法是阻塞的。所以,当我们使用 Reactor 开发有并发执行场景的反应式代码时,肯定不能用上面的方式。这时,需要使用到 MonoFlux 中的 zip 方法。这里我们以 Mono 为例演示。代码如下:

Mono<CustomType1> item1Mono = ...;
Mono<CustomType2> item2Mono = ...;
Mono.zip(items -> {
    CustomType1 item1 = CustomType1.class.cast(items[0]);
    CustomType2 item2 = CustomType2.class.cast(items[1]);
    // Do merge
    return mergeResult;
}, item1Mono, item2Mono);

上述代码中,产生 item1Monoitem2Mono 的过程是并行的。比如,调用一个 HTTP 接口的同时,执行一个数据库查询操作。这样就可以加快程序的执行。

但上述代码存在一个问题,就是 zip 方法需要做强制类型转换。而强制类型转换是不安全的。所以我们需要更优雅的方式。

好在 zip 方法存在多种重载形式。除了最基本的形式以外,还有多种类型安全的形式:

static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2);
static <T1, T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator); 
static <T1, T2, T3> Mono<Tuple3<T1, T2, T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3);

对于不超过7个元素的合并操作,都有类型安全的 zip 方法可选。

以两个元素的合并为例,介绍一下使用方法:

Mono.zip(item1Mono, item2Mono).map(tuple -> {
    CustomType1 item1 = tuple.getT1();
    CustomType2 item2 = tuple.getT2();
    // Do merge
    return mergeResult;
});

上述代码中,map 方法的参数是一个 Tuple2,表示一个二元数组,相应的还有 Tuple3Tuple4 等。

另外,对于两个元素的并发执行,也可以通过 zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator) 方法直接将结果合并。方法是传递 BiFunction 实现合并算法。

问题三:集合循环之后的汇聚

本段内容将涉及到如下类和方法:

  • 方法:Flux.fromIterable
  • 方法:Flux.reduce
  • 类:BiFunction

另外一个稍微复杂的场景是对一个对象中的一个类型为集合类的(List、Set)进行处理之后,再对原本的对象进行处理。使用 Imperative 风格的代码很容易编写:

List<SubData> subDataList = data.getSubDataList();
for (SubData item : subDataList) {
    // Do something on data and item
}
// Do something on data

是不是简单到无以复加的地步了。但当我们要用 Reactive 风格的代码实现上述逻辑时,就不是那么简单了。

要在 Reactive 风格的代码中实现上述逻辑,我们主要是要用到 Fluxreduce 方法。我们先来看 reduce 方法的签名:

<A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);

从方法签名我们可以看出 reduce 方法的功能就是讲一个 Flux 聚合成一个 Mono。参数中第一个参数是返回值 Mono 中元素的初始值。

第二个参数是一个 BiFunction,用来实现聚合操作的逻辑。泛型参数 <A, ? super T, A> 中,第一个 A 表示每次聚合操作(因为需要对集合中每个元素进行操作)之后的结果的类型,它作为 BiFunction.apply 方法的第一个入参 ;? super T 表示集合中的每个元素,它作为 BiFunction.apply 方法的第二个入参;最后一个 A 表示聚合操作的结果,它作为 BiFunction.apply 方法的返回值。

接下来看一下示例:

Data initData = ...;
List<SubData> aList = ...;
Flux.fromIterable(aList)
    .reduce(initData, (data, itemInList) -> {
        // Do something on data and itemInList
        return data;
    });

上面的示例代码中,initDatadata 的类型相同,我们,但是命名不能重复。执行完上述代码之后, reduce 方法会返回 Mono<Data>

Java 20中有哪些新功能

Java 20将于2023年3月发布,预计将引入一系列变化和新功能。我们准备了一个先睹为快,看看哪些JEP最有可能被JDK 20接受,哪些JEP我们很有希望下一个被接受!
Java 20是计划作为一个非LTS版本,而随后的版本21将被设置为一个具有长期支持(LTS)的版本。

Java开发工具包(JDK)的开发基于JDK增强建议(JEP)的概念。实际上,这些增强建议是JDK发布项目和所有相关开发活动的路线图。
在编写本报告时,JEP索引列出了437项改进建议,以及一些JEP草案和已提交的JEP。

JDK 20中需要哪些JEP?
1、记录模式
作为JDK 19中的第一个预览引入,并预期作为JDK 20中的第二个预览。为了支持数据导航和处理,记录模式简化了与记录组件的使用。在不更改类型模式的语法或语义的情况下,这将扩展更复杂数据查询的模式匹配。

2、switch语句的模式匹配
作为JDK 17中的第一个预览版引入,并预期作为JDK 20中的第四个预览版。

switch (s) {
  case null ->
    { break; }
  case Triangle t
  when t.calculateArea() > 100 ->
    System.out.println("Large triangle");
  default ->
    System.out.println("A shape, possibly a small triangle");
}

3、外部函数&内存(FFM)API
FFM API的根源在于广泛的JEP组合,这导致了它在JDK 19中的第一个预览版,并有望在JDK 20中成为第二个预览版。它允许调用Java程序的外部函数,以与外部代码和数据一起操作,例如在Java运行时之外,而没有JNI的缺点和风险。FFM API的主要目标是拥有一个上级的纯Java开发模型,并支持更广泛的外部内存模型。

4、虚拟线程
作为JDK 19中的第一个预览引入,并预期作为JDK 20中的第二个预览。虚拟线程可提高编写、维护和监视并发应用程序时的效率。它们是轻量级的,支持每个请求的线程可伸缩性,并且可以在对现有代码进行最小更改的情况下应用。传统的Java线程精确地映射到一个操作系统线程,而虚拟线程并不绑定到特定的操作系统线程。因此,创建它们的成本很低,并且可以根据需要创建任意多个。

JDK20发布后,哪些JEP令我们兴奋
1、基元类primitive

JEP 401引入了对一种新的特殊类型的值类的支持,以定义基元类型。由于更好的存储器访问以及因为在CPU内更有效地执行基元操作,基元的使用使得能够改进性能。通过此更新,开发人员可以获得基元的性能优势,同时保留类声明的抽象和其他优势。
为了能够将基元的优点与典型的面向对象结合起来,基元类需要遵守以下两条规则:

  • 原始类中的所有字段都被隐式定义为final,即它们只能在构造函数或初始化函数中设置。
  • 此外,基元类不能具有隐式依赖于声明类的字段。
primitive class Point implements Shape {
   ...
}

2、值对象
传统的Java对象提供标识,即它们的内存位置用于区分一个对象和另一个对象。提供标识在运行时代价很高,而且通常不会在实现中使用。通过这次提交,Java的对象模型扩展了值类和值对象。值类是无标识的,即它们可以提供基元类型的性能优势,同时利用面向对象的概念。当在值对象上使用==时,它们的字段值用于确定对象是否相等,而不是它们的内存位置。请注意,值类的所有字段都是隐式final的,需要在初始化器或构造函数中设置!
实值类使用value,例如:

value class Point implements Shape {
  ...
}

3、Universal generics通用泛型
此提交通过还允许原始类作为类型参数,消除了类型参数必须是引用类型的要求。许多现有的泛型实现将开箱即用地使用这个新增功能,即它们可以用原始类作为其类型参数来实例化。但是,需要特别注意,以防代码对 null 进行赋值,因为原始类将因空指针异常而失败。

4、字符串模板
为了简化常规字符串组合,字符串模板(又名字符串文字)包含在运行时解释的嵌入式表达式。通过向 Java 添加一种新的表达式(字符串模板表达式),JEP 430使得使用包含在运行时计算的值的字符串编写代码变得更加简单。这使得需要用户输入值的程序具有更好的可读性、定义格式语法的灵活性以及更高的安全性。

多线程代码的补充
如果您正在使用多线程代码,请确保检查这些添加项,它们提供了使多线程代码更易于阅读或提供性能提升的选项。
1、Scoped values
使用 extent-local 变量,可以更容易地在 Java 中的线程内和子线程之间共享不可变数据。此更新的目标是简化关于数据流的推理以提高可用性;提高健壮性,以便只有合法的被调用者才能检索调用者共享的数据;并通过启用运行时优化和将共享数据视为不可变来提高性能 虽然此更改不需要从线程局部变量迁移,但它们更适合用于大量虚拟线程。

2、结构化并发
通过引入用于结构化并发的 API, JEP 428可以将在不同线程中运行的不同任务视为一个工作单元。这应该大大简化多线程编程,并通过更容易确保代码的可靠性、可维护性和可观察性来鼓励开发人员应用并发编程。

更多令人兴奋的 JEP

  • Sequenced collections:将 SequencedCollection 接口添加到标准库,提供第一个定义集合中元素顺序的集合。
  • Vector API:与标量计算相比,为了提高性能,这个新的 API 支持在运行时编译矢量计算。
  • 异步堆栈跟踪 API:引入了一个用于异步收集堆栈跟踪的新 API。
  • 类文件 API:添加一个新的 API 来替换 ASM(或 cglib,或其他字节码库),它将提供一种读取、写入和转换 Java 类文件的方法。

 

Java 17 新特性

Java 17 在 2021 年 9 月 14 日正式发布,Java 17 是一个长期支持(LTS)版本,这次更新共带来 14 个新功能。

OpenJDK Java 17 下载:https://jdk.java.net/archive/

OpenJDK Java 17 文档:https://openjdk.java.net/projects/jdk/17/

1. JEP 306: 恢复始终严格的浮点语义

既然是恢复严格的浮点语义,那么说明在某个时间点之前,是始终严格的浮点语义的。其实在 Java SE 1.2 之前,所有的浮点计算都是严格的,但是以当初的情况来看,过于严格的浮点计算在当初流行的 x86 架构和 x87 浮点协议处理器上运行,需要大量的额外的指令开销,所以在 Java SE 1.2 开始,需要手动使用关键字 strictfp(strict float point) 才能启用严格的浮点计算。

但是在 2021 年的今天,硬件早已发生巨变,当初的问题已经不存在了,所以从 Java 17 开始,恢复了始终严格的浮点语义这一特性。

扩展strictfp 是 Java 中的一个关键字,大多数人可能没有注意过它,它可以用在类、接口或者方法上,被 strictfp 修饰的部分中的 float 和 double 表达式会进行严格浮点计算。

下面是一个示例,其中的 testStrictfp() 被 strictfp 修饰。

package com.wdbyte;

public class Main {
    public static void main(String[] args) {
        testStrictfp();
    }

    public strictfp static void testStrictfp() {
        float aFloat = 0.6666666666666666666f;
        double aDouble = 0.88888888888888888d;
        double sum = aFloat + aDouble;
        System.out.println("sum: " + sum);
    }
}

2. JEP 356:增强的伪随机数生成器

为伪随机数生成器 RPNG(pseudorandom number generator)增加了新的接口类型和实现,让在代码中使用各种 PRNG 算法变得容易许多。

这次增加了 RandomGenerator 接口,为所有的 PRNG 算法提供统一的 API,并且可以获取不同类型的 PRNG 对象流。同时也提供了一个新类 RandomGeneratorFactory 用于构造各种 RandomGenerator 实例,在 RandomGeneratorFactory 中使用 ServiceLoader.provider 来加载各种 PRNG 实现。

下面是一个使用示例:随便选择一个 PRNG 算法生成 5 个 10 以内的随机数。

package com.wdbyte.java17;

import java.util.Date;
import java.util.random.RandomGenerator;
import java.util.random.RandomGeneratorFactory;
import java.util.stream.Stream;

/**
 * @author niulang
 */
public class JEP356 {

    public static void main(String[] args) {
        RandomGeneratorFactory<RandomGenerator> l128X256MixRandom = RandomGeneratorFactory.of("L128X256MixRandom");
        // 使用时间戳作为随机数种子
        RandomGenerator randomGenerator = l128X256MixRandom.create(System.currentTimeMillis());
        for (int i = 0; i < 5; i++) {
            System.out.println(randomGenerator.nextInt(10));
        }
    }
}

得到输出:

7
3
4
4
6

你也可以遍历出所有的 PRNG 算法。

RandomGeneratorFactory.all().forEach(factory -> {
    System.out.println(factory.group() + ":" + factory.name());
});

得到输出:

LXM:L32X64MixRandom
LXM:L128X128MixRandom
LXM:L64X128MixRandom
Legacy:SecureRandom
LXM:L128X1024MixRandom
LXM:L64X128StarStarRandom
Xoshiro:Xoshiro256PlusPlus
LXM:L64X256MixRandom
Legacy:Random
Xoroshiro:Xoroshiro128PlusPlus
LXM:L128X256MixRandom
Legacy:SplittableRandom
LXM:L64X1024MixRandom

可以看到 Legacy:Random 也在其中,新的 API 兼容了老的 Random 方式,所以你也可以使用新的 API 调用 Random 类生成随机数。

// 使用 Random
RandomGeneratorFactory<RandomGenerator> l128X256MixRandom = RandomGeneratorFactory.of("Random");
// 使用时间戳作为随机数种子
RandomGenerator randomGenerator = l128X256MixRandom.create(System.currentTimeMillis());
for (int i = 0; i < 5; i++) {
    System.out.println(randomGenerator.nextInt(10));
}

扩展阅读:增强的伪随机数生成器

3. JEP 382:使用新的 macOS 渲染库

macOS 为了提高图形的渲染性能,在 2018 年 9 月抛弃了之前的 OpenGL 渲染库 ,而使用了 Apple Metal 进行代替。Java 17 这次更新开始支持 Apple Metal,不过对于 API 没有任何改变,这一些都是内部修改。

扩展阅读:macOS Mojave 10.14 Release NotesApple Metal

4. JEP 391:支持 macOS/AArch64 架构

起因是 Apple 在 2020 年 6 月的 WWDC 演讲中宣布,将开启一项长期的将 Macintosh 计算机系列从 x64 过度到 AArch64 的长期计划,因此需要尽快的让 JDK 支持 macOS/AArch64 。

Linux 上的 AArch64 支持以及在 Java 16 时已经支持,可以查看之前的文章了解。

扩展:Java 16 新功能介绍 – JEP 386

5. JEP 398:删除已弃用的 Applet API

Applet 是使用 Java 编写的可以嵌入到 HTML 中的小应用程序,嵌入方式是通过普通的 HTML 标记语法,由于早已过时,几乎没有场景在使用了。

示例:嵌入 Hello.class

<applet code="Hello.class" height=200 width=200></applet>

Applet API 在 Java 9 时已经标记了废弃,现在 Java 17 中将彻底删除。

6. JEP 403:更强的 JDK 内部封装

如 Java 16 的 JEP 396 中描述的一样,为了提高 JDK 的安全性,使 --illegal-access 选项的默认模式从允许更改为拒绝。通过此更改,JDK 的内部包和 API(关键内部 API 除外)将不再默认打开。

但是在 Java 17 中,除了 sun.misc.Unsafe ,使用 --illegal-access 命令也不能打开 JDK 内部的强封装模式了,除了 sun.misc.Unsafe API .

在 Java 17 中使用 --illegal-access 选项将会得到一个命令已经移除的警告。

➜  bin ./java -version
openjdk version "17" 2021-09-14
OpenJDK Runtime Environment (build 17+35-2724)
OpenJDK 64-Bit Server VM (build 17+35-2724, mixed mode, sharing)
➜  bin ./java --illegal-access=warn
OpenJDK 64-Bit Server VM warning: Ignoring option --illegal-access=warn; support was removed in 17.0

扩展阅读:JEP 403:更强的 JDK 内部封装Java 16 新功能介绍

7. JEP 406:switch 的类型匹配(预览)

如 instanceof 一样,为 switch 也增加了类型匹配自动转换功能。

在之前,使用 instanceof 需要如下操作:

if (obj instanceof String) {
    String s = (String) obj;    // grr...
    ...
}

多余的类型强制转换,而现在:

if (obj instanceof String s) {
    // Let pattern matching do the work!
    ...
}

switch 也可以使用类似的方式了。

static String formatterPatternSwitch(Object o) {
    return switch (o) {
        case Integer i -> String.format("int %d", i);
        case Long l    -> String.format("long %d", l);
        case Double d  -> String.format("double %f", d);
        case String s  -> String.format("String %s", s);
        default        -> o.toString();
    };
}

对于 null 值的判断也有了新的方式。

// Java 17 之前
static void testFooBar(String s) {
    if (s == null) {
        System.out.println("oops!");
        return;
    }
    switch (s) {
        case "Foo", "Bar" -> System.out.println("Great");
        default           -> System.out.println("Ok");
    }
}
// Java 17
static void testFooBar(String s) {
    switch (s) {
        case null         -> System.out.println("Oops");
        case "Foo", "Bar" -> System.out.println("Great");
        default           -> System.out.println("Ok");
    }
}

扩展阅读: JEP 406:switch 的类型匹配(预览)

8. JEP 407:移除 RMI Activation

移除了在 JEP 385 中被标记废除的 RMI(Remote Method Invocation)Activation,但是 RMI 其他部分不会受影响。

RMI Activation 在 Java 15 中的 JEP 385 已经被标记为过时废弃,至今没有收到不良反馈,因此决定在 Java 17 中正式移除。

扩展阅读: JEP 407:移除 RMI Activation

9. JEP 409:密封类(Sealed Classes)

Sealed Classes 在 Java 15 中的 JEP 360 中提出,在 Java 16 中的 JEP 397 再次预览,现在 Java 17 中成为正式的功能,相比 Java 16 并没有功能变化,这里不再重复介绍,想了解的可以参考之前文章。

扩展阅读:Java 16 新功能介绍JEP 409: Sealed Classes

10. JEP 401:移除实验性的 AOT 和 JIT 编译器

在 Java 9 的 JEP 295 中,引入了实验性的提前编译 jaotc 工具,但是这个特性自从引入依赖用处都不太大,而且需要大量的维护工作,所以在 Java 17 中决定删除这个特性。

主要移除了三个 JDK 模块:

  1. jdk.aot – jaotc 工具。
  2. Jdk.internal.vm.compiler – Graal 编译器。
  3. jdk.internal.vm.compiler.management

同时也移除了部分与 AOT 编译相关的 HotSpot 代码:

  1. src/hotspot/share/aot — dumps and loads AOT code
  2. Additional code guarded by #if INCLUDE_AOT

11. JEP 411:弃用 Security Manager

Security Manager 在 JDK 1.0 时就已经引入,但是它一直都不是保护服务端以及客户端 Java 代码的主要手段,为了 Java 的继续发展,决定弃用 Security Manager,在不久的未来进行删除。

@Deprecated(since="17", forRemoval=true)
public class SecurityManager {
	// ...
}

12. JEP 412:外部函数和内存 API (孵化)

新的 API 允许 Java 开发者与 JVM 之外的代码和数据进行交互,通过调用外部函数,可以在不使用 JNI 的情况下调用本地库。

这是一个孵化功能;需要添加--add-modules jdk.incubator.foreign来编译和运行 Java 代码。

历史

  • Java 14 JEP 370引入了外部内存访问 API(孵化器)。
  • Java 15 JEP 383引入了外部内存访问 API(第二孵化器)。
  • Java 16 JEP 389引入了外部链接器 API(孵化器)。
  • Java 16 JEP 393引入了外部内存访问 API(第三孵化器)。
  • Java 17 JEP 412引入了外部函数和内存 API(孵化器)。

扩展阅读:JEP 412:外部函数和内存 API (孵化)

13. JEP 414:Vector API(二次孵化)

在 Java 16 中引入一个新的 API 来进行向量计算,它可以在运行时可靠的编译为支持的 CPU 架构,从而实现更优的计算能力。

现在 Java 17 中改进了 Vector API 性能,增强了例如对字符的操作、字节向量与布尔数组之间的相互转换等功能。

14. JEP 415:指定上下文的反序列化过滤器

Java 中的序列化一直都是非常重要的功能,如果没有序列化功能,Java 可能都不会占据开发语言的主导地位,序列化让远程处理变得容易和透明,同时也促进了 Java EE 的成功。

但是 Java 序列化的问题也很多,它几乎会犯下所有的可以想象的错误,为开发者带来持续的维护工作。但是要说明的是序列化的概念是没有错的,把对象转换为可以在 JVM 之间自由传输,并且可以在另一端重新构建的能力是完全合理的想法,问题在于 Java 中的序列化设计存在风险,以至于爆出过很多和序列化相关的漏洞。

反序列化危险的一个原因是,有时候我们不好验证将要进行反序列化的内容是否存在风险,而传入的数据流可以自由引用对象,很有可能这个数据流就是攻击者精心构造的恶意代码。

所以,JEP 415 允许在反序列化时,通过一个过滤配置,来告知本次反序列化允许或者禁止操作的类,反序列化时碰到被禁止的类,则会反序列化失败。

14.1. 反序列化示例

假设 Dog 类中的 Poc 是恶意构造的类,但是正常反序列化是可以成功的。

package com.wdbyte.java17;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
 * @author niulang
 */
public class JEP415 {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Dog dog = new Dog("哈士奇");
        dog.setPoc(new Poc());
        // 序列化 - 对象转字节数组
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);) {
            objectOutputStream.writeObject(dog);
        }
        byte[] bytes = byteArrayOutputStream.toByteArray();
        // 反序列化 - 字节数组转对象
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
        Object object = objectInputStream.readObject();
        System.out.println(object.toString());
    }
}

class Dog implements Serializable {
    private String name;
    private Poc poc;

    public Dog(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Dog{" + "name='" + name + '\'' + '}';
    }
		// get...set...
}

class Poc implements Serializable{

}

输出结果:

Dog{name='哈士奇'}

14.2. 反序列化过滤器

在 Java 17 中可以自定义反序列化过滤器,拦截不允许的类。

package com.wdbyte.java17;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
 * @author niulang
 */
public class JEP415 {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Dog dog = new Dog("哈士奇");
        dog.setPoc(new Poc());
        // 序列化 - 对象转字节数组
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);) {
            objectOutputStream.writeObject(dog);
        }
        byte[] bytes = byteArrayOutputStream.toByteArray();
        // 反序列化 - 字节数组转对象
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
        // 允许 com.wdbyte.java17.Dog 类,允许 java.base 中的所有类,拒绝其他任何类
        ObjectInputFilter filter = ObjectInputFilter.Config.createFilter(
                        "com.wdbyte.java17.Dog;java.base/*;!*");
        objectInputStream.setObjectInputFilter(filter);
        Object object = objectInputStream.readObject();
        System.out.println(object.toString());
    }
}

class Dog implements Serializable {
    private String name;
    private Poc poc;

    public Dog(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Dog{" + "name='" + name + '\'' + '}';
    }
		// get...set...
}

class Poc implements Serializable{
}

这时反序列化会得到异常。

Exception in thread "main" java.io.InvalidClassException: filter status: REJECTED
	at java.base/java.io.ObjectInputStream.filterCheck(ObjectInputStream.java:1412)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2053)
	at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1907)
	....

扩展阅读:JEP 415:指定上下文的反序列化过滤器

 

文章转自:https://www.cnblogs.com/niumoo/p/15522730.html

kafka-configs.sh命令行使用方法

kafka的一个动态配置工具,可以通过命令行对系统进行动态配置,但并不是所有的配置项都支持的。

$ ./kafka-configs.bat
        This tool helps to manipulate and describe entity config for a topic, client, user or broker
        Option                                 Description
        ------                                 -----------
        --add-config <String>                  Key Value pairs of configs to add.
        Square brackets can be used to group
        values which contain commas: 'k1=v1,
        k2=[v1,v2,v2],k3=v3'. The following
        is a list of valid configurations:
        For entity-type 'topics':
        cleanup.policy
        compression.type
        delete.retention.ms
        file.delete.delay.ms
        flush.messages
        flush.ms
        follower.replication.throttled.
        replicas
        index.interval.bytes
        leader.replication.throttled.replicas
        max.compaction.lag.ms
        max.message.bytes
        message.downconversion.enable
        message.format.version
        message.timestamp.difference.max.ms
        message.timestamp.type
        min.cleanable.dirty.ratio
        min.compaction.lag.ms
        min.insync.replicas
        preallocate
        retention.bytes
        retention.ms
        segment.bytes
        segment.index.bytes
        segment.jitter.ms
        segment.ms
        unclean.leader.election.enable
        For entity-type 'brokers':
        log.message.timestamp.type
        ssl.client.auth
        log.retention.ms
        sasl.login.refresh.window.jitter
        sasl.kerberos.ticket.renew.window.
        factor
        log.preallocate
        log.index.size.max.bytes
        sasl.login.refresh.window.factor
        ssl.truststore.type
        ssl.keymanager.algorithm
        log.cleaner.io.buffer.load.factor
        sasl.login.refresh.min.period.seconds
        ssl.key.password
        background.threads
        log.retention.bytes
        ssl.trustmanager.algorithm
        log.segment.bytes
        max.connections.per.ip.overrides
        log.cleaner.delete.retention.ms
        log.segment.delete.delay.ms
        min.insync.replicas
        ssl.keystore.location
        ssl.cipher.suites
        log.roll.jitter.ms
        log.cleaner.backoff.ms
        sasl.jaas.config
        principal.builder.class
log.flush.interval.ms
        log.cleaner.max.compaction.lag.ms
        max.connections
        log.cleaner.dedupe.buffer.size
        log.flush.interval.messages
        advertised.listeners
        num.io.threads
        listener.security.protocol.map
        log.message.downconversion.enable
        sasl.enabled.mechanisms
        sasl.login.refresh.buffer.seconds
        ssl.truststore.password
        listeners
        metric.reporters
        ssl.protocol
        sasl.kerberos.ticket.renew.jitter
        ssl.keystore.password
        sasl.mechanism.inter.broker.protocol
        log.cleanup.policy
        sasl.kerberos.principal.to.local.rules
        sasl.kerberos.min.time.before.relogin
        num.recovery.threads.per.data.dir
        log.cleaner.io.max.bytes.per.second
        log.roll.ms
        ssl.endpoint.identification.algorithm
        unclean.leader.election.enable
        message.max.bytes
        log.cleaner.threads
        log.cleaner.io.buffer.size
        max.connections.per.ip
        sasl.kerberos.service.name
        ssl.provider
        follower.replication.throttled.rate
        log.index.interval.bytes
        log.cleaner.min.compaction.lag.ms
        log.message.timestamp.difference.max.
        ms
        ssl.enabled.protocols
        log.cleaner.min.cleanable.ratio
        replica.alter.log.dirs.io.max.bytes.
        per.second
        ssl.keystore.type
        ssl.secure.random.implementation
        ssl.truststore.location
        sasl.kerberos.kinit.cmd
        leader.replication.throttled.rate
        num.network.threads
        compression.type
        num.replica.fetchers
        For entity-type 'users':
        request_percentage
        producer_byte_rate
        SCRAM-SHA-256
        SCRAM-SHA-512
        consumer_byte_rate
        For entity-type 'clients':
        request_percentage
        producer_byte_rate
        consumer_byte_rate
        Entity types 'users' and 'clients' may
        be specified together to update
        config for clients of a specific
        user.
        --alter                                Alter the configuration for the entity.
        --bootstrap-server <String: server to  The Kafka server to connect to. This
        connect to>                            is required for describing and
        altering broker configs.
        --command-config <String: command      Property file containing configs to be
        config property file>                  passed to Admin Client. This is used
        only with --bootstrap-server option
        for describing and altering broker
        configs.
        --delete-config <String>               config keys to remove 'k1,k2'
        --describe                             List configs for the given entity.
        --entity-default                       Default entity name for
        clients/users/brokers (applies to
        corresponding entity type in command
        line)
        --entity-name <String>                 Name of entity (topic name/client
        id/user principal name/broker id)
        --entity-type <String>                 Type of entity
        (topics/clients/users/brokers)
        --force                                Suppress console prompts
        --help                                 Print usage information.
        --version                              Display Kafka version.
        --zookeeper <String: urls>             REQUIRED: The connection string for
        the zookeeper connection in the form
        host:port. Multiple URLS can be
        given to allow fail-over.

网上找了一张图,翻译解释说明:

001

使用方法(例子):

  • 增加/编辑配置

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type topics –entity-name topicName –add-config ‘k1=v1, k2=v2, k3=v3′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type brokers –entity-name 1 –add-config ‘retention.bytes=1024072′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type brokers –entity-default –add-config ‘retention.bytes=1024072′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type clients –entity-default –add-config ‘retention.bytes=1024072′

  • 删除配置

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type topics –entity-name topicName –delete-config ‘k1,k2,k3’
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type clients –entity-name clientId –delete-config ‘k1,k2,k3’
./kafka-configs.sh –bootstrap-server localhost:9092 –alter –entity-type brokers –entity-name $brokerId –delete-config ‘k1,k2,k3’
./kafka-configs.sh –bootstrap-server localhost:9092 –alter –entity-type brokers –entity-default –delete-config ‘k1,k2,k3’

  • 列出配置项的描述信息

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –entity-type topics –entity-name topicName –describe
./kafka-configs.sh–bootstrap-server localhost:9092 –entity-type brokers –entity-name $brokerId –describe
./kafka-configs.sh –bootstrap-server localhost:9092 –entity-type brokers –entity-default –describe
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –entity-type users –entity-name user1 –entity-type clients –entity-name clientA –describe

 

 

123456114
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |