refactor(ThreadPool): dynamically scale based on demand (#1912)

This commit is contained in:
Andreas Maerten 2023-08-03 12:56:05 +02:00 committed by GitHub
parent b7b13ac638
commit f61a89696e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 35 deletions

View File

@ -2,10 +2,11 @@
namespace big namespace big
{ {
thread_pool::thread_pool() : thread_pool::thread_pool(const std::size_t preallocated_thread_count) :
m_accept_jobs(true) m_accept_jobs(true),
m_allocated_thread_count(preallocated_thread_count)
{ {
this->m_managing_thread = std::thread(&thread_pool::create, this); rescale_thread_pool();
g_thread_pool = this; g_thread_pool = this;
} }
@ -15,27 +16,25 @@ namespace big
g_thread_pool = nullptr; g_thread_pool = nullptr;
} }
void thread_pool::create() void thread_pool::rescale_thread_pool()
{ {
const uint32_t thread_count = std::thread::hardware_concurrency(); LOG(VERBOSE) << "Resizing thread pool from " << m_thread_pool.size() << " to " << m_allocated_thread_count;
m_thread_pool.reserve(m_allocated_thread_count);
LOG(VERBOSE) << "Allocating " << thread_count << " threads in thread pool."; if (m_thread_pool.size() < m_allocated_thread_count)
this->m_thread_pool.reserve(thread_count); {
for (uint32_t i = 0; i < m_allocated_thread_count; i++)
m_available_thread_count = thread_count; m_thread_pool.emplace_back(std::thread(&thread_pool::run, this));
}
for (uint32_t i = 0; i < thread_count; i++)
this->m_thread_pool.emplace_back(std::thread(&thread_pool::run, this));
} }
void thread_pool::destroy() void thread_pool::destroy()
{ {
this->m_managing_thread.join();
{ {
std::unique_lock lock(m_lock); std::unique_lock lock(m_lock);
this->m_accept_jobs = false; m_accept_jobs = false;
} }
this->m_data_condition.notify_all(); m_data_condition.notify_all();
for (auto& thread : m_thread_pool) for (auto& thread : m_thread_pool)
thread.join(); thread.join();
@ -48,15 +47,24 @@ namespace big
if (func) if (func)
{ {
{ {
std::unique_lock lock(this->m_lock); std::unique_lock lock(m_lock);
this->m_job_stack.push({func, location}); m_job_stack.push({func, location});
if (m_available_thread_count < m_job_stack.size()) if (m_allocated_thread_count < m_job_stack.size())
{ {
LOG(WARNING) << "thread_pool potentially starved"; LOG(WARNING) << "Thread pool potentially starved, resizing to accommodate for load.";
if (m_allocated_thread_count++ >= MAX_POOL_SIZE)
{
LOG(FATAL) << "The thread pool limit has been reached, whatever you did this should not occur in production.";
}
if (m_accept_jobs && m_allocated_thread_count <= MAX_POOL_SIZE)
{
rescale_thread_pool();
}
} }
} }
this->m_data_condition.notify_all(); m_data_condition.notify_all();
} }
} }
@ -64,22 +72,22 @@ namespace big
{ {
for (;;) for (;;)
{ {
std::unique_lock lock(this->m_lock); std::unique_lock lock(m_lock);
this->m_data_condition.wait(lock, [this]() { m_data_condition.wait(lock, [this]() {
return !this->m_job_stack.empty() || !this->m_accept_jobs; return !m_job_stack.empty() || !m_accept_jobs;
}); });
if (!this->m_accept_jobs) if (!m_accept_jobs)
break; break;
if (this->m_job_stack.empty()) if (m_job_stack.empty())
continue; continue;
thread_pool_job job = this->m_job_stack.top(); thread_pool_job job = m_job_stack.top();
this->m_job_stack.pop(); m_job_stack.pop();
lock.unlock(); lock.unlock();
m_available_thread_count--; m_allocated_thread_count--;
try try
{ {
@ -94,7 +102,7 @@ namespace big
LOG(WARNING) << "Exception thrown while executing job in thread:" << std::endl << e.what(); LOG(WARNING) << "Exception thrown while executing job in thread:" << std::endl << e.what();
} }
m_available_thread_count++; m_allocated_thread_count++;
} }
LOG(VERBOSE) << "Thread " << std::this_thread::get_id() << " exiting..."; LOG(VERBOSE) << "Thread " << std::this_thread::get_id() << " exiting...";

View File

@ -2,6 +2,9 @@
namespace big namespace big
{ {
// if this limit is hit you did something wrong coding wise.
constexpr auto MAX_POOL_SIZE = 32u;
struct thread_pool_job struct thread_pool_job
{ {
std::function<void()> m_func; std::function<void()> m_func;
@ -17,21 +20,19 @@ namespace big
std::mutex m_lock; std::mutex m_lock;
std::vector<std::thread> m_thread_pool; std::vector<std::thread> m_thread_pool;
std::thread m_managing_thread; std::atomic<size_t> m_allocated_thread_count;
std::atomic<size_t> m_available_thread_count;
public: public:
thread_pool(); // YimMenu only has 2 blocking threads, 4 should be sufficient but the pool should automatically allocate more if needed
thread_pool(const std::size_t preallocated_thread_count = 4);
~thread_pool(); ~thread_pool();
void destroy(); void destroy();
void push(std::function<void()> func, std::source_location location = std::source_location::current()); void push(std::function<void()> func, std::source_location location = std::source_location::current());
private: private:
void create();
void done();
void run(); void run();
void rescale_thread_pool();
}; };
inline thread_pool* g_thread_pool{}; inline thread_pool* g_thread_pool{};