Reputation: 665
I am implementing a realtime signal processing algorithm in C and I am trying to parallelise a section of the code using multithreading.
The code for the single threaded implementation is
void calcTheta(float *theta, float **s, float ***q, float ***g,
int *Ki, int m, int numObv, int numTask) {
int i, j, k;
for (i = 0; i < m; i++) {
theta[i] = 0;
for (j = 0; j < numObv; j++) {
for (k = 0; k < numTask; k++) {
theta[i] += (Ki[k] * (pow(fabs(q[i][j][k]), 2) / g[i][j][k]) - s[i][k]) /
(s[i][k] * (s[i][k] - (pow(fabs(q[i][j][k]), 2) / g[i][j][k])));
}//k
}//j
theta[i] = (numTask * numObv) / theta[i];
}//i
}
The multithreaded implementation uses a thread spooling idea where I create a few threads and keep signalling them to process with specific data arrays. The code is below:
#define NUM_THREADS_THETA 2
#define TRUE 1
#define FALSE 0
#define READY 1
#define DONE 0
struct threadThetaData {
float *theta;
float **s;
float ***q;
float ***g;
int *Ki;
int numObv;
int numTask;
int threadId;
};
struct threadThetaData dataArrayTheta[NUM_THREADS_THETA];
int termThread[NUM_THREADS_THETA];
int statusThread[NUM_THREADS_THETA];
int iVal[NUM_THREADS_THETA];
pthread_mutex_t mutexThreadProc[NUM_THREADS_THETA];
pthread_mutex_t mutexMainProc[NUM_THREADS_THETA];
pthread_cond_t condThreadProc[NUM_THREADS_THETA];
pthread_cond_t condMainProc[NUM_THREADS_THETA];
void *doProcTheta(void *threadArg) {
struct threadThetaData *myData = (struct threadThetaData *)threadArg;
float *theta = myData->theta;
float **s = myData->s;
float ***q = myData->q;
float ***g = myData->g;
int *Ki = myData->Ki;
int numObv = myData->numObv;
int numTask = myData->numTask;
int threadId = myData->threadId;
int j, k;
while(1) {
//printf("thread %d waiting for signal from master..\n", threadId);
pthread_mutex_lock(&mutexThreadProc[threadId]);
while (statusThread[threadId] != READY)
pthread_cond_wait(&condThreadProc[threadId], &mutexThreadProc[threadId]);
pthread_mutex_unlock(&mutexThreadProc[threadId]);
//printf("thread %d got signal from master..\n", threadId);
if (termThread[threadId] == TRUE)
pthread_exit(NULL);
theta[iVal[threadId]] = 0;
for (j = 0; j < numObv; j++) {
for (k = 0; k < numTask; k++) {
theta[iVal[threadId]] += (Ki[k]*(pow(fabs(q[iVal[threadId]][j][k]),2)/g[iVal[threadId]][j][k]) - s[iVal[threadId]][k])/(s[iVal[threadId]][k]*(s[iVal[threadId]][k] - (pow(fabs(q[iVal[threadId]][j][k]),2)/g[iVal[threadId]][j][k])));
}//k
}//j
theta[iVal[threadId]] = (numTask*numObv)/theta[iVal[threadId]];
pthread_mutex_lock(&mutexMainProc[threadId]);
statusThread[threadId] = DONE;
pthread_cond_signal(&condMainProc[threadId]);
pthread_mutex_unlock(&mutexMainProc[threadId]);
//printf("thread %d signaled to master..\n", threadId);
}
}
void calcTheta(float *theta,float **s,float ***q,float ***g,int *Ki,int m, int numObv, int numTask)
{
int i,j;
pthread_t thetaThreads[NUM_THREADS_THETA];
int numThreadBlks = m/NUM_THREADS_THETA;
int numThreadRem = m%NUM_THREADS_THETA;
int mCount = 0;
for(i=0;i<NUM_THREADS_THETA;i++)
{
pthread_mutex_init(&mutexThreadProc[i], NULL);
pthread_mutex_init(&mutexMainProc[i], NULL);
pthread_cond_init (&condThreadProc[i], NULL);
pthread_cond_init (&condMainProc[i], NULL);
dataArrayTheta[i].theta = theta;
dataArrayTheta[i].s = s;
dataArrayTheta[i].q = q;
dataArrayTheta[i].g = g;
dataArrayTheta[i].Ki = Ki;
dataArrayTheta[i].numObv = numObv;
dataArrayTheta[i].numTask = numTask;
dataArrayTheta[i].threadId = i;
termThread[i] = FALSE;
statusThread[i] = DONE;
pthread_create(&thetaThreads[i],NULL,doProcTheta,(void *)&dataArrayTheta[i]);
}
for(i=0;i<numThreadBlks;i++)
{
for(j=0;j<NUM_THREADS_THETA;j++)
{
pthread_mutex_lock(&mutexThreadProc[j]);
statusThread[j] = READY;
iVal[j] = mCount;
mCount++;
pthread_cond_signal(&condThreadProc[j]);
pthread_mutex_unlock(&mutexThreadProc[j]);
//printf("Signaled thread %d from master ... Waiting on signal ..\n",j);
}
for(j=0;j<NUM_THREADS_THETA;j++)
{
pthread_mutex_lock(&mutexMainProc[j]);
while (statusThread[j] != DONE)
pthread_cond_wait(&condMainProc[j], &mutexMainProc[j]);
pthread_mutex_unlock(&mutexMainProc[j]);
//printf("Got signal from thread %d to master \n",j);
}
}
for(j=0;j<numThreadRem;j++)
{
pthread_mutex_lock(&mutexThreadProc[j]);
statusThread[j] = READY;
iVal[j] = mCount;
mCount++;
pthread_cond_signal(&condThreadProc[j]);
pthread_mutex_unlock(&mutexThreadProc[j]);
}
for(j=0;j<numThreadRem;j++)
{
pthread_mutex_lock(&mutexMainProc[j]);
while (statusThread[j] != DONE)
pthread_cond_wait(&condMainProc[j], &mutexMainProc[j]);
pthread_mutex_unlock(&mutexMainProc[j]);
}
for(j=0;j<NUM_THREADS_THETA;j++)
{
pthread_mutex_lock(&mutexThreadProc[j]);
statusThread[j] = READY;
termThread[j] = TRUE;
pthread_cond_signal(&condThreadProc[j]);
pthread_mutex_unlock(&mutexThreadProc[j]);
pthread_join(thetaThreads[j],NULL);
pthread_mutex_destroy(&mutexThreadProc[j]);
pthread_cond_destroy(&condThreadProc[j]);
pthread_mutex_destroy(&mutexMainProc[j]);
pthread_cond_destroy(&condMainProc[j]);
}
}
Array dimensions :
float theta[m];
float s[m][numTask];
float q[m][numObv][numTask];
float g[m][numObv][numTask];
int Ki[numTask];
For a specific dataset where
m=661
numObv=96
numTask=1024
the runtimes are :
Single threaded : 4.5 seconds
Multithreaded with 2 threads : 6.9 seconds
I expected the runtimes for the multithreaded code to give me some performance improvement over the single threaded code when its the other way around . Any pointers to what I am missing here would be much appreciated.
Upvotes: 2
Views: 104
Reputation: 44274
Your multi threaded implementation seems way to complex for the problem at hand. The single threaded code shows that each theta
element is calculated independently from all other theta
elements.
So you don't need the mutexes and conditionals as there are no need for data exchange/synchronization between the threads. Just let the threads deal with different ranges of the theta
calculation.
With m=661
and 2 threads, then the first thread should calculate theta
in range 0..330 and the second thread should calculate theta
in range 331..660. Launch the two threads and wait for them to finish (aka join).
You can nearly use the single threaded code for a multi threaded implementation. All you need is to add a start-index to the function.
Upvotes: 5