Node监听MQ消息

引题

公司项目在从java切换Node,其中一个做作的事情就是ActiveMQ消息处理。下面介绍EggJS项目中监听ActiveMQ消息并处理。

1. ActiveMQ客户端工具库

ActiveMQ支持很多客户端(http://activemq.apache.org/cross-language-clients.html),Node官方支持就是stompit (https://github.com/gdaws/node-stomp)。全完支持Stomp协议。

A STOMP client library for Node.js that is fully compatible with STOMP 1.0, 1.1 and 1.2 servers.

1.1 STOMP协议简介:

STOMP: Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议。
STOMP 1.2规范 : https://stomp.github.io/stomp-specification-1.2.html

STOMP Server:

STOMP Clients比较多了:参考这里:https://stomp.github.io/implementations.html。

STOMP协议与HTTP协议很相似,它基于TCP协议。
STOMP的客户端和服务器之间的通信是通过“帧”(Frame)实现的,每个帧由多“行”(Line)组成。

1.2 STOMP协议示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
CONNECT
accept-version:1.0,1.1,1.2
host:172.19.71.120
login:
passcode:
heart-beat:5000,5000

...

CONNECTED
heart-beat:5000,5000
session:ID:lin-71-120-51748-1540376424566-4:40
server:ActiveMQ/5.10.2
version:1.2

...

SUBSCRIBE
subscription:d2cTradeOperateTopic222
destination:/topic/t.d2c.tc.cash.TradeOperate
ack:client-individual
id:1

...

MESSAGE
X-B3-SpanId:9e83f8e2f14a9fbb
X-B3-TraceId:ea0628b489b2d91d
ack:ID\clin-71-120-51748-1540376424566-44\c1
message-id:ID\clin-71-181-52505-1542678667123-1\c2\c3\c1\c3180
X-B3-Sampled:1
destination:/topic/t.d2c.tc.cash.TradeOperate
timestamp:1545627372565
expires:0
subscription:1
persistent:true
priority:4
X-B3-ParentSpanId:aaa522614fd78116

{"tid":"TCC18122412561211071928","receiptId":null,"event":"CASHIER_SELLER_CREATE_ONLINE","sellerCode":"A882080","operatorCode":"E114771","buyerCode":"R11344502355169","vemId":null,"tradeCreateTime":null,"tradePaidTime":null,"giftCardRefundPrice":null,"msgId":"Cash:MQ18122412561211041399"}.
...

ACK
subscription:1
message-id:ID\clin-71-181-52505-1542678667123-1\c2\c3\c1\c3180
id:ID\clin-71-120-51748-1540376424566-44\c1

抓包文件, 右键保存去掉到.png
过滤tcp.port == 52622

1.3 STOMP心跳问题

STOMP连接是一个长连接, STOMP协议定义了发送心跳来监测stomp连接是否存活。
Client和Server根据心跳来判定对方已经挂掉了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 客户端发送, <cx>, <cy>分别代表一个以毫秒为单位的数字.
CONNECT
accept-version:1.0,1.1,1.2
host:172.19.71.120
login:
passcode:
heart-beat:<cx>,<cy>

// 服务端回复, <sx>, <sy>分别代表一个以毫秒为单位的数字.
CONNECTED
heart-beat:<sx>,<sy>
session:ID:lin-71-120-51748-1540376424566-4:36
server:ActiveMQ/5.10.2
version:1.2
1
2
3
4
5
<cx>: client能保证的发送心跳的最小间隔, 如果是0代表client不发送心跳.
<cy>: client希望收到server心跳的间隔, 如果是0代表client不希望收到server的心跳.

<sx>: server能保证的发送心跳的最小间隔, 如果是0代表server不发送心跳.
<sy>: server希望收到client心跳的间隔, 如果是0代表server不希望收到client的心跳.

如果在建立连接时没有心跳header, 默认当作heart-beat:0,0. 也就是不发心跳, 也不希望对方发心跳.
加入心跳header进行连接后, 最终协商得出发送心跳的频率的逻辑如下:

1
2
3
4
5
Client: 取<cx>与<sy>的最大值, 也就是说client会取client最小能发送的间隔与server希望client发送间隔的最大值来发送心跳. 
如果<cx>或<sy>中任何一个为0, client都不发送心跳.

Server: 取<sx>与<cy>的最大值, 也就是说server取server最小能发送的间隔与client希望server发送间隔的最大值来发送心跳.
如果<sx>或<cy>中任何一个为0, server都不发送心跳.

2. 在EggJS上集中MQ消息处理

由于MQ消息一般比较重要,因此我们将MQ消息兼听处理放到Agent进程中。EggJS进程模型如下:

1
2
3
4
5
6
7
8
9
10
      Koa Application
^
EggCore
^
┌──────┴───────┐
│ │
Egg Agent Egg Application
^ ^
agent worker app worker

2.1 在项目根目录下建立agent.ts文件,egg会默认把这个启动到Agent进程.

2.2 兼听MQ消息代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import stompit from 'stompit';

config = {
host: '172.19.71.120',
'client-id': 'shop-bff',
port: 61613, // 注册这里端口跟java不一样的;
connectHeaders: {
host: '172.19.71.120',
login: '',
passcode: '',
// 'heart-beat': '3000,3000',
}
}

subscriptCashOrder(agent) {
let subscribeHeaders = {
subscription: 'd2cTradeOperateTopic',
destination: '/topic/t.d2c.tc.cash.TradeOperate',
ack: 'client-individual', //auto, client, client-individual,
};

this.subscriptOrder(subscribeHeaders, (error, body) => {
this.handlerOrder(agent, error, body, 'cashOrder');
});
}

/**
* 订阅订单消息
*/
subscriptOrder(subscribeHeaders, callback) {
let connectOptions = this.config;
stompit.connect(
connectOptions,
(error, client) => {
if (error) {
console.log('connect error ' + error.message);
return;
}
client.subscribe(subscribeHeaders, (error, message) => {
callback(error, message);
client.ack(message);
});
},
);
}

/**
* 处理消息;
*/
handlerOrder(agent, error, message, type) {
if (error) {
console.log('subscribe error ' + type + ': ' + error.message);
return;
}
message.readString('utf-8', (error, body) => {
// 在这里处理消息回调
console.log('received message end =========== ' + type);
client.ack(message);
client.disconnect();
});
}

注意:STOMP协议默认端口是61613, 见http://activemq.apache.org/stomp.html

SUBSCRIBE消息的 ack 头:

  • auto: client收到server发来的消息后不需要回复ACK frame. server假定消息发出去后client就已经收到。这种确认方式可以减少消息传输的次数.
  • client: client必须发送ACK frame给servers, 让它处理消息. 如果在client发送ACK frame之前连接断开了,那么server将假设消息没有被处理,可能会再次发送消息给另外的客户端。client发送的ACK frame被当作是积累的确认。这就意味这种确认方式会去操作ACK frame指定的消息和订阅的所有消息. 譬如接收了10条消息,如果你ack了第8条消息,那么1-7条消息都会被ack,只有9-10两条消息还保持未ack状态。由于client不能处理某些消息,所以client应该发送NACK frame去告诉server它不能消费这些消息。
  • client-individual,确认工作就像上面的’client’确认模式(除了由客户端发送的ACK或NACK帧)不会被累计。这意味着,后续ACK, NACK消息帧,也不能影响前面的消息的确认。

2.3 ActiveMQ注册中心

http://172.19.71.120:8161/admin/connections.jsp

启动上面代友可以看到注册中心就有这个客户端且是以Storm协议注册的:

也可以使用这个注册中心提供工具发MQ消息:

参考

  1. STOMP Protocol Specification, Version 1.1

  2. STOMP Protocol Specification, Version 1.2

  3. stomp-over-websocket协议原理与实现

  4. STOMP协议详解