Introduction
A “thread per task” model looks simple—until it isn’t. Context-switch overhead stacks up, memory usage balloons, and shutdown gets messy. In this tutorial, we’ll write C code that uses Johan Hanssen Seferidis's
lightweight, production-friendly thread pool implementation in C, which I have used in production before and which has proven to be quite robust.
The Thread Pool Code
The actual thread pool code by Johan is quite short, so I will just paste it here. These files must be compiled into your C application as well. If you are not sure how to compile a C application, you can refer to this post to learn more.
thpool.c
/* ******************************** * Author: Johan Hanssen Seferidis * License: MIT * Description: Library providing a threading pool where you can add * work. For usage, check the thpool.h file or README.md * *//** @file thpool.h *//* * ********************************/ #define _POSIX_C_SOURCE 200809L #include <unistd.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <errno.h> #include <time.h> #if defined(__linux__) #include <sys/prctl.h> #endif #include "thpool.h" #ifdef THPOOL_DEBUG #define THPOOL_DEBUG 1 #else #define THPOOL_DEBUG 0 #endif #if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG) #define err(str) fprintf(stderr, str) #else #define err(str) #endif static volatile int threads_keepalive; static volatile int threads_on_hold; /* ========================== STRUCTURES ============================ */ /* Binary semaphore */ typedef struct bsem { pthread_mutex_t mutex; pthread_cond_t cond; int v; } bsem; /* Job */ typedef struct job{ struct job* prev; /* pointer to previous job */ void (*function)(void* arg); /* function pointer */ void* arg; /* function's argument */ } job; /* Job queue */ typedef struct jobqueue{ pthread_mutex_t rwmutex; /* used for queue r/w access */ job *front; /* pointer to front of queue */ job *rear; /* pointer to rear of queue */ bsem *has_jobs; /* flag as binary semaphore */ int len; /* number of jobs in queue */ } jobqueue; /* Thread */ typedef struct thread{ int id; /* friendly id */ pthread_t pthread; /* pointer to actual thread */ struct thpool_* thpool_p; /* access to thpool */ } thread; /* Threadpool */ typedef struct thpool_{ thread** threads; /* pointer to threads */ volatile int num_threads_alive; /* threads currently alive */ volatile int num_threads_working; /* threads currently working */ pthread_mutex_t thcount_lock; /* used for thread count etc */ pthread_cond_t threads_all_idle; /* signal to thpool_wait */ jobqueue jobqueue; /* job queue */ } thpool_; /* 
========================== PROTOTYPES ============================ */ static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id); static void* thread_do(struct thread* thread_p); static void thread_hold(int sig_id); static void thread_destroy(struct thread* thread_p); static int jobqueue_init(jobqueue* jobqueue_p); static void jobqueue_clear(jobqueue* jobqueue_p); static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p); static struct job* jobqueue_pull(jobqueue* jobqueue_p); static void jobqueue_destroy(jobqueue* jobqueue_p); static void bsem_init(struct bsem *bsem_p, int value); static void bsem_reset(struct bsem *bsem_p); static void bsem_post(struct bsem *bsem_p); static void bsem_post_all(struct bsem *bsem_p); static void bsem_wait(struct bsem *bsem_p); /* ========================== THREADPOOL ============================ */ /* Initialise thread pool */ struct thpool_* thpool_init(int num_threads){ threads_on_hold = 0; threads_keepalive = 1; if (num_threads < 0){ num_threads = 0; } /* Make new thread pool */ thpool_* thpool_p; thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_)); if (thpool_p == NULL){ err("thpool_init(): Could not allocate memory for thread pool\n"); return NULL; } thpool_p->num_threads_alive = 0; thpool_p->num_threads_working = 0; /* Initialise the job queue */ if (jobqueue_init(&thpool_p->jobqueue) == -1){ err("thpool_init(): Could not allocate memory for job queue\n"); free(thpool_p); return NULL; } /* Make threads in pool */ thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *)); if (thpool_p->threads == NULL){ err("thpool_init(): Could not allocate memory for threads\n"); jobqueue_destroy(&thpool_p->jobqueue); free(thpool_p); return NULL; } pthread_mutex_init(&(thpool_p->thcount_lock), NULL); pthread_cond_init(&thpool_p->threads_all_idle, NULL); /* Thread init */ int n; for (n=0; n<num_threads; n++){ thread_init(thpool_p, &thpool_p->threads[n], n); #if THPOOL_DEBUG 
printf("THPOOL_DEBUG: Created thread %d in pool \n", n); #endif } /* Wait for threads to initialize */ while (thpool_p->num_threads_alive != num_threads) {} return thpool_p; } /* Add work to the thread pool */ int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){ job* newjob; newjob=(struct job*)malloc(sizeof(struct job)); if (newjob==NULL){ err("thpool_add_work(): Could not allocate memory for new job\n"); return -1; } /* add function and argument */ newjob->function=function_p; newjob->arg=arg_p; /* add job to queue */ jobqueue_push(&thpool_p->jobqueue, newjob); return 0; } /* Wait until all jobs have finished */ void thpool_wait(thpool_* thpool_p){ pthread_mutex_lock(&thpool_p->thcount_lock); while (thpool_p->jobqueue.len || thpool_p->num_threads_working) { pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock); } pthread_mutex_unlock(&thpool_p->thcount_lock); } /* Destroy the threadpool */ void thpool_destroy(thpool_* thpool_p){ /* No need to destory if it's NULL */ if (thpool_p == NULL) return ; volatile int threads_total = thpool_p->num_threads_alive; /* End each thread 's infinite loop */ threads_keepalive = 0; /* Give one second to kill idle threads */ double TIMEOUT = 1.0; time_t start, end; double tpassed = 0.0; time (&start); while (tpassed < TIMEOUT && thpool_p->num_threads_alive){ bsem_post_all(thpool_p->jobqueue.has_jobs); time (&end); tpassed = difftime(end,start); } /* Poll remaining threads */ while (thpool_p->num_threads_alive){ bsem_post_all(thpool_p->jobqueue.has_jobs); sleep(1); } /* Job queue cleanup */ jobqueue_destroy(&thpool_p->jobqueue); /* Deallocs */ int n; for (n=0; n < threads_total; n++){ thread_destroy(thpool_p->threads[n]); } free(thpool_p->threads); free(thpool_p); } /* Pause all threads in threadpool */ void thpool_pause(thpool_* thpool_p) { int n; for (n=0; n < thpool_p->num_threads_alive; n++){ pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1); } } /* Resume all threads in threadpool 
*/ void thpool_resume(thpool_* thpool_p) { // resuming a single threadpool hasn't been // implemented yet, meanwhile this supresses // the warnings (void)thpool_p; threads_on_hold = 0; } int thpool_num_threads_working(thpool_* thpool_p){ return thpool_p->num_threads_working; } /* ============================ THREAD ============================== */ /* Initialize a thread in the thread pool * * @param thread address to the pointer of the thread to be created * @param id id to be given to the thread * @return 0 on success, -1 otherwise. */ static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){ *thread_p = (struct thread*)malloc(sizeof(struct thread)); if (thread_p == NULL){ err("thread_init(): Could not allocate memory for thread\n"); return -1; } (*thread_p)->thpool_p = thpool_p; (*thread_p)->id = id; pthread_create(&(*thread_p)->pthread, NULL, (void *)thread_do, (*thread_p)); pthread_detach((*thread_p)->pthread); return 0; } /* Sets the calling thread on hold */ static void thread_hold(int sig_id) { (void)sig_id; threads_on_hold = 1; while (threads_on_hold){ sleep(1); } } /* What each thread is doing * * In principle this is an endless loop. The only time this loop gets interuppted is once * thpool_destroy() is invoked or the program exits. 
* * @param thread thread that will run this function * @return nothing */ static void* thread_do(struct thread* thread_p){ /* Set thread name for profiling and debuging */ char thread_name[128] = {0}; sprintf(thread_name, "thread-pool-%d", thread_p->id); #if defined(__linux__) /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */ prctl(PR_SET_NAME, thread_name); #elif defined(__APPLE__) && defined(__MACH__) pthread_setname_np(thread_name); #else err("thread_do(): pthread_setname_np is not supported on this system"); #endif /* Assure all threads have been created before starting serving */ thpool_* thpool_p = thread_p->thpool_p; /* Register signal handler */ struct sigaction act; sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = thread_hold; if (sigaction(SIGUSR1, &act, NULL) == -1) { err("thread_do(): cannot handle SIGUSR1"); } /* Mark thread as alive (initialized) */ pthread_mutex_lock(&thpool_p->thcount_lock); thpool_p->num_threads_alive += 1; pthread_mutex_unlock(&thpool_p->thcount_lock); while(threads_keepalive){ bsem_wait(thpool_p->jobqueue.has_jobs); if (threads_keepalive){ pthread_mutex_lock(&thpool_p->thcount_lock); thpool_p->num_threads_working++; pthread_mutex_unlock(&thpool_p->thcount_lock); /* Read job from queue and execute it */ void (*func_buff)(void*); void* arg_buff; job* job_p = jobqueue_pull(&thpool_p->jobqueue); if (job_p) { func_buff = job_p->function; arg_buff = job_p->arg; func_buff(arg_buff); free(job_p); } pthread_mutex_lock(&thpool_p->thcount_lock); thpool_p->num_threads_working--; if (!thpool_p->num_threads_working) { pthread_cond_signal(&thpool_p->threads_all_idle); } pthread_mutex_unlock(&thpool_p->thcount_lock); } } pthread_mutex_lock(&thpool_p->thcount_lock); thpool_p->num_threads_alive --; pthread_mutex_unlock(&thpool_p->thcount_lock); return NULL; } /* Frees a thread */ static void thread_destroy (thread* thread_p){ free(thread_p); } /* ============================ JOB QUEUE 
=========================== */ /* Initialize queue */ static int jobqueue_init(jobqueue* jobqueue_p){ jobqueue_p->len = 0; jobqueue_p->front = NULL; jobqueue_p->rear = NULL; jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); if (jobqueue_p->has_jobs == NULL){ return -1; } pthread_mutex_init(&(jobqueue_p->rwmutex), NULL); bsem_init(jobqueue_p->has_jobs, 0); return 0; } /* Clear the queue */ static void jobqueue_clear(jobqueue* jobqueue_p){ while(jobqueue_p->len){ free(jobqueue_pull(jobqueue_p)); } jobqueue_p->front = NULL; jobqueue_p->rear = NULL; bsem_reset(jobqueue_p->has_jobs); jobqueue_p->len = 0; } /* Add (allocated) job to queue */ static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){ pthread_mutex_lock(&jobqueue_p->rwmutex); newjob->prev = NULL; switch(jobqueue_p->len){ case 0: /* if no jobs in queue */ jobqueue_p->front = newjob; jobqueue_p->rear = newjob; break; default: /* if jobs in queue */ jobqueue_p->rear->prev = newjob; jobqueue_p->rear = newjob; } jobqueue_p->len++; bsem_post(jobqueue_p->has_jobs); pthread_mutex_unlock(&jobqueue_p->rwmutex); } /* Get first job from queue(removes it from queue) <<<<<<< HEAD * * Notice: Caller MUST hold a mutex ======= >>>>>>> da2c0fe45e43ce0937f272c8cd2704bdc0afb490 */ static struct job* jobqueue_pull(jobqueue* jobqueue_p){ pthread_mutex_lock(&jobqueue_p->rwmutex); job* job_p = jobqueue_p->front; switch(jobqueue_p->len){ case 0: /* if no jobs in queue */ break; case 1: /* if one job in queue */ jobqueue_p->front = NULL; jobqueue_p->rear = NULL; jobqueue_p->len = 0; break; default: /* if >1 jobs in queue */ jobqueue_p->front = job_p->prev; jobqueue_p->len--; /* more than one job in queue -> post it */ bsem_post(jobqueue_p->has_jobs); } pthread_mutex_unlock(&jobqueue_p->rwmutex); return job_p; } /* Free all queue resources back to the system */ static void jobqueue_destroy(jobqueue* jobqueue_p){ jobqueue_clear(jobqueue_p); free(jobqueue_p->has_jobs); } /* ======================== 
SYNCHRONISATION ========================= */ /* Init semaphore to 1 or 0 */ static void bsem_init(bsem *bsem_p, int value) { if (value < 0 || value > 1) { err("bsem_init(): Binary semaphore can take only values 1 or 0"); exit(1); } pthread_mutex_init(&(bsem_p->mutex), NULL); pthread_cond_init(&(bsem_p->cond), NULL); bsem_p->v = value; } /* Reset semaphore to 0 */ static void bsem_reset(bsem *bsem_p) { bsem_init(bsem_p, 0); } /* Post to at least one thread */ static void bsem_post(bsem *bsem_p) { pthread_mutex_lock(&bsem_p->mutex); bsem_p->v = 1; pthread_cond_signal(&bsem_p->cond); pthread_mutex_unlock(&bsem_p->mutex); } /* Post to all threads */ static void bsem_post_all(bsem *bsem_p) { pthread_mutex_lock(&bsem_p->mutex); bsem_p->v = 1; pthread_cond_broadcast(&bsem_p->cond); pthread_mutex_unlock(&bsem_p->mutex); } /* Wait on semaphore until semaphore has value 0 */ static void bsem_wait(bsem* bsem_p) { pthread_mutex_lock(&bsem_p->mutex); while (bsem_p->v != 1) { pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex); } bsem_p->v = 0; pthread_mutex_unlock(&bsem_p->mutex); }
thpool.h
/**********************************
 * @author      Johan Hanssen Seferidis
 * License:     MIT
 *
 **********************************/

