Um projeto criado em TypeScript para facilitar a criação de consumidores RabbitMQ, com suporte a métricas Prometheus, tracing com Elastic APM e um sistema de plugins para consumidores.

Simplificar a integração com consumo de stream de menssagens via RabbitMQ
-
Métricas: Adicionado métricas de consumo (mensagens processadas, tempo médio, CPU, Memoria, etc.) via prometheus
-
Monitoramento: Suporte para integração com APM do Elastic Search
-
Resiliencia: Adicionado estrategias de retentativas e DLQ
-
Long Process: Mecanismos de fallback para operações longas
-
Plugin/Play: Adicionado suporte para criação de plugins para uso em geral
-
ManagerConsumer: Controle de Skip, Stop, Retryable consumo de mensagens
-
Logger: Suporte de de incoporação do logger de sua escolha
- Healcheck para consumidores ativos
- Exemplos de
idempotency key
com redis
⚡ Pipeline inteligente de consumo RabbitMQ
mensagens fluem em um ciclo estruturado de pré-processamento, processamento e pós-processamento — com controle total do contexto, resiliência, garantindo consistência e extensibilidade de ponta a ponta.

Esta lib utiliza Prometheus para coletar, armazenar e alertar sobre métricas em tempo real, garantindo alta disponibilidade e performance.
🔍 Métricas Coletadas:
-
Infraestrutura: CPU, memória, uso de disco, tráfego de rede.
-
Consumidor RabbitMQ: Total de mensagens processadas, Tempo de processamento de mensagens, Mensagens atualmente sendo processadas, Total de novas tentativas de processamento, Total de mensagens enviadas para DLQ

🚀 Application Performance Monitoring (APM)
Este lib utiliza APM para monitorar desempenho, rastrear transações distribuídas e identificar gargalos em tempo real.
🔍 Métricas Coletadas:
- Erros corridos em tempo de execução
- Latencia media do consumidor
- Throughput por consumo de mensagens
- CPU e Memoria utilizada

