RabbitMQ Adapter
RabbitMQ Adapter
The RabbitMQ adapter provides reliable message queue communication using AMQP protocol with support for topic exchanges, durable messaging, and load balancing.
Installation
First, install the required dependencies:
# npmnpm install amqplib
# yarnyarn add amqplib
# pnpmpnpm add amqplibBasic Usage
Producer
import { RabbitMQIO, RPCChannel } from "kkrpc"import { apiMethods, type API } from "./api"
const rabbitmqIO = new RabbitMQIO({ url: "amqp://localhost", exchange: "kkrpc-exchange", exchangeType: "topic", durable: true})
const producerRPC = new RPCChannel<API, API>(rabbitmqIO, { expose: apiMethods})
const api = producerRPC.getAPI()
// Test basic RPC callsconsole.log(await api.add(5, 3)) // 8console.log(await api.echo("Hello from RabbitMQ!")) // "Hello from RabbitMQ!"
rabbitmqIO.destroy()Consumer
import { RabbitMQIO, RPCChannel } from "kkrpc"import { apiMethods, type API } from "./api"
const rabbitmqIO = new RabbitMQIO({ url: "amqp://localhost", exchange: "kkrpc-exchange", exchangeType: "topic", durable: true, sessionId: "consumer-session"})
const consumerRPC = new RPCChannel<API, API>(rabbitmqIO, { expose: apiMethods})
const api = consumerRPC.getAPI()
// Process messages from producerconsole.log(await api.add(10, 20)) // 30console.log(await api.echo("Hello from consumer!")) // "Hello from consumer!"
rabbitmqIO.destroy()Configuration Options
interface RabbitMQOptions { url?: string // AMQP broker URL (default: "amqp://localhost") exchange?: string // Exchange name (default: "kkrpc-exchange") exchangeType?: "topic" | "direct" | "fanout" // Exchange type (default: "topic") durable?: boolean // Durable exchange and queues (default: true) sessionId?: string // Unique session identifier routingKeyPrefix?: string // Routing key prefix (default: "kkrpc")}Features
Topic Exchange Routing
The RabbitMQ adapter uses a topic exchange with routing keys to separate kkrpc traffic from other consumers:
const rabbitmqIO = new RabbitMQIO({ exchange: "my-exchange", exchangeType: "topic", // Use topic exchange for flexible routing routingKeyPrefix: "myapp.rpc" // Custom routing key prefix})
// Get routing informationconst routingKeys = rabbitmqIO.getRoutingKeys()console.log(routingKeys) // { inbound: "myapp.rpc.messages", outbound: "myapp.rpc.messages" }Durable Messaging
Configure durable exchanges and queues to survive broker restarts:
const rabbitmqIO = new RabbitMQIO({ durable: true, // Messages survive broker restarts exchange: "durable-exchange"})Session Management
Each adapter instance gets a unique session ID to prevent message conflicts:
const rabbitmqIO = new RabbitMQIO({ sessionId: "my-unique-session" // Optional custom session ID})
console.log(rabbitmqIO.getSessionId()) // Get current session IDconsole.log(rabbitmqIO.getExchange()) // Get exchange nameAdvanced Usage
Custom Exchange Configuration
const rabbitmqIO = new RabbitMQIO({ url: "amqp://guest:guest@localhost:5672", exchange: "custom-exchange", exchangeType: "direct", // Direct exchange for point-to-point durable: false, // Non-durable for temporary queues routingKeyPrefix: "custom.rpc"})Multiple Consumers
// Consumer 1const consumer1 = new RabbitMQIO({ sessionId: "consumer-1", exchange: "load-balanced-exchange"})
// Consumer 2const consumer2 = new RabbitMQIO({ sessionId: "consumer-2", exchange: "load-balanced-exchange"})
// Both consumers will receive all messages (broadcast pattern)Error Handling
const rabbitmqIO = new RabbitMQIO({ url: "amqp://localhost"})
try { const api = rabbitmqRPC.getAPI() await api.someMethod()} catch (error) { if (error.message.includes("RabbitMQ adapter has been destroyed")) { console.log("Adapter was destroyed") } else if (error.message.includes("Failed to create RabbitMQ channel")) { console.log("Connection failed - check RabbitMQ server") }}Connection Management
const rabbitmqIO = new RabbitMQIO()
// Graceful cleanuprabbitmqIO.destroy()
// Signal destroy to remote partiesawait rabbitmqIO.signalDestroy()Best Practices
- Use unique session IDs when running multiple instances
- Enable durable messaging for production systems
- Monitor connection health and implement reconnection logic
- Use appropriate exchange types for your use case:
topic: Flexible routing with wildcardsdirect: Point-to-point communicationfanout: Broadcast to all queues
- Clean up resources with
destroy()when shutting down
Dependencies
amqplib: AMQP client library for RabbitMQ- RabbitMQ server running on accessible host