Building Continuously Updating RAG Applications

Use native stream processing and vector search in MongoDB Atlas to continuously update, store, and search embeddings through a unified interface.

Use cases: Gen AI

Industries: Finance, Healthcare, Retail

Products: Atlas, Atlas Vector Search, Atlas Stream Processing

Partners: Confluent, AWS

Solution Overview

Whether organizations leverage AI to optimize business processes or enhance customer-facing applications, providing AI models with up-to-date data is essential to delivering a differentiated experience. While retrieval-augmented generation (RAG) systems enable organizations to ground large language models (LLMs) easily and foundational models with the truth of their proprietary data, keeping that data fresh adds another level of complexity.

By continuously updating vector embeddings, the core of RAG systems, AI models have up-to-date data to provide pertinent and accurate answers. Additionally, different embedding models may offer higher levels of accuracy depending on their primary purpose. Take, for example, an embedding model trained primarily on a specific language, such as Japanese or Simplified Chinese, instead of a more popular model that might have general knowledge of several languages. The specialized model will likely create embeddings that enable the foundation model or LLM to output content more accurately.

This solution addresses the issue of continuously updating and routing the creation of vector embeddings in a RAG system. By leveraging MongoDB Atlas Stream Processing and MongoDB Atlas Vector Search, both native capabilities in MongoDB Atlas, this solution walks developers through continuously updating, storing, and searching embeddings with a single interface.

While this solution demonstrates creating vector embeddings of song lyrics in different languages, the scenario is relevant to many industries and use cases, including:

  • Financial services: Financial documents, legal policies, and contracts often use multiple languages and differ based on country regulations. Empowering loan officers with an AI-powered interface for expediting loan creation can optimize banking workflows; however, the optimization will only benefit as much as the data is relevant and fresh.

  • Healthcare and Insurance: From constantly updating patient records to AI-powered underwriting of insurance policies, it’s important that any RAG system that optimizes these processes has access to the latest information.

  • Retail: Personalizing retail experiences by delivering the right offer at the right time to the right customer is critical. However, consider the many languages that shoppers might use and product descriptions that have to match. Routing up-to-date, contextual data to the most accurate embedding model can improve these experiences.

Reference Architectures

With MongoDB

  • MongoDB: With a MongoDB cluster deployed in Atlas, it allows you to store the lyrics and related information under the same document (tags, vectors, etc.). Additionally, Atlas provides a vector index to support semantic searches using the MongoDB Aggregation Framework.

  • Atlas Stream Processing: This Stream Processing Instance subscribes to the events generated by MongoDB, filters the relevant information, transforms the events, and emits them to the corresponding Kafka topic. Additionally, it will subscribe to the Kafka cluster to update the documents that change.

  • Confluent Kafka cluster: This managed Kafka cluster will receive the new documents and updates from current documents to be processed. Additionally, the events received by the processor will be directed to Atlas Stream Processing.

  • Metadata service:

    • Embedding generator: Python script that subscribes to the Kafka input topics (both Spanish and English). For each message received, it reads the lyrics and generates an embedding using a model specific for each language.

    • Tags extractor: Python script that extracts the tags from the lyrics (the 10 most common nouns) and adds it to the resulting event.

Figure 1. Scalable vector updates reference architecture with MongoDB

Data Model Approach

{
   "title": "Hurricane",
   "genre": "rock",
   "artist": "Bob Dylan",
   "year": 1976,
   "views": 307418,
   "lyrics": "...",
   "language": "en",
   "duration": 61,
   "lyrics_embeddings_en": [...],
   "tags": ["man", "story", "night"]
}

The data we currently have about the song consists of the following fields:

  • Title: Name of the song

  • Genre: a single-worded string containing a music style from a list of 6 genres

  • Artist: Name of the artist

  • Year: The year in which the song was written

  • Views: Number of times the song has been listened to

  • Lyrics: A string field containing the lyrics with each line separated by a new line delimiter

  • Language: The language of the lyrics in ISO-369. We are only storing songs in English and Spanish.

  • Duration: Duration of the song in seconds

  • Lyrics embedding vector: language-specific embeddings vector

  • Tags: A list of tags associated with the lyrics

The benefit of using the document data model is that it allows you to store all the related information of a song in a single document for easy and fast retrieval.

Building the Solution

In the GitHub repository you will find detailed instructions on how to build the solution to update your embeddings asynchronously and at scale, leveraging MongoDB Atlas.

