Maple's Blog.

RabbitMQ 笔记 & demo

字数统计: 1.7k阅读时长: 8 min
2023/04/11

基本

首先熟悉几个概念:

  • exchange
    • 交换器,可以认为是消息 publish 的目的地。
    • 所有消息都是推送到 exchange 的
  • queue
    • 队列,consumer 监听的位置
    • queue 其实和 publish 没有任何关系
    • 只是为了方便多个消费者消费同一个消息
  • 流程
    1. 创建 exchange,也可以复用已存在的 exchange。
      • 如果 exchange 已存在则不创建
      • 如果已存在的 exchange 的属性与代码配置的不符合则报错
    2. consumer 创建 queue
      • 如果已经存在则改为获取 queue,参数以存在的为准,如果链接的 queue 参数不符合就报错
    3. consumer 绑定 queue 和 exchange
      • 这一步以前我理解为 publisher 也需要,其实 publisher 根本不需要 queue,更不需要绑定 queue
    4. publisher 推送消息到 exchange
      • 参数包括
        • 消息本体
        • routingKey (除了 fanout 模式和 headers 模式)
      • publisher 不需要 queue!
    5. exchange 根据规则把对应的消息放入 queue
    6. consumer 获取 queue 的消息并消费
  • fanout
    • 任何到达 exchange 的消息都会被放到绑定的 queue 去
    • 可以使用独占模式
  • direct
    • 路由模式的一种。
    • consumer 绑定和 publisher 推送时都必须传入 routingKey (路由 key)
    • consumer 可以绑定多个 routingKey
  • topic
    • 路由模式的一种,direct 的扩展
    • routingKey 支持 * 和 # 匹配
      • * 代表匹配恰好一个关键词,# 代表匹配零个或多个关键词
      • 关键字之间通过 . 连接
  • headers
    • 据说这玩意儿不行就懒得研究了,本质是用消息的 headers 代替 routingKey 来做匹配
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
/**
* @Author: maple
* @Date: 2022-05-26 11:14:52
* @LastEditors: maple
* @LastEditTime: 2023-04-10 14:24:48
*/
import { Options, connect, Message, Connection, Channel } from 'amqplib'

// Payload shapes accepted by the send/publish helpers before they are converted to a Buffer.
type MessageRaw = string|object|Buffer;

/**
 * Exchange type names.
 *
 * Despite the name, RabbitMQ.createChannel passes this value as the exchange
 * type argument of channel.assertExchange; it does not describe a queue.
 */
export enum QueueType {
direct = 'direct',
topic = 'topic',
headers = 'headers',
fanout = 'fanout',
match = 'match'
}

// Options accepted by RabbitMQ.createChannel.
type CreateChannelOptions = {
queue?: string; // queue name; when missing or blank createChannel uses '' and marks the queue exclusive
type?: QueueType; // exchange type; createChannel defaults to QueueType.direct
exchange?: string, // exchange to assert; no exchange is asserted when omitted
options?: CreateChannelOptionsExtra; // routing key(s) plus queue/exchange assertion options
publisher: boolean // truthy: no queue is asserted or bound; otherwise a queue is asserted and bound
}

// Extra settings nested under CreateChannelOptions.options.
type CreateChannelOptionsExtra = {
routingKey?: string|string[]; // binding key(s); createChannel wraps a single string into a one-element list
exchangeOptions?: ExchangeOptions, // passed to channel.assertExchange
queueOptions?: QueueOptions // passed to channel.assertQueue
}

// Queue assertion options (amqplib's AssertQueue options plus the two flags spelled out here).
type QueueOptions = {
durable?: boolean; // whether the queue is durable. NOTE(review): the original note said "default false", but amqplib documents durable as defaulting to true — confirm
exclusive?: boolean; // whether the queue is exclusive; default false
} & Options.AssertQueue

// Exchange assertion options (amqplib's AssertExchange options plus the flag spelled out here).
type ExchangeOptions = {
durable?: boolean; // whether the exchange is durable. NOTE(review): the original note said "default false", but amqplib documents durable as defaulting to true — confirm
} & Options.AssertExchange;

/**
 * Options for RabbitMQChannel.sendToQueue (amqplib's Publish options plus a
 * per-call queue override).
 */
type SendToQueueOptions = {
  /** Whether the message should be persisted; not persistent unless set. */
  persistent?: boolean;
  /**
   * Target queue name. Optional: sendToQueue falls back to the queue the
   * channel was created with when this is omitted.
   */
  queue?: string;
} & Options.Publish;

// Options for RabbitMQChannel.publish; the whole object is also forwarded to channel.publish.
type PublishOptions = {
routingKey?: string[]|string; // routing key(s) for this publish call
exchange?: string; // target exchange; publish falls back to the channel's own exchange
} & Options.Publish;

