线程池
发布日期:2021-05-08 05:02:54 浏览次数:19 分类:原创文章

本文共 5252 字,大约阅读时间需要 17 分钟。

一、状态封装头文件
#ifndef CONDITION_H
#define CONDITION_H

#include <pthread.h>

/*
 * A mutex and a condition variable bundled together so that a shared
 * state can be locked, waited on and signalled through one object.
 */
typedef struct condition
{
    pthread_cond_t pcond;   /* condition variable used to wait / wake up */
    pthread_mutex_t pmutex; /* mutex passed to the wait calls together with pcond */
} condition_t;

/*
 * Every function below returns the result of the underlying pthread call:
 * 0 on success, an error number otherwise.
 */

/* Initialise the mutex and the condition variable with default attributes. */
int condition_init(condition_t *cond);
/* Lock the mutex. */
int condition_lock(condition_t *cond);
/* Unlock the mutex. */
int condition_unlock(condition_t *cond);
/* Wait on the condition variable using the bundled mutex. */
int condition_wait(condition_t *cond);
/* Like condition_wait(), but gives up once the absolute time *abstime passes. */
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
/* Signal the condition variable (pthread_cond_signal). */
int condition_signal(condition_t *cond);
/* Broadcast on the condition variable (pthread_cond_broadcast). */
int condition_broadcast(condition_t *cond);
/* Destroy the mutex and the condition variable. */
int condition_destroy(condition_t *cond);
#endif // CONDITION_H

二、状态封装实现
#include "condition.h"

/*
 * Initialise the mutex and the condition variable stored in *cond, both
 * with default attributes.
 *
 * Returns 0 on success, otherwise the error number returned by the pthread
 * call that failed. When the condition variable cannot be initialised the
 * already-initialised mutex is destroyed again, so a failed call leaves
 * nothing behind.
 */
int condition_init(condition_t *cond)
{
    int status = pthread_mutex_init(&cond->pmutex, NULL);
    if (status != 0)
    {
        return status;
    }

    status = pthread_cond_init(&cond->pcond, NULL);
    if (status != 0)
    {
        /* Roll back the first step; the original error is what we report. */
        pthread_mutex_destroy(&cond->pmutex);
        return status;
    }

    return 0;
}

/*
 * Lock the mutex that belongs to *cond.
 * Returns whatever pthread_mutex_lock() returns (0 on success).
 */
int condition_lock(condition_t *cond)
{
    pthread_mutex_t *mtx = &cond->pmutex;

    return pthread_mutex_lock(mtx);
}
/*
 * Unlock the mutex that belongs to *cond.
 * Returns whatever pthread_mutex_unlock() returns (0 on success).
 */
int condition_unlock(condition_t *cond)
{
    pthread_mutex_t *mtx = &cond->pmutex;

    return pthread_mutex_unlock(mtx);
}
/*
 * Block on the condition variable of *cond, handing its own mutex to
 * pthread_cond_wait().
 * Returns the result of pthread_cond_wait() (0 on success).
 */
int condition_wait(condition_t *cond)
{
    int rc = pthread_cond_wait(&cond->pcond, &cond->pmutex);

    return rc;
}

/*
 * Block on the condition variable of *cond until it is signalled or the
 * absolute time *abstime is reached, handing the bundled mutex to
 * pthread_cond_timedwait().
 *
 * Returns the result of pthread_cond_timedwait(); callers in this file
 * compare it against ETIMEDOUT to detect an expired wait.
 */
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
    return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}

/*
 * Signal the condition variable of *cond via pthread_cond_signal().
 * Returns the result of that call (0 on success).
 */
int condition_signal(condition_t *cond)
{
    pthread_cond_t *cv = &cond->pcond;

    return pthread_cond_signal(cv);
}

/*
 * Broadcast on the condition variable of *cond via pthread_cond_broadcast().
 * Returns the result of that call (0 on success).
 */
int condition_broadcast(condition_t *cond)
{
    pthread_cond_t *cv = &cond->pcond;

    return pthread_cond_broadcast(cv);
}
/*
 * Tear down *cond: the mutex is destroyed first and, only if that worked,
 * the condition variable afterwards.
 *
 * Returns 0 when both were destroyed, otherwise the error number of the
 * first pthread call that failed.
 */
