1.zabbix简介
基于Web界面的分布式系统监控的企业级开源软件。可以监控各种系统与设备,网络参数,保证服务器设备安全运营;提供灵活的通知机制。
java中ArrayList 、LinkList区别
1.ArrayList是实现了基于动态数组的数据结构,LinkedList基于链表的数据结构。
2.对于随机访问get和set,ArrayList优于LinkedList,因为ArrayList可以随机定位,而LinkedList要移动指针一步一步的移动到节点处。(参考数组与链表来思考)
3.对于新增和删除操作add和remove,LinedList比较占优势,只需要对指针进行修改即可,而ArrayList要移动数据来填补被删除的对象的空间。
ArrayList和LinkedList是两个集合类,用于存储一系列的对象引用(references)。例如我们可以用ArrayList来存储一系列的String或者Integer。那么ArrayList和LinkedList在性能上有什么差别呢?什么时候应该用ArrayList什么时候又该用LinkedList呢?
一.时间复杂度
首先一点关键的是,ArrayList的内部实现是基于基础的对象数组的,因此,它使用get方法访问列表中的任意一个元素时(random-access),它的速度要比LinkedList快。LinkedList中的get方法是按照顺序从列表的一端开始检查,直到另外一端。对LinkedList而言,访问列表中的某个指定元素没有更快的方法了。
二.空间复杂度
在LinkedList中有一个私有的内部类,定义如下:
private static class Entry {
Object element;
Entry next;
Entry previous;
}
每个Entry对象reference列表中的一个元素,同时还有在LinkedList中它的上一个元素和下一个元素。一个有1000个元素的LinkedList对象将有1000个链接在一起的Entry对象,每个对象都对应于列表中的一个元素。这样的话,在一个LinkedList结构中将有一个很大的空间开销,因为它要存储这1000个Entity对象的相关信息。
ArrayList使用一个内置的数组来存储元素,这个数组的起始容量是10.当数组需要增长时,新的容量按如下公式获得:新容量=(旧容量*3)/2+1,也就是说每一次容量大概会增长50%。这就意味着,如果你有一个包含大量元素的ArrayList对象,那么最终将有很大的空间会被浪费掉,这个浪费是由ArrayList的工作方式本身造成的。如果没有足够的空间来存放新的元素,数组将不得不被重新进行分配以便能够增加新的元素。对数组进行重新分配,将会导致性能急剧下降。如果我们知道一个ArrayList将会有多少个元素,我们可以通过构造方法来指定容量。我们还可以通过trimToSize方法在ArrayList分配完毕之后去掉浪费掉的空间。
三.总结
ArrayList和LinkedList在性能上各有优缺点,都有各自所适用的地方,总的说来可以描述如下:
性能总结:
| – | add()操作 | delete()操作 | insert操作 | index取值操作 | iterator取值操作 |
|---|---|---|---|---|---|
| ArrayList/Vector/Stack | 好 | 差 | 差 | 极优 | 极优 |
| LinkedList | 好 | 好 | 好 | 差 | 极优 |
1.对ArrayList和LinkedList而言,在列表末尾增加一个元素所花的开销都是固定的。对ArrayList而言,主要是在内部数组中增加一项,指向所添加的元素,偶尔可能会导致对数组重新进行分配;而对LinkedList而言,这个开销是统一的,分配一个内部Entry对象。
2.在ArrayList的中间插入或删除一个元素意味着这个列表中剩余的元素都会被移动;而在LinkedList的中间插入或删除一个元素的开销是固定的。
3.LinkedList不支持高效的随机元素访问。
4.ArrayList的空间浪费主要体现在在list列表的结尾预留一定的容量空间,而LinkedList的空间花费则体现在它的每一个元素都需要消耗相当的空间
可以这样说:当操作是在一列数据的后面添加数据而不是在前面或中间,并且需要随机地访问其中的元素时,使用ArrayList会提供比较好的性能;当你的操作是在一列数据的前面或中间添加或删除数据,并且按照顺序访问其中的元素时,就应该使用LinkedList了。
LinkedList类
LinkedList实现了List接口,允许null元素。此外LinkedList提供额外的get,remove,insert方法在LinkedList的首部或尾部。这些操作使LinkedList可被用作堆栈(stack),队列(queue)或双向队列(deque)。
注意:LinkedList没有同步方法。如果多个线程同时访问一个List,则必须自己实现访问同步。一种解决方法是在创建List时构造一个同步的List:
List list = Collections.synchronizedList(new LinkedList(…));
ArrayList类
ArrayList实现了可变大小的数组。它允许所有元素,包括null。ArrayList没有同步。
size,isEmpty,get,set方法运行时间为常数。但是add方法开销为分摊的常数,添加n个元素需要O(n)的时间。其他的方法运行时间为线性。
每个ArrayList实例都有一个容量(Capacity),即用于存储元素的数组的大小。这个容量可随着不断添加新元素而自动增加,但是增长算法并没有定义。当需要插入大量元素时,在插入前可以调用ensureCapacity方法来增加ArrayList的容量以提高插入效率。
和LinkedList一样,ArrayList也是非同步的。
如果涉及到堆栈,队列等操作,应该考虑用List,对于需要快速插入,删除元素,应该使用LinkedList,如果需要快速随机访问元素,应该使用ArrayList。
java中synchronized与lock的区别
synchronized的缺陷
前面博客有提到过释放对象的锁有两种情况:
- 程序执行完同步代码块会释放代码块。
- 程序在执行同步代码块是出现异常,JVM会自动释放锁去处理异常。
如果获取锁的线程需要等待I/O或者调用了sleep()方法被阻塞了,但仍持有锁,其他线程只能干巴巴的等着,这样就会很影响程序效率。
因此就需要一种机制,可以不让等待的线程已知等待下去,比如值等待一段时间或响应中断,Lock锁就可以办到。
再举个例子:当有多个线程读写文件时,读操作和写操作会发生冲突现象,写操作和写操作会发生冲突现象,但是读操作和读操作不会发生冲突现象。但是采用synchronized关键字来实现同步的话,就会导致一个问题:如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作。
因此就需要一种机制来使得多个线程都只是进行读操作时,线程之间不会发生冲突,通过Lock就可以办到。
另外,Lock可以知道线程有没有得到锁,而synchronized不能。
总结区别,Lock与synchronized有以下区别:
- Lock是一个接口,而synchronized是关键字。
- synchronized会自动释放锁,而Lock必须手动释放锁。
- Lock可以让等待锁的线程响应中断,而synchronized不会,线程会一直等待下去。
- 通过Lock可以知道线程有没有拿到锁,而synchronized不能。
- Lock能提高多个线程读操作的效率。
- synchronized能锁住类、方法和代码块,而Lock是块范围内的
Go 语言 – 实战简易文章系统
go语言很简单,只要有一定的编程基础都很容易使用它来编写一些程序,学完了go lang的语法,习惯写一个小程序,这里我写了一个简易的文章系统,非常简单。
目录结构如下:
1、main.go
func main() {
//文件系统
//fs := http.FileSystem(http.Dir("e:/tools"))
//http.Handle("/", http.FileServer(fs))
//log.Fatal(http.ListenAndServe(":8080", nil))
port := "8080"
web := http.Server{
Addr: ":"+port,
Handler: app.HttpHandler(),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
app.Router()
log.Printf("Listening on port %s", port)
log.Printf("Open http://localhost:%s in the browser", port)
log.Fatal(web.ListenAndServe())
}
首先我们从入口类开始,main()的方法,首先实例化了一个web服务器对象,传入了port跟Handler,handler使用的是一个全局性,也就是说所有的请求都会指向app.HttpHandler()。
接着调用app.Router()方法,初始化一些router,代码待会贴上。
2、Router.go
type route struct {
path string
method string
authorized bool
handler func(http.ResponseWriter, *http.Request)
}
const (
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"
OPTION = "OPTION"
HEADER = "HEADER"
)
var (
routes map[string]route
)
func Router() {
//http.HandleFunc("/", indexHandler)
routes = map[string]route{}
routes["/"] = route{path: "/", method: GET, authorized: false, handler: indexHandler}
routes["/view/res/*"] = route{path: "/", method: GET, authorized: false, handler: resourcesHandler}
routes["/user"] = route{path: "/user", method: GET, authorized: true, handler: indexHandler}
routes["/add"] = route{path: "/add", method: GET, authorized: true, handler: addHandler}
routes["/save"] = route{path: "/edit", method: POST, authorized: true, handler: addSaveHandler}
routes["/view"] = route{path: "/view", method: GET, authorized: false, handler: viewHandler}
routes["/sign/in"] = route{path: "/sign/up", method: GET, authorized: false, handler: signInHandler}
routes["/sign/up"] = route{path: "/sign/up", method: GET, authorized: false, handler: signUpHandler}
routes["/doSignIn"] = route{path: "/doSignIn", method: POST, authorized: false, handler: signInSaveHandler}
routes["/doSignUp"] = route{path: "/doSignUp", method: POST, authorized: false, handler: signUpSaveHandler}
}
func NewRouter(key string) (r route, ok bool) {
if strings.Contains(key, "/view/res/") {
key = "/view/res/*"
}
r, err := routes[key]
return r, err
}
router需要一个类型来保存路由的基本信息,所以这里申明一个route类型对象,route类型:
- path string //路由的路径
- method string //方法名
- authorized bool //是否授权
- handler func(http.ResponseWriter, *http.Request) //处理函数
3、handler.go
const (
ERROR_NOT_FOUND = "ERROR_NOT_FOUND"
ERROR_NOT_METHOD = "ERROR_NOT_METHOD"
ERROR_AUTH_INVALID = "ERROR_AUTH_INVALID"
)
var (
mutex sync.Mutex
wg sync.WaitGroup
)
func init() {
wg.Add(100)
}
func HttpHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
log.Println(r.Header.Get("Accept"))
log.Println(r.Header.Get("User-Agent"))
log.Println(r.Header)
log.Println(r.Proto, r.Host, r.Method, r.URL.Path)
token, _ := r.Cookie("token")
log.Println("token", token)
//if strings.Index(r.URL.Path,"/view/res/") == 0 {
// resourcesHandler(w,r)
// return
//}
route, ok := NewRouter(r.URL.Path)
if !ok {
errorHandler(w, r, ERROR_NOT_FOUND)
return
}
if r.Method != route.method {
errorHandler(w, r, ERROR_NOT_METHOD)
return
}
if route.authorized && token != nil && len(token.Value) < 32 {
errorHandler(w, r, ERROR_AUTH_INVALID)
return
}
route.handler(w, r)
}
}
func errorHandler(w http.ResponseWriter, r *http.Request, s string) {
if s == ERROR_NOT_FOUND {
http.NotFound(w, r)
return
}
if s == ERROR_NOT_METHOD {
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}
http.Error(w, http.StatusText(http.StatusNonAuthoritativeInfo), http.StatusNonAuthoritativeInfo)
return
}
func resourcesHandler(w http.ResponseWriter, r *http.Request) {
filePath := conf.ROOT + string([]byte(r.URL.Path)[1:])
//file, err := os.OpenFile(filePath, os.O_RDONLY, 066)
//defer file.Close()
//if err != nil {
// fmt.Println("not found", filePath)
// return
//}
http.ServeFile(w, r, filePath)
}
这里没有用到WaitGroup,只是申明的时候忘记删除了。
主要的函数HttpHandler(),这时一个公共函数,类似于http的调度者,所有的请求都会call这个函数,然后再通过这个函数去分配控制器(route.handler(w, r))。
资源文件处理函数resourcesHandler(),这个函数是将go http服务器中的js、css、image等这些静态资源直接输出,开始不知道有http.ServeFile(w, r, string)这个函数,所以使用了最基本的os读取文件的方式把文件输出出去,其实如果全心投入到go语言,那么真的需要很好地去了解一下go语言的SDK。
3、Controller.go
func indexHandler(w http.ResponseWriter, r *http.Request) {
util.Output(w, tmpl.Index(r), util.PUT_HTML)
}
这里我只展示了一个函数,其余的函数都是一样的,这里使用了工具类,把信息输出给用户,其中信息的处理交给了tmpl.go的文件。
4、tmpl.go
package tmpl
import (
"book/model"
"book/util"
"fmt"
"net/http"
"strconv"
"strings"
)
func init() {
}
func Index(r *http.Request) string {
//return "Hello, World!"
h := NewHtml();
//h.body("<h1>Hello, World</h1>")
//h.body(util.GetViewTpl("index.html"))
list := model.GetArticles("select * from lx_article order by id desc")
var str string
tml := `
<div class="row">
<div class="col-left">
<img src="/view/res/img/file_101.353.png" class="img128"/>
<a href="/view?id=%d" target="_blank">%s</a>
</div>
<div class="col-right">%s</div>
<div class="col-right1"><a href="#">%s</a></div>
</div>`
for _, s := range list {
str += fmt.Sprintf(tml, s.Id, s.Title, s.CreateTimeF, s.User.Username)
}
h.body(strings.Replace(util.GetViewTpl("index.html"), "{{content}}", str, -1))
return h.Show("首页")
}
这个文件比较负责,设计了html的代码,我没有时间去编写模板引擎,所以使用了比较简单的字符替换的方式,把模板输出出去,其实在生产环境中,我们很有必要编写一个模板引起,不过现在流行的是前后端分离,所有的请求,都是通过接口的形式去调去,那么在实际应用中这一层是用不上的,但是为了实现一个简易的文章系统,这里我还是编写出这样不人性化的代码。
核心代码还有很多比如:数据库、模型等到,这里不一一贴出,帖子的最后会附上整个项目的源代码地址,现在我们来看看截图:
用户登录
发表文章:
首页的效果图:
查看文章:
项目地址:
Spring中集成RabbitMQ中的消息队列跟发布订阅
1.介绍
RabbitMQ是一个消息代理:它接受和转发消息。您可以将其视为邮局:当您将要发布的邮件放在邮箱中时,您可以确定邮件先生或Mailperson女士最终会将邮件发送给您的收件人。在这个类比中,RabbitMQ是一个邮箱,邮局和邮递员。
RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据 – 消息。
RabbitMQ和一般的消息传递使用了一些术语。
1.生产只意味着发送,发送消息的程序是生产者:
2.queue是RabbitMQ中的邮箱的名称,虽然信息流经RabbitMQ和应用程序,但它们只能存储在queue中。queue仅由主机的存储器&磁盘限制约束,它本质上就是一个大的消息缓冲器。许多生产者可以发送消息到一个队列,并且许多消费者可以尝试从一个队列接收数据。这就是我们代表队列的方式:

