Queue
Overview
The Queue package provides a work queue system for managing and executing chat prompts in a batched manner. It leverages the Agent's state management and checkpoint system to preserve context between queued tasks, enabling efficient multi-prompt workflows.
Key Features
- Queue Management: Add, remove, and reorder work items with a simple API
- Checkpoint-Based State: Preserves agent state between queued tasks
- Chat Command Interface: Manage queues directly through chat commands
- Tool Access: Programmatically add tasks via tools
- State Persistence: Automatic serialization and restoration of queue state
- Queue Processing: Start, load, execute, skip, and complete queued tasks
Core Components
WorkQueueService
The main service class that implements the TokenRingService interface and manages work items.
Constructor:
new WorkQueueService({ maxSize?: number } = {})
Properties:
- maxSize: number | undefined - Maximum number of items the queue can hold
Methods:
Queue Operations:
- enqueue(item, agent): Add a work item to the queue, returns boolean (success)
- dequeue(agent): Remove and return the first work item
- get(idx, agent): Get work item at index without removing
- splice(start, deleteCount, agent, ...items): Modify queue array, return removed items
- clear(agent): Remove all items from queue
- getAll(agent): Return copy of all queue items
- size(agent): Get current queue size
- isEmpty(agent): Check if queue is empty
Status Methods:
- started(agent): Check if service has started
- getCurrentItem(agent): Get current loaded work item
- setInitialCheckpoint(message, agent): Save initial agent state
- getInitialCheckpoint(agent): Get saved initial state
Queue Control Methods:
- startWork(agent): Begin queue processing
- stopWork(agent): Stop queue processing
- setCurrentItem(item, agent): Set current work item
- clearInitialCheckpoint(agent): Clear initial state
State Management:
- serialize(): Convert state to plain object
- deserialize(data): Restore state from plain object
- reset(what): Reset state for specific scope
- show(): Display current state in array form
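The queue operations listed above can be modelled with a small in-memory class. This is a minimal sketch, not the real WorkQueueService: SketchItem and SketchQueue are hypothetical names, the agent argument is dropped, and state lives in a plain array rather than in an agent state slice.

```typescript
// Hypothetical stand-in for QueueItem; the checkpoint is left opaque.
interface SketchItem {
  checkpoint: unknown;
  name: string;
  input: string;
}

// Minimal in-memory model of the documented queue operations (assumption:
// enqueue refuses new items once maxSize is reached and returns false).
class SketchQueue {
  private items: SketchItem[] = [];
  private readonly maxSize: number | undefined;

  constructor(maxSize?: number) {
    this.maxSize = maxSize;
  }

  // Returns false instead of throwing when the queue is full.
  enqueue(item: SketchItem): boolean {
    if (this.maxSize !== undefined && this.items.length >= this.maxSize) {
      return false;
    }
    this.items.push(item);
    return true;
  }

  // Removes and returns the first item, or undefined when the queue is empty.
  dequeue(): SketchItem | undefined {
    return this.items.shift();
  }

  // Reads an item without removing it.
  get(idx: number): SketchItem | undefined {
    return this.items[idx];
  }

  // Mirrors Array.prototype.splice and returns the removed items.
  splice(start: number, deleteCount: number, ...items: SketchItem[]): SketchItem[] {
    return this.items.splice(start, deleteCount, ...items);
  }

  clear(): void {
    this.items = [];
  }

  // Returns a shallow copy so callers cannot mutate the internal array.
  getAll(): SketchItem[] {
    return [...this.items];
  }

  size(): number {
    return this.items.length;
  }

  isEmpty(): boolean {
    return this.items.length === 0;
  }
}
```

In this sketch splice does not check maxSize, so inserting through splice can grow the queue past the limit. Whether the real service behaves the same way is not stated here.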
State Slice: WorkQueueState
Manages the queue's internal state with persistence support.
Properties:
- queue: QueueItem[] - Array of queued work items
- started: boolean - Whether queue processing has begun
- currentItem: QueueItem | null - Work item currently loaded for execution
- initialCheckpoint: AgentCheckpointData | null - Saved state at queue start
Queue Item Type:
interface QueueItem {
checkpoint: AgentCheckpointData; // Agent state checkpoint for restoration
name: string; // Friendly name/description of task
input: string; // Actual prompt content to execute
}
Serialized Schema
interface SerializedWorkQueueState {
queue: QueueItem[];
started: boolean;
currentItem: QueueItem | null;
initialCheckpoint: AgentCheckpointData | null;
}
Chat Commands
All /queue commands are registered with the AgentCommandService and are available through the chat interface.
/queue list
Display all queued prompts with their indices.
Example:
/queue list
Output:
Queue contents:
1. Write README for the project
2. Fix bugs in authentication module
3. Add unit tests for services
Implementation Notes:
- Uses numberedList utility to format output
- Shows only task names
- Returns empty list message if queue is empty
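The list formatting described above can be approximated as follows. formatQueueList is a hypothetical helper, not the package's numberedList utility (whose signature is not shown here), and the empty-queue wording is a placeholder. Numbering starts at 1 to match the sample output.

```typescript
// Hypothetical formatter that reproduces the sample /queue list output.
function formatQueueList(names: string[]): string {
  if (names.length === 0) {
    // Placeholder wording: the real empty-queue message is not shown in this README.
    return "Queue is empty.";
  }
  // Number each task name starting from 1, as in the sample output.
  const lines = names.map((name, i) => `${i + 1}. ${name}`);
  return ["Queue contents:", ...lines].join("\n");
}
```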
/queue add <prompt>
Add a new chat prompt to the end of the queue.
Example:
/queue add 'Write a Python function to calculate Fibonacci numbers'
Output:
Added to queue. Queue length: 1
Validation:
- Raises error if no prompt provided
- Captures conversation checkpoint for state restoration
/queue remove <index>
Remove the prompt at the given zero-based index.
Example:
/queue remove 2
Output:
Removed "Fix bug in login" from queue. Remaining: 3
Validation:
- Index must be >= 0 and < queue size
- Shows error message for invalid indices
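The index validation rule above (index >= 0 and < queue size) might be checked like this. parseQueueIndex is a hypothetical helper written for illustration; the actual command may parse its argument differently.

```typescript
// Hypothetical helper: returns a valid zero-based index, or null when the
// argument is not a plain non-negative integer or falls outside the queue.
function parseQueueIndex(arg: string, queueSize: number): number | null {
  const trimmed = arg.trim();
  // Accept only digit strings such as "0" or "12"; rejects "-1", "1.5", "abc".
  if (!/^\d+$/.test(trimmed)) {
    return null;
  }
  const idx = Number.parseInt(trimmed, 10);
  return idx < queueSize ? idx : null;
}
```

A caller could show the usage message whenever the helper returns null and remove the item otherwise.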
/queue details <index>
Show detailed information about a specific queue item.
Example:
/queue details 0
Output:
Queue item details:
{
"checkpoint": {...},
"name": "Write README for the project",
"input": "Write a comprehensive README covering all features..."
}
Implementation Notes:
- Uses JSON.stringify with 2-space indentation
- Includes full item structure including checkpoint
/queue clear
Remove all prompts from the queue.
Example:
/queue clear
Output:
Queue cleared!
/queue start
Begin queue processing. This command saves the current agent state as an initial checkpoint.
Example:
/queue start
Output:
Queue started, use /queue next to start working on the first item in the queue, or /queue done to end the queue.
Process:
- Saves current agent state as initial checkpoint
- Sets started flag to true
- Creates a checkpoint entry marking queue start using @tokenring-ai/checkpoint
/queue next
Load the first queued item (does not execute it).
Example:
/queue next
Output:
Queue Item loaded: Write README for the project Use /queue run to run the queue item, and /queue next|skip|done to move on to the next item.
Process:
- Dequeues the first item
- Sets it as the current item
- Displays the item name
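The load step above can be sketched as a small state update. NextState and loadNext are hypothetical names, items are reduced to their names for brevity, and the empty-queue wording is a placeholder. The started check reflects the "Queue not started" message documented under Error Handling.

```typescript
// Simplified state: item names only, no checkpoints.
interface NextState {
  queue: string[];
  started: boolean;
  currentItem: string | null;
}

// Hypothetical model of /queue next: dequeue the first item and load it.
function loadNext(state: NextState): string {
  if (!state.started) {
    return "Queue not started. Use /queue start to start the queue.";
  }
  const item = state.queue.shift();
  if (item === undefined) {
    // Placeholder wording: the real message for an empty queue is not shown here.
    return "Queue is empty.";
  }
  // The item is only loaded here; running it is a separate step (/queue run).
  state.currentItem = item;
  return `Queue Item loaded: ${item}`;
}
```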
/queue run
Execute the currently loaded queued prompt.
Example:
/queue run
Process:
- Restores agent state from the current item's checkpoint
- Executes the chat command via runChat() with the item's input
- Uses @tokenring-ai/chat/runChat function
- Creates error handling for failures
- Has no return value (directly processes chat)
/queue skip
Skip the current item and re-add it to the end of the queue.
Example:
/queue skip
Output:
Queue item skipped. It has been added to the end of the queue in case you would like to run it later, and you can use /queue next to load the next item in the queue, or /queue done to end the queue.
Process:
- Re-adds the current item to the queue
- Clears the current item
- Allows the next item to be processed
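The skip behaviour above amounts to moving the loaded item to the back of the queue. A minimal sketch, using hypothetical names (SkipState, skipCurrent) and item names in place of full queue items:

```typescript
// Simplified state: item names only, no checkpoints.
interface SkipState {
  queue: string[];
  currentItem: string | null;
}

// Hypothetical model of /queue skip. Returns false when nothing is loaded.
function skipCurrent(state: SkipState): boolean {
  if (state.currentItem === null) {
    return false;
  }
  // Re-add the loaded item at the end so it can be run later.
  state.queue.push(state.currentItem);
  // Clear the current item so the next /queue next loads a fresh one.
  state.currentItem = null;
  return true;
}
```

Because the skipped item goes to the end, it is only reached again after every other remaining item has been loaded.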
/queue done
End queue processing and restore the initial agent state.
Example:
/queue done
Output:
Restored chat state to preserved state.
Process:
- Checks if queue is empty
- If initial checkpoint exists, restores agent state to it
- Shows error if no initial checkpoint found
- Clears queue items and processing state
- Stops work/queue processing
Error Cases:
- If no initial checkpoint found, shows error:
Couldn't restore initial state, no initial checkpoint found
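The completion steps above can be sketched as one function. DoneState and finishQueue are hypothetical names, the restore callback stands in for whatever the agent uses to restore state, and clearing the initial checkpoint at the end is an assumption based on the clearInitialCheckpoint method listed earlier.

```typescript
// Simplified state, generic over the checkpoint type C.
interface DoneState<C> {
  queue: string[];
  started: boolean;
  currentItem: string | null;
  initialCheckpoint: C | null;
}

// Hypothetical model of /queue done: restore if possible, then clear everything.
function finishQueue<C>(state: DoneState<C>, restore: (checkpoint: C) => void): string {
  let message: string;
  if (state.initialCheckpoint !== null) {
    restore(state.initialCheckpoint);
    message = "Restored chat state to preserved state.";
  } else {
    message = "Couldn't restore initial state, no initial checkpoint found";
  }
  // Clear queue items and processing state, then stop processing.
  state.queue = [];
  state.currentItem = null;
  state.initialCheckpoint = null; // assumption: the checkpoint is discarded once used
  state.started = false;
  return message;
}
```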
Tools
queue_addTaskToQueue
Adds a task to the work queue for later execution.
Tool Definition:
{
name: "queue_addTaskToQueue",
displayName: "Queue/addTaskToQueue",
description: "Adds a task to the queue for later execution by the system."
}
Input Schema:
{
description: z.string().describe("A short description of the task to be performed"),
content: z.string().describe("A natural language string, explaining the exact task to be performed, in great detail. This string will be used to prompt an AI agent as the next message in this conversation, so should be as detailed as possible, and should directly order the AI agent to execute the task, using the tools that are available to it.")
}
Returns:
{
status: "queued";
message: "Task has been queued for later execution.";
}
Error Cases:
- Throws error if description is not provided
- Throws error if content is not provided
Output Handling:
- Uses agent.infoMessage() to prefix output with tool name
- This ensures output is clearly attributed to the tool
Usage Example:
// Via tool
{
"name": "queue_addTaskToQueue",
"arguments": {
"description": "Analyze dependencies",
"content": "Use the filesystem tools to find all npm dependencies in the project and create a breakdown report."
}
}
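The tool's validation and return value can be sketched without the real schema library. addTaskSketch and its types are hypothetical; mapping description to the item name and content to the item input is an assumption, and the error wording is invented for illustration.

```typescript
// Hypothetical argument and item shapes for this sketch.
interface AddTaskArgs {
  description: string;
  content: string;
}

interface QueuedTask {
  name: string;
  input: string;
}

// Validates the arguments, appends a task, and returns the documented result.
function addTaskSketch(
  queue: QueuedTask[],
  args: Partial<AddTaskArgs>
): { status: "queued"; message: string } {
  if (!args.description) {
    throw new Error("description is required"); // illustrative wording
  }
  if (!args.content) {
    throw new Error("content is required"); // illustrative wording
  }
  // Assumption: description becomes the item name, content becomes the prompt.
  queue.push({ name: args.description, input: args.content });
  return { status: "queued", message: "Task has been queued for later execution." };
}
```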
Integration
Plugin Configuration
The package has a configuration schema with optional agentDefaults for queue size limits:
// In plugin.ts
const packageConfigSchema = z.object({
queue: WorkQueueServiceConfigSchema.prefault({})
});
// WorkQueueServiceConfigSchema
export const WorkQueueServiceConfigSchema = z.object({
agentDefaults: z.object({
maxSize: z.number().positive().optional()
}).prefault({})
});
Agent Configuration
The queue service can be configured at the agent level:
const agentConfig = {
queue: {
maxSize: 100
}
};
const agent = new Agent(app, { config: agentConfig, headless: false });
Agent Configuration Schema
{
queue: {
maxSize?: number // Optional: Maximum queue size for this agent
}
}
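One plausible way the plugin-level agentDefaults and the agent-level queue settings combine is shown below. This is a sketch under an assumption: that an agent-level maxSize, when present, takes precedence over the default. QueueLimits and resolveMaxSize are hypothetical names, and the package's real merge logic is not shown in this README.

```typescript
// Shape shared by agentDefaults and the agent-level queue config.
interface QueueLimits {
  maxSize?: number;
}

// Assumption: the agent-level value wins; otherwise fall back to the default.
function resolveMaxSize(agentDefaults: QueueLimits, agentQueueConfig: QueueLimits = {}): number | undefined {
  return agentQueueConfig.maxSize ?? agentDefaults.maxSize;
}
```

An undefined result would mean no size limit applies in this sketch.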
Agent Initialization
The service is automatically attached to the agent during plugin initialization.
// Called automatically in plugin.ts
queueService.attach(agent);
Service Registration
The service is registered in the plugin configuration:
// In plugin.ts
import WorkQueueService from "./WorkQueueService.js";
app.addServices(new WorkQueueService());
Usage Examples
Basic Queue Workflow
import WorkQueueService from '@tokenring-ai/queue';
// Initialize service (optional maxSize parameter)
const queueService = new WorkQueueService({ maxSize: 50 });
// Attach to agent - this happens automatically in plugin
queueService.attach(agent);
// Add multiple tasks
queueService.enqueue(
{
checkpoint: agent.generateCheckpoint(),
name: 'Create user types',
input: 'Generate TypeScript interfaces for User, Account, and Profile'
},
agent
);
queueService.enqueue(
{
checkpoint: agent.generateCheckpoint(),
name: 'Write service layer',
input: 'Implement UserService, AccountService, and ProfileService with proper validation'
},
agent
);
// Start queue processing
queueService.startWork(agent);
queueService.setInitialCheckpoint(agent.generateCheckpoint(), agent);
// Process items one by one
while (true) {
const item = queueService.dequeue(agent);
if (!item) {
break; // Queue is empty
}
// Set current item
queueService.setCurrentItem(item, agent);
// Restore state and execute
agent.restoreState(item.checkpoint.state);
// Execute the task
// ... (execute using chat service or other means)
// Create checkpoint
await agent.createCheckpoint(`Completed: ${item.name}`);
}
// Complete queue and restore state
queueService.stopWork(agent);
const initial = queueService.getInitialCheckpoint(agent);
if (initial) {
agent.restoreState(initial.state);
}
Chat Command Workflow
// Build up a queue of prompts offline
/queue add 'Generate README documentation for the project'
/queue add 'Create unit tests for database interactions'
/queue add 'Add error handling to API endpoints'
// Review the queue
/queue list
// Start processing
/queue start
// Process items one at a time
# Phase 1: Generate documentation
/queue next
/queue run
# Phase 2: Write tests
/queue next
/queue run
# Phase 3: Add error handling
/queue next
/queue run
# Found an issue in documentation - skip and fix later
/queue skip
# Continue with remaining items
/queue next
/queue run
# Finish and restore context
/queue done
Programmatic Task Management
// Add a task via tool
const result = await agent.executeTool('queue_addTaskToQueue', {
description: 'Analyze performance bottlenecks',
content: 'Use performance monitoring tools to identify slow queries and implement optimizations using database indexing and query optimization techniques.'
});
console.log(result); // { status: 'queued', message: 'Task has been queued for later execution.' }
// Check queue status
const size = queueService.size(agent);
const isEmpty = queueService.isEmpty(agent);
const itemCount = queueService.getAll(agent).length;
// Inspect current item
const current = queueService.getCurrentItem(agent);
console.log(`Current task: ${current?.name}`);
// Modify queue programmatically
queueService.splice(1, 1, agent, {
checkpoint: agent.generateCheckpoint(),
name: 'Fixed analysis task',
input: 'Updated analysis with cache optimization'
});
// View queue state
const state = queueService.serialize();
console.log('Queue state:', state);
// Reset queue under specific scope
// (uses a separate agent instance; createAgent() stands in for your own agent setup)
const otherAgent = createAgent();
queueService.attach(otherAgent);
// ... use queue ...
// Reset only when "chat" is reset
queueService.reset(['chat']);
Advanced: Loading and Processing Items
// Add items to queue with meaningful names
queueService.enqueue({ checkpoint: agent.generateCheckpoint(), name: "Task 1", input: "" }, agent);
queueService.enqueue({ checkpoint: agent.generateCheckpoint(), name: "Task 2", input: "" }, agent);
queueService.enqueue({ checkpoint: agent.generateCheckpoint(), name: "Task 3", input: "" }, agent);
// Start queue
queueService.startWork(agent);
queueService.setInitialCheckpoint(agent.generateCheckpoint(), agent);
// Load and process items
while (true) {
const next = queueService.dequeue(agent);
if (!next) break;
queueService.setCurrentItem(next, agent);
// Activate the item's chat context
agent.restoreState(next.checkpoint.state);
// Get chat service and execute
const chatService = agent.requireServiceByType(ChatService);
const chatConfig = chatService.getChatConfig(agent);
try {
await runChat(next.input, chatConfig, agent);
} catch (error: any) {
agent.errorMessage(`Error running queued prompt: ${error.message || error}`);
}
await agent.createCheckpoint(`Completed: ${next.name}`);
}
// Restore initial state
queueService.stopWork(agent);
const initial = queueService.getInitialCheckpoint(agent);
if (initial) {
agent.restoreState(initial.state);
}
State Management
Serialization
The queue state is automatically serialized using a Zod schema:
const serializationSchema = z.object({
queue: z.array(z.object({
checkpoint: z.any(),
name: z.string(),
input: z.string()
})),
started: z.boolean(),
currentItem: z.any().nullable(),
initialCheckpoint: z.any().nullable()
});
When serialized:
- Queue items maintain their checkpoint references
- Processing state (started, currentItem) is preserved
- Initial checkpoint is kept for restoration
State structure:
interface SerializedWorkQueueState {
queue: QueueItem[];
started: boolean;
currentItem: QueueItem | null;
initialCheckpoint: AgentCheckpointData | null;
}
Reset Behavior
The reset(what) method clears queue state when appropriate scopes are reset:
- When what.includes('chat'): Clears the queue, started flag, current item, and initial checkpoint
- Other reset scopes: No effect on queue state (by design)
This ensures that queue items persist across unrelated state cleanups.
Checkpoint Integration
The queue automatically creates checkpoints:
- Initial Checkpoint: Created when queue is started via setInitialCheckpoint
- Completion Checkpoints: Created for each completed item using agent.createCheckpoint()
- Start Checkpoint: Created using @tokenring-ai/checkpoint when queue starts processing
- End Checkpoint: Created for each item when done processing via runChat
State Circular References
The serialization schema uses z.any() for checkpoint fields to handle circular references that naturally exist in the AgentCheckpointData structure, ensuring successful serialization.
State Slice
WorkQueueState Class
class WorkQueueState implements AgentStateSlice<typeof serializationSchema> {
// State properties
name = "WorkQueueState";
queue: QueueItem[] = [];
started = false;
currentItem: QueueItem | null = null;
initialCheckpoint: AgentCheckpointData | null = null;
// Serialization schema
serializationSchema = serializationSchema;
// Methods
// Reset state when appropriate scope is reset
reset(what: ResetWhat[]): void {
if (what.includes("chat")) {
this.queue = [];
this.started = false;
this.currentItem = null;
this.initialCheckpoint = null;
}
}
// Convert to serializable format
serialize(): z.output<typeof serializationSchema> {
return {
started: this.started,
currentItem: this.currentItem,
initialCheckpoint: this.initialCheckpoint,
queue: this.queue,
};
}
// Restore from serializable format
deserialize(data: z.output<typeof serializationSchema>): void {
this.started = data.started;
this.currentItem = data.currentItem;
this.initialCheckpoint = data.initialCheckpoint;
this.queue = data.queue;
}
// Get human-readable state summary
show(): string[] {
return [
"Started: " + this.started,
"Queue Items: " + this.queue.length,
"Current Item: " + (this.currentItem?.name || "None")
];
}
}
State Display
The show() method returns an array of status strings for quick debugging:
[
"Started: false",
"Queue Items: 5",
"Current Item: None"
]
Error Handling
Command Validation
All /queue commands include input validation:
- Index validation: Index must be within valid range
- Prompt validation: Add command requires non-empty prompt
- State validation: Run/Next require queue to be started
Error Messages
Clear error messages guide users to correct their commands:
// Invalid index
agent.errorMessage("Usage: /queue remove <index> (index starts from 0)");
// Missing prompt
agent.errorMessage("Usage: /queue add <prompt>");
// Queue not started
agent.infoMessage("Queue not started. Use /queue start to start the queue.");
// No items in queue
agent.infoMessage("No queue item loaded. Use /queue next to load the next item...");
// No initial checkpoint
agent.errorMessage("Couldn't restore initial state, no initial checkpoint found");
Execution Errors
The /queue run command includes try/catch error handling:
try {
await runChat(input, chatConfig, agent);
} catch (error: any) {
agent.errorMessage(
"Error running queued prompt: " + (error.message || error)
);
}
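The error.message || error expression above relies on the caught value being typed as any. A stricter variant, offered here as an optional sketch rather than as the package's code, types the value as unknown and narrows it. describeError is a hypothetical helper name.

```typescript
// Hypothetical helper: turn any thrown value into a printable message.
function describeError(error: unknown): string {
  // Prefer the message of real Error objects when it is non-empty.
  if (error instanceof Error && error.message) {
    return error.message;
  }
  // Fall back to string conversion for strings, numbers, and other values.
  return String(error);
}
```

The result can be appended to the "Error running queued prompt: " prefix in the same way as the original expression.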
Best Practices
Queue Organization
- Descriptive Names: Use clear, descriptive names for queue items
- Logical Ordering: Sort tasks logically (e.g., write → test → deploy)
- Batch Related Tasks: Group related operations together
Example:
/queue add 'Analyze user behavior'
/queue add 'Generate monthly reports'
/queue add 'Update dashboard queries'
Error Handling
- Try/Catch in Commands: All /queue run commands include error handling
- Checkpoint Recovery: System attempts to restore initial state even after errors
- Manual Recovery: Use /queue done to restore state if automation fails
Performance Considerations
- MaxSize Option: Set maxSize to prevent memory issues with very large queues
- Batch Processing: Queue is more efficient than executing prompts one-by-one
- State Efficiency: Checkpoint system provides good balance between speed and state capture
Usage Patterns
- Seed the Queue: Add multiple tasks before starting processing
- Process Incrementally: Load and execute items iteratively to monitor progress
- Handle Interruptions: Use /queue next and /queue skip to manage unexpected issues
- Preserve Context: Queue maintains full conversation context between tasks
Checkpoint Strategy
- Start Checkpoint: Enable state restoration after successful completion
- Completion Checkpoints: Track what has been done for debugging
- Error Recovery: System attempts auto-recovery using checkpoints
Troubleshooting
Queue Empty
Problem: Queue appears empty but not processing
Solution:
- Verify with /queue list to confirm items exist
- Check that /queue start was called
- Ensure no items were accidentally cleared
Not Started
Problem: Cannot load items from queue
Solution:
- Run /queue start to begin queue processing
- Check queue is not empty with /queue list
State Restoration Issues
Problem: State not restored after completing queue
Solution:
- Verify initial checkpoint was saved with /queue start
- Check for errors during checkpoint creation
- Manually restore using /queue done
Index Out of Range
Problem: Remove operation fails with index error
Solution:
- Use /queue list to get current queue indices
- Ensure index is 0-based and within queue size
- Check for queue changes between add and remove
Tool Parameter Errors
Problem: queue_addTaskToQueue throws errors about missing parameters
Solution:
- Ensure both description and content parameters are provided
- Check parameter schema for type requirements
- Verify string inputs are not empty
Queue Items Not Processing
Problem: After /queue next, items don't process
Solution:
- Ensure /queue start was called before processing
- Verify items were added with /queue add
- Check queue with /queue list to confirm items exist
Testing
Unit Test Example
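import { describe, it, expect } from 'vitest';
import WorkQueueService from './WorkQueueService';
// Assumed path and export shapes for the state slice and item type; adjust to match the package
import { WorkQueueState, type QueueItem } from './state/workQueueState';
import type { Agent } from '@tokenring-ai/agent';
// createMockAgent() below is a test helper you supply yourself; it is not defined in this example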
describe('WorkQueueService', () => {
it('should enqueue and dequeue items', () => {
const service = new WorkQueueService({ maxSize: 10 });
const mockAgent = createMockAgent();
// Attach agent to service (in real usage, this is automatic)
// Attach would be needed for agent-dependent methods
// For unit tests, mock the agent interactions
// Service can be tested independently for queue operations
expect(service.size(mockAgent)).toBe(0);
});
it('should enforce queue size limit', () => {
const service = new WorkQueueService({ maxSize: 2 });
const mockAgent = createMockAgent();
// Attempt to add more items than maxSize
const result1 = service.enqueue(
{ checkpoint: {}, name: 'A', input: '' },
mockAgent
);
const result2 = service.enqueue(
{ checkpoint: {}, name: 'B', input: '' },
mockAgent
);
const result3 = service.enqueue(
{ checkpoint: {}, name: 'C', input: '' },
mockAgent
);
expect(result1).toBe(true);
expect(result2).toBe(true);
expect(result3).toBe(false); // Queue full
});
it('should serialize and deserialize state', () => {
const service = new WorkQueueService();
const mockAgent = createMockAgent();
// Enqueue items
service.enqueue(
{ checkpoint: {}, name: 'Test', input: '' },
mockAgent
);
service.enqueue(
{ checkpoint: {}, name: 'Test2', input: '' },
mockAgent
);
// Serialize
const serialized = service.serialize();
expect(serialized.queue.length).toBe(2);
// Deserialize (requires WorkQueueState instance)
const state = new WorkQueueState();
state.deserialize(serialized);
expect(state.queue.length).toBe(2);
});
});
describe('WorkQueueState', () => {
it('should reset queue state when chat is reset', () => {
const state = new WorkQueueState();
// Modify state
const item: QueueItem = {
checkpoint: {},
name: 'Test',
input: ''
};
state.queue.push(item);
state.started = true;
state.currentItem = item;
// Reset
state.reset(['chat']);
expect(state.queue.length).toBe(0);
expect(state.started).toBe(false);
expect(state.currentItem).toBeNull();
});
it('should show state summary', () => {
const state = new WorkQueueState();
state.queue.push({ checkpoint: {}, name: 'Item1', input: '' });
state.queue.push({ checkpoint: {}, name: 'Item2', input: '' });
const summary = state.show();
expect(summary[0]).toBe('Started: false');
expect(summary[1]).toBe('Queue Items: 2');
expect(summary[2]).toContain('Current Item: None');
});
});
Package Structure
@tokenring-ai/queue/
├── plugin.ts # Plugin registration and service setup
├── package.json # Package metadata and dependencies
├── index.ts # Service exports
├── WorkQueueService.ts # Main service class
├── chatCommands.ts # Chat command exports
├── tools.ts # Tool exports
├── commands/
│ └── queue.ts # /queue command implementation
├── tools/
│ └── addTaskToQueue.ts # addTaskToQueue tool
└── state/
└── workQueueState.ts # WorkQueueState implementation
Dependencies
- @tokenring-ai/agent: Agent framework and state management
- @tokenring-ai/app: Application framework and plugin system
- @tokenring-ai/chat: Chat service for command execution (runChat)
- @tokenring-ai/checkpoint: Checkpoint management for state saving
- @tokenring-ai/utility: Shared utilities including deepMerge
- zod: Schema validation and configuration
Plugin Registration
The plugin automatically registers:
- ChatTools: Adds queue_addTaskToQueue tool to chat service
- AgentCommands: Registers /queue commands with AgentCommandService
- Services: Registers WorkQueueService with app
// plugin.ts
export default {
name: '@tokenring-ai/queue',
version: '0.2.0',
description: 'Queue for Token Ring',
install(app, config) {
app.waitForService(ChatService, chatService =>
chatService.addTools(tools)
);
app.waitForService(AgentCommandService, agentCommandService =>
agentCommandService.addAgentCommands(chatCommands)
);
app.addServices(new WorkQueueService());
},
config: z.object({
queue: WorkQueueServiceConfigSchema.prefault({})
})
} satisfies TokenRingPlugin<typeof packageConfigSchema>;
License
MIT License - see LICENSE file for details.