// Message handler passed to RabbitMQChannel.consume: receives the body decoded via toString() and the raw message.
// consume awaits the handler's return value, so a handler may also return a promise.
interface Consumer {
(content: string, msg: Message): void;
}

// Options for RabbitMQChannel.consume; the whole object is also forwarded to channel.consume.
type ConsumeOptions = {
noAck?: boolean; // when falsy, consume acks each message itself once the handler has finished
catchErr?: boolean; // when true, handler errors are rethrown (and the ack is skipped); otherwise they are swallowed
} & Options.Consume;

// Constructor arguments of RabbitMQChannel.
interface RabbitMQChannelOptions {
type: QueueType; // exchange type the channel was created for (required by the constructor)
conn: Connection; // underlying connection (required by the constructor)
channel: Channel; // underlying channel (required by the constructor)
queue: string; // default queue for sendToQueue / consume
exchange?: string; // default exchange for publish
routingKeys?: string[]; // default routing keys for publish
queueOptions?: QueueOptions; // stored on the instance
exchangeOptions?: ExchangeOptions; // stored on the instance

prefetch?: number; // when a positive number, the constructor applies it via channel.prefetch
}

/**
 * Convenience wrapper around a single amqplib channel.
 *
 * Remembers the queue, exchange and routing keys the channel was created with
 * so that `sendToQueue`, `publish` and `consume` can fall back to them.
 */
export class RabbitMQChannel {
  private readonly type: QueueType;
  private readonly conn: Connection;
  private readonly channel: Channel;
  private readonly queue: string;
  private readonly exchange?: string;
  private readonly routingKeys?: string[];
  private queueOptions?: QueueOptions;
  private exchangeOptions?: ExchangeOptions;
  private readonly prefetchCount?: number;

  /**
   * @param options channel, connection and defaults to wrap
   * @throws Error('illegal params') when type, conn or channel is missing
   */
  constructor(options: RabbitMQChannelOptions) {
    this.type = options.type;
    this.conn = options.conn;
    this.channel = options.channel;
    this.queue = options.queue;
    this.exchange = options.exchange;
    this.queueOptions = options.queueOptions;
    this.exchangeOptions = options.exchangeOptions;
    this.routingKeys = options.routingKeys;
    this.prefetchCount = options.prefetch;

    if (!this.type || !this.conn || !this.channel) {
      throw new Error('illegal params');
    }

    /**
     * Apply the prefetch limit to avoid piling up messages on the client.
     * prefetch caps the number of unacknowledged messages per consumer: once
     * that many deliveries are outstanding the broker stops sending more to
     * this consumer until it acks.
     * See: https://www.jianshu.com/p/4d043d3045ca
     *
     * The call is fire-and-forget; a failure is only logged.
     */
    if (typeof this.prefetchCount === 'number' && this.prefetchCount > 0) {
      this.prefetch(this.prefetchCount)
        .catch(err => {
          console.error(err)
        });
    }
  }

  /**
   * Convert any supported payload into a Buffer.
   *
   * - Buffers are returned as-is.
   * - null / undefined become an empty Buffer.
   * - Objects with their own `toString` (and that are not arrays) are
   *   stringified with it; every other object goes through JSON.stringify.
   * - Strings are encoded directly.
   *
   * @param msg payload to convert
   * @returns the payload as a Buffer
   */
  _handleMsgToBuffer (msg: MessageRaw): Buffer {
    if (Buffer.isBuffer(msg)) {
      return msg;
    }

    // `typeof null === 'object'`, so guard before touching `msg.toString`.
    if (msg === null || msg === undefined) {
      return Buffer.from('');
    }

    if (typeof msg === 'object') {
      const hasOwnToString = typeof msg.toString === 'function'
        && msg.toString !== Object.prototype.toString
        && !Array.isArray(msg);
      const text = hasOwnToString ? msg.toString() : JSON.stringify(msg);
      // JSON.stringify may yield undefined; fall back to an empty payload.
      return Buffer.from(text || '');
    }

    return Buffer.from(msg || '');
  }

  /**
   * Send a message straight to a queue.
   *
   * @param msg payload
   * @param sendOptions send options; `queue` overrides the channel's own queue
   */
  sendToQueue (msg: MessageRaw, sendOptions?: SendToQueueOptions) {
    const targetQueue = sendOptions?.queue || this.queue || "";

    this.channel.sendToQueue(targetQueue, this._handleMsgToBuffer(msg), sendOptions);
  }

