Reputation: 127
I am working with OpenAI's function calling and handling streaming events using Node.js. I have set up an EventHandler
class that extends EventEmitter
to process events from an OpenAI assistant, and I am trying to handle the thread.run.requires_action
event, which should trigger the calling of tools, processing their output, and submitting it back through streaming.
The issue arises when the event handling stops at thread.run.requires_action
. Despite the tool outputs being processed, the flow does not seem to proceed further as expected, and I am not receiving the event in the controller.
eventHandler.js
onEvent
method: Listens for the event thread.run.requires_action
and processes the required tool calls.submitToolOutputs
method: Submits the processed tool outputs using a stream and emits the event.process.stdout.write
), the event is not emitted properly to continue the flow.const EventEmitter = require('events');
const assistantController = require('./assistant.controller');
class EventHandler extends EventEmitter {
constructor(client, project, dbURL, userID, projectID, res) {
super();
this.client = client;
this.project = project;
this.dbURL = dbURL;
this.userID = userID;
this.res = res;
}
async onEvent(event) {
try {
// Handle 'requires_action' events with tool_calls
if (event.event === "thread.run.requires_action") {
await this.handleRequiresAction(
event.data,
event.data.id,
event.data.thread_id,
);
}
} catch (error) {
console.error("Error handling event:", error);
}
}
async handleRequiresAction(data, runId, threadId) {
try {
let toolCalls = data.required_action.submit_tool_outputs.tool_calls;
const toolOutputs = await Promise.all(toolCalls.map(toolCall => {
return assistantController.processToolCalls(
toolCall,
this.project,
this.dbURL,
this.userID,
this.projectID
);
}));
// Submit tool outputs
await this.submitToolOutputs(toolOutputs, runId, threadId);
} catch (error) {
console.error("Error processing required action:", error);
}
}
async submitToolOutputs(toolOutputs, runId, threadId) {
console.log("Submitting tool outputs");
let finalMessage = '';
try {
const stream = this.client.beta.threads.runs.submitToolOutputsStream(
threadId,
runId,
{ tool_outputs: toolOutputs }
);
for await (const event of stream) {
const message = event.data?.delta?.content[0]?.text?.value;
process.stdout.write(message); //<-- messages are getting printed in console
this.emit("event", event); //<-- But this line supposed to emit the event while accessing from controller file
}
} catch (error) {
console.error("Error submitting tool outputs:", error);
}
}
}
module.exports = EventHandler;
controller.js
EventHandler
is instantiated and set up to listen for the "event"
and handle it through the onEvent
method.thread.run.requires_action
, and even though events are emitted inside submitToolOutputs
, they don't seem to trigger further event handling in the controller.const eventHandler = new EventHandler(openai, project, dbURL, userID, projectID);
eventHandler.on("event", eventHandler.onEvent.bind(eventHandler));
const stream = await openai.beta.threads.runs.createAndStream(
thread,
{ assistant_id: assistant },
eventHandler,
);
for await (const event of stream) {
writeLog(`\n${event.event} : ${event.data.id}`);
eventHandler.emit("event", event);
console.log(event.event); // <-- Stopped at thread.run.requires_action and not proceeding further
}
thread.run.requires_action
event.thread.run.requires_action
.Any insights into why the event isn't being handled further or how I can ensure that the event gets emitted and handled properly would be greatly appreciated!
Upvotes: 0
Views: 90