Reputation: 109
I'm building a very simple LangChain application that takes a customer feedback string as input and categorizes it into the following Pydantic class:
from pydantic import BaseModel, Field
from langchain_core.output_parsers import PydanticOutputParser

class AnalysisAttributes(BaseModel):
    overall_positive: bool = Field(description="<sentiment is positive overall>")
    mentions_pricing: bool = Field(description="<pricing is mentioned>")
    mentions_competition: bool = Field(description="<competition is mentioned>")

parser = PydanticOutputParser(pydantic_object=AnalysisAttributes)
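For completeness, prompt and model are set up roughly like this (simplified sketch, the exact prompt wording and model choice don't matter; the format instructions come straight from the parser):

from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI

# Simplified sketch of the prompt/model used in the chain below.
prompt = PromptTemplate(
    template="Categorize this customer feedback.\n{format_instructions}\nFeedback: {feedback}",
    input_variables=["feedback"],
    partial_variables={"format_instructions": parser.get_format_instructions()},
)
model = ChatOpenAI(temperature=0)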
Here's how this should work, and it does:
full_pipeline = prompt | model | parser
output = full_pipeline.invoke({"feedback": "This bad company is very expensive."})
expected_output = AnalysisAttributes(overall_positive=False, mentions_pricing=True, mentions_competition=False)
assert output == expected_output  # this works! :)
This works very well, all good so far! Let's serve it:
from fastapi import FastAPI
from langserve import add_routes

app = FastAPI(
    title="LangChain Server",
    version="1.0",
    description="A simple api server using Langchain's Runnable interfaces",
)

pipeline = prompt | model | parser
add_routes(app, pipeline, path="/categorize_feedback")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="localhost", port=8000)
Now comes the strange part, check this out. On the client side, streaming works:
import requests

response = requests.post(
    "http://localhost:8000/categorize_feedback/stream/",
    json={'input': {'feedback': 'Prices are too high.'}}
)
for chunk in response:
    print(chunk.decode())
# event: metadata [...] data: {"overall_positive":false, ...
But the regular invoke does not work; it returns an empty output:
response = requests.post(
    "http://localhost:8000/categorize_feedback/invoke/",
    json={'input': {'feedback': 'Prices are too high.'}}
)
print(response.json())
# {'output': {}, 'callback_events': [], 'metadata': {'run_id': 'acdd089d-3c80-4624-8122-17c4173dc1ec'}}
I'm pretty sure the problem is the output parser: if I remove it from the chain, everything works perfectly fine, including the invoke.
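For illustration, this is the parser-less variant that does work via /invoke (the endpoint then returns the raw model output, and parsing would have to happen client-side; the _raw path name is just for this example):

# Without the output parser, /invoke responds normally.
raw_pipeline = prompt | model
add_routes(app, raw_pipeline, path="/categorize_feedback_raw")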
EDIT:
For now I'm using a workaround that goes through FastAPI directly:
# FeedbackInput isn't defined above -- this matches the input the chain expects
class FeedbackInput(BaseModel):
    feedback: str

@app.post('/categorize_feedback')
async def categorize_feedback(input_data: FeedbackInput) -> AnalysisAttributes:
    full_pipeline = prompt | model | parser
    output = full_pipeline.invoke(input_data.model_dump())
    return output
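Calling the workaround endpoint then looks like this (a plain FastAPI route, so no /invoke suffix and no 'input' wrapper):

response = requests.post(
    "http://localhost:8000/categorize_feedback",
    json={'feedback': 'Prices are too high.'}
)
print(response.json())
# e.g. {'overall_positive': False, 'mentions_pricing': True, 'mentions_competition': False}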
Upvotes: 0
Views: 903
Reputation: 471
Maybe it is not a problem with the parser but rather with the generation method: stream returns chunks one by one, while invoke only returns once all inference has finished.
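In other words, a minimal sketch of that difference on the chain itself:

# stream yields partial chunks as the model generates them...
for chunk in pipeline.stream({'feedback': 'Prices are too high.'}):
    print(chunk)

# ...while invoke blocks until generation and parsing have fully finished,
# then returns one final object.
result = pipeline.invoke({'feedback': 'Prices are too high.'})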
Upvotes: 0