Notes: brpc DoublyBufferedData

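Logically, DoublyBufferedData is a data structure that:

  1. Holds two copies of the data, a foreground and a background instance. Readers read the foreground while a writer modifies the background.
  2. Makes concurrent reads almost free, while writes/updates are expensive.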

The comment in the source explains it:

```cpp
// This data structure makes Read() almost lock-free by making Modify()
// *much* slower. It's very suitable for implementing LoadBalancers which
// have a lot of concurrent read-only ops from many threads and occasional
// modifications of data. As a side effect, this data structure can store
// a thread-local data for user.
//
// Read(): begin with a thread-local mutex locked then read the foreground
// instance which will not be changed before the mutex is unlocked. Since the
// mutex is only locked by Modify() with an empty critical section, the
// function is almost lock-free.
//
// Modify(): Modify background instance which is not used by any Read(), flip
// foreground and background, lock thread-local mutexes one by one to make
// sure all existing Read() finish and later Read() see new foreground,
// then modify background(foreground before flip) again.
```
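The comment can be made concrete with a toy, single-file version of the same protocol. This is a sketch under simplifying assumptions, not brpc's code. Readers pass an explicit slot index instead of getting a per-thread Wrapper through a pthread key. std::mutex stands in for pthread_mutex_t, and fn returns bool instead of size_t. The names ToyDoublyBuffered, ReadGuard and kMaxReaders are made up for illustration.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <mutex>
#include <vector>

// Toy version of the Read()/Modify() protocol (hypothetical, not brpc's code).
// Assumption: each reader slot in [0, kMaxReaders) is used by one thread at a time.
template <typename T, int kMaxReaders>
class ToyDoublyBuffered {
public:
    // Like ScopedPtr: holds the slot's mutex for the whole read.
    class ReadGuard {
    public:
        ReadGuard(ToyDoublyBuffered& d, int slot)
            : lock_(d.slots_[slot]),  // BeginRead(): lock first...
              data_(&d.data_[d.index_.load(std::memory_order_acquire)]) {}  // ...then read
        const T& operator*() const { return *data_; }
    private:
        std::lock_guard<std::mutex> lock_;  // unlocked by the destructor (EndRead)
        const T* data_;
    };

    template <typename Fn>
    bool Modify(Fn fn) {
        std::lock_guard<std::mutex> modify_lock(modify_mutex_);  // one writer at a time
        const int bg = 1 - index_.load(std::memory_order_relaxed);
        if (!fn(data_[bg])) return false;             // 1. change the background
        index_.store(bg, std::memory_order_release);  // 2. flip
        for (std::mutex& m : slots_) {                // 3. wait for readers of the old
            std::lock_guard<std::mutex> wait(m);      //    foreground: empty critical section
        }
        fn(data_[1 - bg]);                            // 4. change the old foreground
        return true;
    }

private:
    T data_[2]{};
    std::atomic<int> index_{0};
    std::array<std::mutex, kMaxReaders> slots_;
    std::mutex modify_mutex_;
};
```

In the real class, the "one thread per slot" assumption is exactly what the per-thread Wrapper provides.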

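Also note that the class is implemented on top of thread-local storage (TLS). As a side effect, users can attach their own thread-local data through the TLS template parameter.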

First, the public interface of the class:

```cpp
template <typename T, typename TLS = Void>
class DoublyBufferedData {
    class Wrapper;  // per-thread reader state, shown later
public:
    class ScopedPtr {
    friend class DoublyBufferedData;
    public:
        ScopedPtr() : _data(NULL), _w(NULL) {}
        ~ScopedPtr() {
            if (_w) {
                _w->EndRead();
            }
        }
        const T* get() const { return _data; }
        const T& operator*() const { return *_data; }
        const T* operator->() const { return _data; }
        TLS& tls() { return _w->user_tls(); }

    private:
        DISALLOW_COPY_AND_ASSIGN(ScopedPtr);
        const T* _data;
        Wrapper* _w;
    };

    DoublyBufferedData();
    ~DoublyBufferedData();

    // Put foreground instance into ptr. The instance will not be changed until
    // ptr is destructed.
    // This function is not blocked by Read() and Modify() in other threads.
    // Returns 0 on success, -1 otherwise.
    int Read(ScopedPtr* ptr);

    // Modify background and foreground instances. fn(T&, ...) will be called
    // twice. Modify() from different threads are exclusive from each other.
    // NOTE: Call same series of fn to different equivalent instances should
    // result in equivalent instances, otherwise foreground and background
    // instance will be inconsistent.
    template <typename Fn> size_t Modify(Fn& fn);
    template <typename Fn, typename Arg1> size_t Modify(Fn& fn, const Arg1&);
    template <typename Fn, typename Arg1, typename Arg2>
    size_t Modify(Fn& fn, const Arg1&, const Arg2&);

    // fn(T& background, const T& foreground, ...) will be called to background
    // and foreground instances respectively.
    template <typename Fn> size_t ModifyWithForeground(Fn& fn);
    template <typename Fn, typename Arg1>
    size_t ModifyWithForeground(Fn& fn, const Arg1&);
    template <typename Fn, typename Arg1, typename Arg2>
    size_t ModifyWithForeground(Fn& fn, const Arg1&, const Arg2&);

    // (private members omitted here; see the implementation below)
};
```

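Fn is the update function; we'll set it aside for now. As you can see, the class provides Read and Modify (plus the ModifyWithForeground variants).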

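Note that Read is intrusive. The caller passes in a ScopedPtr, which owns the read and keeps it in progress until the ScopedPtr is destroyed. ScopedPtr also exposes tls(), which returns the user-defined thread-local data.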
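The NOTE on Modify() says fn must turn equivalent instances into equivalent instances, because it runs once on the background and once on the old foreground. The sketch below shows what that means, using a std::set<int> of server ids as a stand-in for T. AddServer, AddNextId and StaysConsistent are hypothetical helpers, and the const int& parameter mirrors the const Arg1& of Modify.

```cpp
#include <cassert>
#include <cstddef>
#include <set>

// Hypothetical modifier: adds a server id. It depends only on its arguments,
// so applying it to two equal instances leaves them equal. Returns how many
// entries changed (Modify() treats 0 as "nothing to do" and skips the flip).
size_t AddServer(std::set<int>& servers, const int& id) {
    return servers.insert(id).second ? 1 : 0;
}

// Counter-example: depends on hidden state, so the two calls diverge.
size_t AddNextId(std::set<int>& servers) {
    static int next_id = 100;
    servers.insert(next_id++);
    return 1;
}

// Mimics Modify(): call fn on two equal copies and check that they stay equal.
template <typename Fn, typename... Args>
bool StaysConsistent(Fn fn, const Args&... args) {
    std::set<int> foreground = {1, 2};
    std::set<int> background = foreground;
    const size_t r1 = fn(background, args...);
    const size_t r2 = fn(foreground, args...);
    return r1 == r2 && foreground == background;
}
```

StaysConsistent(AddServer, 3) holds. StaysConsistent(AddNextId) does not, because the two instances receive ids 100 and 101.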

Implementation

```cpp
template <typename T, typename TLS = Void>
class DoublyBufferedData {
    class Wrapper;
public:
    class ScopedPtr { /* same as in the interface listing above */ };
    // ... public member functions as in the interface listing ...
private:
    const T* UnsafeRead() const
    { return _data + _index.load(butil::memory_order_acquire); }
    Wrapper* AddWrapper();
    void RemoveWrapper(Wrapper*);

    // Foreground and background instances.
    T _data[2];

    // Index of foreground instance.
    butil::atomic<int> _index;

    // Key to access thread-local wrappers.
    bool _created_key;
    pthread_key_t _wrapper_key;

    // All thread-local instances.
    std::vector<Wrapper*> _wrappers;

    // Sequence access to _wrappers.
    pthread_mutex_t _wrappers_mutex;

    // Sequence modifications.
    pthread_mutex_t _modify_mutex;
};
```

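For pthread_key_t and related APIs, see Chapter 31 of The Linux Programming Interface (Chinese edition: Linux 系统编程手册). It describes a simple lifetime-management scheme: when a thread exits, the destructor registered with pthread_key_create is called automatically on that thread's value.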
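The key-destructor rule can be shown with a minimal sketch that uses only the POSIX calls pthread_key_create, pthread_setspecific, pthread_create, pthread_join and pthread_key_delete. The global names are illustrative. The Wrapper destructor calls RemoveWrapper (commented "Called when thread quits"), which suggests the real class relies on this mechanism to clean up a thread's Wrapper. That wiring is not shown in the listings here, though. Build with -pthread.

```cpp
#include <atomic>
#include <cassert>
#include <pthread.h>
#include <vector>

// Counts how many per-thread values the key's destructor has cleaned up.
static std::atomic<int> g_destroyed{0};
static pthread_key_t g_key;

static void DestroyValue(void* p) {
    delete static_cast<int*>(p);
    g_destroyed.fetch_add(1);
}

static void* ThreadMain(void*) {
    // Each thread stores its own heap object under the same key.
    pthread_setspecific(g_key, new int(42));
    return nullptr;  // DestroyValue() runs on the value during thread exit
}

// Starts n threads, joins them, and returns how many destructor calls ran.
int CountDestructorCalls(int n) {
    g_destroyed = 0;
    if (pthread_key_create(&g_key, DestroyValue) != 0) return -1;
    std::vector<pthread_t> threads;
    for (int i = 0; i < n; ++i) {
        pthread_t t;
        if (pthread_create(&t, nullptr, ThreadMain, nullptr) == 0) threads.push_back(t);
    }
    for (pthread_t t : threads) pthread_join(t, nullptr);
    pthread_key_delete(g_key);
    return g_destroyed.load();
}
```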


Next, look at the Wrapper type:

```cpp
template <typename T, typename TLS>
class DoublyBufferedDataWrapperBase {
public:
    TLS& user_tls() { return _user_tls; }
protected:
    TLS _user_tls;
};

template <typename T>
class DoublyBufferedDataWrapperBase<T, Void> {
};


// Each Wrapper is read through only by its owning thread, which is thread-safe.
template <typename T, typename TLS>
class DoublyBufferedData<T, TLS>::Wrapper
    : public DoublyBufferedDataWrapperBase<T, TLS> {
friend class DoublyBufferedData;
public:
    explicit Wrapper(DoublyBufferedData* c) : _control(c) {
        pthread_mutex_init(&_mutex, NULL);
    }

    ~Wrapper() {
        if (_control != NULL) {
            _control->RemoveWrapper(this);
        }
        pthread_mutex_destroy(&_mutex);
    }

    // _mutex will be locked by the calling pthread and DoublyBufferedData.
    // Most of the time, no modifications are done, so the mutex is
    // uncontended and fast.
    inline void BeginRead() {
        pthread_mutex_lock(&_mutex);
    }

    inline void EndRead() {
        pthread_mutex_unlock(&_mutex);
    }

    inline void WaitReadDone() {
        BAIDU_SCOPED_LOCK(_mutex);
    }

private:
    DoublyBufferedData* _control;
    pthread_mutex_t _mutex;
};
```
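Because of the Void specialization, the base class is empty when no user TLS is requested. It then costs no space in each Wrapper, thanks to the empty base optimization. The sketch below checks this. A local Void struct stands in for butil's, and a hypothetical MiniWrapper takes the place of Wrapper, with an int standing in for the pthread_mutex_t member.

```cpp
#include <cassert>
#include <type_traits>

struct Void {};  // stand-in for butil's Void tag type (assumption)

// Same shape as DoublyBufferedDataWrapperBase.
template <typename T, typename TLS>
class WrapperBase {
public:
    TLS& user_tls() { return _user_tls; }
protected:
    TLS _user_tls;
};

// No user TLS requested: an empty class.
template <typename T>
class WrapperBase<T, Void> {};

// Hypothetical per-thread wrapper; the int stands in for pthread_mutex_t.
template <typename T, typename TLS>
struct MiniWrapper : WrapperBase<T, TLS> {
    int mutex_placeholder = 0;
};
```

With TLS = Void, MiniWrapper is no larger than its own member. With a real TLS type, user_tls() becomes available through the base.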

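Wrapper is the per-thread structure. Each thread's Wrapper is stored in DoublyBufferedData's thread-local slot (_wrapper_key), and it is only used for reading. Its _control member points back to the owning DoublyBufferedData, which the destructor uses to unregister the Wrapper. BeginRead, EndRead and even the destructor are easy to follow (RemoveWrapper is covered below), but WaitReadDone() is a bit puzzling. Let's look at what BAIDU_SCOPED_LOCK is: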

```cpp
// NOTE(gejun): c++11 deduces additional reference to the type.
namespace butil {
namespace detail {
template <typename T>
std::lock_guard<typename std::remove_reference<T>::type> get_lock_guard();
}  // namespace detail
}  // namespace butil

// C++11 variant; the surrounding #if/#endif of the original header is omitted.
#define BAIDU_SCOPED_LOCK(ref_of_lock)                                  \
    decltype(::butil::detail::get_lock_guard<decltype(ref_of_lock)>()) \
    BAIDU_CONCAT(scoped_locker_dummy_at_line_, __LINE__)(ref_of_lock)

// butil specializes std::lock_guard for pthread_mutex_t.
namespace std {
template<> class lock_guard<pthread_mutex_t> {
public:
    explicit lock_guard(pthread_mutex_t & mutex) : _pmutex(&mutex) {
#if !defined(NDEBUG)
        const int rc = pthread_mutex_lock(_pmutex);
        if (rc) {
            LOG(FATAL) << "Fail to lock pthread_mutex_t=" << _pmutex << ", " << berror(rc);
            _pmutex = NULL;
        }
#else
        pthread_mutex_lock(_pmutex);
#endif  // NDEBUG
    }

    ~lock_guard() {
#ifndef NDEBUG
        if (_pmutex) {
            pthread_mutex_unlock(_pmutex);
        }
#else
        pthread_mutex_unlock(_pmutex);
#endif
    }

private:
    DISALLOW_COPY_AND_ASSIGN(lock_guard);
    pthread_mutex_t* _pmutex;
};
}  // namespace std
```

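Here butil provides its own lock_guard (a specialization of std::lock_guard for pthread_mutex_t), and WaitReadDone() just takes that lock in a local scope... wait, this is only a scoped lock with an empty critical section, so how does it act as a fence? For that we need to look at Read and Modify: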

```cpp
template <typename T, typename TLS>
int DoublyBufferedData<T, TLS>::Read(
    typename DoublyBufferedData<T, TLS>::ScopedPtr* ptr) {
    if (BAIDU_UNLIKELY(!_created_key)) {
        return -1;
    }
    Wrapper* w = static_cast<Wrapper*>(pthread_getspecific(_wrapper_key));
    if (BAIDU_LIKELY(w != NULL)) {
        w->BeginRead();
        ptr->_data = UnsafeRead();
        ptr->_w = w;
        return 0;
    }
    w = AddWrapper();
    if (BAIDU_LIKELY(w != NULL)) {
        const int rc = pthread_setspecific(_wrapper_key, w);
        if (rc == 0) {
            w->BeginRead();
            ptr->_data = UnsafeRead();
            ptr->_w = w;
            return 0;
        }
    }
    return -1;
}
```
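  • Get this thread's Wrapper from TLS. If there is none yet, create one with AddWrapper and store it with pthread_setspecific.
  • To read, call w->BeginRead() (which locks the Wrapper's mutex), then fill in the ScopedPtr. When the caller's ScopedPtr is destroyed, it calls EndRead().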
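The lookup-or-register path can be sketched with a C++11 thread_local object in place of pthread_getspecific/pthread_setspecific. This swaps in a different technique; it is not what brpc does. ReaderSlot, SlotHolder, GetOrAddSlot and the g_registry globals are hypothetical. The holder's destructor plays the role of ~Wrapper() calling RemoveWrapper() when the thread exits.

```cpp
#include <algorithm>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

struct ReaderSlot {
    std::mutex mu;  // would play the role of Wrapper::_mutex
};

std::mutex g_registry_mu;             // like _wrappers_mutex
std::vector<ReaderSlot*> g_registry;  // like _wrappers

// Owns this thread's slot; destroyed at thread exit, like ~Wrapper().
struct SlotHolder {
    ReaderSlot* slot = nullptr;
    ~SlotHolder() {
        if (slot == nullptr) return;
        {
            std::lock_guard<std::mutex> lock(g_registry_mu);  // like RemoveWrapper()
            g_registry.erase(std::remove(g_registry.begin(), g_registry.end(), slot),
                             g_registry.end());
        }
        delete slot;
    }
};

ReaderSlot* GetOrAddSlot() {
    thread_local SlotHolder holder;
    if (holder.slot != nullptr) return holder.slot;  // fast path: already registered
    ReaderSlot* s = new ReaderSlot;                   // slow path: like AddWrapper()
    {
        std::lock_guard<std::mutex> lock(g_registry_mu);
        g_registry.push_back(s);
    }
    holder.slot = s;
    return s;
}
```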

UnsafeRead is:

```cpp
const T* UnsafeRead() const
{ return _data + _index.load(butil::memory_order_acquire); }
```

Note the acquire load here; we'll come back to it below.
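The release/acquire pairing between Modify() and UnsafeRead() can be illustrated with two std::string slots and hypothetical names. The sketch performs only one flip. Writing into a slot that a reader may still be using would additionally need the wait step from Modify().

```cpp
#include <atomic>
#include <cassert>
#include <string>
#include <thread>

std::string g_slots[2] = {"old", ""};
std::atomic<int> g_index{0};

// Single writer assumed: fill the background with plain writes, then publish.
void Publish(const std::string& value) {
    const int bg = 1 - g_index.load(std::memory_order_relaxed);
    g_slots[bg] = value;                           // plain, non-atomic write
    g_index.store(bg, std::memory_order_release);  // publish, like Modify()
}

// Waits for the flip, then reads the slot the index points to.
std::string ReadWhenPublished() {
    int idx;
    while ((idx = g_index.load(std::memory_order_acquire)) == 0) {  // like UnsafeRead()
        std::this_thread::yield();
    }
    return g_slots[idx];  // acquire pairs with the release store: sees "new" fully
}
```

With relaxed ordering on both sides, the reader could observe index 1 without the string write being visible. That is the "mixed data" case discussed in step 4 of Modify below.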

AddWrapper is also straightforward and runs under _wrappers_mutex, as does RemoveWrapper.

```cpp
// Called when thread initializes thread-local wrapper.
template <typename T, typename TLS>
typename DoublyBufferedData<T, TLS>::Wrapper*
DoublyBufferedData<T, TLS>::AddWrapper() {
    std::unique_ptr<Wrapper> w(new (std::nothrow) Wrapper(this));
    if (NULL == w) {
        return NULL;
    }
    try {
        BAIDU_SCOPED_LOCK(_wrappers_mutex);
        _wrappers.push_back(w.get());
    } catch (std::exception& e) {
        return NULL;
    }
    return w.release();
}

// Called when thread quits.
template <typename T, typename TLS>
void DoublyBufferedData<T, TLS>::RemoveWrapper(
    typename DoublyBufferedData<T, TLS>::Wrapper* w) {
    if (NULL == w) {
        return;
    }
    BAIDU_SCOPED_LOCK(_wrappers_mutex);
    for (size_t i = 0; i < _wrappers.size(); ++i) {
        if (_wrappers[i] == w) {
            _wrappers[i] = _wrappers.back();
            _wrappers.pop_back();
            return;
        }
    }
}
```
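RemoveWrapper() erases with the "overwrite with last, then pop_back" idiom. This avoids shifting elements, because the order of _wrappers does not matter. A generic sketch follows (UnorderedErase is a made-up name):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Removes the first element equal to value without preserving order.
// Returns true if an element was removed.
template <typename T>
bool UnorderedErase(std::vector<T>& v, const T& value) {
    for (size_t i = 0; i < v.size(); ++i) {
        if (v[i] == value) {
            v[i] = v.back();  // also correct when i is the last index
            v.pop_back();
            return true;
        }
    }
    return false;
}
```

For example, erasing 2 from {1, 2, 3, 4} yields {1, 4, 3}.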

Nothing tricky so far. Now let's see how writes are implemented:

```cpp
template <typename T, typename TLS>
template <typename Fn>
size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn) {
    // _modify_mutex sequences modifications. Using a separate mutex rather
    // than _wrappers_mutex is to avoid blocking threads calling
    // AddWrapper() or RemoveWrapper() too long. Most of the time, modifications
    // are done by one thread, contention should be negligible.
    BAIDU_SCOPED_LOCK(_modify_mutex);
    int bg_index = !_index.load(butil::memory_order_relaxed);
    // background instance is not accessed by other threads, being safe to
    // modify.
    const size_t ret = fn(_data[bg_index]);
    if (!ret) {
        return 0;
    }

    // Publish, flip background and foreground.
    // The release fence matches with the acquire fence in UnsafeRead() to
    // make readers which just begin to read the new foreground instance see
    // all changes made in fn.
    _index.store(bg_index, butil::memory_order_release);
    bg_index = !bg_index;

    // Wait until all threads finishes current reading. When they begin next
    // read, they should see updated _index.
    {
        BAIDU_SCOPED_LOCK(_wrappers_mutex);
        for (size_t i = 0; i < _wrappers.size(); ++i) {
            _wrappers[i]->WaitReadDone();
        }
    }

    const size_t ret2 = fn(_data[bg_index]);
    CHECK_EQ(ret2, ret) << "index=" << _index.load(butil::memory_order_relaxed);
    return ret2;
}
```
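  1. The writer grabs _modify_mutex.

  2. It computes the background index. Since _index is only ever 0 or 1, !_index is the other one.

  3. fn(_data[bg_index]) modifies the background. At this point:

    1. There is only one writer, guaranteed by _modify_mutex.
    2. No reader is using _data[bg_index]. This only holds when you look at the whole algorithm. The previous Modify flipped _index and then waited (step 5) for every read that could still see the old foreground, so every read that started afterwards sees the flipped index.
  4. Store the new _index (publish). From now on, readers get the instance that was just modified, which is safe.

    1. Q: Why release?
    2. A: seq_cst would certainly work. With relaxed ordering (no fence), a reader's UnsafeRead could observe the new index without observing fn's writes, i.e. read mixed old/new data. The release store pairs with the acquire load in UnsafeRead, so a reader that sees the new index also sees everything fn wrote, and publishing never has to block readers.
  5. BAIDU_SCOPED_LOCK grabs _wrappers_mutex, so no AddWrapper or RemoveWrapper can run concurrently. Every Wrapper is then in one of two states:

    1. Registered: its thread locks its own _mutex before calling UnsafeRead, so WaitReadDone() blocks until any read in progress ends.
    2. Not registered yet: it has not called UnsafeRead either. Once it registers (after Modify releases _wrappers_mutex) and reads, it sees the new _index.

    So after taking _wrappers_mutex and calling WaitReadDone() on every Wrapper, no reader can still be using the old foreground.

  6. Apply fn to the old foreground (now the background) and return.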
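Step 5 also answers the earlier question of how a scoped lock can act as a fence. Locking a mutex that a reader currently holds blocks until the reader unlocks it, so when the empty critical section completes, that reader's read is over. The sketch below uses std::mutex and hypothetical names. The sleep only makes the reader slow and is not needed for correctness.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

std::mutex g_reader_mu;  // plays the role of Wrapper::_mutex
std::atomic<bool> g_reader_in{false};
std::atomic<bool> g_reader_done{false};

void Reader() {
    std::lock_guard<std::mutex> lock(g_reader_mu);               // BeginRead()
    g_reader_in = true;
    std::this_thread::sleep_for(std::chrono::milliseconds(50));  // a slow read
    g_reader_done = true;
}                                                                 // EndRead()

// Returns true if the empty critical section only completed after the read.
bool WaitReadDoneObservesFinishedRead() {
    g_reader_in = false;
    g_reader_done = false;
    std::thread t(Reader);
    while (!g_reader_in) std::this_thread::yield();      // reader is mid-read
    { std::lock_guard<std::mutex> wait(g_reader_mu); }   // WaitReadDone()
    const bool done = g_reader_done;                     // the read must be over
    t.join();
    return done;
}
```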

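I suspect step 5 hides a risk. Suppose a thread takes two ScopedPtrs at once. Since _mutex is an ordinary (non-recursive) pthread mutex, the thread blocks forever in the second BeginRead() while the first read is never released. Any Modify then also hangs in WaitReadDone(). Would a reentrant (recursive) lock be safer here?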
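Instead of switching to a recursive mutex, one could experiment with a per-thread depth counter so that only the outermost read locks the mutex. This is a hypothetical sketch, not brpc's code. One caveat applies: Modify flips _index before waiting, so an inner read could observe the new foreground while the outer read still points at the old one. Both stay valid until the outermost read ends, but they may be different snapshots.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical reentrant reader slot: nested BeginRead() calls on the owning
// thread only bump a counter; the mutex the writer waits on is held from the
// outermost BeginRead() to the matching outermost EndRead().
struct ReentrantReaderSlot {
    std::mutex mu;
    int depth = 0;  // touched only by the owning thread; the writer only locks mu

    void BeginRead() {
        if (depth++ == 0) mu.lock();
    }
    void EndRead() {
        if (--depth == 0) mu.unlock();
    }
};
```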

Thoughts

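After one read-through I finally understand how this thing works. But I doubt I could have designed something this clever myself. Are there any good references on designs like this?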