Kafka删除topic的方法

在运营环境中,删除kafka topic的方法,没有必要当然不要这么干,迫不得已才会去删除topic。

首先配置文件server.properties 增加参数 delete.topic.enable=true

其次呢,使用删除命令行:

./bin/kafka-topics.sh –delete –topic THIS_IS_MY_TOPIC –zookeeper localhost:2181

*红色字体需要替换。

或者使用kafka-manager集群管理工具删除。

如果删除之前,没有配置 delete.topic.enable=true 参数,那么topic只会标记为marked for deletion ,也就是说,只是标记并没有删除;

此时只要加上配置项,然后重启kafka就可以真正删除该topic。

如果重启还是没有被删除,那么就使用zookeeper的命令删除试试;

输入工具行命令: ./zookeeper-3.4.10/bin/zkCli.sh

此时进入了一个shell中端。

执行:ls /brokers/topics 命令可以查看当前brokers下所有topics。

然后执行: rmr /brokers/topics/[topic name] 删除指定的topic,但是可能会提示:

The command ‘rmr’ has been deprecated. Please use ‘deleteall’ instead.

只要把rmr换成deleteall就可以了,如:

deleteall /brokers/topics/[topic name]

再次使用ls /brokers/topics 命令查看topics,如果发现你要删除的topic还在,

执行:ls /admin/delete_topics  这里会显示出你刚刚要删除的topic,然后执行:deleteall /admin/delete_topics/[topic name]  就可以完全删除了。

总结一下:

1.执行:

./bin/kafka-topics.sh –delete –topic THIS_IS_MY_TOPIC –zookeeper localhost:2181

2.如果没有删除:

配置文件server.properties 增加参数 delete.topic.enable=true

3.重启kafka

4.还是没有删除:

执行:./zookeeper-3.4.10/bin/zkCli.sh

执行:ls /brokers/topics 命令可以查看当前brokers下所有topics

执行:deleteall /brokers/topics/[topic name]

 

5.再次使用ls /brokers/topics 命令查看topics,如果发现你要删除的topic还在

执行:ls /admin/delete_topics

执行:deleteall /admin/delete_topics/[topic name] 

结束完成。比较麻烦,但是有效解决在kafka使用过程中如果有需要删除topic而又删除失败。

Spring boot中mongodb的使用

MongoDB 简介

MongoDB(来自于英文单词“Humongous”,中文含义为“庞大”)是可以应用于各种规模的企业、各个行业以及各类应用程序的开源数据库。基于分布式文件存储的数据库。由C++语言编写。旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。MongoDB 是一个高性能,开源,无模式的文档型数据库,是当前 NoSql 数据库中比较热门的一种。

MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。他支持的数据结构非常松散,是类似 json 的 bjson 格式,因此可以存储比较复杂的数据类型。MongoDB 最大的特点是他支持的查询语言非常强大,其语法有点类似于面向对象的查询语言,几乎可以实现类似关系数据库单表查询的绝大部分功能,而且还支持对数据建立索引。

传统的关系数据库一般由数据库(database)、表(table)、记录(record)三个层次概念组成,MongoDB 是由数据库(database)、集合(collection)、文档对象(document)三个层次组成。MongoDB 对于关系型数据库里的表,但是集合中没有列、行和关系概念,这体现了模式自由的特点。

MongoDB 中的一条记录就是一个文档,是一个数据结构,由字段和值对组成。MongoDB 文档与 JSON 对象类似。字段的值有可能包括其它文档、数组以及文档数组。MongoDB 支持 OS X、Linux 及 Windows 等操作系统,并提供了 Python,PHP,Ruby,Java及 C++ 语言的驱动程序,社区中也提供了对 Erlang 及 .NET 等平台的驱动程序。

MongoDB 的适合对大量或者无固定格式的数据进行存储,比如:日志、缓存等。对事物支持较弱,不适用复杂的多文档(多表)的级联查询。文中演示 Mongodb 版本为 3.5。

