Reputation: 2143
I'm working on an open source project, GraphAI, a declarative data flow programming environment. I'm adding a multi-threading capability to it so that any portion of the graph can be process by Worker threads. I was able to make it work with some hacks (hard-coded path to a generated JS file), but I am having a hard time in making it available as a part of this rpm package.
Here is the current code (the source code of this file is here).
if (!isMainThread && parentPort) {
const port = parentPort;
port.on("message", async (data) => {
const { graphData } = data;
const graphAI = new GraphAI(graphData, vanillaAgents);
const result = await graphAI.run();
port.postMessage(result);
});
}
export const workerAgent: AgentFunction<{ namedInputs?: Array<string> }, any, any> = async ({ inputs, params, /* agents, log, */ graphData }) => {
const namedInputs = params.namedInputs ?? inputs.map((__input, index) => `$${index}`);
namedInputs.forEach((nodeId, index) => {
if (graphData.nodes[nodeId] === undefined) {
// If the input node does not exist, automatically create a static node
graphData.nodes[nodeId] = { value: inputs[index] };
} else {
// Otherwise, inject the proper data here (instead of calling injectTo method later)
(graphData.nodes[nodeId] as StaticNodeData)["value"] = inputs[index];
}
});
return new Promise((resolve, reject) => {
const worker = new Worker("./lib/experimental_agents/graph_agents/worker_agent.js");
worker.on("message", (result) => {
worker.terminate();
resolve(result);
});
worker.on("error", reject);
worker.on("exit", (code) => {
if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
});
worker.postMessage({ graphData });
});
};
I'd really appreciate any help, hint or suggestions.
Upvotes: 0
Views: 26