hash070
hash070

Reputation: 1101

How to create a post-streaming SSE API with NestJS and Fastify, like OpenAI's API?

I'm trying to implement a Server-Sent Events (SSE) streaming API using NestJS and Fastify, similar to the OpenAI's Create chat completion API. And I want my clients to receive the streamed data using axios with responseType: "stream" just like the official API.

Here is what I've tried (the following code has simplified):

controller.ts

  @Post("/chatgpt/chat")
  @UseGuards(AuthGuard)
  @HttpCode(200)
  @Header("Content-type", "text/event-stream")
  async chatWithPostStream(@Body() body: ChatGPTChatDto, @Req() fastifyRequest: FastifyRequest, ,@Res() fastifyResponse: FastifyReply) {
    return await this.chatGPTService.chatPostStream(body);
  }

service.ts

  async chatWithPostStream(body: ChatGPTChatDto) {
        
    const openaiKey = "sk-xxxxxxxxxxxxxxxxxxxxxx";
        
    let chatGPTResponse;
    try {
      chatGPTResponse = await axios.post("https://api.openai.com/v1/chat/completions", {
        model: "gpt-3.5-turbo",
        messages: [{"role": "user", "content": "Hello!"}],//just for test, originally is `body.content`
        stream: true
      }, {
        headers: {
          "Content-Type": "application/json",
          "Authorization": `Bearer ${openaiKey}`
        },
        responseType: "stream"
      });
    } catch (e) {
      console.error("ChatGPT API Error:", e);
    }

    let outputText = "";

    chatGPTResponse.data.on("data", (chunk) => {
      //TODO: send the same chunk to client

      if (!chunk.toString("utf-8").includes("data: [DONE]")) {
        try {
      console.log("chunk text: ", chunk.toString("utf-8"));
          let choice = JSON.parse(chunk.toString("utf-8").substring(5)).choices[0];
          outputText += choice.delta.content ? choice.delta.content : "";
        } catch (e) {
          console.log("json parse error: ", e.message, "Original JSON:", chunk.toString("utf-8").substring(5));
        }
      } else {
        console.log("Final output text", outputText);
      }
    });

    chatGPTResponse.data.on("error", (e) => {
      console.error("ChatGPT Error", e);
    });

    chatGPTResponse.data.on("end", () => {
      console.log("ChatGPT End");
    });
    }

Over the past few days, I have attempted to implement this feature using different ways. Unfortunately, all of my attempts have been unsuccessful.

What should I do to create a streaming API just like the official API so that I can use axios in the web client to listen the stream data in the same way?

Upvotes: 5

Views: 4389

Answers (1)

Ravi Tmg
Ravi Tmg

Reputation: 19

You can use @Sse directive to send any streamed data. Use RxJS Observer to push values. Here's an example code (I am using OpenAI Nodejs SDK):

In controller.ts

  @Post("/chatgpt/chat")
  @UseGuards(AuthGuard)
  @HttpCode(200)
  @Sse()
  getCompletion() {
    return this.openaiService.getCompletion();
  }

And in service.ts

@Injectable()
export class OpenaiService {
  async getCompletion (): Promise<Observable<{ data: string; }>> {
    return new Observable((subscriber) => {
      openai.createCompletion(
        {
          model: 'text-davinci-003',
          prompt: 'These were the best of times',
          max_tokens: 5,
          temperature: 0,
          stream: true,
        },
        { responseType: 'stream' },
      ).then(res => {
        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
        // @ts-ignore
        res.data.on('data', (data) => {
          const lines = data
            .toString()
            .split('\n')
            .filter((line) => line.trim() !== '');
          for (const line of lines) {
            const message = line.replace(/^data: /, '');
            if (message === '[DONE]') {
              subscriber.complete();
              return;
            }
            try {
              const parsed = JSON.parse(message);
              const data = parsed.choices[0].text;
              subscriber.next({ data });
            } catch (error) {
              console.error('Could not JSON parse stream message', message, error);
            }
          }
        });
      });
    });
  }
}

Hope this helps.

Upvotes: 1

Related Questions