Maple's Blog.

RabbitMQ Node demo

字数统计: 269 | 阅读时长: 1 min
2019/09/05
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Sample connection settings for the demo.
// NOTE(review): presumably meant to be handed to `new Rabbit(options)`; the
// visible code never references this constant -- confirm against the caller.
const options = {
  // Broker host, given here as an IPv4 literal.
  hostname: '192.168.2.121',
  // NOTE(review): these look like placeholder credentials -- replace before use.
  username: 'username',
  password: 'password',
};


/**
 * Small wrapper around one broker connection that hands out per-queue
 * "operators" exposing `send` and `subscribe`.
 *
 * Usage: construct, `await init()`, then `channel(name).send(...)` or
 * `channel(name).subscribe(handler)`.
 *
 * NOTE(review): `connect` is not defined in the visible source; it is
 * presumably amqplib's `connect` -- confirm it is imported where this class
 * lives.
 */
class Rabbit {
  /**
   * @param {object} [options] Connection settings. They are merged over the
   *   defaults below (caller-supplied keys win) and the merged object is
   *   passed to `connect` by `init()`.
   */
  constructor(options) {
    this._options = Object.assign({
      protocol: 'amqp',
      port: 5672,
      locale: 'en_US',
      frameMax: 0,
      heartbeat: 0,
      vhost: '/',
    }, options);

    this._conn = null;
    this._inited = false;
    // In-flight connect attempt, shared by concurrent init() callers.
    this._initPromise = null;

    // Queue names are arbitrary strings, so the lookup tables have no
    // prototype: a queue called "constructor" or "toString" must not resolve
    // to an inherited Object.prototype member.
    this._channelMap = Object.create(null);
    // In-flight channel creations, shared by concurrent first users of a queue.
    this._channelPromiseMap = Object.create(null);
    this._operatorMap = Object.create(null);
  }

  /**
   * Opens the connection once. Calls made while a connect attempt is in
   * flight share that attempt; calls made after success return immediately.
   * A failed attempt is not cached, so init() may be retried.
   *
   * @returns {Promise<void>}
   */
  async init() {
    if (this._inited) {
      return;
    }

    if (this._initPromise === null) {
      this._initPromise = this._openConnection();
    }

    const pending = this._initPromise;
    try {
      await pending;
    } finally {
      // Only clear the attempt we waited on; a retry may already have
      // replaced it.
      if (this._initPromise === pending) {
        this._initPromise = null;
      }
    }
  }

  /** Connects with the merged options and records the connection. */
  async _openConnection() {
    this._conn = await connect(this._options);
    this._inited = true;
  }

  /**
   * Returns the channel dedicated to `queue`, creating it (and asserting the
   * queue) on first use. Concurrent first uses share one creation.
   *
   * @param {string} queue Queue name.
   * @returns {Promise<object>} The channel created via `createChannel()`.
   * @throws {Error} When called before `init()` has connected.
   */
  async _getChannel(queue) {
    const cached = this._channelMap[queue];
    if (cached) {
      return cached;
    }

    if (this._conn === null) {
      throw new Error('Rabbit is not connected; call init() first');
    }

    if (!this._channelPromiseMap[queue]) {
      this._channelPromiseMap[queue] = this._openChannel(queue);
    }

    const pending = this._channelPromiseMap[queue];
    try {
      return await pending;
    } finally {
      // Success is remembered in _channelMap; failure must stay retryable.
      if (this._channelPromiseMap[queue] === pending) {
        delete this._channelPromiseMap[queue];
      }
    }
  }

  /** Creates a channel, waits for the queue assertion, then caches the channel. */
  async _openChannel(queue) {
    const channel = await this._conn.createChannel();
    // Awaited so an assertion failure reaches the caller instead of becoming
    // an unhandled rejection.
    await channel.assertQueue(queue);
    this._channelMap[queue] = channel;
    return channel;
  }

  /**
   * Converts an outgoing message to a Buffer.
   * Buffers pass through untouched, strings are encoded as-is, objects
   * (including arrays and null) are JSON-encoded, and other primitives are
   * stringified.
   *
   * @param {*} msg Message to encode.
   * @returns {Buffer}
   * @throws {TypeError} When `msg` is undefined.
   */
  _encodeMessage(msg) {
    if (Buffer.isBuffer(msg)) {
      return msg;
    }
    if (typeof msg === 'string') {
      return Buffer.from(msg);
    }
    if (typeof msg === 'undefined') {
      throw new TypeError('message should not be undefined');
    }
    if (typeof msg === 'object') {
      return Buffer.from(JSON.stringify(msg));
    }
    // Numbers, booleans, etc.: Buffer.from() rejects them, so stringify first.
    return Buffer.from(String(msg));
  }

  /**
   * Converts a delivered message to the value passed to a subscriber:
   * the payload as a string, or the parsed object when the payload starts
   * with `{` and is valid JSON. A parse failure is logged and the raw string
   * is used instead.
   *
   * @param {{content: Buffer}} msg Delivered message.
   * @returns {*}
   */
  _decodeMessage(msg) {
    const text = msg.content.toString();
    if (text[0] !== '{') {
      return text;
    }
    try {
      return JSON.parse(text);
    } catch (err) {
      console.error(err);
      return text;
    }
  }

  /**
   * Returns the operator for `queue`; the same object is returned on every
   * call with the same name.
   *
   * @param {string} queue Queue name; must contain a non-whitespace character.
   * @returns {{send: function(*): Promise<*>, subscribe: function(function): Promise<void>}}
   * @throws {TypeError} When `queue` is not a string.
   * @throws {Error} When `queue` is empty or whitespace only.
   */
  channel(queue) {
    if (typeof queue !== 'string') {
      throw new TypeError('queue name should be a string!');
    }
    if (queue.trim() === '') {
      throw new Error('queue name should not be empty!');
    }

    const existing = this._operatorMap[queue];
    if (existing) {
      return existing;
    }

    const operator = {
      /**
       * Publishes `msg` to the queue.
       * Resolves with whatever `channel.sendToQueue` returns.
       */
      send: async (msg) => {
        // Encode first so a bad payload fails before a channel is opened.
        const payload = this._encodeMessage(msg);
        const channel = await this._getChannel(queue);
        return channel.sendToQueue(queue, payload);
      },

      /**
       * Registers `func` as the consumer for the queue. Each message is
       * decoded, passed to `func`, and acknowledged only if `func` completes
       * without throwing; a failing message is logged and left
       * unacknowledged.
       */
      subscribe: async (func) => {
        if (typeof func !== 'function') {
          throw new TypeError('subscribe handler should be a function!');
        }

        const channel = await this._getChannel(queue);
        // Awaited so a failed registration rejects subscribe().
        await channel.consume(queue, async (msg) => {
          // NOTE(review): amqplib delivers null when the broker cancels the
          // consumer (assuming `connect` is amqplib's) -- nothing to process.
          if (msg == null) {
            return;
          }

          try {
            await func(this._decodeMessage(msg));
          } catch (err) {
            console.error(err);
            return;
          }

          try {
            channel.ack(msg);
          } catch (err) {
            // Presumably nobody awaits this callback, so a throw here would
            // surface as an unhandled rejection; log it instead.
            console.error(err);
          }
        });
      },
    };

    this._operatorMap[queue] = operator;
    return operator;
  }
}
CATALOG