import { MuncherConsumerBase } from 'muncher'
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}]
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});
import { MuncherConsumerBase } from 'muncher'
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async preProcess(ctx: IMessageContext<Example>): Promise<void> {
ctx.metadata.new = "New Value in ctx" // Data shared in context
}
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx.metadata.new)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}]
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});
import { MuncherConsumerBase } from 'muncher'
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
// In Class Domain
protected async onErrorProcess(ctx: IMessageContext<Example>, error: unknown): Promise<void> {
console.log(ctx, error)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}]
})
// Out Class Domain
consumer.onError((err) => {
console.log(err)
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});
import { MuncherConsumerBase } from 'muncher'
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}],
apm: {
serverUrl: 'http://localhost:8200'
}
})
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});
import { MuncherConsumerBase } from 'muncher'
import http, { Server } from 'http';
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}]
})
const server = http.createServer(async (req, res) => {
if (req.url === '/metrics') {
const { metrics, contentType } = await consumer.getMetrics();
res.writeHead(200, { 'Content-Type': contentType });
res.end(metrics);
} else {
res.writeHead(404);
res.end();
}
});
await consumer.start()
server.listen(9091, () => {
console.log('🚀 Metrics server rodando em http://localhost:9091/metrics');
})
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});
import { MuncherConsumerBase, SkipMessageError } from 'muncher'
class PluginValidade extends PluginBase<unknown> {
private schema: AnySchema
constructor(schema: AnySchema) {
super('PluginValidade');
this.schema = schema
}
async preProcess(ctx: IMessageContext<unknown>): Promise<void> {
const validate = this.schema.validate(ctx.)
if (validate.error) {
throw new SkipMessageError(this.name, validate.error.details)
}
}
async initialize(consumer: IConsumer): Promise<void> {
consumer.onError((err) => {
console.log("plugin", err)
})
}
}
class ExampleConsumer extends MuncherConsumerBase<Example> {
protected async processMessage(ctx: IMessageContext<Example>): Promise<void> {
console.log(ctx)
}
}
const consumer = new ExampleConsumer({
connection: {
username: 'admin',
password: 'admin'
},
queues: [{
topic: 'Example.Queue'
}],
plugins: [
new PluginValidade(joi.object({
name: joi.string()
}))
]
})
consumer.withPlugin(new PluginValidade(joi.object({
name: joi.string()
})))
await consumer.start()
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});
Parâmetro |
Descrição |
Example |
StopProcessingError |
Consumo da menssagem e parado é marcado como erro |
new StopProcessingError('msg', {...params}) |
SkipMessageError |
Consumo da menssagem e finalizado é marcado como sucesso |
new SkipMessageError('msg', {...params}) |
RetryableError |
Consumo da menssagem e marcado para reprocessamento |
new RetryableError('msg', {...params}) |
Parâmetro |
Tipo |
Obrigatório |
Descrição |
connection |
object |
✅ |
Conexão com RabbitMQ Valores
|
exchanges |
Array |
❌ |
Configuração de consumo Via Exchange Valores
|
queues |
Array |
❌ |
Configuração de consumo Via Queue Valores
|
binds |
Array |
❌ |
Configuração de consumo Via Binding Valores
|
plugins |
Array |
❌ |
Configuração de Plugins |
serviceName |
string |
❌ |
Nome do serviço |
prefetch |
string |
❌ |
Quantidade mensagens um consumidor pode processar ao mesmo tempo |
logger |
number |
❌ |
Logger customizado Default value: console
|
DLQ |
object |
❌ |
Configuração de DLQ Valores
|
apm |
object |
❌ |
Configuração do apm Valores
|
retryPolicy |
object |
❌ |
Configuração de retryPolicy Valores
|
Parâmetro |
Descrição |
Exemplo |
protocol |
O protocolo a ser utilizado |
Default value: 'amqp' |
hostname |
Nome do host usado para conexão ao servidor. |
Default value: 'localhost' |
port |
Port used for connecting to the server. |
Default value: 5672 |
username |
Nome de usuário usado para autenticação no servidor. |
Default value: 'guest' |
password |
Senha usada para autenticação no servidor. |
Default value: 'guest' |
locale |
O local desejado para mensagens de erro. O RabbitMQ usa apenas en_US |
Default value: 'en_US' |
frameMax |
O tamanho em bytes do quadro máximo permitido na conexão. 0 significa sem limite (mas como os quadros têm um campo de tamanho que é um inteiro sem sinal de 32 bits, ele é forçosamente 2^32 - 1). |
Default value: 0x1000 (4kb) |
heartbeat |
O período do heartbeat da conexão em segundos. |
Default value: 0 |
vhost |
VHost usado. |
Default value: '/' |
Parâmetro |
Tipo |
Obrigatório |
Descrição |
routingKey |
string |
✅ |
Nome da fila que receberá a mensagem. |
exchange |
string |
❌ |
Nome da exchange que envia a mensagem. |
Importante!: Caso não seja passado uma exchange
o envio de dados ao DLQ
será feito via sendToQueue
Cenário |
Use sendToQueue
|
Use publish
|
Enviar direto pra uma fila específica? |
✅ |
⚠️ |
Quer usar Exchange + RoutingKey? |
⚠️ |
✅ |
Precisa de controle com DLX, fanout, topic routing? |
❌ |
✅ |
Protótipo simples/teste? |
✅ |
⚠️ |
Parâmetro |
Tipo |
Obrigatório |
Descrição |
topic |
string |
✅ |
Nome da fila que receberá a mensagem. |
options |
object |
❌ |
Opções de Assert Valores
|
optionsConsumer |
object |
❌ |
Opções do consumidor Valores
|
Parâmetro |
Tipo |
Obrigatório |
Descrição |
topic |
string |
✅ |
Nome da fila que receberá a mensagem. |
type |
string |
✅ |
Tipo de roteamento da exchange. Valores
|
options |
object |
❌ |
Opções de Assert Valores
|
optionsConsumer |
object |
❌ |
Opções do consumidor Valores
|
Parâmetro |
Tipo |
Obrigatório |
Descrição |
queue |
string |
✅ |
Nome da fila que receberá a mensagem. |
exchange |
string |
✅ |
Nome da exchange que envia a mensagem. |
routingKey |
string |
✅ |
Chave usada pra rotear a mensagem. Pode ser '' , # , *.log , etc. |
args |
any |
❌ |
Argumentos adicionais (raramente usados; para casos avançados). |
Tipo |
Comportamento |
direct |
Roteamento exato via routingKey . |
topic |
Roteamento por padrões (*.log , #.error ). |
fanout |
Ignora routingKey , envia para todas as filas ligadas. |
headers |
Roteia por headers, não por routingKey . Mais raro e avançado. |
Parâmetro |
Tipo |
Descrição |
durable |
boolean |
Se true , a fila sobrevive a reinicializações do servidor. |
exclusive |
boolean |
Se true , a fila é acessível apenas por esta conexão e será deletada ao desconectar. |
autoDelete |
boolean |
Se true , a fila será removida automaticamente quando não tiver mais consumidores. |
messageTtl |
number |
Tempo (ms) que a mensagem pode ficar na fila antes de ser descartada. |
expires |
number |
Tempo (ms) que a fila pode ficar inativa antes de ser deletada. |
maxLength |
number |
Máximo de mensagens que a fila pode conter. |
maxPriority |
number |
Define níveis de prioridade de mensagens (quando usado). |
Parâmetro |
Tipo |
Descrição |
durable |
boolean |
Se true , a exchange sobrevive a reinicializações do servidor. |
internal |
boolean |
Se true , a exchange não pode ser publicada diretamente, apenas usada por outras exchanges. |
autoDelete |
boolean |
Se true , a exchange é deletada quando não houver mais filas vinculadas. |
arguments |
any |
Usado para configurações adicionais específicas de tipo de exchange. |
Parâmetro |
Tipo |
Descrição |
consumerTag |
string |
Identificador único do consumidor. Útil para cancelar ou rastrear consumo. |
noAck |
boolean |
Se true , não exige ack manual. A mensagem é considerada entregue assim que recebida. ⚠️ Use com cuidado. |
exclusive |
boolean |
Se true , somente esta conexão pode consumir da fila. |
priority |
number |
Prioridade do consumidor. Maior valor consome primeiro (requer suporte da fila). |
arguments |
any |
Argumentos adicionais (ex: filtros, extensões customizadas). |
Parâmetro |
Tipo |
Descrição |
maxRetries |
number |
Número de retentativas |
backoff |
number |
Tempo de espera entre tentativas |
Sinta-se à vontade para abrir issues ou enviar pull requests. Sugestões são sempre bem-vindas!
Desenvolvido por: Ismael Alves 🤓🤓🤓