Philip Nelson
Philip Nelson

Reputation: 1231

How can you call an emitter callback from separate c++ thread in an addon?

For context, I started with this question. I need to call the callback for the emitter in another thread. I made a minimal example but it segfaults on emit.Call({cb, result}); My first instinct is that I have a problem with the lifetimes of env or the emit function.

addon.cpp

#include <napi.h>
#include <iostream>
#include <thread>
#include <memory>
#include <functional>
#include <chrono>

std::shared_ptr<std::thread> thread;
bool running = true;

void generate(Napi::Env& env, Napi::Function& emit)
{
  while(running)
  {
    Napi::Array result = Napi::Array::New(env);

    for(int i = 0; i < 3; ++i)
    {
      result[i] = rand()%100;
    }

    auto cb = Napi::String::New(env, "onFeedData");

    emit.Call({cb, result});

    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
}

Napi::Value Start(const Napi::CallbackInfo& info)
{
  Napi::Env env = info.Env();
  Napi::Function emit = info[0].As<Napi::Function>();

  auto cb = std::bind(generate, env, emit);
  thread = std::make_shared<std::thread>(cb);

  return Napi::String::New(env, "OK");
}

Napi::Value Stop(const Napi::CallbackInfo& info)
{
  Napi::Env env = info.Env();
  Napi::Function emit = info[0].As<Napi::Function>();

  running = false;
  thread->join();

  return Napi::String::New(env, "OK");
}

Napi::Object Init(Napi::Env env, Napi::Object exports)
{
  exports.Set(
      Napi::String::New(env, "Start"),
      Napi::Function::New(env, Start));

  exports.Set(Napi::String::New(env, "Stop"),
      Napi::Function::New(env, Stop));

  return exports;
}

NODE_API_MODULE(addon, Init)

index.js

'use strict'

const EventEmitter = require('events').EventEmitter;
const addon = require('./build/addon.node');

function Main() {
  const emitter = new EventEmitter();

  emitter.on('onFeedData', (evt) => {
    console.log(evt);
  })

  setTimeout(() => {
    addon.Stop( emitter.emit.bind(emitter) );
  }, 5000);

  addon.Start( emitter.emit.bind(emitter) );
}

Main();

Upvotes: 6

Views: 3030

Answers (3)

remon78eg
remon78eg

Reputation: 59

addon.cc

#include <napi.h>
static Napi::FunctionReference emit;

Napi::Value EMIT_SET(const Napi::CallbackInfo& info) {
    Napi::Function fn = info[0].As<Napi::Function>();
    emit = Napi::Weak(fn);//Persistent(fn);
    emit.SuppressDestruct();    
    return info.Env().Null();
}

Napi::Value EMIT_CALL(const Napi::CallbackInfo& info) {
    std::string type = info[0].As<Napi::String>();
    uint        arg1 = info[1].As<Number>().Uint32Value();
    std::string arg2 = info[2].As<Napi::String>();
    std::thread([](std::string type, uint arg1,std::string arg2, Napi::ThreadSafeFunction tsfn){
            struct output_data{
                std::string type;
                uint arg1;
                std::string arg2;
            };          
            auto data = new output_data();
            ///---------------
            ///fill output data
            data->type=type;
            data->arg1=arg1;
            data->arg2=arg2;
            std::this_thread::sleep_for(std::chrono::milliseconds(2000));
            ///---------------
            ///output thread result to nodejs
            napi_status status = tsfn.BlockingCall(data,[](Napi::Env env, Napi::Function emit,output_data* data){
                if(data->type=="data 1") emit.Call({Napi::String::New(env, "data 1"), Napi::Number::New(env, 111111), Napi::String::New(env, "to data 1")});
                if(data->type=="data 2") emit.Call({Napi::String::New(env, "data 2"), Napi::Number::New(env, 222222), Napi::String::New(env, "to data 2")});
                delete data;
            });

            if(status != napi_ok) { std::cout << "error!" << "\n"; }
            tsfn.Release();         
        },type,arg1,arg2,Napi::ThreadSafeFunction::New(info.Env(), emit.Value(), "TSFN", 0, 1,[](Napi::Env env, void *finalizeData){},(void *)nullptr)).detach();

    return info.Env().Null();
}

index.js

const ADDON = require('./EMIT/build/Release/addon');
const emitter = new(require('events'))
ADDON.EMIT_SET(emitter.emit.bind(emitter));

emitter.on('data 1',(arg1,arg2)=>{
    console.log('data1: ',arg1);
    console.log('data1: ',arg2);
})

emitter.on('data 2',(arg1,arg2)=>{
    console.log('data2: ',arg1);
    console.log('data2: ',arg2);
})

ADDON.EMIT_CALL('data 1',111,"aaa");
ADDON.EMIT_CALL('data 2',222,"bbb");

console.log('fin')

output:

  • fin
  • data2: 222222
  • data2: to data 2
  • data1: 111111
  • data1: to data 1

see also how to return data from thread

Upvotes: 0

Satyan
Satyan

Reputation: 1406

We can achieve this by utilizing napi_create_threadsafe_function() function; such usage is explained in detail in the StackOverflow posting How to use napi_threadsafe_function for NodeJS Native Addon

Here is the node.js documentation for Asynchronous Thread-safe Function Calls

Upvotes: 3

Val
Val

Reputation: 22797

I've tried many solutions but only this works.

With napi-thread-safe-callback, you can callback from sub-thread safely:

void example_async_work(const CallbackInfo& info)
{
    // Capture callback in main thread
    auto callback = std::make_shared<ThreadSafeCallback>(info[0].As<Function>());
    bool fail = info.Length() > 1;

    // Pass callback to other thread
    std::thread([callback, fail]
    {
        try
        {
            // Do some work to get a result
            if (fail)
                throw std::runtime_error("Failure during async work");
            std::string result = "foo";

            // Call back with result
            callback->call([result](Napi::Env env, std::vector<napi_value>& args)
            {
                // This will run in main thread and needs to construct the
                // arguments for the call
                args = { env.Undefined(), Napi::String::New(env, result) };
            });
        }
        catch (std::exception& e)
        {
            // Call back with error
            callback->callError(e.what());
        }
    }).detach();
}

Upvotes: 2

Related Questions