From 4bc225978756a8e0489b87337e78749f8554aa30 Mon Sep 17 00:00:00 2001 From: dongl <2725096176@qq.com> Date: Thu, 13 Apr 2023 13:04:52 +0800 Subject: [PATCH] veriosn 2 --- CMakeLists.txt | 8 +++++ main.cpp | 49 ++++++++++++++++++++++++++++ pool/TaskQueue.h | 79 +++++++++++++++++++++++++++++++++++++++++++++ pool/ThreadPool.cpp | 66 +++++++++++++++++++++++++++++++++++++ pool/ThreadPool.h | 56 ++++++++++++++++++++++++++++++++ pool/event.h | 24 ++++++++++++++ 6 files changed, 282 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 main.cpp create mode 100644 pool/TaskQueue.h create mode 100644 pool/ThreadPool.cpp create mode 100644 pool/ThreadPool.h create mode 100644 pool/event.h diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..bece42a --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 3.24) +project(thread_pool_v2) + +set(CMAKE_CXX_STANDARD 17) + +aux_source_directory(pool DIR_POOL) + +add_executable(thread_pool_v2 main.cpp ${DIR_POOL}) diff --git a/main.cpp b/main.cpp new file mode 100644 index 0000000..7c12159 --- /dev/null +++ b/main.cpp @@ -0,0 +1,49 @@ +#include +#include +#include +#include "pool/ThreadPool.h" +std::random_device rd; // 真实随机数产生器 + +std::mt19937 mt(rd()); //生成计算随机数mt + +std::uniform_int_distribution dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数 + +auto rnd = std::bind(dist, mt); + +// 添加两个数字的简单函数并打印结果 + +void multiply(const int a, const int b) +{ + const int res = a * b; +} + +int main() { + ThreadPool* pool = new ThreadPool(10); + + time_t time1 = 0; + time_t time2 = 0; + + for (int i = 0; i < 1000000; ++i) { + time_t begin = time(NULL); + pool->pushTask(multiply, rnd(), rnd()); + time_t end = time(NULL); + time1 += end -begin; + } + + for (int i = 0; i < 1000000; ++i) { + int a = rnd(); + int b = rnd(); + time_t begin = time(NULL); + std::thread t = std::thread([&]() { + const int res = a * b; + }); + t.join(); + time_t end = time(NULL); + time2 += end - begin; + } + + std::cout << time1 << std::endl; + std::cout << time2 << std::endl; + + return 0; +} diff --git a/pool/TaskQueue.h b/pool/TaskQueue.h new file mode 100644 index 0000000..79381a7 --- /dev/null +++ b/pool/TaskQueue.h @@ -0,0 +1,79 @@ +// +// Created by dongl on 23-4-12. +// + +#ifndef THREAD_POOL_V2_TASKQUEUE_H +#define THREAD_POOL_V2_TASKQUEUE_H + +#include +#include +#include +#include "event.h" + +template +class TaskQueue { +public: + TaskQueue() {} + TaskQueue(TaskQueue&& taskQueue) {} + ~TaskQueue() {} + +public: + bool empty() { + std::unique_lock lock(m_mutex); + return m_queue.empty(); + } + + int size() { + std::unique_lock lock(m_mutex); + return m_queue.empty(); + } + + void press(T& t) { + std::unique_lock lock(m_mutex); + m_queue.emplace(t); + } + + bool eject(T& t) { + std::unique_lock lock(m_mutex); + + if (m_queue.empty()) return false; + t = event(std::move(m_queue.front())); + m_queue.pop(); + return true; + } + + bool ejects(std::queue& ts, size_t size) { + std::unique_lock lock(m_mutex); + + if (m_queue.empty()) return false; + + for (int i = 0; i < size; ++i) { + ts.emplace(std::move(event(index++, m_queue.front()))); + m_queue.pop(); + } + return true; + } + + bool sub_swap(TaskQueue& dest, size_t size) { + std::queue queue; + + bool st = this->ejects(queue, size); + if (!st) { + return st; + } else { + for (int i = 0; i < size; ++i) { + dest.press(queue.front()); + queue.pop(); + } + } + + return st; + } + +private: + std::queue m_queue; + std::mutex m_mutex; + size_t index = 0; +}; + +#endif //THREAD_POOL_V2_TASKQUEUE_H diff --git a/pool/ThreadPool.cpp b/pool/ThreadPool.cpp new file mode 100644 index 0000000..32efe18 --- /dev/null +++ b/pool/ThreadPool.cpp @@ -0,0 +1,66 @@ +// +// Created by dongl on 23-4-12. +// + +#include "ThreadPool.h" + +class event; + +class ThreadPool::thread_tasks { + friend ThreadPool; +public: + explicit thread_tasks(ThreadPool *pool) : + pthread_id(pthread_self()), terminate(false), is_working(false), pool(pool), events(TaskQueue()) {} + + void operator () () { + while (true) { + // 为线程环境变量加锁, 互访问工作线程的休眠与唤醒 + std::unique_lock lock(pool->m_conditional_mutex); + + if (events.size() <= 10 && !pool->m_tasks.empty()) { + size_t num = pool->m_tasks.size() / pool->m_threads.size(); + num = num <= 1 ? 1 : num; + + bool st = pool->m_tasks.sub_swap(events, num); + if (!st) continue; + } + + if (!events.empty()) { + event fun = event(0, std::function()); + bool st = events.eject(fun); + if (st) + fun(); + } else { + pool->m_conditional_lock.wait(lock); + } + } + } +private: + pthread_t pthread_id; // 线程id + bool terminate; // 是否需要结束的标志 + bool is_working; // 该事件是否在工作 + ThreadPool* pool; // 所属线程池 + + TaskQueue events;// 任务队列 +}; + + + +ThreadPool::ThreadPool(const int n_threads) : +m_threads(std::vector(n_threads)), m_shutdown(false) { + + for (auto & m_thread : m_threads) { + m_thread = std::thread(thread_tasks(this)); // 分配工作线程 + } +} + + + + + + + + + + + diff --git a/pool/ThreadPool.h b/pool/ThreadPool.h new file mode 100644 index 0000000..7a548c2 --- /dev/null +++ b/pool/ThreadPool.h @@ -0,0 +1,56 @@ +// +// Created by dongl on 23-4-12. +// + +#ifndef THREAD_POOL_V2_THREADPOOL_H +#define THREAD_POOL_V2_THREADPOOL_H + + +#include "TaskQueue.h" +#include +#include +#include + +class ThreadPool { +protected: + class thread_tasks; +public: + explicit ThreadPool(int n_threads = 4); + + template + auto pushTask(F && f, Args &&... args) -> std::future; + +protected: + TaskQueue> m_tasks; + std::vector m_threads; + bool m_shutdown; + std::mutex m_conditional_mutex; // 线程休眠锁互斥变量 + std::condition_variable m_conditional_lock; //条件变量 + + static int preemption_event_num; +}; + +template +auto ThreadPool::pushTask(F&& f, Args&&... args) -> std::future { + std::function function = std::bind(std::forward(f), std::forward(args)...); + auto task_ptr = std::make_shared>(function); + + // Warp packaged task into void function + std::function warpper_func = [task_ptr]() + { + (*task_ptr)(); + }; + + m_tasks.press(warpper_func); + + // 唤醒一个等待中的线程 + m_conditional_lock.notify_one(); + // 返回先前注册的任务指针 + return task_ptr->get_future(); +} + + + + + +#endif //THREAD_POOL_V2_THREADPOOL_H diff --git a/pool/event.h b/pool/event.h new file mode 100644 index 0000000..5e99a56 --- /dev/null +++ b/pool/event.h @@ -0,0 +1,24 @@ +// +// Created by dongl on 23-4-12. +// + +#ifndef THREAD_POOL_V2_EVENT_H +#define THREAD_POOL_V2_EVENT_H + +struct event { + int event_id; + bool terminate; // 是否需要结束的标志 + bool is_working; // 该事件是否在工作 + pthread_t pthread_id; // 线程id + std::function function_; + + event(int eventId, const std::function& function) + : event_id(eventId), terminate(false), is_working(false), pthread_id(pthread_self()), + function_(function) {} + + void operator() () { + function_(); + } +}; + +#endif //THREAD_POOL_V2_EVENT_H