#ifndef _THPOOL_
#define _THPOOL_

#ifdef __cplusplus
extern "C" {
#endif

/* =================================== API ======================================= */


/* Opaque handle to a thread pool; obtain one from thpool_init(). */
typedef struct thpool_* threadpool;


/**
 * @brief  Initialize threadpool
 *
 * Initializes a threadpool. This function will not return until all
 * threads have initialized successfully.
 *
 * @example
 *
 *    ..
 *    threadpool thpool;                     //First we declare a threadpool
 *    thpool = thpool_init(4);               //then we initialize it to 4 threads
 *    ..
 *
 * @param  num_threads   number of threads to be created in the threadpool
 * @return threadpool    created threadpool on success,
 *                       NULL on error
 */
threadpool thpool_init(int num_threads);


/**
 * @brief Add work to the job queue
 *
 * Takes an action and its argument and adds it to the threadpool's job queue.
 * If you want to add as work a function that needs more than one argument,
 * pass a pointer to a structure that holds them.
 *
 * NOTICE: The work function must have the signature void f(void*). The pool
 * stores only the pointer, so whatever arg_p points to must stay valid until
 * the job has run.
 *
 * @example
 *
 *    void print_num(void* arg){
 *       int* num = arg;
 *       printf("%d\n", *num);
 *    }
 *
 *    int main() {
 *       ..
 *       static int a = 10;
 *       thpool_add_work(thpool, print_num, &a);
 *       ..
 *    }
 *
 * @param  thpool_p      threadpool to which the work will be added
 * @param  function_p    pointer to function to add as work
 * @param  arg_p         pointer to an argument
 * @return 0 on success, -1 otherwise.
 */
int thpool_add_work(threadpool thpool_p, void (*function_p)(void*), void* arg_p);


/**
 * @brief Wait for all queued jobs to finish
 *
 * Will wait for all jobs - both queued and currently running to finish.
 * Once the queue is empty and all work has completed, the calling thread
 * (probably the main program) will continue.
 *
 * The calling thread blocks until the pool reports that the queue is empty
 * and no thread is working any more.
 *
 * @example
 *
 *    ..
 *    threadpool thpool = thpool_init(4);
 *    ..
 *    // Add a bunch of work
 *    ..
 *    thpool_wait(thpool);
 *    puts("All added work has finished");
 *    ..
 *
 * @param thpool_p    the threadpool to wait for
 * @return nothing
 */
void thpool_wait(threadpool thpool_p);


/**
 * @brief Pauses all threads immediately
 *
 * The threads will be paused no matter if they are idle or working.
 * The threads return to their previous states once thpool_resume
 * is called.
 *
 * While the thread is being paused, new work can be added.
 *
 * @example
 *
 *    threadpool thpool = thpool_init(4);
 *    thpool_pause(thpool);
 *    ..
 *    // Add a bunch of work
 *    ..
 *    thpool_resume(thpool); // Let the threads start their magic
 *
 * @param thpool_p    the threadpool where the threads should be paused
 * @return nothing
 */
void thpool_pause(threadpool thpool_p);


/**
 * @brief Unpauses all threads if they are paused
 *
 * NOTE: resuming a single threadpool is not implemented yet; this call
 * currently releases the paused threads of every threadpool in the process.
 *
 * @example
 *    ..
 *    thpool_pause(thpool);
 *    sleep(10);              // Delay execution 10 seconds
 *    thpool_resume(thpool);
 *    ..
 *
 * @param thpool_p    the threadpool where the threads should be unpaused
 * @return nothing
 */
void thpool_resume(threadpool thpool_p);


/**
 * @brief Destroy the threadpool
 *
 * This will wait for the currently active threads to finish and then 'kill'
 * the whole threadpool to free up memory. Jobs that are still waiting in the
 * queue are discarded without being run; call thpool_wait() first if they
 * must complete. Passing NULL does nothing.
 *
 * @example
 * int main() {
 *    threadpool thpool1 = thpool_init(2);
 *    threadpool thpool2 = thpool_init(2);
 *    ..
 *    thpool_destroy(thpool1);
 *    ..
 *    return 0;
 * }
 *
 * @param thpool_p    the threadpool to destroy
 * @return nothing
 */
void thpool_destroy(threadpool thpool_p);


/**
 * @brief Show currently working threads
 *
 * Working threads are the threads that are performing work (not idle).
 *
 * @example
 * int main() {
 *    threadpool thpool1 = thpool_init(2);
 *    threadpool thpool2 = thpool_init(2);
 *    ..
 *    printf("Working threads: %d\n", thpool_num_threads_working(thpool1));
 *    ..
 *    return 0;
 * }
 *
 * @param thpool_p    the threadpool of interest
 * @return integer    number of threads working
 */
int thpool_num_threads_working(threadpool thpool_p);


#ifdef __cplusplus
}
#endif

