New protocol version released: This page may contain outdated information.
Flow Control Overview
What is Flow Control?
Flow control is a mechanism that regulates the rate of message transmission between client and server to:
- Prevent message loss due to buffer overflow
- Ensure fair resource allocation
- Maintain system stability under load
- Provide back-pressure feedback
Credit-Based System
HAIP uses a credit-based flow control system where:
- Message Credits: Control how many messages can be sent
- Byte Credits: Control how much data can be sent
- Channel-Specific: Each channel has independent credit pools
- Dynamic Updates: Credits are replenished based on server capacity
Flow Control Configuration
Basic Configuration
import { createHAIPClient } from "haip-sdk";
// Connect over WebSocket using one uniform credit budget per channel.
const client = createHAIPClient({
  url: "ws://localhost:8080",
  token: "your-jwt-token",
  transport: "websocket",
  flowControl: {
    // Starting budget per channel: 10 messages and 1MB of payload.
    initialCredits: 10,
    initialCreditBytes: 1024 ** 2,
    // Ceiling per channel: 100 messages and 10MB of payload.
    maxCredits: 100,
    maxCreditBytes: 10 * 1024 ** 2,
  },
});
Advanced Configuration
// Same connection settings as the basic example, but every flow-control limit
// is given per channel (USER / AGENT / SYSTEM) instead of as a single number.
const client = createHAIPClient({
  url: "ws://localhost:8080",
  token: "your-jwt-token",
  transport: "websocket",
  flowControl: {
    // Starting message credits.
    initialCredits: { USER: 20, AGENT: 10, SYSTEM: 5 },
    // Starting byte credits: 2MB user, 5MB agent, 1MB system.
    initialCreditBytes: {
      USER: 2 * 1024 ** 2,
      AGENT: 5 * 1024 ** 2,
      SYSTEM: 1024 ** 2,
    },
    // Upper bound on message credits.
    maxCredits: { USER: 50, AGENT: 25, SYSTEM: 10 },
    // Upper bound on byte credits: 10MB user, 20MB agent, 5MB system.
    maxCreditBytes: {
      USER: 10 * 1024 ** 2,
      AGENT: 20 * 1024 ** 2,
      SYSTEM: 5 * 1024 ** 2,
    },
  },
});
Flow Control State
Checking Current State
// Read the connection state once, then report what each channel has left.
const state = client.getConnectionState();
const { credits, byteCredits } = state;
console.log("Flow control state:", {
  // Message credits per channel.
  userCredits: credits.get("USER"),
  agentCredits: credits.get("AGENT"),
  systemCredits: credits.get("SYSTEM"),
  // Byte credits per channel.
  userBytes: byteCredits.get("USER"),
  agentBytes: byteCredits.get("AGENT"),
  systemBytes: byteCredits.get("SYSTEM"),
});
Monitoring Flow Control
/**
 * Logs flow-control activity for a HAIP client.
 *
 * On construction it subscribes to the client's "message" event (logging every
 * FLOW_UPDATE) and starts a timer that periodically logs the per-channel credit
 * state. Call `stop()` when the monitor is no longer needed; otherwise the
 * timer keeps firing indefinitely.
 */
class FlowControlMonitor {
  /** Handle of the periodic state-logging timer; undefined when not running. */
  private stateTimer: ReturnType<typeof setInterval> | undefined;

  /**
   * @param client HAIP client. This class calls `on("message", handler)` and
   *   `getConnectionState()` on it.
   * @param intervalMs Delay between periodic state logs, in milliseconds.
   */
  constructor(private client: any, private intervalMs: number = 5000) {
    this.setupMonitoring();
  }

  /**
   * Stops the periodic state logging. Safe to call more than once.
   * The "message" listener registered in the constructor is left attached.
   */
  stop(): void {
    if (this.stateTimer !== undefined) {
      clearInterval(this.stateTimer);
      this.stateTimer = undefined;
    }
  }

