Reputation: 1101
I'm trying to implement a Server-Sent Events (SSE) streaming API using NestJS and Fastify, similar to OpenAI's Create chat completion API. I want my clients to receive the streamed data using axios with responseType: "stream", just like with the official API.
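For reference, this is roughly how I expect the client to consume the stream. This is only a sketch with a placeholder URL and token, using Node-style stream handling:
import axios from "axios";

async function listenToChatStream() {
  const response = await axios.post(
    "https://my-api.example.com/chatgpt/chat", // placeholder URL
    { content: "Hello!" },
    {
      headers: { Authorization: "Bearer <my-token>" }, // placeholder token
      responseType: "stream",
    },
  );

  // With responseType: "stream", response.data is a readable stream of SSE chunks.
  response.data.on("data", (chunk: Buffer) => {
    console.log("received chunk:", chunk.toString("utf-8"));
  });

  response.data.on("end", () => console.log("stream ended"));
}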
Here is what I've tried (the code below has been simplified):
controller.ts
@Post("/chatgpt/chat")
@UseGuards(AuthGuard)
@HttpCode(200)
@Header("Content-type", "text/event-stream")
async chatWithPostStream(@Body() body: ChatGPTChatDto, @Req() fastifyRequest: FastifyRequest, ,@Res() fastifyResponse: FastifyReply) {
return await this.chatGPTService.chatPostStream(body);
}
service.ts
async chatWithPostStream(body: ChatGPTChatDto) {
  const openaiKey = "sk-xxxxxxxxxxxxxxxxxxxxxx";

  let chatGPTResponse;
  try {
    chatGPTResponse = await axios.post("https://api.openai.com/v1/chat/completions", {
      model: "gpt-3.5-turbo",
      messages: [{ "role": "user", "content": "Hello!" }], // just for testing; originally `body.content`
      stream: true
    }, {
      headers: {
        "Content-Type": "application/json",
        "Authorization": `Bearer ${openaiKey}`
      },
      responseType: "stream"
    });
  } catch (e) {
    console.error("ChatGPT API Error:", e);
  }

  let outputText = "";
  chatGPTResponse.data.on("data", (chunk) => {
    // TODO: send the same chunk to the client
    if (!chunk.toString("utf-8").includes("data: [DONE]")) {
      try {
        console.log("chunk text: ", chunk.toString("utf-8"));
        let choice = JSON.parse(chunk.toString("utf-8").substring(5)).choices[0];
        outputText += choice.delta.content ? choice.delta.content : "";
      } catch (e) {
        console.log("json parse error: ", e.message, "Original JSON:", chunk.toString("utf-8").substring(5));
      }
    } else {
      console.log("Final output text", outputText);
    }
  });

  chatGPTResponse.data.on("error", (e) => {
    console.error("ChatGPT Error", e);
  });

  chatGPTResponse.data.on("end", () => {
    console.log("ChatGPT End");
  });
}
Over the past few days I have attempted to implement this feature in several different ways, but all of my attempts have been unsuccessful.
What should I do to create a streaming API just like the official one, so that I can use axios in the web client to listen for the streamed data in the same way?
Upvotes: 5
Views: 4389
Reputation: 19
You can use the @Sse() decorator to send any streamed data, and an RxJS Observable to push values. Here's some example code (I am using the OpenAI Node.js SDK):
In controller.ts
@Post("/chatgpt/chat")
@UseGuards(AuthGuard)
@HttpCode(200)
@Sse()
getCompletion() {
return this.openaiService.getCompletion();
}
And in service.ts
import { Injectable } from '@nestjs/common';
import { Observable } from 'rxjs';
import { Configuration, OpenAIApi } from 'openai';

// OpenAI Node.js SDK (v3-style) client; adjust to however you load your API key.
const openai = new OpenAIApi(
  new Configuration({ apiKey: process.env.OPENAI_API_KEY }),
);

@Injectable()
export class OpenaiService {
  async getCompletion(): Promise<Observable<{ data: string }>> {
    return new Observable((subscriber) => {
      openai.createCompletion(
        {
          model: 'text-davinci-003',
          prompt: 'These were the best of times',
          max_tokens: 5,
          temperature: 0,
          stream: true,
        },
        { responseType: 'stream' },
      ).then((res) => {
        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
        // @ts-ignore
        res.data.on('data', (data) => {
          const lines = data
            .toString()
            .split('\n')
            .filter((line) => line.trim() !== '');
          for (const line of lines) {
            const message = line.replace(/^data: /, '');
            if (message === '[DONE]') {
              subscriber.complete();
              return;
            }
            try {
              const parsed = JSON.parse(message);
              const data = parsed.choices[0].text;
              subscriber.next({ data });
            } catch (error) {
              console.error('Could not JSON parse stream message', message, error);
            }
          }
        });
      });
    });
  }
}
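On the client side, the text/event-stream response produced by @Sse() can then be consumed the way the question describes, for example with axios and responseType: 'stream'. A rough sketch (the URL is a placeholder and the parsing only picks out the SSE "data:" lines):
import axios from 'axios';

async function readCompletionStream() {
  const response = await axios.post(
    'http://localhost:3000/chatgpt/chat', // placeholder URL
    {},
    { responseType: 'stream' },
  );

  response.data.on('data', (chunk: Buffer) => {
    const lines = chunk
      .toString('utf-8')
      .split('\n')
      .filter((line) => line.startsWith('data: '));

    for (const line of lines) {
      // Each frame's payload is the text pushed via subscriber.next({ data }).
      console.log('token:', line.replace(/^data: /, ''));
    }
  });

  response.data.on('end', () => console.log('SSE stream ended'));
}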
Hope this helps.
Upvotes: 1