引题
公司项目在从java切换Node,其中一个做作的事情就是ActiveMQ消息处理。下面介绍EggJS项目中监听ActiveMQ消息并处理。
1. ActiveMQ客户端工具库
ActiveMQ支持很多客户端(http://activemq.apache.org/cross-language-clients.html),Node官方支持就是stompit (https://github.com/gdaws/node-stomp)。全完支持Stomp协议。
A STOMP client library for Node.js that is fully compatible with STOMP 1.0, 1.1 and 1.2 servers.
1.1 STOMP协议简介:
STOMP: Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议。
STOMP 1.2规范 : https://stomp.github.io/stomp-specification-1.2.html
STOMP Server:

STOMP Clients比较多了:参考这里:https://stomp.github.io/implementations.html。
STOMP协议与HTTP协议很相似,它基于TCP协议。
STOMP的客户端和服务器之间的通信是通过“帧”(Frame)实现的,每个帧由多“行”(Line)组成。
1.2 STOMP协议示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| CONNECT accept-version:1.0,1.1,1.2 host:172.19.71.120 login: passcode: heart-beat:5000,5000
...
CONNECTED heart-beat:5000,5000 session:ID:lin-71-120-51748-1540376424566-4:40 server:ActiveMQ/5.10.2 version:1.2
...
SUBSCRIBE subscription:d2cTradeOperateTopic222 destination:/topic/t.d2c.tc.cash.TradeOperate ack:client-individual id:1
...
MESSAGE X-B3-SpanId:9e83f8e2f14a9fbb X-B3-TraceId:ea0628b489b2d91d ack:ID\clin-71-120-51748-1540376424566-44\c1 message-id:ID\clin-71-181-52505-1542678667123-1\c2\c3\c1\c3180 X-B3-Sampled:1 destination:/topic/t.d2c.tc.cash.TradeOperate timestamp:1545627372565 expires:0 subscription:1 persistent:true priority:4 X-B3-ParentSpanId:aaa522614fd78116
{"tid":"TCC18122412561211071928","receiptId":null,"event":"CASHIER_SELLER_CREATE_ONLINE","sellerCode":"A882080","operatorCode":"E114771","buyerCode":"R11344502355169","vemId":null,"tradeCreateTime":null,"tradePaidTime":null,"giftCardRefundPrice":null,"msgId":"Cash:MQ18122412561211041399"}. ...
ACK subscription:1 message-id:ID\clin-71-181-52505-1542678667123-1\c2\c3\c1\c3180 id:ID\clin-71-120-51748-1540376424566-44\c1
|
抓包文件, 右键保存去掉到.png
过滤tcp.port == 52622
1.3 STOMP心跳问题
STOMP连接是一个长连接, STOMP协议定义了发送心跳来监测stomp连接是否存活。
Client和Server根据心跳来判定对方已经挂掉了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| // 客户端发送, <cx>, <cy>分别代表一个以毫秒为单位的数字. CONNECT accept-version:1.0,1.1,1.2 host:172.19.71.120 login: passcode: heart-beat:<cx>,<cy>
// 服务端回复, <sx>, <sy>分别代表一个以毫秒为单位的数字. CONNECTED heart-beat:<sx>,<sy> session:ID:lin-71-120-51748-1540376424566-4:36 server:ActiveMQ/5.10.2 version:1.2
|
1 2 3 4 5
| <cx>: client能保证的发送心跳的最小间隔, 如果是0代表client不发送心跳. <cy>: client希望收到server心跳的间隔, 如果是0代表client不希望收到server的心跳.
<sx>: server能保证的发送心跳的最小间隔, 如果是0代表server不发送心跳. <sy>: server希望收到client心跳的间隔, 如果是0代表server不希望收到client的心跳.
|
如果在建立连接时没有心跳header, 默认当作heart-beat:0,0. 也就是不发心跳, 也不希望对方发心跳.
加入心跳header进行连接后, 最终协商得出发送心跳的频率的逻辑如下:
1 2 3 4 5
| Client: 取<cx>与<sy>的最大值, 也就是说client会取client最小能发送的间隔与server希望client发送间隔的最大值来发送心跳. 如果<cx>或<sy>中任何一个为0, client都不发送心跳.
Server: 取<sx>与<cy>的最大值, 也就是说server取server最小能发送的间隔与client希望server发送间隔的最大值来发送心跳. 如果<sx>或<cy>中任何一个为0, server都不发送心跳.
|
2. 在EggJS上集中MQ消息处理
由于MQ消息一般比较重要,因此我们将MQ消息兼听处理放到Agent进程中。EggJS进程模型如下:
1 2 3 4 5 6 7 8 9 10
| Koa Application ^ EggCore ^ ┌──────┴───────┐ │ │ Egg Agent Egg Application ^ ^ agent worker app worker
|
2.1 在项目根目录下建立agent.ts文件,egg会默认把这个启动到Agent进程.

2.2 兼听MQ消息代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| import stompit from 'stompit'; config = { host: '172.19.71.120', 'client-id': 'shop-bff', port: 61613, // 注册这里端口跟java不一样的; connectHeaders: { host: '172.19.71.120', login: '', passcode: '', // 'heart-beat': '3000,3000', } }
subscriptCashOrder(agent) { let subscribeHeaders = { subscription: 'd2cTradeOperateTopic', destination: '/topic/t.d2c.tc.cash.TradeOperate', ack: 'client-individual', //auto, client, client-individual, };
this.subscriptOrder(subscribeHeaders, (error, body) => { this.handlerOrder(agent, error, body, 'cashOrder'); }); }
/** * 订阅订单消息 */ subscriptOrder(subscribeHeaders, callback) { let connectOptions = this.config; stompit.connect( connectOptions, (error, client) => { if (error) { console.log('connect error ' + error.message); return; } client.subscribe(subscribeHeaders, (error, message) => { callback(error, message); client.ack(message); }); }, ); }
/** * 处理消息; */ handlerOrder(agent, error, message, type) { if (error) { console.log('subscribe error ' + type + ': ' + error.message); return; } message.readString('utf-8', (error, body) => { // 在这里处理消息回调 console.log('received message end =========== ' + type); client.ack(message); client.disconnect(); }); }
|
注意:STOMP协议默认端口是61613, 见http://activemq.apache.org/stomp.html
SUBSCRIBE消息的 ack 头:
- auto: client收到server发来的消息后不需要回复ACK frame. server假定消息发出去后client就已经收到。这种确认方式可以减少消息传输的次数.
- client: client必须发送ACK frame给servers, 让它处理消息. 如果在client发送ACK frame之前连接断开了,那么server将假设消息没有被处理,可能会再次发送消息给另外的客户端。client发送的ACK frame被当作是积累的确认。这就意味这种确认方式会去操作ACK frame指定的消息和订阅的所有消息. 譬如接收了10条消息,如果你ack了第8条消息,那么1-7条消息都会被ack,只有9-10两条消息还保持未ack状态。由于client不能处理某些消息,所以client应该发送NACK frame去告诉server它不能消费这些消息。
- client-individual,确认工作就像上面的’client’确认模式(除了由客户端发送的ACK或NACK帧)不会被累计。这意味着,后续ACK, NACK消息帧,也不能影响前面的消息的确认。
2.3 ActiveMQ注册中心
http://172.19.71.120:8161/admin/connections.jsp
启动上面代友可以看到注册中心就有这个客户端且是以Storm协议注册的:

也可以使用这个注册中心提供工具发MQ消息:

参考
STOMP Protocol Specification, Version 1.1
STOMP Protocol Specification, Version 1.2
stomp-over-websocket协议原理与实现
STOMP协议详解