@tokenring-ai/rpc
Overview
The RPC (Remote Procedure Call) service provides a centralized, type-safe mechanism for registering and executing remote procedures within the Token Ring ecosystem. This package enables plugins to register RPC endpoints with type-safe schemas and implementations, supporting queries, mutations, and streaming methods through a KeyedRegistry-based architecture.
Key Features
- Centralized Registry: KeyedRegistry for managing RPC endpoints with type safety
- Type-Safe Operations: Full TypeScript support with Zod schema validation
- Three Method Types:
- Query: Read-only operations that return a single value
- Mutation: State-changing operations that modify data
- Stream: Asynchronous generators for real-time data streams
- Schema-First Design: Separate schema definitions from implementation logic
- Local Client:
createLocalRPCClientfor direct in-process endpoint calls - Plugin Integration: Seamless integration with web-host and other services
- Abort Support: Stream methods support abort signals for cancellation
Core Components
RpcService
The main service that manages RPC endpoints using a KeyedRegistry. Implements the TokenRingService interface.
Class Signature:
export default class RpcService implements TokenRingService {
readonly name: string;
readonly description: string;
getEndpoint: (name: string) => RpcEndpoint | undefined;
getAllEndpoints: () => RpcEndpoint[];
registerEndpoint(endpoint: RpcEndpoint): void;
}
Properties:
name: Service identifier ("RpcService")description: Service description ("RPC endpoint registry and execution service")
Methods:
getEndpoint(name: string): Retrieves an endpoint by namegetAllEndpoints(): Returns all registered endpointsregisterEndpoint(endpoint: RpcEndpoint): Registers a new endpoint
Usage:
import RpcService from '@tokenring-ai/rpc';
// Create and register the service
const rpcService = new RpcService();
app.addServices(rpcService);
// Register endpoints
rpcService.registerEndpoint(endpoint);
// Get endpoint by name
const endpoint = rpcService.getEndpoint('myservice');
// Get all registered endpoints
const allEndpoints = rpcService.getAllEndpoints();
createRPCEndpoint
Helper function to create type-safe RPC endpoints from schemas and implementations.
Function Signature:
export function createRPCEndpoint<T extends RPCSchema>(
schemas: T,
implementation: RPCImplementation<T>
): RpcEndpoint;
Parameters:
schemas: RPCSchema defining the endpoint structureimplementation: RPCImplementation with method implementations
Returns:
RpcEndpoint: Type-safe endpoint with all methods bound
Usage:
import {createRPCEndpoint} from '@tokenring-ai/rpc/createRPCEndpoint';
import {z} from 'zod';
const schemas = {
name: "My Service",
path: '/rpc/myservice',
methods: {
greet: {
type: 'query' as const,
input: z.object({ name: z.string() }),
result: z.object({ message: z.string() })
}
}
};
const implementation = {
greet: async (args, app) => ({ message: `Hello, ${args.name}!` })
};
const endpoint = createRPCEndpoint(schemas, implementation);
createLocalRPCClient
Creates an RPC client that calls endpoint methods directly in-process. Useful for tests or when the UI and Backend run in the same process.
Function Signature:
export default function createLocalRPCClient<T extends RPCSchema>(
endpoint: RpcEndpoint,
app: TokenRingApp
): {
[K in keyof T["methods"]]: FunctionTypeOfRPCCall<T, K>;
};
Parameters:
endpoint: The RPC endpoint to create a client forapp: TokenRingApp instance for method execution
Returns:
- Object with methods matching the endpoint's schema
Usage:
import createLocalRPCClient from '@tokenring-ai/rpc/createLocalRPCClient';
const endpoint = rpcService.getEndpoint('myservice');
if (endpoint) {
const client = createLocalRPCClient(endpoint, app);
// Call methods directly on the client
const result = await client.greet({ name: 'World' });
console.log(result.message); // "Hello, World!"
// For streaming methods
const controller = new AbortController();
for await (const item of client.streamLogs({ count: 5 }, controller.signal)) {
console.log(item.log);
}
}
Services
RpcService
Interface: TokenRingService
Purpose: Centralized RPC endpoint registry and execution service
Registration (Manual):
import RpcService from '@tokenring-ai/rpc';
const rpcService = new RpcService();
app.addServices(rpcService);
Registration (Plugin): The package includes a plugin that automatically registers the RpcService:
import rpcPlugin from '@tokenring-ai/rpc/plugin';
app.install(rpcPlugin);
The plugin requires no configuration and automatically adds the RpcService to the application.
Provider Documentation
This package does not define provider interfaces. It provides a service-based architecture for RPC endpoint management.
RPC Endpoints
This package does not define predefined RPC endpoints. Instead, it provides the infrastructure for other packages to register their own RPC endpoints using the RpcService.
Packages using this service define their endpoints with:
- Name: Unique identifier for the endpoint
- Path: URL path for the endpoint (e.g.,
/rpc/myservice) - Methods: Object containing query, mutation, and stream method definitions
Chat Commands
This package does not define any chat commands. It provides the infrastructure for other packages to register RPC endpoints.
Configuration
The RPC package has a minimal configuration schema with no required options:
const packageConfigSchema = z.object({});
Plugin Configuration
When using the built-in plugin, no configuration is required:
import rpcPlugin from '@tokenring-ai/rpc/plugin';
app.install(rpcPlugin); // No config needed
The plugin automatically:
- Registers the RpcService with the application
- Provides the RPC endpoint registry for plugins to use
Manual Service Registration
If you prefer to register the service manually without the plugin:
import RpcService from '@tokenring-ai/rpc';
const rpcService = new RpcService();
app.addServices(rpcService);
Then register your endpoints as needed.
Usage Examples
Basic Setup
import RpcService from '@tokenring-ai/rpc';
const rpcService = new RpcService();
app.addServices(rpcService);
Registering an Endpoint
import {z} from 'zod';
import {createRPCEndpoint} from '@tokenring-ai/rpc/createRPCEndpoint';
import RpcService from '@tokenring-ai/rpc';
// Define schemas
const myServiceSchemas = {
name: "My Service",
path: '/rpc/myservice',
methods: {
greet: {
type: 'query' as const,
input: z.object({ name: z.string() }),
result: z.object({ message: z.string() })
},
updateUser: {
type: 'mutation' as const,
input: z.object({ id: z.string(), name: z.string() }),
result: z.object({ success: z.boolean() })
},
streamLogs: {
type: 'stream' as const,
input: z.object({ count: z.number() }),
result: z.object({ log: z.string() })
}
}
};
// Define implementation
const myServiceImpl = {
greet: async (args, app) => {
return { message: `Hello, ${args.name}!` };
},
updateUser: async (args, app) => {
// Update logic here
return { success: true };
},
streamLogs: async function* (args, app, signal) {
for (let i = 0; i < args.count; i++) {
if (signal.aborted) break;
yield { log: `Log entry ${i}` };
}
}
};
// Create and register endpoint
const myEndpoint = createRPCEndpoint(myServiceSchemas, myServiceImpl);
rpcService.registerEndpoint(myEndpoint);
Calling RPC Methods
// Get endpoint and call method directly
const endpoint = rpcService.getEndpoint('myservice');
if (endpoint) {
const result = await endpoint.methods.greet.execute(
{ name: 'World' },
app
);
console.log(result.message); // "Hello, World!"
}
// Stream example
const streamMethod = endpoint.methods.streamLogs;
if (streamMethod.type === 'stream') {
const controller = new AbortController();
for await (const item of streamMethod.execute(
{ count: 5 },
app,
controller.signal
)) {
console.log(item.log);
}
}
Using the Local Client
import createLocalRPCClient from '@tokenring-ai/rpc/createLocalRPCClient';
const endpoint = rpcService.getEndpoint('myservice');
if (endpoint) {
const client = createLocalRPCClient(endpoint, app);
// Call methods directly on the client
const result = await client.greet({ name: 'World' });
console.log(result.message); // "Hello, World!"
// For streaming methods
const controller = new AbortController();
for await (const item of client.streamLogs({ count: 5 }, controller.signal)) {
console.log(item.log);
}
}
Integration with Web Host
import {WebHostService} from '@tokenring-ai/web-host';
import JsonRpcResource from '@tokenring-ai/web-host/JsonRpcResource';
import RpcService from '@tokenring-ai/rpc';
app.waitForService(WebHostService, webHostService => {
app.waitForService(RpcService, rpcService => {
const endpoint = rpcService.getEndpoint('myservice');
if (endpoint) {
webHostService.registerResource(
'My Service RPC',
new JsonRpcResource(app, endpoint)
);
}
});
});
Plugin Integration
RPC endpoints can be registered from any plugin that has access to the RpcService:
import {TokenRingPlugin} from '@tokenring-ai/app';
import RpcService from '@tokenring-ai/rpc';
import {createRPCEndpoint} from '@tokenring-ai/rpc/createRPCEndpoint';
import {z} from 'zod';
export default {
name: '@my/plugin',
version: '1.0.0',
install(app, config) {
app.waitForService(RpcService, rpcService => {
const schemas = {
name: "My Plugin",
path: '/rpc/myplugin',
methods: {
ping: {
type: 'query' as const,
input: z.object({}),
result: z.object({ pong: z.boolean() })
}
}
};
const impl = {
ping: async (args, app) => ({ pong: true })
};
const endpoint = createRPCEndpoint(schemas, impl);
rpcService.registerEndpoint(endpoint);
});
}
} satisfies TokenRingPlugin;
Using the Built-in Plugin
The package includes a built-in plugin that automatically registers the RpcService. Install it directly:
import rpcPlugin from '@tokenring-ai/rpc/plugin';
// Install the plugin
app.install(rpcPlugin);
// The RpcService is now available
app.waitForService(RpcService, rpcService => {
// Register your endpoints
const endpoint = createRPCEndpoint(schemas, implementation);
rpcService.registerEndpoint(endpoint);
});
Integration
Dependencies
The package depends on:
- @tokenring-ai/app: Base application framework and service management (0.2.0)
- @tokenring-ai/utility: Shared utilities, including KeyedRegistry (0.2.0)
- zod: Schema validation library (^4.3.6)
Integration with Other Packages
@tokenring-ai/app
The RPC package integrates with the app framework through:
- Service Registration: RpcService implements
TokenRingServiceinterface - Plugin System: Includes a plugin for automatic service registration
- App Lifecycle: Uses
app.addServices()andapp.waitForService()patterns
@tokenring-ai/utility
Uses KeyedRegistry from the utility package for endpoint management:
import KeyedRegistry from '@tokenring-ai/utility/registry/KeyedRegistry';
private endpoints = new KeyedRegistry<RpcEndpoint>();
@tokenring-ai/web-host
RPC endpoints can be exposed via HTTP/WebSocket through the web-host package:
import {WebHostService} from '@tokenring-ai/web-host';
import JsonRpcResource from '@tokenring-ai/web-host/JsonRpcResource';
import RpcService from '@tokenring-ai/rpc';
app.waitForService(WebHostService, webHostService => {
app.waitForService(RpcService, rpcService => {
const endpoint = rpcService.getEndpoint('myservice');
if (endpoint) {
webHostService.registerResource(
'My Service RPC',
new JsonRpcResource(app, endpoint)
);
}
});
});
Agent System Integration
RPC endpoints registered with RpcService can be accessed by agents and other plugins:
app.waitForService(RpcService, rpcService => {
// Register endpoint
const endpoint = createRPCEndpoint(schemas, implementation);
rpcService.registerEndpoint(endpoint);
// Agents and other plugins can now access this endpoint
const retrievedEndpoint = rpcService.getEndpoint('myservice');
});
Service Registration Patterns
Manual Registration
import RpcService from '@tokenring-ai/rpc';
// Register the service
const rpcService = new RpcService();
app.addServices(rpcService);
// Wait for service to be available
app.waitForService(RpcService, service => {
// Use the service
service.registerEndpoint(endpoint);
});
Plugin-based Registration
import rpcPlugin from '@tokenring-ai/rpc/plugin';
// Install plugin
app.install(rpcPlugin);
// Wait for service to be available
app.waitForService(RpcService, rpcService => {
// Register endpoints
const endpoint = createRPCEndpoint(schemas, implementation);
rpcService.registerEndpoint(endpoint);
});
Best Practices
-
Separate Schemas from Implementation: Always define your schemas first, then implement the methods separately. This promotes clear separation of concerns.
-
Use Appropriate Method Types: Choose the correct method type based on whether the operation is read-only (query), state-changing (mutation), or produces streaming data.
-
Validate Input and Output: Always use Zod schemas to validate input and output types, ensuring type safety and preventing runtime errors.
-
Handle Abort Signals: For stream methods, properly handle abort signals to allow proper cleanup and resource release.
-
Use Semantic Names: Choose clear, descriptive names for endpoints and methods to improve code readability and maintainability.
-
Document Method Behavior: Document the expected behavior of each method, including any side effects and error conditions.
-
Test Edge Cases: Include tests for edge cases such as empty method lists, single methods, and mixed method types.
-
Leverage Plugin Pattern: Use the plugin pattern to register RPC endpoints in a way that integrates cleanly with the Token Ring application lifecycle.
-
Use Local Client for Tests: For unit tests where the UI and Backend run in the same process, use
createLocalRPCClientinstead of making actual HTTP calls. -
Direct Endpoint Registration: Always register the complete endpoint object with
registerEndpoint()method.
Testing
The package includes comprehensive tests for the createRPCEndpoint function using vitest.
Running Tests
# Run all tests
bun test
# Run tests in watch mode
bun test --watch
# Run tests with coverage
bun test --coverage
Test File Structure
The test file createRPCEndpoint.test.ts covers the following scenarios:
import createTestingApp from "@tokenring-ai/app/test/createTestingApp";
import {describe, expect, it} from 'vitest';
import {z} from 'zod';
import {createRPCEndpoint} from './createRPCEndpoint';
describe('createRPCEndpoint', () => {
let mockApp: any;
beforeEach(() => {
mockApp = createTestingApp();
});
it('should create endpoint with correct path', () => {
const endpoint = createRPCEndpoint(schemas, implementation);
expect(endpoint.path).toBe('/api/rpc');
});
it('should convert query method', () => {
const endpoint = createRPCEndpoint(schemas, implementation);
expect(endpoint.methods.testQuery).toEqual({
type: 'query',
inputSchema: schemas.methods.testQuery.input,
resultSchema: schemas.methods.testQuery.result,
execute: implementation.testQuery
});
});
it('should convert mutation method', () => {
const endpoint = createRPCEndpoint(schemas, implementation);
expect(endpoint.methods.testMutation).toEqual({
type: 'mutation',
inputSchema: schemas.methods.testMutation.input,
resultSchema: schemas.methods.testMutation.result,
execute: implementation.testMutation
});
});
it('should convert stream method', () => {
const endpoint = createRPCEndpoint(schemas, implementation);
expect(endpoint.methods.testStream).toEqual({
type: 'stream',
inputSchema: schemas.methods.testStream.input,
resultSchema: schemas.methods.testStream.result,
execute: implementation.testStream
});
});
it('should handle empty methods', () => {
const emptySchemas = {
name: "Example RPC",
path: '/api/empty',
methods: {}
};
const emptyImplementation = {};
const endpoint = createRPCEndpoint(emptySchemas, emptyImplementation);
expect(endpoint.path).toBe('/api/empty');
expect(Object.keys(endpoint.methods)).toHaveLength(0);
});
it('should preserve method implementations', async () => {
const endpoint = createRPCEndpoint(schemas, implementation);
const result = await endpoint.methods.testQuery.execute(
{ message: 'world' },
mockApp
);
expect(result).toBeDefined();
});
it('should handle single method', () => {
const singleMethodSchemas = {
name: "Single Method RPC",
path: '/api/single',
methods: {
ping: {
type: 'query' as const,
input: z.object({}),
result: z.object({ pong: z.boolean() })
}
}
};
const singleMethodImplementation = {
ping: async (args: any, app: any) => ({ pong: true })
};
const endpoint = createRPCEndpoint(singleMethodSchemas, singleMethodImplementation);
expect(endpoint.path).toBe('/api/single');
expect(Object.keys(endpoint.methods)).toHaveLength(1);
});
it('should handle mixed method types', () => {
const endpoint = createRPCEndpoint(schemas, implementation);
const methods = endpoint.methods;
expect(methods.testQuery.type).toBe('query');
expect(methods.testMutation.type).toBe('mutation');
expect(methods.testStream.type).toBe('stream');
});
});
Test Coverage
The test suite covers:
- Endpoint creation with correct path
- Query method conversion
- Mutation method conversion
- Stream method conversion
- Empty methods handling
- Method implementation preservation
- Single method handling
- Mixed method types
Writing Tests for Your RPC Endpoints
When creating your own RPC endpoints, follow this testing pattern:
import {describe, expect, it} from 'vitest';
import {z} from 'zod';
import {createRPCEndpoint} from '@tokenring-ai/rpc/createRPCEndpoint';
import createLocalRPCClient from '@tokenring-ai/rpc/createLocalRPCClient';
import createTestingApp from '@tokenring-ai/app/test/createTestingApp';
describe('My RPC Endpoint', () => {
const schemas = {
name: "My Service",
path: '/rpc/myservice',
methods: {
greet: {
type: 'query' as const,
input: z.object({ name: z.string() }),
result: z.object({ message: z.string() })
}
}
};
const implementation = {
greet: async (args, app) => ({ message: `Hello, ${args.name}!` })
};
it('should create endpoint correctly', () => {
const endpoint = createRPCEndpoint(schemas, implementation);
expect(endpoint.name).toBe('My Service');
expect(endpoint.path).toBe('/rpc/myservice');
});
it('should execute method correctly', async () => {
const app = createTestingApp();
const client = createLocalRPCClient(
createRPCEndpoint(schemas, implementation),
app
);
const result = await client.greet({ name: 'World' });
expect(result.message).toBe('Hello, World!');
});
});
Package Exports
The package exports the following:
Main Entry Point
// Main entry point - exports RpcService
import RpcService from '@tokenring-ai/rpc';
Sub-path Exports
| Export Path | Description |
|---|---|
@tokenring-ai/rpc | Main entry point, exports RpcService |
@tokenring-ai/rpc/createRPCEndpoint | Helper function to create type-safe RPC endpoints from schemas and implementations |
@tokenring-ai/rpc/createLocalRPCClient | Creates an RPC client for direct in-process endpoint calls |
@tokenring-ai/rpc/types | All type definitions (RPCSchema, RPCImplementation, RpcMethod, RpcEndpoint, etc.) |
@tokenring-ai/rpc/plugin | Token Ring plugin that registers the RpcService automatically |
Plugin Export
The package also exports a plugin that automatically registers the RpcService:
import rpcPlugin from '@tokenring-ai/rpc/plugin';
app.install(rpcPlugin);
This plugin requires no configuration and automatically adds the RpcService to the application.
Related Components
- @tokenring-ai/app: Base application framework for service management and plugin architecture
- @tokenring-ai/web-host: Provides HTTP/WebSocket endpoints for RPC resources
- @tokenring-ai/utility: Shared utilities including KeyedRegistry implementation
- @tokenring-ai/rpc/createLocalRPCClient: For direct in-process endpoint calls
License
MIT License - see LICENSE file for details.