Skip to main content
New protocol version released: This page may contain outdated information.
Optimize your HAIP applications with intelligent flow control mechanisms. The SDK’s credit-based system prevents message loss, manages back-pressure, and ensures optimal performance under varying load conditions.

Flow Control Overview

What is Flow Control?

Flow control is a mechanism that regulates the rate of message transmission between client and server to:
  • Prevent message loss due to buffer overflow
  • Ensure fair resource allocation
  • Maintain system stability under load
  • Provide back-pressure feedback

Credit-Based System

HAIP uses a credit-based flow control system with the following properties:
  • Message Credits: Control how many messages can be sent
  • Byte Credits: Control how much data can be sent
  • Channel-Specific: Each channel has independent credit pools
  • Dynamic Updates: Credits are replenished based on server capacity

Flow Control Configuration

Basic Configuration

import { createHAIPClient } from "haip-sdk";

// Number of bytes in one mebibyte, used to express the byte-credit limits below.
const BYTES_PER_MIB = 1024 * 1024;

// Flow-control limits; a single number applies the same limit to each channel.
const flowControlOptions = {
  initialCredits: 10, // message credits each channel starts with
  initialCreditBytes: BYTES_PER_MIB, // byte credits each channel starts with (1MB)
  maxCredits: 100, // upper bound on message credits per channel
  maxCreditBytes: 10 * BYTES_PER_MIB, // upper bound on byte credits (10MB)
};

const client = createHAIPClient({
  url: "ws://localhost:8080",
  token: "your-jwt-token",
  transport: "websocket",
  flowControl: flowControlOptions,
});

Advanced Configuration

// Converts a size expressed in mebibytes to bytes.
const mebibytes = (count: number) => count * 1024 * 1024;

// Flow-control limits spelled out separately for the USER, AGENT and SYSTEM channels.
const perChannelFlowControl = {
  // Message credits each channel starts with.
  initialCredits: { USER: 20, AGENT: 10, SYSTEM: 5 },
  // Byte credits each channel starts with.
  initialCreditBytes: {
    USER: mebibytes(2), // 2MB for user messages
    AGENT: mebibytes(5), // 5MB for agent responses
    SYSTEM: mebibytes(1), // 1MB for system messages
  },
  // Upper bound on message credits per channel.
  maxCredits: { USER: 50, AGENT: 25, SYSTEM: 10 },
  // Upper bound on byte credits per channel.
  maxCreditBytes: {
    USER: mebibytes(10),
    AGENT: mebibytes(20),
    SYSTEM: mebibytes(5),
  },
};

const client = createHAIPClient({
  url: "ws://localhost:8080",
  token: "your-jwt-token",
  transport: "websocket",
  flowControl: perChannelFlowControl,
});

Flow Control State

Checking Current State

// Take one snapshot of the connection state and report both credit pools per channel.
const state = client.getConnectionState();

// Small accessors so each channel lookup reads the same way.
const messageCreditsFor = (channel: string) => state.credits.get(channel);
const byteCreditsFor = (channel: string) => state.byteCredits.get(channel);

const flowControlSnapshot = {
  userCredits: messageCreditsFor("USER"),
  agentCredits: messageCreditsFor("AGENT"),
  systemCredits: messageCreditsFor("SYSTEM"),
  userBytes: byteCreditsFor("USER"),
  agentBytes: byteCreditsFor("AGENT"),
  systemBytes: byteCreditsFor("SYSTEM"),
};

console.log("Flow control state:", flowControlSnapshot);

Monitoring Flow Control

/**
 * Logs flow-control activity for a HAIP client.
 *
 * On construction it subscribes to the client's "message" event to report
 * every FLOW_UPDATE, and starts a timer that logs the per-channel credit
 * state every 5 seconds. Call `stop()` when the monitor is no longer needed;
 * otherwise the timer keeps firing (and keeps the process alive).
 */
class FlowControlMonitor {
  /** Handle of the periodic state-logging timer; `undefined` once stopped. */
  private stateTimer: ReturnType<typeof setInterval> | undefined;

  /**
   * @param client Client to observe. It must provide `on("message", handler)`
   *   and `getConnectionState()` returning `credits` and `byteCredits` maps.
   */
  constructor(private client: any) {
    this.setupMonitoring();
  }

  /**
   * Stops the periodic state logging started by the constructor.
   * Safe to call more than once.
   *
   * NOTE(review): the "message" listener is left registered because this
   * snippet does not show how the client removes listeners — confirm.
   */
  stop() {
    if (this.stateTimer !== undefined) {
      clearInterval(this.stateTimer);
      this.stateTimer = undefined;
    }
  }

  /** Registers the FLOW_UPDATE listener and starts the 5-second state log. */
  private setupMonitoring() {
    // Monitor flow control updates
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "FLOW_UPDATE") {
        this.handleFlowUpdate(message);
      }
    });

    // Periodic state check; the handle is kept so stop() can cancel it.
    this.stateTimer = setInterval(() => {
      this.logFlowState();
    }, 5000);
  }

  /** Logs the credits carried by a single FLOW_UPDATE message. */
  private handleFlowUpdate(message: HAIPMessage) {
    const { channel, add_messages, add_bytes } = message.payload;
    console.log(`Flow update for ${channel}:`, {
      messageCredits: add_messages,
      byteCredits: add_bytes,
    });
  }

  /** Logs the current message and byte credits of the USER, AGENT and SYSTEM channels. */
  private logFlowState() {
    const state = this.client.getConnectionState();
    console.log("Current flow state:", {
      user: {
        messages: state.credits.get("USER"),
        bytes: state.byteCredits.get("USER"),
      },
      agent: {
        messages: state.credits.get("AGENT"),
        bytes: state.byteCredits.get("AGENT"),
      },
      system: {
        messages: state.credits.get("SYSTEM"),
        bytes: state.byteCredits.get("SYSTEM"),
      },
    });
  }
}

// Usage: constructing the monitor subscribes it to the client's "message"
// events and starts its 5-second periodic state log.
const monitor = new FlowControlMonitor(client);

Sending Flow Updates

Requesting More Credits

// Arguments are: channel, message credits to request, then (optionally)
// byte credits to request.

// Request more message credits only
await client.sendFlowUpdate("USER", 10); // 10 more message credits

// Request more byte credits only (message credits passed as 0)
await client.sendFlowUpdate("USER", 0, 1024 * 1024); // 1MB more byte credits

// Request both kinds in a single update
await client.sendFlowUpdate("USER", 10, 1024 * 1024); // 10 message credits and 1MB of byte credits

Automatic Credit Management

/**
 * Wraps a client's `sendTextMessage` so that every send first checks the
 * channel's remaining credits and asks for more when they are running low.
 */
class AutomaticFlowControl {
  /** Low-water marks below which a top-up request is sent. */
  private thresholds = {
    messageThreshold: 5, // fewer than 5 message credits left
    byteThreshold: 512 * 1024, // fewer than 512KB of byte credits left
  };

  /**
   * @param client Client whose `sendTextMessage` is replaced by a
   *   credit-checking wrapper. It must also provide `getConnectionState()`
   *   and `sendFlowUpdate()`.
   */
  constructor(private client: any) {
    this.setupAutomaticControl();
  }

  /** Replaces `client.sendTextMessage` with a version that tops up credits first. */
  private setupAutomaticControl() {
    // Keep a bound reference to the real sender before overwriting it.
    const sendUnguarded = this.client.sendTextMessage.bind(this.client);

    const sendGuarded = async (
      channel: string,
      text: string,
      author?: string,
      runId?: string
    ) => {
      await this.ensureCredits(channel);
      return sendUnguarded(channel, text, author, runId);
    };

    this.client.sendTextMessage = sendGuarded;
  }

  /**
   * Sends a flow update for each credit pool of `channel` that is below its
   * threshold: at least 10 message credits and/or at least 1MB of byte credits.
   */
  private async ensureCredits(channel: string) {
    const snapshot = this.client.getConnectionState();
    const { messageThreshold, byteThreshold } = this.thresholds;

    // Both pools are read up front, before any request is awaited.
    const availableMessages = snapshot.credits.get(channel) || 0;
    const availableBytes = snapshot.byteCredits.get(channel) || 0;

    const messagesRunningLow = availableMessages < messageThreshold;
    if (messagesRunningLow) {
      const shortfall = messageThreshold - availableMessages;
      await this.client.sendFlowUpdate(channel, Math.max(10, shortfall));
    }

    const bytesRunningLow = availableBytes < byteThreshold;
    if (bytesRunningLow) {
      const shortfall = byteThreshold - availableBytes;
      await this.client.sendFlowUpdate(
        channel,
        0,
        Math.max(1024 * 1024, shortfall)
      );
    }
  }
}

// Usage: constructing the controller replaces client.sendTextMessage with a
// wrapper that checks the channel's credits before every send.
const flowControl = new AutomaticFlowControl(client);

Channel Control

Pausing and Resuming Channels

// Pause message flow on the USER channel
// (each call is awaited before the next one is issued)
await client.pauseChannel("USER");

// Resume message flow on the same channel
await client.resumeChannel("USER");

Channel State Management

/**
 * Tracks which channels are paused and holds back messages for them.
 *
 * The manager listens for PAUSE_CHANNEL / RESUME_CHANNEL messages on the
 * client. Messages submitted through `sendWithChannelCheck` while a channel
 * is paused are kept in a per-channel FIFO queue and sent, in order, once the
 * channel resumes.
 */
class ChannelManager {
  private pausedChannels = new Set<string>();
  /** Send callbacks waiting for their channel to resume, oldest first. */
  private pendingMessages = new Map<string, Array<() => Promise<void>>>();
  /** Channels whose backlog is currently being drained. */
  private flushingChannels = new Set<string>();

  /**
   * @param client Client to observe; must provide `on("message", handler)`.
   */
  constructor(private client: any) {
    this.setupChannelHandling();
  }

  /** Subscribes to pause/resume notifications and keeps the paused set current. */
  private setupChannelHandling() {
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "PAUSE_CHANNEL") {
        this.pausedChannels.add(message.payload.channel);
        console.log(`Channel ${message.payload.channel} paused`);
      } else if (message.type === "RESUME_CHANNEL") {
        const channel = message.payload.channel;
        this.pausedChannels.delete(channel);
        console.log(`Channel ${channel} resumed`);
        // flushQueue handles its own errors, so the promise never rejects.
        void this.flushQueue(channel);
      }
    });
  }

  /** Returns true while `channel` is paused. */
  isChannelPaused(channel: string): boolean {
    return this.pausedChannels.has(channel);
  }

  /** Returns how many messages are waiting for `channel` to resume. */
  getQueuedCount(channel: string): number {
    return this.pendingMessages.get(channel)?.length ?? 0;
  }

  /**
   * Runs `messageFn` immediately when `channel` is open, otherwise queues it.
   *
   * @param channel Channel the message belongs to.
   * @param messageFn Callback that performs the actual send.
   * @returns Resolves once the message was sent or queued. Errors from an
   *   immediate send propagate to the caller; errors from a queued send are
   *   logged when the queue is flushed.
   */
  async sendWithChannelCheck(channel: string, messageFn: () => Promise<void>) {
    if (this.isChannelPaused(channel)) {
      console.log(`Channel ${channel} is paused, queuing message`);
      // Queue the message for later
      this.queueMessage(channel, messageFn);
      return;
    }

    if (this.flushingChannels.has(channel)) {
      // A backlog is still being drained: queue behind it so this message
      // cannot overtake older ones.
      console.log(`Channel ${channel} is draining its queue, queuing message`);
      this.queueMessage(channel, messageFn);
      return;
    }

    await messageFn();
  }

  /** Appends `messageFn` to the backlog of `channel`. */
  private queueMessage(channel: string, messageFn: () => Promise<void>) {
    const pending = this.pendingMessages.get(channel);
    if (pending) {
      pending.push(messageFn);
    } else {
      this.pendingMessages.set(channel, [messageFn]);
    }
  }

  /**
   * Sends the backlog of `channel` one message at a time, oldest first.
   * Stops early if the channel is paused again; the remaining messages stay
   * queued for the next resume. A failing send is logged and skipped so it
   * does not block the messages behind it.
   */
  private async flushQueue(channel: string) {
    if (this.flushingChannels.has(channel)) {
      return; // Already draining; that loop will pick up everything queued.
    }

    const pending = this.pendingMessages.get(channel);
    if (!pending || pending.length === 0) {
      return;
    }

    this.flushingChannels.add(channel);
    try {
      while (pending.length > 0 && !this.isChannelPaused(channel)) {
        const messageFn = pending.shift();
        if (!messageFn) {
          continue;
        }
        try {
          await messageFn();
        } catch (error) {
          console.error(`Failed to send queued message on ${channel}:`, error);
        }
      }
    } finally {
      this.flushingChannels.delete(channel);
    }
  }
}

// Usage: the manager starts tracking pause/resume messages as soon as it is constructed.
const channelManager = new ChannelManager(client);

// Send through the manager: the callback runs immediately when the USER
// channel is open; while the channel is paused it is handed to the manager's
// queue instead.
// NOTE(review): `runId` is not defined in this snippet — assumes the
// surrounding code provides it.
await channelManager.sendWithChannelCheck("USER", async () => {
  await client.sendTextMessage("USER", "Hello", "user", runId);
});

Message Queuing

Credit-Aware Message Queue

/**
 * Per-channel FIFO of send callbacks that is drained only while the channel
 * has both message credits and byte credits.
 *
 * Draining is attempted when a message is enqueued and whenever the client
 * reports a FLOW_UPDATE that adds credits to the channel.
 */
class CreditAwareQueue {
  private queues = new Map<string, Array<() => Promise<void>>>();
  /** Marks channels with a drain loop in progress so only one runs at a time. */
  private processing = new Map<string, boolean>();

  /**
   * @param client Client to observe; must provide `on("message", handler)`
   *   and `getConnectionState()` returning `credits` and `byteCredits` maps.
   */
  constructor(private client: any) {
    this.setupFlowControlHandling();
  }

  /** Re-runs the drain loop for a channel whenever a FLOW_UPDATE adds credits to it. */
  private setupFlowControlHandling() {
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "FLOW_UPDATE") {
        const { channel, add_messages, add_bytes } = message.payload;
        if (add_messages > 0 || add_bytes > 0) {
          // Nothing awaits this promise, so handle failures here rather than
          // leaving an unhandled rejection.
          this.processQueue(channel).catch((error: unknown) => {
            console.error(`Failed to process queue for ${channel}:`, error);
          });
        }
      }
    });
  }

  /**
   * Appends `messageFn` to the queue of `channel` and tries to drain it.
   *
   * @returns Resolves when the drain attempt started by this call has
   *   finished (immediately if another drain is already in progress). The
   *   message itself may still be queued if the channel has no credits.
   */
  async enqueue(channel: string, messageFn: () => Promise<void>) {
    if (!this.queues.has(channel)) {
      this.queues.set(channel, []);
    }

    this.queues.get(channel)!.push(messageFn);
    await this.processQueue(channel);
  }

  /**
   * Sends queued messages for `channel`, oldest first, until the queue is
   * empty or the channel runs out of message or byte credits. A message whose
   * callback throws is logged and dropped so it cannot block the ones behind it.
   */
  private async processQueue(channel: string) {
    if (this.processing.get(channel)) {
      return; // Already processing
    }

    this.processing.set(channel, true);

    try {
      const queue = this.queues.get(channel) ?? [];

      while (queue.length > 0) {
        // Credits are re-read before every send because each send may use them up.
        const state = this.client.getConnectionState();
        const messageCredits = state.credits.get(channel) ?? 0;
        const byteCredits = state.byteCredits.get(channel) ?? 0;

        // Check if we have credits to send
        if (messageCredits <= 0 || byteCredits <= 0) {
          break; // Wait for more credits
        }

        const messageFn = queue.shift();
        if (messageFn) {
          try {
            await messageFn();
          } catch (error) {
            console.error(
              `Failed to send queued message on ${channel}:`,
              error
            );
          }
        }
      }
    } finally {
      this.processing.set(channel, false);
    }
  }

  /** Returns the number of messages still waiting on `channel`. */
  getQueueLength(channel: string): number {
    return this.queues.get(channel)?.length ?? 0;
  }

  /**
   * Discards every message still waiting on `channel`.
   *
   * The array is emptied in place rather than replaced: a drain loop that is
   * already running holds a reference to it and must see the queue as empty,
   * and messages enqueued afterwards must land in the array that loop reads.
   */
  clearQueue(channel: string) {
    const queue = this.queues.get(channel);
    if (queue) {
      queue.length = 0;
    }
  }
}

// Usage: the queue subscribes to the client's "message" events on construction.
const queue = new CreditAwareQueue(client);

// Queue messages: each callback runs once the USER channel has both message
// and byte credits; until then it stays in the queue.
// NOTE(review): `runId` is not defined in this snippet — assumes the
// surrounding code provides it.
await queue.enqueue("USER", async () => {
  await client.sendTextMessage("USER", "Message 1", "user", runId);
});

await queue.enqueue("USER", async () => {
  await client.sendTextMessage("USER", "Message 2", "user", runId);
});

// Check queue status: number of messages still waiting on the USER channel
console.log("User channel queue length:", queue.getQueueLength("USER"));

Flow Control Strategies

Conservative Strategy

/**
 * Requests credits early: after each FLOW_UPDATE that grants message credits,
 * it waits briefly and, if at most half of the granted amount is left on the
 * channel, asks for the same amount again.
 */
class ConservativeFlowControl {
  /**
   * @param client Client to observe; must provide `on("message", handler)`,
   *   `getConnectionState()` and `sendFlowUpdate()`.
   * @param checkDelayMs How long to wait after a grant before checking the
   *   remaining credits. Defaults to 1000 ms.
   */
  constructor(private client: any, private checkDelayMs: number = 1000) {
    this.setupConservativeControl();
  }

  /** Schedules a delayed credit check after every FLOW_UPDATE that adds message credits. */
  private setupConservativeControl() {
    // Request credits early
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type !== "FLOW_UPDATE") {
        return;
      }

      const { channel, add_messages } = message.payload;
      if (!(add_messages > 0)) {
        return;
      }

      // If we received credits, request more when we're at 50%
      setTimeout(() => {
        // Nothing awaits a timer callback, so failures are handled here
        // rather than surfacing as an unhandled rejection.
        this.topUpIfHalfSpent(channel, add_messages).catch((error: unknown) => {
          console.error(`Failed to request credits for ${channel}:`, error);
        });
      }, this.checkDelayMs);
    });
  }

  /**
   * Requests `grantedMessages` more message credits for `channel` when the
   * channel currently holds at most half of that amount.
   */
  private async topUpIfHalfSpent(channel: string, grantedMessages: number) {
    const state = this.client.getConnectionState();
    const currentCredits = state.credits.get(channel) ?? 0;

    if (currentCredits <= grantedMessages / 2) {
      await this.client.sendFlowUpdate(channel, grantedMessages);
    }
  }
}

Aggressive Strategy

/**
 * Asks for the largest credit allowance on every channel as soon as the
 * client reports a connection.
 */
class AggressiveFlowControl {
  /**
   * @param client Client to observe; must provide `on("connect", handler)`
   *   and `sendFlowUpdate()`.
   */
  constructor(private client: any) {
    this.setupAggressiveControl();
  }

  /** Registers the "connect" handler that issues the credit requests. */
  private setupAggressiveControl() {
    // Request maximum credits immediately
    const requestMaximumCredits = async () => {
      const messageCredits = 100;
      const byteCredits = 10 * 1024 * 1024; // 10MB

      // One request per channel, each awaited before the next is sent.
      for (const channelName of ["USER", "AGENT", "SYSTEM"]) {
        await this.client.sendFlowUpdate(
          channelName,
          messageCredits,
          byteCredits
        );
      }
    };

    this.client.on("connect", requestMaximumCredits);
  }
}

Adaptive Strategy

/**
 * Requests extra message credits for channels that are currently busy.
 *
 * Each MESSAGE_START nudges the channel's activity score towards 1
 * (`score * 0.9 + 0.1`), and every 5-second tick decays it (`score * 0.9`),
 * so the score falls back towards 0 once traffic stops. On each tick, a
 * channel whose score is above 0.5 and that has not been topped up in the
 * last 10 seconds gets a credit request proportional to the score, capped
 * at 50 credits.
 */
class AdaptiveFlowControl {
  /** Activity score per channel, in the range 0..1. */
  private messageRates = new Map<string, number>();
  /** Timestamp (ms since epoch) of the last credit request per channel. */
  private lastRequestTime = new Map<string, number>();
  /** Handle of the periodic evaluation timer; `undefined` once stopped. */
  private requestTimer: ReturnType<typeof setInterval> | undefined;

  /**
   * @param client Client to observe; must provide `on("message", handler)`
   *   and `sendFlowUpdate()`.
   */
  constructor(private client: any) {
    this.setupAdaptiveControl();
  }

  /**
   * Stops the periodic credit evaluation started by the constructor.
   * Safe to call more than once.
   */
  stop() {
    if (this.requestTimer !== undefined) {
      clearInterval(this.requestTimer);
      this.requestTimer = undefined;
    }
  }

  /** Starts tracking message activity and schedules the 5-second evaluation. */
  private setupAdaptiveControl() {
    // Track message rates
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "MESSAGE_START") {
        const channel = message.payload.channel;
        const rate = this.messageRates.get(channel) ?? 0;

        // Exponential moving average towards 1 for every observed message
        this.messageRates.set(channel, rate * 0.9 + 0.1);
      }
    });

    // Adaptive credit requests
    this.requestTimer = setInterval(() => {
      // Nothing awaits a timer callback, so failures are handled here
      // rather than surfacing as an unhandled rejection.
      this.requestAdaptiveCredits().catch((error: unknown) => {
        console.error("Adaptive credit request failed:", error);
      });
    }, 5000);
  }

  /**
   * Evaluates every channel once: decays its activity score and, if the
   * score seen at the start of the tick was high enough, requests credits.
   */
  private async requestAdaptiveCredits() {
    const channels = ["USER", "AGENT", "SYSTEM"];

    for (const channel of channels) {
      const rate = this.messageRates.get(channel) ?? 0;
      const lastRequest = this.lastRequestTime.get(channel) ?? 0;
      const now = Date.now();

      // Decay before awaiting anything, so messages that arrive during the
      // request below build on the decayed score instead of being overwritten.
      if (rate > 0) {
        this.messageRates.set(channel, rate * 0.9);
      }

      // Request more credits if rate is high and we haven't requested recently
      if (rate > 0.5 && now - lastRequest > 10000) {
        const credits = Math.min(50, Math.floor(rate * 100));
        await this.client.sendFlowUpdate(channel, credits);
        this.lastRequestTime.set(channel, now);
      }
    }
  }
}

Flow Control Testing

Mock Flow Control Testing

/**
 * In-memory stand-in for the credit bookkeeping, for use in unit tests.
 * The USER, AGENT and SYSTEM channels each start with 10 message credits
 * and 1MB of byte credits; unknown channels behave as if they had none.
 */
class MockFlowControl {
  private credits = new Map<string, number>();
  private byteCredits = new Map<string, number>();

  constructor() {
    // Initialize with default credits
    for (const channelName of ["USER", "AGENT", "SYSTEM"]) {
      this.credits.set(channelName, 10);
      this.byteCredits.set(channelName, 1024 * 1024);
    }
  }

  /** Current balance of `channel` in `pool`, treating a missing entry as 0. */
  private balanceOf(pool: Map<string, number>, channel: string): number {
    return pool.get(channel) || 0;
  }

  /**
   * Deducts the given amounts from `channel` if both pools can cover them.
   *
   * @returns `true` when the credits were deducted; `false` (with both
   *   balances left untouched) when either pool is too low.
   */
  consumeCredits(
    channel: string,
    messageCredits: number = 1,
    byteCredits: number = 0
  ): boolean {
    const messagesLeft = this.balanceOf(this.credits, channel);
    const bytesLeft = this.balanceOf(this.byteCredits, channel);

    const affordable = messagesLeft >= messageCredits && bytesLeft >= byteCredits;
    if (!affordable) {
      return false;
    }

    this.credits.set(channel, messagesLeft - messageCredits);
    this.byteCredits.set(channel, bytesLeft - byteCredits);
    return true;
  }

  /** Adds the given amounts to both pools of `channel`. */
  addCredits(
    channel: string,
    messageCredits: number = 0,
    byteCredits: number = 0
  ) {
    const messagesLeft = this.balanceOf(this.credits, channel);
    const bytesLeft = this.balanceOf(this.byteCredits, channel);

    this.credits.set(channel, messagesLeft + messageCredits);
    this.byteCredits.set(channel, bytesLeft + byteCredits);
  }

  /** Returns the current message and byte balances of `channel`. */
  getCredits(channel: string): { messages: number; bytes: number } {
    const messages = this.balanceOf(this.credits, channel);
    const bytes = this.balanceOf(this.byteCredits, channel);
    return { messages, bytes };
  }
}

// Usage in tests
// NOTE(review): `expect` is presumably the test framework's global (e.g. Jest) — confirm.
const mockFlowControl = new MockFlowControl();

