From f61a89696ecaf6b4d89d32c7c99424e653c92681 Mon Sep 17 00:00:00 2001 From: Andreas Maerten <24669514+Yimura@users.noreply.github.com> Date: Thu, 3 Aug 2023 12:56:05 +0200 Subject: [PATCH] refactor(ThreadPool): dynamically scale based on demand (#1912) --- src/thread_pool.cpp | 66 +++++++++++++++++++++++++-------------------- src/thread_pool.hpp | 13 ++++----- 2 files changed, 44 insertions(+), 35 deletions(-) diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index b0681e13..347c7484 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -2,10 +2,11 @@ namespace big { - thread_pool::thread_pool() : - m_accept_jobs(true) + thread_pool::thread_pool(const std::size_t preallocated_thread_count) : + m_accept_jobs(true), + m_allocated_thread_count(preallocated_thread_count) { - this->m_managing_thread = std::thread(&thread_pool::create, this); + rescale_thread_pool(); g_thread_pool = this; } @@ -15,27 +16,25 @@ namespace big g_thread_pool = nullptr; } - void thread_pool::create() + void thread_pool::rescale_thread_pool() { - const uint32_t thread_count = std::thread::hardware_concurrency(); + LOG(VERBOSE) << "Resizing thread pool from " << m_thread_pool.size() << " to " << m_allocated_thread_count; + m_thread_pool.reserve(m_allocated_thread_count); - LOG(VERBOSE) << "Allocating " << thread_count << " threads in thread pool."; - this->m_thread_pool.reserve(thread_count); - - m_available_thread_count = thread_count; - - for (uint32_t i = 0; i < thread_count; i++) - this->m_thread_pool.emplace_back(std::thread(&thread_pool::run, this)); + if (m_thread_pool.size() < m_allocated_thread_count) + { + for (uint32_t i = 0; i < m_allocated_thread_count; i++) + m_thread_pool.emplace_back(std::thread(&thread_pool::run, this)); + } } void thread_pool::destroy() { - this->m_managing_thread.join(); { std::unique_lock lock(m_lock); - this->m_accept_jobs = false; + m_accept_jobs = false; } - this->m_data_condition.notify_all(); + m_data_condition.notify_all(); for (auto& thread : m_thread_pool) thread.join(); @@ -48,15 +47,24 @@ namespace big if (func) { { - std::unique_lock lock(this->m_lock); - this->m_job_stack.push({func, location}); + std::unique_lock lock(m_lock); + m_job_stack.push({func, location}); - if (m_available_thread_count < m_job_stack.size()) + if (m_allocated_thread_count < m_job_stack.size()) { - LOG(WARNING) << "thread_pool potentially starved"; + LOG(WARNING) << "Thread pool potentially starved, resizing to accommodate for load."; + + if (m_allocated_thread_count++ >= MAX_POOL_SIZE) + { + LOG(FATAL) << "The thread pool limit has been reached, whatever you did this should not occur in production."; + } + if (m_accept_jobs && m_allocated_thread_count <= MAX_POOL_SIZE) + { + rescale_thread_pool(); + } } } - this->m_data_condition.notify_all(); + m_data_condition.notify_all(); } } @@ -64,22 +72,22 @@ namespace big { for (;;) { - std::unique_lock lock(this->m_lock); + std::unique_lock lock(m_lock); - this->m_data_condition.wait(lock, [this]() { - return !this->m_job_stack.empty() || !this->m_accept_jobs; + m_data_condition.wait(lock, [this]() { + return !m_job_stack.empty() || !m_accept_jobs; }); - if (!this->m_accept_jobs) + if (!m_accept_jobs) break; - if (this->m_job_stack.empty()) + if (m_job_stack.empty()) continue; - thread_pool_job job = this->m_job_stack.top(); - this->m_job_stack.pop(); + thread_pool_job job = m_job_stack.top(); + m_job_stack.pop(); lock.unlock(); - m_available_thread_count--; + m_allocated_thread_count--; try { @@ -94,7 +102,7 @@ namespace big LOG(WARNING) << "Exception thrown while executing job in thread:" << std::endl << e.what(); } - m_available_thread_count++; + m_allocated_thread_count++; } LOG(VERBOSE) << "Thread " << std::this_thread::get_id() << " exiting..."; diff --git a/src/thread_pool.hpp b/src/thread_pool.hpp index 5cb1a8bc..2246a7d7 100644 --- a/src/thread_pool.hpp +++ b/src/thread_pool.hpp @@ -2,6 +2,9 @@ namespace big { + // if this limit is hit you did something wrong coding wise. + constexpr auto MAX_POOL_SIZE = 32u; + struct thread_pool_job { std::function m_func; @@ -17,21 +20,19 @@ namespace big std::mutex m_lock; std::vector m_thread_pool; - std::thread m_managing_thread; - - std::atomic m_available_thread_count; + std::atomic m_allocated_thread_count; public: - thread_pool(); + // YimMenu only has 2 blocking threads, 4 should be sufficient but the pool should automatically allocate more if needed + thread_pool(const std::size_t preallocated_thread_count = 4); ~thread_pool(); void destroy(); void push(std::function func, std::source_location location = std::source_location::current()); private: - void create(); - void done(); void run(); + void rescale_thread_pool(); }; inline thread_pool* g_thread_pool{};