int condition_destroy(condition_t *cond)
{
    int rc = pthread_mutex_destroy(&cond->pmutex);

    if (rc == 0)
    {
        rc = pthread_cond_destroy(&cond->pcond);
    }
    return rc;
}

三、线程池头文件
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include "condition.h"
/*
 * One queued unit of work; tasks are chained into a singly linked list.
 */
typedef struct task
{
    void *(*run)(void *args); /* function a worker thread calls for this task */
    void *arg;                /* argument passed to run() */
    struct task *next;        /* next task in the queue, NULL for the last one */
} task_t;

/*
 * State of one thread pool. All fields are shared between the threads that
 * submit tasks and the worker threads; the pool functions access them with
 * `ready` locked.
 */
struct threadpool
{
    condition_t ready;  /* mutex + condition variable protecting the pool */
    task_t *taskfirst;  /* head of the task queue, NULL when it is empty */
    task_t *tasklast;   /* tail of the task queue */
    int counter;        /* number of worker threads currently alive */
    int idle;           /* number of workers currently waiting for a task */
    int max_threads;    /* upper bound for counter */
    int quit;           /* set to 1 once the pool is being destroyed */
};

typedef struct threadpool threadpool_t;

/* Reset *pool to an empty state that allows at most `threads` workers. */
void threadpool_init(threadpool_t *pool, int threads);

/*
 * Queue run(arg) for execution by a worker thread of *pool. The pool is
 * passed by pointer and the callback returns void *, matching the
 * definition in the implementation file.
 */
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *args), void *arg);

/* Ask all workers to finish, wait until none is left, release the lock. */
void threadpool_destroy(threadpool_t *pool);
#endif // THREADPOOL_H

四、线程池实现
#include "threadpool.h"

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
//创建线程的执行 pthread_create调用执行
void *thread_routine(void *arg)
{
struct itimerspec abstime;
int timeout;
printf(“thread %d is starting\n”,(int)pthread_self());
threadpool_t pool =(threadpool_t)arg;
while(1)
{
timeout=0;

    //加锁    condition_lock(&pool->ready);    pool->idle++;    //没有任务,等待    while (pool->taskfirst==NULL&&!pool->quit)    {        printf("thread %d is waiting\n",(int)pthread_self());        //获取当前时间        clock_gettime(CLOCK_REALTIME,&abstime.it_value);        abstime.it_value.tv_sec+=2;        int status;        status = condition_timedwait(&pool->ready,&abstime.it_value);        if(status==ETIMEDOUT)        {            printf("thread %d is timeout\n",(int)pthread_self());            timeout =1;            break;        }    }    pool->idle--;    if(pool->taskfirst!=NULL)    {        //取出任务        task_t *t = pool->taskfirst;        pool->taskfirst=t->next;        //解锁任务队列        condition_unlock(&pool->ready);        //执行任务        t->run(t->arg);        //释放内存        free(t);        //重新加锁        condition_lock(&pool->ready);    }    //退出线程池,销毁线程    if(pool->quit&&pool->taskfirst==NULL)    {        pool->counter--;        if(pool->counter==0)        {            condition_signal(&pool->ready);        }        condition_unlock(&pool->ready);        break;    }    //超时销毁线程    if(timeout==1)    {        pool->counter--;        condition_unlock(&pool->ready);        break;    }    condition_unlock(&pool->ready);}printf("thread %d is exiting\n",(int)pthread_self());return NULL;

}
/*
 * Put *pool into its initial state: lock/condition initialised, empty task
 * queue, no workers, not quitting, and `threads` as the worker limit.
 */
void threadpool_init(threadpool_t *pool, int threads)
{
    /* Synchronisation object. */
    condition_init(&pool->ready);

    /* Empty task queue. */
    pool->taskfirst = NULL;
    pool->tasklast = NULL;

    /* Worker bookkeeping. */
    pool->max_threads = threads;
    pool->counter = 0;
    pool->idle = 0;
    pool->quit = 0;
}

/*
 * Append a task that will call run(arg) to the queue of *pool and make sure
 * a worker will look at it: an idle worker is signalled if there is one,
 * otherwise a new worker is started as long as the pool is below
 * max_threads.
 *
 * The function returns nothing, so failures are reported on stderr:
 *   - if the task node cannot be allocated, the task is dropped and `arg`
 *     stays owned by the caller;
 *   - if no thread can be created, the task stays queued for an existing
 *     worker.
 */
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *), void *arg)
{
    /* The cast keeps this file buildable by a C++ compiler as well. */
    task_t *newtask = (task_t *)malloc(sizeof(*newtask));
    if (newtask == NULL)
    {
        fprintf(stderr, "threadpool_add_task: out of memory, task dropped\n");
        return;
    }
    newtask->next = NULL;
    newtask->run = run;
    newtask->arg = arg;

    /* Lock the pool and link the task at the tail of the queue. */
    condition_lock(&pool->ready);
    if (pool->taskfirst == NULL)
    {
        pool->taskfirst = newtask;
    }
    else
    {
        pool->tasklast->next = newtask;
    }
    pool->tasklast = newtask;

    if (pool->idle > 0)
    {
        /* A worker is waiting: wake it up. */
        condition_signal(&pool->ready);
    }
    else if (pool->counter < pool->max_threads)
    {
        /* Nobody is idle and there is room for another worker. */
        pthread_t tid;
        int status = pthread_create(&tid, NULL, thread_routine, pool);
        if (status == 0)
        {
            /* Workers are never joined, so detach to release their resources on exit. */
            pthread_detach(tid);
            pool->counter++;
        }
        else
        {
            fprintf(stderr, "threadpool_add_task: pthread_create failed (%d)\n", status);
        }
    }
    condition_unlock(&pool->ready);
}

