Technology & AI

How to Build a Manufacturing-Grade AI System with Hybrid Retrieval, Provenance-First Citations, Maintenance Loops, and Episodic Memory

In this tutorial, we build an AI workflow for a highly advanced agent that behaves like a production-grade research and reasoning system instead of a single quick call. We import real web sources in the same way, divide them into historical fragments, and apply hybrid retrieval using both embedded TF-IDF (sparse) and OpenAI (dense), and combine the results for high recall and stability. We organize multiple agents, plan, assemble, and repair, while enforcing strict surveillance rules so that all major claims are based on recovered evidence, and we continue with episodic memory. Therefore, the program develops its strategy over time. Check it out FULL CODES here.

!pip -q install openai openai-agents pydantic httpx beautifulsoup4 lxml scikit-learn numpy


import os, re, json, time, getpass, asyncio, sqlite3, hashlib
from typing import List, Dict, Tuple, Optional, Any


import numpy as np
import httpx
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field


from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


from openai import AsyncOpenAI
from agents import Agent, Runner, SQLiteSession


if not os.environ.get("OPENAI_API_KEY"):
   os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")
if not os.environ.get("OPENAI_API_KEY"):
   raise RuntimeError("OPENAI_API_KEY not provided.")
print("✅ OpenAI API key loaded securely.")
oa = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])


def sha1(s: str) -> str:
   return hashlib.sha1(s.encode("utf-8", errors="ignore")).hexdigest()