MongoDB 的增删改查

Spring Boot 对各种流行的数据源都进行了封装,当然也包括了 Mongodb,下面给大家介绍如何在 Spring Boot 中使用 Mongodb:

1、pom 包配置

pom 包里面添加 spring-boot-starter-data-mongodb 包引用

<dependencies>
	<dependency> 
	    <groupId>org.springframework.boot</groupId>
	    <artifactId>spring-boot-starter-data-mongodb</artifactId>
	</dependency> 
</dependencies>

2、在 application.properties 中添加配置

spring.data.mongodb.uri=mongodb://name:pass@localhost:27017/test

多个 IP 集群可以采用以下配置:

spring.data.mongodb.uri=mongodb://user:pwd@ip1:port1,ip2:port2/database

2、创建数据实体

public class User implements Serializable {
        private static final long serialVersionUID = -3258839839160856613L;
        private Long id;
        private String userName;
        private String passWord;

      //getter、setter省略
}

3、创建实体的增删改查操作

Repository 层实现了 User 对象的增删改查

@Component
public class UserRepositoryImpl implements UserRepository {

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * 创建对象
     * @param user
     */
    @Override
    public void saveUser(User user) {
        mongoTemplate.save(user);
    }

    /**
     * 根据用户名查询对象
     * @param userName
     * @return
     */
    @Override
    public User findUserByUserName(String userName) {
        Query query=new Query(Criteria.where("userName").is(userName));
        User user =  mongoTemplate.findOne(query , User.class);
        return user;
    }

    /**
     * 更新对象
     * @param user
     */
    @Override
    public long updateUser(User user) {
        Query query=new Query(Criteria.where("id").is(user.getId()));
        Update update= new Update().set("userName", user.getUserName()).set("passWord", user.getPassWord());
        //更新查询返回结果集的第一条
        UpdateResult result =mongoTemplate.updateFirst(query,update,User.class);
        //更新查询返回结果集的所有
        // mongoTemplate.updateMulti(query,update,UserEntity.class);
        if(result!=null)
            return result.getMatchedCount();
        else
            return 0;
    }

    /**
     * 删除对象
     * @param id
     */
    @Override
    public void deleteUserById(Long id) {
        Query query=new Query(Criteria.where("id").is(id));
        mongoTemplate.remove(query,User.class);
    }
}

4、开发对应的测试方法

@RunWith(SpringRunner.class)
@SpringBootTest
public class UserDaoTest {

    @Autowired
    private UserDao userDao;

    @Test
    public void testSaveUser() throws Exception {
        UserEntity user=new UserEntity();
        user.setId(2l);
        user.setUserName("小明");
        user.setPassWord("fffooo123");
        userDao.saveUser(user);
    }

    @Test
    public void findUserByUserName(){
       UserEntity user= userDao.findUserByUserName("小明");
       System.out.println("user is "+user);
    }

    @Test
    public void updateUser(){
        UserEntity user=new UserEntity();
        user.setId(2l);
        user.setUserName("天空");
        user.setPassWord("fffxxxx");
        userDao.updateUser(user);
    }

    @Test
    public void deleteUserById(){
        userDao.deleteUserById(1l);
    }

}

5、查看验证结果

可以使用工具 MongoVUE 工具来连接后直接图形化展示查看,也可以登录服务器用命令来查看

1.登录 mongos

bin/mongo -host localhost -port 20000

2、切换到 test 库

use test

3、查询 user 集合数据

db.user.find()

根据3查询的结果来观察测试用例的执行是否正确。

到此 Spring Boot 对应 MongoDB 的增删改查功能已经全部实现。

多数据源 MongoDB 的使用

接下来实现 MongoDB 多数据源的使用

1、pom 包配置

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

2、配置两条数据源,如下:

mongodb.primary.uri=mongodb://192.168.0.75:20000
mongodb.primary.database=primary
mongodb.secondary.uri=mongodb://192.168.0.75:20000
mongodb.secondary.database=secondary

