This website uses cookies

Read our Privacy policy and Terms of use for more information.

What we are building

Most RAG tutorials index a folder of PDFs once and call it done. Real systems don't work that way. Documents arrive continuously: a support ticket is filed, a wiki page is edited, a transcript lands. If your knowledge base only refreshes on a nightly batch job, your agent answers this afternoon's question with last night's data.

We're going to fix that. The pipeline has four moving parts. A producer writes document events onto an Amazon Kinesis Data Stream. A Lambda function, wired to the stream through an event source mapping, reads each batch, generates an embedding for every document with Amazon Titan Text Embeddings V2, and writes the vector into an Amazon S3 Vectors index. A query script embeds a question, runs a similarity search against that same index, and hands the top chunks to Amazon Nova to generate a grounded answer.

The non-obvious design choice is the vector store. We're using S3 Vectors, which went generally available in December 2025, instead of standing up an OpenSearch cluster. Writes to S3 Vectors are strongly consistent, so a vector is queryable the moment PutVectors returns. There is no index-refresh interval to wait out and no cluster to keep warm. That property is exactly what makes "searchable in seconds" achievable without paying for provisioned search nodes. Here is what the finished thing does: you run put_event.py "some new fact", wait a couple of seconds, run ask.py "question about that fact", and Nova answers using the document you just streamed in.

Prerequisites

You need an AWS account with console and CLI access, and permissions to create Kinesis streams, Lambda functions, IAM roles, and S3 vector buckets. You need the AWS CLI v2 configured (aws configure) and Python 3.11+ locally with boto3>=1.40 (S3 Vectors needs a recent boto3; check with python -c "import boto3; print(boto3.__version__)").

You must enable model access in Amazon Bedrock for two models in your Region: Amazon Titan Text Embeddings V2 (amazon.titan-embed-text-v2:0) and Amazon Nova Lite (amazon.nova-lite-v1:0). Do this once in the Bedrock console under Model access. This tutorial uses us-west-2; S3 Vectors is not in every Region yet, so if you pick another, confirm availability first.

You should be comfortable reading Python, know what a Lambda execution role is, and understand that an embedding is just a list of floats. You do not need any prior vector-database experience.

Setup

Set a few shell variables and create the working directory. Everything below assumes these are exported.

export AWS_REGION=us-west-2
export STREAM_NAME=doc-events
export VECTOR_BUCKET=sl-rag-vectors
export VECTOR_INDEX=docs
export FUNCTION_NAME=rag-indexer
mkdir sl91-realtime-rag && cd sl91-realtime-rag

Create the Kinesis stream in on-demand mode so you don't manage shards.

aws kinesis create-stream \
  --stream-name "$STREAM_NAME" \
  --stream-mode-details StreamMode=ON_DEMAND \
  --region "$AWS_REGION"
aws kinesis wait stream-exists --stream-name "$STREAM_NAME" --region "$AWS_REGION"
aws kinesis describe-stream-summary --stream-name "$STREAM_NAME" --region "$AWS_REGION" \
  --query 'StreamDescriptionSummary.StreamStatus'

When that prints "ACTIVE", the stream is ready. That is your setup smoke test. Leave the vector bucket and index for Step 1, because we'll create them in code so the whole thing is reproducible.

Step 1: Create the vector bucket and index

S3 Vectors introduces a new bucket type, the vector bucket, and a dedicated s3vectors boto3 client. Create the bucket and one index inside it.

# create_index.py
import os, boto3
region = os.environ["AWS_REGION"]
bucket = os.environ["VECTOR_BUCKET"]
index = os.environ["VECTOR_INDEX"]
s3v = boto3.client("s3vectors", region_name=region)
s3v.create_vector_bucket(vectorBucketName=bucket)
s3v.create_index(
    vectorBucketName=bucket,
    indexName=index,
    dataType="float32",
    dimension=1024,
    distanceMetric="cosine",
    metadataConfiguration={"nonFilterableMetadataKeys": ["source_text"]},
)
print(f"created {bucket}/{index}")