  /** Registers the FLOW_UPDATE listener and starts the periodic state log. */
  private setupMonitoring() {
    // Monitor flow control updates
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "FLOW_UPDATE") {
        this.handleFlowUpdate(message);
      }
    });

    // Periodic state check; the handle is kept so stop() can cancel it.
    this.stateTimer = setInterval(() => {
      this.logFlowState();
    }, this.intervalMs);
  }

  /** Logs the credits carried by a single FLOW_UPDATE message. */
  private handleFlowUpdate(message: HAIPMessage) {
    const { channel, add_messages, add_bytes } = message.payload;
    console.log(`Flow update for ${channel}:`, {
      messageCredits: add_messages,
      byteCredits: add_bytes,
    });
  }

  /** Logs the current message/byte credits of the USER, AGENT and SYSTEM channels. */
  private logFlowState() {
    const state = this.client.getConnectionState();
    console.log("Current flow state:", {
      user: {
        messages: state.credits.get("USER"),
        bytes: state.byteCredits.get("USER"),
      },
      agent: {
        messages: state.credits.get("AGENT"),
        bytes: state.byteCredits.get("AGENT"),
      },
      system: {
        messages: state.credits.get("SYSTEM"),
        bytes: state.byteCredits.get("SYSTEM"),
      },
    });
  }
}
// Usage: constructing the monitor registers its "message" listener and starts
// the periodic state logging right away; no further call is needed.
const monitor = new FlowControlMonitor(client);
Sending Flow Updates
Requesting More Credits
// Argument order in these calls is (channel, message credits, byte credits);
// the byte-credit argument is left out when only message credits are requested.
// Request more message credits
await client.sendFlowUpdate("USER", 10); // Request 10 more message credits
// Request more byte credits (0 is passed as the message-credit amount)
await client.sendFlowUpdate("USER", 0, 1024 * 1024); // Request 1MB more byte credits
// Request both
await client.sendFlowUpdate("USER", 10, 1024 * 1024); // Request both message and byte credits
Automatic Credit Management
/**
 * Wraps a client's `sendTextMessage` so that every send first tops up the
 * channel's credits when they have fallen below fixed thresholds.
 *
 * The wrapper is installed on the client object itself, so all later callers of
 * `client.sendTextMessage` go through the credit check.
 */
class AutomaticFlowControl {
  /** Levels below which a top-up is requested. */
  private thresholds = {
    messageThreshold: 5, // fewer than 5 message credits left
    byteThreshold: 512 * 1024, // less than 512KB of byte credits left
  };

  /** @param client HAIP client whose `sendTextMessage` gets wrapped. */
  constructor(private client: any) {
    this.setupAutomaticControl();
  }

  /** Replaces `client.sendTextMessage` with a version that checks credits first. */
  private setupAutomaticControl() {
    const target = this.client;
    const forward = target.sendTextMessage.bind(target);

    target.sendTextMessage = async (
      channel: string,
      text: string,
      author?: string,
      runId?: string
    ) => {
      await this.ensureCredits(channel);
      return forward(channel, text, author, runId);
    };
  }

  /**
   * Requests more credits for `channel` when it is running low.
   * Message and byte credits are checked independently and, when both are low,
   * requested in two separate flow updates (messages first).
   */
  private async ensureCredits(channel: string) {
    const { messageThreshold, byteThreshold } = this.thresholds;
    const snapshot = this.client.getConnectionState();
    const messagesLeft = snapshot.credits.get(channel) || 0;
    const bytesLeft = snapshot.byteCredits.get(channel) || 0;

    if (messagesLeft < messageThreshold) {
      // Ask for at least 10 credits, or the shortfall if that were larger.
      const shortfall = messageThreshold - messagesLeft;
      await this.client.sendFlowUpdate(channel, Math.max(10, shortfall));
    }

    if (bytesLeft < byteThreshold) {
      // Ask for at least 1MB, or the shortfall if that were larger.
      const shortfall = byteThreshold - bytesLeft;
      await this.client.sendFlowUpdate(
        channel,
        0,
        Math.max(1024 * 1024, shortfall)
      );
    }
  }
}
// Usage: the constructor replaces client.sendTextMessage in place, so every
// later sendTextMessage call on this client runs the credit check first.
const flowControl = new AutomaticFlowControl(client);
Channel Control
Pausing and Resuming Channels
// Both calls are awaited, so resumeChannel is only issued after pauseChannel
// has settled.
// Pause message flow on a channel
await client.pauseChannel("USER");
// Resume message flow on a channel
await client.resumeChannel("USER");
Channel State Management
/**
 * Tracks which channels are paused (from PAUSE_CHANNEL / RESUME_CHANNEL
 * messages) and holds back sends on paused channels.
 *
 * Messages submitted while a channel is paused are kept in a per-channel FIFO
 * queue and sent, in order, once the channel resumes.
 */