3、配置两个库的数据源

封装读取以 Mongodb 开头的两个配置文件

@Data
@ConfigurationProperties(prefix = "mongodb")
public class MultipleMongoProperties {

	private MongoProperties primary = new MongoProperties();
	private MongoProperties secondary = new MongoProperties();
}

配置不同包路径下使用不同的数据源

第一个库的封装

@Configuration
@EnableMongoRepositories(basePackages = "com.neo.model.repository.primary",
		mongoTemplateRef = PrimaryMongoConfig.MONGO_TEMPLATE)
public class PrimaryMongoConfig {

	protected static final String MONGO_TEMPLATE = "primaryMongoTemplate";
}

第二个库的封装

@Configuration
@EnableMongoRepositories(basePackages = "com.neo.model.repository.secondary",
		mongoTemplateRef = SecondaryMongoConfig.MONGO_TEMPLATE)
public class SecondaryMongoConfig {

	protected static final String MONGO_TEMPLATE = "secondaryMongoTemplate";
}

读取对应的配置信息并且构造对应的 MongoTemplate

@Configuration
public class MultipleMongoConfig {

    @Autowired
    private MultipleMongoProperties mongoProperties;

    @Primary
    @Bean(name = "primaryMongoTemplate")
    public MongoTemplate primaryMongoTemplate() throws Exception {
        return new MongoTemplate(primaryFactory(this.mongoProperties.getPrimary()));
    }

    @Bean
    @Qualifier("secondaryMongoTemplate")
    public MongoTemplate secondaryMongoTemplate() throws Exception {
        return new MongoTemplate(secondaryFactory(this.mongoProperties.getSecondary()));
    }

    @Bean
    @Primary
    public MongoDatabaseFactory primaryFactory(MongoProperties mongo) throws Exception {
        MongoClient client = MongoClients.create(mongo.getUri());
        return new SimpleMongoClientDatabaseFactory(client, mongoProperties.getPrimary().getDatabase());
    }

    @Bean
    public MongoDatabaseFactory secondaryFactory(MongoProperties mongo) throws Exception {
        MongoClient client = MongoClients.create(mongo.getUri());
        return new SimpleMongoClientDatabaseFactory(client, mongoProperties.getSecondary().getDatabase());
    }
}

两个库的配置信息已经完成。

4、创建两个库分别对应的对象和 Repository

对应可以共用

public class User implements Serializable {
        private static final long serialVersionUID = -3258839839160856613L;
        private String  id;
        private String userName;
        private String passWord;

        public User(String userName, String passWord) {
                this.userName = userName;
                this.passWord = passWord;
        }
}

对应的 Repository

public interface PrimaryRepository extends MongoRepository<PrimaryMongoObject, String> {
}

继承了 MongoRepository 会默认实现很多基本的增删改查,省了很多自己写 Repository 层的代码

Secondary 和上面的代码类似就不贴出来了

5、最后测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class MuliDatabaseTest {

    @Autowired
    private PrimaryRepository primaryRepository;

    @Autowired
    private SecondaryRepository secondaryRepository;

    @Test
    public void TestSave() {

        System.out.println("************************************************************");
        System.out.println("测试开始");
        System.out.println("************************************************************");

        this.primaryRepository
                .save(new PrimaryMongoObject(null, "第一个库的对象"));

        this.secondaryRepository
                .save(new SecondaryMongoObject(null, "第二个库的对象"));

        List<PrimaryMongoObject> primaries = this.primaryRepository.findAll();
        for (PrimaryMongoObject primary : primaries) {
            System.out.println(primary.toString());
        }

        List<SecondaryMongoObject> secondaries = this.secondaryRepository.findAll();

        for (SecondaryMongoObject secondary : secondaries) {
            System.out.println(secondary.toString());
        }

        System.out.println("************************************************************");
        System.out.println("测试完成");
        System.out.println("************************************************************");
    }

}

