Reputation: 75
I am creating a simple kdb+ shared library which generates values on a separate thread and executes a callback function when data is ready. The application writes data to a file descriptor in the new thread and reads from it in the main event loop. The application appears to segfault when trying to lock and unlock the mutex.
If I introduce a small sleep into the loop, the segfault seems to disappear. This would suggest that the pthread_mutex_lock call is not blocking the thread until a lock is obtained, as I would have expected.
#include <k.h>
#include <pthread.h>
#include <time.h>
// Byte count passed to read() in callback()
#define PIPE_CAPACITY 65536
// Descriptor created by eventfd() in init(); written by loop() and registered with sd1()
static int fd;
// Handle of the thread started in init() that runs loop()
static pthread_t thread;
// Mutex initialised in init(); taken around write() in loop() and read() in callback()
static pthread_mutex_t lock;
// Callback registered with sd1() in init(): reads from descriptor d and passes
// what was read to the q function "callback". Always returns (K)0.
// NOTE(review): `data` is a single K, yet read() below is allowed to store up to
// PIPE_CAPACITY (65536) bytes at its address.
K callback(int d)
{
K data;
// Acquire mutex lock and read from fd
pthread_mutex_lock(&lock);
read(d, &data, PIPE_CAPACITY);
pthread_mutex_unlock(&lock);
// kdb+ callback
k(0, (char *)"callback", r1(data), (K)0);
return (K)0;
}
// Thread routine started by pthread_create() in init(). Loops forever: takes the
// current CLOCK_REALTIME time, builds a K atom of type -KN from it and writes the
// K pointer itself (sizeof(K) bytes) to fd.
// NOTE(review): only the pointer value crosses the descriptor, so the reader ends
// up using an object that was allocated on this thread.
void* loop()
{
while (1) {
struct timespec ts;
struct tm *time;
// Get seconds and nanoseconds since epoch
clock_gettime(CLOCK_REALTIME, &ts);
// Adjust for kdb+
time = gmtime(&ts.tv_sec);
time->tm_sec = 0;
time->tm_min = 0;
time->tm_hour = 0;
ts.tv_sec -= mktime(time); // Subtract seconds between epoch and midnight
// Create the kdb+ atom (type -KN) holding the nanosecond count
K data = ktj(-KN, ts.tv_sec * 1000000000 + ts.tv_nsec);
// Acquire mutex lock and write to fd
pthread_mutex_lock(&lock);
write(fd, &data, sizeof(K));
pthread_mutex_unlock(&lock);
}
}
// Library entry point: initialises the mutex, creates the descriptor with
// eventfd(), registers callback() on it with sd1() and starts the loop() thread.
// NOTE(review): declared to return K, but no return statement is present.
K init()
{
// Initialize mutex
pthread_mutex_init(&lock, NULL);
// Create file descriptor
fd = eventfd(0, 0);
// Register callback
sd1(fd, callback);
// Launch thread
pthread_create(&thread, NULL, loop, NULL);
}
Upvotes: 5
Views: 5521
Reputation: 2268
Recall that K is a pointer type defined in k.h as:
typedef struct k0{..}*K;
This means you are sending a pointer to an object created in the "loop" thread to the callback executed in the main thread. This does not work because kdb+ uses a separate memory pool for each thread. I would recommend passing a copy of the data instead.
Another problem is at the line
read(d, &data, PIPE_CAPACITY);
You are reading 65536 bytes, but pass the address of an 8-byte variable as the destination. The reason you don't get a segfault when you introduce the delay is that in this case the loop does not get a chance to write more than 8 bytes between the reads.
Finally, I am not sure you can use the file descriptor returned by eventfd as a read-write buffer. I would recommend using the good old pipe() call.
The following modification of your code works for me:
#include <k.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
// Pipe created in init(): fd[0] is the read end registered with sd1(),
// fd[1] is the write end used by loop()
static int fd[2];
// Handle of the thread started in init() that runs loop()
static pthread_t thread;
// Mutex initialised in init()
static pthread_mutex_t lock;
K callback(int d)
{
K data = ktj(-KN, 0);
// Aquire mutex lock and read from fd
pthread_mutex_lock(&lock);
read(d, (void *)&data->j, sizeof(data->j));
pthread_mutex_unlock(&lock);
// kdb+ callback
k(0, (char *)"callback", data, (K)0);
return (K)0;
}
/*
 * Producer thread routine (started by pthread_create() in init).
 *
 * Repeatedly samples CLOCK_REALTIME, converts it to nanoseconds since UTC
 * midnight and writes that J value to the write end of the pipe, fd[1].
 *
 * arg: unused; present so the signature matches what pthread_create()
 *      expects of a start routine.
 * Returns NULL once the pipe can no longer be written to.
 */
void *loop(void *arg)
{
    (void)arg;

    for (;;) {
        struct timespec ts;
        ssize_t put;

        /* Seconds and nanoseconds since the epoch */
        clock_gettime(CLOCK_REALTIME, &ts);

        /*
         * Seconds since UTC midnight. The earlier gmtime()/mktime() pair
         * mixed UTC fields with a local-time conversion (off by the zone
         * offset outside UTC) and used gmtime()'s shared static buffer
         * from a secondary thread.
         */
        J data = (J)(ts.tv_sec % 86400) * 1000000000 + ts.tv_nsec;

        /*
         * No mutex is held across write(): the call blocks when the pipe is
         * full, and a reader that needs the same mutex before it can read
         * would then never drain the pipe. A write of sizeof(J) bytes is
         * below PIPE_BUF, so POSIX guarantees it is atomic.
         */
        do {
            put = write(fd[1], &data, sizeof data);
        } while (put < 0 && errno == EINTR);

        if (put != (ssize_t)sizeof data) {
            /* The pipe is unusable (e.g. the read end was closed): stop. */
            break;
        }
    }
    return NULL;
}
K1(init)
{
// Initialize mutex
pthread_mutex_init(&lock, NULL);
// Create file descriptor
pipe(fd);
// Register callback
sd1(fd[0], callback);
// Launch thread
pthread_create(&thread, NULL, loop, NULL);
R ktj(0, 0);
}
To test, copy the code above into x.c, compile it with
$ gcc -Wall -shared -fPIC -I $(pwd) -DKXVER=3 x.c -o x.so
and run the following q code:
callback:0N!
init:`:./x 2:(`init;1)
init[]
Upvotes: 4