class ChannelManager {
  private pausedChannels = new Set<string>();
  /** Sends waiting for their channel to resume, oldest first. */
  private pendingMessages = new Map<string, Array<() => Promise<void>>>();
  /** Channels whose backlog is currently being flushed. */
  private flushingChannels = new Set<string>();

  /** @param client HAIP client; this class calls `on("message", handler)` on it. */
  constructor(private client: any) {
    this.setupChannelHandling();
  }

  /** Subscribes to pause/resume notifications and keeps the paused set current. */
  private setupChannelHandling() {
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "PAUSE_CHANNEL") {
        this.pausedChannels.add(message.payload.channel);
        console.log(`Channel ${message.payload.channel} paused`);
      } else if (message.type === "RESUME_CHANNEL") {
        this.pausedChannels.delete(message.payload.channel);
        console.log(`Channel ${message.payload.channel} resumed`);
        // Fire-and-forget: flushQueue handles its own errors.
        void this.flushQueue(message.payload.channel);
      }
    });
  }

  /** @returns true while the last notification seen for `channel` was a pause. */
  isChannelPaused(channel: string): boolean {
    return this.pausedChannels.has(channel);
  }

  /**
   * Runs `messageFn` now if the channel is open, otherwise queues it.
   *
   * @param channel Channel the message belongs to.
   * @param messageFn Callback that performs the actual send.
   * @returns A promise that resolves once the message was sent, or immediately
   *   if it was queued. Failures of queued messages are logged, not rethrown.
   */
  async sendWithChannelCheck(channel: string, messageFn: () => Promise<void>) {
    if (this.isChannelPaused(channel)) {
      console.log(`Channel ${channel} is paused, queuing message`);
      // Queue the message for later
      this.queueMessage(channel, messageFn);
      return;
    }
    if (this.flushingChannels.has(channel)) {
      // A backlog is still draining; go behind it so ordering is preserved.
      this.queueMessage(channel, messageFn);
      return;
    }
    await messageFn();
  }

  /** Appends `messageFn` to the channel's backlog. */
  private queueMessage(channel: string, messageFn: () => Promise<void>) {
    const pending = this.pendingMessages.get(channel);
    if (pending) {
      pending.push(messageFn);
    } else {
      this.pendingMessages.set(channel, [messageFn]);
    }
  }

  /**
   * Sends the channel's backlog one message at a time, in queue order.
   * Stops early if the channel is paused again; the rest stays queued for the
   * next resume. A message that fails is logged and dropped so it cannot block
   * the messages behind it.
   */
  private async flushQueue(channel: string): Promise<void> {
    if (this.flushingChannels.has(channel)) {
      return; // The running flush will pick up anything still queued.
    }
    const pending = this.pendingMessages.get(channel);
    if (!pending || pending.length === 0) {
      return;
    }
    this.flushingChannels.add(channel);
    try {
      while (pending.length > 0 && !this.isChannelPaused(channel)) {
        const messageFn = pending.shift();
        if (!messageFn) {
          break;
        }
        try {
          await messageFn();
        } catch (error: unknown) {
          console.error(`Failed to send queued message on ${channel}:`, error);
        }
      }
    } finally {
      this.flushingChannels.delete(channel);
    }
  }
}
// Usage
const channelManager = new ChannelManager(client);
// Send message with channel check: the callback runs immediately when "USER"
// is not paused; otherwise it is handed to the manager's queue instead.
// NOTE: `runId` is not defined in this snippet; it is assumed to come from the
// surrounding application code.
await channelManager.sendWithChannelCheck("USER", async () => {
await client.sendTextMessage("USER", "Hello", "user", runId);
});
Message Queuing
Credit-Aware Message Queue
/**
 * Per-channel FIFO queue that only sends while the channel has both message
 * and byte credits left.
 *
 * Draining is attempted on every `enqueue` and whenever a FLOW_UPDATE message
 * grants credits for a channel.
 */
