
Introduction
RAG (retrieval-augmented generation) systems have become very popular in recent years. These systems combine the strengths of LLMs and information retrieval: relevant documents are first retrieved from a large corpus and then used to generate the final response. This approach is particularly useful when the answer to a question is not present in the LLM's training data, and it significantly reduces LLM hallucinations.
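To make the flow concrete, here is a minimal, illustrative sketch of the retrieve-then-generate loop. The retrieve and generate callables are hypothetical placeholders standing in for a vector search and an LLM call; they are not part of any particular library:

from typing import Callable

def rag_answer(
    question: str,
    retrieve: Callable[[str, int], list[str]],  # placeholder: vector search over a corpus
    generate: Callable[[str], str],             # placeholder: LLM completion call
    top_k: int = 5,
) -> str:
    # 1. Retrieve the documents most relevant to the question.
    context = "\n\n".join(retrieve(question, top_k))
    # 2. Ground the LLM's answer in the retrieved context.
    prompt = f"Answer using only the context below.\n\nContext:\n{context}\n\nQuestion: {question}"
    return generate(prompt)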
There are several potential issues that can arise when scaling up RAG systems. In this cookbook we will focus on one of them: ingestion of large amounts of document data. We will show how you can use Ragbits and Ray to easily scale up the ingestion of document data. Besides these two, we will also use Qdrant and, in the case of a GPU-assisted deployment, vLLM.
High-level overview
In this cookbook, we will show you how to use Ragbits and Ray to ingest large amounts of document data into a Qdrant vector database. As example input data we will use several WHO documents (all cancer-related), but this approach can easily be extended to any other document data. We will show you two different approaches to deployment: on-premises and cloud-based. This specific cookbook focuses on the on-premises deployment, while the cloud-based deployment will be covered in a separate cookbook.
Tools used
Ragbits
Ragbits is an open-source modular framework by deepsense.ai that streamlines the development of AI applications by supporting over 100 large language models and various vector stores. It processes more than 20 document formats and offers a powerful command-line interface for efficient execution, prompt testing, and vector store management. With built-in observability and integrated testing tools, Ragbits enhances monitoring and evaluation, simplifying the deployment process.
Ray
Ray is a fast, simple, and popular framework for building and running distributed workloads. It provides a simple API for building distributed programs and is designed to be scalable and efficient. Ray is widely used in the machine learning community for scaling up training and inference workloads.
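As a quick illustration of how simple that API is, here is a tiny, self-contained Ray example (a sketch unrelated to the rest of this cookbook's pipeline):

import ray

ray.init()  # connects to a running cluster if one exists, otherwise starts a local one

@ray.remote
def square(x: int) -> int:
    # A regular function turned into a distributed task by @ray.remote
    return x * x

# Tasks run in parallel across the cluster; ray.get collects the results.
print(ray.get([square.remote(i) for i in range(4)]))  # [0, 1, 4, 9]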
Qdrant
Qdrant is a vector database and search engine. It provides a fast and scalable solution for storing, searching, and managing vectors with additional payload. Qdrant supports various distance metrics and filtering capabilities, making it ideal for building efficient vector search applications like semantic search, recommendation systems, and other AI-powered solutions.
vLLM
vLLM is an open-source inference and serving engine designed for efficient deployment of large language models (LLMs). It utilizes PagedAttention to optimize memory management, significantly reducing memory waste and enabling higher throughput compared to frameworks like Hugging Face Transformers. With features such as continuous batching, OpenAI-compatible APIs, and support for distributed inference, vLLM provides a scalable, cost-efficient solution for real-time and large-scale AI applications.
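For example, once a vLLM server is running (as we set up later in this cookbook), you can talk to it with any OpenAI-compatible client. This is only a sketch; it assumes the server from the Docker Compose file below is reachable at localhost:8000:

from openai import OpenAI

# vLLM exposes an OpenAI-compatible endpoint; the API key is ignored but required by the client.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

response = client.chat.completions.create(
    model="alpindale/Llama-3.2-11B-Vision-Instruct",  # the model served in our deployment
    messages=[{"role": "user", "content": "Summarize what a vector database is in one sentence."}],
)
print(response.choices[0].message.content)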
Prerequisites
Before we start, make sure you have the following installed on your machine:
- Docker
- Docker Compose
- Python 3.12 or higher
- Nvidia Container Toolkit (if you have a GPU and want to use it for inference)
On-premises deployment
In this approach, the entire system can be set up on your local machine or group of machines. We will start small by setting up the entire infrastructure on a single machine, using Docker Compose. This will allow you to test the system locally before deploying it on a larger scale.
In general we will set up a system that consists of the following containers:
- Qdrant DB
- Ray cluster head
- Ray cluster worker (x3)
- LLM inference server (only if you have a GPU)
Step 0: Choose whether to use a local or external LLM and provide API keys
Before we start, you need to decide whether you want to use a local LLM:
- If you want to use a local LLM, you need a GPU and an API key for Hugging Face Hub, so the model can be downloaded.
- If you want to use an OpenAI model via API, you need to provide an API key for the OpenAI endpoint.
Please choose one of the following options and provide the necessary API key in the next cell:
USE_GPU = False
HUGGING_FACE_HUB_TOKEN = ""
OPENAI_API_KEY = ""

if USE_GPU:
    assert HUGGING_FACE_HUB_TOKEN, "Please set HUGGING_FACE_HUB_TOKEN"
else:
    assert OPENAI_API_KEY, "Please set OPENAI_API_KEY"
Step 1: Set up the environment
Create the Docker Compose containers based on the following compose YAML (it's a copy of the docker-compose.yml file):
services:
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"  # REST API port
      - "6334:6334"  # gRPC port
    volumes:
      - qdrant_data:/qdrant/storage
    environment:
      - QDRANT_ALLOW_RECOVERY_ON_CORRUPTED_DB=true
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:6333/readiness"]
      interval: 5s
      timeout: 5s
      retries: 3
      start_period: 5s

  ray_head:
    build:
      context: .
      dockerfile: Dockerfile.ray
    ports:
      - "8265:8265"   # Ray dashboard
      - "10001:10001" # Ray client server
      - "6379:6379"   # Redis
    command: >
      ray start --head --port=6379 --redis-password=password
      --dashboard-host=0.0.0.0 --num-cpus=0 --block
    healthcheck:
      test: ["CMD", "ray", "status"]
      interval: 10s
      timeout: 10s
      retries: 5

  ray_worker:
    build:
      context: .
      dockerfile: Dockerfile.ray
    depends_on:
      - ray_head
    deploy:
      replicas: 3
    command: >
      ray start --address=ray_head:6379 --redis-password=password --block
    environment:
      - OPENAI_API_KEY=$OPENAI_API_KEY

  llm:
    image: vllm/vllm-openai:v0.7.2
    restart: always
    ipc: host
    profiles:
      - gpu
    volumes:
      - ~/.hf_cache:/root/.cache/huggingface/hub
    ports:
      - "8000:8000"
    environment:
      - HUGGING_FACE_HUB_TOKEN=$HUGGING_FACE_HUB_TOKEN
      - PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              device_ids: ["all"]
              capabilities: [gpu]
    command:
      - "--model=alpindale/Llama-3.2-11B-Vision-Instruct"
      - "--dtype=half"
      - "--tensor-parallel-size=1"  # If you have multiple GPUs, you might want to adjust this value
      - "--gpu-memory-utilization=0.9"
      - "--max-num-seqs=20"
      - "--max-model-len=10000"
      - "--tokenizer-mode=auto"
      - "--config-format=auto"
      - "--load-format=auto"
      - "--tool-call-parser=pythonic"
      - "--chat-template=examples/tool_chat_template_llama3.2_pythonic.jinja"
      - "--enable-auto-tool-choice"

volumes:
  qdrant_data:
As you might notice, we use a custom Dockerfile for the Ray nodes. This is necessary because we need to install several system dependencies (besides Python libraries) for the processing to work. Here is the content of Dockerfile.ray:
FROM rayproject/ray:latest-py312

USER root

# Install dependencies
RUN apt update && \
    apt install -y \
    libgl1 \
    libglib2.0-0 \
    tesseract-ocr \
    libtesseract-dev \
    poppler-utils && \
    apt clean && \
    rm -rf /var/lib/apt/lists/*

# Install Python packages
RUN pip install \
    "ragbits-core[qdrant,fastembed]==0.13.0" \
    "ragbits-document-search[ray]==0.13.0" \
    "unstructured[pdf]==0.17.2"

USER 1000

# Set environment variable to suppress Docker CPU warning
ENV RAY_DISABLE_DOCKER_CPU_WARNING=1
Finally, you can deploy the containers simply by running docker compose --profile gpu up -d (omit the profile if you are using only the CPU).
In fact, you can do it inside this notebook by running the next cell. Note that the cell will take some time to run, as it downloads and builds the necessary Docker images and sets up the containers. The containers will then run in the background.
if USE_GPU:
    !HUGGING_FACE_HUB_TOKEN={HUGGING_FACE_HUB_TOKEN} docker compose --profile gpu up -d
    print("GPU-powered deployment has been started")
else:
    !OPENAI_API_KEY={OPENAI_API_KEY} docker compose up -d
    print("CPU-only deployment has been started")
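Optionally, you can verify that the core services came up correctly before moving on. The sketch below simply polls the Qdrant readiness endpoint and the Ray dashboard (it assumes the requests package is available in your notebook environment):

import time
import requests

checks = {
    "Qdrant": "http://localhost:6333/readiness",
    "Ray dashboard": "http://localhost:8265",
}

for name, url in checks.items():
    for _ in range(30):
        try:
            if requests.get(url, timeout=2).status_code == 200:
                print(f"{name} is up")
                break
        except requests.exceptions.ConnectionError:
            pass
        time.sleep(5)
    else:
        print(f"{name} did not respond in time, check `docker compose logs`")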
Step 2: Prepare ingestion job
There are several ways you can work with Ray clusters, but not all of them are created equal. While it might be tempting to use Ray Client directly in a notebook, it is not recommended for production workloads and can result in version mismatch issues. Instead, we will use a more robust approach by creating a Python script that will be executed on the Ray cluster as a Ray Job.
Document search client
First, we need to define the document search client. We'll keep it in a separate document_search.py file to make it easier to reuse, e.g. in this notebook:
from qdrant_client import AsyncQdrantClient

from ragbits.core.embeddings.fastembed import FastEmbedEmbedder
from ragbits.core.llms import LLM
from ragbits.core.vector_stores.qdrant import QdrantVectorStore
from ragbits.document_search import DocumentSearch
from ragbits.document_search.documents.element import ImageElement
from ragbits.document_search.ingestion.enrichers.image import ImageElementEnricher
from ragbits.document_search.ingestion.strategies import RayDistributedIngestStrategy


def get_document_search(qdrant_host: str, image_description_llm: LLM) -> DocumentSearch:
    embedder = FastEmbedEmbedder(model_name="BAAI/bge-base-en")
    vector_store = QdrantVectorStore(
        embedder=embedder,
        client=AsyncQdrantClient(qdrant_host),
        index_name="documents",
    )
    image_enricher_routing = {
        ImageElement: ImageElementEnricher(llm=image_description_llm)
    }
    processing_strategy = RayDistributedIngestStrategy()

    return DocumentSearch(
        vector_store=vector_store,
        ingest_strategy=processing_strategy,
        enricher_router=image_enricher_routing,
    )
The script above defines a function to instantiate a DocumentSearch instance, which facilitates both document ingestion into Qdrant and subsequent retrieval operations. Let’s examine the key components that make up this implementation:
- FastEmbedEmbedder is used to embed the documents into vectors. We use the BAAI/bge-base-en model, but you can use any other model supported by fastembed, or any other embedder supported by Ragbits.
- QdrantVectorStore is basically a wrapper around the Qdrant client that handles the embedding and indexing of the documents. Just like with the embedder, Ragbits supports several vector stores.
- image_enricher_routing is a dictionary that maps document elements to their respective handlers. In this case, we overwrite the default handler for ImageElement to use a specific LLM model for image description.
- RayDistributedIngestStrategy is a strategy that allows you to distribute the ingestion process across multiple Ray workers.
Ingestion job
Once we have the client, we can work on the ingestion script. Let's call it ingest_files.py. The script will look like this:
import argparse
import asyncio
import pprint

from ragbits.core.llms import LiteLLM
from ragbits.document_search.documents.sources import WebSource

from document_search import get_document_search

urls = [
    "https://iris.who.int/bitstream/handle/10665/379245/9789240101050-eng.pdf",
    "https://iris.who.int/bitstream/handle/10665/379225/9789240100954-eng.pdf",
    "https://iris.who.int/bitstream/handle/10665/378875/9789290211709-eng.pdf",
    ...  # Full list available in the source file
]

headers = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:136.0) Gecko/20100101 Firefox/136.0"
}

documents = [WebSource(url=url, headers=headers) for url in urls]


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--use_gpu", action="store_true")
    return parser.parse_args()


async def main(use_gpu: bool = False):
    if use_gpu:
        llm = LiteLLM(
            model_name="hosted_vllm/alpindale/Llama-3.2-11B-Vision-Instruct",
            base_url="http://llm:8000/v1",
            use_structured_output=True,
            # Necessary, otherwise the model will not report the vision capabilities
            custom_model_cost_config={
                "alpindale/Llama-3.2-11B-Vision-Instruct": {
                    "litellm_provider": "hosted_vllm",
                    "supports_vision": True,
                }
            },
        )
    else:
        llm = LiteLLM(model_name="openai/gpt-4o-mini", use_structured_output=True)

    document_search_client = get_document_search("http://qdrant:6333", llm)
    results = await document_search_client.ingest(documents)
    pprint.pprint(results)


if __name__ == "__main__":
    args = parse_args()
    asyncio.run(main(args.use_gpu))
The script first defines a list of documents that we want to ingest. In this case, we use the WHO website to get documents about cancer.
Note that Ragbits supports various document sources, including local files, URLs, text strings, and more. WebSource is by far the easiest to work with on Ray clusters, as you don't need to set up any additional file sharing (e.g. Docker volumes or NFS).
The script then selects the local (vLLM) or remote (OpenAI) LLM variant, creates the DocumentSearch client, and ingests the documents into Qdrant. Finally, the ingestion results are printed to the console.
Keep in mind that Ragbits is designed with asynchrony in mind, which is why we need to use asyncio.run to run the main function.
Also, take a look at the addresses we're using to connect to Qdrant and the local LLM server. These addresses must be reachable from the Ray cluster, so in this case they're the container names (qdrant and llm).
Step 3: Submit the ingestion job
Now that we have the ingestion script ready, we can submit it to the Ray cluster. You can do this using the Ray Job Submission API or the CLI. In this case, we will use the Python API, which we can run directly in this notebook.
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://localhost:8265")
job_id = client.submit_job(
    entrypoint=f"python ingest_files.py {'--use_gpu' if USE_GPU else ''}",
    runtime_env={"working_dir": "./"},
)
As you can see, it's actually quite simple to submit a Ragbits ingestion job to the Ray cluster. The only thing you need to keep in mind is to correctly specify the pip libraries that need to be installed on the Ray workers. For simple, one-off jobs, you can use the pip field of the runtime_env argument of submit_job. For more complex jobs, you might want to use a tailored Docker image, just like the one we defined before.
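For completeness, here is a sketch of that one-off variant, with the same Python packages we baked into Dockerfile.ray passed via the pip field (system packages such as tesseract-ocr or poppler-utils still have to be present in the worker image):

job_id = client.submit_job(
    entrypoint="python ingest_files.py",
    runtime_env={
        "working_dir": "./",
        # Installed on the workers just for this job, instead of baking them into the image
        "pip": [
            "ragbits-core[qdrant,fastembed]==0.13.0",
            "ragbits-document-search[ray]==0.13.0",
            "unstructured[pdf]==0.17.2",
        ],
    },
)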
Let’s wait for the job to finish before we move on:
import time

from ray.job_submission import JobStatus


def wait_until_status(job_id, status_to_wait_for, timeout_seconds=3600):
    start = time.time()
    while time.time() - start <= timeout_seconds:
        status = client.get_job_status(job_id)
        print(f"status: {status}")
        if status in status_to_wait_for:
            break
        time.sleep(5)


wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
Note that even if you close the notebook, the job will continue running on the Ray cluster (this is one of the main advantages of Ray Jobs). You can check the status of the job by using the Ray dashboard or the Ray CLI.
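You can also pull the job's logs straight into the notebook with the same JobSubmissionClient, for example:

# Print everything the job has logged so far (useful for debugging failed runs)
print(client.get_job_logs(job_id))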
Step 4: Retrieve the documents
Now that we have ingested the documents into Qdrant, we can check whether retrieval works as expected. We can do this by querying Qdrant for the documents and then printing the results. We will create a DocumentSearch object and use it to retrieve the documents:
from pprint import pprint

from ragbits.core.llms import LiteLLM

from document_search import get_document_search

if USE_GPU:
    # Point at the model actually served by the local vLLM container
    llm = LiteLLM(
        model_name="hosted_vllm/alpindale/Llama-3.2-11B-Vision-Instruct",
        base_url="http://localhost:8000/v1",
        use_structured_output=True,
    )
else:
    llm = LiteLLM(
        model_name="openai/gpt-4o-mini",
        use_structured_output=True,
    )

search_client = get_document_search("http://localhost:6333", llm)
pprint(await search_client.search("cancer"))
A keen eye will notice that even though we are using the same get_document_search function, we provide different addresses to the Qdrant instance and the LLM. This is because the retrieval process is done on the local machine, not on the Ray cluster.
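If you want an additional sanity check, you can also query Qdrant directly. The sketch below assumes that the index_name we set in document_search.py ("documents") is the name of the underlying Qdrant collection:

from qdrant_client import QdrantClient

qdrant = QdrantClient(url="http://localhost:6333")
print(qdrant.get_collections())                   # should list the "documents" collection
print(qdrant.count(collection_name="documents"))  # number of ingested vectors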
With retrieval working, you can connect your vector search engine to any LLM, but that is out of the scope of this cookbook – look up one of our other Ragbits cookbooks to see how easy it is!
Authors
Paweł Chmielak
Staff Machine Learning Engineer
