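const { connect } = require('amqplib');

// Connection settings; replace the placeholders with real broker credentials.
const options = {
  hostname: '192.168.2.121',
  username: 'username',
  password: 'password'
};

class Rabbit {
  constructor(options) {
    // Defaults for the connection; values passed by the caller override them.
    this._options = Object.assign({
      protocol: 'amqp',
      port: 5672,
      locale: 'en_US',
      frameMax: 0,
      heartbeat: 0,
      vhost: '/',
    }, options);

    this._conn = null;
    this._inited = false;
    this._channelMap = {};   // queue name -> channel
    this._operatorMap = {};  // queue name -> { send, subscribe }
  }

  // Open the connection once; later calls are no-ops.
  async init() {
    if (this._inited) {
      return;
    }
    this._conn = await connect(this._options);
    this._inited = true;
  }

  // Return the cached channel for a queue, creating it on first use.
  async _getChannel(queue) {
    if (this._channelMap[queue]) {
      return this._channelMap[queue];
    }
    if (!this._conn) {
      throw new Error('init() must be called before using a channel!');
    }
    const channel = await this._conn.createChannel();
    // assertQueue returns a promise; wait for it so the queue exists before use.
    await channel.assertQueue(queue);
    this._channelMap[queue] = channel;
    return channel;
  }

  // Return the cached { send, subscribe } operator bound to one queue.
  channel(queue) {
    if (typeof queue !== 'string' || queue.trim() === '') {
      throw new Error('queue name should not be empty!');
    }
    if (this._operatorMap[queue]) {
      return this._operatorMap[queue];
    }

    const self = this;
    const operator = {
      // Publish a message; objects are serialized to JSON first.
      send: async function(msg) {
        const channel = await self._getChannel(queue);
        if (typeof msg === 'object') {
          msg = JSON.stringify(msg);
        }
        channel.sendToQueue(queue, Buffer.from(msg));
      },

      // Consume messages and pass each payload to func.
      subscribe: async function(func) {
        const channel = await self._getChannel(queue);
        await channel.consume(queue, async function(msg) {
          // msg is null when the broker cancels the consumer.
          if (msg === null) {
            return;
          }
          let status = true;
          try {
            let data = msg.content.toString();
            try {
              // Only payloads that start with '{' are parsed as JSON.
              if (data && data[0] === '{') {
                data = JSON.parse(data);
              }
            } catch (err) {
              // Invalid JSON: log it and hand the raw string to func.
              console.error(err);
            }
            await func(data);
          } catch (err) {
            console.error(err);
            status = false;
          }
          // Ack only on success. A failed message stays unacknowledged,
          // so the broker requeues it once this channel closes.
          if (status) channel.ack(msg);
        });
      }
    };

    this._operatorMap[queue] = operator;
    return operator;
  }
}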