class CreditAwareQueue {
  /** Pending send callbacks per channel, oldest first. */
  private queues = new Map<string, Array<() => Promise<void>>>();
  /** Channels that currently have a drain loop running. */
  private processing = new Map<string, boolean>();

  /**
   * @param client HAIP client; this class calls `on("message", handler)` and
   *   `getConnectionState()` on it.
   */
  constructor(private client: any) {
    this.setupFlowControlHandling();
  }

  /** Restarts draining for a channel whenever a FLOW_UPDATE adds credits to it. */
  private setupFlowControlHandling() {
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "FLOW_UPDATE") {
        const { channel, add_messages, add_bytes } = message.payload;
        if (add_messages > 0 || add_bytes > 0) {
          // Nobody awaits this call, so handle a rejection here rather than
          // letting it surface as an unhandled rejection.
          this.processQueue(channel).catch((error: unknown) => {
            console.error(`Failed to process queue for ${channel}:`, error);
          });
        }
      }
    });
  }

  /**
   * Adds a send callback to the channel's queue and tries to drain the queue.
   *
   * @returns A promise that settles when this drain attempt ends. The message
   *   may still be queued at that point if the channel is out of credits.
   */
  async enqueue(channel: string, messageFn: () => Promise<void>) {
    if (!this.queues.has(channel)) {
      this.queues.set(channel, []);
    }
    this.queues.get(channel)!.push(messageFn);
    await this.processQueue(channel);
  }

  /**
   * Sends queued messages in order until the queue is empty or the channel has
   * no message or byte credits left. Only one drain loop runs per channel.
   * A message whose callback rejects is logged and dropped.
   */
  private async processQueue(channel: string) {
    if (this.processing.get(channel)) {
      return; // Already processing
    }
    this.processing.set(channel, true);
    try {
      const queue = this.queues.get(channel) ?? [];
      while (queue.length > 0) {
        // Re-read the state on every iteration: credits change as we send.
        const state = this.client.getConnectionState();
        const messageCredits = state.credits.get(channel) ?? 0;
        const byteCredits = state.byteCredits.get(channel) ?? 0;
        // Check if we have credits to send
        if (messageCredits <= 0 || byteCredits <= 0) {
          break; // Wait for more credits
        }
        const messageFn = queue.shift();
        if (messageFn) {
          try {
            await messageFn();
          } catch (error: unknown) {
            console.error(
              `Failed to send queued message on ${channel}:`,
              error
            );
          }
        }
      }
    } finally {
      this.processing.set(channel, false);
    }
  }

  /** @returns Number of messages still waiting on `channel` (0 for unknown channels). */
  getQueueLength(channel: string): number {
    return this.queues.get(channel)?.length ?? 0;
  }

  /**
   * Discards every message still waiting on `channel`.
   * The array is emptied in place rather than replaced, so a drain loop that
   * is already running sees the change and stops sending the cleared messages.
   */
  clearQueue(channel: string) {
    const queue = this.queues.get(channel);
    if (queue) {
      queue.length = 0;
    }
  }
}
// Usage
const queue = new CreditAwareQueue(client);
// Queue messages. Each enqueue call also tries to drain the queue; a message
// stays queued while the channel has no message credits or no byte credits.
// NOTE: `runId` is not defined in this snippet; it is assumed to come from the
// surrounding application code.
await queue.enqueue("USER", async () => {
await client.sendTextMessage("USER", "Message 1", "user", runId);
});
await queue.enqueue("USER", async () => {
await client.sendTextMessage("USER", "Message 2", "user", runId);
});
// Check queue status (number of messages still waiting on the channel)
console.log("User channel queue length:", queue.getQueueLength("USER"));
Flow Control Strategies
Conservative Strategy
/**
 * Requests credits early: one second after a FLOW_UPDATE grants message
 * credits, it asks for the same amount again if at most half of that grant is
 * still available on the channel.
 */
