Throttling embedding calls across distributed workers with a shared sliding-window rate limiter: each snippet below checks a central limit service before every embedding call and backs off for the server-suggested interval when denied.
// index documents — throttled to 2,800 RPM across all workers
// IndexDocuments embeds and stores each document, blocking on the shared
// distributed limiter (2,800 RPM across all workers) before every embed call.
// Returns the first error from the limiter, the embedder, or the store;
// respects context cancellation while waiting for a slot.
func (idx *Indexer) IndexDocuments(ctx context.Context, docs []string) error {
	for _, doc := range docs {
		// Poll the limiter until this worker is granted a slot or ctx dies.
		for granted := false; !granted; {
			decision, err := idx.rlaas.Check(ctx, &rlaas.CheckRequest{
				UserID:   "indexer-service",
				Resource: "embed:openai",
			})
			if err != nil {
				return err
			}
			if decision.Allowed {
				granted = true
				continue
			}
			// Denied: wait out the server-suggested backoff, but bail
			// immediately if the context is cancelled meanwhile.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(decision.RetryAfter):
			}
		}
		emb, err := idx.Embed(ctx, doc)
		if err != nil {
			return err
		}
		if err := idx.store.Upsert(ctx, emb); err != nil {
			return err
		}
	}
	return nil
}
# Shared 2,800 RPM limit across all indexer processes
import asyncio
from rlaas_sdk import RlaasClient, CheckRequest
client = RlaasClient(base_url="http://rlaas:8080")
async def index_documents(docs: list[str]) -> None:
    """Embed and upsert each document under the shared 2,800 RPM limit.

    For every document, polls the central limiter until a slot is granted,
    sleeping (non-blocking) for the server-suggested backoff on denial,
    then embeds the document and upserts the vector.

    NOTE(review): ``client.check()`` is invoked synchronously inside an
    async function — if the SDK offers an async variant it should be
    awaited so the event loop isn't blocked on the HTTP round-trip;
    confirm against the rlaas_sdk docs.

    :param docs: raw document texts to embed and index.
    """
    for doc in docs:
        # Poll the shared limiter until this worker is allowed to proceed.
        while True:
            decision = client.check(CheckRequest(
                user_id="indexer-service",
                resource="embed:openai",
            ))
            if decision.allowed:
                break
            # Denied: honor the server-suggested backoff without blocking
            # the event loop.
            await asyncio.sleep(decision.retry_after)
        embedding = await embed(doc)
        await vector_store.upsert(embedding)
// Shared 2,800 RPM — safe below provider 3k cap
/**
 * Indexes documents one at a time, waiting on the shared distributed
 * limiter (2,800 RPM across all instances, safely below the provider's
 * 3k cap) before each embedding call.
 */