Create a MongoDB cluster

The first step is to create a MongoDB cluster. If you don't have an Atlas account, create one following the steps in this link: https://www.mongodb.com/docs/guides/atlas/account/.

We will create a cluster in Atlas using AWS as our cloud provider and us-east-1 as our region. Additionally, create an Atlas Stream Processing Instance (SPI) following the instructions in the documentation: https://www.mongodb.com/docs/atlas/atlas-sp/manage-processing-instance/.

Create a Kafka cluster in Confluent

To create a Kafka cluster in Confluent Cloud follow the instructions in their documentation: https://docs.confluent.io/cloud/current/clusters/create-cluster.html#create-ak-clusters.

Once you have created the cluster, go to cluster settings and copy the bootstrap URL.

Figure 2. Kafka cluster settings

Then, create an API key to connect to your cluster.

Figure 3. API key settings

The next step is to configure the topics for use in this solution: SpanishInputTopic, EnglishInputTopic, and OutputTopic.

Figure 4. Topic settings

Configure the Stream Processing connection registry

To configure a new connection, click the configure button in the Stream Processing Instance, then click Connection Registry and add a new connection.

You will use this to connect the Atlas Stream Processing Instance with the Kafka Cluster.

Once you have created your Kafka cluster, Confluent will provide you with the bootstrap server URL, username, and password for the Connection Registry.

Figure 5. Stream Processing connection registry settings

Next, create a connection from the Atlas Stream Processing Instance to the MongoDB Atlas cluster.

Figure 6. Stream Processing to Atlas settings

Connect to the Stream Processing instance

To configure the pipelines and connections in the Stream Processing Instance, you can connect to the cluster using the Mongo Shell (mongosh).

When clicking on the Connect button in the Stream Processing Instance, the Atlas UI provides instructions on connecting to the instance.

Figure 7. Connect to Stream Processing

Configuring Atlas Stream Processing

You can follow the steps to configure Atlas Stream Processing in the README file in the GitHub repo. There you will learn how to create the pipelines to subscribe to changes in MongoDB, emit to each language-specific topic, and merge the events containing the processed data with the embeddings received from the Kafka cluster into MongoDB using a MongoDB aggregation stage.

Create the Atlas Vector Search indexes

Next, you will create language-specific vector indexes in Atlas Search.

Visit the Atlas Vector Search Quick Start guide and start building smarter searches.

The definition for the Atlas Vector Search Index for Spanish is as follows:

{
   "fields": [
      {
         "type": "vector",
         "path": "lyrics_embeddings_es",
         "numDimensions": 768,
         "similarity": "cosine"
      }
   ]
}

The definition for the Atlas Vector Search Index for English is as follows:

{
   "fields": [
      {
         "type": "vector",
         "path": "lyrics_embeddings_en",
         "numDimensions": 384,
         "similarity": "cosine"
      }
   ]
}

Run the metadata service

The metadata service is a Python script that will subscribe to the input topics, create the tags and embeddings for the corresponding language according to the information received in the event, and write the event to the output topic.

Run a semantic search

We created a script in Python to help you interactively run semantic queries. You can find the script in the repository under the client folder.

Key Learnings

  • Maintain embedding relevancy: Regularly update data embeddings to ensure your semantic searches remain accurate, especially if your documents change frequently.

  • Optimize language-model pairing: To maximize semantic search accuracy, ensure your large language model (LLM) closely aligns with the language of your data to significantly enhance the relevance and precision of your search results.

  • Embrace flexible embeddings: MongoDB's flexible data model eliminates the need for rigid schema definitions. This flexibility allows you to store embeddings directly alongside your data, regardless of their length or the model used to generate them.

  • Choose the right similarity function: The effectiveness of your semantic searches depends on the chosen similarity function. Tailor your selection to your specific use case.

  • Asynchronous embedding generation: Generating embeddings can be computationally expensive. Consider running this task asynchronously to avoid impacting your application's performance. Leverage the cloud's elasticity by horizontally scaling the functions responsible for embedding generation to handle bursts in workload.

Technologies and Products Used

MongoDB Developer Data Platform

Partner Technologies

  • Confluent Cloud

  • AWS EC2

Author

David Sanchez, MongoDB

This is a copy of the original solution published at MongoDB: https://www.mongodb.com/docs/atlas/architecture/current/solutions-library/rag-applications/