class ConservativeFlowControl {
  /**
   * @param client HAIP client; this class calls `on("message", handler)`,
   *   `getConnectionState()` and `sendFlowUpdate()` on it.
   */
  constructor(private client: any) {
    this.setupConservativeControl();
  }

  /** Schedules a delayed credit check after every FLOW_UPDATE that adds message credits. */
  private setupConservativeControl() {
    // Request credits early
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type !== "FLOW_UPDATE") {
        return;
      }
      const { channel, add_messages } = message.payload;
      // If we received credits, request more when we're at 50%
      if (add_messages > 0) {
        setTimeout(() => {
          // Nothing awaits a timer callback; the helper handles its own errors.
          void this.topUpIfHalfSpent(channel, add_messages);
        }, 1000);
      }
    });
  }

  /**
   * Requests `granted` more message credits for `channel` when the channel's
   * current credits are at or below half of `granted`.
   * Failures are logged instead of escaping as unhandled rejections.
   */
  private async topUpIfHalfSpent(channel: string, granted: number): Promise<void> {
    try {
      const state = this.client.getConnectionState();
      const currentCredits = state.credits.get(channel) ?? 0;
      if (currentCredits <= granted / 2) {
        await this.client.sendFlowUpdate(channel, granted);
      }
    } catch (error: unknown) {
      console.error(`Failed to request credits for ${channel}:`, error);
    }
  }
}
Aggressive Strategy
/**
 * Requests a large credit allowance for every channel as soon as the client
 * reports a connection.
 */
class AggressiveFlowControl {
  /**
   * @param client HAIP client; this class calls `on("connect", handler)` and
   *   `sendFlowUpdate()` on it.
   */
  constructor(private client: any) {
    this.setupAggressiveControl();
  }

  /**
   * On "connect", asks for 100 message credits and 10MB of byte credits on the
   * USER, AGENT and SYSTEM channels, one channel after the other.
   * NOTE(review): a rejected sendFlowUpdate propagates out of the async handler.
   */
  private setupAggressiveControl() {
    const requestEverything = async () => {
      const messageCredits = 100;
      const byteCredits = 10 * 1024 * 1024;
      for (const name of ["USER", "AGENT", "SYSTEM"]) {
        await this.client.sendFlowUpdate(name, messageCredits, byteCredits);
      }
    };

    // Request maximum credits immediately
    this.client.on("connect", requestEverything);
  }
}
Adaptive Strategy
/**
 * Requests message credits in proportion to how busy each channel has been.
 *
 * Activity is tracked per channel as an exponential moving average in [0, 1):
 * every MESSAGE_START feeds a sample of 1, and every timer tick in which the
 * channel saw no message feeds a sample of 0, so the value falls again once a
 * channel goes quiet. Call `stop()` to cancel the periodic timer.
 */
class AdaptiveFlowControl {
  private messageRates = new Map<string, number>();
  private lastRequestTime = new Map<string, number>();
  /** Channels that received at least one MESSAGE_START since the last tick. */
  private activeSinceLastTick = new Set<string>();
  /** Handle of the periodic timer; undefined when not running. */
  private adaptTimer: ReturnType<typeof setInterval> | undefined;

  /**
   * @param client HAIP client; this class calls `on("message", handler)` and
   *   `sendFlowUpdate()` on it.
   */
  constructor(private client: any) {
    this.setupAdaptiveControl();
  }

  /**
   * Stops the periodic credit requests. Safe to call more than once.
   * The "message" listener registered in the constructor is left attached.
   */
  stop(): void {
    if (this.adaptTimer !== undefined) {
      clearInterval(this.adaptTimer);
      this.adaptTimer = undefined;
    }
  }

