veriosn 2

This commit is contained in:
dongl 2023-04-13 13:04:52 +08:00
commit 4bc2259787
6 changed files with 282 additions and 0 deletions

8
CMakeLists.txt Normal file
View File

@ -0,0 +1,8 @@
cmake_minimum_required(VERSION 3.24)
project(thread_pool_v2)
set(CMAKE_CXX_STANDARD 17)
aux_source_directory(pool DIR_POOL)
add_executable(thread_pool_v2 main.cpp ${DIR_POOL})

49
main.cpp Normal file
View File

@ -0,0 +1,49 @@
#include <iostream>
#include <random>
#include <csignal>
#include "pool/ThreadPool.h"
std::random_device rd; // 真实随机数产生器
std::mt19937 mt(rd()); //生成计算随机数mt
std::uniform_int_distribution<int> dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数
auto rnd = std::bind(dist, mt);
// 添加两个数字的简单函数并打印结果
void multiply(const int a, const int b)
{
const int res = a * b;
}
int main() {
ThreadPool* pool = new ThreadPool(10);
time_t time1 = 0;
time_t time2 = 0;
for (int i = 0; i < 1000000; ++i) {
time_t begin = time(NULL);
pool->pushTask(multiply, rnd(), rnd());
time_t end = time(NULL);
time1 += end -begin;
}
for (int i = 0; i < 1000000; ++i) {
int a = rnd();
int b = rnd();
time_t begin = time(NULL);
std::thread t = std::thread([&]() {
const int res = a * b;
});
t.join();
time_t end = time(NULL);
time2 += end - begin;
}
std::cout << time1 << std::endl;
std::cout << time2 << std::endl;
return 0;
}

79
pool/TaskQueue.h Normal file
View File

@ -0,0 +1,79 @@
//
// Created by dongl on 23-4-12.
//
#ifndef THREAD_POOL_V2_TASKQUEUE_H
#define THREAD_POOL_V2_TASKQUEUE_H
#include <queue>
#include <mutex>
#include <functional>
#include "event.h"
template<typename T>
class TaskQueue {
public:
TaskQueue() {}
TaskQueue(TaskQueue&& taskQueue) {}
~TaskQueue() {}
public:
bool empty() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
int size() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
void press(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);
}
bool eject(T& t) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty()) return false;
t = event(std::move(m_queue.front()));
m_queue.pop();
return true;
}
bool ejects(std::queue<event>& ts, size_t size) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty()) return false;
for (int i = 0; i < size; ++i) {
ts.emplace(std::move(event(index++, m_queue.front())));
m_queue.pop();
}
return true;
}
bool sub_swap(TaskQueue<event>& dest, size_t size) {
std::queue<event> queue;
bool st = this->ejects(queue, size);
if (!st) {
return st;
} else {
for (int i = 0; i < size; ++i) {
dest.press(queue.front());
queue.pop();
}
}
return st;
}
private:
std::queue<T> m_queue;
std::mutex m_mutex;
size_t index = 0;
};
#endif //THREAD_POOL_V2_TASKQUEUE_H

66
pool/ThreadPool.cpp Normal file
View File

@ -0,0 +1,66 @@
//
// Created by dongl on 23-4-12.
//
#include "ThreadPool.h"
class event;
class ThreadPool::thread_tasks {
friend ThreadPool;
public:
explicit thread_tasks(ThreadPool *pool) :
pthread_id(pthread_self()), terminate(false), is_working(false), pool(pool), events(TaskQueue<event>()) {}
void operator () () {
while (true) {
// 为线程环境变量加锁, 互访问工作线程的休眠与唤醒
std::unique_lock<std::mutex> lock(pool->m_conditional_mutex);
if (events.size() <= 10 && !pool->m_tasks.empty()) {
size_t num = pool->m_tasks.size() / pool->m_threads.size();
num = num <= 1 ? 1 : num;
bool st = pool->m_tasks.sub_swap(events, num);
if (!st) continue;
}
if (!events.empty()) {
event fun = event(0, std::function<void()>());
bool st = events.eject(fun);
if (st)
fun();
} else {
pool->m_conditional_lock.wait(lock);
}
}
}
private:
pthread_t pthread_id; // 线程id
bool terminate; // 是否需要结束的标志
bool is_working; // 该事件是否在工作
ThreadPool* pool; // 所属线程池
TaskQueue<event> events;// 任务队列
};
ThreadPool::ThreadPool(const int n_threads) :
m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false) {
for (auto & m_thread : m_threads) {
m_thread = std::thread(thread_tasks(this)); // 分配工作线程
}
}

56
pool/ThreadPool.h Normal file
View File

@ -0,0 +1,56 @@
//
// Created by dongl on 23-4-12.
//
#ifndef THREAD_POOL_V2_THREADPOOL_H
#define THREAD_POOL_V2_THREADPOOL_H
#include "TaskQueue.h"
#include <thread>
#include <functional>
#include <future>
class ThreadPool {
protected:
class thread_tasks;
public:
explicit ThreadPool(int n_threads = 4);
template<class F, class... Args>
auto pushTask(F && f, Args &&... args) -> std::future<decltype(f(args...))>;
protected:
TaskQueue<std::function<void()>> m_tasks;
std::vector<std::thread> m_threads;
bool m_shutdown;
std::mutex m_conditional_mutex; // 线程休眠锁互斥变量
std::condition_variable m_conditional_lock; //条件变量
static int preemption_event_num;
};
template<class F, class... Args>
auto ThreadPool::pushTask(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
std::function<decltype(f(args...))()> function = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(function);
// Warp packaged task into void function
std::function<void()> warpper_func = [task_ptr]()
{
(*task_ptr)();
};
m_tasks.press(warpper_func);
// 唤醒一个等待中的线程
m_conditional_lock.notify_one();
// 返回先前注册的任务指针
return task_ptr->get_future();
}
#endif //THREAD_POOL_V2_THREADPOOL_H

24
pool/event.h Normal file
View File

@ -0,0 +1,24 @@
//
// Created by dongl on 23-4-12.
//
#ifndef THREAD_POOL_V2_EVENT_H
#define THREAD_POOL_V2_EVENT_H
struct event {
int event_id;
bool terminate; // 是否需要结束的标志
bool is_working; // 该事件是否在工作
pthread_t pthread_id; // 线程id
std::function<void()> function_;
event(int eventId, const std::function<void()>& function)
: event_id(eventId), terminate(false), is_working(false), pthread_id(pthread_self()),
function_(function) {}
void operator() () {
function_();
}
};
#endif //THREAD_POOL_V2_EVENT_H