Apache ActiveMQ Artemis实战

1.ActiveMQ Artemis

High-performance, non-blocking architecture for the next generation of event-driven messaging applications.

  • JMS 1.1 & 2.0 with full client implementation including JNDI
  • High availability using shared storage or network replication
  • Simple & powerful protocol agnostic addressing model
  • Flexible clustering for distributing load
  • Advanced journal implementations for low-latency persistence as well as JDBC
  • High feature parity with ActiveMQ 5 to ease migration

翻译过来就是:

为下一代事件驱动的消息传递应用程序提供高性能、无阻塞的体系结构。

  • 包含JNDI,具有完整的JMS 1.1 & 2.0客户端实现
  • 高可用性共享存储、网络复制能力
  • 简单而强大的寻址模型协议
  • 灵活的负载均衡分配能力
  • 针对低延迟持久性和JDBC的高级日志实现
  • 与ActiveMQ 5的高功能奇偶校验,以简化迁移

官方文档:http://activemq.apache.org/components/artemis/migration

2.Architectural differences

虽然它们(ActiveMQ & Artemis)的设计目的是做同样的工作,但内部的工作方式却不同。以下是您在规划迁移时需要注意的一些最显着的体系结构差异。

在ActiveMQ中,我们有一些IO连接层的不同实现,如TCP(阻塞的)和NIO(非阻塞的)。在Artemis中,IO层是使用Netty实现的,Netty是一个NIO框架。这意味着不再需要在不同的实现之间进行选择,因为默认情况下使用非阻塞实现。

每个broker都是一个重要消息存储。大多数ActiveMQ用户都应该熟悉KahaDB。它由一个消息日志组成,用于快速存储消息(和其他命令包)日志,以及在需要时用于检索取回消息。

Artemis有自己的消息存储。它只包含仅追加的消息日志。由于分页方式的不同,不需要使用消息索引。我们一会儿再谈。在这一点上,很重要的一点是,这两个存储是不能互换的,如果需要的话,必须仔细规划数据迁移。

我们所说的分页差异是什么意思?分页是当broker无法在其内存中保存所有传入消息时发生的过程。如何处理这种情况的策略在两个broker之间是不同的。ActiveMQ有一些游标,这些游标基本上是准备发送给使用者的消息的缓存。它将尝试将所有传入的消息保存在缓存。当可用内存用完时,消息会添加到存储中,但缓存会停止。当空间再次可用时,broker将通过批量从存储中提取消息来再次填充缓存。因此,我们需要在broker运行时,经常地读取日志。为了做到这一点,我们需要维护日志索引,以便在日志中跟踪消息的位置。

在Artemis中,在这方面的工作是不同的。整个消息日志保存在内存中,并直接从中发送消息。当内存耗尽时,消息会在生产者端进行分页(在消息到达broker之前)。它们按到达时的顺序存储在连续的页面文件中。一旦释放内存,消息就会从这些页面文件移到日志中。对于这样的分页工作,只有在broker启动时才从文件日志读取消息,以便在内存中重新创建日志的这个版本。在这种情况下,日志只能按顺序读取,这意味着不需要在日志中保留消息索引。

这是ActiveMQ5.x和Artemis的主要区别之一。尽早了解它非常重要,因为它会影响许多目标策略设置以及我们如何配置代理以正确支持这些场景。

3.Addressing differences

另一个很大的区别是消息寻址和路由是如何完成的。ActiveMQ始于开源JMS实现,因此在其核心,所有JMS概念(如队列(queues)、主题(topics)和持久订阅(durable subscriptions))都是首先实现。这些都是基于项目开发的OpenWire协议,甚至KahaDB消息存储也是以OpenWire为中心的。这意味着所有其他支持的协议,如:MQTT、AMQP都在内部转换为OpenWire。

Artemis采取了不同的方法。它只在内部实现队列,所有消息传送概念都是通过使用消息地址路由到适当的队列来实现的。如:发布订阅(topics)和点对点(queues)之类的消息传递概念是使用不同类型的路由机制实现。广播(Multicast)路由用于实现发布订阅(publish-subscribe)规程,所有订阅者将通过路由将获得自己的内部队列和消息。选播(Anycast)路由用于实现点对点规程,其中每一个地址只有一个队列,所有消费者都订阅它。所有协议都使用寻址和路由方案。例如,您可以将JMS topic作为广播地址,用于发送给消费者。

