Embeddings / RAG

Embedding API Throttling

Bulk indexing jobs hit provider rate limits and fail halfway through. Throttle your pipeline to just below the cap so every job completes without 429 errors.

Before & After

Without RLAAS

Indexing Job Fails at Document 38,000 with a 429

  • Pipeline calls the embedding API as fast as possible — hits 3,000 RPM cap
  • Provider returns 429; job crashes mid-run and leaves the index half-updated
  • Exponential back-off is implemented inconsistently across services
  • Concurrent indexers don't know about each other's usage
# ✗ Full speed — guaranteed 429 on large corpora async def index_documents(docs: list[str]): for doc in docs: embedding = await embed(doc) # no throttle — 429 mid-run await vector_store.upsert(embedding)
With RLAAS

Throttle to Just Below the Cap — Job Completes First Time

  • RLAAS enforces 2,800 RPM across all indexers — safely below the 3,000 cap
  • Multiple concurrent indexers share the same RLAAS counter automatically
  • On deny: sleep for retry_after seconds and continue — zero restarts
  • Adjust the limit live to match provider tier upgrades
# ✓ RLAAS throttles across all indexers sharing the counter async def index_documents(docs: list[str]): for doc in docs: while True: decision = client.check(CheckRequest( user_id="indexer-service", resource="embed:openai" )) if decision.allowed: break await asyncio.sleep(decision.retry_after) embedding = await embed(doc) await vector_store.upsert(embedding)

How It Works

Policy Configuration

# Shared counter across all indexer instances
{
  "id": "embed-openai-rpm",
  "resource": "embed:openai",
  "algorithm": "sliding_window_counter",
  "config": {
    "limit": 2800,
    "window_seconds": 60
  },
  "action_deny": "reject",
  "metadata": {
    "description": "2,800 RPM — below 3k provider cap"
  }
}

Request Flow

  1. Spin up multiple indexer workers — each calls RLAAS before embedding
  2. RLAAS shares one counter via Redis across all instances — total RPM capped globally
  3. If denied — sleep for retry_after seconds and retry; no crash, no restart
  4. Job completes — all documents indexed, provider never sees a 429
  5. Tier upgrade? — PATCH the limit to 5,800 and workers speed up immediately

SDK Examples

Throttle embedding calls across distributed workers with a shared sliding window.

// index documents — throttled to 2,800 RPM across all workers
func (idx *Indexer) IndexDocuments(ctx context.Context, docs []string) error {
	for _, doc := range docs {
		// Spin on the shared limiter until we are allowed to embed.
		for {
			decision, err := idx.rlaas.Check(ctx, &rlaas.CheckRequest{
				UserID:   "indexer-service",
				Resource: "embed:openai",
			})
			if err != nil {
				return err
			}
			if decision.Allowed {
				break
			}
			// Honor cancellation while waiting out the deny window.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(decision.RetryAfter):
			}
		}
		emb, err := idx.Embed(ctx, doc)
		if err != nil {
			return err
		}
		if err := idx.store.Upsert(ctx, emb); err != nil {
			return err
		}
	}
	return nil
}
# Shared 2,800 RPM limit across all indexer processes import asyncio from rlaas_sdk import RlaasClient, CheckRequest client = RlaasClient(base_url="http://rlaas:8080") async def index_documents(docs: list[str]): for doc in docs: while True: decision = client.check(CheckRequest( user_id="indexer-service", resource="embed:openai", )) if decision.allowed: break await asyncio.sleep(decision.retry_after) embedding = await embed(doc) await vector_store.upsert(embedding)
// Shared 2,800 RPM — safe below provider 3k cap
async function indexDocuments(docs: string[]) {
  for (const doc of docs) {
    // Spin on the shared limiter; retryAfter is in seconds, setTimeout
    // takes milliseconds, hence the * 1000.
    while (true) {
      const decision = await rlaas.check({
        userId: 'indexer-service',
        resource: 'embed:openai',
      });
      if (decision.allowed) break;
      await new Promise(res =>
        setTimeout(res, decision.retryAfter * 1000)
      );
    }
    const embedding = await embed(doc);
    await vectorStore.upsert(embedding);
  }
}
// Shared 2,800 RPM limit across all indexer instances import io.rlaas.sdk.RlaasClient; import io.rlaas.sdk.model.*; RlaasClient rlaas = new RlaasClient("http://rlaas:8080"); void indexDocuments(List<String> docs) throws Exception { for (String doc : docs) { while (true) { Decision decision = rlaas.checkLimit( new CheckRequest("indexer-service", "embed:openai")); if (decision.isAllowed()) break; Thread.sleep((long) (decision.getRetryAfter() * 1000)); } var embedding = embed(doc); vectorStore.upsert(embedding); } }
// Shared 2,800 RPM limit across all indexer instances
using Rlaas.Sdk;
using Rlaas.Sdk.Models;