def normalize_url(u: str) -> str:
   u = (u or "").strip()
   return u.rstrip(").,]"'")


def clean_html_to_text(html: str) -> str:
   soup = BeautifulSoup(html, "lxml")
   for tag in soup(["script", "style", "noscript"]):
       tag.decompose()
   txt = soup.get_text("n")
   txt = re.sub(r"n{3,}", "nn", txt).strip()
   txt = re.sub(r"[ t]+", " ", txt)
   return txt


def chunk_text(text: str, chunk_chars: int = 1600, overlap_chars: int = 320) -> List[str]:
   if not text:
       return []
   text = re.sub(r"s+", " ", text).strip()
   n = len(text)
   step = max(1, chunk_chars - overlap_chars)
   chunks = []
   i = 0
   while i < n:
       chunks.append(text[i:i + chunk_chars])
       i += step
   return chunks


def canonical_chunk_id(s: str) -> str:
   if s is None:
       return ""
   s = str(s).strip()
   s = s.strip("<>"'()[]{}")
   s = s.rstrip(".,;:")
   return s


def inject_exec_summary_citations(exec_summary: str, citations: List[str], allowed_chunk_ids: List[str]) -> str:
   exec_summary = exec_summary or ""
   cset = []
   for c in citations:
       c = canonical_chunk_id(c)
       if c and c in allowed_chunk_ids and c not in cset:
           cset.append(c)
       if len(cset) >= 2:
           break
   if len(cset) < 2:
       for c in allowed_chunk_ids:
           if c not in cset:
               cset.append(c)
           if len(cset) >= 2:
               break
   if len(cset) >= 2:
       needed = [c for c in cset if c not in exec_summary]
       if needed:
           exec_summary = exec_summary.strip()
           if exec_summary and not exec_summary.endswith("."):
               exec_summary += "."
           exec_summary += f" (cite: {cset[0]}) (cite: {cset[1]})"
   return exec_summary

We set up the environment, securely load the OpenAI API key, and initialize the essential services everything else depends on. We explain hashing, URL normalization, HTML cleaning, and concatenation so that all the steps below work on clean, consistent text. We also add deterministic helpers to normalize and inject quotes, ensuring that guardrails are always satisfied. Check it out FULL CODES here.

async def fetch_many(urls: List[str], timeout_s: float = 25.0, per_url_char_limit: int = 60000) -> Dict[str, str]:
   headers = {"User-Agent": "Mozilla/5.0 (AgenticAI/4.2)"}
   urls = [normalize_url(u) for u in urls]
   urls = [u for u in urls if u.startswith("http")]
   urls = list(dict.fromkeys(urls))
   out: Dict[str, str] = {}
   async with httpx.AsyncClient(timeout=timeout_s, follow_redirects=True, headers=headers) as client:
       async def _one(url: str):
           try:
               r = await client.get(url)
               r.raise_for_status()
               out[url] = clean_html_to_text(r.text)[:per_url_char_limit]
           except Exception as e:
               out[url] = f"__FETCH_ERROR__ {type(e).__name__}: {e}"
       await asyncio.gather(*[_one(u) for u in urls])
   return out


def dedupe_texts(sources: Dict[str, str]) -> Dict[str, str]:
   seen = set()
   out = {}
   for url, txt in sources.items():
       if not isinstance(txt, str) or txt.startswith("__FETCH_ERROR__"):
           continue
       h = sha1(txt[:25000])
       if h in seen:
           continue
       seen.add(h)
       out[url] = txt
   return out


class ChunkRecord(BaseModel):
   chunk_id: str
   url: str
   chunk_index: int
   text: str


class RetrievalHit(BaseModel):
   chunk_id: str
   url: str
   chunk_index: int
   score_sparse: float = 0.0
   score_dense: float = 0.0
   score_fused: float = 0.0
   text: str


class EvidencePack(BaseModel):
   query: str
   hits: List[RetrievalHit]

We automatically download multiple web sources in parallel and extract content hard to avoid unwanted evidence. We convert raw pages into structured text and define basic data models that represent segments and retrieval hits. We ensure that each piece of text is traceable back to a specific source and index of passages. Check it out FULL CODES here.

EPISODE_DB = "agentic_episode_memory.db"


def episode_db_init():
   con = sqlite3.connect(EPISODE_DB)
   cur = con.cursor()
   cur.execute("""
   CREATE TABLE IF NOT EXISTS episodes (
       id INTEGER PRIMARY KEY AUTOINCREMENT,
       ts INTEGER NOT NULL,
       question TEXT NOT NULL,
       urls_json TEXT NOT NULL,
       retrieval_queries_json TEXT NOT NULL,
       useful_sources_json TEXT NOT NULL
   )
   """)
   con.commit()
   con.close()


def episode_store(question: str, urls: List[str], retrieval_queries: List[str], useful_sources: List[str]):
   con = sqlite3.connect(EPISODE_DB)
   cur = con.cursor()
   cur.execute(
       "INSERT INTO episodes(ts, question, urls_json, retrieval_queries_json, useful_sources_json) VALUES(?,?,?,?,?)",
       (int(time.time()), question, json.dumps(urls), json.dumps(retrieval_queries), json.dumps(useful_sources)),
   )
   con.commit()
   con.close()


def episode_recall(question: str, top_k: int = 2) -> List[Dict[str, Any]]:
   con = sqlite3.connect(EPISODE_DB)
   cur = con.cursor()
   cur.execute("SELECT ts, question, urls_json, retrieval_queries_json, useful_sources_json FROM episodes ORDER BY ts DESC LIMIT 200")
   rows = cur.fetchall()
   con.close()
   q_tokens = set(re.findall(r"[A-Za-z]{3,}", (question or "").lower()))
   scored = []
   for ts, q2, u, rq, us in rows:
       t2 = set(re.findall(r"[A-Za-z]{3,}", (q2 or "").lower()))
       if not t2:
           continue
       score = len(q_tokens & t2) / max(1, len(q_tokens))
       if score > 0:
           scored.append((score, {
               "ts": ts,
               "question": q2,
               "urls": json.loads(u),
               "retrieval_queries": json.loads(rq),
               "useful_sources": json.loads(us),
           }))
   scored.sort(key=lambda x: x[0], reverse=True)
   return [x[1] for x in scored[:top_k]]


episode_db_init()

We introduce episodic memory supported by SQLite so that the system remembers what worked in the previous run. We maintain questions, recovery strategies, and useful resources to guide future planning. We also use lightweight similarity-based recall to bias the system toward historically efficient patterns. Check it out FULL CODES here.

class HybridIndex:
   def __init__(self):
       self.records: List[ChunkRecord] = []
       self.tfidf: Optional[TfidfVectorizer] = None
       self.tfidf_mat = None
       self.emb_mat: Optional[np.ndarray] = None


   def build_sparse(self):
       corpus = [r.text for r in self.records] if self.records else [""]
       self.tfidf = TfidfVectorizer(stop_words="english", ngram_range=(1, 2), max_features=80000)
       self.tfidf_mat = self.tfidf.fit_transform(corpus)


   def search_sparse(self, query: str, k: int) -> List[Tuple[int, float]]:
       if not self.records or self.tfidf is None or self.tfidf_mat is None:
           return []
       qv = self.tfidf.transform([query])
       sims = cosine_similarity(qv, self.tfidf_mat).flatten()
       top = np.argsort(-sims)[:k]
       return [(int(i), float(sims[i])) for i in top]


   def set_dense(self, mat: np.ndarray):
       self.emb_mat = mat.astype(np.float32)


   def search_dense(self, q_emb: np.ndarray, k: int) -> List[Tuple[int, float]]:
       if self.emb_mat is None or not self.records:
           return []
       M = self.emb_mat
       q = q_emb.astype(np.float32).reshape(1, -1)
       M_norm = M / (np.linalg.norm(M, axis=1, keepdims=True) + 1e-9)
       q_norm = q / (np.linalg.norm(q) + 1e-9)
       sims = (M_norm @ q_norm.T).flatten()
       top = np.argsort(-sims)[:k]
       return [(int(i), float(sims[i])) for i in top]


def rrf_fuse(rankings: List[List[int]], k: int = 60) -> Dict[int, float]:
   scores: Dict[int, float] = {}
   for r in rankings:
       for pos, idx in enumerate(r, start=1):
           scores[idx] = scores.get(idx, 0.0) + 1.0 / (k + pos)
   return scores


HYBRID = HybridIndex()
ALLOWED_URLS: List[str] = []


EMBED_MODEL = "text-embedding-3-small"


async def embed_batch(texts: List[str]) -> np.ndarray:
   resp = await oa.embeddings.create(model=EMBED_MODEL, input=texts, encoding_format="float")
   vecs = [np.array(item.embedding, dtype=np.float32) for item in resp.data]
   return np.vstack(vecs) if vecs else np.zeros((0, 0), dtype=np.float32)


async def embed_texts(texts: List[str], batch_size: int = 96, max_concurrency: int = 3) -> np.ndarray:
   sem = asyncio.Semaphore(max_concurrency)
   mats: List[Tuple[int, np.ndarray]] = []


   async def _one(start: int, batch: List[str]):
       async with sem:
           m = await embed_batch(batch)
           mats.append((start, m))


   tasks = []
   for start in range(0, len(texts), batch_size):
       batch = [t[:7000] for t in texts[start:start + batch_size]]
       tasks.append(_one(start, batch))
   await asyncio.gather(*tasks)


   mats.sort(key=lambda x: x[0])
   emb = np.vstack([m for _, m in mats]) if mats else np.zeros((len(texts), 0), dtype=np.float32)
   if emb.shape[0] != len(texts):
       raise RuntimeError(f"Embedding rows mismatch: got {emb.shape[0]} expected {len(texts)}")
   return emb


async def embed_query(query: str) -> np.ndarray:
   m = await embed_batch([query[:7000]])
   return m[0] if m.shape[0] else np.zeros((0,), dtype=np.float32)


async def build_index(urls: List[str], max_chunks_per_url: int = 60):
   global ALLOWED_URLS
   fetched = await fetch_many(urls)
   fetched = dedupe_texts(fetched)


   records: List[ChunkRecord] = []
   allowed: List[str] = []


   for url, txt in fetched.items():
       if not isinstance(txt, str) or txt.startswith("__FETCH_ERROR__"):
           continue
       allowed.append(url)
       chunks = chunk_text(txt)[:max_chunks_per_url]
       for i, ch in enumerate(chunks):
           cid = f"{sha1(url)}:{i}"
           records.append(ChunkRecord(chunk_id=cid, url=url, chunk_index=i, text=ch))


   if not records:
       err_view = {normalize_url(u): fetched.get(normalize_url(u), "") for u in urls}
       raise RuntimeError("No sources fetched successfully.n" + json.dumps(err_view, indent=2)[:4000])


   ALLOWED_URLS = allowed
   HYBRID.records = records
   HYBRID.build_sparse()


   texts = [r.text for r in HYBRID.records]
   emb = await embed_texts(texts, batch_size=96, max_concurrency=3)
   HYBRID.set_dense(emb)

We develop a hybrid retrieval index that combines TF-IDF sparse search with OpenAI dense embedding. We allow balanced positioning, so that smaller and denser signals complement each other rather than compete with each other. We create the index once per run and reuse it for all retrieval queries for efficiency. Check it out FULL CODES here.

def build_evidence_pack(query: str, sparse: List[Tuple[int,float]], dense: List[Tuple[int,float]], k: int = 10) -> EvidencePack:
   sparse_rank = [i for i,_ in sparse]
   dense_rank  = [i for i,_ in dense]
   sparse_scores = {i:s for i,s in sparse}
   dense_scores  = {i:s for i,s in dense}
   fused = rrf_fuse([sparse_rank, dense_rank], k=60) if dense_rank else rrf_fuse([sparse_rank], k=60)
   top = sorted(fused.keys(), key=lambda i: fused[i], reverse=True)[:k]


   hits: List[RetrievalHit] = []
   for idx in top:
       r = HYBRID.records[idx]
       hits.append(RetrievalHit(
           chunk_id=r.chunk_id, url=r.url, chunk_index=r.chunk_index,
           score_sparse=float(sparse_scores.get(idx, 0.0)),
           score_dense=float(dense_scores.get(idx, 0.0)),
           score_fused=float(fused.get(idx, 0.0)),
           text=r.text
       ))
   return EvidencePack(query=query, hits=hits)


async def gather_evidence(queries: List[str], per_query_k: int = 10, sparse_k: int = 60, dense_k: int = 60):
   evidence: List[EvidencePack] = []
   useful_sources_count: Dict[str, int] = {}
   all_chunk_ids: List[str] = []


   for q in queries:
       sparse = HYBRID.search_sparse(q, k=sparse_k)
       q_emb = await embed_query(q)
       dense = HYBRID.search_dense(q_emb, k=dense_k)
       pack = build_evidence_pack(q, sparse, dense, k=per_query_k)
       evidence.append(pack)
       for h in pack.hits[:6]:
           useful_sources_count[h.url] = useful_sources_count.get(h.url, 0) + 1
       for h in pack.hits:
           all_chunk_ids.append(h.chunk_id)


   useful_sources = sorted(useful_sources_count.keys(), key=lambda u: useful_sources_count[u], reverse=True)
   all_chunk_ids = sorted(list(dict.fromkeys(all_chunk_ids)))
   return evidence, useful_sources[:8], all_chunk_ids


class Plan(BaseModel):
   objective: str
   subtasks: List[str]
   retrieval_queries: List[str]
   acceptance_checks: List[str]


class UltraAnswer(BaseModel):
   title: str
   executive_summary: str
   architecture: List[str]
   retrieval_strategy: List[str]
   agent_graph: List[str]
   implementation_notes: List[str]
   risks_and_limits: List[str]
   citations: List[str]
   sources: List[str]


def normalize_answer(ans: UltraAnswer, allowed_chunk_ids: List[str]) -> UltraAnswer:
   data = ans.model_dump()
   data["citations"] = [canonical_chunk_id(x) for x in (data.get("citations") or [])]
   data["citations"] = [x for x in data["citations"] if x in allowed_chunk_ids]
   data["executive_summary"] = inject_exec_summary_citations(data.get("executive_summary",""), data["citations"], allowed_chunk_ids)
   return UltraAnswer(**data)


def validate_ultra(ans: UltraAnswer, allowed_chunk_ids: List[str]) -> None:
   extras = [u for u in ans.sources if u not in ALLOWED_URLS]
   if extras:
       raise ValueError(f"Non-allowed sources in output: {extras}")


   cset = set(ans.citations or [])
   missing = [cid for cid in cset if cid not in set(allowed_chunk_ids)]
   if missing:
       raise ValueError(f"Citations reference unknown chunk_ids (not retrieved): {missing}")


   if len(cset) < 6:
       raise ValueError("Need at least 6 distinct chunk_id citations in ultra mode.")


   es_text = ans.executive_summary or ""
   es_count = sum(1 for cid in cset if cid in es_text)
   if es_count < 2:
       raise ValueError("Executive summary must include at least 2 chunk_id citations verbatim.")


PLANNER = Agent(
   name="Planner",
   model="gpt-4o-mini",
   instructions=(
       "Return a technical Plan schema.n"
       "Make 10-16 retrieval_queries.n"
       "Acceptance must include: at least 6 citations and exec_summary contains at least 2 citations verbatim."
   ),
   output_type=Plan,
)


SYNTHESIZER = Agent(
   name="Synthesizer",
   model="gpt-4o-mini",
   instructions=(
       "Return UltraAnswer schema.n"
       "Hard constraints:n"
       "- executive_summary MUST include at least TWO citations verbatim as: (cite: ).n"
       "- citations must be chosen ONLY from ALLOWED_CHUNK_IDS list.n"
       "- citations list must include at least 6 unique chunk_ids.n"
       "- sources must be subset of allowed URLs.n"
   ),
   output_type=UltraAnswer,
)


FIXER = Agent(
   name="Fixer",
   model="gpt-4o-mini",
   instructions=(
       "Repair to satisfy guardrails.n"
       "Ensure executive_summary includes at least TWO citations verbatim.n"
       "Choose citations ONLY from ALLOWED_CHUNK_IDS list.n"
       "Return UltraAnswer schema."
   ),
   output_type=UltraAnswer,
)


session = SQLiteSession("ultra_agentic_user", "ultra_agentic_session.db")

We collect evidence by asking a number of targeted questions, combining narrow and broad results, and compiling evidence packets with points and a primer. We define robust schemas for programs and final responses, then normalize and validate citations against returned fragment IDs. We use strict rules so that every answer is always supported and easy to read. Check it out FULL CODES here.

async def run_ultra_agentic(question: str, urls: List[str], max_repairs: int = 2) -> UltraAnswer:
   await build_index(urls)
   recall_hint = json.dumps(episode_recall(question, top_k=2), indent=2)[:2000]


   plan_res = await Runner.run(
       PLANNER,
       f"Question:n{question}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nnRecall:n{recall_hint}n",
       session=session
   )
   plan: Plan = plan_res.final_output
   queries = (plan.retrieval_queries or [])[:16]


   evidence_packs, useful_sources, allowed_chunk_ids = await gather_evidence(queries)


   evidence_json = json.dumps([p.model_dump() for p in evidence_packs], indent=2)[:16000]
   allowed_chunk_ids_json = json.dumps(allowed_chunk_ids[:200], indent=2)


   draft_res = await Runner.run(
       SYNTHESIZER,
       f"Question:n{question}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nn"
       f"ALLOWED_CHUNK_IDS:n{allowed_chunk_ids_json}nn"
       f"Evidence packs:n{evidence_json}nn"
       "Return UltraAnswer.",
       session=session
   )
   draft = normalize_answer(draft_res.final_output, allowed_chunk_ids)


   last_err = None
   for i in range(max_repairs + 1):
       try:
           validate_ultra(draft, allowed_chunk_ids)
           episode_store(question, ALLOWED_URLS, plan.retrieval_queries, useful_sources)
           return draft
       except Exception as e:
           last_err = str(e)
           if i >= max_repairs:
               draft = normalize_answer(draft, allowed_chunk_ids)
               validate_ultra(draft, allowed_chunk_ids)
               return draft


           fixer_res = await Runner.run(
               FIXER,
               f"Question:n{question}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nn"
               f"ALLOWED_CHUNK_IDS:n{allowed_chunk_ids_json}nn"
               f"Guardrail error:n{last_err}nn"
               f"Draft:n{json.dumps(draft.model_dump(), indent=2)[:12000]}nn"
               f"Evidence packs:n{evidence_json}nn"
               "Return corrected UltraAnswer that passes guardrails.",
               session=session
           )
           draft = normalize_answer(fixer_res.final_output, allowed_chunk_ids)


   raise RuntimeError(f"Unexpected failure: {last_err}")


question = (
   "Design a production-lean but advanced agentic AI workflow in Python with hybrid retrieval, "
   "provenance-first citations, critique-and-repair loops, and episodic memory. "
   "Explain why each layer matters, failure modes, and evaluation."
)


urls = [
   "
   "
   "
   "
]


ans = await run_ultra_agentic(question, urls, max_repairs=2)


print("nTITLE:n", ans.title)
print("nEXECUTIVE SUMMARY:n", ans.executive_summary)
print("nARCHITECTURE:")
for x in ans.architecture:
   print("-", x)
print("nRETRIEVAL STRATEGY:")
for x in ans.retrieval_strategy:
   print("-", x)
print("nAGENT GRAPH:")
for x in ans.agent_graph:
   print("-", x)
print("nIMPLEMENTATION NOTES:")
for x in ans.implementation_notes:
   print("-", x)
print("nRISKS & LIMITS:")
for x in ans.risks_and_limits:
   print("-", x)
print("nCITATIONS (chunk_ids):")
for c in ans.citations:
   print("-", c)
print("nSOURCES:")
for s in ans.sources:
   print("-", s)

We automate the full agent loop by combining programming, compilation, validation, and debugging into a safe async pipeline. We automatically try and adjust the output until it passes all the obstacles without human intervention. We finish by creating a full example and printing a fully supported, production-ready answer.

In conclusion, we have developed a broad agent pipeline that is resistant to common failure modes: unstable embedment orientation, citation drift, and base failure at high compression. We verified results against authoritative sources, returned stub IDs, normalized citations, and injected embedded citations when needed to ensure compliance without sacrificing validity. By combining hybrid retrieval, critique and repair loops, and episodic memory, we created a reusable foundation that can be extended with rigorous testing (evidence scoring, red-collaring, and regression testing) to further strengthen the system as it expands to new and larger domains.


Check it out FULL CODES here. Also, feel free to follow us Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to Our newspaper. Wait! are you on telegram? now you can join us on telegram too.


Related Articles

Leave a Reply

Your email address will not be published. Required fields are marked *

Back to top button