Kafka Adapter
The Kafka adapter runs kkrpc channels over Apache Kafka, giving large-scale systems high-throughput, fault-tolerant distributed streaming.
Installation
First, install the required dependencies:
# npm
npm install kafkajs

# yarn
yarn add kafkajs

# pnpm
pnpm add kafkajs

Basic Usage
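The producer and consumer examples below share a small API module. A minimal sketch of what ./api might export, inferred from the add, divide, and echo calls used on this page (the module itself is an illustration, not part of the adapter):

// api.ts — hypothetical API module matching the calls in these examples
export const apiMethods = {
  add: async (a: number, b: number) => a + b,
  divide: async (a: number, b: number) => {
    if (b === 0) throw new Error("Cannot divide by zero")
    return a / b
  },
  echo: async (message: string) => message
}

export type API = typeof apiMethods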
Producer
import { KafkaIO, RPCChannel } from "kkrpc"
import { apiMethods, type API } from "./api"

const kafkaIO = new KafkaIO({
  brokers: ["localhost:9092"],
  topic: "kkrpc-topic",
  clientId: "kkrpc-producer",
  numPartitions: 3,
  replicationFactor: 1,
  maxQueueSize: 1000
})

const producerRPC = new RPCChannel<API, API>(kafkaIO, {
  expose: apiMethods
})

const api = producerRPC.getAPI()

// Test basic RPC calls
console.log(await api.add(12, 18)) // 30
console.log(await api.echo("Hello from Kafka!")) // "Hello from Kafka!"

console.log("Topic:", kafkaIO.getTopic())
console.log("Session ID:", kafkaIO.getSessionId())

kafkaIO.destroy()

Consumer
import { KafkaIO, RPCChannel } from "kkrpc"
import { apiMethods, type API } from "./api"

const kafkaIO = new KafkaIO({
  brokers: ["localhost:9092"],
  topic: "kkrpc-topic",
  clientId: "kkrpc-consumer",
  groupId: "kkrpc-consumer-group",
  fromBeginning: false, // Only read new messages
  maxQueueSize: 1000
})

const consumerRPC = new RPCChannel<API, API>(kafkaIO, {
  expose: apiMethods
})

const api = consumerRPC.getAPI()

// Process messages from Kafka
console.log(await api.divide(100, 4)) // 25
console.log(await api.echo("Hello from Kafka consumer!")) // "Hello from Kafka consumer!"

console.log("Topic:", kafkaIO.getTopic())
console.log("Group ID:", kafkaIO.getGroupId())

kafkaIO.destroy()

Configuration Options
interface KafkaAdapterOptions {
  brokers?: string[] // Kafka broker addresses (default: ["localhost:9092"])
  clientId?: string // Client identifier
  topic?: string // Topic name (default: "kkrpc-topic")
  groupId?: string // Consumer group ID
  fromBeginning?: boolean // Read from beginning (default: false)
  producerConfig?: ProducerConfig // Custom producer configuration
  consumerConfig?: ConsumerConfig // Custom consumer configuration
  ssl?: KafkaConfig["ssl"] // SSL configuration
  sasl?: KafkaConfig["sasl"] // SASL configuration
  numPartitions?: number // Number of partitions for auto-created topics
  replicationFactor?: number // Replication factor for auto-created topics
  maxQueueSize?: number // Max queue size (default: 1000)
  sessionId?: string // Override session ID
  retry?: KafkaConfig["retry"] // Retry configuration
}

Advanced Configuration
Custom Broker Configuration
const kafkaIO = new KafkaIO({ brokers: ["broker1:9092", "broker2:9092", "broker3:9092"], clientId: "my-kkrpc-service", topic: "my-service-rpc", numPartitions: 6, replicationFactor: 3, maxQueueSize: 2000})SSL Configuration
const kafkaIO = new KafkaIO({ brokers: ["secure-broker:9093"], ssl: { rejectUnauthorized: false, ca: [fs.readFileSync("/path/to/ca.crt")], key: fs.readFileSync("/path/to/client.key"), cert: fs.readFileSync("/path/to/client.crt") }, topic: "secure-rpc"})SASL Authentication
const kafkaIO = new KafkaIO({ brokers: ["sasl-broker:9092"], sasl: { mechanism: "plain", username: "my-user", password: "my-password" }, topic: "authenticated-rpc"})Custom Producer/Consumer Configuration
const kafkaIO = new KafkaIO({ brokers: ["localhost:9092"], topic: "custom-rpc", producerConfig: { maxBatchSize: 100, lingerMs: 10, compression: "gzip" }, consumerConfig: { sessionTimeoutMs: 30000, heartbeatIntervalMs: 3000, maxWaitTimeInMs: 5000 }})Consumer Groups
Load Balancing with Consumer Groups
// Producer
const producer = new KafkaIO({
  topic: "load-balanced-topic",
  numPartitions: 4
})

// Multiple consumers for load balancing
const consumer1 = new KafkaIO({
  topic: "load-balanced-topic",
  groupId: "processor-group",
  clientId: "worker-1"
})

const consumer2 = new KafkaIO({
  topic: "load-balanced-topic",
  groupId: "processor-group",
  clientId: "worker-2"
})

// Messages are distributed across consumers in the group

Reading from Beginning
const consumer = new KafkaIO({ topic: "historical-topic", groupId: "history-reader", fromBeginning: true // Read all messages from the beginning})Topic Management
The adapter automatically creates topics if they don’t exist:
const kafkaIO = new KafkaIO({ topic: "auto-created-topic", numPartitions: 3, // Number of partitions replicationFactor: 2 // Replication factor})
// Topic will be created with specified settingsError Handling
const kafkaIO = new KafkaIO()
const kafkaRPC = new RPCChannel<API, API>(kafkaIO, { expose: apiMethods })

try {
  const api = kafkaRPC.getAPI()
  await api.someMethod()
} catch (error) {
  if (error.message.includes("Kafka adapter has been destroyed")) {
    console.log("Adapter was destroyed")
  } else if (error.message.includes("Kafka connection error")) {
    console.log("Kafka connection failed - check brokers")
  }
}

Connection Management
const kafkaIO = new KafkaIO()
// Get adapter information
console.log(kafkaIO.getTopic()) // Topic name
console.log(kafkaIO.getGroupId()) // Consumer group ID
console.log(kafkaIO.getSessionId()) // Session ID

// Signal destroy to remote parties
await kafkaIO.signalDestroy()

// Graceful cleanup
kafkaIO.destroy()

Memory Protection
Prevent memory issues with queue size limits:
const kafkaIO = new KafkaIO({
  maxQueueSize: 1000 // Maximum messages in memory
})

// When the queue is full, the oldest messages are dropped with a warning:
// KafkaIO queue full (1000 messages), dropping oldest message to protect memory

Retry Configuration
const kafkaIO = new KafkaIO({
  retry: {
    initialRetryTime: 100,
    retries: 8
  }
})

Best Practices
- Partition Strategy:
  - More partitions = higher parallelism
  - Consider message ordering requirements
  - Start with 3-6 partitions for most use cases
- Consumer Groups (see the sketch after this list):
  - Use a unique groupId for each distinct consumer application
  - Use the same groupId for load balancing across instances
  - Use different groupIds for broadcasting to multiple applications
- Replication:
  - Use replicationFactor: 3 for production
  - Ensure you have enough brokers for the replication factor
- Memory Management:
  - Set appropriate maxQueueSize limits
  - Monitor consumer lag in production
- Error Handling:
  - Implement proper error handling and reconnection logic
  - Use retry configurations for resilience
- Security:
  - Use SSL/TLS for production environments
  - Configure SASL for authentication
  - Use network policies to restrict access
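To make the groupId guidance concrete, here is a minimal sketch contrasting load balancing with broadcasting (the topic, group, and client names are invented for illustration):

import { KafkaIO } from "kkrpc"

// Load balancing: both workers share a groupId, so Kafka assigns each
// partition to exactly one of them and every message is handled once.
const workerA = new KafkaIO({
  topic: "orders-rpc",
  groupId: "order-processors",
  clientId: "worker-a"
})
const workerB = new KafkaIO({
  topic: "orders-rpc",
  groupId: "order-processors",
  clientId: "worker-b"
})

// Broadcasting: each application uses its own groupId, so every group
// independently receives the full message stream.
const auditIO = new KafkaIO({
  topic: "orders-rpc",
  groupId: "audit-service"
})
const billingIO = new KafkaIO({
  topic: "orders-rpc",
  groupId: "billing-service"
})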
Production Considerations
Monitoring
Monitor key metrics:
- Consumer lag
- Throughput per partition
- Error rates
- Connection health
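The adapter does not expose these metrics itself, but consumer lag can be computed with the kafkajs admin client. A minimal sketch, assuming kafkajs v2's admin API and the broker, topic, and group names from the examples above:

import { Kafka } from "kafkajs"

const admin = new Kafka({ brokers: ["localhost:9092"], clientId: "lag-monitor" }).admin()
await admin.connect()

// Lag per partition = latest broker offset minus the group's committed offset
const latest = await admin.fetchTopicOffsets("kkrpc-topic")
const committed = await admin.fetchOffsets({
  groupId: "kkrpc-consumer-group",
  topics: ["kkrpc-topic"]
})

for (const { topic, partitions } of committed) {
  for (const { partition, offset } of partitions) {
    // A committed offset of "-1" means the group has not committed yet
    const high = latest.find((p) => p.partition === partition)?.offset ?? "0"
    console.log(`${topic}[${partition}] lag:`, Number(high) - Number(offset))
  }
}

await admin.disconnect()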
Scaling
- Horizontal Scaling: Add more consumers with the same groupId
- Vertical Scaling: Increase numPartitions for more parallelism
- Throughput: Adjust batch sizes and compression settings
Reliability
- Use at least 3 brokers for production
- Set replicationFactor: 3 for durability
- Monitor broker health and network connectivity
Dependencies
- kafkajs: Kafka client library for Node.js
- Apache Kafka cluster (version 0.10+)
Performance Tips
- Batch Size: Increase maxBatchSize for higher throughput
- Compression: Use gzip or snappy for large messages
- Partitions: More partitions = higher concurrency
- Consumer Polling: Adjust maxWaitTimeInMs to trade latency against throughput
- Memory: Monitor maxQueueSize to prevent memory issues
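Combining several of these tips, a throughput-leaning configuration might look like the following sketch. The values are illustrative starting points rather than tuned benchmarks, and the option names are the ones documented in Configuration Options above:

import { KafkaIO } from "kkrpc"

const kafkaIO = new KafkaIO({
  brokers: ["localhost:9092"],
  topic: "high-throughput-rpc",
  numPartitions: 6, // more partitions = more parallel consumers
  maxQueueSize: 5000, // larger buffer absorbs bursts at the cost of memory
  producerConfig: {
    maxBatchSize: 500, // bigger batches amortize network round trips
    lingerMs: 20, // wait briefly to fill batches
    compression: "gzip" // trade CPU for smaller payloads
  },
  consumerConfig: {
    maxWaitTimeInMs: 100 // shorter waits lower latency but reduce batching
  }
})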