到此,MongoDB 多数据源的使用已经完成。

来源于:http://www.ityouknow.com/springboot/2023/01/11/spring-boot-mongodb.html

kafka集群服务broker扩容

原先的3节点的kafka假设为node1、node2、node3

准备2台空闲点的服务器(这里假设为node4和node5)

系统版本:CentOS7

node1  192.168.2.187

node2  192.168.2.188

node3  192.168.2.189

node4  192.168.2.190

node5  192.168.2.191

kafka的扩容操作分为2步:

1、zk 节点扩容

2、kafka 节点扩容

首先在node4 node5上把相关的软件部署好:

cd /root/
tar xf zookeeper-3.4.9.tar.gz
tar xf kafka_2.11-0.10.1.0.tar.gz 
tar xf jdk1.8.0_101.tar.gz 

mv kafka_2.11-0.10.1.0  zookeeper-3.4.9   jdk1.8.0_101   /usr/local/

cd /usr/local/ 
ln -s zookeeper-3.4.9   zookeeper-default
ln -s kafka_2.11-0.10.1.0  kafka-default
ln -s jdk1.8.0_101    jdk-default

第一部分:zk节点的扩容:

1、在node4上执行:

mkdir /usr/local/zookeeper-default/data/ 

vim  /usr/local/zookeeper-default/conf/zoo.cfg  在原有的基础上,增加最后的2行配置代码:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-default/data/
clientPort=2181
maxClientCnxns=2000
maxSessionTimeout=240000
server.1=192.168.2.187:2888:3888
server.2=192.168.2.188:2888:3888
server.3=192.168.2.189:2888:3888
server.4=192.168.2.190:2888:3888
server.5=192.168.2.191:2888:3888