#endif
Example Application Using the Thread Pool
We will build a simple message processor that periodically checks a message queue for any message to process. If a message is found, it pops that message from the message queue and hands it to the thread pool for processing. Once the message processing finishes, the thread pool recycles the thread for future use. If all of the threads in the thread pool are busy, the message processor will not be able to process any more messages until a thread frees up.
Sounds simple, right? It is actually a very common piece of functionality in modern software. Let me show you the code first, and then I will explain it.
#include <signal.h> #include <string.h> #include <unistd.h> #include "thpool.h" #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <pthread.h> struct Request { int isOccupied; int requestNumber; }; #define NUM_THREADS 8 /* thread function prototypes */ static void * message_processor_thread(void * in); static void processor_thread(void * req); /* global resources */ static volatile sig_atomic_t g_shutdown = 0; static pthread_mutex_t g_RequestBufferMutex; static int g_RequestBufferSize = 100; static struct Request * g_RequestBuffer = NULL; static int reqnum = 0; static threadpool thpool = NULL; static void signalHandler (int sig, siginfo_t * siginf, void *ptr) { switch(sig) { case SIGTERM: case SIGINT: { printf("shutdown signal received!\n"); g_shutdown = 1; break; } case SIGHUP: default: { printf("unhandled signal %d\n", sig); } } } int installSignalHandler(void) { struct sigaction act; memset(&act , 0 , sizeof(act)); act.sa_flags = SA_SIGINFO | SA_RESTART; act.sa_sigaction = signalHandler; if(sigaction(SIGTERM, &act, NULL) < 0) { printf("Failed to install SIGTERM signal handler\n"); return -1; } if(sigaction(SIGINT, &act, NULL) < 0) { printf("Failed to install SIGINT signal handler\n"); return -1; } return 0; } int main (void) { pthread_t mp_thread; int i = 0; /* initialize signal handler */ if (installSignalHandler()) { printf("failed to init signal handler\n"); exit(-1); } /* allocate a simple array to hold requests */ g_RequestBuffer = (struct Request *) malloc (sizeof(struct Request) * g_RequestBufferSize); if (!g_RequestBuffer) { printf("failed to init request buffer\n"); exit(-1); } memset(g_RequestBuffer, 0, sizeof(struct Request) * g_RequestBufferSize); /* init a thread pool */ thpool = thpool_init(NUM_THREADS); if(!thpool) { printf("failed to init thread pool\n"); exit(-1); } /* create a message processor thread (not a thread pool) * and a mutext to protect the global request buffer */ pthread_mutex_init(&g_RequestBufferMutex, NULL); 
if(pthread_create(&mp_thread, NULL, message_processor_thread, (void*)NULL)) { printf("failed to create message processor thread\n"); exit(-1); } /* testing code - create a request every 5 seconds and add it to request buffer */ while (!g_shutdown) { int slot = -1; /* find the next unoccupied slot and put the request there */ for (i = 0; i < g_RequestBufferSize; i++) { if (!g_RequestBuffer[i].isOccupied) slot = i; } /* if an empty slot is found, add request */ if (slot != -1 && slot >= 0) { pthread_mutex_lock(&g_RequestBufferMutex); g_RequestBuffer[slot].isOccupied = 1; g_RequestBuffer[slot].requestNumber = reqnum; pthread_mutex_unlock(&g_RequestBufferMutex); reqnum++; } sleep(1); } thpool_destroy(thpool); return 0; } static void * message_processor_thread(void* in) { int i = 0; int ret = -1; while (!g_shutdown) { for (i = 0; i < g_RequestBufferSize; i++) { /* if there is a valid request and thread pool is not full then we attempt to process */ if (g_RequestBuffer[i].isOccupied && thpool_num_threads_working(thpool) < g_RequestBufferSize) { /* make a copy of the request */ struct Request * req = (struct Request *) malloc (sizeof(struct Request)); memcpy(req, &g_RequestBuffer[i], sizeof(struct Request)); /* spawn thread pool to process*/ ret = thpool_add_work(thpool, processor_thread, (void *) req); if (ret) { /* failed to spawn a thread to process - retry in next iteration */ printf("failed to spawn a thread to process request\n"); } else { /* successfully spawned a thread to process - clear the request */ /* use mutext to protect this resource from concurrent writes */ pthread_mutex_lock(&g_RequestBufferMutex); memset(&g_RequestBuffer[i], 0, sizeof(struct Request)); pthread_mutex_unlock(&g_RequestBufferMutex); } } } sleep(5); } return NULL; } static void processor_thread(void * req) { struct Request * myreq = (struct Request *) req; if (!req) { printf("invalid request\n"); } else { /* processing the request - by printing it out */ printf("myreq->requestNumber = 
%d\n", myreq->requestNumber); } }
The Main Function
The main function does several things:
- Installs a signal handler – this ensures the program can respond to ctrl-c and break out of the main loops, which are controlled by the g_shutdown flag.
- Initializes an array of requests – this is an example array that holds the requests to process.
- Initializes the thread pool – this creates a thread pool containing NUM_THREADS threads.
- Creates the message processor thread – this thread continuously checks for new requests and hands each one to the thread pool for processing.
- Runs the main loop that injects test data into the array of requests – using a mutex to protect the array from concurrent writes. This loop adds a new request every second so that the message processor thread has requests to hand to the thread pool.
The Message Processor Thread
The message processor thread runs concurrently with the main thread and checks the array of requests for new requests every 5 seconds. It can hand several requests to the thread pool in a single pass. I purposely made the message processor’s checking rate slower than the injection rate so we could see multiple pool threads processing requests concurrently.
This function submits work to the thread pool by calling thpool_add_work, which queues the function processor_thread to be run by one of the pool's threads. Once the request has been queued successfully, the function resets the request slot under mutex protection.
The Processor Thread
This is the function that is responsible for processing the request. The thread pool runs it on multiple worker threads so that requests are processed concurrently. For this example, the processing simply prints the request number.
Summary
That is it! This is how to utilize a very simple and robust thread pool in C. Feel free to copy and use the code pasted here to build your multi-threaded application.

Hi, I’m Cary — a tech enthusiast, educator, and author, currently a software architect at Hornetlabs Technology in Canada. I love simplifying complex ideas, tackling coding challenges, and sharing what I learn with others.