What we are building
Every RAG system has the same failure mode, and it is not the retriever. It is the corpus. You point a Bedrock Knowledge Base at an S3 bucket, ingest 40,000 documents, and three weeks later your assistant confidently answers with a policy that was superseded two years ago, or repeats the same paragraph four times because the same PDF was uploaded four times under different names. The embeddings were fine. The chunks were garbage going in.
The fix is boring and it works: clean the corpus before it reaches the vector store. In this tutorial we build a Glue 5.1 PySpark job that reads raw documents from an S3 raw/ prefix, normalizes and filters them, removes exact duplicates by content hash, removes near-duplicates with MinHash locality-sensitive hashing, and then runs the result through an AWS Glue Data Quality ruleset that acts as a gate. If the cleaned data does not meet the rules, the job fails and writes nothing. Only a corpus that passes lands in the clean/ prefix that your Knowledge Base ingests.
The non-obvious design choice is the gate. Most pipelines clean data and hope. This one treats the cleaned corpus as a contract: uniqueness, completeness, and minimum length are assertions the job must satisfy before it is allowed to publish, so a bad upstream change fails loudly at ETL time instead of silently at answer time.
Prerequisites
You need an AWS account with permissions to create S3 buckets, IAM roles, and Glue jobs. You need the AWS CLI v2 configured (aws sts get-caller-identity should return your account), and Python 3.10+ locally with boto3 installed (pip install boto3) to drive the job. Comfort reading PySpark helps, but every block here is runnable as-is.
You do not need a running Bedrock Knowledge Base to finish this tutorial. The output is a clean S3 prefix; wiring it to a Knowledge Base is episode 2 of this series (S3 Vectors). Everything here is serverless, so there is nothing to keep running afterward and nothing to patch.
One version note: AWS shipped Glue 5.1 in December 2025 (Apache Spark 3.5.6, Python 3.11, boto3 1.40 on the workers). The plan for this series said "Glue 5"; we follow reality and use 5.1, which is the current default and is where the newer Spark ML and open-table-format libraries live.
Setup
Pick a Region and set a few shell variables. Everything below uses one bucket with three prefixes.
export AWS_REGION=us-east-1
export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export BUCKET=rag-corpus-$ACCOUNT_ID
aws s3 mb s3://$BUCKET --region $AWS_REGION
Create the Glue execution role. Glue needs to assume the role, read and write your bucket, and write CloudWatch logs.
cat > trust.json <<'EOF'
{"Version":"2012-10-17","Statement":[{"Effect":"Allow",
"Principal":{"Service":"glue.amazonaws.com"},"Action":"sts:AssumeRole"}]}
EOF
aws iam create-role --role-name GlueRagCleanerRole \
--assume-role-policy-document file://trust.json
aws iam attach-role-policy --role-name GlueRagCleanerRole \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
aws iam put-role-policy --role-name GlueRagCleanerRole \
--policy-name S3Access --policy-document "{\"Version\":\"2012-10-17\",
\"Statement\":[{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\",
\"s3:DeleteObject\",\"s3:ListBucket\"],\"Resource\":[\"arn:aws:s3:::$BUCKET\",
\"arn:aws:s3:::$BUCKET/*\"]}]}"
Now seed a small, deliberately dirty corpus so you can watch the job earn its keep. Save this as docs.jsonl. Note that kb-002 is an exact duplicate of kb-001, kb-004 is a near-duplicate of kb-003 (one word changed), and kb-005 is a two-word stub that should be filtered out.
cat > docs.jsonl <<'EOF'
{"src_id":"kb-001","text":"<p>AWS Lambda is a serverless compute service that runs your code in response to events and automatically manages the underlying compute resources for you. You pay only for the compute time you consume.</p>"}
{"src_id":"kb-002","text":"AWS Lambda is a serverless compute service that runs your code in response to events and automatically manages the underlying compute resources for you. You pay only for the compute time you consume."}
{"src_id":"kb-003","text":"Amazon S3 is object storage built to store and retrieve any amount of data from anywhere. It offers eleven nines of durability and scales past trillions of objects worldwide."}
{"src_id":"kb-004","text":"Amazon S3 is object storage built to store and retrieve any amount of data from anywhere. It offers eleven nines of durability and scales past trillions of records worldwide."}
{"src_id":"kb-005","text":"See docs."}
EOF
aws s3 cp docs.jsonl s3://$BUCKET/raw/docs.jsonl
Smoke test the setup before writing any Spark: confirm the object landed and the role exists.
aws s3 ls s3://$BUCKET/raw/
aws iam get-role --role-name GlueRagCleanerRole --query Role.Arn --output text
You should see docs.jsonl and an ARN like arn:aws:iam::<ACCOUNT_ID>:role/GlueRagCleanerRole. Hold onto that ARN.
Step 1: Read the raw docs and understand the mess
Start the job script glue_clean_corpus.py. This first block is the standard Glue 5.1 boilerplate plus reading the JSONL into a Spark DataFrame. We pass the paths and the gate threshold as job arguments so the same script works for any corpus.
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, HashingTF, MinHashLSH
args = getResolvedOptions(sys.argv, ["JOB_NAME", "raw_path", "clean_path", "dq_prefix", "min_dq_score"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
raw = spark.read.json(args["raw_path"])
print(f"Raw rows in: {raw.count()}")
getResolvedOptions is the Glue way to read --key value arguments. The spark.read.json call reads newline-delimited JSON when each line is a JSON object, which is why we used .jsonl. If your source is Parquet or CSV, swap the reader; everything downstream operates on a DataFrame and does not care about the input format.
The reason we start with a print of the raw count is discipline. Every stage of this pipeline prints how many rows it drops, so when you run it you get a running tally of exactly where documents die. In a real corpus that tally is your first signal that an upstream export changed.
Step 2: Normalize and filter
Raw documents carry HTML tags, collapsed or exploded whitespace, and empty or near-empty stubs. None of that belongs in an embedding. This block strips tags, normalizes whitespace, trims, and drops anything too short to be a useful chunk.
cleaned = (
raw
.withColumn("text", F.regexp_replace("text", r"<[^>]+>", " "))
.withColumn("text", F.regexp_replace("text", r"\s+", " "))
.withColumn("text", F.trim(F.col("text")))
.filter(F.col("text").isNotNull())
.filter(F.length("text") >= 40)
)
print(f"After clean + length filter: {cleaned.count()}")
The <[^>]+> regex is a deliberately crude tag stripper, not an HTML parser. For a production corpus with nested markup you would preprocess with a real parser upstream, but for the tag noise that survives most exports this is enough and it runs at Spark speed across the whole dataset. The \s+ collapse matters more than it looks: models tokenize whitespace, so a document full of \n\n\n from a bad PDF extraction wastes context and skews chunk boundaries.
The length filter is the first quiet win. Our kb-005 stub ("See docs.") is 9 characters and gets dropped here. In a real corpus this filter removes navigation fragments, "click here" links, and empty pages that would otherwise become embeddings pointing at nothing. I use 40 characters as a floor for the demo; for prose chunks a floor of 200 to 500 is more realistic.
Step 3: Kill exact duplicates with a content hash
Exact duplicates are the cheapest to remove and the most common in real corpora, because the same document gets uploaded repeatedly under different filenames. Hash the normalized text and drop rows that share a hash.
cleaned = cleaned.withColumn("content_hash", F.sha2(F.col("text"), 256))
before = cleaned.count()
exact = cleaned.dropDuplicates(["content_hash"])
exact = exact.withColumn("doc_id", F.monotonically_increasing_id())
print(f"Exact-dedup removed: {before - exact.count()} rows")
Hashing the normalized text rather than the raw text is the important detail. kb-001 arrived wrapped in <p> tags and kb-002 did not, but after Step 2 both are byte-identical, so they hash to the same value and one is dropped. Had we hashed before normalizing, the tags would have made them look different and both would have survived. Clean first, then hash.
We also stamp each surviving row with a stable doc_id using monotonically_increasing_id(). The near-dedup join in the next step needs a stable key to decide which of two similar documents to keep, and the raw src_id may be missing or duplicated in messy data, so we mint our own.
Step 4: Remove near-duplicates with MinHash LSH
Exact hashing misses the more insidious case: two documents that are 98 percent identical because one is a lightly edited copy of the other. Our kb-003 and kb-004 differ by a single word ("objects" versus "records"). Their hashes are completely different, so Step 3 kept both. To catch these we use MinHash locality-sensitive hashing from Spark ML, which estimates Jaccard similarity between token sets without comparing every pair directly.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words = tokenizer.transform(exact)
htf = HashingTF(inputCol="words", outputCol="features", numFeatures=1 << 18)
featurized = htf.transform(words).filter(F.size("words") > 0)
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(featurized)
pairs = model.approxSimilarityJoin(featurized, featurized, 0.4, distCol="jdist")
drop_ids = (
pairs
.filter(F.col("datasetA.doc_id") < F.col("datasetB.doc_id"))
.select(F.col("datasetB.doc_id").alias("drop_id"))
.distinct()
)
deduped = featurized.join(drop_ids, featurized.doc_id == drop_ids.drop_id, "left_anti")
print(f"Near-dedup removed: {featurized.count() - deduped.count()} rows")
approxSimilarityJoin returns pairs whose Jaccard distance is below the threshold, so 0.4 means we treat anything more than 60 percent similar as a duplicate. That threshold is the knob you will tune most: too low and you keep real dupes, too high and you delete legitimately distinct documents that happen to share vocabulary. Start at 0.4, inspect the pairs it flags on a sample, and adjust.
The self-join produces each duplicate pair twice plus a row matched with itself, so we filter to datasetA.doc_id < datasetB.doc_id. That keeps the lower doc_id of every similar pair and marks the higher one for removal, which deterministically drops one of kb-003/kb-004 while keeping the other. The final left_anti join is Spark's set-difference: keep every row in featurized whose doc_id is not in drop_ids.
One caveat worth stating plainly: approxSimilarityJoin is a self-join and gets expensive on large corpora. For millions of documents you bucket first (by source, date, or a coarse prefix) and run the join within buckets, not across the whole set. For the tens-of-thousands range this straightforward version is fine.
Step 5: Gate on a Data Quality ruleset
Here is the part that turns a cleaning script into a contract. AWS Glue Data Quality lets you assert rules in DQDL, a small declarative language, and evaluate them against a DynamicFrame. We convert our cleaned DataFrame, define the rules the corpus must satisfy, and evaluate.
final_cols = deduped.select("doc_id", "text", "content_hash")
final_dyf = DynamicFrame.fromDF(final_cols, glueContext, "final_dyf")
ruleset = """
Rules = [
RowCount > 0,
IsComplete "text",
IsUnique "content_hash",
ColumnLength "text" >= 40,
Completeness "text" >= 0.99
]
"""
dq = EvaluateDataQuality.apply(
frame=final_dyf,
ruleset=ruleset,
publishing_options={
"dataQualityEvaluationContext": "rag_corpus_gate",
"enableDataQualityCloudWatchMetrics": True,
"enableDataQualityResultsPublishing": True,
"resultsS3Prefix": args["dq_prefix"],
},
)
Read the rules as English. There must be at least one row. The text column must never be null (IsComplete). Every content_hash must be unique, which is the assertion that dedup actually worked. Every value in text must be at least 40 characters long (ColumnLength). And text must be complete for at least 99 percent of rows (Completeness). IsUnique and Completeness here are doing something subtle: they re-verify the invariants Steps 2 through 4 were supposed to establish, so if a future refactor breaks dedup, the gate catches it rather than trusting the upstream code.
DQDL has a deep rule vocabulary beyond these five, including CustomSql for arbitrary SQL assertions, ColumnValues ... matches for regex checks, and freshness rules. The five above are the minimum viable gate for a RAG corpus; the DQDL reference lists the rest.
Now enforce the gate. EvaluateDataQuality.apply returns a DynamicFrame with one row per rule and an Outcome of Passed or Failed. We compute a pass ratio and refuse to write if it falls below the threshold passed in as --min_dq_score.
results = dq.toDF().collect()
passed = sum(1 for r in results if r["Outcome"] == "Passed")
score = passed / len(results) if results else 0.0
print(f"DQ score: {score:.2f} ({passed}/{len(results)} rules passed)")
for r in results:
if r["Outcome"] != "Passed":
print("FAILED:", r["Rule"], "->", r["FailureReason"])
if score < float(args["min_dq_score"]):
raise Exception(f"Quality gate failed: {score:.2f} < {args['min_dq_score']}, writing nothing")
final_cols.write.mode("overwrite").json(args["clean_path"])
job.commit()
The raise is the whole point. If the corpus fails the rules, the job errors out, the write never runs, and your Knowledge Base keeps ingesting the last known-good clean/ prefix instead of a poisoned one. A failed gate is a successful outcome: it means you caught the problem at ETL time.
Step 6: Deploy and run it with boto3
Upload the script and create the Glue 5.1 job from Python. Replace the role ARN with the one from Setup.
import boto3, time
glue = boto3.client("glue")
s3 = boto3.client("s3")
BUCKET = "rag-corpus-<ACCOUNT_ID>"
ROLE = "arn:aws:iam::<ACCOUNT_ID>:role/GlueRagCleanerRole"
s3.upload_file("glue_clean_corpus.py", BUCKET, "scripts/glue_clean_corpus.py")
glue.create_job(
Name="rag-corpus-cleaner",
Role=ROLE,
GlueVersion="5.1",
WorkerType="G.1X",
NumberOfWorkers=4,
Command={"Name": "glueetl", "PythonVersion": "3",
"ScriptLocation": f"s3://{BUCKET}/scripts/glue_clean_corpus.py"},
DefaultArguments={
"--raw_path": f"s3://{BUCKET}/raw/",
"--clean_path": f"s3://{BUCKET}/clean/",
"--dq_prefix": f"s3://{BUCKET}/dq-results/",
"--min_dq_score": "1.0",
"--enable-metrics": "true",
},
)
run_id = glue.start_job_run(JobName="rag-corpus-cleaner")["JobRunId"]
while True:
st = glue.get_job_run(JobName="rag-corpus-cleaner", RunId=run_id)["JobRun"]["JobRunState"]
print("state:", st)
if st in ("SUCCEEDED", "FAILED", "ERROR", "TIMEOUT"):
break
time.sleep(20)
GlueVersion="5.1" pins the runtime to Spark 3.5.6 and Python 3.11. G.1X workers give one DPU each (4 vCPU, 16 GB); four of them is plenty for a demo and small enough to keep the bill negligible. The --enable-metrics argument turns on job metrics so you can see DPU usage in the console afterward.
The polling loop just waits for a terminal state. In production you would trigger this from an EventBridge rule on s3:ObjectCreated in raw/, or chain it into a Glue workflow, but for a first run watching the state transitions is the fastest way to learn where time goes.
Verify it works
When the run reaches SUCCEEDED, check the CloudWatch log for the running tally the script prints. You are looking for these lines, in order:
Raw rows in: 5
After clean + length filter: 4
Exact-dedup removed: 1 rows
Near-dedup removed: 1 rows
DQ score: 1.00 (5/5 rules passed)
Five in, one stub filtered, one exact dupe removed, one near-dupe removed, leaving two clean documents that pass all five rules. Confirm the output landed:
aws s3 ls s3://$BUCKET/clean/
aws s3 cp s3://$BUCKET/clean/ - --recursive | head
You should see two JSON records, one for Lambda and one for S3, with no duplicates and no stub. To prove the gate bites, re-run with --min_dq_score set to 1.0 after loosening a rule so it fails (for example change ColumnLength "text" >= 40 to >= 5000). The run ends in FAILED, the log shows FAILED: ColumnLength ..., and s3://$BUCKET/clean/ is untouched from the previous good run. That untouched output is the contract working.
When it breaks
If the job fails immediately with ModuleNotFoundError: No module named 'awsgluedq', you are almost certainly running the script outside Glue (locally, or in a plain Spark session). The awsgluedq and awsglue modules only exist on the Glue runtime; run through create_job/start_job_run, not spark-submit on your laptop.
If you see AccessDenied on the S3 read or write, the execution role is missing the bucket in its inline policy, or you passed the wrong bucket name in the job arguments. Re-check that the S3Access policy Resource ARNs match $BUCKET exactly, including the /* variant for objects.
If approxSimilarityJoin runs for a long time or the job runs out of memory, your corpus is too large for a naive self-join. Bucket the data first and join within buckets, or raise NumberOfWorkers. For the five-row demo this never happens; it shows up around the hundreds-of-thousands mark.
If the DQ score is lower than you expect and IsUnique "content_hash" is the failing rule, dedup did not actually run before the gate, usually because the DataFrame you converted to a DynamicFrame was the pre-dedup one. Confirm you converted deduped, not cleaned or exact. This is exactly the class of bug the gate exists to catch.
If the job succeeds but clean/ is empty, the gate failed silently in your reading of the log. Search the log for Quality gate failed; a raised exception marks the run FAILED, so a SUCCEEDED run with empty output means the write path or the clean_path argument is wrong, not the gate.
Where to take it next
First, make the gate richer. Add CustomSql "select count(*) from primary where text like '%lorem ipsum%'" = 0 to reject placeholder text, or a ColumnValues "text" matches rule to enforce a language or encoding constraint. Each rule you add is a class of corpus poison you will never debug at answer time again.
Second, wire the clean prefix into the rest of this series. The clean/ bucket is exactly the shape a Bedrock Knowledge Base on S3 Vectors wants as a data source, so point episode 2's Knowledge Base at it and you have an end-to-end pipeline where only quality-gated documents ever get embedded.
Third, automate the trigger. Replace the manual start_job_run with an EventBridge rule on s3:ObjectCreated:* under raw/, so every new upload runs the gate automatically and a bad batch fails before anyone notices. At that point the corpus stops being something you clean once and becomes something that cannot get dirty without alarming. What would your assistant answer differently if you could prove every document behind it passed five rules this morning?