Run it with python create_index.py. Three parameters matter and none can be changed after creation, so get them right. dimension=1024 matches Titan Text Embeddings V2's default output size. distanceMetric="cosine" is the right metric for normalized text embeddings, which Titan produces by default. And nonFilterableMetadataKeys=["source_text"] tells S3 Vectors that we'll store the raw document text alongside each vector but never filter on it. That distinction is a cost and performance lever: filterable metadata is indexed for querying, non-filterable metadata is just carried along and returned. You want the original text back at query time to feed the LLM, but you never filter by its contents, so mark it non-filterable.

Step 2: The producer

The producer is deliberately trivial, because in a real system it's whatever already emits your documents: a webhook, a CDC stream, a message queue. Here it's a CLI that puts one document event onto Kinesis.

# put_event.py
import os, sys, json, uuid, time, boto3
kinesis = boto3.client("kinesis", region_name=os.environ["AWS_REGION"])
text = sys.argv[1]
doc_id = sys.argv[2] if len(sys.argv) > 2 else str(uuid.uuid4())
event = {
    "doc_id": doc_id,
    "text": text,
    "source": "cli",
    "ts": int(time.time()),
}
resp = kinesis.put_record(
    StreamName=os.environ["STREAM_NAME"],
    Data=json.dumps(event).encode("utf-8"),
    PartitionKey=doc_id,
)
print(f"put {doc_id} -> shard {resp['ShardId']} seq {resp['SequenceNumber'][:12]}...")

put_record takes the stream name, a Data payload of raw bytes, and a PartitionKey. The partition key decides which shard the record lands on; using doc_id spreads writes evenly and keeps updates to the same document ordered. We JSON-encode the event so the Lambda downstream gets structured fields, not just a blob of text. Don't run it yet, there is no consumer wired up. We build that next.

Step 3: The Lambda consumer

This is the core of the pipeline. Lambda receives a batch of Kinesis records, and for each one it decodes the event, calls Bedrock to embed the text, and writes the vector to S3 Vectors.

# handler.py
import os, json, base64, boto3
region = os.environ["AWS_REGION"]
bedrock = boto3.client("bedrock-runtime", region_name=region)
s3v = boto3.client("s3vectors", region_name=region)
BUCKET = os.environ["VECTOR_BUCKET"]
INDEX = os.environ["VECTOR_INDEX"]
def embed(text):
    resp = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        body=json.dumps({"inputText": text, "dimensions": 1024, "normalize": True}),
    )
    return json.loads(resp["body"].read())["embedding"]
def handler(event, _ctx):
    vectors = []
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"])
        doc = json.loads(payload)
        vectors.append({
            "key": doc["doc_id"],
            "data": {"float32": embed(doc["text"])},
            "metadata": {
                "source": doc["source"],
                "ts": doc["ts"],
                "source_text": doc["text"],
            },
        })
    if vectors:
        s3v.put_vectors(vectorBucketName=BUCKET, indexName=INDEX, vectors=vectors)
    print(f"indexed {len(vectors)} vectors")
    return {"indexed": len(vectors)}

Three things are worth stopping on. First, Kinesis delivers the record data base64-encoded, so base64.b64decode(record["kinesis"]["data"]) is mandatory; forget it and you'll try to JSON-parse base64 and get a decode error. Second, we accumulate the batch into one put_vectors call rather than writing one vector at a time. S3 Vectors is happiest with batched writes, and one API call per batch keeps latency and cost down. Third, using doc["doc_id"] as the vector key means re-streaming the same document ID overwrites its vector instead of duplicating it. That gives you upsert semantics for free, which is what you want when a wiki page gets edited.

Package it with boto3 bundled, because Lambda's built-in boto3 may lag behind the S3 Vectors API.

