@commandkit/queue
The CommandKit Queue package provides a service-agnostic message queue API for inter-service communication. It allows you to send and receive messages between different parts of your application or across multiple services using a simple, unified interface.
Features
- Service Agnostic: Works with any message queue implementation
- Simple API: Easy-to-use sendandreceivefunctions
- Type Safety: Full TypeScript support with strong typing
- Driver System: Pluggable drivers for different queue backends
- Discord.js Integration: Built-in support for Discord.js brokers
- Redis Support: Ready-to-use Redis PubSub driver
Installation
npm install @commandkit/queue@next
For Discord.js integration with Redis:
npm install @commandkit/queue@next @discordjs/brokers ioredis
Basic setup
Setting up the driver
Before you can send or receive messages, you need to set up a driver. The driver handles the actual message queue backend.
import { setDriver } from '@commandkit/queue';
import { RedisPubSubDriver } from '@commandkit/queue/discordjs';
import { PubSubRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';
// Create a Redis connection
const redis = new Redis();
// Create a broker
const broker = new PubSubRedisBroker(redis);
// Create a driver
const driver = new RedisPubSubDriver(broker);
// Set the driver
setDriver(driver);
Redis configuration
You can configure Redis for different environments:
import Redis from 'ioredis';
// Local Redis
const redis = new Redis();
// Or with configuration
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  password: 'your-password',
  db: 0,
});
// Cloud Redis (example with Redis Cloud)
const redis = new Redis({
  host: 'your-redis-host.redis.cloud.com',
  port: 6379,
  password: 'your-redis-password',
  tls: {},
});
Core concepts
Topics
Topics are named channels where messages are sent and received. They function as categories for different types of messages.
Messages
Messages can be any JSON-serializable data. They're sent to topics and received by subscribers.
Drivers
Drivers are implementations that handle the actual message queue backend. CommandKit provides a unified API that works with any driver.
Basic operations
Sending messages
Use the send function to publish messages to a topic.
import { send } from '@commandkit/queue';
// Send a simple message
await send('user-events', { userId: '123', action: 'login' });
// Send different types of data
await send('notifications', {
  type: 'welcome',
  userId: '123',
  message: 'Welcome to our platform!',
});
await send('analytics', {
  event: 'page_view',
  page: '/dashboard',
  timestamp: Date.now(),
});
Receiving messages
Use the receive function to subscribe to messages from a topic.
import { receive } from '@commandkit/queue';
// Basic message handling
await receive('user-events', (message) => {
  console.log(`User ${message.userId} performed ${message.action}`);
});
// Handle different message types
await receive('notifications', (message) => {
  switch (message.type) {
    case 'welcome':
      console.log(`Welcome message for user ${message.userId}`);
      break;
    case 'reminder':
      console.log(`Reminder: ${message.message}`);
      break;
  }
});
// Async message handling
await receive('analytics', async (message) => {
  await processAnalyticsEvent(message);
});
Type safety
You can define types for your messages to get better TypeScript support.
interface UserEvent {
  userId: string;
  action: 'login' | 'logout' | 'register';
  timestamp?: number;
}
interface Notification {
  type: 'welcome' | 'reminder' | 'alert';
  userId: string;
  message: string;
}
// Type-safe sending
await send('user-events', {
  userId: '123',
  action: 'login',
  timestamp: Date.now(),
} as UserEvent);
// Type-safe receiving
await receive('user-events', (message: UserEvent) => {
  console.log(`User ${message.userId} ${message.action}`);
});
You can also define typed events for better TypeScript support:
interface QueueEvents {
  'user-updates': {
    userId: string;
    action: 'login' | 'logout' | 'register';
    timestamp: number;
  };
  'guild-events': {
    guildId: string;
    event: 'member-join' | 'member-leave' | 'role-update';
    data: any;
  };
  analytics: {
    event: string;
    data: Record<string, any>;
    timestamp: number;
  };
}
// Create a typed driver
const driver = new RedisPubSubDriver<QueueEvents>(broker);
setDriver(driver);
Discord.js integration
CommandKit Queue provides seamless integration with Discord.js through
the @discordjs/brokers package and Redis PubSub.
Cross-shard communication
Send messages between different shards of your Discord.js application:
// In shard 0
await send('shard-communication', {
  fromShard: 0,
  toShard: 1,
  type: 'user-status-update',
  data: { userId: '123456789', status: 'online' },
});
// In shard 1 (receiving)
await receive('shard-communication', (message) => {
  if (message.toShard === 1) {
    console.log(`Received from shard ${message.fromShard}:`, message.data);
  }
});
Multi-bot communication
Send messages between different Discord.js bots:
// In bot A
await send('bot-communication', {
  fromBot: 'bot-a',
  toBot: 'bot-b',
  type: 'user-data-request',
  data: { userId: '123456789' },
});
// In bot B
await receive('bot-communication', async (message) => {
  if (message.toBot === 'bot-b') {
    const userData = await getUserData(message.data.userId);
    await send('bot-communication', {
      fromBot: 'bot-b',
      toBot: 'bot-a',
      type: 'user-data-response',
      data: userData,
    });
  }
});
Real-time updates
Send real-time updates to connected clients:
// When a user joins a voice channel
await send('voice-updates', {
  userId: '123456789',
  guildId: '987654321',
  channelId: '111222333',
  action: 'join',
  timestamp: Date.now(),
});
// Handle voice updates
await receive('voice-updates', (message) => {
  // Update voice channel status
  updateVoiceChannelStatus(
    message.guildId,
    message.channelId,
    message.userId,
    message.action,
  );
  // Notify other users
  notifyVoiceChannelUsers(message.guildId, message.channelId, message);
});
Error handling
Always handle errors when sending or receiving messages.
// Handle Redis connection errors
redis.on('error', (error) => {
  console.error('Redis connection error:', error);
});
redis.on('connect', () => {
  console.log('Connected to Redis');
});
// Handle broker errors
broker.on('error', (error) => {
  console.error('Broker error:', error);
});
// Handle message processing errors
await receive('user-updates', async (message) => {
  try {
    await processUserUpdate(message);
  } catch (error) {
    console.error('Failed to process user update:', error);
    // Optionally retry or send to dead letter queue
    await send('failed-messages', {
      originalTopic: 'user-updates',
      message,
      error: error.message,
      timestamp: Date.now(),
    });
  }
});
Best practices
Use descriptive topic names
// Good
await send('user-authentication-events', message);
await send('order-processing-updates', message);
// Avoid
await send('events', message);
await send('data', message);
Structure your messages
// Good - structured message
await send('user-events', {
  type: 'login',
  userId: '123',
  timestamp: Date.now(),
});
// Avoid - unstructured
await send('user-events', 'user logged in');
Handle message processing gracefully
await receive('user-events', async (message) => {
  try {
    // Process the message
    await processUserEvent(message);
    // Acknowledge successful processing
    console.log(`Processed event for user ${message.userId}`);
  } catch (error) {
    // Log error but don't crash
    console.error(`Failed to process event for user ${message.userId}:`, error);
    // Optionally retry or send to dead letter queue
    await handleFailedMessage(message, error);
  }
});
Use appropriate message sizes
// Good - reasonable message size
await send('user-profile-updates', {
  userId: '123',
  changes: {
    displayName: 'New Name',
    avatar: 'https://example.com/avatar.jpg',
  },
});
// Avoid - very large messages
await send('user-profile-updates', {
  userId: '123',
  fullProfile: {
    /* massive object */
  },
});
Use cases
- Inter-Service Communication: Send messages between different parts of your application
- Event Broadcasting: Broadcast events to multiple subscribers
- Task Distribution: Distribute work across multiple workers
- Real-time Updates: Send real-time updates to connected clients
- Microservices: Enable communication between microservices
Cleanup
Always clean up resources when shutting down:
import { setDriver } from '@commandkit/queue';
// Cleanup function
async function cleanup() {
  try {
    // Close the driver
    const driver = getDriver(); // You'll need to implement this
    if (driver && driver.close) {
      await driver.close();
    }
    // Close Redis connection
    await redis.quit();
    console.log('Queue cleanup completed');
  } catch (error) {
    console.error('Error during cleanup:', error);
  }
}
// Handle graceful shutdown
process.on('SIGINT', cleanup);
process.on('SIGTERM', cleanup);