本文介绍线程池的作用、线程池的应用场景、线程池的工作原理、代码实现线程池以及与nginx的线程池对比分析。
为什么会有线程池,到底解决了什么问题
以日志为例,在写日志loginfo(“xxx”),与日志落盘,是两码事,它们两之间应该是异步的。那么异步解耦就是将日志当作一个任务task,将这个任务抛给线程池去处理,由线程池去负责日志落盘。对于应用程序而言,就可以提升落盘的效率。
以nginx为例,一秒几万的请求,速度很快。如果在其中加一个日志,那么qps一下子就掉下来了,因为每请求一次就需要落盘一次,那么整个服务器的性能就下降。我们可以引入一个线程池,把日志这个任务抛给线程池,对于主循环来说,就只抛任务即可,这样就可以大大提升主线程的效率。这就是线程池异步解耦的作用
不仅仅是日志落盘,还有很多地方都可以用线程池,比较耗时的操作如数据库操作,io处理等,都可以用线程池。
线程池有必要将线程与cpu做亲和性吗? 在注重cpu处理能力的时候,可以做黏合;如果注重的是异步解耦,那么这里更加注重的是任务,没必要将线程和cpu做绑定。
我们在使用线程池的时候,是当作一个组件去使用。所以在使用组件的时候,我们首先想到的是线程池应该提供哪些api。
这三个api是最核心的api,其他可扩展的api都是可有可无的,而这三个api是一定要有的。
想象去银行营业厅的场景。柜员:为客户提供服务;客户:是来办业务的,对于柜员来说,这些人就是任务。那么这两个形象就构建出了pthread和task。
那么这个公示牌(xxx号来几号柜台办理业务),是谁的属性呢?告示牌的作用是管理客户和柜员有秩序工作,它不隶属于柜员,也不隶属于客户,它是一个管理工具。
这么这就自然而然的形成了3个组件,那么它们都应该有什么属性呢:
下面将柜员称为执行队列,客户称为任务队列,告示牌称为池管理组件。
错误理解:要使用线程就从线程池里面拿一个线程出来使用,用完再返回给线程池。这种理解是连接池的概念。而线程池是多个线程去任务队列取任务,竞争任务。
所以线程的核心就是下面的伪代码:
while(1){ get_task(); task->func();}
相关视频推荐
150行代码,带你手写线程池,自行准备linux环境
cpu密集型和io密集型的线程池在开源框架中的应用
学习地址:C/C++Linux服务器开发/后台架构师【零声教育】-学习视频教程-腾讯课堂
需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
//执行队列typedef struct NWORKER { pthread_t id; int termination; struct NTHREADPOLL *thread_poll; struct NWORKER *prev; struct NWORKER *next;} worker_t;//任务队列typedef struct NTASK { void (*task_func)(void *arg); void *user_data; struct NTASK *prev; struct NTASK *next;} task_t;//池管理组件typedef struct NTHREADPOLL { worker_t *workers; task_t *tasks; pthread_cond_t cond; pthread_mutex_t mutex;} thread_poll_t;
//头插法#define LL_ADD(item, list)do{ item->prev=NULL; item->next=list; if(list!=NULL){ list->prev=item; } list=item }while(0)#define LL_REMOVE(item, list)do{ if(item->prev!=NULL){ item->prev->next=item->next; } if(item->next!=NULL){ item->next->prev=item->prev; } if(list==item){ list=item->next; } item->prev=item->next=NULL; }while(0)
创建其实就是创建thread_poll_t结构体,然后按照给定的宏创建线程和worker。
push就是给task队列增加一个任务,然后用signal通知cond。
销毁将所有线程的termination置1,然后广播cond即可。
//return access create thread num;int thread_poll_create(thread_poll_t *thread_poll, int thread_num) { if (thread_num < 1)thread_num = 1; memset(thread_poll, 0, sizeof(thread_poll_t)); //init cond pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(&thread_poll->cond, &blank_cond, sizeof(pthread_cond_t)); //init mutex pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(&thread_poll->mutex, &blank_mutex, sizeof(pthread_mutex_t)); // one thread one worker int idx = 0; for (idx = 0; idx < thread_num; idx++) { worker_t *worker = malloc(sizeof(worker_t)); if (worker == NULL) { perror("worker malloc err
"); return idx; } memset(worker, 0, sizeof(worker_t)); worker->thread_poll = thread_poll; int ret = pthread_create(&worker->id, NULL, thread_callback, worker); if (ret) { perror("pthread_create err
"); free(worker); return idx; } LL_ADD(worker, thread_poll->workers); } return idx;}int thread_poll_push_task(thread_poll_t *thread_poll, task_t *task) { pthread_mutex_lock(&thread_poll->mutex); LL_ADD(task, thread_poll->tasks); pthread_cond_signal(&thread_poll->cond); pthread_mutex_unlock(&thread_poll->mutex);}int thread_destroy(thread_poll_t *thread_poll) { worker_t *worker = NULL; for (worker = thread_poll->workers; worker != NULL; worker = worker->next) { worker->termination = 1; } pthread_mutex_lock(&thread_poll->mutex); pthread_cond_broadcast(&thread_poll->cond); pthread_mutex_unlock(&thread_poll->mutex);}
线程要做的就是取任务,执行任务。取任务从任务队列里面取。
task_t *get_task(worker_t *worker) { while (1) { pthread_mutex_lock(&worker->thread_poll->mutex); while (worker->thread_poll->workers == NULL) { if (worker->termination)break; pthread_cond_wait(&worker->thread_poll->cond, &worker->thread_poll->mutex); } if (worker->termination) { pthread_mutex_unlock(&worker->thread_poll->mutex); return NULL; } task_t *task = worker->thread_poll->tasks; if (task) { LL_REMOVE(task, worker->thread_poll->tasks); } pthread_mutex_unlock(&worker->thread_poll->mutex); if (task != NULL) { return task; } }};void *thread_callback(void *arg) { worker_t *worker = (worker_t *) arg; while (1) { task_t *task = get_task(worker); if (task == NULL) { free(worker); pthread_exit("thread termination
"); } task->task_func(task); }}
这里我们创建了1000个task,开了10个thread。记住task以及task的参数,由task的func来销毁。
//// Created by 68725 on 2022/7/25.//#include #include #include #include #include //头插法#define LL_ADD(item, list)do{ item->prev=NULL; item->next=list; if(list!=NULL){ list->prev=item; } list=item; }while(0)#define LL_REMOVE(item, list)do{ if(item->prev!=NULL){ item->prev->next=item->next; } if(item->next!=NULL){ item->next->prev=item->prev; } if(list==item){ list=item->next; } item->prev=item->next=NULL; }while(0)//执行队列typedef struct NWORKER { pthread_t id; int termination; struct NTHREADPOLL *thread_poll; struct NWORKER *prev; struct NWORKER *next;} worker_t;//任务队列typedef struct NTASK { void (*task_func)(void *arg); void *user_data; struct NTASK *prev; struct NTASK *next;} task_t;//池管理组件typedef struct NTHREADPOLL { worker_t *workers; task_t *tasks; pthread_cond_t cond; pthread_mutex_t mutex;} thread_poll_t;task_t *get_task(worker_t *worker) { while (1) { pthread_mutex_lock(&worker->thread_poll->mutex); while (worker->thread_poll->workers == NULL) { if (worker->termination)break; pthread_cond_wait(&worker->thread_poll->cond, &worker->thread_poll->mutex); } if (worker->termination) { pthread_mutex_unlock(&worker->thread_poll->mutex); return NULL; } task_t *task = worker->thread_poll->tasks; if (task) { LL_REMOVE(task, worker->thread_poll->tasks); } pthread_mutex_unlock(&worker->thread_poll->mutex); if (task != NULL) { return task; } }};void *thread_callback(void *arg) { worker_t *worker = (worker_t *) arg; while (1) { task_t *task = get_task(worker); if (task == NULL) { free(worker); pthread_exit("thread termination
"); } task->task_func(task); }}//return access create thread num;int thread_poll_create(thread_poll_t *thread_poll, int thread_num) { if (thread_num < 1)thread_num = 1; memset(thread_poll, 0, sizeof(thread_poll_t)); //init cond pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(&thread_poll->cond, &blank_cond, sizeof(pthread_cond_t)); //init mutex pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(&thread_poll->mutex, &blank_mutex, sizeof(pthread_mutex_t)); // one thread one worker int idx = 0; for (idx = 0; idx < thread_num; idx++) { worker_t *worker = malloc(sizeof(worker_t)); if (worker == NULL) { perror("worker malloc err
"); return idx; } memset(worker, 0, sizeof(worker_t)); worker->thread_poll = thread_poll; int ret = pthread_create(&worker->id, NULL, thread_callback, worker); if (ret) { perror("pthread_create err
"); free(worker); return idx; } LL_ADD(worker, thread_poll->workers); } return idx;}int thread_poll_push_task(thread_poll_t *thread_poll, task_t *task) { pthread_mutex_lock(&thread_poll->mutex); LL_ADD(task, thread_poll->tasks); pthread_cond_signal(&thread_poll->cond); pthread_mutex_unlock(&thread_poll->mutex);}int thread_destroy(thread_poll_t *thread_poll) { worker_t *worker = NULL; for (worker = thread_poll->workers; worker != NULL; worker = worker->next) { worker->termination = 1; } pthread_mutex_lock(&thread_poll->mutex); pthread_cond_broadcast(&thread_poll->cond); pthread_mutex_unlock(&thread_poll->mutex);}void counter(task_t *task) { int idx = *(int *) task->user_data; printf("idx:%d pthread_id:%llu
", idx, pthread_self()); free(task->user_data); free(task);}#define THREAD_COUNT 10#define TASK_COUNT 1000int main() { thread_poll_t thread_poll = {0}; int ret = thread_poll_create(&thread_poll, THREAD_COUNT); if (ret != THREAD_COUNT) { thread_destroy(&thread_poll); } int i = 0; for (i = 0; i < TASK_COUNT; i++) { //create task task_t *task = (task_t *) malloc(sizeof(task_t)); if (task == NULL) { perror("task malloc err
"); exit(1); } task->task_func = counter; task->user_data = malloc(sizeof(int)); *(int *) task->user_data = i; //push task thread_poll_push_task(&thread_poll, task); } getchar(); thread_destroy(&thread_poll);}
cond初始化,mutex初始化,创建线程
取任务,执行任务
nginx是将任务插到尾部,我们做的是插到头部
线程到底初始化多少呢?如果是计算密集型就不用太多的线程,如果是任务密集型可以多几个。以下是经验值,不一定一定按照这个来。
随着任务越来越多,线程不够用怎么办?我们可以开一个监控线程,设n=running线程 / 总线程。当n>上水位时,监控线程创建几个线程;当n<下水位时,监控线程销毁几个线程。可以设置30%和70%。
页面更新:2024-05-05
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号