  /**
   * Publish a message to an exchange, once per routing key.
   *
   * Routing keys are resolved in this order: the non-empty `routingKey` given
   * in `publishOptions` (string or array), then the channel's own routing
   * keys, then a single empty key. The exchange comes from `publishOptions`
   * or, failing that, from the channel.
   *
   * @param msg payload
   * @param publishOptions per-call routing key(s) / exchange plus publish options
   * @throws Error when neither the call nor the channel supplies an exchange
   */
  publish (msg: MessageRaw, publishOptions?: PublishOptions) {
    const requestedKey = publishOptions?.routingKey;

    let routingKeys: string[]|undefined = this.routingKeys;

    if (Array.isArray(requestedKey)) {
      // An explicit key list wins over the channel defaults; an empty list
      // keeps the defaults.
      if (requestedKey.length) {
        routingKeys = requestedKey;
      }
    } else if (requestedKey) {
      routingKeys = [requestedKey];
    }

    if (routingKeys == undefined || !routingKeys.length || routingKeys[0] === undefined) {
      routingKeys = [""];
    }

    const exchange = publishOptions?.exchange || this.exchange;

    if (!exchange) {
      throw new Error('需要额外指定 exchange');
    }

    // Convert once; the same bytes are published under every routing key.
    const content = this._handleMsgToBuffer(msg);

    for (const key of routingKeys) {
      this.channel.publish(exchange, key, content, publishOptions);
    }
  }

  /**
   * Start consuming the channel's queue.
   *
   * Each delivery is decoded with `toString()` and handed to `consumer`,
   * whose result is awaited. Unless `noAck` is set, the message is acked after
   * the handler finishes — including when the handler threw and `catchErr` is
   * not set, in which case the error is swallowed. With `catchErr` set the
   * error is rethrown from the delivery callback and the ack is skipped.
   * NOTE(review): whether amqplib observes the rejected promise returned by
   * the callback is not visible here — confirm before relying on `catchErr`.
   *
   * @param consumer message handler
   * @param consumeOptions consume options, also forwarded to channel.consume
   */
  async consume (consumer: Consumer, consumeOptions?: ConsumeOptions) {
    const noAck = consumeOptions?.noAck;
    const catchErr = consumeOptions?.catchErr || false;

    await this.channel.consume(this.queue, async (msg) => {
      if (!msg) {
        return;
      }

      try {
        const msgContent = msg.content.toString();

        await consumer(msgContent, msg);
      } catch (err: unknown) {
        if (catchErr) {
          throw err;
        }
      }

      // Automatic ack unless the caller opted out.
      if (!noAck) {
        this.channel.ack(msg);
      }
    }, consumeOptions);
  }

  /** Ack every outstanding message on the channel. */
  async ackAll () {
    await this.channel.ackAll();
  }

  /**
   * Negatively acknowledge a message.
   *
   * @param message message to reject
   * @param allUpto forwarded to channel.nack
   * @param requeue forwarded to channel.nack
   */
  async nack (message: Message, allUpto: boolean, requeue: boolean) {
    this.channel.nack(message, allUpto, requeue);
  }

  /**
   * Negatively acknowledge every outstanding message.
   *
   * @param requeue forwarded to channel.nackAll
   */
  async nackAll (requeue: boolean) {
    this.channel.nackAll(requeue);
  }

  /**
   * Set the channel's prefetch count; values below 1 are ignored.
   *
   * @param number prefetch count
   */
  async prefetch (number: number) {
    if (number >= 1) {
      await this.channel.prefetch(number);
    }
  }

  /** @returns the wrapped amqplib channel */
  getChannel(): Channel {
    return this.channel;
  }
}

/**
 * Owns one amqplib connection and creates RabbitMQChannel wrappers on it.
 *
 * Typical use: construct with a URL or connection options, call `connet()`,
 * then `createChannel(...)` for each publisher / consumer.
 */
export class RabbitMQ {
  private link: Options.Connect | string | undefined;
  private conn: Connection | undefined;

  /**
   * @param link amqp URL or connection options; defaults to 'amqp://localhost'
   */
  constructor (link?: Options.Connect|String) {
    this.setConnect(link);
  }

  /**
   * Store the connection target used by `connet()`.
   *
   * A string (primitive or boxed) is kept as a URL; an options object is
   * merged over the built-in defaults; a falsy value selects
   * 'amqp://localhost'.
   *
   * @param link amqp URL or connection options
   */
  setConnect(link?: Options.Connect|String) {
    let target: Options.Connect | string | undefined;
    if (typeof link === 'string') {
      target = link;
    } else if (link instanceof String) {
      // A boxed string is still a URL, not an options object.
      target = link.toString();
    } else {
      target = link;
    }

    if (!target) {
      this.link = 'amqp://localhost';
      return;
    }

    if (typeof target === 'string') {
      this.link = target;
      return;
    }

    const defaultLinkOptions = {
      protocol: 'amqp',
      hostname: 'localhost',
      port: 5672,
      username: 'guest',
      password: 'guest',
      locale: 'en_US',
      frameMax: 0,
      heartbeat: 0,
      vhost: '/'
    };

    this.link = { ...defaultLinkOptions, ...target };
  }

