Rabbitmq Ops
Manage RabbitMQ connections, queues, exchanges, and message routing. Use when working with message queues, pub/sub patterns, or distributed messaging.
Added 12/19/2025
Install via CLI
$ openskills install pluginagentmarketplace/claude-collective-intelligence
Files
SKILL.md
---
name: rabbitmq-operations
description: Manage RabbitMQ connections, queues, exchanges, and message routing. Use when working with message queues, pub/sub patterns, or distributed messaging.
---
# RabbitMQ Operations
Comprehensive skill for managing RabbitMQ in multi-agent orchestration systems.
## Quick Start
### Basic Connection
```javascript
import { RabbitMQClient } from './scripts/rabbitmq-client.js';
const client = new RabbitMQClient({
  url: 'amqp://localhost:5672'
});
await client.connect();
```
### Send Message to Queue
```javascript
await client.publishTask({
  title: "Process data",
  description: "Transform CSV to JSON",
  priority: "high"
});
```
### Consume Messages
```javascript
await client.consumeTasks('agent.tasks', async (msg, { ack, nack }) => {
  console.log('Received:', msg.task);
  // Process task
  const result = await processTask(msg.task);
  // Acknowledge
  ack();
});
```
## Core Concepts
### Queues
Point-to-point messaging with load balancing:
```javascript
// Setup durable queue
await client.setupTaskQueue('agent.tasks');
// Multiple consumers share work
// Each message delivered to ONE consumer
```
### Exchanges
Publish-subscribe with routing:
```javascript
// Fanout - broadcast to all
await client.setupBrainstormExchange('agent.brainstorm');
// Topic - selective routing
await client.setupStatusExchange('agent.status');
```
### Message Patterns
**Work Queue** (Load Balancing):
```
Producer → Queue → Consumer 1
                 → Consumer 2
                 → Consumer 3
Each message to ONE consumer
```
**Pub/Sub** (Broadcasting):
```
Publisher → Exchange → Queue 1 → Consumer 1
                     → Queue 2 → Consumer 2
                     → Queue 3 → Consumer 3
Each message to ALL consumers
```
**Topic** (Selective):
```
Publisher → Topic Exchange
              ↓ (routing key: agent.status.connected)
              → Queue (pattern: agent.status.#) → Consumer 1
              → Queue (pattern: agent.*.connected) → Consumer 2
```
## Advanced Operations
### Message Persistence
```javascript
// Durable queue + persistent messages = survive restarts
await channel.assertQueue('agent.tasks', {
  durable: true // Queue survives broker restart
});
await channel.sendToQueue('agent.tasks', message, {
  persistent: true // Message written to disk
});
```
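The `message` passed to `sendToQueue` above is not defined in this document. amqplib expects message content as a `Buffer`, so a JSON payload has to be serialized first. A minimal sketch, assuming JSON payloads; `encodeMessage` and `decodeMessage` are hypothetical helper names, not part of amqplib or the plugin scripts:

```javascript
// Hypothetical helpers (not part of amqplib or scripts/rabbitmq-client.js).
// Serialize a plain object into the Buffer that amqplib expects as content.
function encodeMessage(payload) {
  return Buffer.from(JSON.stringify(payload), 'utf8');
}

// `msg` is a delivery object as handed to a consumer callback;
// its `content` field holds the raw Buffer.
function decodeMessage(msg) {
  return JSON.parse(msg.content.toString('utf8'));
}
```

With these in place, the publish call above could take `encodeMessage({ title: 'Process data' })` as its second argument, and a consumer could call `decodeMessage(msg)` on each delivery.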
### Prefetch (QoS)
```javascript
// Fair dispatch - each worker gets 1 task at a time
await channel.prefetch(1);
// Or allow multiple concurrent tasks per worker
await channel.prefetch(5);
```
### Message TTL
```javascript
// Messages expire after 1 hour
await channel.assertQueue('agent.tasks', {
  arguments: {
    'x-message-ttl': 3600000
  }
});
```
### Dead Letter Exchange
```javascript
// Failed messages go to dead letter queue
await channel.assertQueue('agent.tasks', {
  arguments: {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'failed'
  }
});
```
### Priority Queues
```javascript
await channel.assertQueue('agent.tasks', {
  arguments: {
    'x-max-priority': 10
  }
});
// Send with priority
await channel.sendToQueue('agent.tasks', message, {
  priority: 8
});
```
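The Quick Start publishes tasks with a named priority (`priority: "high"`), while the queue above takes a number between 0 and `x-max-priority`. How the plugin client translates one into the other is not shown here. One possible mapping, where the labels, the numbers, and the name `toAmqpPriority` are all assumptions:

```javascript
// Hypothetical mapping from named priorities to AMQP numeric priorities.
// The labels and numbers are assumptions; adjust them to your own scheme.
const PRIORITY_LEVELS = new Map([
  ['low', 1],
  ['normal', 5],
  ['high', 8],
  ['critical', 10]
]);

// Returns an integer clamped to [0, maxPriority].
// Unknown labels fall back to the 'normal' level.
function toAmqpPriority(label, maxPriority = 10) {
  const value = Number.isFinite(label)
    ? label
    : (PRIORITY_LEVELS.get(label) ?? PRIORITY_LEVELS.get('normal'));
  return Math.min(Math.max(Math.trunc(value), 0), maxPriority);
}
```

Clamping to `maxPriority` keeps published values inside the range the queue was declared with.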
## Connection Management
### Auto-Reconnect
```javascript
const client = new RabbitMQClient({
  autoReconnect: true,
  maxReconnectAttempts: 10
});
client.on('disconnected', () => {
  console.log('Connection lost, will retry...');
});
client.on('connected', () => {
  console.log('Reconnected successfully!');
});
```
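The document does not say how long the client waits between reconnect attempts. A common choice is exponential backoff with jitter, sketched below. `reconnectDelayMs` and its default values are assumptions for illustration, not the behavior of the plugin's `RabbitMQClient`:

```javascript
// Hypothetical backoff schedule. baseMs, maxMs, and the jitter strategy are
// assumptions, not values taken from the plugin's RabbitMQClient.
function reconnectDelayMs(attempt, { baseMs = 1000, maxMs = 30000, random = Math.random } = {}) {
  // attempt is 1-based: the delay doubles each attempt and is capped at maxMs
  const exponential = Math.min(baseMs * 2 ** (attempt - 1), maxMs);
  // "Equal jitter": keep half of the delay fixed and randomize the other half,
  // so many clients do not reconnect at the same instant
  return Math.round(exponential / 2 + random() * (exponential / 2));
}
```

The `random` parameter exists only so the schedule can be tested deterministically; in normal use it defaults to `Math.random`.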
### Heartbeat
```javascript
// Keep connection alive
const client = new RabbitMQClient({
  heartbeat: 30 // Send heartbeat every 30s
});
```
### Connection Pooling
```javascript
// For high-throughput scenarios
const pool = new ConnectionPool({
  min: 2,
  max: 10,
  url: 'amqp://localhost:5672'
});
const channel = await pool.acquire();
// Use channel
await pool.release(channel);
```
## Message Acknowledgment
### Manual Ack
```javascript
await client.consumeTasks('queue', async (msg, { ack, nack, reject }) => {
  try {
    await processMessage(msg);
    ack(); // Success
  } catch (error) {
    if (error.isTransient) {
      nack(true); // Requeue for retry
    } else {
      reject(); // Dead letter
    }
  }
}, { noAck: false });
```
### Auto-Ack (Risky)
```javascript
// Message lost if processing fails!
await channel.consume('queue', handler, { noAck: true });
```
## Routing Patterns
### Direct Exchange
```javascript
await channel.assertExchange('direct_logs', 'direct');
// Bind with routing key
await channel.bindQueue(queue, 'direct_logs', 'error');
await channel.bindQueue(queue, 'direct_logs', 'warning');
// Publish with routing key
await channel.publish('direct_logs', 'error', message);
```
### Topic Exchange
```javascript
await channel.assertExchange('logs', 'topic');
// Wildcards in binding:
// * matches one word
// # matches zero or more words
await channel.bindQueue(queue, 'logs', 'agent.status.*');
await channel.bindQueue(queue, 'logs', 'agent.#');
// Publish
await channel.publish('logs', 'agent.status.connected', message);
```
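The wildcard rules above can be checked locally before binding queues. The sketch below mirrors the documented semantics (`*` matches exactly one word, `#` matches zero or more words). `topicMatches` is a hypothetical helper, not part of amqplib, and the broker's own matching remains authoritative:

```javascript
// Hypothetical local matcher for topic binding patterns.
// Words are separated by dots; '*' = exactly one word, '#' = zero or more words.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  function match(pi, ki) {
    if (pi === p.length) return ki === k.length; // pattern used up: key must be too
    if (p[pi] === '#') {
      // Try letting '#' absorb 0, 1, 2, ... of the remaining words
      for (let next = ki; next <= k.length; next++) {
        if (match(pi + 1, next)) return true;
      }
      return false;
    }
    if (ki === k.length) return false; // key used up but pattern still needs a word
    if (p[pi] === '*' || p[pi] === k[ki]) return match(pi + 1, ki + 1);
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches('agent.status.*', 'agent.status.connected')); // true
console.log(topicMatches('agent.#', 'agent.status.connected'));        // true
console.log(topicMatches('agent.status.*', 'agent.status'));           // false
```

Both bindings in the example above match the routing key `agent.status.connected`, so a queue bound with either pattern receives the message.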
### Fanout Exchange
```javascript
await channel.assertExchange('notifications', 'fanout');
// All bound queues receive message
await channel.bindQueue(queue1, 'notifications', '');
await channel.bindQueue(queue2, 'notifications', '');
await channel.publish('notifications', '', message);
// Both queue1 and queue2 receive it
```
## Monitoring and Debugging
### Queue Inspection
```javascript
// Check queue status
const info = await channel.checkQueue('agent.tasks');
console.log('Messages:', info.messageCount);
console.log('Consumers:', info.consumerCount);
```
### Management API
```javascript
// Use RabbitMQ Management HTTP API
const response = await fetch('http://localhost:15672/api/queues', {
  headers: {
    'Authorization': 'Basic ' + btoa('guest:guest')
  }
});
const queues = await response.json();
queues.forEach(q => {
  console.log(`${q.name}: ${q.messages} messages`);
});
```
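To inspect a single queue rather than the whole list, the Management API takes the virtual host and queue name as path segments (`/api/queues/{vhost}/{name}`), and both must be URL-encoded. A minimal sketch of building that URL and the Basic auth header; `queueApiUrl` and `basicAuthHeader` are hypothetical helper names:

```javascript
// Hypothetical helpers for the Management HTTP API.
// Path segments must be URL-encoded; the default vhost "/" becomes "%2F".
function queueApiUrl(baseUrl, vhost, queueName) {
  const base = baseUrl.replace(/\/+$/, ''); // drop trailing slashes
  return `${base}/api/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queueName)}`;
}

// Builds the value of the Authorization header for HTTP Basic auth.
function basicAuthHeader(user, password) {
  return 'Basic ' + Buffer.from(`${user}:${password}`, 'utf8').toString('base64');
}

console.log(queueApiUrl('http://localhost:15672', '/', 'agent.tasks'));
// http://localhost:15672/api/queues/%2F/agent.tasks
```

The result can be passed to `fetch` in the same way as the list request above, with `basicAuthHeader('guest', 'guest')` as the `Authorization` header.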
### Consumer Tracking
```javascript
const consumer = await channel.consume('queue', handler);
// Cancel consumer
await channel.cancel(consumer.consumerTag);
```
## Performance Optimization
### Batch Publishing
```javascript
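// Publish multiple messages, then wait for the broker to confirm them.
// waitForConfirms() exists only on a confirm channel.
const confirmChannel = await connection.createConfirmChannel();
const messages = [...];
for (const msg of messages) {
  confirmChannel.sendToQueue('queue', Buffer.from(JSON.stringify(msg)));
}
// Resolves once every message published so far has been confirmed
await confirmChannel.waitForConfirms();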
```
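For large message sets, waiting for confirms after each fixed-size batch bounds how many unconfirmed messages are in flight. The sketch below assumes `channel` was created with amqplib's `connection.createConfirmChannel()`; `publishInBatches` and the default batch size are hypothetical:

```javascript
// Hypothetical helper: publish in fixed-size batches on a confirm channel and
// wait for broker confirms after each batch. `channel` is assumed to come from
// connection.createConfirmChannel(); the batch size of 100 is arbitrary.
async function publishInBatches(channel, queue, payloads, batchSize = 100) {
  let published = 0;
  for (let start = 0; start < payloads.length; start += batchSize) {
    const batch = payloads.slice(start, start + batchSize);
    for (const payload of batch) {
      channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
        persistent: true
      });
    }
    // Rejects if the broker nacked any message published so far
    await channel.waitForConfirms();
    published += batch.length;
  }
  return published;
}
```

If `waitForConfirms()` rejects, the caller knows the failure lies in the most recent batch and can republish only that batch.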
### Publisher Confirms
```javascript
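// amqplib enables publisher confirms by creating a confirm channel
const confirmChannel = await connection.createConfirmChannel();
confirmChannel.sendToQueue('queue', message);
await confirmChannel.waitForConfirms();
// The broker has confirmed receipt of the message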
```
### Consumer Concurrency
```javascript
// Multiple channels for parallel processing
const channels = await Promise.all([
  connection.createChannel(),
  connection.createChannel(),
  connection.createChannel()
]);
channels.forEach((ch, i) => {
  ch.consume('queue', handler);
});
```
## Error Handling
### Channel Errors
```javascript
channel.on('error', (err) => {
  console.error('Channel error:', err);
  // Channel is closed, create new one
});
channel.on('close', () => {
  console.log('Channel closed');
});
```
### Connection Errors
```javascript
connection.on('error', (err) => {
  console.error('Connection error:', err);
});
connection.on('close', () => {
  console.log('Connection closed');
  // Implement reconnection logic
});
```
### Message Handling Errors
```javascript
try {
  await processMessage(msg);
  ack();
} catch (error) {
  console.error('Processing error:', error);
  // Log error with context
  await logError({
    messageId: msg.properties.messageId,
    error: error.message,
    stack: error.stack,
    timestamp: Date.now()
  });
  // Decide: retry or dead letter
  if (shouldRetry(error)) {
    nack(true); // Requeue
  } else {
    reject(); // Dead letter
  }
}
```
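The `shouldRetry` helper used above is not defined in this document. One possible sketch treats network-style failures as retryable and gives up after a fixed number of attempts. The error codes, the `isTransient` flag, and the attempt limit are assumptions about how an application tags its errors:

```javascript
// Hypothetical implementation of the shouldRetry helper referenced above.
// The set of codes and the attempt limit are assumptions, not plugin behavior.
const TRANSIENT_CODES = new Set(['ECONNRESET', 'ETIMEDOUT', 'ECONNREFUSED', 'EAI_AGAIN']);

function shouldRetry(error, attempts = 0, maxAttempts = 3) {
  if (attempts >= maxAttempts) return false;             // give up: dead-letter instead
  if (error && error.isTransient === true) return true;  // application-tagged errors
  return Boolean(error && TRANSIENT_CODES.has(error.code));
}
```

How `attempts` is tracked (for example in a message header set by the publisher) is left open here; without some limit, a message that always fails would be requeued indefinitely.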
## Best Practices
1. **Use Persistent Messages for Critical Data**
```javascript
channel.sendToQueue(queue, msg, { persistent: true });
```
2. **Set Reasonable Prefetch**
```javascript
await channel.prefetch(1); // Fair dispatch
```
3. **Always Close Connections**
```javascript
process.on('SIGINT', async () => {
  await channel.close();
  await connection.close();
});
```
4. **Use Confirm Channels for Reliability**
```javascript
const confirmChannel = await connection.createConfirmChannel();
confirmChannel.sendToQueue(queue, msg);
await confirmChannel.waitForConfirms();
```
5. **Monitor Queue Depths**
```javascript
setInterval(async () => {
  const info = await channel.checkQueue('queue');
  if (info.messageCount > 100) {
    console.warn('High queue depth!');
  }
}, 60000);
```
6. **Implement Dead Letter Queues**
```javascript
// Capture and analyze failed messages
```
7. **Use Message TTL**
```javascript
// Prevent old messages from clogging queues
```
8. **Set Max Length**
```javascript
await channel.assertQueue('queue', {
  arguments: { 'x-max-length': 10000 }
});
```
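Practices 6 to 8 all come down to queue arguments set at declaration time. The sketch below gathers the arguments used in this document into one object. `buildQueueArguments` and its option names are hypothetical; the `x-*` keys are the RabbitMQ argument names shown in the earlier sections:

```javascript
// Hypothetical helper that collects optional queue arguments into one object.
// The option names on the left are made up for this sketch; the 'x-*' keys
// are the RabbitMQ argument names used elsewhere in this document.
function buildQueueArguments({
  ttlMs,
  deadLetterExchange,
  deadLetterRoutingKey,
  maxLength,
  maxPriority
} = {}) {
  const args = {};
  if (ttlMs !== undefined) args['x-message-ttl'] = ttlMs;
  if (deadLetterExchange !== undefined) args['x-dead-letter-exchange'] = deadLetterExchange;
  if (deadLetterRoutingKey !== undefined) args['x-dead-letter-routing-key'] = deadLetterRoutingKey;
  if (maxLength !== undefined) args['x-max-length'] = maxLength;
  if (maxPriority !== undefined) args['x-max-priority'] = maxPriority;
  return args;
}
```

The result can be passed as `arguments` to `channel.assertQueue`. Note that declaring an existing queue again with different arguments fails with a `PRECONDITION_FAILED` channel error, so all arguments for a queue should be decided in one place.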
## Common Patterns
### Request-Reply
```javascript
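// randomUUID from Node's crypto module stands in for the undefined uuid()
import { randomUUID } from 'node:crypto';

const correlationId = randomUUID();
const replyQueue = await channel.assertQueue('', { exclusive: true });
// Listen for the reply before sending, so a fast reply is not missed
await channel.consume(replyQueue.queue, (reply) => {
  if (reply.properties.correlationId === correlationId) {
    // Process reply
  }
}, { noAck: true });
// Request
channel.sendToQueue('rpc_queue', msg, {
  correlationId,
  replyTo: replyQueue.queue
});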
```
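When several requests share one reply queue, each reply has to be matched to its caller by `correlationId`. A minimal sketch of that bookkeeping follows. `PendingReplies` is a hypothetical class, not part of amqplib, and the timeout default is an assumption:

```javascript
// Hypothetical registry of in-flight RPC requests keyed by correlationId.
class PendingReplies {
  constructor() {
    this.pending = new Map();
  }

  // Register a request. The returned promise resolves with the reply content,
  // or rejects if no reply arrives within timeoutMs.
  wait(correlationId, timeoutMs = 5000) {
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId);
        reject(new Error(`RPC timeout for ${correlationId}`));
      }, timeoutMs);
      this.pending.set(correlationId, { resolve, timer });
    });
  }

  // Call from the reply-queue consumer. Returns false for replies whose
  // correlationId is unknown (for example, replies that arrive after a timeout).
  handleReply(reply) {
    const id = reply.properties.correlationId;
    const entry = this.pending.get(id);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(id);
    entry.resolve(reply.content);
    return true;
  }
}
```

In use, the caller would invoke `wait(correlationId)` before publishing the request, and the reply-queue consumer would pass every delivery to `handleReply`.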
### Work Stealing
```javascript
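// With prefetch(1), the broker gives each worker one unacknowledged message
// at a time, so an idle worker picks up the next message instead of it
// waiting behind a busy worker
await channel.prefetch(1);
await channel.consume('queue', async (msg) => {
  // Process
  channel.ack(msg);
});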
```
### Circuit Breaker
```javascript
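let failureCount = 0;
const threshold = 5;
const consumerTag = 'circuit-breaker-consumer'; // any unique tag
await channel.consume('queue', async (msg) => {
  try {
    await processMessage(msg);
    failureCount = 0;
    channel.ack(msg);
  } catch (error) {
    failureCount++;
    // Requeue so the message is not left unacknowledged
    channel.nack(msg, false, true);
    if (failureCount >= threshold) {
      console.error('Circuit open, stopping consumption');
      await channel.cancel(consumerTag);
    }
  }
}, { consumerTag });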
```
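The example above stops consuming once the threshold is reached but never resumes. A fuller breaker usually adds a cooldown, after which one trial call is allowed. The sketch below tracks only the state; `CircuitBreaker`, its defaults, and the injected `now` clock are assumptions, and wiring it to `channel.consume` and `channel.cancel` is left to the caller:

```javascript
// Hypothetical breaker state machine: opens after `threshold` consecutive
// failures and reports 'half-open' once `cooldownMs` has elapsed.
class CircuitBreaker {
  constructor({ threshold = 5, cooldownMs = 30000, now = Date.now } = {}) {
    this.threshold = threshold;
    this.cooldownMs = cooldownMs;
    this.now = now;          // injectable clock, mainly for testing
    this.failures = 0;
    this.openedAt = null;    // timestamp of the last trip, or null when closed
  }

  get state() {
    if (this.openedAt === null) return 'closed';
    return this.now() - this.openedAt >= this.cooldownMs ? 'half-open' : 'open';
  }

  // A success closes the circuit and resets the failure count.
  recordSuccess() {
    this.failures = 0;
    this.openedAt = null;
  }

  // A failure at or above the threshold (re)opens the circuit.
  recordFailure() {
    this.failures++;
    if (this.failures >= this.threshold) this.openedAt = this.now();
    return this.state;
  }
}
```

A consumer could cancel itself when `recordFailure()` returns `'open'` and start consuming again once `state` reports `'half-open'`.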
## Resources
- [RabbitMQ Documentation](https://www.rabbitmq.com/documentation.html)
- [amqplib Documentation](https://amqp-node.github.io/amqplib/)
- [RabbitMQ Tutorials](https://www.rabbitmq.com/getstarted.html)
- Plugin scripts: `scripts/rabbitmq-client.js`
