FeckNeck

Reputation: 198

LangChain RAG: mixing chat history and returning sources

I'm developing a small API using LangChain and FastAPI, and I'm having trouble combining chat history with returning the sources. I can't find an example in the documentation that mixes the two. I've seen that ConversationalRetrievalChain lets you return the sources and add memory, but I don't think it works with RAG and custom templates (see the sketch after my code below). Has anyone run into the same difficulties, and if so, have you found a solution?

from fastapi import FastAPI
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
import pandas as pd

model = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0.7)
embeddings = OpenAIEmbeddings()

vectorstore = FAISS.load_local("products_metadata", embeddings, allow_dangerous_deserialization=True)
retriever = vectorstore.as_retriever()

products = pd.read_json('./data/products.json', orient='records', lines=True)

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

app = FastAPI(
    title="LangChain Server",
    version="1.0",
    description="Spin up a simple api server using Langchain's Runnable interfaces",
)

ANSWER_TEMPLATE = """Answer the question with an introductory sentence and use the following contextual elements to answer the question. Format the context as follows: - product name, brand, country, with a line break between each product.
Context: {context}
Question: {question}
Answer: """

ANSWER_PROMPT = ChatPromptTemplate.from_template(ANSWER_TEMPLATE)

rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
    | ANSWER_PROMPT
    | model
    | StrOutputParser()
)

rag_chain_with_source = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
).assign(answer=rag_chain_from_docs)

from pydantic import BaseModel
class Question(BaseModel):
    question: str

@app.post("/invoke")
def invoke_chain(question: Question):
    result = rag_chain_with_source.invoke(question.question)
    return result

@app.get("/products")
def get_products():
    return products.to_dict(orient='records')

@app.get("/products/{product_id}")
def get_product(product_id: int):
    product = products[products['code'] == product_id]
    return product.to_dict(orient='records')

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="localhost", port=8000)
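
For reference, the ConversationalRetrievalChain approach I mentioned would look roughly like this (just a sketch reusing the model and retriever above, with a placeholder question; I still don't see how to fit my custom ANSWER_PROMPT into it):

from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory

# The memory needs an explicit output_key because the chain returns several keys
# when return_source_documents=True.
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True,
    output_key="answer",
)

conv_chain = ConversationalRetrievalChain.from_llm(
    llm=model,
    retriever=retriever,
    memory=memory,
    return_source_documents=True,
)

result = conv_chain.invoke({"question": "Which products do you have from France?"})  # placeholder question
# result["answer"] -> the generated answer
# result["source_documents"] -> the retrieved documents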

Upvotes: 1

Views: 376

Answers (1)

Nikster

Reputation: 474

I have run into the same issue as you, but I think I've found a solution that might help.

I have done it a little differently, as I am using AWS Bedrock and an AWS Bedrock Knowledge Base for my vector store.

It is also important to note that the initial prompt for your system should include the context variable to inject your retrieved data into. There was no need for me to include the question, as I pass it in when I actually invoke (stream) the model.

I will also include some blog posts and docs I read to get to this stage of my code, as I'm not 100% knowledgeable about LangChain and why this works.

initial_prompt = """ You are a helpful AI tool to answer questions {context} """

Example data for the messages variable:

[
  {
    'role': 'system',
    'content': 'You are a helpful AI tool to answer questions'
  },
  {
    'role': 'user',
    'content': 'Hello, how are you?'
  }
]
Helper functions

def get_retriever():
    return AmazonKnowledgeBasesRetriever(
        knowledge_base_id=awssettings.aws_knowledge_base_id,
        client=BEDROCK_AGENT_RUNTIME,
        retrieval_config={
            "vectorSearchConfiguration": {
                "numberOfResults": 3,
            },
        },
    )


def get_model(options=None):
    # Avoid a mutable default argument; fall back to the same defaults as before.
    if options is None:
        options = {
            "streaming": True,
            "model_kwargs": {},
            "callbacks": []
        }

    model_kwargs = {**MODEL_KWARGS, **options.get("model_kwargs", {})}

    return ChatBedrock(
        client=BEDROCK_RUNTIME,
        model_id=MODEL_ID,
        streaming=options.get("streaming", True),
        model_kwargs=model_kwargs,
        callbacks=options.get("callbacks", []),
    )

Next is an example of how I formatted my data so it can be used with ChatPromptTemplate:

    messages = [x.context for x in interactions]  # this data looks like the example above
    formatted_messages = [(item['role'] if item['role'] != 'user' else 'human', item['content']) for item in messages]
    chat_template = ChatPromptTemplate.from_messages(formatted_messages)
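
For instance, with the example messages above, formatted_messages comes out as a list of (role, content) tuples (my reading of the comprehension):

formatted_messages = [
    ("system", "You are a helpful AI tool to answer questions"),
    ("human", "Hello, how are you?"),
]
# In practice the system entry should be initial_prompt, so that the {context}
# placeholder becomes an input variable of the resulting ChatPromptTemplate.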

I then retrieve my model and retriever:

    model = get_model()
    retriever = get_retriever()

def filter_retriever_data(retriever_data, uri_criteria):
    def uri_meets_criteria(uri):
        return uri_criteria in uri

    filtered_context = [
        doc for doc in retriever_data['context']
        if uri_meets_criteria(doc.metadata['location']['s3Location']['uri'])
    ]
    retriever_data['context'] = filtered_context

    return retriever_data


def filter_retriever_data_wrapper(uri_criteria):
    def wrapper(retriever_data):
        return filter_retriever_data(retriever_data, uri_criteria)
    return wrapper

The functions above are just to filter the data that gets returned by the retriever based on the Document URI in S3.
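
To illustrate (a hypothetical example; the bucket and account prefixes are made up), the wrapper can be tried on a dict shaped like the retriever output:

from langchain_core.documents import Document

retriever_output = {
    "context": [
        Document(
            page_content="Product A details...",
            metadata={"location": {"s3Location": {"uri": "s3://kb-bucket/account-123/doc1.pdf"}}},
        ),
        Document(
            page_content="Product B details...",
            metadata={"location": {"s3Location": {"uri": "s3://kb-bucket/account-456/doc2.pdf"}}},
        ),
    ]
}

filtered = filter_retriever_data_wrapper("account-123")(retriever_output)
# filtered["context"] now only contains the document stored under account-123/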

To be completely honest, I have no idea how this next part works. It came from a combination of multiple blog posts and help from ChatGPT.

What I do know is that it runs the retrieval and the question passthrough in parallel, combines the results into one dict, and then uses that to prompt my model, or something along those lines.

    chain = (
        RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
        .assign(filtered_context=filter_retriever_data_wrapper(account_uuid))
        .assign(response=chat_template | model | StrOutputParser())
        .pick(["response", "context"])
    )
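
Here is my rough trace of what flows through that chain when it is invoked with a question string (my understanding, not authoritative):

# 1. RunnableParallel fans the input out into a dict:
#        {"context": [Document, ...],    # retriever run on the question
#         "question": "<the question>"}  # RunnablePassthrough keeps the raw input
#
# 2. .assign(filtered_context=...) calls the wrapper with that dict and stores its
#    return value under "filtered_context" (the wrapper also filters the "context"
#    list in place, which is what keeps only the wanted S3 URIs).
#
# 3. .assign(response=chat_template | model | StrOutputParser()) feeds the whole
#    dict into the prompt, so the {context} placeholder in the system message is
#    filled, and stores the model's text output under "response".
#
# 4. .pick(["response", "context"]) keeps only the answer text and the retrieved
#    documents, which is what lets you return the sources alongside the answer.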

And then I stream the data back to my app:

    question = messages[-1].get('content', '')  # the last message in messages is the question the user asked the AI
    response = ""

    for chunk in chain.stream(question):
        if 'response' in chunk:
            streamed = chunk.get('response', None)

            if streamed is None:
                continue

            response += streamed
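
Since the chain picks both "response" and "context", the stream also yields the retrieved documents, so you can collect the sources next to the answer and return them to your app. A sketch of how I read it (following the streaming guide linked below):

    response = ""
    sources = []

    for chunk in chain.stream(question):
        # "context" arrives once with the retrieved Documents; "response" arrives in pieces.
        if "context" in chunk:
            sources = chunk["context"]
        if "response" in chunk:
            piece = chunk.get("response")
            if piece:
                response += piece

    # e.g. expose the S3 URIs of the sources alongside the final answer
    source_uris = [doc.metadata["location"]["s3Location"]["uri"] for doc in sources]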

REFS:

https://medium.com/@dminhk/knowledge-bases-for-amazon-bedrock-with-langchain-%EF%B8%8F-6cd489646a5c

https://python.langchain.com/v0.1/docs/use_cases/question_answering/streaming/

Upvotes: 0