  /**
   * Connect and remember the connection for later `createChannel` calls.
   * (The method name's spelling is kept for backward compatibility.)
   *
   * @param link optional amqp URL / options overriding the stored target
   * @returns the amqplib connection
   * @throws Error when no connection target is available
   */
  async connet (link?: Options.Connect|string): Promise<Connection> {
    const realLink = link || this.link;

    if (!realLink) {
      throw new Error('Link cannot be null');
    }

    this.conn = await connect(realLink);
    return this.conn;
  }

  // fanout: bind the queue and broadcast (no RoutingKey / BindingKey needed)
  // direct: bind the queue and send (bind RoutingKey and BindingKey); if the
  //         exchange has a single queue there is a single RoutingKey / BindingKey
  // -- BindingKey (queue side) ties a queue to an exchange
  // -- RoutingKey: if not bound, delivered to every queue; if bound, only to matching queues
  //
  // headers
  // match
  // topic: bind the queue to a topic pattern
  //
  // 'direct' | 'topic' | 'headers' | 'fanout' | 'match'
  /**
   * Create a channel, declare the exchange / queue it needs and wrap it.
   *
   * - The exchange is asserted only when a name is given.
   * - Unless `publisher` is truthy, the queue is asserted and, when an
   *   exchange is given, bound to it: with '' for fanout, otherwise once per
   *   routing key. Nothing is bound on the default exchange, because the
   *   broker refuses bindings there and already routes by queue name.
   * - A missing or blank queue name asks the broker for a generated, exclusive
   *   queue; the generated name is what the returned wrapper uses.
   * - The caller's `queueOptions` object is copied, never modified.
   *
   * @param createChannelOptions queue / exchange / routing configuration
   * @returns a RabbitMQChannel bound to the new channel
   * @throws Error when `publisher` is exactly false, the type is direct, topic
   *         or match, and no routingKey was supplied
   * @throws Error when `connet()` has not been called yet
   */
  async createChannel (createChannelOptions?: CreateChannelOptions): Promise<RabbitMQChannel> {
    const type = createChannelOptions?.type || QueueType.direct; // exchange type
    const exchange = createChannelOptions?.exchange; // omitted means the default exchange
    const options = createChannelOptions?.options;

    const exchangeOptions = options?.exchangeOptions;
    const routingKey = options?.routingKey;

    // Work on a copy so that setting `exclusive` never leaks into the caller's object.
    const queueOptions = Object.assign({}, options?.queueOptions);

    let queue = createChannelOptions?.queue; // queue name

    // Exclusive mode: no usable name means a broker-named exclusive queue.
    if (!queue || queue.trim() === '') {
      queue = '';
      queueOptions.exclusive = true;
    }

    // A consumer on a direct / topic / match exchange must say what to bind.
    if (createChannelOptions?.publisher === false) {
      const needsRoutingKey = QueueType.direct === type ||
        QueueType.topic === type ||
        QueueType.match === type;
      if (routingKey == undefined && needsRoutingKey) {
        throw new Error("direct 必传 routingKey");
      }
    }

    // routingKey may be a single key or a list.
    const routingKeys = typeof routingKey === 'string' ? [routingKey] : routingKey || [];

    if (!this.conn) {
      throw new Error('conn should connect');
    }

    const channel = await this.conn.createChannel();

    if (exchange) {
      // Declare the exchange.
      await channel.assertExchange(exchange, type, exchangeOptions);
    }

    let queueName = queue;

    if (!createChannelOptions?.publisher) {
      // Declare the queue; keep the real name (the broker generates one for '').
      const asserted = await channel.assertQueue(queue, queueOptions);
      queueName = asserted.queue;

      if (exchange) {
        const bindingKeys = QueueType.fanout === type ? [''] : routingKeys;
        for (const key of bindingKeys) {
          // Bind queue / exchange / routing key.
          await channel.bindQueue(queueName, exchange, key);
        }
      }
    }

    return new RabbitMQChannel({
      type,
      conn: this.conn,
      channel,
      queue: queueName,
      exchange,
      routingKeys,
      queueOptions,
      exchangeOptions
    });
  }

  /** Close the connection, if one is open, and forget it. */
  async close () {
    if (this.conn) {
      await this.conn.close();
      this.conn = undefined;
    }
  }
}

CATALOG
  1. 1. 基本