## 清空目录防止有脏数据
rm -fr /usr/local/zookeeper-default/data/*

## 添加对应的myid文件到zk数据目录下
echo 4 > /usr/local/zookeeper-default/data/myid

2、启动node4的zk进程:

/usr/local/zookeeper-default/bin/zkServer.sh start

/usr/local/zookeeper-default/bin/zkServer.sh  status   类似如下效果:
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-default/bin/../conf/zoo.cfg
Mode: follower

/usr/local/zookeeper-default/bin/zkCli.sh

echo stat | nc 127.0.0.1 2181  结果类似如下:
Zookeeper version: 3.4.9-1757313, built on 08/23/2016 06:50 GMT
Clients:
 /127.0.0.1:50072[1](queued=0,recved=6,sent=6)
 /127.0.0.1:50076[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/2/13
Received: 24
Sent: 23
Connections: 2
Outstanding: 0
Zxid: 0x10000009a
Mode: follower
Node count: 63

3、在node5上执行:

vim  /usr/local/zookeeper-default/conf/zoo.cfg  增加最后的2行代码:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-default/data/
clientPort=2181
maxClientCnxns=2000
maxSessionTimeout=240000
server.1=192.168.2.187:2888:3888
server.2=192.168.2.188:2888:3888
server.3=192.168.2.189:2888:3888
server.4=192.168.2.190:2888:3888
server.5=192.168.2.191:2888:3888

## 清空目录防止有脏数据
rm -fr /usr/local/zookeeper-default/data/*

## 添加对应的myid文件到zk数据目录下
echo 5 > /usr/local/zookeeper-default/data/myid

4、启动node5的zk进程:

/usr/local/zookeeper-default/bin/zkServer.sh start

/usr/local/zookeeper-default/bin/zkServer.sh  status
 
echo stat | nc  127.0.0.1 2181  结果类似如下:
Zookeeper version: 3.4.9-1757313, built on 08/23/2016 06:50 GMT
Clients:
 /127.0.0.1:45582[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0/0
Received: 3
Sent: 2
Connections: 1
Outstanding: 0
Zxid: 0x10000009a
Mode: follower
Node count: 63
也可以使用 echo mntr   | nc  127.0.0.1 2181  这个结果更详细,类似如下:
zk_version3.4.9-1757313, built on 08/23/2016 06:50 GMT
zk_avg_latency0
zk_max_latency194
zk_min_latency0
zk_packets_received101436
zk_packets_sent102624
zk_num_alive_connections4
zk_outstanding_requests0
zk_server_statefollower
zk_znode_count141
zk_watch_count190
zk_ephemerals_count7
zk_approximate_data_size10382
zk_open_file_descriptor_count35
zk_max_file_descriptor_count102400

5、当我们确认 新加的2个zk节点没问题后,我们需要去修改之前的老的3台zk的配置,然后重启这3个zk

修改 node1 node2 node3的 zk配置,如下:

vim  /usr/local/zookeeper-default/conf/zoo.cfg  增加最后的2行代码:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-default/data/
clientPort=2181
maxClientCnxns=2000
maxSessionTimeout=240000
server.1=192.168.2.187:2888:3888
server.2=192.168.2.188:2888:3888
server.3=192.168.2.189:2888:3888
server.4=192.168.2.190:2888:3888
server.5=192.168.2.191:2888:3888

注意重启的时候,我们先重启 follower节点(例如我这里follower是 node2、node3,leader是 node1)

/usr/local/zookeeper-default/bin/zkServer.sh stop
/usr/local/zookeeper-default/bin/zkServer.sh status

/usr/local/zookeeper-default/bin/zkServer.sh start
/usr/local/zookeeper-default/bin/zkServer.sh status

第二部分:kafka节点的扩容:

1、node4 (192.168.2.190)上修改:

mkdir -pv /usr/local/kafka-default/kafka-logs

vim /usr/local/kafka-default/config/server.properties  修改后的文件如下:
broker.id=4   # 注意修改这里
listeners=PLAINTEXT://:9094,TRACE://:9194
advertised.listeners=PLAINTEXT://192.168.2.190:9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka-default/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.2.187:2181,192.168.2.188:2181,192.168.2.189:2181,192.168.2.190:2181,192.168.2.191:2181  # 注意修改这里
zookeeper.connection.timeout.ms=6000
default.replication.factor=2
compression.type=gzip
offsets.retention.minutes=2880
controlled.shutdown.enable=true
delete.topic.enable=true

2、启动node4的kafka程序:

/usr/local/kafka-default/bin/kafka-server-start.sh -daemon /usr/local/kafka-default/config/server.properties

3、node5(192.168.2.191)上修改

mkdir -pv /usr/local/kafka-default/kafka-logs

vim /usr/local/kafka-default/config/server.properties  修改后的文件如下:
broker.id=5   # 注意修改这里
listeners=PLAINTEXT://:9094,TRACE://:9194
advertised.listeners=PLAINTEXT://192.168.2.191:9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka-default/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.2.187:2181,192.168.2.188:2181,192.168.2.189:2181,192.168.2.190:2181,192.168.2.191:2181   # 注意修改这里
zookeeper.connection.timeout.ms=6000
default.replication.factor=2
compression.type=gzip
offsets.retention.minutes=2880
controlled.shutdown.enable=true
delete.topic.enable=true

4、启动node5的kafka程序:

/usr/local/kafka-default/bin/kafka-server-start.sh -daemon /usr/local/kafka-default/config/server.properties

5、测试是否有问题

这里我们可以自己先用 kafka-console-producer.sh 和 kafka-console-consumer.sh  自测下是否 正常工作,然后看看 kafka-manager上是否有需要重新均衡的副本。。

第三部分:对存在风险broker节点的数据迁移(我这里需要这么操作,单纯的扩容不需要这个步骤):

这里我们可以使用kafka-manager这个web平台来做 topic的迁移操作,很简单,这里就不截图了。

第四部分: 对node2 node3下线操作

1、关闭node2 node3节点上面的zk进程,让zk leader节点自动选举

2、关闭node2 node3上面的kafka进程,让kafka controller节点自动选举

## 可能遇到的问题: 

在迁移过程中,遇到consumergroup在我们迁移topic的时候发生异常,让业务方重启了consumer后 报错消失。。

 

转自:https://blog.51cto.com/lee90/2423980

Redis中数据导出和导入:redis-dump和redis-load

使用第三方工具redis-dump和redis-load迁移redis数据库指定库号数据到新redis恢复:

1、配置yum仓库

yum install centos-release-scl-rh -y

2、安装其他工具,不安装后面可能会报错

yum install rh-ruby24*  -y

3、让ruby、redis-dump和redis-load起作用,下次连接上来在运行redis-dump之前也需要执行该语句才行

scl  enable  rh-ruby24 bash

检测下 ruby 版本看是否生效

ruby -v

4、安装redis-dump

gem install redis-dump -V

5、redis-dump导出数据

redis-dump -u 127.0.0.1:6379 -a 'yourpassword' -d 1 > redis_1.json

6、redis-load 导入数据

如果你导出的是1号库的数据,然后你要把它导入到新redis的19号库,将导出文件中的所有”db”:1, 换成”db”:19,然后保存,上传后导入即可,在新redis无密码的情况下可以顺利导入。
修改前redis_1.json

{"db":1,"key":"course:online:capture:sendmsg:user:697","ttl":-1,"type":"list","value":["{\"@class\":\"com.shida.zhaosheng.projects.pojo.formVo.CaptureFormVo\",\"openId\":\"oV3Nd52RDdNqH17zSeyoDAlUemFw\",\"timestamp\":1630800931890,\"imgUrl\":null,\"materialId\":null}"],"size":165}

修改后redis_19.json

{"db":19,"key":"course:online:capture:sendmsg:user:697","ttl":-1,"type":"list","value":["{\"@class\":\"com.shida.zhaosheng.projects.pojo.formVo.CaptureFormVo\",\"openId\":\"oV3Nd52RDdNqH17zSeyoDAlUemFw\",\"timestamp\":1630800931890,\"imgUrl\":null,\"materialId\":null}"],"size":165}

恢复语句,在redis无密码的情况下可以顺利导入。

cat redis_19.json | redis-load -u 127.0.0.1:6379 -d 19

备注:
< test.json redis-load -u 192.168.0.31
ERROR (Yajl::ParseError): lexical error: invalid bytes in UTF8 string.
lue”:{“maxInactiveInterval”:”0000\u0005sr\u0000\u0011jav
(right here) ——^
如报错可加参数 -n,不检查 utf-8格式< test.json redis-load -n -u 192.168.0.31&&

Reactor 之 publishOn 与 subscribeOn

一、概述

在 Spring Reactor 项目中,有两个出镜较少的方法:publishOn subscribeOn。这两个方法的作用是指定执行 Reactive Streaming 的 Scheduler(可理解为线程池)

为何需要指定执行 Scheduler 呢?一个显而易见的原因是:组成一个反应式流的代码有快有慢。

例如 NIO、BIO。如果将这些功能都放在一个线程里执行,快的就会被慢的影响,所以需要相互隔离。这是这两个方法应用的最典型的场景。

二、Scheduler

在介绍 publishOn subscribeOn 方法之前,需要先介绍 Scheduler 这个概念。在 Reactor 中,Scheduler 用来定义执行调度任务的抽象。可以简单理解为线程池,但其实际作用要更多。先简单介绍 Scheduler 的实现:

  • Schedulers.elastic(): 调度器会动态创建工作线程,线程数无上界,类似于 Execturos.newCachedThreadPool()
  • Schedulers.parallel(): 创建固定线程数的调度器,默认线程数等于 CPU 核心数。

三、publishOn 与 subscribeOn

接下来进入正题。先看两个例子(来自 https://github.com/reactor/lite-rx-api-hands-on

publishOn 的例子

Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
    return flux
            .publishOn(Schedulers.elastic())
            .doOnNext(repository::save)
            .then();
}

subscribeOn 的例子

Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
            .subscribeOn(Schedulers.elastic());
}

这里的 repository 的类型是 BlockingRepository,指的是会导致线程阻塞的数据库操作的集合,例如 JPA、MyBatis 等基于 JDBC 技术实现的 DAO。

  • 在第一个例子中,在执行了 publishOn(Schedulers.elastic()) 之后,repository::save 就会被Schedulers.elastic() 定义的线程池所执行。
  • 在第二个例子中,subscribeOn(Schedulers.elastic()) 的作用类似。它使得 repository.findAll()(也包括 Flux.fromIterable)的执行发生在 Schedulers.elastic() 所定义的线程池中。

从上面的描述看,publishOn subscribeOn 的作用类似,那两者的区别又是什么?

两者的区别

简单说,两者的区别在于影响范围。

publishOn 影响在其之后的 operator 执行的线程池,而 subscribeOn 则会从源头影响整个执行过程。

所以,publishOn 的影响范围和它的位置有关,而 subscribeOn 的影响范围则和位置无关。

看个 publishOnsubscribeOn 同时使用的例子

Flux
    .just("tom")
    .map(s -> {
        System.out.println("[map] Thread name: " + Thread.currentThread().getName());
        return s.concat("@mail.com");
    })
    .publishOn(Schedulers.newElastic("thread-publishOn"))
    .filter(s -> {
        System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
        return s.startsWith("t");
    })
    .subscribeOn(Schedulers.newElastic("thread-subscribeOn"))
    .subscribe(s -> {
        System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
        System.out.println(s);
    });

输出结果如下:

[map] Thread name: thread-subscribeOn-3
[filter] Thread name: thread-publishOn-4
[subscribe] Thread name: thread-publishOn-4
tom@mail.com

从上面的例子可以看出,subscribeOn 定义在 publishOn 之后,但是却从源头开始生效。而在 publishOn 执行之后,线程池变更为 publishOn 所定义的。

实际用途

这里介绍 publishOnsubscribeOn 的一种实际用途,那就是反应式编程和传统的,会导致线程阻塞的编程技术混用的场景。其实开头两个例子已经解释了这个场景。

在第一个 publishOn 的例子中,repository::save 会导致线程阻塞,为了避免造成对其它反应式操作的影响,便使用 publishOn 改变其执行线程。

在第二个 subscribeOn 的例子中,repository.findAll() 会导致线程阻塞。但是其是源头的 publisher,因此不能使用 publishOn 改变其 执行线程。这时就需要使用 subscribeOn,在源头上修改其执行线程。

这样,通过 publishOnsubscribeOn 就在反应式编程中实现了线程池隔离的目的,一定程度上避免了会导致线程阻塞的程序执行影响到反应式编程的程序执行效率。

局限性

使用 publishOnsubscribeOn 只能在一定程度上避免反应式编程代码执行的效率被影响。因为用来隔离的线程池资源终归是有限的,比如当出现数据库资源不足、慢查询等问题时,对应的线程池资源如果被耗尽,还是会使整个反应式编程的执行效率受到影响。

目前,Redis、Mongo、Couchbase 等非关系型数据库均有相应的反应式编程的解决方案,但是关系型数据库却没有理想的方案。一个重要原因是 JDBC 本身就是一个阻塞式的 API,根本不可能让其适应反应式编程。因此需要一个新的方案。目前 Oracle 正在推动 ADBA (Asynchronous Database Access API),使得关系型数据库可以满足异步编程的需要。但是,因为是 Oracle 主导,大家都懂的,所以目前前景还不是很明朗。

另外一个技术方案是 Spring 推动的 R2DBC,从名字上来看就很像是 JDBC 在反应式编程领域的对应的解决方案。目前可以支持 PostgreSQL,支持 MySQL 目前还尚需时日。

 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |