Streaming

Streaming Token Accounting

Token counts for streamed responses are only known after the stream ends. Use a two-phase deduction — pre-check on input, deduct actual output after the stream completes — for safe, accurate limits.

Before & After

Without RLAAS

Can't Enforce Token Limits on Streamed Responses

  • Token count is unknown until the stream finishes — limits enforced too late or not at all
  • Cutting off a stream mid-way leaves the user with truncated, broken output
  • Usage tracking happens in multiple places and drifts out of sync
  • No shared counter — users can bypass per-user limits by opening parallel streams
# ✗ No pre-check — cannot enforce limits on streams
async def stream_response(user_id: str, prompt: str):
    # Can't check limit here — output tokens unknown until end
    stream = await openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        yield chunk.choices[0].delta.content
With RLAAS

Two-Phase Deduction — Fail-Open for Stream Safety

  • Pre-check on estimated input tokens — block the request before the stream starts
  • Stream proceeds; count output tokens from SSE chunks as they arrive
  • Deduct actual total after the stream completes — precise and non-blocking (a fail-open variant is sketched after the example)
  • Parallel streams share one RLAAS counter — impossible to bypass the limit
# ✓ Pre-check on input; deduct actual output after stream ends
async def stream_response(user_id: str, prompt: str):
    input_tokens = count_tokens(prompt)
    decision = client.check(CheckRequest(
        user_id=user_id, resource="llm:tokens", cost=input_tokens
    ))
    if not decision.allowed:
        raise TokenBudgetExceeded(retry_after=decision.retry_after)

    output_tokens = 0
    stream = await openai.chat.completions.create(..., stream=True)
    async for chunk in stream:
        delta = chunk.choices[0].delta.content or ""
        output_tokens += count_tokens(delta)
        yield delta

    client.record(RecordRequest(
        user_id=user_id, resource="llm:tokens", units=output_tokens
    ))
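The heading above promises fail-open behavior, but the sketch as written only records tokens if the stream runs to completion: if the consumer disconnects mid-stream, the generator is closed and the record call never fires, and if RLAAS is briefly unreachable, an accounting error would bubble into the response path. A minimal way to close both gaps, assuming the same hypothetical count_tokens, client, and RecordRequest names used above, is to record in a finally block and swallow accounting failures:

import logging

# Sketch: record whatever was streamed, even on early disconnect,
# and never let an accounting failure break delivery (fail-open).
async def stream_with_failsafe_record(user_id: str, stream):
    output_tokens = 0
    try:
        async for chunk in stream:
            delta = chunk.choices[0].delta.content or ""
            output_tokens += count_tokens(delta)
            yield delta
    finally:
        try:
            client.record(RecordRequest(
                user_id=user_id, resource="llm:tokens", units=output_tokens
            ))
        except Exception:
            # Accounting must never break delivery; log and continue
            logging.exception("token record failed; continuing (fail-open)")

The trade-off is deliberate: a lost record under-counts usage slightly, which is preferable to breaking a response the user is already reading.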

How It Works

Policy Configuration

# Daily token budget per user
{
  "id": "llm-tokens-daily",
  "resource": "llm:tokens",
  "algorithm": "quota",
  "config": {
    "limit": 100000,
    "window_seconds": 86400
  },
  "action_deny": "reject",
  "metadata": {
    "description": "100k tokens/day per user"
  }
}
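How this policy gets registered depends on your deployment. As a rough illustration only, the sketch below posts the JSON above to a /v1/policies endpoint on the same base URL the SDK examples use; that path is an assumption, so substitute whatever policy API or configuration mechanism your RLAAS instance actually exposes:

import requests

# Hypothetical illustration: register the daily token-budget policy over HTTP.
# The /v1/policies path is assumed, not a documented route.
policy = {
    "id": "llm-tokens-daily",
    "resource": "llm:tokens",
    "algorithm": "quota",
    "config": {"limit": 100000, "window_seconds": 86400},
    "action_deny": "reject",
    "metadata": {"description": "100k tokens/day per user"},
}

resp = requests.post("http://rlaas:8080/v1/policies", json=policy, timeout=5)
resp.raise_for_status()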

Two-Phase Flow

  1. Count input tokens — estimate prompt tokens before calling the LLM (a counting helper is sketched after this list)
  2. Pre-check with RLAAS — pass the input-token estimate as cost; deny early if the budget is exhausted
  3. Open the stream — stream starts only after the pre-check passes
  4. Count output chunks — accumulate output token counts as chunks arrive
  5. Record after stream — call Record with actual output tokens; RLAAS deducts from the bucket
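Steps 1 and 4 both rely on counting tokens client-side. The SDK examples below use tiktoken for this; the helper sketched here adds a rough allowance for chat-message framing, following OpenAI's published approximation. The count_message_tokens name and the 3-token overhead constants are illustrative assumptions, and the provider's reported usage remains the authoritative number:

import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")

def count_tokens(text: str) -> int:
    # Token count for a single string (used for the prompt and each output delta)
    return len(enc.encode(text))

def count_message_tokens(messages: list[dict]) -> int:
    # Rough estimate for chat-format prompts: every message carries a few tokens
    # of framing overhead in addition to its content. The 3-token constants are
    # an approximation and may drift between model versions.
    tokens = 0
    for message in messages:
        tokens += 3  # per-message framing overhead (approximate)
        for value in message.values():
            tokens += len(enc.encode(str(value)))
    return tokens + 3  # reply is primed with an assistant prefix (approximate)

In practice the pre-check only needs to be roughly right, since the post-stream record settles the balance with the actual output count.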

SDK Examples

Pre-check on input tokens, stream with chunk counting, deduct actual output after completion.

// two-phase token accounting for streaming responses
func (s *LLMService) StreamCompletion(ctx context.Context, userID, prompt string, out io.Writer) error {
    inputTokens := CountTokens(prompt)
    decision, err := s.rlaas.Check(ctx, &rlaas.CheckRequest{
        UserID:   userID,
        Resource: "llm:tokens",
        Cost:     int64(inputTokens),
    })
    if err != nil {
        return err
    }
    if !decision.Allowed {
        return ErrTokenBudgetExceeded
    }

    stream, err := s.openai.CreateChatCompletionStream(ctx, ...)
    if err != nil {
        return err
    }
    defer stream.Close()

    var outputTokens int
    for {
        chunk, err := stream.Recv()
        if errors.Is(err, io.EOF) {
            break
        }
        if err != nil {
            return err
        }
        delta := chunk.Choices[0].Delta.Content
        outputTokens += CountTokens(delta)
        fmt.Fprint(out, delta)
    }

    _ = s.rlaas.Record(ctx, &rlaas.RecordRequest{
        UserID:   userID,
        Resource: "llm:tokens",
        Units:    int64(outputTokens),
    })
    return nil
}
import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")

async def stream_response(user_id: str, prompt: str):
    # Phase 1 — pre-check on input tokens
    input_tokens = len(enc.encode(prompt))
    decision = client.check(CheckRequest(
        user_id=user_id, resource="llm:tokens", cost=input_tokens
    ))
    if not decision.allowed:
        raise TokenBudgetExceeded(retry_after=decision.retry_after)

    # Phase 2 — stream and count output
    output_tokens = 0
    stream = await openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content or ""
        output_tokens += len(enc.encode(delta))
        yield delta

    # Phase 3 — deduct actual output tokens
    client.record(RecordRequest(
        user_id=user_id, resource="llm:tokens", units=output_tokens
    ))
import { encode } from 'gpt-tokenizer';

async function* streamCompletion(userId: string, prompt: string) {
  // Phase 1 — pre-check on input tokens
  const inputTokens = encode(prompt).length;
  const decision = await rlaas.check({
    userId,
    resource: 'llm:tokens',
    cost: inputTokens,
  });
  if (!decision.allowed) {
    throw new TokenBudgetExceeded({ userId, retryAfter: decision.retryAfter });
  }

  // Phase 2 — stream and count output tokens
  let outputTokens = 0;
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: prompt }],
    stream: true,
  });
  for await (const chunk of stream) {
    const delta = chunk.choices[0]?.delta?.content ?? '';
    outputTokens += encode(delta).length;
    yield delta;
  }

  // Phase 3 — deduct actual output
  await rlaas.record({ userId, resource: 'llm:tokens', units: outputTokens });
}
// two-phase token accounting for streaming responses
import io.rlaas.sdk.RlaasClient;
import io.rlaas.sdk.model.*;

RlaasClient rlaas = new RlaasClient("http://rlaas:8080");

void streamCompletion(String userId, String prompt, Writer out) throws Exception {
    int inputTokens = countTokens(prompt);

    // Phase 1 — pre-check input tokens
    Decision decision = rlaas.checkLimit(
        new CheckRequest(userId, "llm:tokens", inputTokens));
    if (!decision.isAllowed())
        throw new TokenBudgetExceededException(decision.getRetryAfter());

    // Phase 2 — stream and count output
    var stream = openAi.createChatCompletionStream(...);
    int outputTokens = 0;
    for (var chunk : stream) {
        String delta = chunk.getChoices().get(0).getDelta().getContent();
        outputTokens += countTokens(delta);
        out.write(delta);
        out.flush();
    }

    // Phase 3 — deduct actual output tokens
    rlaas.record(new RecordRequest(userId, "llm:tokens", outputTokens));
}
// two-phase token accounting for streaming responses
using Rlaas.Sdk;
using Rlaas.Sdk.Models;

var rlaas = new RlaasClient("http://rlaas:8080");

async IAsyncEnumerable<string> StreamCompletionAsync(string userId, string prompt)
{
    var inputTokens = CountTokens(prompt);

    // Phase 1 — pre-check input tokens
    var decision = await rlaas.CheckLimitAsync(
        new CheckRequest(userId, "llm:tokens", inputTokens));
    if (!decision.Allowed)
        throw new TokenBudgetExceededException(decision.RetryAfter);

    // Phase 2 — stream and count output tokens
    var outputTokens = 0;
    await foreach (var chunk in openAi.StreamChatCompletionAsync(...))
    {
        var delta = chunk.Choices[0].Delta.Content ?? "";
        outputTokens += CountTokens(delta);
        yield return delta;
    }

    // Phase 3 — deduct actual output tokens
    await rlaas.RecordAsync(
        new RecordRequest(userId, "llm:tokens", outputTokens));
}
// streaming token accounting (Node.js)
const { RlaasClient } = require('@rlaas/node-sdk');

const client = new RlaasClient('http://rlaas:8080');

async function streamWithAccounting(userId, messages) {
  const inputTokens = countTokens(messages);
  const decision = await client.check({
    user_id: userId,
    resource: 'llm:tokens',
    tokens: inputTokens,
  });
  if (!decision.allowed) throw new Error('Token budget exceeded');

  let outputTokens = 0;
  const stream = await llm.chatStream(messages);
  for await (const chunk of stream) {
    outputTokens += chunk.token_count;
    emit(chunk.text);
  }

  await client.record({ user_id: userId, resource: 'llm:tokens', tokens: outputTokens });
}
// streaming token accounting (C++)
#include <string>
#include "rlaas/client.h"

rlaas::Client client("http://rlaas:8080");

void stream_with_accounting(const std::string& user_id, const Messages& messages) {
    int input_tokens = count_tokens(messages);

    rlaas::CheckRequest req;
    req.user_id = user_id;
    req.resource = "llm:tokens";
    req.tokens = input_tokens;

    auto decision = client.check(req);
    if (!decision.allowed) throw TokenBudgetExceeded();

    int output_tokens = 0;
    llm.chat_stream(messages, [&](const Chunk& c) {
        output_tokens += c.token_count;
        emit(c.text);
    });

    client.record(user_id, "llm:tokens", output_tokens);
}
// streaming token accounting (Rust)
use rlaas_sdk::{Client, CheckRequest};

// client is constructed once, e.g. Client::new("http://rlaas:8080"), and passed in
async fn stream_with_accounting(
    client: &Client,
    user_id: &str,
    messages: &[Message],
) -> Result<()> {
    let input_tokens = count_tokens(messages);
    let decision = client.check(&CheckRequest {
        user_id: user_id.into(),
        resource: "llm:tokens".into(),
        tokens: Some(input_tokens),
        ..Default::default()
    }).await?;
    if !decision.allowed {
        return Err(anyhow!("Token budget exceeded"));
    }

    let mut output_tokens = 0;
    let mut stream = llm.chat_stream(messages).await?;
    while let Some(chunk) = stream.next().await {
        output_tokens += chunk.token_count;
        emit(&chunk.text);
    }

    client.record(user_id, "llm:tokens", output_tokens).await?;
    Ok(())
}
# streaming token accounting (Ruby)
require 'rlaas_sdk'

# Created once and reused; a constant keeps it visible inside the method.
CLIENT = Rlaas::Client.new('http://rlaas:8080')

def stream_with_accounting(user_id, messages)
  input_tokens = count_tokens(messages)
  decision = CLIENT.check(
    user_id: user_id,
    resource: 'llm:tokens',
    tokens: input_tokens
  )
  raise TokenBudgetExceeded unless decision.allowed

  output_tokens = 0
  llm.chat_stream(messages) do |chunk|
    output_tokens += chunk.token_count
    emit(chunk.text)
  end

  CLIENT.record(user_id: user_id, resource: 'llm:tokens', tokens: output_tokens)
end

You've seen all 7 AI & ML scenarios.

Ready to add RLAAS to your stack? Deploy in minutes with Docker Compose.