pip install boto3 -t package/
cp handler.py package/
cd package && zip -r ../function.zip . >/dev/null && cd ..

Step 4: Create the role and wire the event source mapping

Lambda needs an execution role with three capabilities: read the Kinesis stream, invoke the Bedrock embedding model, and write to the vector index. Create the role and attach an inline policy.

ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
aws iam create-role --role-name rag-indexer-role \
  --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"lambda.amazonaws.com"},"Action":"sts:AssumeRole"}]}'
aws iam put-role-policy --role-name rag-indexer-role --policy-name rag-indexer-policy \
  --policy-document "{\"Version\":\"2012-10-17\",\"Statement\":[
    {\"Effect\":\"Allow\",\"Action\":[\"kinesis:GetRecords\",\"kinesis:GetShardIterator\",\"kinesis:DescribeStream\",\"kinesis:ListShards\"],\"Resource\":\"arn:aws:kinesis:${AWS_REGION}:${ACCOUNT}:stream/${STREAM_NAME}\"},
    {\"Effect\":\"Allow\",\"Action\":[\"bedrock:InvokeModel\"],\"Resource\":\"arn:aws:bedrock:${AWS_REGION}::foundation-model/amazon.titan-embed-text-v2:0\"},
    {\"Effect\":\"Allow\",\"Action\":[\"s3vectors:PutVectors\"],\"Resource\":\"arn:aws:s3vectors:${AWS_REGION}:${ACCOUNT}:bucket/${VECTOR_BUCKET}/*\"},
    {\"Effect\":\"Allow\",\"Action\":[\"logs:CreateLogGroup\",\"logs:CreateLogStream\",\"logs:PutLogEvents\"],\"Resource\":\"*\"}
  ]}"

Now create the function and the event source mapping that connects the stream to it.

aws lambda create-function --function-name "$FUNCTION_NAME" \
  --runtime python3.12 --handler handler.handler \
  --role "arn:aws:iam::${ACCOUNT}:role/rag-indexer-role" \
  --timeout 60 --memory-size 512 \
  --environment "Variables={AWS_REGION=$AWS_REGION,VECTOR_BUCKET=$VECTOR_BUCKET,VECTOR_INDEX=$VECTOR_INDEX}" \
  --zip-file fileb://function.zip --region "$AWS_REGION"
STREAM_ARN=$(aws kinesis describe-stream-summary --stream-name "$STREAM_NAME" \
  --region "$AWS_REGION" --query 'StreamDescriptionSummary.StreamARN' --output text)
aws lambda create-event-source-mapping --function-name "$FUNCTION_NAME" \
  --event-source-arn "$STREAM_ARN" \
  --starting-position LATEST \
  --batch-size 10 --maximum-batching-window-in-seconds 2 \
  --region "$AWS_REGION"

The event source mapping is Lambda's poller: it reads the stream on your behalf and invokes the function with batches. --starting-position LATEST means it processes records that arrive after the mapping is created, which is what you want for a live feed. --batch-size 10 caps how many records come per invocation. The --maximum-batching-window-in-seconds 2 is the latency knob: Lambda waits up to 2 seconds to fill a batch before invoking. Raise it and you get bigger, cheaper batches at the cost of freshness. Note the AWS rule that if you set a batch size above 10 you must set a batching window of at least 1 second. Give the mapping a minute to reach Enabled state before testing.

Step 5: Query it - real-time RAG

Now the payoff. Embed a question with the same model, run a similarity search, and feed the retrieved text to Nova.

# ask.py
import os, sys, json, boto3
region = os.environ["AWS_REGION"]
bedrock = boto3.client("bedrock-runtime", region_name=region)
s3v = boto3.client("s3vectors", region_name=region)
question = sys.argv[1]
qvec = json.loads(bedrock.invoke_model(
    modelId="amazon.titan-embed-text-v2:0",
    body=json.dumps({"inputText": question, "dimensions": 1024, "normalize": True}),
)["body"].read())["embedding"]
hits = s3v.query_vectors(
    vectorBucketName=os.environ["VECTOR_BUCKET"],
    indexName=os.environ["VECTOR_INDEX"],
    queryVector={"float32": qvec},
    topK=3, returnMetadata=True, returnDistance=True,
)["vectors"]
context = "\n\n".join(h["metadata"]["source_text"] for h in hits)
prompt = f"Answer using only the context. If it is not there, say so.\n\nContext:\n{context}\n\nQuestion: {question}"
answer = bedrock.converse(
    modelId="amazon.nova-lite-v1:0",
    messages=[{"role": "user", "content": [{"text": prompt}]}],
)
print(answer["output"]["message"]["content"][0]["text"])

The retrieval half uses query_vectors with topK=3 to pull the three nearest documents, asking for metadata so we get the original source_text back. The generation half uses the Bedrock Converse API, the same unified interface we used in episode 1, so swapping Nova for Claude is a one-line model-ID change. The prompt pins the model to the retrieved context and tells it to admit when the answer isn't there, which is the cheapest hallucination guard you can add.

Verify it works

Stream a fact that no foundation model could know, then ask about it.

python put_event.py "Project Nimbus ships its v3 release on 2026-08-14 and drops Python 3.9 support."
sleep 4
python ask.py "When does Project Nimbus v3 ship and what does it drop?"

Expected output is something close to:

Project Nimbus v3 ships on 2026-08-14 and it drops support for Python 3.9.

The four-second sleep covers the batching window plus embedding and write time. Confirm the round trip in the logs:

aws logs tail /aws/lambda/rag-indexer --since 2m --region "$AWS_REGION"

You should see an indexed 1 vectors line. If the answer names the date and the dropped Python version, the whole pipeline is live: producer to stream to Lambda to Bedrock to S3 Vectors to query, end to end, in seconds.

When it breaks

ask.py says the info isn't in the context. Your query ran before the write landed. Increase the sleep, and check aws logs tail for the indexed line. If the log shows nothing, the event source mapping is not Enabled yet or is still on LATEST and your record predated it.

Lambda logs show a base64 or JSON decode error. You removed the base64.b64decode step, or the producer sent something that wasn't JSON. Kinesis always base64-encodes the record data; the decode is not optional.

AccessDeniedException on bedrock:InvokeModel. Either model access isn't enabled for Titan or Nova in this Region (fix it in the Bedrock console under Model access), or the role's Resource ARN doesn't match the model. The embedding model and Nova are separate grants.

ValidationException on put_vectors about dimension. Your index was created at a different dimension than the embedding. The index is fixed at 1024; make sure the Titan call also requests "dimensions": 1024. You cannot resize an index, so you'd delete and recreate it.

ResourceNotFoundException on the s3vectors client, or the client is missing entirely. Your boto3 is too old. Upgrade with pip install -U boto3 locally and confirm the bundled version in function.zip is recent.

Where to take it next

First, tee the raw stream to durable storage. Attach an Amazon Data Firehose delivery stream reading from the same Kinesis stream and landing every event in S3 as JSON. That gives you a replayable archive, so when you improve chunking or swap embedding models you can reindex history instead of losing it. Second, add real chunking: right now one event equals one vector, which breaks down for long documents. Split on token count before embedding and give each chunk a key like f"{doc_id}#0". Third, and this is episode 26, add a tiering strategy: keep the full corpus in S3 Vectors for cost, and export the hot, frequently queried slice to OpenSearch for single-digit-millisecond latency. S3 Vectors has a native OpenSearch export path built for exactly this.

The thing to sit with is that "real-time" here cost you almost nothing in infrastructure. No search cluster, no refresh interval, no capacity planning. Strong write consistency plus a serverless stream is the whole trick. What would you actually put on that stream if freshness were free?

Sources

Keep Reading