Building a Production-Grade RAG Engine with LangChain, Postgres, and HNSW Indexing
- Neha Singh
- Jan 12
- 7 min read
Updated: Jan 16
Most RAG systems suffer from duplicate records and slow retrieval as they grow. This RAG orchestrator solves the most common "Day 2" database challenges: by integrating LangChain's SQLRecordManager to prevent duplicate vector embeddings, and pgvector's HNSW indexing for high-speed retrieval, the system ensures data integrity and performance at scale. It features an interactive CLI for seamless document ingestion with surgical cleanup of old records, and optimized Q&A over multiple document collections. No more duplicate chunks, no more slow scans: just a robust, local-first AI storage engine.
Understanding a few concepts before diving deep:
What does a Record Manager do?
A simple PGVector vector store with add_documents is not enough for production:
No Duplicate Prevention: By default, it will happily insert the same text multiple times, creating redundant vectors.
Limited Metadata Control: It stores everything in a generic langchain_pg_embedding table. As you scale, you may need custom columns (like user_id or tenant_id) for performance-critical filtering.
Memory Overhead: Large batch inserts via LangChain can lead to high memory usage in your application layer before the data even reaches the database.
A More Robust Strategy: "The Record Manager"
LangChain actually provides a tool specifically for this, called the Record Manager. Instead of just adding docs, you "index" them, which keeps track of what has already been inserted.
The RecordManager stores its data in a dedicated table within your PostgreSQL database. By default, this table is named upsertion_record, though you can customize the name when you initialize the manager. This table acts as a "lookup ledger" that tracks the state of your documents separately from the actual vector embeddings.
LangChain also provides an index function that acts as the "brain" coordinating the Record Manager (which remembers what you've already processed) and the Vector Store (where the searchable vectors live).
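To make the "lookup ledger" idea concrete, here is a minimal pure-Python sketch of the bookkeeping the index function performs. This is an illustration of the concept only, not the LangChain API; the real SQLRecordManager persists its keys in the upsertion_record table and also handles cleanup of stale records.

```python
import hashlib

def chunk_key(content: str, metadata: dict) -> str:
    """Stable key per chunk, similar in spirit to how the Record Manager
    hashes content + metadata to detect duplicates."""
    payload = content + "|" + str(sorted(metadata.items()))
    return hashlib.sha256(payload.encode()).hexdigest()

def index_chunks(chunks, ledger, vector_store):
    """Insert only chunks whose key is not already in the ledger."""
    stats = {"num_added": 0, "num_skipped": 0}
    for content, metadata in chunks:
        key = chunk_key(content, metadata)
        if key in ledger:
            stats["num_skipped"] += 1
            continue
        # Stand-in for the real "embed + insert into pgvector" step
        vector_store.append((key, content, metadata))
        ledger.add(key)
        stats["num_added"] += 1
    return stats

ledger, store = set(), []
docs = [("pgvector supports HNSW.", {"source": "a.pdf"}),
        ("pgvector supports HNSW.", {"source": "a.pdf"})]
print(index_chunks(docs, ledger, store))  # → {'num_added': 1, 'num_skipped': 1}
```

Running this a second time over the same chunks reports everything as skipped, which is exactly the behaviour the Record Manager gives you on re-ingestion.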
Project Overview
AgenticRAG demonstrates a production-grade RAG implementation with the following core components:
System Overview
## 1. INITIALIZATION FLOW
```
┌─────────────────────────────────────────────────────────────────┐
│ COMMAND LINE ENTRY │
│ (asyncio.run(main())) │
└────────────────────────┬────────────────────────────────────────┘
│
┌────▼─────┐
│ Setup │
└────┬─────┘
│
┌────────────────┼────────────────┐
▼ ▼ ▼
Logger Config Env Variables
Loader Loader (DB Credentials)
│ │ │
└────────────────┼────────────────┘
│
┌────▼──────────────────────────┐
│ Build PostgreSQL Connection │
│ String (DB_URL) │
└────┬──────────────────────────┘
│
┌────▼──────────────────────────┐
│ Display Welcome Menu │
│ (3 Options) │
└────┬──────────────────────────┘
│
```
## 2. MAIN MENU - USER INTERACTION
```
┌────────────────────────────────────────────────────────┐
│ CONSOLE: Display Menu & Get User Choice │
│ │
│ 1. Document Ingestion │
│ 2. Q&A (Question & Answer) │
│ 3. Exit │
└────────────────────┬─────────────────────────────────┘
│
┌────────────┼────────────┐
│ │ │
Choice=1 Choice=2 Choice=3
│ │ │
▼ ▼ ▼
[FLOW A] [FLOW B] EXIT
```
## 3. FLOW A: DOCUMENT INGESTION PATH
```
┌─────────────────────────────────────────────────────────────┐
│ User Input: Document Name │
│ (folder/zip file/PDF in data/ directory) │
└────────────────┬────────────────────────────────────────────┘
│
┌────▼──────────────────────────────────┐
│ RAGUtility.create() │
│ (Async Factory Method) │
└────┬───────────────────────────────────┘
│
┌───────────┼────────────────┬──────────────┐
▼ ▼ ▼ ▼
Logger SQLRecord OpenAI PGVector
Instance Manager Embeddings Engine
Created (text-embedding-
ada-002)
│ │ │ │
└───────────┼────────────────┴──────────────┘
│
┌────▼──────────────────────────────────┐
│ _check_table_exists() │
│ (Query PostgreSQL) │
└────┬───────────────────────────────────┘
│
┌────────┴─────────┐
│ │
Table Table
Exists Not Exists
│ │
│ ┌───▼──────────────────┐
│ │ Create New Table: │
│ │ - Columns: │
│ │ • langchain_id │
│ │ • langchain_metadata
│ │ • embedding (1536) │
│ │ • content │
│ │ ainit_vectorstore_ │
│ │ table() │
│ └───┬──────────────────┘
│ │
│ ┌───▼──────────────────┐
│ │ Create Indexes: │
│ │ - HNSW index │
│ │ - Metadata GIN index │
│ │ _create_index() │
│ └───┬──────────────────┘
│ │
└────────┬─────────┘
│
┌────▼──────────────────────────────────┐
│ Initialize PGVectorStore │
│ (Connect embeddings to table) │
└────┬───────────────────────────────────┘
│
┌────▼──────────────────────────────────┐
│ await ingest_pdfs_to_pgvector() │
│ │
│ ├─ load_documents(doc_name) │
│ │ ├─ Check if dir, zip, or single │
│ │ ├─ Extract if needed │
│ │ └─ Load PDFs via LangChain │
│ │ PyPDFLoader/ │
│ │ PyPDFDirectoryLoader │
│ │ │
│ ├─ split_documents() [CHUNKING] │
│ │ ├─ RecursiveCharacterText │
│ │ │ Splitter (tiktoken encoder) │
│ │ ├─ chunk_size: 1000 │
│ │ └─ overlap: 200 │
│ │ │
│ ├─ _run_indexing(chunks) │
│ │ ├─ Generate embeddings for │
│ │ │ each chunk │
│ │ ├─ Store in PostgreSQL vector │
│ │ ├─ Record in SQLRecordManager │
│ │ │ (cleanup: 'incremental') │
│ │ └─ Return indexing_stats │
│ │ │
│ ├─ Check if 1000+ chunks added │
│ │ └─ Rebuild HNSW index if true │
│ │ (_maintenance_task()) │
│ │ │
│ └─ update_table_cache() │
│ └─ Save collection list to │
│ cache/vector_tables_ │
│ cache.json │
│ │
└────┬───────────────────────────────────┘
│
┌────▼──────────────────────────────────┐
│ Console Output: │
│ "Successfully ingested chunks..." │
└────┬───────────────────────────────────┘
│
┌────▼──────────────────────────────────┐
│ Return to Main Menu │
└────────────────────────────────────────┘
```
## 5. RESPONSE & LOOP
```
┌─────────────────────────────────────────────────────────────┐
│ Console Output: │
│ "Agent's response:\n{response}" │
└────────────────┬────────────────────────────────────────────┘
│
┌────▼──────────────────────────────┐
│ Return to Main Menu Loop │
│ (Prompt user for next action) │
└────────────────────────────────────┘
```
Low-level design components:
The system is built on the following architecture:
Postgres Database: For local development I spun up a Docker image to run a Postgres database as a container on my machine. You can do the same using Docker Desktop:
docker run --name local-postgres -e POSTGRES_PASSWORD=xxxxxxxx -p 5432:5432 -d postgres
Verify the connection (using WSL if on Windows):
psql -h localhost -p 5432 -U postgres -d postgres -W
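The initialization flow shown earlier builds the PostgreSQL connection string (DB_URL) from environment variables. A sketch of that step; the environment variable names and defaults here are illustrative, so match them to your own .env file:

```python
import os

def build_db_url() -> str:
    """Assemble the async SQLAlchemy/psycopg connection string used by the app.
    Env var names below are assumptions, not the project's actual names."""
    user = os.getenv("PG_USER", "postgres")
    password = os.getenv("PG_PASSWORD", "xxxxxxxx")
    host = os.getenv("PG_HOST", "localhost")
    port = os.getenv("PG_PORT", "5432")
    db = os.getenv("PG_DB", "postgres")
    return f"postgresql+psycopg://{user}:{password}@{host}:{port}/{db}"

print(build_db_url())
```

The `postgresql+psycopg://` scheme selects the psycopg (v3) driver, which supports the async operations used throughout the project.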
PDF Ingestion Pipeline: Automatically reads PDF files from designated directories
Embedding Generation: Converts documents into embeddings using OpenAI's text-embedding-ada-002
Vector Database: Stores embeddings in PostgreSQL with HNSW vector indexing
Intelligent Retrieval: Fetches relevant documents based on semantic similarity for user queries
Orchestrated Agent: Coordinates the entire workflow using LangChain agents with GPT-4o-mini
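The "Vector Database" component relies on two indexes per collection. The DDL issued by the _create_index() step looks roughly like the following; the index names mirror the schema section later in the post, while the HNSW tuning parameters (m, ef_construction) shown are pgvector's defaults and are assumptions here:

```python
def hnsw_index_ddl(table_name: str, m: int = 16, ef_construction: int = 64) -> str:
    """pgvector HNSW index DDL for cosine distance, plus a GIN index
    on the JSONB metadata column for filtered queries."""
    return (
        f"CREATE INDEX IF NOT EXISTS {table_name}_hnsw_idx "
        f"ON {table_name} USING hnsw (embedding vector_cosine_ops) "
        f"WITH (m = {m}, ef_construction = {ef_construction});\n"
        f"CREATE INDEX IF NOT EXISTS {table_name}_metadata "
        f"ON {table_name} USING gin (langchain_metadata);"
    )

print(hnsw_index_ddl("my_docs"))
```

Higher m and ef_construction values improve recall at the cost of build time and memory, so they are worth tuning once collections grow large.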
Core Components & Function Descriptions


KEY DESIGN PATTERNS
### Async/Await Pattern
- `RAGUtility.create()` - Async factory for initialization
- `ingest_pdfs_to_pgvector()` - Async ingestion
- `_check_table_exists()`, `_create_index()` - Async DB operations
- `update_table_cache()` - Async cache updates
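The async-factory pattern behind RAGUtility.create() keeps __init__ synchronous and moves all awaitable setup into a classmethod. A stripped-down sketch (the real method wires up the record manager, embeddings, and vector store; the body here is a stand-in):

```python
import asyncio

class RAGUtility:
    """Async factory: __init__ stays synchronous; create() performs
    the awaitable setup and returns a fully initialized instance."""

    def __init__(self, db_url: str):
        self.db_url = db_url
        self.ready = False

    @classmethod
    async def create(cls, db_url: str) -> "RAGUtility":
        self = cls(db_url)
        await self._async_setup()  # e.g. check/create tables, init vector store
        return self

    async def _async_setup(self) -> None:
        await asyncio.sleep(0)  # placeholder for real async DB calls
        self.ready = True

util = asyncio.run(RAGUtility.create("postgresql+psycopg://localhost/postgres"))
print(util.ready)  # → True
```

This pattern exists because __init__ cannot be awaited; any constructor that needs async I/O has to delegate to a factory like this.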
### Tool-Based Agent Pattern
- Agent has access to `search_knowledge_base` tool
- Agent decides autonomously whether to use it
- Tool retrieves context from PostgreSQL
### Vector Search Pipeline
1. **Chunking**: Split PDFs into 1000-char chunks (200 overlap)
2. **Embedding**: Convert chunks to 1536-dim vectors
3. **Storage**: Store in PostgreSQL with HNSW index
4. **Retrieval**: Cosine similarity search for top-K results

FLOW DECISION TREE
```
START
│
├─ User chooses 1 (Ingestion)
│ ├─ Provide doc name
│ ├─ Load PDFs
│ ├─ Chunk documents
│ ├─ Generate embeddings
│ ├─ Store in PostgreSQL
│ ├─ Create HNSW indexes
│ ├─ Update cache
│ └─ Return to menu
│
├─ User chooses 2 (Q&A)
│ ├─ Select collection
│ ├─ Enter query
│ ├─ Initialize Agent
│ ├─ Agent invokes search tool (if needed)
│ ├─ Retrieve context from PostgreSQL
│ ├─ LLM generates response
│ ├─ Return response to console
│ └─ Return to menu
│
└─ User chooses 3 (Exit)
└─ Log "Exiting..." & STOP
```
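The chunking step in the ingestion path (chunk_size 1000, overlap 200) can be approximated in plain Python. The project itself uses RecursiveCharacterTextSplitter with the tiktoken encoder, which splits on natural separators and counts tokens; this character-based sketch only illustrates the sliding-window idea:

```python
def split_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Naive fixed-window splitter with overlap (character-based approximation)."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    step = chunk_size - overlap  # each window starts 800 chars after the last
    return [text[i:i + chunk_size] for i in range(0, max(len(text) - overlap, 1), step)]

chunks = split_text("x" * 2500)
print([len(c) for c in chunks])  # → [1000, 1000, 900]
```

The 200-character overlap ensures that a sentence straddling a chunk boundary still appears whole in at least one chunk, which keeps retrieval from missing context at the edges.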
---
CACHING MECHANISM
```
Cache File: cache/vector_tables_cache.json
Content: {"tables": ["doc_name_1", "doc_name_2", ...]}
Updated: After each successful ingestion
Loaded: When user selects option 2 (Q&A)
Purpose: Quick display of available collections without DB query
```
DATABASE SCHEMA
```
Table: {table_name} (derived from the ingested doc_name)
Description: dynamically created per ingestion using langchain_postgres.PGEngine.ainit_vectorstore_table
Columns:
├─ langchain_id (UUID) - Primary key
├─ langchain_metadata (JSONB) - Document metadata (source, page, etc.)
├─ embedding (vector[1536]) - OpenAI embeddings
└─ content (TEXT) - Chunk text
Indexes:
├─ {table_name}_hnsw_idx (HNSW) - Vector similarity search
└─ {table_name}_metadata (GIN) - Metadata filtering
________________________________________________________________________
Table: upsertion_record
Description: The RecordManager stores its data in this dedicated table within the PostgreSQL database. By default the table is named upsertion_record, and it acts as a "lookup ledger" to track the state of your documents separately from the actual vector embeddings.
The langchain_classic.indexes.index function integrates langchain_classic.indexes.SQLRecordManager and langchain_postgres.PGVectorStore to create this table.
Columns:
├─ key (TEXT) - A unique ID for the chunk (often a hash of the content).
├─ namespace(TEXT) - Groups records (e.g., postgres/collection_name).
├─ group_id (TEXT) - The source_id (e.g., the filename) used to identify each document source, after chunking langchain documents has this info as part of metadata->source representing the document or filename.
└─ updated_at (DOUBLE PRECISION) - Server timestamp of the last write.
```
SYSTEM DEPENDENCIES
```
External Services:
├─ PostgreSQL + pgvector - Vector storage & retrieval
├─ OpenAI API - Embeddings (text-embedding-ada-002)
└─ OpenAI API - LLM (gpt-4o-mini)
Python Libraries:
├─ LangChain - RAG framework & agent orchestration
├─ langchain_postgres - PGVectorStore and PGEngine for async doc ingestion
├─ psycopg (v3) - PostgreSQL async driver
├─ RecursiveCharacterTextSplitter - Document chunking
└─ PyPDFLoader - PDF document loading
```
Configuration
The system requires a config/config.json file holding the database credentials and OpenAI settings.

Data Flow
1. Ingestion Flow:
User selects "Document Ingestion" → Provides document name
RAGUtility.create() initializes vector store and record manager
ingest_pdfs_to_pgvector() loads documents via load_documents()
Documents are split into chunks using RecursiveCharacterTextSplitter
Chunks are embedded and indexed in PostgreSQL vector table
HNSW index is optimized if needed
Table cache is updated
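The final update_table_cache() step amounts to a small JSON read-modify-write against cache/vector_tables_cache.json. A sketch under that assumption (the real implementation is async; this synchronous version just shows the shape):

```python
import json
from pathlib import Path

def update_table_cache(doc_name: str,
                       cache_path: str = "cache/vector_tables_cache.json") -> list[str]:
    """Append a newly ingested collection name to the JSON cache,
    creating the cache file if it does not exist yet."""
    path = Path(cache_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    tables = json.loads(path.read_text())["tables"] if path.exists() else []
    if doc_name not in tables:
        tables.append(doc_name)
    path.write_text(json.dumps({"tables": tables}, indent=2))
    return tables
```

Because the Q&A menu reads this file instead of querying PostgreSQL, the collection list appears instantly even with many tables in the database.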

Note that num_skipped is 0 on the first run; if a user re-uploads the same document (the name must remain the same), the RecordManager checks each chunk (metadata + content) and skips those that are duplicates.
An example of what the langchain-postgres v2 table schema and vector embeddings look like. Note the upsertion_record table: the RecordManager stores its data in this dedicated PostgreSQL table, which acts as a "lookup ledger" tracking the state of our documents separately from the actual vector embeddings.
2. Query Flow:
User selects "Q&A" → Selects collection and enters query
RAGUtility.create() initializes vector store for selected collection
AgentOrchestrator is initialized with user query
Agent decides whether to call search_knowledge_base tool
retrieve_docs_from_pgvector() fetches relevant document chunks
LLM generates response based on retrieved context and user query
Response is returned to user
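Step 5 above ranks stored chunks by cosine similarity between the query embedding and each chunk embedding; pgvector's HNSW index performs this search approximately at scale. A minimal pure-Python illustration of the ranking itself (toy 2-dimensional vectors instead of the real 1536-dim embeddings):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def top_k(query_vec, chunks, k: int = 3):
    """chunks: list of (text, embedding). Returns the k most similar texts."""
    ranked = sorted(chunks, key=lambda c: cosine_similarity(query_vec, c[1]),
                    reverse=True)
    return [text for text, _ in ranked[:k]]

docs = [("about cats", [1.0, 0.0]),
        ("about dogs", [0.0, 1.0]),
        ("cats and dogs", [0.7, 0.7])]
print(top_k([1.0, 0.1], docs, k=2))  # → ['about cats', 'cats and dogs']
```

The retrieved texts are then concatenated into the prompt context the LLM uses to ground its answer.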


Key Features
Intelligent Document Processing: Handles single PDFs, directories, and ZIP archives
Semantic Search: Uses vector embeddings for context-aware document retrieval
Agent-Driven: LLM decides whether to consult knowledge base or respond directly
Production Database: PostgreSQL with HNSW vector indexing for scalability
Caching System: Maintains cache of available collections for quick access
Async Operations: Non-blocking database operations for better performance
Comprehensive Logging: Color-coded console and file logging for debugging