3.消费消息与接收消息有类似的意义。一个消费者是一个程序,主要是等待接收信息:

请注意,生产者,消费者和代理不必驻留在同一主机上; 实际上在大多数应用中他们没有。应用程序也可以是生产者和消费者。
2.工作队列(Queue)
工作队列(又称:任务队列)主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们安排任务稍后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当你运行许多工作程序时,它们之间将共享这些队列。
这个概念在Web应用程序中特别有用,在这些应用程序中,在短HTTP请求窗口期间无法处理复杂任务。
3.发布/订阅(Publish/Subscribe)
工作队列实际上就是将每个任务都交付给一个工作者(只有一个能接收)。在这一部分,我们将做一些完全不同的事情 – 我们将向多个消费者传递信息。此模式称为“发布/订阅”。
为了说明这种模式,我们将构建一个简单的日志记录系统。它将包含两个程序 – 第一个将发出日志消息,第二个将接收和打印它们。
在我们的日志记录系统中,接收程序的每个运行副本都将获取消息。这样我们就可以运行一个接收器并将日志定向到磁盘; 同时我们将能够运行另一个接收器并在屏幕上看到日志。
基本上,发布的日志消息将被广播给所有接收者。
Exchanges
我们向队列发送消息和从队列接收消息。现在是时候在RabbitMQ中引入完整的消息传递模型了。
RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。
相反,生产者只能向exchange发送消息。exchange是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面将它们推送到Queue。exchanges必须确切知道如何处理它收到的消息。它应该附加到特定队列吗?它应该附加到许多队列吗?或者它应该被丢弃。其规则由exchange类型定义 。
有几种交换类型可供选择:direct,topic,headers 和fanout。我们将专注于最后一个 – fanout。让我们创建一个这种类型的exchange,并将其称为logs:
channel.exchangeDeclare("logs", "fanout");
fanout exchange非常简单。正如您可能从名称中猜到的那样,它只是将收到的所有消息广播到它知道的所有队列中。而这正是我们记录器所需要的。
现在,我们可以发布到我们的命名交换:
channel.basicPublish("logs","",null,message.getBytes());
临时队列
能够命名队列对我们来说至关重要 – 我们需要将工作人员指向同一个队列。当您想要在生产者和消费者之间共享队列时,为队列命名非常重要。
但我们的记录器并非如此。我们希望了解所有日志消息,而不仅仅是它们的一部分。我们也只对目前流动的消息感兴趣,而不是旧消息。要解决这个问题,我们需要两件事。
首先,每当我们连接到RabbitMQ时,我们都需要一个新的空队列。为此,我们可以使用随机名称创建队列,或者更好 – 让服务器为我们选择随机队列名称。
其次,一旦我们断开消费者,就应该自动删除队列。
在Java客户端中,当我们没有向queueDeclare()提供参数时,我们使用生成的名称创建一个非持久的,独占的并能勾自动删除队列:
String queueName = channel.queueDeclare().getQueue();
此时,queueName包含随机队列名称。例如:它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
绑定队列

