smolloy
smolloy

Reputation: 368

Realloc fails, but only when a printf statement is used: "invalid next size"

I am trying to write a ThreadPool library for C; mostly for educational purposes, but also perhaps for practical usage later. I have an implementation that "works", in that I can spin up the required number of threads into a waiting state, and then populate them from a queue of tasks.

The last feature I added was to allow the task queue to expand dynamically (using realloc) if the number of tasks added to the queue would exceed its capacity. Y'know, the usual dynamic array stuff.

This works well with the following main function:

int main(void) {
  srand((unsigned int)time(0));

  sdm_threadpool_t *pool = sdm_threadpool_create(NUM_THREADS, DEFAULT_QUEUE_LENGTH);

  WrapperArg arg[NUM_TASKS] = {0};
  for (int i = 0; i < NUM_TASKS; i++) {
    arg[i].input = i;
  }
  for (int i = 0; i < NUM_TASKS; i++) {
    // printf("Submitting job %d...\n", i);
    sdm_threadpool_add(pool, function_wrapper, &(arg[i]));
  }

  sdm_threadpool_join(pool);

  for (size_t i=0; i<NUM_TASKS; i++) {
    printf("arg[%zu].input = %d :: arg[%zu].output = %d\n",
           i, arg[i].input, i, arg[i].output);
  }

  return 0;
}

Note the commented-out printf just before the sdm_threadpool_add call. If I remove the comments and allow this to print, I get a core dump with realloc(): invalid next size. Or, I should say, I very often get a core dump, but not always.

From browsing around I suspect that I am doing something wrong with a pointer somewhere that is corrupting the heap, but I can't figure out where. To be frank, I can't even think about how to go about investigating this to narrow down the problem.

Any advice you can give that pinpoints my mistake or that helps me investigate it myself would be gratefully received.

Thanks


Here are some definitions that are needed to compile the code:

typedef struct {
  void (*function)(void *);
  void *arg;
} sdm_threadpool_task_t;

typedef struct {
  pthread_mutex_t lock;
  pthread_cond_t notify;
  pthread_t *threads;
  sdm_threadpool_task_t *task_queue;
  size_t num_threads;
  size_t queue_capacity;
  size_t next_in_queue;
  size_t queue_length;
  size_t waiting_in_queue;
  bool shutdown;
} sdm_threadpool_t;
int slow_square(int n) {
  unsigned int sleepy_time = (unsigned int)rand() % 2000000;
  usleep(sleepy_time);
  int n_sqr = n*n;
  return n_sqr;
}

typedef struct {
  int input;
  int output;
} WrapperArg;

void function_wrapper(void *arg) {
  WrapperArg *cast_arg = (WrapperArg *)arg;

  cast_arg->output = slow_square(cast_arg->input);
}

Since I suspect it will be useful, I also include the code for the various functions I use in my little library.

#define _XOPEN_SOURCE 500
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include "cthreadpool.h"

void *sdm_threadpool_thread(void *threadpool);

sdm_threadpool_t *sdm_threadpool_create(size_t num_threads, size_t queue_capacity) {
  sdm_threadpool_t *pool = NULL;

  pool = malloc(sizeof(sdm_threadpool_t));
  if (pool == NULL) {
    fprintf(stderr, "Memory allocation problem. Quiting\n");
    exit(1);
  }
  memset(pool, 0, sizeof(sdm_threadpool_t));

  pool->threads = malloc(sizeof(pthread_t) * num_threads);
  if (pool->threads == NULL) {
    fprintf(stderr, "Memory allocation problem. Quiting\n");
    exit(1);
  }
  memset(pool->threads, 0, pool->num_threads * sizeof(pool->threads[0]));

  pool->task_queue = malloc(sizeof(sdm_threadpool_task_t) * queue_capacity);
  if (pool->task_queue == NULL) {
    fprintf(stderr, "Memory allocation problem. Quiting\n");
    exit(1);
  }
  memset(pool->task_queue, 0, pool->num_threads * sizeof(pool->task_queue[0]));

  pool->num_threads = num_threads;
  pool->queue_capacity = queue_capacity;

  pthread_mutex_init(&(pool->lock), NULL);
  pthread_cond_init(&(pool->notify), NULL);

  for (size_t i = 0; i < num_threads; i++) {
    pthread_create(&(pool->threads[i]), NULL, &sdm_threadpool_thread, (void*)pool);
  }

  return pool;
}

int sdm_threadpool_add(sdm_threadpool_t *pool, void (*function)(void *), void *arg) {
  pthread_mutex_lock(&(pool->lock));

  while (pool->waiting_in_queue >= pool->queue_capacity) {
    pool->queue_capacity *= 2;
    size_t new_size = sizeof(sdm_threadpool_task_t) * pool->queue_capacity;
    printf("Extending queue to %zu bytes\n", new_size);
    sdm_threadpool_task_t *new_task_queue = realloc(pool->task_queue, new_size);
    if (new_task_queue == NULL) {
      fprintf(stderr, "Memory allocation problem. Quiting\n");
      exit(1);
    }
    pool->task_queue = new_task_queue;
  }

  pool->task_queue[pool->queue_length].function = function;
  pool->task_queue[pool->queue_length].arg = arg;
  pool->queue_length++;
  pool->waiting_in_queue++;

  pthread_cond_broadcast(&(pool->notify));
  pthread_mutex_unlock(&(pool->lock));

  return 0;
}

void *sdm_threadpool_thread(void *threadpool) {
  sdm_threadpool_t *pool = (sdm_threadpool_t *)threadpool;

  while (1) {
    pthread_mutex_lock(&(pool->lock));

    while ((pool->waiting_in_queue == 0) && (!pool->shutdown)) {
      pthread_cond_wait(&(pool->notify), &(pool->lock));
    }

    if (pool->shutdown) {
      pthread_mutex_unlock(&(pool->lock));
      pthread_exit(NULL);
    }

    void (*function)(void *) = pool->task_queue[pool->next_in_queue].function;
    void *arg = pool->task_queue[pool->next_in_queue].arg;
    pool->next_in_queue++;
    pool->waiting_in_queue--;

    pthread_mutex_unlock(&(pool->lock));

    function(arg);
  }

  return NULL;
}

void sdm_threadpool_join(sdm_threadpool_t *pool) {
  while (pool->waiting_in_queue > 0) {
    usleep(1000);
  }

  sdm_threadpool_destroy(pool);
}

void sdm_threadpool_destroy(sdm_threadpool_t *pool) {
  pthread_mutex_lock(&(pool->lock));

  pool->shutdown = true;

  pthread_cond_broadcast(&(pool->notify));
  pthread_mutex_unlock(&(pool->lock));

  for (size_t i = 0; i < pool->num_threads; i++) {
    pthread_join(pool->threads[i], NULL);
  }

  free(pool->threads);
  free(pool->task_queue);
  pthread_mutex_destroy(&(pool->lock));
  pthread_cond_destroy(&(pool->notify));
  free(pool);
}

Upvotes: 1

Views: 113

Answers (1)

smolloy
smolloy

Reputation: 368

A comment by IanAbbott led me to the bug. The issue is the comparison I was doing to decide whether or not the queue needed to be realloc'ed. I was comparing against pool->waiting_in_queue, which is incorrect since this is the number of unstarted jobs. What I needed to do was this:

  while (pool->queue_length >= pool->queue_capacity) {

That is, comparing the length of the queue itself, and not the number of unstarted jobs.

Thanks for all the helpful comments and advice :)

Upvotes: 1

Related Questions