  /** Registers the activity tracker and starts the 5-second adaptation timer. */
  private setupAdaptiveControl() {
    // Track message rates
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "MESSAGE_START") {
        const channel = message.payload.channel;
        const rate = this.messageRates.get(channel) ?? 0;
        // Calculate new rate (exponential moving average, sample = 1)
        this.messageRates.set(channel, rate * 0.9 + 0.1);
        this.activeSinceLastTick.add(channel);
      }
    });

    // Adaptive credit requests
    this.adaptTimer = setInterval(() => {
      this.decayIdleChannels();
      // Nothing awaits a timer callback, so handle a rejection here.
      this.requestAdaptiveCredits().catch((error: unknown) => {
        console.error("Adaptive credit request failed:", error);
      });
    }, 5000);
  }

  /**
   * Feeds a sample of 0 into the average of every channel that was silent
   * since the previous tick, then starts a new observation window.
   */
  private decayIdleChannels() {
    for (const [channel, rate] of this.messageRates) {
      if (!this.activeSinceLastTick.has(channel)) {
        this.messageRates.set(channel, rate * 0.9);
      }
    }
    this.activeSinceLastTick.clear();
  }

  /**
   * Requests credits for each channel whose activity is above 0.5, at most
   * once every 10 seconds per channel. The amount scales with the activity and
   * is capped at 50.
   */
  private async requestAdaptiveCredits() {
    const channels = ["USER", "AGENT", "SYSTEM"];
    for (const channel of channels) {
      const rate = this.messageRates.get(channel) ?? 0;
      const lastRequest = this.lastRequestTime.get(channel) ?? 0;
      const now = Date.now();
      // Request more credits if rate is high and we haven't requested recently
      if (rate > 0.5 && now - lastRequest > 10000) {
        const credits = Math.min(50, Math.floor(rate * 100));
        // Record the time before awaiting so that an overlapping tick cannot
        // issue a duplicate request while this one is still in flight.
        this.lastRequestTime.set(channel, now);
        await this.client.sendFlowUpdate(channel, credits);
      }
    }
  }
}
Flow Control Testing
Mock Flow Control Testing
/**
 * In-memory stand-in for credit accounting, intended for tests.
 * Keeps a message-credit and a byte-credit balance per channel; channels that
 * were never initialised or topped up read as zero.
 */
class MockFlowControl {
  private credits = new Map<string, number>();
  private byteCredits = new Map<string, number>();

  /** Seeds USER, AGENT and SYSTEM with 10 message credits and 1MB of byte credits. */
  constructor() {
    for (const name of ["USER", "AGENT", "SYSTEM"]) {
      this.credits.set(name, 10);
      this.byteCredits.set(name, 1024 * 1024);
    }
  }

  /**
   * Deducts credits from a channel if it can afford both amounts.
   *
   * @param channel Channel to charge.
   * @param messageCredits Message credits to deduct.
   * @param byteCredits Byte credits to deduct.
   * @returns true when both balances were sufficient and have been reduced;
   *   false (with nothing changed) otherwise.
   */
  consumeCredits(
    channel: string,
    messageCredits: number = 1,
    byteCredits: number = 0
  ): boolean {
    const { messages, bytes } = this.getCredits(channel);
    const affordable = messages >= messageCredits && bytes >= byteCredits;
    if (!affordable) {
      return false;
    }
    this.credits.set(channel, messages - messageCredits);
    this.byteCredits.set(channel, bytes - byteCredits);
    return true;
  }

  /**
   * Adds credits to a channel, creating its balances if needed.
   *
   * @param channel Channel to top up.
   * @param messageCredits Message credits to add.
   * @param byteCredits Byte credits to add.
   */
  addCredits(
    channel: string,
    messageCredits: number = 0,
    byteCredits: number = 0
  ) {
    const balance = this.getCredits(channel);
    this.credits.set(channel, balance.messages + messageCredits);
    this.byteCredits.set(channel, balance.bytes + byteCredits);
  }