*注:翻译水平不咋滴,文字来源于官网文档的翻译件(http://activemq.apache.org/components/artemis/migration)。

4.消息队列

点对点消息(queue):

通常将消息发送到队列,队列将消息持久化后,将消息推送给多个消费者,但只有其中一个能够接收成功。如果生产者在生产消息的时候,并无消费者在线,那么会将消息进行持久化,待消费者上线后,将消息发送给消费者。当消费者接收消息成功后,会隐式地发送一个ack给消息中间件,消息中间件收到ack后将消息从消息队列中标记为已消费并从队列中移除。

发布订阅(topic):

将消息发送到称为Topic的实体上,一个Topic可以有许多消费者(订阅者),每个订阅者都能收到发布者(生产者)的消息。生产订阅消息分为持久性订阅非持久性订阅

持久性订阅会将消息进行持久化而保存每一条消息,直到所有订阅者都收到了它们。

非持久订阅仅持续到创建它们的连接的最长生命周期。

消息传递可靠性,消息的可靠性有三种传输保障:

  • At most once,至多一次,消息可能丢失但是不会重复发送;
  • At least once,至少一次,消息不会丢失,但是可能会重复;
  • Exactly once,精确一次,每条消息肯定会被传输一次且仅一次。

事务消息:

事务消息指的是针对事务开始和事务结束期间的全体操作,要不全部成功,要不全部失败。Artemis支持本地的多个消息的发送和确认事务。

消息持久化:

需要持久化的消息存储在磁盘中,不需要持久化的消息存储在内存中。

5.使用Artemis

首先我们我们下载并安装了broker,我们就会遇到与ActiveMQ第一个不同之处。使用Artemis,您需要显式地创建一个broker实例,而在ActiveMQ上,这个步骤是可选的。这个步骤的意思是将broker的安装和配置分开,这使得以后升级和维护代理更加容易。

所以,在启动Artemis之前,你需要执行这样的操作:

$ bin/artemis create –user admin –password admin –role admins –allow-anonymous true /opt/artemis

不管你在什么操作系统上安装broker二进制文件,broker实例都会位于像“/opt/artemis”目录中。对每个ActiveMQ用户都熟悉这个文件夹的内容:

  • bin – 包含管理broker的shell脚本(start, stop, etc.)
  • data – 每一个broker数据存储 (message store)
  • etc – 包含broker配置文件(it’s what conf directory is in ActiveMQ)
  • log – Artemis logs存储目录, 而不像ActiveMQ将日志保存在data目录中
  • tmp – 临时文件目录

现在让我们更详细地了解一下配置。

  • etc/bootstrap.xml
    • 文件用于设置基本信息,如:主broker配置文件的位置、实用程序(如Web服务器)和JAAS安全性。
  • etc/broker.xml
    • 与ActiveMQ’s中的conf/activemq.xml类似,用于配置broker的大部分信息,如:端口、名称、安全策略等。
  • etc/artemis.profile
    • 文件与ActiveMQ中的bin/env文件类似。您可以在这个文件中配置broker环境变量,主要:SSL、调试等相关的JVM常规参数。
  • etc/logging.properties
    • 两个brokers之间的日志配置没有多大差别,所以任何熟悉Java日志记录系统的人都会配置这个文件。

最后,我们有JAAS配置文件 (login.configartemis-users.properties and artemis-roles.properties),它们与ActiveMQ中的配置相同。

现在我们可以启动Artemis:

$ bin/artemis run

6.Spring配置Artemis

maven配置

<!-- https://mvnrepository.com/artifact/org.apache.activemq/artemis-jms-client -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>artemis-jms-client</artifactId>
    <version>2.7.0</version>
</dependency>

xml配置(这里我使用java)

private ActiveMQConnectionFactory factory;
private Connection connection;

@Autowired
private ActiveMQQueueListener listener;

public ActiveMQConfig() {
    factory = new ActiveMQConnectionFactory(tcp_uri);
    factory.setCacheDestinations(true);

    try {
        connection = factory.createConnection();
        connection.start();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

首先我们需要一个ActiveMQ的连接器工厂类,然后还需要连接到ActiveMQ。

@Bean(destroyMethod = "close")
public Session activeMQSession() throws JMSException {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    return session;
}

/**
 * Queue生产者
 *
 * @param session
 * @return
 * @throws JMSException
 */
@Bean
public MessageProducer messageProducer(Session session) throws JMSException {
    if (mqType == 0) {
        return session.createProducer(session.createQueue(QUEUE_DEFAULT));
    }
    return session.createProducer(session.createTopic(TOPIC_DEFAULT));
}

这里我们需要创建一个生产者(MessageProducer),mqType是一个int对象,如:private int mqType = 1;//0:queue|1:topic|2:mqtt

这里为了做实验,便于切换,但是在实际应用场合中,我们应该是既使用Queue也会使用Topic,我将会在最后贴出ActiveMQService服务类,此类实现了对Artemis的基本操作。

/**
 * Queue消费者
 *
 * @param session
 * @return
 * @throws JMSException
 */
@Bean
public MessageConsumer messageConsumer(Session session) throws JMSException {
    MessageConsumer consumer = null;
    if (mqType == 0) {
        consumer = session.createConsumer(session.createQueue(QUEUE_DEFAULT));
    } else {
        consumer = session.createConsumer(session.createTopic(TOPIC_DEFAULT));
    }
    consumer.setMessageListener(listener);
    return consumer;
}

最后我们只需要创建一个消费者(MessageConsumer),配置文件非常简单,现在可以用来测试。

@RestController
@RequestMapping(value = "/test")
public class TestActiveMQController {

    @Autowired
    private IActiveMQProducerService producerService;

    @RequestMapping(value = "/activemq/queue")
    public ResultResp<Void> testQueue(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();
        String str = "test queue " + DateTimeUtils.getTimeInt();
        producerService.send(str);
        resp.setInfo(str);
        return resp;
    }

    @RequestMapping(value = "/activemq/mqtt")
    public ResultResp<Void> testMqtt(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();
        String str = "test mqtt " + DateTimeUtils.getTimeInt();
        producerService.push(str);
        resp.setInfo(str);
        return resp;
    }
}

//生产者服务类
@Service
public class ActiveMQProducerServiceImpl extends CompactService implements IActiveMQProducerService {

    @Autowired
    private MessageProducer producer;

    @Autowired
    private Session session;

    @Autowired
    private BlockingConnection blockingConnection;


    public ActiveMQProducerServiceImpl() {

    }

    /**
     * Queue & Topic send
     *
     * @param message
     */
    @Override
    public void send(String message) {
        try {
            producer.send(session.createTextMessage(message));
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * MQTT push
     *
     * @param message
     */
    @Override
    public void push(String message) {
        try {
            blockingConnection.publish(ActiveMQConfig.MQTT_DEFAULT, message.getBytes(), QoS.AT_LEAST_ONCE, false);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

为了测试,随便整合了一下,但是后来觉得,其实没有必要使用config的方式进行整合,以为Apache ActiveMQ Artemis是一个非常大的消息队列,应该以服务层的概念来整合进去,所以后来写了一个Artemis服务对象类:

7.整合到服务层

接口IActiveMQService

package com.lanxinbase.system.service.resource;

import javax.jms.*;

/**
 * Created by alan on 2019/5/2.
 */
public interface IActiveMQService {

    Session createSession() throws JMSException;

    Queue createQueue(Session session, String name) throws JMSException;

    Topic createTopic(Session session, String name) throws JMSException;

    MessageProducer getMessageProducer(Queue queue) throws JMSException;

    MessageProducer getMessageProducer(Topic topic) throws JMSException;

    MessageProducer createMessageProducer(Destination destination, String name) throws JMSException;

    MessageConsumer getMessageConsumer(Session session, Queue queue) throws JMSException;

    MessageConsumer getMessageConsumer(Session session, Topic topic) throws JMSException;

    MessageConsumer createMessageConsumer(Session session, Destination destination, String name) throws JMSException;

    boolean send(String name, String type, String message);

    boolean send(String name, String type, String message, CompletionListener listener);

    boolean send(MessageProducer producer, String message);

    boolean send(MessageProducer producer, String message, CompletionListener listener);

    void addListener(String name, String type, MessageListener listener);


}

实现ActiveMQService:

package com.lanxinbase.system.service;

import com.lanxinbase.system.basic.CompactService;
import com.lanxinbase.system.service.resource.IActiveMQService;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by alan on 2019/5/2.
 * <p>
 * * 0.下载artemis,不是ActiveMQ!!
 * 在ActiveMQ中,这些都是自动的,但是artemis在第一次使用时候需要创建一个broker。
 * 1.创建broker,命令: ./bin/artemis create --user admin --password admin --role admins --allow-anonymous true /opt/arteclsmis
 * 2.启动artemis,命令:/opt/arteclsmis/bin/artemis run
 * <p>
 * <p>
 * Queue Test:
 * 生产者:
 * http://localhost:8180/test/activemq/pub/add?name=testQueue&type=queue
 * http://localhost:8180/test/activemq/pub/add?name=testQueue1&type=queue
 * http://localhost:8180/test/activemq/pub/add?name=testQueue2&type=queue
 * <p>
 * 消费者:
 * http://localhost:8180/test/activemq/sub/add?id=100&name=testQueue&type=queue
 * http://localhost:8180/test/activemq/sub/add?id=101&name=testQueue&type=queue
 * http://localhost:8180/test/activemq/sub/add?id=102&name=testQueue2&type=queue
 * http://localhost:8180/test/activemq/sub/add?id=103&name=testQueue1&type=queue
 * http://localhost:8180/test/activemq/sub/add?id=104&name=testQueue&type=queue
 * <p>
 * 结果日志:
 * 02-May-2019 13:32:22.318 lambda$subAdd$0 101 : test mqtt 1556775142
 * 02-May-2019 13:32:22.988 lambda$subAdd$0 100 : test mqtt 1556775142
 * 02-May-2019 13:32:23.824 lambda$subAdd$0 104 : test mqtt 1556775143
 * <p>
 * Queue消息不会丢失,如果生产者生产消息的时候没有消费者进入,那么消息会等到消费者进入后发送给消费者。
 * 如果有多个消费者监听同一个Queue,那么则会按照某种算法,将消息发送给其中一个消费者,如果接收成功后,通道会自动删除消息。
 * <p>
 * ***************************************************************************************************************
 * <p>
 * Topic Test:
 * 生产者:
 * http://localhost:8180/test/activemq/pub/add?name=testTopic&type=topic
 * http://localhost:8180/test/activemq/pub/add?name=testTopic1&type=topic
 * http://localhost:8180/test/activemq/pub/add?name=testTopic2&type=topic
 * <p>
 * 消费者:
 * http://localhost:8180/test/activemq/sub/add?id=100&name=testTopic&type=topic
 * http://localhost:8180/test/activemq/sub/add?id=101&name=testTopic&type=topic
 * http://localhost:8180/test/activemq/sub/add?id=102&name=testTopic1&type=topic
 * http://localhost:8180/test/activemq/sub/add?id=103&name=testTopic2&type=topic
 * http://localhost:8180/test/activemq/sub/add?id=104&name=testTopic&type=topic
 * <p>
 * 结果日志:
 * 02-May-2019 13:39:53.216 信息  topic/default/testTopic
 * 02-May-2019 13:39:53.219 lambda$subAdd$0 101 : test mqtt 1556775593
 * 02-May-2019 13:39:53.219 lambda$subAdd$0 100 : test mqtt 1556775593
 * 02-May-2019 13:39:53.220 lambda$subAdd$0 104 : test mqtt 1556775593
 * <p>
 * 02-May-2019 13:39:56.224 信息 topic/default/testTopic1
 * 02-May-2019 13:39:56.227 lambda$subAdd$0 102 : test mqtt 1556775596
 * <p>
 * 02-May-2019 13:39:59.420 信息 topic/default/testTopic2
 * 02-May-2019 13:39:59.423 lambda$subAdd$0 103 : test mqtt 1556775599
 * <p>
 * Topic消息会丢失,如果生产者生产消息的时候没有消费者进入,那么消息会丢失。
 * 当有消费者监听Topic时,可以收到消息,如果同时有多个消费者监听同一个topic,那么消息将分别发送给各个消费者。
 *
 * @See TestActiveMQController.class
 */

@Service
public class ActiveMQService extends CompactService implements IActiveMQService, InitializingBean, DisposableBean {

    public static final String TYPE_QUEUE = "queue";
    public static final String TYPE_TOPIC = "topic";

    private static final String TOPIC_DEFAULT = "topic/default/";
    private static final String QUEUE_DEFAULT = "queue/default/";
    private static final String tcp_uri = "tcp://0.0.0.0:61616";

    private ActiveMQConnectionFactory factory;
    private Connection connection;
    private Session session;

    private Map<String, MessageProducer> producerMap = new ConcurrentHashMap<>();

    public ActiveMQService() {

    }

    /**
     * 首次启动运行
     *
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        factory = new ActiveMQConnectionFactory(tcp_uri);
        factory.setCacheDestinations(true);

        connection = factory.createConnection();
        connection.start();
        createSession();
    }

    /**
     * 私有的session,主要用于生产者
     *
     * @return
     * @throws JMSException
     */
    private Session getSession() throws JMSException {
        if (session == null) {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }
        return session;
    }

    /**
     * 创建一个新的session,主要应用于消费者
     *
     * @return
     * @throws JMSException
     */
    @Override
    public Session createSession() throws JMSException {
        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * 创建一个Queue
     *
     * @param s    session
     * @param name 如:test
     * @return
     * @throws JMSException
     */
    @Override
    public Queue createQueue(Session s, String name) throws JMSException {
        if (name == null) {
            name = "";
        }
        return s.createQueue(QUEUE_DEFAULT + name);
    }

    /**
     * 创建一个Topic
     *
     * @param s    session
     * @param name 如:test
     * @return
     * @throws JMSException
     */
    @Override
    public Topic createTopic(Session s, String name) throws JMSException {
        if (name == null) {
            name = "";
        }
        return s.createTopic(TOPIC_DEFAULT + name);
    }

    /**
     * 获取一个生产者
     *
     * @param queue
     * @return
     * @throws JMSException
     */
    @Override
    public MessageProducer getMessageProducer(Queue queue) throws JMSException {
        return createMessageProducer(queue, queue.getQueueName());
    }

    /**
     * 获取一个生产者
     *
     * @param topic
     * @return
     * @throws JMSException
     */
    @Override
    public MessageProducer getMessageProducer(Topic topic) throws JMSException {
        return createMessageProducer(topic, topic.getTopicName());
    }

    /**
     * 创建一个生产者
     *
     * @param destination
     * @param name        缓存key,同时等于queue&topic的name
     * @return
     * @throws JMSException
     */
    @Override
    public MessageProducer createMessageProducer(Destination destination, String name) throws JMSException {
        logger(name);
        MessageProducer producer = producerMap.get(name);
        if (producer == null) {
            producer = session.createProducer(destination);
            producerMap.put(name, producer);
        }
        return producer;
    }

    /**
     * 获取一个消费者
     *
     * @param s     可以通过createSession创建
     * @param queue
     * @return
     * @throws JMSException
     */
    @Override
    public MessageConsumer getMessageConsumer(Session s, Queue queue) throws JMSException {
        return createMessageConsumer(s, queue, queue.getQueueName());
    }

    /**
     * 获取一个消费者
     *
     * @param s     可以通过createSession创建
     * @param topic
     * @return
     * @throws JMSException
     */
    @Override
    public MessageConsumer getMessageConsumer(Session s, Topic topic) throws JMSException {
        return createMessageConsumer(s, topic, topic.getTopicName());
    }

    /**
     * 创建一个消费者
     *
     * @param s           session(本来是把session做成单例,但是消费者应该是动态的,不同于生产者,所以这里需要随时创建一个session)
     * @param destination
     * @param name        废弃
     * @return
     * @throws JMSException
     */
    @Override
    public MessageConsumer createMessageConsumer(Session s, Destination destination, String name) throws JMSException {
//        MessageConsumer consumer = consumerMap.get(name);
        MessageConsumer consumer = null;
//        if (consumer == null) {
//            consumer = session.createConsumer(destination);
//            consumerMap.put(name, consumer);
//        }
        consumer = s.createConsumer(destination);
        return consumer;
    }

    /**
     * 生产一条消息
     *
     * @param name    Queue|Topic的name
     * @param type    类型:Queue|Topic
     * @param message 字符串消息,通常应该是JSON字符串
     * @return
     */
    @Override
    public boolean send(String name, String type, String message) {
        return this.send(name, type, message, null);
    }

    @Override
    public boolean send(String name, String type, String message, CompletionListener listener) {
        MessageProducer producer;
        try {
            if (TYPE_QUEUE.equals(type)) {
                producer = getMessageProducer(createQueue(getSession(), name));
            } else {
                producer = getMessageProducer(createTopic(getSession(), name));
            }
            if (listener == null) {
                return this.send(producer, message);
            } else {
                return this.send(producer, message, listener);
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }
        return false;
    }


    /**
     * 发送消息
     *
     * @param producer 生产者
     * @param message  消息字符串
     * @return
     */
    @Override
    public boolean send(MessageProducer producer, String message) {
        try {
            producer.send(session.createTextMessage(message));
            return true;
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean send(MessageProducer producer, String message, CompletionListener listener) {
        try {
            producer.send(session.createTextMessage(message), listener);
            return true;
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 消费者监听
     *
     * @param name     Queue|Topic的name
     * @param type     类型:Queue|Topic
     * @param listener 监听回调
     */
    @Override
    public void addListener(String name, String type, MessageListener listener) {
        MessageConsumer consumer = null;
        try {
            Session session = createSession();
            if (TYPE_QUEUE.equals(type)) {
                consumer = getMessageConsumer(session, createQueue(session, name));
            } else {
                consumer = getMessageConsumer(session, createTopic(session, name));
            }
            this.addListener(consumer, listener);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private void addListener(MessageConsumer consumer, MessageListener listener) {
        try {
            consumer.setMessageListener(listener);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * 程序退出时需要关闭或停止连接
     *
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        if (session != null) {
            session.close();
        }
        connection.stop();
        connection.close();
        factory.close();
    }


}

由于每个方法都加了注释,所以这里就不更多的废话了。但是在public MessageConsumer createMessageConsumer()方法中我使用的是创建了一个全新的Session来创建消费者,为什么要这样子做呢?

在生产环境中应该跟createMessageProducer()方法中的session一样使用单列的session,这里需要注意就是这个session我并没有研究过有没有过期时间,在生产环境中,需要实时传输消息,理论上是不会过期。

之所以这样做,我认为在用户登陆APP上线之后会执行一个会话(创建一个新的session),当APP下线之后也应该移除这个Session,示例代码只是为了做测试,所以没有过多的实现,在实际应用中应该根据自身的业务需求来改进逻辑代码。

最后测试TestActiveMQController:

@RestController
@RequestMapping(value = "/test")
public class TestActiveMQController {

    private static final Logger logger = Logger.getLogger("TestActiveMQController>");

    @Resource
    private IActiveMQService activeMQService;


    /**
     * /test/activemq/pub/add?name=testQueue&type=queue
     * /test/activemq/pub/add?name=testQueue1&type=queue
     * <p>
     * /test/activemq/pub/add?name=testTopic&type=topic
     * /test/activemq/pub/add?name=testTopic1&type=topic
     *
     * @param request
     * @return
     */
    @RequestMapping(value = "/activemq/pub/add")
    public ResultResp<Void> pubAdd(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();

        String name = request.getParameter("name");
        String type = request.getParameter("type");

        String msg = "test mqtt " + DateTimeUtils.getTimeInt();

        activeMQService.send(name, type, msg);

        resp.setInfo(msg);
        return resp;
    }

    /**
     * /test/activemq/sub/add?id=100&name=testQueue&type=queue
     * /test/activemq/sub/add?id=101&name=testQueue1&type=queue
     * <p>
     * /test/activemq/sub/add?id=100&name=testTopic&type=topic
     * /test/activemq/sub/add?id=101&name=testTopic&type=topic
     *
     * @param request
     * @return
     */
    @RequestMapping(value = "/activemq/sub/add")
    public ResultResp<Void> subAdd(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();

        String id = request.getParameter("id");
        String name = request.getParameter("name");
        String type = request.getParameter("type");

        activeMQService.addListener(name, type, (m) -> {
            TextMessage textMessage = (TextMessage) m;
            try {
                logger.info(id + " : " + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });

        return resp;
    }
}

至此,从一点点理论到配置到实现全部完成,不喜勿喷哈?我只是写给自己以后查阅的。

Leave a Comment

 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |