ARUN BALAJI
ARUN BALAJI

Reputation: 127

Streaming Stopped at `thread.run.requires_action` When Handling OpenAI Assistants Function Calling

I am working with OpenAI's function calling and handling streaming events using Node.js. I have set up an EventHandler class that extends EventEmitter to process events from an OpenAI assistant, and I am trying to handle the thread.run.requires_action event, which should trigger the calling of tools, processing their output, and submitting it back through streaming.

The issue arises when the event handling stops at thread.run.requires_action. Despite the tool outputs being processed, the flow does not seem to proceed further as expected, and I am not receiving the event in the controller.

Code Overview:

eventHandler.js

const EventEmitter = require('events');
const assistantController = require('./assistant.controller');

class EventHandler extends EventEmitter {
    constructor(client, project, dbURL, userID, projectID, res) {
        super();
        this.client = client;
        this.project = project;
        this.dbURL = dbURL;
        this.userID = userID;
        this.res = res;
    }

    async onEvent(event) {
        try {
            // Handle 'requires_action' events with tool_calls
            if (event.event === "thread.run.requires_action") {
                await this.handleRequiresAction(
                    event.data,
                    event.data.id,
                    event.data.thread_id,
                );
            }
        } catch (error) {
            console.error("Error handling event:", error);
        }
    }

    async handleRequiresAction(data, runId, threadId) {
        try {
            let toolCalls = data.required_action.submit_tool_outputs.tool_calls;
            const toolOutputs = await Promise.all(toolCalls.map(toolCall => {
                return assistantController.processToolCalls(
                    toolCall,
                    this.project,
                    this.dbURL,
                    this.userID,
                    this.projectID
                );
            }));

            // Submit tool outputs
            await this.submitToolOutputs(toolOutputs, runId, threadId);
        } catch (error) {
            console.error("Error processing required action:", error);
        }
    }

    async submitToolOutputs(toolOutputs, runId, threadId) {
        console.log("Submitting tool outputs");
        let finalMessage = '';
        try {
            const stream = this.client.beta.threads.runs.submitToolOutputsStream(
                threadId,
                runId,
                { tool_outputs: toolOutputs }
            );

            for await (const event of stream) {
                const message = event.data?.delta?.content[0]?.text?.value;
                process.stdout.write(message);  //<-- messages are getting printed in console
                this.emit("event", event);  //<-- But this line supposed to emit the event while accessing from controller file
            }
        } catch (error) {
            console.error("Error submitting tool outputs:", error);
        }
    }
}

module.exports = EventHandler;

controller.js

const eventHandler = new EventHandler(openai, project, dbURL, userID, projectID);
eventHandler.on("event", eventHandler.onEvent.bind(eventHandler));

const stream = await openai.beta.threads.runs.createAndStream(
    thread,
    { assistant_id: assistant },
    eventHandler,
);

for await (const event of stream) {
    writeLog(`\n${event.event}  : ${event.data.id}`);
    eventHandler.emit("event", event);
    console.log(event.event);     // <-- Stopped at thread.run.requires_action and not proceeding further
}

Expected Behavior:

Actual Behavior:

Any insights into why the event isn't being handled further or how I can ensure that the event gets emitted and handled properly would be greatly appreciated!

Upvotes: 0

Views: 90

Answers (0)

Related Questions