// Test credit consumption: USER starts with 10 message credits, so spending
// 1 credit and 100 bytes succeeds and leaves 9 message credits.
const canSend = mockFlowControl.consumeCredits("USER", 1, 100);
expect(canSend).toBe(true);

// Test credit exhaustion: of these 10 attempts only the first 9 succeed,
// leaving the USER channel with 0 message credits.
for (let i = 0; i < 10; i++) {
  mockFlowControl.consumeCredits("USER", 1);
}

// With no message credits left, a further attempt is refused.
const canSendMore = mockFlowControl.consumeCredits("USER", 1);
expect(canSendMore).toBe(false);

Flow Control Integration Testing

// Integration-style test: a client configured with 2 initial message credits
// is expected to reject a third send.
describe("Flow Control", () => {
  test("should respect credit limits", async () => {
    const client = createHAIPClient({
      url: "ws://localhost:8080",
      token: "test-token",
      transport: "websocket",
      flowControl: {
        initialCredits: 2,
        initialCreditBytes: 1024,
      },
    });

    // Mock flow control state
    // NOTE(review): the spy returns these same Map objects on every call and
    // nothing in this test decrements them, so the rejection asserted below
    // relies on the SDK's own credit accounting rather than on this mock —
    // confirm against the SDK.
    const mockState = {
      credits: new Map([["USER", 2]]),
      byteCredits: new Map([["USER", 1024]]),
    };

    jest.spyOn(client, "getConnectionState").mockReturnValue(mockState);

    // Should be able to send 2 messages
    await client.sendTextMessage("USER", "Message 1", "user", "run-1");
    await client.sendTextMessage("USER", "Message 2", "user", "run-1");

    // Third message must be rejected with an error mentioning "FLOW_CONTROL"
    // (a client that silently queues the message would fail this assertion)
    await expect(
      client.sendTextMessage("USER", "Message 3", "user", "run-1")
    ).rejects.toThrow("FLOW_CONTROL");
  });
});

Performance Monitoring

Flow Control Metrics

/**
 * Accumulates simple counters about flow-control behaviour: how often credits
 * were requested, how many were received, and how long blocked messages waited.
 */
class FlowControlMetrics {
  private metrics = FlowControlMetrics.zeroedCounters();

  /** Builds a fresh counter set with every value at zero. */
  private static zeroedCounters() {
    return {
      creditRequests: 0,
      creditReceived: 0,
      messagesBlocked: 0,
      averageWaitTime: 0,
      totalWaitTime: 0,
    };
  }

  /** Counts one credit request. */
  recordCreditRequest() {
    this.metrics.creditRequests += 1;
  }

  /** Adds `amount` to the total of credits received. */
  recordCreditReceived(amount: number) {
    this.metrics.creditReceived += amount;
  }

  /**
   * Counts one blocked message and folds its wait time into the running
   * total and average.
   */
  recordMessageBlocked(waitTime: number) {
    const counters = this.metrics;
    counters.messagesBlocked += 1;
    counters.totalWaitTime += waitTime;
    counters.averageWaitTime = counters.totalWaitTime / counters.messagesBlocked;
  }

  /**
   * Returns a copy of the counters plus `creditUtilization`: the number of
   * credit requests divided by the credits received, times 100 (0 when no
   * credits were received).
   *
   * NOTE(review): this ratio compares a count of requests with an amount of
   * credits — confirm that is the intended definition of utilization.
   */
  getMetrics() {
    const { creditRequests, creditReceived } = this.metrics;

    let creditUtilization = 0;
    if (creditReceived > 0) {
      creditUtilization = (creditRequests / creditReceived) * 100;
    }

    return { ...this.metrics, creditUtilization };
  }

  /** Sets every counter back to zero. */
  reset() {
    this.metrics = FlowControlMetrics.zeroedCounters();
  }
}

// Usage
const flowMetrics = new FlowControlMetrics();

// Monitor flow control performance: log a metrics snapshot every 10 seconds.
// The timer handle is not kept here, so this interval runs until the process exits.
setInterval(() => {
  const metrics = flowMetrics.getMetrics();
  console.log("Flow control metrics:", metrics);
}, 10000);

Next Steps

Client API

Learn about the client interface and methods.

Examples

See practical examples of flow control usage.

Error Handling

Understand error handling and recovery.

API Reference

Full API reference documentation.