  /** @returns The channel's current balances; zero for unknown channels. */
  getCredits(channel: string): { messages: number; bytes: number } {
    const messages = this.credits.get(channel) || 0;
    const bytes = this.byteCredits.get(channel) || 0;
    return { messages, bytes };
  }
}
// Usage in tests
const mockFlowControl = new MockFlowControl();
// Test credit consumption: USER starts with 10 message credits and 1MB of
// byte credits, so one message of 100 bytes is accepted.
const canSend = mockFlowControl.consumeCredits("USER", 1, 100);
expect(canSend).toBe(true);
// Test credit exhaustion: one credit was already used above, so only the first
// nine calls in this loop succeed and the tenth is refused.
for (let i = 0; i < 10; i++) {
mockFlowControl.consumeCredits("USER", 1);
}
// With no message credits left, a further attempt must be refused.
const canSendMore = mockFlowControl.consumeCredits("USER", 1);
expect(canSendMore).toBe(false);
Flow Control Integration Testing
// Jest test: a client configured with 2 initial message credits is expected to
// accept two sends and reject the third.
describe("Flow Control", () => {
test("should respect credit limits", async () => {
const client = createHAIPClient({
url: "ws://localhost:8080",
token: "test-token",
transport: "websocket",
flowControl: {
initialCredits: 2,
initialCreditBytes: 1024,
},
});
// Mock flow control state
// NOTE(review): this mocked state never changes, so getConnectionState() keeps
// reporting 2 credits. The test presumably relies on the client tracking
// consumed credits internally rather than through this method - confirm.
const mockState = {
credits: new Map([["USER", 2]]),
byteCredits: new Map([["USER", 1024]]),
};
jest.spyOn(client, "getConnectionState").mockReturnValue(mockState);
// Should be able to send 2 messages
await client.sendTextMessage("USER", "Message 1", "user", "run-1");
await client.sendTextMessage("USER", "Message 2", "user", "run-1");
// Third message is expected to reject with an error mentioning "FLOW_CONTROL";
// a client that silently queues the message instead would fail this assertion.
await expect(
client.sendTextMessage("USER", "Message 3", "user", "run-1")
).rejects.toThrow("FLOW_CONTROL");
});
});
Performance Monitoring
Flow Control Metrics
/**
 * Accumulates simple counters about flow-control behaviour: how many credit
 * requests were made, how many credits were received, and how long blocked
 * messages had to wait.
 */
class FlowControlMetrics {
  private metrics = FlowControlMetrics.blank();

  /** Builds a fresh, all-zero counter set. */
  private static blank() {
    return {
      creditRequests: 0,
      creditReceived: 0,
      messagesBlocked: 0,
      averageWaitTime: 0,
      totalWaitTime: 0,
    };
  }

  /** Counts one credit request. */
  recordCreditRequest() {
    this.metrics.creditRequests += 1;
  }

  /** @param amount Number of credits received; added to the running total. */
  recordCreditReceived(amount: number) {
    this.metrics.creditReceived += amount;
  }

  /**
   * Counts one blocked message and updates the wait-time statistics.
   * @param waitTime How long the message was blocked.
   */
  recordMessageBlocked(waitTime: number) {
    const counters = this.metrics;
    counters.messagesBlocked += 1;
    counters.totalWaitTime += waitTime;
    counters.averageWaitTime = counters.totalWaitTime / counters.messagesBlocked;
  }

  /**
   * @returns A copy of the counters plus `creditUtilization`, computed as
   *   creditRequests / creditReceived * 100 (0 when nothing was received).
   */
  getMetrics() {
    const { creditRequests, creditReceived } = this.metrics;
    let creditUtilization = 0;
    if (creditReceived > 0) {
      creditUtilization = (creditRequests / creditReceived) * 100;
    }
    return { ...this.metrics, creditUtilization };
  }

  /** Sets every counter back to zero. */
  reset() {
    this.metrics = FlowControlMetrics.blank();
  }
}
// Usage
const flowMetrics = new FlowControlMetrics();
// Monitor flow control performance: log a metrics snapshot every 10 seconds.
// The interval handle is not kept here, so this timer is never cleared.
setInterval(() => {
const metrics = flowMetrics.getMetrics();
console.log("Flow control metrics:", metrics);
}, 10000);
Next Steps
Client API
Learn about the client interface and methods.
Examples
See practical examples of flow control usage.
Error Handling
Understand error handling and recovery.
API Reference
Full API reference documentation.