Skip to content

Commit d484019

Browse files
authored
🐇 Publish/Subscribe (#2)
🐇 Publish/Subscribe
2 parents d35af53 + 0a1ebcc commit d484019

File tree

4 files changed

+130
-3
lines changed

4 files changed

+130
-3
lines changed

README.md

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ O intuito desse repositório é testar e aprender a usar o sistema de mensageria
44
Cada etapa (exceto a primeira) será separada por Pull Requests para que possa ser mais fácil de identificar as etapas da versão final.
55

66
## ([#0](https://github.com/lucasgianine/message-queuing/commit/c86a7cd0750668b64d3e57371d61874107304e26)) Hello world!
7-
O conceito dessa etapa é apresentar o básico da mensageria, teremos um <i>Producer</i> que irá enviar uma mensagem, a <i>queue</i>, ou <i>fila</i> que irá fazer o processo onde tranformará a mensagem em um buffer para que, finalmente, seja entregue ao <i>Consumer</i>, que imprimirá a mensagem.
7+
O conceito dessa etapa é apresentar o básico da mensageria, teremos um <i>Producer</i> que irá enviar uma mensagem, a <i>queue</i>, ou <i>fila</i> que irá fazer o processo onde armazenará a mensagem em um buffer para que, finalmente, seja entregue ao <i>Consumer</i>, que imprimirá a mensagem.
88

99
```mermaid
1010
flowchart LR
@@ -31,7 +31,7 @@ Utilize esses comandos para teste:
3131
# -> [x] Sent: Hello World!
3232
```
3333

34-
## (#1) Work Queues
34+
## ([#1](https://github.com/lucasgianine/message-queuing/pull/1)) Work Queues
3535
Vamos trabalhar em criar Work Queues (ou Task Queues) para distribuir tarefas demoradas entre vários workers, ou seja, quando uma tarefa exije muitos recursos, todo fluxo espera que ela seja concluída para que a mensagem seja exibida, a ideia do Work Queues é que agendemos a tarefa para que ela seja feita mais tarde.
3636

3737
```mermaid
@@ -105,3 +105,71 @@ Pra corrigir esse feito, usamos `prefetch` com o valor `1` para que o Rabbit ent
105105
```typescript
106106
channel.prefetch(1)
107107
```
108+
109+
## ([#2](https://github.com/lucasgianine/message-queuing/pull/2)) Publish/Subscribe
110+
Dessa vez iremos entregar uma mensagem para vários consumidores, criaremos um registro simples com dois programas, onde um emitirá mensagens de registro e outro que vai receber e imprimir, no nosso programa, cada cópia em execução do receptor receberá as mensagens, onde o receptor poderá se comunicar com os dois queues ao mesmo tempo.
111+
112+
Toda ideia do Rabbit é que, na verdade o <i>producer</i> nunca envie mensagem diretamente para fila (pois na realidade é que o <i>producer</i> nem sabe se a mensagem chegará até lá), mas ao invés disso ele envie mensagens para uma `exchange`, pois ela sabe exatamente o que fazer com a mensagem que recebeu para empurrá-lá para uma <i>queue</i>.
113+
114+
```mermaid
115+
flowchart LR
116+
P["Producer"]
117+
X{"Exchange"}
118+
Q1["Queue 1"]
119+
Q2["Queue 2"]
120+
121+
P --> X --> Q1
122+
X --> Q2
123+
```
124+
125+
Há alguns tipos de exchanges, mas vamos trabalhar em cima do `fanout`: Ela transmite todas as mensagens que recebe para todas as filas que ela tem conhecimento.
126+
```typescript
127+
channel.assertExchange('logs', 'fanout', { durable: false })
128+
```
129+
130+
#### Filas temporárias
131+
Dar o nome para uma fila é importante para compartilharmos ela entre os <i>produces</i> e <i>consumers</i>, mas no caso dessa aplicação de logs, não precisamos criar uma fila permanente, deixaremos o nome da fila vazio para que o próprio servidor possa dar um nome aleatório, já que nesse momento isso não é prioritário visto que a fila, depois de ser consumida, deverá ser apagada automaticamente.
132+
```typescript
133+
channel.assertQueue('', {
134+
exclusive: true
135+
})
136+
137+
// Exemplo de retorno: amq.gen-JzTY20BRgKO-HjmUJj0wLg
138+
```
139+
140+
Depois de todo o processo de criar <i>exchange</i> e as filas temporárias agora vamos fazer nossa <i>exchange</i> enviar mensagem para a <i>fila</i>.
141+
```mermaid
142+
flowchart LR
143+
P["Producer"]
144+
X{"Exchange"}
145+
Q1["Queue 1"]
146+
Q2["Queue 2"]
147+
148+
P --> X -- Binding --> Q1
149+
X -- Binding --> Q2
150+
```
151+
152+
Chamamos de `binding` o relacionamento entre exchange (troca) e uma queue (fila).
153+
```typescript
154+
channel.bindQueue(queue_name, 'logs', '')
155+
```
156+
157+
Utilize esses comandos para teste:
158+
```bash
159+
# shell 1
160+
npm run receive_logs
161+
162+
# -> Será criado um arquivo .log na pasta src/logs
163+
# -> No arquivo aparecerá a <mensagem> escrita no próximo shell
164+
```
165+
166+
```bash
167+
# shell 2
168+
npm run emit_logs <mensagem>
169+
170+
# -> [x] Sent: <mensagem>
171+
```
172+
173+
## Referência
174+
- [RabbitMQ](https://www.rabbitmq.com/)
175+
- [Documentação do RabbitMQ](https://www.rabbitmq.com/tutorials)

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
"scripts": {
77
"test": "echo \"Error: no test specified\" && exit 1",
88
"producer": "node src/new_task.js",
9-
"consumer": "node src/worker.js"
9+
"consumer": "node src/worker.js",
10+
"emit_logs": "node src/logs/emit_logs.js",
11+
"receive_logs": "node src/logs/receive_logs.js > src/logs/logs_emitidas.log"
1012
},
1113
"keywords": [],
1214
"author": "",

src/logs/emit_logs.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
const amqp = require('amqplib/callback_api')
2+
3+
amqp.connect('amqp://localhost', (err, conn) => {
4+
if (err) throw err
5+
6+
conn.createChannel((err, channel) => {
7+
if (err) throw err
8+
9+
const exchange = 'logs'
10+
const message = process.argv.slice(2).join(' ') ?? 'Hello World!'
11+
12+
channel.assertExchange(exchange, 'fanout', {
13+
durable: false
14+
})
15+
16+
/*
17+
A string vazia como segundo parâmetro significa que não queremos
18+
enviar a mensagem para nenhuma fila específica.
19+
Queremos apenas publicá-la em nossa troca de 'logs'.
20+
*/
21+
channel.publish(exchange, '', Buffer.from(message))
22+
23+
console.log(` [x] Sent: ${message}`)
24+
})
25+
26+
setTimeout(() => {
27+
conn.close()
28+
process.exit(0)
29+
}, 500)
30+
})

src/logs/receive_logs.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
const amqp = require('amqplib/callback_api')
2+
3+
amqp.connect('amqp://localhost', (err, conn) => {
4+
if (err) throw err
5+
6+
conn.createChannel((err, channel) => {
7+
if (err) throw err
8+
9+
const exchange = 'logs'
10+
11+
channel.assertExchange(exchange, 'fanout', {
12+
durable: false
13+
})
14+
15+
channel.assertQueue('', {
16+
exclusive: true
17+
}, (err, q) => {
18+
if (err) throw err
19+
20+
channel.bindQueue(q.queue, exchange, '')
21+
22+
channel.consume(q.queue, (message) => {
23+
if (message.content) console.log(` [x] ${message.content.toString()}`)
24+
}, { noAck: true })
25+
})
26+
})
27+
})

0 commit comments

Comments
 (0)