How bthread scheduling works

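bthread is a user-space fiber library. Its code lives in incubator-brpc/src/bthread, and it exposes the following groups of APIs:

  • Creating bthreads with different priorities and scheduling parameters
  • bthread synchronization primitives such as rwlock, barrier, mutex, and condvar
  • bthread-local storage configuration

Taking bthread_start_background as the example, this post walks through how bthread scheduling works.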

TaskControl, TaskGroup, and Queue

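TaskControl is the globally unique central controller that handles bthread scheduling. A TaskGroup is the scheduling context of one physical worker thread. They communicate through queues.

Here is some of the globally shared context: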

BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match);

pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
// Referenced in rpc, needs to be extern.
// Notice that we can't declare the variable as atomic<TaskControl*> which
// are not constructed before main().
TaskControl* g_task_control = NULL;

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
extern void (*g_worker_startfn)();

We'll look at how TaskControl is created later; for now, go straight to bthread_start_background:

int bthread_start_background(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
// Fetch the TaskGroup. If it is non-NULL, the calling thread is itself a worker,
// e.g. a bthread creating a child bthread.
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
// start from worker
return g->start_background<false>(tid, attr, fn, arg);
}
return bthread::start_from_non_worker(tid, attr, fn, arg);
}

Let's start with start_from_non_worker:

// Start a bthread from a non-worker thread. Both paths below call start_background<true>, i.e. REMOTE = true; BTHREAD_NOSIGNAL only affects which TaskGroup is chosen.
BUTIL_FORCE_INLINE int
start_from_non_worker(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
// Get the global TaskControl.
TaskControl* c = get_or_new_task_control();
if (NULL == c) {
return ENOMEM;
}
if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
// Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
// 1. NOSIGNAL is often for creating many bthreads in batch,
// inserting into the same TaskGroup maximizes the batch.
// 2. bthread_flush() needs to know which TaskGroup to flush.
//
// This acts as memoization: NOSIGNAL tasks from this thread keep going to the same TaskGroup.
TaskGroup* g = tls_task_group_nosignal;
if (NULL == g) {
g = c->choose_one_group();
tls_task_group_nosignal = g;
}
return g->start_background<true>(tid, attr, fn, arg);
}
// Otherwise pick a TaskGroup and hand the task to it.
return c->choose_one_group()->start_background<true>(
tid, attr, fn, arg);
}
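
The memoization in the NOSIGNAL branch can be sketched in isolation. This is a minimal illustration, not brpc code: `Group`, `Control`, `pick_group`, and `tls_nosignal_group` are hypothetical names, and the round-robin `choose_one_group` here only stands in for a random pick so that the behavior is deterministic.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-ins for TaskGroup and TaskControl.
struct Group { int id; };

struct Control {
    std::vector<Group> groups;
    size_t next = 0;  // round-robin cursor, standing in for a random choice
    int picks = 0;    // how many times a group was chosen

    Group* choose_one_group() {
        ++picks;
        Group* g = &groups[next % groups.size()];
        ++next;
        return g;
    }
};

// Per-thread cache of the group used for NOSIGNAL tasks.
thread_local Group* tls_nosignal_group = nullptr;

// NOSIGNAL tasks stick to one cached group; other tasks pick a group each time.
Group* pick_group(Control& c, bool nosignal) {
    if (!nosignal) {
        return c.choose_one_group();
    }
    if (tls_nosignal_group == nullptr) {
        tls_nosignal_group = c.choose_one_group();
    }
    return tls_nosignal_group;
}
```

Calling `pick_group(c, true)` repeatedly from one thread returns the same group and only consults `choose_one_group` once, which is what lets a later flush know where the batch went.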

So what does choose_one_group do? Oh, it just picks one at random:

// Pick a random group using fast_rand.
TaskGroup* TaskControl::choose_one_group() {
const size_t ngroup = _ngroup.load(butil::memory_order_acquire);
if (ngroup != 0) {
return _groups[butil::fast_rand_less_than(ngroup)];
}
CHECK(false) << "Impossible: ngroup is 0";
return NULL;
}

Next, start_background:

// This has two parts: take a TaskMeta from the resource pool and initialize it,
// then hand it to the ready_to_run family of functions, which queue it for execution.
template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
if (__builtin_expect(!fn, 0)) {
return EINVAL;
}
const int64_t start_ns = butil::cpuwide_time_ns();
// Always work with a concrete attr.
const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
butil::ResourceId<TaskMeta> slot;
TaskMeta* m = butil::get_resource(&slot);
if (__builtin_expect(!m, 0)) {
return ENOMEM;
}
// Initialize the bthread's state.
CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
m->stop = false;
m->interrupted = false;
m->about_to_quit = false;
m->fn = fn;
m->arg = arg;
CHECK(m->stack == NULL);
m->attr = using_attr;
m->local_storage = LOCAL_STORAGE_INIT;
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
m->tid = make_tid(*m->version_butex, slot);
*th = m->tid;
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
}
_control->_nbthreads << 1;
if (REMOTE) {
ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
}
return 0;
}

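Without a constructor, this initialization is admittedly ugly. The REMOTE template parameter says whether the caller is running outside this TaskGroup's worker thread: start_from_non_worker passes true and the task goes to _remote_rq, while a worker creating a bthread on its own TaskGroup passes false and the task goes to _rq.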

// Try to push onto _rq. A full queue means too many bthreads were created, so sleep briefly and retry.
inline void TaskGroup::push_rq(bthread_t tid) {
while (!_rq.push(tid)) {
// Created too many bthreads: a promising approach is to insert the
// task into another TaskGroup, but we don't use it because:
// * There're already many bthreads to run, inserting the bthread
// into other TaskGroup does not help.
// * Insertions into other TaskGroups perform worse when all workers
// are busy at creating bthreads (proved by test_input_messenger in
// brpc)
flush_nosignal_tasks();
LOG_EVERY_SECOND(ERROR) << "_rq is full, capacity=" << _rq.capacity();
// TODO(gejun): May cause deadlock when all workers are spinning here.
// A better solution is to pop and run existing bthreads, however which
// make set_remained()-callbacks do context switches and need extensive
// reviews on related code.
::usleep(1000);
}
}

void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
push_rq(tid);
if (nosignal) {
++_num_nosignal;
} else {
// Signals are batched: wake workers for this task plus any pending NOSIGNAL tasks.
const int additional_signal = _num_nosignal;
_num_nosignal = 0;
_nsignaled += 1 + additional_signal;
_control->signal_task(1 + additional_signal);
}
}
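
The NOSIGNAL bookkeeping in ready_to_run is easy to isolate. The sketch below is an assumption-laden illustration: `SignalBatcher`, `on_push`, and `flush` are hypothetical names, and `flush` only models what a flush of pending NOSIGNAL tasks would plausibly do. The returned number is what would be passed on to a wakeup call.

```cpp
#include <cassert>

// Hypothetical model of the _num_nosignal / _nsignaled counters.
struct SignalBatcher {
    int num_nosignal = 0;  // tasks queued so far without a wakeup
    int nsignaled = 0;     // total wakeups requested so far

    // Returns how many wakeups to request for this push (0 when deferred).
    int on_push(bool nosignal) {
        if (nosignal) {
            ++num_nosignal;
            return 0;
        }
        const int n = 1 + num_nosignal;  // this task plus the deferred ones
        num_nosignal = 0;
        nsignaled += n;
        return n;
    }

    // Requests the wakeups that were deferred, if any.
    int flush() {
        const int n = num_nosignal;
        num_nosignal = 0;
        nsignaled += n;
        return n;
    }
};
```

Three deferred pushes followed by one normal push yield a single request for four wakeups, which is the batching effect the code above is after.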

_rq is a WorkStealingQueue<bthread_t>. The queue provides push, pop, and steal. Notably, push and pop never run concurrently with each other, but any number of steal calls may run concurrently with them.
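
To make the push/pop/steal contract concrete, here is a deliberately simplified stand-in. It is not how WorkStealingQueue is implemented: this sketch uses a mutex and a std::deque, and `SimpleStealQueue` is a hypothetical name. It only shows the usual shape of such a queue, where the owner works at one end and thieves take from the other.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Mutex-based illustration of a bounded work-stealing queue.
template <typename T>
class SimpleStealQueue {
public:
    explicit SimpleStealQueue(size_t capacity) : capacity_(capacity) {}

    // Owner only: append a task; fails when the queue is full.
    bool push(const T& v) {
        std::lock_guard<std::mutex> lk(mu_);
        if (q_.size() >= capacity_) return false;
        q_.push_back(v);
        return true;
    }

    // Owner only: take the most recently pushed task.
    bool pop(T* out) {
        std::lock_guard<std::mutex> lk(mu_);
        if (q_.empty()) return false;
        *out = q_.back();
        q_.pop_back();
        return true;
    }

    // Any thread: take the oldest task from the opposite end.
    bool steal(T* out) {
        std::lock_guard<std::mutex> lk(mu_);
        if (q_.empty()) return false;
        *out = q_.front();
        q_.pop_front();
        return true;
    }

private:
    size_t capacity_;
    std::mutex mu_;
    std::deque<T> q_;
};
```

A failed `push` corresponds to the "_rq is full" branch in push_rq above, where the caller flushes, logs, sleeps, and retries.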

signal_task then calls signal on the ParkingLots according to the number of tasks, which ends up in futex territory:

void TaskControl::signal_task(int num_task) {
if (num_task <= 0) {
return;
}
// TODO(gejun): Current algorithm does not guarantee enough threads will
// be created to match caller's requests. But in another side, there's also
// many useless signalings according to current impl. Capping the concurrency
// is a good balance between performance and timeliness of scheduling.
if (num_task > 2) {
num_task = 2;
}
// Pick a starting ParkingLot.
// We are looking for 1-2 workers to wake up.
int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
num_task -= _pl[start_index].signal(1);

if (num_task > 0) {
for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
if (++start_index >= PARKING_LOT_NUM) {
start_index = 0;
}
num_task -= _pl[start_index].signal(1);
}
}
// Wakeups still undelivered (probably rare): a new worker may be added dynamically.
if (num_task > 0 &&
FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance
_concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
// TODO: Reduce this lock
BAIDU_SCOPED_LOCK(g_task_control_mutex);
if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
add_workers(1);
}
}
}
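
The walk over the parking lots can be reproduced with a toy model. Everything here is hypothetical: `FakeLot` just counts sleepers, `kParkingLotNum` is an arbitrary value chosen for the example, and `signal_lots` takes the start index as a parameter instead of hashing a thread id.

```cpp
#include <array>
#include <cassert>

constexpr int kParkingLotNum = 4;  // arbitrary value for this sketch

// Toy parking lot: signal(n) wakes up to n sleepers and reports how many.
struct FakeLot {
    int sleepers = 0;
    int signal(int n) {
        const int woken = sleepers < n ? sleepers : n;
        sleepers -= woken;
        return woken;
    }
};

// Returns the number of wakeups that could not be delivered.
int signal_lots(std::array<FakeLot, kParkingLotNum>& lots,
                int start_index, int num_task) {
    if (num_task <= 0) return 0;
    if (num_task > 2) num_task = 2;  // cap the wakeups, as in the code above
    int idx = start_index % kParkingLotNum;
    num_task -= lots[idx].signal(1);
    for (int i = 1; i < kParkingLotNum && num_task > 0; ++i) {
        idx = (idx + 1) % kParkingLotNum;  // wrap around to the first lot
        num_task -= lots[idx].signal(1);
    }
    return num_task;
}
```

A positive return value corresponds to the final branch of signal_task, where a new worker may be added because nobody was left to wake.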

OK, so starting a background bthread just drops the task into the pool and is done.

Now take note of ready_to_run_remote:

void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
_remote_rq._mutex.lock();
while (!_remote_rq.push_locked(tid)) {
flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
<< _remote_rq.capacity();
::usleep(1000);
_remote_rq._mutex.lock();
}
if (nosignal) {
++_remote_num_nosignal;
_remote_rq._mutex.unlock();
} else {
const int additional_signal = _remote_num_nosignal;
_remote_num_nosignal = 0;
_remote_nsignaled += 1 + additional_signal;
_remote_rq._mutex.unlock();
_control->signal_task(1 + additional_signal);
}
}

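This pushes into _remote_rq. The logic is much the same as for _rq, except that the queue is guarded by a mutex.

How a TaskGroup runs bthreads

Remember that TaskControl::signal_task calls add_workers? Following that call leads to where a TaskGroup's actual logic begins: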

pthread_create(&_workers[i + old_concurency], NULL, worker_thread, this);

Obviously, worker_thread is next:

void* TaskControl::worker_thread(void* arg) {
run_worker_startfn();
#ifdef BAIDU_INTERNAL
logging::ComlogInitializer comlog_initializer;
#endif

// arg is the TaskControl; create a TaskGroup, which is stored in this physical thread's TLS below.
TaskControl* c = static_cast<TaskControl*>(arg);
TaskGroup* g = c->create_group();
TaskStatistics stat;
if (NULL == g) {
LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
return NULL;
}
BT_VLOG << "Created worker=" << pthread_self()
<< " bthread=" << g->main_tid();

// Bind the TaskGroup to this physical thread.
tls_task_group = g;

// Bump the nworkers counter.
c->_nworkers << 1;
g->run_main_task();

// Tear down this worker.

stat = g->main_stat();
BT_VLOG << "Destroying worker=" << pthread_self() << " bthread="
<< g->main_tid() << " idle=" << stat.cputime_ns / 1000000.0
<< "ms uptime=" << g->current_uptime_ns() / 1000000.0 << "ms";
tls_task_group = NULL;
g->destroy_self();
c->_nworkers << -1;
return NULL;
}

Next up is TaskGroup::run_main_task:

// Main loop of the physical worker thread.
void TaskGroup::run_main_task() {
bvar::PassiveStatus<double> cumulated_cputime(
get_cumulated_cputime_from_this, this);
std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;

// A pointer to this TaskGroup for sched_to(); it is not otherwise used here.
TaskGroup* dummy = this;
bthread_t tid;

// wait_task blocks until the next task arrives; when it returns false, the loop exits.
while (wait_task(&tid)) {
// Switch to the next task and run it.
TaskGroup::sched_to(&dummy, tid);
DCHECK_EQ(this, dummy);
DCHECK_EQ(_cur_meta->stack, _main_stack);

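// sched_to returns without jumping stacks when the task runs on this worker's own stack (BTHREAD_STACKTYPE_PTHREAD), so run the task here.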
if (_cur_meta->tid != _main_tid) {
TaskGroup::task_runner(1/*skip remained*/);
}
// Optional bookkeeping: record per-worker CPU time.
if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
char name[32];
#if defined(OS_MACOSX)
snprintf(name, sizeof(name), "bthread_worker_usage_%" PRIu64,
pthread_numeric_id());
#else
snprintf(name, sizeof(name), "bthread_worker_usage_%ld",
(long)syscall(SYS_gettid));
#endif
usage_bvar.reset(new bvar::PerSecond<bvar::PassiveStatus<double> >
(name, &cumulated_cputime, 1));
}
}
// Don't forget to add elapse of last wait_task.
current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}

There are a few key pieces here:

  1. wait_task
  2. TaskGroup::sched_to
  3. TaskGroup::task_runner

The first, wait_task, ties in with the signaling we saw earlier:

bool TaskGroup::wait_task(bthread_t* tid) {
do {
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
// Bail out if the parking lot has been stopped.
if (_last_pl_state.stopped()) {
return false;
}
// Sleep until a signal indicates that tasks may be available.
_pl->wait(_last_pl_state);
if (steal_task(tid)) {
return true;
}
#else
const ParkingLot::State st = _pl->get_state();
if (st.stopped()) {
return false;
}
if (steal_task(tid)) {
return true;
}
_pl->wait(st);
#endif
} while (true);
}

// Pop from this group's own _remote_rq first, then steal from other groups through TaskControl.
bool steal_task(bthread_t* tid) {
if (_remote_rq.pop(tid)) {
return true;
}
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
_last_pl_state = _pl->get_state();
#endif
return _control->steal_task(tid, &_steal_seed, _steal_offset);
}
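
The "read the state, look for work, then wait on that state" pattern in wait_task exists to avoid lost wakeups. Below is a minimal sketch of the idea using a mutex and a condition variable instead of a futex. `MiniParkingLot` is a hypothetical class and is only an analogy for the semantics assumed here: `wait(expected)` blocks only while the state is still `expected`.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Condition-variable analogy for a futex-style parking lot.
class MiniParkingLot {
public:
    // Snapshot of the current state; callers pass it to wait() later.
    int get_state() {
        std::lock_guard<std::mutex> lk(mu_);
        return state_;
    }

    // Changes the state and wakes every waiter.
    void signal() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            ++state_;
        }
        cv_.notify_all();
    }

    // Blocks only while the state still equals `expected`.
    void wait(int expected) {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this, expected] { return state_ != expected; });
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    int state_ = 0;
};
```

If a producer signals between the worker's `get_state()` and its `wait(st)`, the state no longer matches and `wait` returns immediately, so the worker rechecks the queues instead of sleeping through the new task.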

TaskControl::steal_task tries each group's _rq and then its _remote_rq:

bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
// 1: Acquiring fence is paired with releasing fence in _add_group to
// avoid accessing uninitialized slot of _groups.
const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
if (0 == ngroup) {
return false;
}

// NOTE: Don't return inside `for' iteration since we need to update |seed|
bool stolen = false;
size_t s = *seed;
for (size_t i = 0; i < ngroup; ++i, s += offset) {
TaskGroup* g = _groups[s % ngroup];
// g is possibly NULL because of concurrent _destroy_group
if (g) {
if (g->_rq.steal(tid)) {
stolen = true;
break;
}
if (g->_remote_rq.pop(tid)) {
stolen = true;
break;
}
}
}
*seed = s;
return stolen;
}
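
The seed/offset walk is worth trying on plain data. In this sketch the queues are ordinary vectors and `steal_from` is a hypothetical helper; it assumes the offset is coprime with the number of queues, which is what makes `s % n` visit every queue within `n` steps.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Visits queues in the order seed, seed + offset, seed + 2 * offset, ...
// Returns the index of the queue stolen from, or -1 if all were empty.
int steal_from(std::vector<std::vector<int> >& queues,
               size_t* seed, size_t offset, int* out) {
    const size_t n = queues.size();
    if (n == 0) return -1;
    int victim = -1;
    size_t s = *seed;
    for (size_t i = 0; i < n; ++i, s += offset) {
        std::vector<int>& q = queues[s % n];
        if (!q.empty()) {
            *out = q.front();
            q.erase(q.begin());
            victim = static_cast<int>(s % n);
            break;  // s is not advanced, so the next call starts at this victim
        }
    }
    *seed = s;  // remember where to start next time
    return victim;
}
```

Because the seed is written back, a thief that found work in one queue tries the same queue first on its next attempt.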

sched_to

This is the most important function: it is where a fiber actually gets run and where stacks are switched.

inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
TaskMeta* next_meta = address_meta(next_tid);
if (next_meta->stack == NULL) {
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
if (stk) {
next_meta->set_stack(stk);
} else {
// stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
// In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
// This basically means that if we can't allocate stack, run
// the task in pthread directly.
next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
next_meta->set_stack((*pg)->_main_stack);
}
}
// Update now_ns only when wait_task did yield.
sched_to(pg, next_meta);
}

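get_stack returns a stack, which is again a fixed-size object taken from an object pool. The task_runner function passed in is what actually runs the task.

bthread_make_fcontext deserves a closer look: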

struct StackStorage {
int stacksize;
int guardsize;
// Assume stack grows upwards.
// http://www.boost.org/doc/libs/1_55_0/libs/context/doc/html/context/stack.html
void* bottom;
unsigned valgrind_stack_id;

// Clears all members.
void zeroize() {
stacksize = 0;
guardsize = 0;
bottom = NULL;
valgrind_stack_id = 0;
}
};

// Allocate a piece of stack.
int allocate_stack_storage(StackStorage* s, int stacksize, int guardsize);
// Deallocate a piece of stack. Parameters MUST be returned or set by the
// corresponding allocate_stack_storage() otherwise behavior is undefined.
void deallocate_stack_storage(StackStorage* s);

enum StackType {
STACK_TYPE_MAIN = 0,
STACK_TYPE_PTHREAD = BTHREAD_STACKTYPE_PTHREAD,
STACK_TYPE_SMALL = BTHREAD_STACKTYPE_SMALL,
STACK_TYPE_NORMAL = BTHREAD_STACKTYPE_NORMAL,
STACK_TYPE_LARGE = BTHREAD_STACKTYPE_LARGE
};

struct ContextualStack {
bthread_fcontext_t context;
StackType stacktype;
StackStorage storage;
};

template <typename StackClass> struct StackFactory {
struct Wrapper : public ContextualStack {
explicit Wrapper(void (*entry)(intptr_t)) {
if (allocate_stack_storage(&storage, *StackClass::stack_size_flag,
FLAGS_guard_page_size) != 0) {
storage.zeroize();
context = NULL;
return;
}
// Create an fcontext whose entry point is `entry`; it is jumped into later.
context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);
stacktype = (StackType)StackClass::stacktype;
}
~Wrapper() {
if (context) {
context = NULL;
deallocate_stack_storage(&storage);
storage.zeroize();
}
}
};

static ContextualStack* get_stack(void (*entry)(intptr_t)) {
return butil::get_object<Wrapper>(entry);
}

static void return_stack(ContextualStack* sc) {
butil::return_object(static_cast<Wrapper*>(sc));
}
};

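As an aside, I once hit a stack-growth pitfall while doing the xv6 labs: on x86 the stack grows downward, so the stack's bottom sits at a high address. Note that in allocate_stack_storage this bottom is therefore, in effect, malloc() + STACK_SIZE.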
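
The "bottom is the high end" point fits in a few lines. This is only an illustration with hypothetical names (`MiniStack`, `allocate_mini_stack`, `free_mini_stack`); it uses plain malloc and has no guard page, unlike a production stack allocator.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>

// Hypothetical stack descriptor: `base` is what malloc returned,
// `bottom` is where a downward-growing stack starts.
struct MiniStack {
    void* base;
    void* bottom;
    size_t size;
};

bool allocate_mini_stack(MiniStack* s, size_t size) {
    void* mem = std::malloc(size);
    if (mem == nullptr) return false;
    s->base = mem;
    s->size = size;
    // The stack grows toward lower addresses, so it starts at the high end.
    s->bottom = static_cast<char*>(mem) + size;
    return true;
}

void free_mini_stack(MiniStack* s) {
    std::free(s->base);  // free the pointer malloc returned, not `bottom`
    s->base = nullptr;
    s->bottom = nullptr;
    s->size = 0;
}
```

Passing `base` instead of `bottom` as the initial stack pointer would make the first push write below the allocation, which is the kind of pitfall described above.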

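bthread_make_fcontext is essentially lifted from Boost.Context, so let's read Boost's RISC-V code directly: https://github.com/boostorg/context/blob/develop/src/asm/make_riscv64_sysv_elf_gas.S

Two things to note here:

  1. Only callee-saved registers are stored. Caller-saved registers need no handling, because a context switch looks like an ordinary function call to the compiler; we will come back to this later.
  2. ra is set to _exit as a fallback: if a fiber reaches its end without switching to another fiber or returning to main, the process simply exits.

Now read this together with task_runner: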

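// Tasks actually run inside task_runner; every TaskMeta is executed through this wrapper.
// It has two callers: it is the entry function of a bthread stack, and TaskGroup calls it from run_main_task.
//
// m->fn may itself start other bthreads or call bthread_usleep.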
void TaskGroup::task_runner(intptr_t skip_remained) {
// NOTE: tls_task_group is volatile since tasks are moved around
// different groups.
TaskGroup* g = tls_task_group;

if (!skip_remained) {
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
}

#ifndef NDEBUG
--g->_sched_recursive_guard;
#endif
}

do {
// A task can be stopped before it gets running, in which case
// we may skip user function, but that may confuse user:
// Most tasks have variables to remember running result of the task,
// which is often initialized to values indicating success. If an
// user function is never called, the variables will be unchanged
// however they'd better reflect failures because the task is stopped
// abnormally.

// Meta and identifier of the task is persistent in this run.
TaskMeta* const m = g->_cur_meta;

if (FLAGS_show_bthread_creation_in_vars) {
// NOTE: the thread triggering exposure of pending time may spend
// considerable time because a single bvar::LatencyRecorder
// contains many bvar.
g->_control->exposed_pending_time() <<
(butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000L;
}

// Not catch exceptions except ExitException which is for implementing
// bthread_exit(). User code is intended to crash when an exception is
// not caught explicitly. This is consistent with other threading
// libraries.

// For now this value is simply discarded (see the TODO below).
void* thread_return;

// Run m->fn().
try {
thread_return = m->fn(m->arg);
} catch (ExitException& e) {
thread_return = e.value();
}

// Group is probably changed
g = tls_task_group;

// TODO: Save thread_return
(void)thread_return;

// Logging must be done before returning the keytable, since the logging lib
// use bthread local storage internally, or will cause memory leak.
// FIXME: the time from quiting fn to here is not counted into cputime
if (m->attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Finished bthread " << m->tid << ", cputime="
<< m->stat.cputime_ns / 1000000.0 << "ms";
}

// Clean tls variables, must be done before changing version_butex
// otherwise another thread just joined this thread may not see side
// effects of destructing tls variables.
KeyTable* kt = tls_bls.keytable;
if (kt != NULL) {
return_keytable(m->attr.keytable_pool, kt);
// After deletion: tls may be set during deletion.
tls_bls.keytable = NULL;
m->local_storage.keytable = NULL; // optional
}

// Increase the version and wake up all joiners, if resulting version
// is 0, change it to 1 to make bthread_t never be 0. Any access
// or join to the bthread after changing version will be rejected.
// The spinlock is for visibility of TaskGroup::get_attr.
{
BAIDU_SCOPED_LOCK(m->version_lock);
if (0 == ++*m->version_butex) {
++*m->version_butex;
}
}
butex_wake_except(m->version_butex, 0);

g->_control->_nbthreads << -1;
// Register a callback that will reclaim m.
g->set_remained(TaskGroup::_release_last_context, m);
// The TaskGroup finishes this round of scheduling.
ending_sched(&g);

} while (g->_cur_meta->tid != g->_main_tid);

// Was called from a pthread and we don't have BTHREAD_STACKTYPE_PTHREAD
// tasks to run, quit for more tasks.
}

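This function is the very core of bthread scheduling. Step by step:

  1. Get the current TaskGroup, i.e. the current worker.
  2. Run _last_context_remained to clean up the resources left over from whatever the TaskGroup ran previously.
  3. In the loop, fetch the meta of the bthread that owns this stack; this is what needs to run.
  4. Run m->fn(), which is the real logic.
    1. Note that a context switch can happen while m->fn() is running, for example if m->fn() calls bthread_usleep.
  5. Clean up the context.
  6. Call ending_sched.

Step (4) is the exact point where the function the user passed to bthread_start_background gets executed!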
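
The "remained" callback in steps 2 and 6 exists because a finishing task cannot free its own resources while it is still running on them; it leaves a note for whoever runs next. Here is a minimal sketch of that deferral, with hypothetical names (`MiniGroup`, `MiniMeta`, `release_meta`) and no real context switch involved.

```cpp
#include <cassert>

typedef void (*RemainedFn)(void*);

// Hypothetical stand-in for the per-worker scheduling context.
struct MiniGroup {
    RemainedFn last_context_remained = nullptr;
    void* last_context_remained_arg = nullptr;

    // Called by the task that is about to switch away.
    void set_remained(RemainedFn fn, void* arg) {
        last_context_remained = fn;
        last_context_remained_arg = arg;
    }

    // Called by whatever runs right after the switch.
    void run_remained() {
        while (last_context_remained) {
            RemainedFn fn = last_context_remained;
            last_context_remained = nullptr;  // clear first: fn may set a new one
            fn(last_context_remained_arg);
        }
    }
};

struct MiniMeta {
    bool released = false;
};

// Cleanup executed on behalf of the previous task.
void release_meta(void* arg) {
    static_cast<MiniMeta*>(arg)->released = true;
}
```

In this model the finished task calls `set_remained(release_meta, &meta)` and switches away; the cleanup runs only once the next context calls `run_remained()`.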

sched_to in detail

void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
TaskGroup* g = *pg;
#ifndef NDEBUG
if ((++g->_sched_recursive_guard) > 1) {
LOG(FATAL) << "Recursively(" << g->_sched_recursive_guard - 1
<< ") call sched_to(" << g << ")";
}
#endif
// Save errno so that errno is bthread-specific.
const int saved_errno = errno;
void* saved_unique_user_ptr = tls_unique_user_ptr;

TaskMeta* const cur_meta = g->_cur_meta;
const int64_t now = butil::cpuwide_time_ns();
const int64_t elp_ns = now - g->_last_run_ns;
g->_last_run_ns = now;
cur_meta->stat.cputime_ns += elp_ns;
if (cur_meta->tid != g->main_tid()) {
g->_cumulated_cputime_ns += elp_ns;
}
++cur_meta->stat.nswitch;
++ g->_nswitch;
// Switch to the task
if (__builtin_expect(next_meta != cur_meta, 1)) {
// Set _cur_meta to next_meta, the task about to run.
g->_cur_meta = next_meta;
// Switch tls_bls
cur_meta->local_storage = tls_bls;
tls_bls = next_meta->local_storage;

// Logging must be done after switching the local storage, since the logging lib
// use bthread local storage internally, or will cause memory leak.
if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
(next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
<< next_meta->tid;
}

if (cur_meta->stack != NULL) {
if (next_meta->stack != cur_meta->stack) {
jump_stack(cur_meta->stack, next_meta->stack);
// probably went to another group, need to assign g again.
g = tls_task_group;
}
#ifndef NDEBUG
else {
// else pthread_task is switching to another pthread_task, sc
// can only equal when they're both _main_stack
CHECK(cur_meta->stack == g->_main_stack);
}
#endif
}
// else because of ending_sched(including pthread_task->pthread_task)
} else {
LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
}

while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
}

// Restore errno
errno = saved_errno;
tls_unique_user_ptr = saved_unique_user_ptr;

#ifndef NDEBUG
--g->_sched_recursive_guard;
#endif
*pg = g;
}

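To understand sched_to we first need to understand jump_stack, which is also borrowed from Boost.Context: https://github.com/boostorg/context/blob/develop/src/asm/jump_riscv64_sysv_elf_gas.S

  1. Save all callee-saved registers.
  2. Set ra to the jump target.
  3. Restore the callee-saved registers of the new context.

Now suppose there are several bthreads and a TaskGroup's worker calls sched_to. The scheduler picks the first bthread and runs it; when that bthread finishes, control comes back to this scheduling point. Also, the code before and after jump_stack in TaskGroup::sched_to may be executed by different threads!

With that in mind, the surrounding code becomes much easier to follow.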
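
The "control comes back to the scheduling point" behavior can be observed without any assembly. The sketch below swaps in POSIX ucontext (getcontext, makecontext, swapcontext) as a stand-in for jump_stack; it is not what bthread uses, it assumes a Linux/glibc environment, and `run_fiber_demo` and the globals are hypothetical names.

```cpp
#include <cassert>
#include <string>
#include <ucontext.h>
#include <vector>

namespace {
ucontext_t g_main_ctx;   // the "worker" context
ucontext_t g_fiber_ctx;  // the "bthread" context
std::vector<std::string> g_trace;

void fiber_entry() {
    g_trace.push_back("fiber: start");
    swapcontext(&g_fiber_ctx, &g_main_ctx);  // yield back to main
    g_trace.push_back("fiber: resumed");
    // Returning here continues at uc_link, i.e. g_main_ctx.
}
}  // namespace

// Runs one fiber that yields once, and returns the order of events.
std::vector<std::string> run_fiber_demo() {
    std::vector<char> stack(64 * 1024);  // the fiber's private stack
    g_trace.clear();

    getcontext(&g_fiber_ctx);
    g_fiber_ctx.uc_stack.ss_sp = stack.data();
    g_fiber_ctx.uc_stack.ss_size = stack.size();
    g_fiber_ctx.uc_link = &g_main_ctx;  // where to go when fiber_entry returns
    makecontext(&g_fiber_ctx, fiber_entry, 0);

    g_trace.push_back("main: switch in");
    swapcontext(&g_main_ctx, &g_fiber_ctx);  // first run of the fiber
    g_trace.push_back("main: back after yield");
    swapcontext(&g_main_ctx, &g_fiber_ctx);  // resume the fiber
    g_trace.push_back("main: fiber finished");
    return g_trace;
}
```

Each `swapcontext` call in `run_fiber_demo` plays the role of the scheduling point: execution leaves in the middle of the function and later continues on the line right after the call.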

ending_sched

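During scheduling, if task_runner was called on a bthread stack, it does not return; it only switches to another context through ending_sched. ending_sched takes care of picking the next bthread and arranging for the finished one to be reclaimed. After ending_sched, execution may well continue on a new stack.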

// One task has finished; try to schedule the next one.
// Tasks are wrapped by TaskGroup::task_runner, and this function is in turn called from task_runner.
void TaskGroup::ending_sched(TaskGroup** pg) {
TaskGroup* g = *pg;
bthread_t next_tid = 0;
// Find next task to run, if none, switch to idle thread of the group.

// Would BTHREAD_FAIR_WSQ help here?
#ifndef BTHREAD_FAIR_WSQ
// When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
// WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
// to 2.9%
const bool popped = g->_rq.pop(&next_tid);
#else
const bool popped = g->_rq.steal(&next_tid);
#endif

// Simple policy: fall back to main_tid only when _rq is empty and nothing can be stolen either.
// The main task then waits to be woken up.
if (!popped && !g->steal_task(&next_tid)) {
// Jump to main task if there's no task to run.
next_tid = g->_main_tid;
}

// The current meta, which may be reclaimed soon.
TaskMeta* const cur_meta = g->_cur_meta;
// Decide which task's stack to switch to next.
TaskMeta* next_meta = address_meta(next_tid);
// The next task may still need a stack.
if (next_meta->stack == NULL) {
if (next_meta->stack_type() == cur_meta->stack_type()) {
// also works with pthread_task scheduling to pthread_task, the
// transferred stack is just _main_stack.
next_meta->set_stack(cur_meta->release_stack());
} else {
// Allocate a stack.
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
if (stk) {
next_meta->set_stack(stk);
} else {
// stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
// In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
// This basically means that if we can't allocate stack, run
// the task in pthread directly.
next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
next_meta->set_stack(g->_main_stack);
}
}
}
sched_to(pg, next_meta);
}

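One question remains: the fcontext's ra is _exit, so won't a bthread that reaches its end simply exit the process? The answer is that by the time ending_sched runs, the stack is already slated for release; as soon as scheduling moves on to another task, the whole stack is swapped out (or handed over to the next task). So execution never reaches _exit.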
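
The stack handover in ending_sched (the `next_meta->set_stack(cur_meta->release_stack())` branch) can be modeled with two small structs. This is a hypothetical sketch: `MiniStack`, `MiniMeta`, and `hand_over_stack` are illustrative names, and stack allocation for the mismatched case is left to the caller.

```cpp
#include <cassert>

struct MiniStack {
    int type;
};

struct MiniMeta {
    int stack_type;
    MiniStack* stack;

    // Gives up ownership of the stack and returns it.
    MiniStack* release_stack() {
        MiniStack* s = stack;
        stack = nullptr;
        return s;
    }
};

// Returns true if `next` took over the finishing task's stack.
bool hand_over_stack(MiniMeta* cur, MiniMeta* next) {
    if (next->stack != nullptr) return false;               // already has one
    if (next->stack_type != cur->stack_type) return false;  // caller must allocate
    next->stack = cur->release_stack();
    return true;
}
```

After a successful handover the finished task no longer owns a stack, so nothing is left that could "return" through it; the next task simply continues on the same memory.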