blob: 0a96e9101bee70de3d891448b54ac4b50d09ada0 [file] [log] [blame]
//
// Copyright 2016 The ANGLE Project Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
// WorkerThread:
// Task running thread for ANGLE, similar to a TaskRunner in Chromium.
// Might be implemented differently depending on platform.
//
#include "libANGLE/WorkerThread.h"
#if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
# include <condition_variable>
# include <future>
# include <mutex>
# include <queue>
# include <thread>
#endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
namespace angle
{
WaitableEvent::WaitableEvent() = default;
WaitableEvent::~WaitableEvent() = default;
void WaitableEventDone::wait() {}
bool WaitableEventDone::isReady()
{
return true;
}
WorkerThreadPool::WorkerThreadPool() = default;
WorkerThreadPool::~WorkerThreadPool() = default;
class SingleThreadedWaitableEvent final : public WaitableEvent
{
public:
SingleThreadedWaitableEvent() = default;
~SingleThreadedWaitableEvent() override = default;
void wait() override;
bool isReady() override;
};
void SingleThreadedWaitableEvent::wait() {}
bool SingleThreadedWaitableEvent::isReady()
{
return true;
}
class SingleThreadedWorkerPool final : public WorkerThreadPool
{
public:
std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
void setMaxThreads(size_t maxThreads) override;
bool isAsync() override;
};
// SingleThreadedWorkerPool implementation.
std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
std::shared_ptr<Closure> task)
{
(*task)();
return std::make_shared<SingleThreadedWaitableEvent>();
}
void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {}
bool SingleThreadedWorkerPool::isAsync()
{
return false;
}
#if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
class AsyncWaitableEvent final : public WaitableEvent
{
public:
AsyncWaitableEvent() : mIsPending(true) {}
~AsyncWaitableEvent() override = default;
void wait() override;
bool isReady() override;
private:
friend class AsyncWorkerPool;
void setFuture(std::future<void> &&future);
// To block wait() when the task is stil in queue to be run.
// Also to protect the concurrent accesses from both main thread and
// background threads to the member fields.
std::mutex mMutex;
bool mIsPending;
std::condition_variable mCondition;
std::future<void> mFuture;
};
void AsyncWaitableEvent::setFuture(std::future<void> &&future)
{
mFuture = std::move(future);
}
void AsyncWaitableEvent::wait()
{
{
std::unique_lock<std::mutex> lock(mMutex);
mCondition.wait(lock, [this] { return !mIsPending; });
}
ASSERT(mFuture.valid());
mFuture.wait();
}
bool AsyncWaitableEvent::isReady()
{
std::lock_guard<std::mutex> lock(mMutex);
if (mIsPending)
{
return false;
}
ASSERT(mFuture.valid());
return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
class AsyncWorkerPool final : public WorkerThreadPool
{
public:
AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {}
~AsyncWorkerPool() override = default;
std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
void setMaxThreads(size_t maxThreads) override;
bool isAsync() override;
private:
void checkToRunPendingTasks();
// To protect the concurrent accesses from both main thread and background
// threads to the member fields.
std::mutex mMutex;
size_t mMaxThreads;
size_t mRunningThreads;
std::queue<std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>> mTaskQueue;
};
// AsyncWorkerPool implementation.
std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
{
ASSERT(mMaxThreads > 0);
auto waitable = std::make_shared<AsyncWaitableEvent>();
{
std::lock_guard<std::mutex> lock(mMutex);
mTaskQueue.push(std::make_pair(waitable, task));
}
checkToRunPendingTasks();
return waitable;
}
void AsyncWorkerPool::setMaxThreads(size_t maxThreads)
{
{
std::lock_guard<std::mutex> lock(mMutex);
mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads);
}
checkToRunPendingTasks();
}
bool AsyncWorkerPool::isAsync()
{
return true;
}
void AsyncWorkerPool::checkToRunPendingTasks()
{
std::lock_guard<std::mutex> lock(mMutex);
while (mRunningThreads < mMaxThreads && !mTaskQueue.empty())
{
auto task = mTaskQueue.front();
mTaskQueue.pop();
auto waitable = task.first;
auto closure = task.second;
auto future = std::async(std::launch::async, [closure, this] {
(*closure)();
{
std::lock_guard<std::mutex> lock(mMutex);
ASSERT(mRunningThreads != 0);
--mRunningThreads;
}
checkToRunPendingTasks();
});
++mRunningThreads;
{
std::lock_guard<std::mutex> waitableLock(waitable->mMutex);
waitable->mIsPending = false;
waitable->setFuture(std::move(future));
}
waitable->mCondition.notify_all();
}
}
#endif // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
// static
std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded)
{
std::shared_ptr<WorkerThreadPool> pool(nullptr);
#if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
if (multithreaded)
{
pool = std::shared_ptr<WorkerThreadPool>(static_cast<WorkerThreadPool *>(
new AsyncWorkerPool(std::thread::hardware_concurrency())));
}
#endif
if (!pool)
{
return std::shared_ptr<WorkerThreadPool>(
static_cast<WorkerThreadPool *>(new SingleThreadedWorkerPool()));
}
return pool;
}
// static
std::shared_ptr<WaitableEvent> WorkerThreadPool::PostWorkerTask(
std::shared_ptr<WorkerThreadPool> pool,
std::shared_ptr<Closure> task)
{
std::shared_ptr<WaitableEvent> event = pool->postWorkerTask(task);
if (event.get())
{
event->setWorkerThreadPool(pool);
}
return event;
}
} // namespace angle