
本文共 6019 字,大约阅读时间需要 20 分钟。
(一)线程池的组成:
两个结构体: 任务结构体 和 线程池结构体
定义如下:
任务结构体:
任务结构体有三个变量,分别是 任务执行的函数(函数指针),赋给任务的参数,任务的next指针。
struct task { void *(*task)(void *arg); //指针函数(即这个任务要去做什么事情) void *arg; //参数 struct task *next; //指向下一个任务的指针 };
线程池结构体:
注意:一个线程池结构体一共有 7个成员变量。
typedef struct thread_pool{ pthread_mutex_t lock;//互斥锁 pthread_cond_t cond;//条件变量 struct task *task_list;//任务链表 pthread_t *tids; //存储创建线程id号 unsigned waiting_tasks; //任务队列中正在等待的任务个数 unsigned active_threads; //正在工作的线程个数 bool shutdown; //开关标志 }thread_pool;(二)线程池的操作函数:
(1)线程池初始化函数:(输入的变量: 指向线程池的指针,创建的线程数目)
初始化互斥锁,初始化条件变量,初始化任务链表,初始化 waiting_tashs为0,初始化 active_threads 为最大线程值,初始化要创建的线程,shutdown为 false
步骤:
1.初始化互斥锁,条件变量
2.初始化任务链表表头,并把链表头 的next指针设成 NULL
3.等待任务数设置为0. 工作线程数设置为 threads_num
4/开关标志位设置为 falsh
5.for循环创建线程
bool pool_init(thread_pool *pool,unsigned int threads_num)//初始化线程池{ int i; pthread_mutex_init(&pool->lock,NULL); pthread_cond_init(&pool->cond,NULL); pool->task_list=malloc(sizeof(struct task)); //初始化任务链表 pool->waiting_tasks=0; pool->active_threads=threads_num; pool->shutdown=false; pool->tids=malloc(sizeof(pthread_t)*MAX_ACTIVE_THREADS); //MAX_ACTIVE_THREADS为一个宏,表示要能容纳的最大线程数 if(pool->task_list==NULL||pool->tids==NULL) { perror("allocate memory error"); return false; } pool->task_list->next=NULL; for(i=0;i(3)往线程池新增任务:active_threads;i++) { if(pthread_create(&((pool->tids)[i]),NULL,routine,(void *)pool)!=0) { perror("create thread failed!\n"); return false; } } return true; }
输入参数:线程池指针,任务要处理的函数指针,给任务的参数
步骤:
1.创建新的任务结点,并把他的next置空
2.给任务结点赋参数(指针函数,给任务的参数arg)
3.给互斥锁上锁,因为准备操作共享资源(任务链表)
4.把新建的任务结点插入到任务链表的结尾,线程池的等待任务数目+1
5.互斥锁解锁。
bool add_task(thread_pool *pool,void *(*task)(void *arg),void *arg){ struct task *new_task=malloc(sizeof(struct task)); if(new_task==NULL) { perror("allocate memory error"); return false; } new_task->task=task;//任务存入新节点 new_task->arg=arg; new_task->next=NULL; pthread_mutex_lock(&pool->lock); if(pool->waiting_tasks>=MAX_WAITING_TASKS) { pthread_mutex_unlock(&pool->lock); fprintf(stderr,"too many tasks.\n");//任务太多添加失败 free(new_task); return false; } //遍历链表,找到队列尾结点,以便把新结点加入
struct task *tmp=pool->task_list; while(tmp->next!=NULL) tmp=tmp->next; tmp->next=new_task; pool->waiting_tasks++; pthread_mutex_unlock(&pool->lock); //解互斥锁 pthread_cond_signal(&pool->cond); //解锁后唤醒一个阻塞在条件变量的线程,因为线程被创建后执行的函数都会默认被阻塞 return true; }
int add_thread(thread_pool *pool, unsigned additional_threads){ //增加线程数判断:如增加0条线程,则直接退出该函数 if(additional_threads == 0) return 0; //计算总线程数,用于循环创建线程时使用 unsigned total_threads = pool->active_threads + additional_threads; int i, actual_increment = 0; //actual_increment 为实际上增加的线程数 //循环创建指定数目的线程:不能超过最大线程数 for(i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++) { //创建线程,并判断是否创建成功,失败提示并退出该函数 if(pthread_create(&((pool->tids)[i]), NULL, thread_routine, (void *)pool) != 0) { perror("add threads error"); if(actual_increment == 0) return -1; break; } //实际新增加的线程数 actual_increment++; } //计算增加指定数目的线程后,线程池活动线程数加上实际成功增加的线程数 pool->active_threads += actual_increment; return actual_increment;}
int remove_thread(thread_pool *pool,unsigned int removing_threads){ if(removing_threads==0) { return pool->active_threads; } int remain_threads=pool->active_threads-removing_threads; remain_threads=remain_threads>0 ? remain_threads:1; int i; for(i=pool->active_threads-1;i>remain_threads-1;i--) { errno=pthread_cancel(pool->tids[i]); //删除线程是按照线程创建的顺序来删的 if(errno!=0) //线程池中的线程不一定都能删除的, { //因为在某线程在执行任务时,有个函数会让他暂时不能被删除 break; //下面会有解释的 } //所以这里就算pthread_cancel失败,也是break,而不是return } if(i==pool->active_threads-1) return -1; else { pool->active_threads=i+1; return i+1; } }
bool destroy_pool(thread_pool *pool){ pool->shutdown = true; pthread_cond_broadcast(&pool->cond); //唤醒因条件变量沉睡的所有线程 int i; for(i=0;iactive_threads;i++) { errno=pthread_join(pool->tids[i],NULL); //等待线程回收完毕 if(errno!=0) { printf("join tids[%d] error:%s\n",i,strerror(errno)); } else printf("[%u] is joined\n",(unsigned)pool->tids[i]); } free(pool->task_list); free(pool->tids); free(pool); return true;}
void *routine(void *arg){ thread_pool *pool = (thread_pool*)arg; struct task *p; while(1) { //从任务链表(head->next)中扣去节点 pthread_cleanup_push(handler,(void *)&pool->lock); pthread_mutex_lock(&pool->lock); //判断任务链表中是否有节点,没有就阻塞当前线程 while(pool->waiting_tasks == 0 && !pool->shutdown) //这个循环的作用是,当没任务时让线程沉睡,有任务时循环取任务 { pthread_cond_wait(&pool->cond,&pool->lock); //让线程沉睡 } if(pool->waiting_tasks==0&&pool->shutdown==true) //shutdown=true也要等任务全执行完了才能删除线程池 { pthread_mutex_unlock(&pool->lock); pthread_exit(NULL); } p =pool->task_list->next; pool->task_list->next =p->next; pool->waiting_tasks--; //解锁 pthread_mutex_unlock(&pool->lock); pthread_cleanup_pop(0); //处理刚才扣下来的那个节点任务 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); //禁止当前这个线程被其他线程调用pthread_cancel来删除 (p->task)(p->arg); //执行任务 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); //取消禁止删除效果 //释放定义的那个节点 free(p); } pthread_exit(NULL);//退出}
发表评论
最新留言
关于作者
