NATS Adapter
The NATS adapter provides high-performance messaging using the NATS messaging system with publish/subscribe patterns and optional queue groups for load balancing.
Installation
First, install the required dependencies. The NATS client is already included as a dependency of kkrpc:

```bash
# npm
npm install kkrpc

# yarn
yarn add kkrpc

# pnpm
pnpm add kkrpc
```

Basic Usage
Publisher

```typescript
import { NatsIO, RPCChannel } from "kkrpc"
import { apiMethods, type API } from "./api"

const natsIO = new NatsIO({
  servers: "nats://localhost:4222",
  subject: "kkrpc-messages"
})

const publisherRPC = new RPCChannel<API, API>(natsIO, {
  expose: apiMethods
})

const api = publisherRPC.getAPI()

// Test basic RPC calls
console.log(await api.add(5, 3)) // 8
console.log(await api.echo("Hello from NATS!")) // "Hello from NATS!"

console.log("Subject:", natsIO.getSubject())
console.log("Session ID:", natsIO.getSessionId())

natsIO.destroy()
```

Subscriber
```typescript
import { NatsIO, RPCChannel } from "kkrpc"
import { apiMethods, type API } from "./api"

const natsIO = new NatsIO({
  servers: "nats://localhost:4222",
  subject: "kkrpc-messages",
  sessionId: "subscriber-session"
})

const subscriberRPC = new RPCChannel<API, API>(natsIO, {
  expose: apiMethods
})

const api = subscriberRPC.getAPI()

// Process messages from publisher
console.log(await api.add(10, 20)) // 30
console.log(await api.echo("Hello from subscriber!")) // "Hello from subscriber!"

natsIO.destroy()
```

Configuration Options
```typescript
interface NatsIOOptions {
  servers?: string | string[] // NATS server URLs (default: "nats://localhost:4222")
  subject?: string // Subject for RPC traffic (default: "kkrpc.messages")
  queueGroup?: string // Queue group for load balancing (optional)
  sessionId?: string // Unique session identifier
  timeout?: number // Connection timeout in ms (default: 10000)
}
```

Features
Subject-Based Routing

NATS uses subjects for message routing. Subjects are hierarchical strings separated by dots:

```typescript
const natsIO = new NatsIO({
  servers: "nats://localhost:4222",
  subject: "app.service.rpc" // Hierarchical subject
})

// You can use wildcards for subscriptions
// "app.>" matches all subjects starting with "app."
// "*.service" matches "foo.service" but not "foo.other.service"
```

Queue Groups for Load Balancing
Queue groups enable load balancing across multiple subscribers:

```typescript
// All subscribers with the same queue group name
// will share messages (only one receives each message)

const subscriber1 = new NatsIO({
  servers: "nats://localhost:4222",
  subject: "tasks",
  queueGroup: "workers" // Same group name for load balancing
})

const subscriber2 = new NatsIO({
  servers: "nats://localhost:4222",
  subject: "tasks",
  queueGroup: "workers" // Same group name
})

// Messages to "tasks" subject will be distributed
// to only one of the subscribers in the "workers" group
```

Broadcasting (No Queue Group)
Without a queue group, all subscribers receive all messages:

```typescript
// Publisher broadcasts to all subscribers
const publisher = new NatsIO({
  servers: "nats://localhost:4222",
  subject: "notifications"
})

// All subscribers receive all messages
const subscriber1 = new NatsIO({
  servers: "nats://localhost:4222",
  subject: "notifications"
})

const subscriber2 = new NatsIO({
  servers: "nats://localhost:4222",
  subject: "notifications"
})
```

Advanced Usage
Multiple Servers

Connect to a NATS cluster with multiple servers:

```typescript
const natsIO = new NatsIO({
  servers: [
    "nats://server1:4222",
    "nats://server2:4222",
    "nats://server3:4222"
  ],
  subject: "kkrpc-messages"
})
```

Custom Session Management
```typescript
const natsIO = new NatsIO({
  servers: "nats://localhost:4222",
  subject: "custom-rpc",
  sessionId: "my-unique-session-id" // Custom session ID
})

console.log(natsIO.getSessionId()) // "my-unique-session-id"
console.log(natsIO.getSubject()) // "custom-rpc"
console.log(natsIO.getQueueGroup()) // undefined or queue group name
console.log(natsIO.isConnected()) // true/false
```

Connection Timeout
```typescript
const natsIO = new NatsIO({
  servers: "nats://localhost:4222",
  subject: "rpc-messages",
  timeout: 15000 // 15 second connection timeout
})
```

Error Handling
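```typescript
import { NatsIO, RPCChannel } from "kkrpc"
import { apiMethods, type API } from "./api"

const natsIO = new NatsIO({
  servers: "nats://localhost:4222",
  subject: "rpc-messages"
})

// Create the RPC channel used in the try block below
const natsRPC = new RPCChannel<API, API>(natsIO, {
  expose: apiMethods
})

try {
  const api = natsRPC.getAPI()
  await api.someMethod()
} catch (error) {
  // `error` is `unknown` under strict TypeScript, so narrow it before reading the message
  const message = error instanceof Error ? error.message : String(error)
  if (message.includes("NATS adapter has been destroyed")) {
    console.log("Adapter was destroyed")
  } else if (message.includes("NATS connection error")) {
    console.log("NATS connection failed - check servers")
  }
}
```

Connection Management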
```typescript
const natsIO = new NatsIO()

// Check connection status
console.log(natsIO.isConnected()) // true/false

// Get adapter information
console.log(natsIO.getSubject()) // Subject name
console.log(natsIO.getQueueGroup()) // Queue group (if set)
console.log(natsIO.getSessionId()) // Session ID

// Graceful cleanup
natsIO.destroy()

// Signal destroy to remote parties
await natsIO.signalDestroy()
```

Best Practices
- Subject Naming:
  - Use hierarchical subjects like `app.service.operation`
  - Avoid overly generic subjects to prevent conflicts
  - Use consistent naming conventions across services
- Queue Groups:
  - Use queue groups for load balancing
  - Omit queue group for broadcast pattern
  - All subscribers in a queue group share messages
- Connection Handling:
  - Implement reconnection logic for production
  - Set appropriate timeouts
  - Monitor connection health
- Resource Management:
  - Always call `destroy()` when shutting down
  - Use `signalDestroy()` to notify remote parties
  - Handle connection errors gracefully
- Cluster Usage:
  - Specify multiple servers for redundancy
  - NATS will automatically reconnect to available servers
  - Consider server proximity for latency optimization
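The subject-naming advice above pays off because NATS subscriptions can select whole branches of a hierarchy with wildcards: `*` matches exactly one token and `>` matches one or more trailing tokens. The sketch below mimics those matching rules in plain TypeScript so a naming scheme can be checked offline. `subjectMatches` is a hypothetical helper written for illustration; it is not part of kkrpc or the NATS client.

```typescript
// Illustrative model of NATS wildcard matching (hypothetical helper, not a library API).
// "*" matches exactly one token; ">" matches one or more tokens and must come last.
function subjectMatches(pattern: string, subject: string): boolean {
  const patternTokens = pattern.split(".")
  const subjectTokens = subject.split(".")

  for (let i = 0; i < patternTokens.length; i++) {
    const token = patternTokens[i]

    if (token === ">") {
      // Only valid as the final token, and at least one subject token must remain
      return i === patternTokens.length - 1 && subjectTokens.length > i
    }
    if (i >= subjectTokens.length) {
      return false // Subject is shorter than the pattern
    }
    if (token !== "*" && token !== subjectTokens[i]) {
      return false // Literal token mismatch
    }
  }

  // Without a trailing ">", both must have the same number of tokens
  return patternTokens.length === subjectTokens.length
}
```

For example, `subjectMatches("app.>", "app.service.rpc")` is true, while `subjectMatches("*.service", "foo.other.service")` is false because `*` covers only a single token.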
Performance Tips
- Subject Hierarchy: Well-designed subjects minimize wildcard usage
- Queue Groups: Use for horizontal scaling
- Connection Pooling: One connection per service is usually sufficient
- Message Size: Keep messages under 1 MB for optimal performance (1 MB is also the NATS server's default `max_payload` limit)
- Inbox Pattern: Use request/reply for synchronous RPC calls
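The message-size tip can be checked before a call leaves the process. This is a minimal sketch, assuming payloads are JSON-serializable and that the budget applies to the UTF-8 encoded JSON; the adapter's actual wire format may add overhead. `MAX_PAYLOAD_BYTES`, `payloadBytes`, and `fitsInOneMessage` are hypothetical names introduced here.

```typescript
// Assumed budget: 1 MB, matching the tip above (hypothetical constant, not a kkrpc setting)
const MAX_PAYLOAD_BYTES = 1024 * 1024

// Size in bytes of the value once serialized to JSON and encoded as UTF-8
function payloadBytes(value: unknown): number {
  return new TextEncoder().encode(JSON.stringify(value)).length
}

// True when the serialized value stays within the given byte budget
function fitsInOneMessage(value: unknown, limit: number = MAX_PAYLOAD_BYTES): boolean {
  return payloadBytes(value) <= limit
}
```

A caller could run `fitsInOneMessage(args)` before invoking a remote method and split or compress oversized arguments instead of sending them as one message.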
Dependencies
- `@nats-io/transport-node`: NATS client for Node.js
- `@nats-io/transport-deno`: NATS client for Deno
- NATS server (version 2.0+)
Running NATS Server
```bash
# Using Docker
docker run -p 4222:4222 -p 8222:8222 nats:latest

# Or install and run locally
# https://docs.nats.io/running-nats

# Verify with monitoring
curl http://localhost:8222/healthz
```

Comparison with Other Adapters
| Feature | NATS | RabbitMQ | Kafka | Redis Streams |
|---|---|---|---|---|
| Latency | Ultra-low | Low | Medium | Low |
| Persistence | Optional | Yes | Yes | Yes |
| Load Balancing | Queue groups | Queues | Consumer groups | Consumer groups |
| Complexity | Low | Medium | High | Medium |
| Schema | None | Optional | Optional | None |
| Clustering | Built-in | Supported | Built-in | Supported |
Production Considerations
Monitoring

Monitor key metrics:
- Connection health and reconnection attempts
- Message throughput per subject
- Queue group distribution
- Server cluster status
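Several of the metrics listed above can be collected with simple in-process counters before a full metrics system is in place. This is a minimal sketch, assuming the application calls these hooks itself wherever it handles messages and reconnect events. `AdapterMetrics` and all of its methods are hypothetical names; kkrpc does not provide them.

```typescript
// Hypothetical in-memory counters for throughput, queue group distribution, and reconnects
class AdapterMetrics {
  private perSubject = new Map<string, number>()
  private perWorker = new Map<string, number>()
  private reconnectAttempts = 0

  // Call once per handled message; workerId identifies one member of a queue group
  recordMessage(subject: string, workerId?: string): void {
    this.perSubject.set(subject, (this.perSubject.get(subject) ?? 0) + 1)
    if (workerId !== undefined) {
      this.perWorker.set(workerId, (this.perWorker.get(workerId) ?? 0) + 1)
    }
  }

  // Call whenever the application observes a reconnect attempt
  recordReconnectAttempt(): void {
    this.reconnectAttempts += 1
  }

  // Number of messages recorded for a subject
  messagesFor(subject: string): number {
    return this.perSubject.get(subject) ?? 0
  }

  // Fraction (0 to 1) of worker-attributed messages handled by one worker
  workerShare(workerId: string): number {
    let total = 0
    this.perWorker.forEach((count) => {
      total += count
    })
    return total === 0 ? 0 : (this.perWorker.get(workerId) ?? 0) / total
  }

  getReconnectAttempts(): number {
    return this.reconnectAttempts
  }
}
```

A `workerShare` value far from `1 / groupSize` for any worker would suggest that the queue group is not distributing load evenly.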
Scaling
- Horizontal Scaling: Add more subscribers with queue groups
- Subject Design: Well-designed subjects prevent bottlenecks
- Cluster: Use NATS clustering for high availability
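To see why adding subscribers to a queue group scales horizontally, it can help to model the delivery rule in memory: each message goes to exactly one member of every queue group, and to every subscriber that has no queue group. The sketch below is a toy model only. `ToyBroker` is a hypothetical class, and picking a random member is an assumption made for illustration rather than a description of how a NATS server balances load.

```typescript
type Handler = (msg: string) => void

interface Subscription {
  queueGroup?: string
  handler: Handler
}

// Hypothetical in-memory stand-in for a broker; it does not talk to NATS
class ToyBroker {
  private subscriptions = new Map<string, Subscription[]>()

  subscribe(subject: string, handler: Handler, queueGroup?: string): void {
    const list = this.subscriptions.get(subject) ?? []
    list.push({ queueGroup, handler })
    this.subscriptions.set(subject, list)
  }

  publish(subject: string, msg: string): void {
    const groups = new Map<string, Subscription[]>()

    for (const sub of this.subscriptions.get(subject) ?? []) {
      if (sub.queueGroup === undefined) {
        sub.handler(msg) // No queue group: every such subscriber gets the message
      } else {
        const members = groups.get(sub.queueGroup) ?? []
        members.push(sub)
        groups.set(sub.queueGroup, members)
      }
    }

    // Queue group: exactly one member per group receives the message
    groups.forEach((members) => {
      const chosen = members[Math.floor(Math.random() * members.length)]
      if (chosen) chosen.handler(msg)
    })
  }
}
```

In this model, publishing 100 messages to a subject with two `"workers"` members and one ungrouped subscriber yields 100 deliveries split between the two workers and 100 deliveries to the ungrouped subscriber.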
Reliability
- Use multiple servers for redundancy
- Implement proper error handling and reconnection
- Consider message acknowledgment requirements
- Monitor server health and network connectivity
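One common way to approach the error handling and reconnection item is to wrap a fallible operation in a retry loop with exponential backoff. This is a minimal sketch, assuming the caller supplies an `attempt` callback that throws on failure (for example, one that creates an adapter and makes a first RPC call). `retryWithBackoff` and its option names are hypothetical and not part of kkrpc.

```typescript
interface BackoffOptions {
  retries?: number // Retries after the first attempt
  baseDelayMs?: number // Delay before the first retry
  maxDelayMs?: number // Upper bound for any single delay
}

// Hypothetical helper: retries `attempt`, doubling the delay after each failure
async function retryWithBackoff<T>(
  attempt: () => Promise<T>,
  { retries = 5, baseDelayMs = 200, maxDelayMs = 5000 }: BackoffOptions = {}
): Promise<T> {
  let lastError: unknown

  for (let i = 0; i <= retries; i++) {
    try {
      return await attempt()
    } catch (error) {
      lastError = error
      if (i === retries) break // Out of retries: rethrow below

      const delay = Math.min(maxDelayMs, baseDelayMs * 2 ** i)
      await new Promise<void>((resolve) => setTimeout(resolve, delay))
    }
  }

  throw lastError
}
```

Capping the delay with `maxDelayMs` keeps long outages from producing unbounded waits, and rethrowing the last error lets the caller decide whether to alert or shut down.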