var rlaas = new RlaasClient("http://rlaas:8080");

async Task IndexDocumentsAsync(List<string> docs)
{
    foreach (var doc in docs)
    {
        // Spin on the shared limiter; RetryAfter is seconds, so convert
        // via TimeSpan.FromSeconds for Task.Delay.
        while (true)
        {
            var decision = await rlaas.CheckLimitAsync(
                new CheckRequest("indexer-service", "embed:openai"));
            if (decision.Allowed) break;
            await Task.Delay(TimeSpan.FromSeconds(decision.RetryAfter));
        }
        var embedding = await EmbedAsync(doc);
        await vectorStore.UpsertAsync(embedding);
    }
}
// embedding rate limiter (Node.js)
const { RlaasClient } = require('@rlaas/node-sdk');

const client = new RlaasClient('http://rlaas:8080');

// Promise-based sleep helper; ms is milliseconds. The original snippet
// called `sleep` without defining it, which throws a ReferenceError on
// the first deny.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Embed each chunk, gated by the shared token-aware limiter.
// On deny, sleep for the server-provided retry_after and re-check.
async function embedWithLimit(chunks) {
  for (const chunk of chunks) {
    let decision = await client.check({
      resource: 'ai:embeddings',
      tokens: chunk.token_count,
    });
    while (!decision.allowed) {
      // retry_after is seconds; setTimeout wants milliseconds.
      await sleep(decision.retry_after * 1000);
      decision = await client.check({
        resource: 'ai:embeddings',
        tokens: chunk.token_count,
      });
    }
    const vec = await embed(chunk.text);
    await vectorStore.upsert(chunk.id, vec);
  }
}
// embedding rate limiter (C++) #include "rlaas/client.h" rlaas::Client client("http://rlaas:8080"); void embed_with_limit(const std::vector<Chunk>& chunks) { for (const auto& chunk : chunks) { rlaas::CheckRequest req; req.resource = "ai:embeddings"; req.tokens = chunk.token_count; auto decision = client.check(req); while (!decision.allowed) { sleep_ms(decision.retry_after_ms); decision = client.check(req); } auto vec = embed(chunk.text); vector_store.upsert(chunk.id, vec); } }
// embedding rate limiter (Rust) use rlaas_sdk::{Client, CheckRequest}; let client = Client::new("http://rlaas:8080"); async fn embed_with_limit(client: &Client, chunks: &[Chunk]) -> Result<()> { for chunk in chunks { let mut decision = client.check(&CheckRequest { resource: "ai:embeddings".into(), tokens: Some(chunk.token_count), ..Default::default() }).await?; while !decision.allowed { tokio::time::sleep(decision.retry_after).await; decision = client.check(&req).await?; } let vec = embed(&chunk.text).await?; vector_store.upsert(&chunk.id, &vec).await?; } Ok(()) }
# embedding rate limiter (Ruby)
require 'rlaas_sdk'

# Use a constant: a top-level local (`client = ...`) is not visible inside
# a `def` body, so the original snippet would raise NameError on first use.
CLIENT = Rlaas::Client.new('http://rlaas:8080')

# Embed each chunk, gated by the shared token-aware limiter.
# On deny, sleep for the server-provided retry_after (seconds) and re-check.
def embed_with_limit(chunks)
  chunks.each do |chunk|
    decision = CLIENT.check(
      resource: 'ai:embeddings',
      tokens: chunk.token_count
    )
    until decision.allowed
      sleep(decision.retry_after)
      decision = CLIENT.check(resource: 'ai:embeddings', tokens: chunk.token_count)
    end
    vec = embed(chunk.text)
    vector_store.upsert(chunk.id, vec)
  end
end