title: NodeJs订阅MQ消息
tag:
- ActiveMQ
- NodeJs
- stompit
categories:
- NodeJs

NodeJs订阅MQ消息

一直没有用过node订阅过MQ消息,用java也没有写,摸索一个晚上,终于明白了, 原来node是stomp协议访问端口是:61613

1. 订阅queue

ActiveMQ管理端
http://172.19.71.120:8161
admin:admin

1.1 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
var stompit = require('stompit');

/**
* 获取MQ监听
* @param agent
*/
function startCloudPrintService() {
console.log('startCloudPrintService');
const connectOptions = {
host: '172.19.71.120',
'client-id': 'shop-bff',
port: 61613,
timeout: 30000,
connectHeaders: {
host: '172.19.71.120',
login: '',
passcode: '',
'heart-beat': '5000,5000',
},
};
stompit.connect(
connectOptions,
function(error, client) {
if (error) {
console.log('connect error ' + error.message);
return;
}

// let sendHeaders = {
// destination: '/queue/test',
// 'content-type': 'text/plain',
// };

// let frame = client.send(sendHeaders);
// frame.write('hello');
// frame.end();

let subscribeHeaders = {
subscription: 'd2cTradeOperateTopic',
destination: 't.d2c.tc.TradeOperate',
ack: 'client-individual',
};

client.subscribe(subscribeHeaders, function(error, message) {
if (error) {
console.log('subscribe error ' + error.message);
return;
}

message.readString('utf-8', function(error, body) {
if (error) {
console.log('read message error ' + error.message);
return;
}

console.log('received message: ' + body);

client.ack(message);

client.disconnect();
});
});
},
);
}

startCloudPrintService();

1.2 抓包报文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
CONNECT
accept-version:1.0,1.1,1.2
host:172.19.71.120
login:
passcode:
heart-beat:5000,5000

.
CONNECTED
heart-beat:5000,5000
session:ID:lin-71-120-57100-1536112290743-2:23
server:ActiveMQ/5.10.2
version:1.2

.
SUBSCRIBE
subscription:d2cTradeOperateTopic
destination:t.d2c.tc.TradeOperate
ack:client-individual
id:1

.













MESSAGE
ack:ID\clin-71-120-57100-1536112290743-29\c1
message-id:ID\clin-71-120-57100-1536112290743-11\c2\c1\c1\c56
type:
destination:/queue/t.d2c.tc.TradeOperate
timestamp:1538268629708
expires:0
subscription:1
priority:0
correlation-id:

Enter some text here for the message body....
ACK
subscription:1
message-id:ID\clin-71-120-57100-1536112290743-11\c2\c1\c1\c56
id:ID\clin-71-120-57100-1536112290743-29\c1

.
DISCONNECT
receipt:1

.
RECEIPT
receipt-id:1

.


2. 订阅topic

2.1 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
var stompit = require('stompit');

/**
* 获取MQ监听
* @param agent
*/
function startCloudPrintService() {
console.log('startCloudPrintService');
const connectOptions = {
host: '172.19.71.120',
'client-id': 'shop-bff',
port: 61613,
timeout: 30000,
connectHeaders: {
host: '172.19.71.120',
login: '',
passcode: '',
'heart-beat': '5000,5000',
},
};
stompit.connect(
connectOptions,
function(error, client) {
if (error) {
console.log('connect error ' + error.message);
return;
}

let subscribeHeaders = {
subscription: 'd2cTradeOperateTopic',
destination: '/topic/t.d2c.tc.TradeOperate',
ack: 'client-individual',
};

client.subscribe(subscribeHeaders, function(error, message) {
if (error) {
console.log('subscribe error ' + error.message);
return;
}

message.readString('utf-8', function(error, body) {
if (error) {
console.log('read message error ' + error.message);
return;
}

console.log('received message: ' + body);

client.ack(message);

client.disconnect();
});
});
},
);
}

startCloudPrintService();

2.2 报文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
CONNECT
accept-version:1.0,1.1,1.2
host:172.19.71.120
login:
passcode:
heart-beat:5000,5000

.
CONNECTED
heart-beat:5000,5000
session:ID:lin-71-120-57100-1536112290743-2:28
server:ActiveMQ/5.10.2
version:1.2

.
SUBSCRIBE
subscription:d2cTradeOperateTopic
destination:/topic/t.d2c.tc.TradeOperate
ack:client-individual
id:1

.







MESSAGE
ack:ID\clin-71-120-57100-1536112290743-34\c1
message-id:ID\clin-71-120-57100-1536112290743-11\c2\c1\c1\c59
type:
destination:/topic/t.d2c.tc.TradeOperate
timestamp:1538269174376
expires:0
subscription:1
priority:0
correlation-id:

Enter some text here for the message body....
ACK
subscription:1
message-id:ID\clin-71-120-57100-1536112290743-11\c2\c1\c1\c59
id:ID\clin-71-120-57100-1536112290743-34\c1

.
DISCONNECT
receipt:1

.
RECEIPT
receipt-id:1

.

参考 :

  1. http://jmesnil.net/stomp-websocket/doc/
  2. https://github.com/gdaws/node-stomp
  3. http://activemq.apache.org/stomp.html
  4. http://stomp.github.io/index.html