我们已经创建了一个fanout exchange和一个队列。现在我们需要告诉exchange将消息发送到我们的队列。exchange和队列之间的关系称为绑定。
channel.queueBind(queueName, "logs", "");
从现在开始,logs exchange会将消息附加到我们的队列中。
列出绑定
你可以使用rabbitmqctl列出现有的绑定
rabbitmqctl list_bindings
把它们放在一起
以上的教材都来自官方文档的翻译,原文请查阅:
https://www.rabbitmq.com/getstarted.html
4.集成
maven
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.0</version>
</dependency>
配置生产者
private static final String host = "127.0.0.1";
private static final int prot = 5672;
/**
* 缓存Chanel
*/
private Map<String, Channel> producerMap = new ConcurrentHashMap<>();
/**
* 连接工厂
*/
private ConnectionFactory factory;
private Connection connection;
private Channel channel;
@Override
public void afterPropertiesSet() throws Exception {
factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(prot);
}
/**
* 获取一个连接,如果为空或断开了连接则重新实例化
*
* @return Connection
* @throws Exception
*/
@Override
public Connection getConnection() throws Exception {
if (connection == null || !connection.isOpen()) {
connection = factory.newConnection();
}
return connection;
}
/**
* 返回一个通道
*
* @return Channel
* @throws Exception
*/
@Override
public Channel getChannel() throws Exception {
if (channel == null || !channel.isOpen()) {
channel = this.getConnection().createChannel();
}
return channel;
}
/**
* 创建一个生产者,如果缓存中没有,则重新创建
*
* @param exchange Queue name|exchange name
* @param type queue|fanout|topic|headers|direct
* @param durable 是否持久性
* @return Channel
* @throws Exception
*/
@Override
public Channel createProducer(String exchange, String type, boolean durable) throws Exception {
if (producerMap.containsKey(exchange + type + durable)) {
logger("producer by cache.");
Channel c1 = producerMap.get(exchange + type + durable);
if (c1.isOpen()) {
return c1;
}
}
Channel c = this.getChannel();
if (type == null || queue.equals(type)) {
c.queueDeclare(exchange, durable, false, false, null);
} else {
c.exchangeDeclare(exchange, type, durable);
}
producerMap.put(exchange + type + durable, c);
return c;
}
/**
* 发送一条消息
*
* @param name Queue name|exchange name
* @param type queue|fanout|topic|headers|direct
* @param message content
* @return boolean
* @throws Exception
*/
@Override
public boolean send(String name, String type, String message) throws Exception {
try {
if (type == null || queue.equals(type)) {
this.getProducer(name, type).basicPublish("", name, null, message.getBytes());
} else {
this.getProducer(name, type).basicPublish(name, "", null, message.getBytes());
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
配置消费者
/**
* 添加一个exchange监听器
*
* @param exchange exchange name
* @param autoAck 是否自动响应
* @param listener DeliverCallback监听器
*/
@Override
public void addExchangeListener(String exchange, boolean autoAck, DeliverCallback listener) {
try {
Channel c = this.getChannel();
String queue = c.queueDeclare().getQueue();
c.queueBind(queue, exchange, "");
c.basicConsume(queue, autoAck, listener, e -> {
logger("exchange error:" + e);
});
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 添加一个Queue监听器
*
* @param queue Queue name
* @param autoAck 是否自动响应
* @param listener DeliverCallback监听器
*/
@Override
public void addQueueListener(String queue, boolean autoAck, DeliverCallback listener) {
try {
this.getProducer(queue).basicConsume(queue, autoAck, listener, e -> {
logger("queue error:" + e);
});
} catch (Exception e) {
e.printStackTrace();
}
}
因为exchange需要绑定临时队列,所以就用两个方法来分开绑定监听。
测试代码
@RestController
@RequestMapping(value = "/test")
public class TestRabbitMQController {
private static final Logger logger = Logger.getLogger("RabbitMQ>");
@Resource
private IRabbitMQService rabbitMQService;
/**
* http://localhost:8180/test/rabbitmq/pub/add?name=lan.queue&type=queue
* http://localhost:8180/test/rabbitmq/pub/add?name=lan.fanout&type=fanout
* http://localhost:8180/test/rabbitmq/pub/add?name=lan.topic&type=topic
*
* @param request
* @return
*/
@RequestMapping(value = "/rabbitmq/pub/add")
public ResultResp<Void> queue(HttpServletRequest request) {
ResultResp<Void> resp = new ResultResp<>();
String name = request.getParameter("name");
String type = request.getParameter("type");
String msg = "test RabbitMQ " + type + " " + name + " " + DateTimeUtils.getTime();
try {
rabbitMQService.send(name, type, msg);
resp.setInfo(msg);
} catch (Exception e) {
e.printStackTrace();
resp.setInfo(e.getMessage());
}
return resp;
}
/**
* http://localhost:8180/test/rabbitmq/sub/add?id=100&name=lan.queue&type=queue
* http://localhost:8180/test/rabbitmq/sub/add?id=101&name=lan.queue&type=queue
* http://localhost:8180/test/rabbitmq/sub/add?id=102&name=lan.queue&type=queue
*
* http://localhost:8180/test/rabbitmq/sub/add?id=103&name=lan.fanout&type=fanout
* http://localhost:8180/test/rabbitmq/sub/add?id=104&name=lan.fanout&type=fanout
* http://localhost:8180/test/rabbitmq/sub/add?id=105&name=lan.fanout&type=fanout
*
* http://localhost:8180/test/rabbitmq/sub/add?id=106&name=lan.topic&type=topic
* http://localhost:8180/test/rabbitmq/sub/add?id=107&name=lan.topic&type=topic
* http://localhost:8180/test/rabbitmq/sub/add?id=108&name=lan.topic&type=topic
*
* @param request
* @return
*/
@RequestMapping(value = "/rabbitmq/sub/add")
public ResultResp<Void> topic(HttpServletRequest request) {
ResultResp<Void> resp = new ResultResp<>();
String id = request.getParameter("id");
String name = request.getParameter("name");
String type = request.getParameter("type");
try {
rabbitMQService.addListener(name, type, (s, c) -> {
logger.info(id + "# message:" + new String(c.getBody()) + ", routing:" + c.getEnvelope().getRoutingKey());
});
resp.setInfo(id);
} catch (Exception e) {
e.printStackTrace();
resp.setInfo(e.getMessage());
}
return resp;
}
}
5.完整代码
RabbitMQService接口
package com.lanxinbase.system.service.resource;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
/**
* Created by alan on 2019/5/2.
*/
public interface IRabbitMQService {
Connection getConnection() throws Exception;
Channel getChannel() throws Exception;
Channel getProducer(String name) throws Exception;
Channel getProducer(String name, String type) throws Exception;
Channel createProducer(String name, String type, boolean durable) throws Exception;
boolean send(String name, String message) throws Exception;
boolean send(String name, String type, String message) throws Exception;
void addListener(String name, String type,DeliverCallback listener);
void addListener(String name, String type, boolean autoAck, DeliverCallback listener);
void addExchangeListener(String exchange, boolean autoAck, DeliverCallback listener);
void addQueueListener(String queue, boolean autoAck, DeliverCallback listener);
}
RabbitMQService实现
package com.lanxinbase.system.service;
import com.lanxinbase.system.basic.CompactService;
import com.lanxinbase.system.service.resource.IRabbitMQService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by alan on 2019/5/3.
* <p>
* <p>
* 0.需要下载Erlang,并且设置好ERLANG_HOME的环境变量,类似于JDK的配置方式。
* 1.下载RabbitMQ
* 3.运行RabbitMQ,like this:./sbin/rabbitmq-server.bat
* <p>
* Queue Test:
* 生产者:
* http://localhost:8180/test/rabbitmq/pub/add?name=lan.queue&type=queue
* <p>
* rabbitMQService.send(name, type, msg);
* <p>
* 消费者(3个):
* http://localhost:8180/test/rabbitmq/sub/add?id=100&name=lan.queue&type=queue
* http://localhost:8180/test/rabbitmq/sub/add?id=101&name=lan.queue&type=queue
* http://localhost:8180/test/rabbitmq/sub/add?id=102&name=lan.queue&type=queue
* <p>
* rabbitMQService.addListener(name, type, (s, c) -> {
* logger.info(id + "# message:" + new String(c.getBody()) + ", routing:" + c.getEnvelope().getRoutingKey());
* });
* <p>
* Queue运行日志:
* 03-May-2019 22:24:37.773 lambda$topic$0 101# message:test RabbitMQ queue lan.queue 1556893477772, routing:lan.queue
* 03-May-2019 22:24:38.467 lambda$topic$0 102# message:test RabbitMQ queue lan.queue 1556893478466, routing:lan.queue
* 03-May-2019 22:24:39.376 lambda$topic$0 100# message:test RabbitMQ queue lan.queue 1556893479374, routing:lan.queue
* <p>
* 这里生产者生产了3条信息,Queue消息不会丢失,如果生产者生产消息的时候没有消费者进入,那么消息会等到消费者进入后发送给消费者。
* 如果有多个消费者监听同一个Queue,那么则会按照某种算法,将消息发送给其中一个消费者,如果接收成功后,通道会自动删除消息。
* <p>
* Exchange Test:
* 生产者:
* http://localhost:8180/test/rabbitmq/pub/add?name=lan.fanout&type=fanout
* http://localhost:8180/test/rabbitmq/pub/add?name=lan.topic&type=topic
* <p>
* rabbitMQService.send(name, type, msg);
* <p>
* 消费者:
* http://localhost:8180/test/rabbitmq/sub/add?id=103&name=lan.fanout&type=fanout
* http://localhost:8180/test/rabbitmq/sub/add?id=104&name=lan.fanout&type=fanout
* http://localhost:8180/test/rabbitmq/sub/add?id=105&name=lan.fanout&type=fanout
* <p>
* http://localhost:8180/test/rabbitmq/sub/add?id=106&name=lan.topic&type=topic
* http://localhost:8180/test/rabbitmq/sub/add?id=107&name=lan.topic&type=topic
* http://localhost:8180/test/rabbitmq/sub/add?id=108&name=lan.topic&type=topic
* <p>
* rabbitMQService.addListener(name, type, (s, c) -> {
* logger.info(id + "# message:" + new String(c.getBody()) + ", routing:" + c.getEnvelope().getRoutingKey());
* });
* <p>
* Exchange运行日志:
* 03-May-2019 22:24:42.424 lambda$topic$0 104# message:test RabbitMQ fanout lan.fanout 1556893482420, routing:
* 03-May-2019 22:24:42.425 lambda$topic$0 103# message:test RabbitMQ fanout lan.fanout 1556893482420, routing:
* 03-May-2019 22:24:42.425 lambda$topic$0 105# message:test RabbitMQ fanout lan.fanout 1556893482420, routing:
* <p>
* 03-May-2019 22:24:46.077 lambda$topic$0 107# message:test RabbitMQ topic lan.topic 1556893486075, routing:
* 03-May-2019 22:24:46.077 lambda$topic$0 108# message:test RabbitMQ topic lan.topic 1556893486075, routing:
* 03-May-2019 22:24:46.077 lambda$topic$0 106# message:test RabbitMQ topic lan.topic 1556893486075, routing:
* <p>
* 从日志时间上可以看的出,生产者的消息,全部同时发送给了所有消费者。如果生产者生产消息的时候没有消费者进入,那么消息会丢失。
* 当有消费者监听Topic时,可以收到消息,如果同时有多个消费者监听同一个topic,那么消息将分别发送给各个消费者。
*
* @See TestRabbitMQController
*/
@Service
public class RabbitMQService extends CompactService implements InitializingBean, DisposableBean, IRabbitMQService {
private static final String host = "127.0.0.1";
private static final int prot = 5672;
public static final String TOPIC_DEFAULT = "lan.topic";
public static final String DIRECT_DEFAULT = "lan.direct";
public static final String HEADERS_DEFAULT = "lan.headers";
public static final String FANOUT_DEFAULT = "lan.fanout";
public static final String QUEUE_DEFAULT = "lan.queue";
public static final String direct = "direct";
public static final String topic = "topic";
public static final String fanout = "fanout";
public static final String headers = "headers";
public static final String queue = "queue";
/**
* 缓存Chanel
*/
private Map<String, Channel> producerMap = new ConcurrentHashMap<>();
/**
* 连接工厂
*/
private ConnectionFactory factory;
private Connection connection;
private Channel channel;
public RabbitMQService() {
}
@Override
public void afterPropertiesSet() throws Exception {
factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(prot);
}
/**
* 获取一个连接,如果为空或断开了连接则重新实例化
*
* @return Connection
* @throws Exception
*/
@Override
public Connection getConnection() throws Exception {
if (connection == null || !connection.isOpen()) {
connection = factory.newConnection();
}
return connection;
}
/**
* 返回一个通道
*
* @return Channel
* @throws Exception
*/
@Override
public Channel getChannel() throws Exception {
if (channel == null || !channel.isOpen()) {
channel = this.getConnection().createChannel();
}
return channel;
}
/**
* 获取一个生产者
*
* @param name Queue name|exchange name
* @return Channel
* @throws Exception
*/
@Override
public Channel getProducer(String name) throws Exception {
return this.getProducer(name, queue);
}
/**
* 获取一个生产者
*
* @param name Queue name|exchange name
* @param type queue|fanout|topic|headers|direct
* @return Channel
* @throws Exception
*/
@Override
public Channel getProducer(String name, String type) throws Exception {
return this.createProducer(name, type, false);
}
/**
* 创建一个生产者,如果缓存中没有,则重新创建
*
* @param exchange Queue name|exchange name
* @param type queue|fanout|topic|headers|direct
* @param durable 是否持久性
* @return Channel
* @throws Exception
*/
@Override
public Channel createProducer(String exchange, String type, boolean durable) throws Exception {
if (producerMap.containsKey(exchange + type + durable)) {
logger("producer by cache.");
Channel c1 = producerMap.get(exchange + type + durable);
if (c1.isOpen()) {
return c1;
}
}
Channel c = this.getChannel();
if (type == null || queue.equals(type)) {
c.queueDeclare(exchange, durable, false, false, null);
} else {
c.exchangeDeclare(exchange, type, durable);
}
producerMap.put(exchange + type + durable, c);
return c;
}
/**
* 发送一条消息,默认只发送queue消息
*
* @param name Queue name|exchange name
* @param message content
* @return boolean
* @throws Exception
*/
@Override
public boolean send(String name, String message) throws Exception {
return this.send(name, queue, message);
}
/**
* 发送一条消息
*
* @param name Queue name|exchange name
* @param type queue|fanout|topic|headers|direct
* @param message content
* @return boolean
* @throws Exception
*/
@Override
public boolean send(String name, String type, String message) throws Exception {
try {
if (type == null || queue.equals(type)) {
this.getProducer(name, type).basicPublish("", name, null, message.getBytes());
} else {
this.getProducer(name, type).basicPublish(name, "", null, message.getBytes());
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 设置消费者监听
*
* @param name Queue name|exchange name
* @param type queue|fanout|topic|headers|direct
* @param listener DeliverCallback监听器
*/
@Override
public void addListener(String name, String type, DeliverCallback listener) {
this.addListener(name, type, true, listener);
}
/**
* 设置消费者监听
*
* @param name Queue name|exchange name
* @param type queue|fanout|topic|headers|direct
* @param autoAck 是否自动响应
* @param listener DeliverCallback监听器
*/
@Override
public void addListener(String name, String type, boolean autoAck, DeliverCallback listener) {
if (type == null || queue.equals(type)) {
this.addQueueListener(name, autoAck, listener);
} else {
this.addExchangeListener(name, autoAck, listener);
}
}
/**
* 添加一个exchange监听器
*
* @param exchange exchange name
* @param autoAck 是否自动响应
* @param listener DeliverCallback监听器
*/
@Override
public void addExchangeListener(String exchange, boolean autoAck, DeliverCallback listener) {
try {
Channel c = this.getChannel();
String queue = c.queueDeclare().getQueue();
c.queueBind(queue, exchange, "");
c.basicConsume(queue, autoAck, listener, e -> {
logger("exchange error:" + e);
});
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 添加一个Queue监听器
*
* @param queue Queue name
* @param autoAck 是否自动响应
* @param listener DeliverCallback监听器
*/
@Override
public void addQueueListener(String queue, boolean autoAck, DeliverCallback listener) {
try {
this.getProducer(queue).basicConsume(queue, autoAck, listener, e -> {
logger("queue error:" + e);
});
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void destroy() throws Exception {
channel.close();
connection.close();
}
}
RabbitMQService服务类每个方法都写了说明,这里就不解释了。











近期评论