Reputation: 555
I have built a RAG application with LangChain and now want to deploy it with FastAPI. In general it works: I can call a FastAPI endpoint and the answer of the LCEL chain gets streamed. However, I want the answer to be streamed and, once streaming is done, I want to return the source documents. Here is the code, where streaming works when calling the endpoint. At the moment I am yielding the source_documents, but I don't want the user to see them like this; I would like to preprocess the source_documents before the user sees them:
# example endpoint call: `http://127.0.0.1:8000/rag_model_response?question=Welche%203%20wesentlichen%20Merkmale%20hat%20die%20BCMS%20Leitlinie%3F`
# this example call streams the response perfectly in the browser
import time
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
# cfg and build_llm come from my own project modules
embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large-instruct", model_kwargs={'device': "mps"})
db = FAISS.load_local("streamlit_vectorstores/vectorstores/db_maxiw_testfreitag", embeddings, allow_dangerous_deserialization=True)
retriever = db.as_retriever(search_kwargs={'k': cfg.STREAMLIT_VECTOR_COUNT, 'score_threshold': cfg.SCORE_THRESHOLD,'sorted': True}, search_type="similarity_score_threshold")
model_path = cfg.MIXTRAL_PATH
llm = build_llm(model_path) # loads a model from Llamacpp with streaming enabled
def rag_model_response(question: str):
    start_time = time.time()
    context = retriever.get_relevant_documents(question)
    response_dict = {"question": question, "result": "", "source_documents": []}
    rag_prompt = f"""<s> [INST] Du bist RagBot, ein hilfsbereiter Assistent. Antworte nur auf Deutsch:
    {context}
    {question}
    Antwort: [/INST]
    """
    result_content = ""
    first_response = True
    for resp in llm.stream(rag_prompt):
        if resp:
            result_content += resp
            if first_response:
                # calculate and print the time after the first batch of text is streamed
                end_time = time.time()
                elapsed_time = round(end_time - start_time, 1)
                first_response = False
                yield f"(Response Time: {elapsed_time} seconds)\n"
            yield resp
    if context:
        # yield context  # stopped here
        yield "\n\nQuellen:\n"
        for i, doc in enumerate(context):
            yield doc.metadata["source"].split("/")[-1] + ", Seite: " + str(doc.metadata["page"] + 1) + "\n\n"
        response_dict["source_documents"] = [{"source": doc.metadata["source"], "page": doc.metadata["page"] + 1} for doc in context]
    else:
        yield "\n\nVorsicht, für die vorliegende Antwort wurden keine interne Quellen verwendet, da die Suche nach relevanten Dokumenten kein Ergebnis geliefert hat."
    yield response_dict
app = FastAPI(
    title="FastAPI for Database Management",
    description="An API that handles user Vectordatabase creation or deletion",
    version="1.0",
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get('/rag_model_response', response_class=JSONResponse)
async def main(question: str):
    return StreamingResponse(rag_model_response(question), media_type='text/event-stream')
So my first question would be: how can I stream the answer and, once streaming has finished, get the source documents back separately so that I can preprocess them before the user sees them?
One alternative solution, which I don't think is very efficient, was to just create a new endpoint that returns the source documents:
@app.get('/source_documents')
async def source_documents(question: str):
    source_docs = retriever.get_relevant_documents(question)
    return source_docs
But with this, the vector store is searched twice for every question: once by the chain and once by the extra retriever call.
Thanks in advance!
Upvotes: 3
Views: 4284
Reputation: 1188
Have you seen this: https://python.langchain.com/docs/use_cases/question_answering/streaming/
First off, the chain is built so that it also returns the retrieved documents, so if you call invoke, the context is also included in the results.
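Roughly, the pattern from that page looks like this (a sketch adapted to your variable names; format_docs and the English prompt here are just illustrative, not your actual Mixtral prompt):

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

def format_docs(docs):
    # illustrative helper: join the retrieved Documents into one context string
    return "\n\n".join(doc.page_content for doc in docs)

# illustrative English prompt; you would plug in your own German Mixtral prompt here
prompt = PromptTemplate.from_template(
    "Answer the question based only on the following context:\n{context}\n\nQuestion: {question}"
)

rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
    | prompt
    | llm
    | StrOutputParser()
)

# the retriever output stays available under "context",
# the generated text is added under "answer"
rag_chain_with_source = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
).assign(answer=rag_chain_from_docs)

result = rag_chain_with_source.invoke("Welche 3 wesentlichen Merkmale hat die BCMS Leitlinie?")
# result is a dict with "question", "context" (the Document objects) and "answer"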
Next, when you stream instead, it streams the same values. You can see in the output example on that page that the results contain the context.
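For example, something like this (the exact chunk contents will depend on your chain, so take the printed output as a rough illustration):

for chunk in rag_chain_with_source.stream("Welche 3 wesentlichen Merkmale hat die BCMS Leitlinie?"):
    print(chunk)

# prints a sequence of single-key dicts, roughly like:
# {'question': 'Welche 3 wesentlichen Merkmale hat die BCMS Leitlinie?'}
# {'context': [Document(...), Document(...)]}
# {'answer': ''}
# {'answer': 'Die'}
# {'answer': ' BCMS'}
# ...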
On the client side you will have to handle the data you receive appropriately (assuming you want to do something different with each part). There is example code in Python just below the output that does this aggregation into separate parts, which you can then use.
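Applied to your generator, a minimal sketch could look like this (assuming the rag_chain_with_source from above and reusing the metadata keys from your code; only the answer chunks are streamed to the user, and the documents are post-processed after streaming is finished):

def rag_model_response(question: str):
    sources = []
    for chunk in rag_chain_with_source.stream(question):
        if "context" in chunk:
            # keep the retrieved documents for later instead of streaming them
            sources = chunk["context"]
        if "answer" in chunk:
            # only the generated text reaches the client while streaming
            yield chunk["answer"]
    # streaming is done here, so the sources can be preprocessed however you like
    yield "\n\nQuellen:\n"
    for doc in sources:
        yield f'{doc.metadata["source"].split("/")[-1]}, Seite: {doc.metadata["page"] + 1}\n'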
I hope I understood the question and that this answer helps.
Upvotes: 1