/*
* Copyright 2017 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef NB_CONCURRENT_PTR_H_
#define NB_CONCURRENT_PTR_H_

#include <algorithm>
#include <atomic>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "starboard/atomic.h"
#include "starboard/log.h"
#include "starboard/memory.h"
#include "starboard/mutex.h"
#include "starboard/types.h"

namespace nb {
namespace detail {
class AtomicPointerBase;
template <typename T>
class AtomicPointer;
template <typename T>
class Access;
} // namespace detail
// ConcurrentPtr is similar to a scoped_ptr<> but with additional thread-safe
// guarantees on the lifespan of the owned pointer. Threads gain access to the
// owned pointer through ConcurrentPtr<T>::Access, which pins the lifetime of
// the pointer for as long as the ConcurrentPtr<T>::Access object remains in
// scope. Operations on the object pointed to by the owned pointer have no
// additional thread-safety guarantees.
//
// ConcurrentPtr<T>::access_ptr(...) takes an arbitrary id value and hashes it
// to select one of the buckets. Good ids to pass in include the value from
// SbThreadGetId() or random numbers (std::rand() is known to use internal
// locks on some implementations!).
//
// Performance:
// To increase performance, the lifespan of a ConcurrentPtr<T>::Access
// instance should be as short as possible.
//
// The number of buckets has a large effect on performance. More buckets
// result in lower contention (the second example below shows passing a
// larger bucket count).
//
// A hashing function with good distribution will produce less contention.
//
// Example:
// class MyClass { public: void Run(); };
// ConcurrentPtr<MyClass> shared_concurrent_ptr_(new MyClass);
//
// // From all other threads.
// ConcurrentPtr<MyClass>::Access access_ptr =
// shared_concurrent_ptr_.access_ptr(SbThreadGetId());
// // access_ptr now holds either a valid object pointer or nullptr.
// if (access_ptr) {
// access_ptr->Run();
// }
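//
// A second sketch (illustrative only; MyClass is the example class from above
// and the bucket count shown is an arbitrary choice, not a recommendation):
// constructing with more buckets and replacing the owned object from a
// writer thread.
//
//   // Owner: more buckets (e.g. 127) can lower contention when many threads
//   // read concurrently.
//   ConcurrentPtr<MyClass> shared_concurrent_ptr_(new MyClass, 127);
//
//   // Reader threads: keep each Access object's lifetime short.
//   {
//     ConcurrentPtr<MyClass>::Access access_ptr =
//         shared_concurrent_ptr_.access_ptr(SbThreadGetId());
//     if (access_ptr) {
//       access_ptr->Run();
//     }
//   }  // Access released here; a pending reset()/swap() may now finish.
//
//   // Writer thread: atomically replace (and delete) the owned object.
//   shared_concurrent_ptr_.reset(new MyClass);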
template <typename T, typename KeyT = int64_t, typename HashT = std::hash<KeyT>>
class ConcurrentPtr {
public:
  explicit ConcurrentPtr(T* ptr, size_t number_locks = 31) : ptr_(nullptr) {
internal_construct(ptr, number_locks);
}
~ConcurrentPtr() { internal_destruct(); }
// Used to access the underlying pointer to perform operations. The lifetime
// of the accessed pointer is guaranteed to be alive for the duration of the
// lifetime of this access object.
//
// Example
// ConcurrentPtr<MyClass>::Access access_ptr =
// shared_concurrent_ptr_.access_ptr(SbThreadGetId());
// if (access_ptr) {
// access_ptr->Run();
// }
using Access = nb::detail::Access<T>;
// Provides read access to the underlying pointer in a thread safe way.
inline Access access_ptr(const KeyT& seed) {
const size_t index = hasher_(seed) % table_.size();
AtomicPointer* atomic_ptr = table_[index];
return atomic_ptr->access_ptr();
}
void reset(T* value) { delete SetAllThenSwap(value); }
T* release() { return SetAllThenSwap(nullptr); }
T* swap(T* new_value) { return SetAllThenSwap(new_value); }
private:
using Mutex = starboard::Mutex;
using ScopedLock = starboard::ScopedLock;
using AtomicPointer = nb::detail::AtomicPointer<T>;
// Change all buckets to the new pointer. The old pointer is returned.
T* SetAllThenSwap(T* new_ptr) {
ScopedLock write_lock(pointer_write_mutex_);
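    // Writers are serialized by pointer_write_mutex_, so every bucket is
    // updated before ptr_ is replaced and per-bucket swaps from different
    // writers never interleave.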
for (auto it = table_.begin(); it != table_.end(); ++it) {
AtomicPointer* atomic_ptr = *it;
atomic_ptr->swap(new_ptr);
}
T* old_ptr = ptr_;
ptr_ = new_ptr;
return old_ptr;
}
void internal_construct(T* ptr, size_t number_locks) {
table_.resize(number_locks);
for (auto it = table_.begin(); it != table_.end(); ++it) {
*it = new AtomicPointer;
}
reset(ptr);
}
void internal_destruct() {
reset(nullptr);
for (auto it = table_.begin(); it != table_.end(); ++it) {
delete *it;
}
table_.clear();
}
HashT hasher_;
Mutex pointer_write_mutex_;
std::vector<AtomicPointer*> table_;
T* ptr_;
};

/////////////////////////// Implementation Details ////////////////////////////

namespace detail {
// Access is a smart-pointer type that pins the pointer returned by
// ConcurrentPtr and AtomicPointer: while an Access object is alive, the
// object it points to is guaranteed not to be deleted by a concurrent
// reset() or swap().
template <typename T>
class Access {
public:
Access() = delete;
// It's assumed that ref_count is incremented before being passed
// to this constructor.
inline Access(T* ptr, starboard::atomic_int32_t* ref_count)
: ref_count_(ref_count), ptr_(ptr) {}
  Access(const Access& other) = delete;
  // Allow move construction; the moved-from object gives up its reference.
  Access(Access&& other) : ref_count_(other.ref_count_), ptr_(other.ptr_) {
    other.ref_count_ = nullptr;
    other.ptr_ = nullptr;
  }
inline ~Access() { release(); }
inline operator bool() const { return valid(); }
inline operator T*() const { return get(); }
inline T* operator->() { return get(); }
inline T* get() const { return ptr_; }
inline bool valid() const { return !!get(); }
inline void release() { internal_release(); }
private:
inline void internal_release() {
if (ref_count_) {
ref_count_->decrement();
ref_count_ = nullptr;
}
ptr_ = nullptr;
}
starboard::atomic_int32_t* ref_count_;
T* ptr_;
};
// AtomicPointer allows read access to a pointer through access_ptr()
// and a form of atomic swap.
template <typename T>
class AtomicPointer {
public:
  // Custom new/delete operators align AtomicPointer instances to cache lines
  // to avoid false sharing and improve performance.
static void* operator new(std::size_t count) {
const int kCacheLineSize = 64;
return SbMemoryAllocateAligned(kCacheLineSize, count);
}
static void operator delete(void* ptr) { SbMemoryDeallocateAligned(ptr); }
  AtomicPointer() : counter_(0), ptr_(nullptr) {}
~AtomicPointer() { delete get(); }
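  // Replaces the stored pointer and returns the previous one. Blocks until
  // every reader that could have observed the previous pointer has released
  // its Access object, so the caller may safely delete the returned pointer.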
inline T* swap(T* new_ptr) {
AcquireWriteLock();
T* old_ptr = get();
ptr_.store(new_ptr);
WaitForReadersToDrain();
ReleaseWriteLock();
return old_ptr;
}
inline void reset(T* new_ptr) { delete swap(new_ptr); }
inline T* get() { return ptr_.load(); }
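  // Returns an Access object that pins the current pointer. The reader count
  // is incremented before the pointer is loaded, so any reader that observes
  // the old pointer is counted before a concurrent swap() starts draining,
  // and that swap() will not return until the Access object is released.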
inline Access<T> access_ptr() {
counter_.increment();
return Access<T>(ptr_.load(), &counter_);
}
private:
inline void AcquireWriteLock() {
write_mutex_.Acquire();
counter_.increment();
}
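  // Spins until the writer's own increment from AcquireWriteLock() is the
  // only outstanding count (counter_ == 1), then atomically resets it to 0.
  // At that point no reader still holds an Access to the old pointer.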
inline void WaitForReadersToDrain() {
int32_t expected_value = 1;
while (!counter_.compare_exchange_weak(&expected_value, 0)) {
SbThreadYield();
expected_value = 1;
}
}
inline void ReleaseWriteLock() { write_mutex_.Release(); }
starboard::Mutex write_mutex_;
starboard::atomic_int32_t counter_;
starboard::atomic_pointer<T*> ptr_;
};
} // namespace detail
} // namespace nb
#endif // NB_CONCURRENT_PTR_H_