Building a Production-Grade RAG Engine with LangChain, Postgres, and HNSW Indexing.
- Neha Singh
This project is an AI-powered Retrieval-Augmented Generation (RAG) system that serves as an intelligent document research assistant. It implements an orchestrated agent architecture with a comprehensive RAG pipeline for ingesting PDF documents and retrieving relevant information from a PostgreSQL vector database.
Most RAG systems suffer from duplicate records and slow retrieval as they grow. I've developed a robust RAG orchestrator that solves the most common "Day 2" database challenges. By integrating LangChain's SQLRecordManager to prevent duplicate vector embeddings and pgvector's HNSW indexing for high-speed retrieval, the system ensures data integrity and performance at scale. It features an interactive CLI for seamless document ingestion with surgical cleanup of old records, plus optimized Q&A over multiple document collections. No more duplicate chunks, no more slow scans: just a robust, local-first AI storage engine.
Project Overview
AgenticRAG demonstrates a production-grade RAG implementation with the following core components:
Architecture
The system is built on the following architecture:
Postgres Database: For local development I spun up Postgres as a Docker container on my machine. You can do the same using Docker Desktop.
docker run --name local-postgres -e POSTGRES_PASSWORD=xxxxxxxx -p 5432:5432 -d postgres
Verify through WSL (Ubuntu):
psql -h localhost -p 5432 -U postgres -d postgres -W
PDF Ingestion Pipeline: Automatically reads PDF files from designated directories
Embedding Generation: Converts documents into embeddings using OpenAI's text-embedding-ada-002
Vector Database: Stores embeddings in PostgreSQL with HNSW vector indexing
Intelligent Retrieval: Fetches relevant documents based on semantic similarity for user queries
Orchestrated Agent: Coordinates the entire workflow using LangChain agents with GPT-4o-mini
Core Components & Function Descriptions

1. Main Application (main.py)
Entry point for the RAG system with an interactive menu system.
Functions:
get_cached_tables()
Reads the cached list of vector tables from disk for immediate UI display
Returns a list of available collections
Returns empty list if cache file doesn't exist
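A minimal sketch of what get_cached_tables() could look like, using only the standard library. The cache file path is an assumption; the actual location in the project may differ.

```python
import json
from pathlib import Path

# Assumed cache location -- the real path in the project may differ.
CACHE_FILE = Path("cache/table_cache.json")

def get_cached_tables(cache_file: Path = CACHE_FILE) -> list:
    """Read the cached list of vector tables for immediate UI display."""
    if not cache_file.exists():
        return []  # no cache yet -> empty list, as described above
    with open(cache_file, "r", encoding="utf-8") as f:
        return json.load(f)
```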
main() (async)
Primary orchestration function that initializes the system and manages user interactions
Loads environment variables, database configuration, and system logger
Displays interactive menu with three options:
Document Ingestion: Ingest PDFs into the vector database
Q&A: Query the knowledge base
Exit: Terminate the application
Handles connection string construction for PostgreSQL
Manages the flow between RAGUtility and AgentOrchestrator based on user choice
2. Agent Orchestrator (agents/agent.py)
Manages the LLM-based agent that decides when and how to use the RAG tools.
Class: AgentOrchestrator
Constructor: __init__(rag_utility, query: str)
Initializes the agent with a RAGUtility instance and the user query
Sets up GPT-4o-mini as the LLM with temperature=0 for deterministic responses
Defines system prompt to guide agent behavior
Registers available tools (search_knowledge_base)
Methods:
_initialize_tools() → List[Any]
Defines and returns the list of tools available to the agent
Currently implements one tool: search_knowledge_base
Tool definition uses LangChain's @tool decorator
search_knowledge_base(query: str) (tool)
Consults the internal PDF documentation by querying the RAG system
Acts as a bridge between the LLM and the vector database
Returns relevant document chunks that match the query
chat() → str
Main entry point for user interactions with the agent
Invokes the LangChain agent with the user query wrapped in HumanMessage
Logs all tool calls made by the agent for debugging
Extracts and returns the final text response from the agent
Handles exceptions gracefully with error logging
3. RAG Utility Module (tools/rag_utility.py)
Core engine for all RAG operations including document ingestion and retrieval.
Class: RAGUtility
Constructor: __init__(vector_store, record_manager, logger, connection_string, engine, db_url)
Initializes the RAG system with the PostgreSQL vector store configuration
Stores references to LangChain vector store, record manager, and database connections
Class Methods:
@classmethod
async def create(cls, connection_string, table_name)
Factory method that initializes the complete RAG pipeline
Creates or validates PostgreSQL vector table
Initializes embeddings using OpenAI's text-embedding-ada-002
Sets up Record Manager for document lifecycle tracking
Creates HNSW indexes for vector similarity search
Handles async PostgreSQL connections with proper error handling
Static/Async Methods:
_check_table_exists(db_url: str, table_name: str) → bool (async)
Checks whether a vector table exists in PostgreSQL
Queries information_schema to verify table presence
Returns boolean indicating table existence
_create_index(db_url: str, table_name: str) (async)
Creates HNSW index for vector similarity search
Creates GIN index for metadata queries
Converts langchain_metadata column to JSONB type
Enables efficient vector and metadata-based queries
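The DDL behind _create_index would look roughly like the statements below. The table name (my_docs), index names, and the cosine-distance operator class are assumptions; the actual names come from the collection the user ingests.

```sql
-- Convert the metadata column to JSONB so it can be GIN-indexed
ALTER TABLE my_docs
    ALTER COLUMN langchain_metadata TYPE jsonb USING langchain_metadata::jsonb;

-- HNSW index for approximate nearest-neighbour search (cosine distance assumed)
CREATE INDEX IF NOT EXISTS idx_my_docs_embedding
    ON my_docs USING hnsw (embedding vector_cosine_ops);

-- GIN index for fast metadata filtering
CREATE INDEX IF NOT EXISTS idx_my_docs_metadata
    ON my_docs USING gin (langchain_metadata);
```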
Instance Methods:
def _run_indexing(self, docs) → dict
Runs the indexing pipeline for document chunks
Uses LangChain's index function with SQLRecordManager
Implements incremental cleanup to avoid duplicating existing content
Returns indexing statistics (num_added, num_updated, etc.)
async def _maintenance_task(self, table_name)
Optimizes the HNSW index after large batch ingestions
Reindexes concurrently to allow reads/writes during optimization
Prevents HNSW bloating from dead rows and nodes
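The concurrent rebuild described above maps to a single Postgres statement; the index name here is an assumption matching the earlier DDL sketch.

```sql
-- Rebuild the HNSW index without blocking concurrent reads/writes
REINDEX INDEX CONCURRENTLY idx_my_docs_embedding;
```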
async def update_table_cache(self)
Saves the current list of vector tables to the disk cache
Queries PostgreSQL for all vector-type columns
Stores table names in JSON cache file for immediate UI display
async def ingest_pdfs_to_pgvector(self, doc_name)
Main PDF ingestion pipeline orchestrator
Loads documents from specified document name
Splits documents into chunks using RecursiveCharacterTextSplitter (1000 chunk size, 200 overlap)
Runs the indexing pipeline (self._run_indexing(docs)) to store chunks in PostgreSQL
Triggers the maintenance task (self._maintenance_task(doc_name)) if more than 1000 chunks were added
Updates the table cache for UI availability (self.update_table_cache())
Logs all operations for debugging
retrieve_docs_from_pgvector(user_query: str, k: int = 3) → str
Retrieves relevant document chunks from the vector database
Performs semantic similarity search using the user's query
Returns top k results (default 3) concatenated as context string
Returns empty string on error with exception logging
Acts as the retrieval backend for the agent
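A sketch of this retrieval method, written here as a standalone function against any LangChain-style vector store exposing similarity_search; in the project it is an instance method bound to the PGVector store.

```python
def retrieve_docs_from_pgvector(vector_store, user_query: str, k: int = 3) -> str:
    """Fetch the top-k similar chunks and concatenate them into one context string."""
    try:
        docs = vector_store.similarity_search(user_query, k=k)
        return "\n\n".join(d.page_content for d in docs)
    except Exception:
        # Mirrors the behaviour described above: swallow the error
        # (the real code logs it) and return an empty context string.
        return ""
```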
4. Utility Modules
Logging module: provides consistent, color-coded logging across the application.
Class: ColorFormatter(logging.Formatter)
Custom formatter that adds ANSI color codes based on log level
Maps: DEBUG (cyan), INFO (green), WARNING (yellow), ERROR (red), CRITICAL (bold red)
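A minimal sketch of that formatter using the level-to-color mapping above; the exact format string is an assumption.

```python
import logging

class ColorFormatter(logging.Formatter):
    """Wrap the formatted message in an ANSI color code chosen by log level."""

    COLORS = {
        logging.DEBUG: "\033[36m",      # cyan
        logging.INFO: "\033[32m",       # green
        logging.WARNING: "\033[33m",    # yellow
        logging.ERROR: "\033[31m",      # red
        logging.CRITICAL: "\033[1;31m", # bold red
    }
    RESET = "\033[0m"

    def format(self, record: logging.LogRecord) -> str:
        color = self.COLORS.get(record.levelno, "")
        return f"{color}{super().format(record)}{self.RESET}"
```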
Function: get_logger(name: str, level: int = logging.INFO) → logging.Logger
Creates and configures a logger instance with the specified name
Sets up console handler with ColorFormatter for terminal output
Sets up file handler for persistent logging (unless name starts with "test")
Creates logs directory automatically if it doesn't exist
Returns fully configured logger ready for use
config_loader.py
Manages configuration loading from JSON configuration files.
Function: load_config() → dict
Reads configuration from config/config.json
Loads and sets environment variables for:
OpenAI API key and base URL
PostgreSQL connection parameters (host, port, database, user, password)
Returns the complete configuration dictionary
Called at application startup to initialize system settings
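A sketch of load_config() with the standard library only. The JSON key names and the environment-variable names are assumptions based on the parameters listed above.

```python
import json
import os
from pathlib import Path

def load_config(path: str = "config/config.json") -> dict:
    """Read config.json and export its settings as environment variables
    (key and variable names assumed)."""
    config = json.loads(Path(path).read_text(encoding="utf-8"))
    os.environ["OPENAI_API_KEY"] = config["openai"]["api_key"]
    for key, value in config["postgres"].items():
        os.environ[f"PG{key.upper()}"] = str(value)  # e.g. PGHOST, PGPORT
    return config
```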
data_ingestion.py
Handles PDF document loading from various sources.
Function: load_documents(doc_name: str = None) → List[Document]
Intelligent document loader that handles multiple input formats:
Directory of PDFs: Uses PyPDFDirectoryLoader if folder exists
ZIP file: Extracts contents to data folder and loads all PDFs
Single PDF file: Uses PyPDFLoader for individual file
Not found: Returns empty Document with error logging
Converts all PDF pages into LangChain Document objects
Logs document loading progress and file count
Handles file path construction relative to project structure
Returns list of Document objects ready for chunking and ingestion
Configuration
The system requires a config/config.json file with the following structure:
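A plausible shape for that file, based on the parameters described earlier; every key name here is an assumption, so adapt it to what config_loader.py actually reads.

```json
{
  "openai": {
    "api_key": "sk-...",
    "base_url": "https://api.openai.com/v1"
  },
  "postgres": {
    "host": "localhost",
    "port": 5432,
    "database": "postgres",
    "user": "postgres",
    "password": "xxxxxxxx"
  }
}
```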

Data Flow
1. Ingestion Flow:
User selects "Document Ingestion" → Provides document name
RAGUtility.create() initializes vector store and record manager
ingest_pdfs_to_pgvector() loads documents via load_documents()
Documents are split into chunks using RecursiveCharacterTextSplitter
Chunks are embedded and indexed in PostgreSQL vector table
HNSW index is optimized if needed
Table cache is updated

Note that num_skipped is 0 here; if a user tries to re-upload the same document (the name should remain the same), the RecordManager checks each chunk (metadata + content) and skips any that are duplicates.
Here is an example of what the langchain-postgres v2 table schema and vector embeddings look like. Note the upsertion_record table: the RecordManager stores its data in this dedicated table within the PostgreSQL database, and it acts as a "lookup ledger" that tracks the state of our documents separately from the actual vector embeddings.
2. Query Flow:
User selects "Q&A" → Selects collection and enters query
RAGUtility.create() initializes vector store for selected collection
AgentOrchestrator is initialized with user query
Agent decides whether to call search_knowledge_base tool
retrieve_docs_from_pgvector() fetches relevant document chunks
LLM generates response based on retrieved context and user query
Response is returned to user


Key Features
Intelligent Document Processing: Handles single PDFs, directories, and ZIP archives
Semantic Search: Uses vector embeddings for context-aware document retrieval
Agent-Driven: LLM decides whether to consult knowledge base or respond directly
Production Database: PostgreSQL with HNSW vector indexing for scalability
Caching System: Maintains cache of available collections for quick access
Async Operations: Non-blocking database operations for better performance
Comprehensive Logging: Color-coded console and file logging for debugging