/*
 * Shut the pool down. Does nothing when quit is already set. Otherwise it
 * sets quit, wakes idle workers with a broadcast, waits on `ready` until
 * the worker counter drops to zero and finally destroys the lock.
 */
void threadpool_destroy(threadpool_t *pool)
{
    if (!pool->quit)
    {
        condition_lock(&pool->ready);

        pool->quit = 1;
        /* Idle workers are waiting and must be woken to notice quit. */
        if (pool->idle > 0)
        {
            condition_broadcast(&pool->ready);
        }
        /* The last worker to leave signals `ready`. */
        while (pool->counter != 0)
        {
            condition_wait(&pool->ready);
        }

        condition_unlock(&pool->ready);

        /* No worker is left, the lock can be released for good. */
        condition_destroy(&pool->ready);
    }
}

五、测试线程池
#include <stdio.h>
#include <stdlib.h>
#include "threadpool.h"
#include <unistd.h>
using namespace std;

/*
 * Demo task: prints which thread handles which task number, sleeps for one
 * second to simulate work, then frees its argument.
 *
 * arg points to a heap-allocated int holding the task number; ownership
 * passes to this function. Always returns NULL.
 */
void *mytask(void *arg)
{
    printf("thread %d is working on task %d\n", (int)pthread_self(),
           *(int *)arg);
    sleep(1);
    free(arg);
    return NULL;
}
/*
 * Demo driver: creates a pool limited to 10 workers, submits 100 tasks that
 * each receive their own heap-allocated task number (freed by mytask), and
 * then destroys the pool.
 */
int main(void)
{
    threadpool_t pool;
    threadpool_init(&pool, 10);
    for (int i = 0; i < 100; i++)
    {
        /* The cast keeps this file buildable by a C++ compiler as well. */
        int *arg = (int *)malloc(sizeof(*arg));
        if (arg == NULL)
        {
            /* Stop submitting; tasks already queued are still processed. */
            fprintf(stderr, "main: out of memory after %d tasks\n", i);
            break;
        }
        *arg = i;
        threadpool_add_task(&pool, mytask, arg);
    }
    threadpool_destroy(&pool);
    return 0;
}

六、测试结果
在这里插入图片描述

上一篇:使用BAT批处理 匹配查找指定文件夹,并在当文件夹下创建空文件
下一篇:简单Makefile的编写

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2025年03月28日 08时11分28秒