async function indexDocuments(docs: string[]) {
  for (const doc of docs) {
    // First check, then re-check after each server-suggested backoff.
    let decision = await rlaas.check({
      userId: 'indexer-service',
      resource: 'embed:openai',
    });
    while (!decision.allowed) {
      // retryAfter is converted from seconds to the milliseconds
      // setTimeout expects.
      await new Promise((resolve) =>
        setTimeout(resolve, decision.retryAfter * 1000),
      );
      decision = await rlaas.check({
        userId: 'indexer-service',
        resource: 'embed:openai',
      });
    }
    const embedding = await embed(doc);
    await vectorStore.upsert(embedding);
  }
}
// Shared 2,800 RPM limit across all indexer instances
import io.rlaas.sdk.RlaasClient;
import io.rlaas.sdk.model.*;
RlaasClient rlaas = new RlaasClient("http://rlaas:8080");
// Walks the document list, parking on the shared distributed limiter
// (2,800 RPM across all indexer instances) before each embedding call.
// Propagates InterruptedException from Thread.sleep and any SDK errors.
void indexDocuments(List<String> docs) throws Exception {
    for (String doc : docs) {
        // Initial check, then re-check after each server-suggested backoff.
        Decision decision = rlaas.checkLimit(
            new CheckRequest("indexer-service", "embed:openai"));
        while (!decision.isAllowed()) {
            // retryAfter is converted from seconds to the milliseconds
            // Thread.sleep expects.
            Thread.sleep((long) (decision.getRetryAfter() * 1000));
            decision = rlaas.checkLimit(
                new CheckRequest("indexer-service", "embed:openai"));
        }
        var embedding = embed(doc);
        vectorStore.upsert(embedding);
    }
}
// Shared 2,800 RPM limit across all indexer instances
using Rlaas.Sdk;
using Rlaas.Sdk.Models;
var rlaas = new RlaasClient("http://rlaas:8080");
// Indexes documents sequentially, awaiting the shared distributed limiter
// (2,800 RPM across all indexer instances) before every embedding call.
async Task IndexDocumentsAsync(List<string> docs)
{
    foreach (var doc in docs)
    {
        // Initial check, then re-check after each server-suggested backoff.
        var decision = await rlaas.CheckLimitAsync(
            new CheckRequest("indexer-service", "embed:openai"));
        while (!decision.Allowed)
        {
            // Non-blocking delay for the interval the server asked for.
            await Task.Delay(TimeSpan.FromSeconds(decision.RetryAfter));
            decision = await rlaas.CheckLimitAsync(
                new CheckRequest("indexer-service", "embed:openai"));
        }
        var embedding = await EmbedAsync(doc);
        await vectorStore.UpsertAsync(embedding);
    }
}
// embedding rate limiter (Node.js)
const { RlaasClient } = require('@rlaas/node-sdk');
const client = new RlaasClient('http://rlaas:8080');
// Embeds each chunk, charging its token count against the shared
// 'ai:embeddings' limit and backing off for the server-suggested interval
// whenever the limiter denies the request.
async function embedWithLimit(chunks) {
  for (const chunk of chunks) {
    const request = { resource: 'ai:embeddings', tokens: chunk.token_count };
    // Keep asking until the limiter admits this chunk.
    for (;;) {
      const decision = await client.check(request);
      if (decision.allowed) break;
      // retry_after is converted from seconds to the milliseconds
      // sleep() expects.
      await sleep(decision.retry_after * 1000);
    }
    const vec = await embed(chunk.text);
    await vectorStore.upsert(chunk.id, vec);
  }
}
// embedding rate limiter (C++)
#include "rlaas/client.h"
rlaas::Client client("http://rlaas:8080");
// Embeds each chunk under the shared "ai:embeddings" token budget,
// sleeping for the server-suggested backoff whenever a check is denied,
// then upserts the resulting vector keyed by chunk id.
void embed_with_limit(const std::vector<Chunk>& chunks) {
    for (const auto& chunk : chunks) {
        rlaas::CheckRequest req;
        req.resource = "ai:embeddings";
        req.tokens = chunk.token_count;
        // Check once, then sleep-and-recheck until the limiter says yes.
        for (auto decision = client.check(req); !decision.allowed;
             decision = client.check(req)) {
            sleep_ms(decision.retry_after_ms);
        }
        auto vec = embed(chunk.text);
        vector_store.upsert(chunk.id, vec);
    }
}
// embedding rate limiter (Rust)
use rlaas_sdk::{Client, CheckRequest};
let client = Client::new("http://rlaas:8080");
/// Embeds each chunk under the shared `ai:embeddings` token budget.
///
/// For every chunk, checks the central limiter (charging the chunk's
/// token count), sleeps for the limiter's suggested backoff on denial,
/// then embeds the text and upserts the vector keyed by chunk id.
///
/// # Errors
/// Propagates the first error from the limiter, the embedder, or the
/// vector store.
async fn embed_with_limit(client: &Client, chunks: &[Chunk]) -> Result<()> {
    for chunk in chunks {
        // Build the request once and reuse it for every re-check.
        // (The original re-check referenced an undefined `req` binding,
        // which would not compile.)
        let req = CheckRequest {
            resource: "ai:embeddings".into(),
            tokens: Some(chunk.token_count),
            ..Default::default()
        };
        let mut decision = client.check(&req).await?;
        while !decision.allowed {
            // Non-blocking sleep for the server-suggested backoff.
            tokio::time::sleep(decision.retry_after).await;
            decision = client.check(&req).await?;
        }
        let vec = embed(&chunk.text).await?;
        vector_store.upsert(&chunk.id, &vec).await?;
    }
    Ok(())
}
# embedding rate limiter (Ruby)
require 'rlaas_sdk'
client = Rlaas::Client.new('http://rlaas:8080')
# Embeds each chunk under the shared 'ai:embeddings' token budget,
# sleeping for the limiter's suggested backoff whenever a check is denied,
# then upserts the resulting vector keyed by chunk id.
#
# NOTE(review): the original body read a top-level local `client`, which
# is NOT visible inside a method body in Ruby (methods do not close over
# top-level locals), so every call raised NameError. The client is now an
# injectable parameter; the default preserves the original endpoint, so
# existing single-argument callers keep working.
def embed_with_limit(chunks, client = Rlaas::Client.new('http://rlaas:8080'))
  chunks.each do |chunk|
    decision = client.check(
      resource: 'ai:embeddings',
      tokens: chunk.token_count
    )
    until decision.allowed
      # Honor the server-suggested backoff (seconds) before re-checking.
      sleep(decision.retry_after)
      decision = client.check(resource: 'ai:embeddings',
                              tokens: chunk.token_count)
    end
    vec = embed(chunk.text)
    vector_store.upsert(chunk.id, vec)
  end
end