nothing

由于muduo的线程池是抢占式的,在锁过多利用的情况下我们需要自己利用自己的规则去分配和调度线程,我这个实现的就是一个可自己分配和调度的线程池,其特点有以下几点

  1. 每个线程都有自己的消息队列并且大小不限
  2. 每个线程都可自由阻塞,但是消息队列正常接受消息
  3. 可利用Run进行开启线程或者启动的时候利用一个列表开启线程池,很方便
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/* 
* File: NonPreemptiveThreadPool.h
* Author: zerber
*
* Created on 2017年9月1日, 下午1:27
*/
#pragma once

#include "local_common/Public.h"

#include <muduo/base/Thread.h>
#include <muduo/base/Mutex.h>
#include <muduo/base/Condition.h>
#include <muduo/base/CountDownLatch.h>

#ifndef NONPREEMPTIVETHREADPOOL_H
#define NONPREEMPTIVETHREADPOOL_H

typedef boost::function<void ()> Task;

struct RunThreadNode
{
RunThreadNode(const std::string& name,int queuesize = 0):
countDownLatch_(1),
listMutex_(),
blockMutex_(),
notEmpty_(listMutex_),
blockCond_(blockMutex_),
isBolck_(false),
threadName_(name),
thread_(boost::bind(&RunThreadNode::RunInThread_,this),threadName_),
running_(false),
queuesize_(queuesize){}
~RunThreadNode()
{
if(running_)
{
ThreadStop();
}
}

void PushTaskInQueue(const Task& task);
void ThreadStart();
void ThreadStop();
const std::string& GetThreadName();
void ThreadBlock();
void ThreadWakeUp();

muduo::CountDownLatch countDownLatch_;
private:
Task Take_();
void RunInThread_();

muduo::MutexLock listMutex_;
muduo::MutexLock blockMutex_;
muduo::Condition notEmpty_;
muduo::Condition blockCond_;
bool isBolck_;
std::string threadName_;

muduo::Thread thread_;
std::queue<Task> queue_;

bool running_;
int queuesize_;
};

typedef boost::shared_ptr<RunThreadNode> RunThreadNodePtr;

class NonPreemptiveThreadPool
{
public:
//指定队列长度,指定生成哪些线程
NonPreemptiveThreadPool(int queuesize,
const std::string& name,
int* threadList,
int threadListNum);
~NonPreemptiveThreadPool();

void Run(int threadNum, const Task& f);
void Stop(int threadNum);
void AllStart();
void AllStop();
void ThreadBlock(int threadNum);
void ThreadWakeUp(int threadNum);
private:

boost::unordered_map<int, RunThreadNodePtr> threads_;
int queuesize_;
std::string name_;
};

#endif /* NONPREEMPTIVETHREADPOOL_H */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/* 
* File: NonPreemptiveThreadPool.cpp
* Author: zerber
*
* Created on 2017年9月1日, 下午1:27
*/
#include "NonPreemptiveThreadPool.h"

#include <muduo/base/Exception.h>

void RunThreadNode::PushTaskInQueue(const Task& task)
{
muduo::MutexLockGuard lock(listMutex_);

queue_.push(task);
notEmpty_.notify();
}

void RunThreadNode::ThreadStart()
{
running_ = true;
thread_.start();
}

void RunThreadNode::ThreadStop()
{
{
muduo::MutexLockGuard lock(listMutex_);
running_ = false;
notEmpty_.notify();
}
thread_.join();

LOG_DEBUG << "thread " << threadName_ << " stop";
}

const std::string& RunThreadNode::GetThreadName()
{
return threadName_;
}

void RunThreadNode::ThreadBlock()
{
isBolck_ = true;
}

void RunThreadNode::ThreadWakeUp()
{
if(isBolck_)
{
isBolck_ = false;
blockCond_.notify();
}
}

Task RunThreadNode::Take_()
{
muduo::MutexLockGuard lock(listMutex_);
while(queue_.empty() && running_)
{
notEmpty_.wait();
}

Task task;
if(!queue_.empty())
{
task = queue_.front();
queue_.pop();
}
return task;
}

void RunThreadNode::RunInThread_()
{
muduo::MutexLockGuard lock(blockMutex_);
try
{
while (running_)
{
Task task(Take_());
while(isBolck_)
{
blockCond_.wait();
}

if(task)
{
task();
}
}
}
catch (const muduo::Exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", threadName_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", threadName_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s\n", threadName_.c_str());
throw;
}
}
////////////////////////////////////////////////////////////////////////////////

NonPreemptiveThreadPool::NonPreemptiveThreadPool(int queuesize,
const std::string& name,
int* threadList = NULL,
int threadListNum = 0)
:queuesize_(queuesize),
name_(name)
{
if(threadList != NULL && threadListNum > 0)
{
for(int i=0 ; i<threadListNum ; i++)
{
char id[32];
snprintf(id, sizeof(id), "%d", threadList[i]);
RunThreadNodePtr pCurRunThreadNode = boost::make_shared<RunThreadNode>(name_+id,queuesize_);
threads_.insert(make_pair(threadList[i],pCurRunThreadNode));
}
}
}

NonPreemptiveThreadPool::~NonPreemptiveThreadPool()
{
AllStop();
}

void NonPreemptiveThreadPool::Run(int threadNum, const Task& f)
{
auto iter = threads_.find(threadNum);
if(iter != threads_.end())
{
iter->second->PushTaskInQueue(f);
}
else
{
char id[32];
snprintf(id, sizeof(id), "%d", threadNum);
RunThreadNodePtr pCurRunThreadNode = boost::make_shared<RunThreadNode>(name_+id,queuesize_);
threads_.insert(make_pair(threadNum,pCurRunThreadNode));
pCurRunThreadNode->ThreadStart();
LOG_DEBUG << "thread " << pCurRunThreadNode->GetThreadName() << " start";
}
}

void NonPreemptiveThreadPool::Stop(int threadNum)
{
auto iter = threads_.find(threadNum);
if(iter == threads_.end()) return;

iter->second->ThreadStop();
LOG_DEBUG << "thread " << iter->second->GetThreadName() << " stop";
}

void NonPreemptiveThreadPool::AllStart()
{
auto iter = threads_.begin();
for(; iter!=threads_.end() ;iter++)
{
iter->second->ThreadStart();
}
LOG_DEBUG << "all threads has been start";
}

void NonPreemptiveThreadPool::AllStop()
{
auto iter = threads_.begin();
for(; iter!=threads_.end() ;iter++)
{
iter->second->ThreadStop();
}
LOG_DEBUG << "all threads has been stop";
}

void NonPreemptiveThreadPool::ThreadBlock(int threadNum)
{
auto iter = threads_.find(threadNum);
if(iter == threads_.end()) return;
iter->second->ThreadBlock();
LOG_DEBUG << "thread " << iter->second->GetThreadName() << " block";
}

void NonPreemptiveThreadPool::ThreadWakeUp(int threadNum)
{
auto iter = threads_.find(threadNum);
if(iter == threads_.end()) return;
iter->second->ThreadWakeUp();
LOG_DEBUG << "thread " << iter->second->GetThreadName() << " wake up";
}