让我们先来看看系统提供了些什么: posix系统能以signal方式告知timer触发,不过signal逼迫我们使用全局变量,写async-signal-safe的函数,在面向用户的编程框架中,我们应当尽力避免使用signal。linux自2.6.27后能以fd方式通知timer触发,这个fd可以放到epoll中和传输数据的fd统一管理。唯一问题是:这是个系统调用,且我们不清楚它在多线程下的表现。


  • 在发起RPC过程中设定一个timer,在超时时间后取消还在等待中的RPC。几乎所有的RPC调用都有超时限制,都会设置这个timer。
  • RPC结束前删除timer。大部分RPC都由正常返回的response导致结束,timer很少触发。


  • 在单线程框架中,比如以libevent, libev为代表的eventloop类库,或以GNU Pth, StateThreads为代表的coroutine / fiber类库中,一般是以小顶堆记录触发时间。epoll_wait前以堆顶的时间计算出参数timeout的值,如果在该时间内没有其他事件,epoll_wait也会醒来,从堆中弹出已超时的元素,调用相应的回调函数。整个框架周而复始地这么运转,timer的建立,等待,删除都发生在一个线程中。只要所有的回调都是非阻塞的,且逻辑不复杂,这套机制就能提供基本准确的timer。不过就像Threading Overview中说的那样,这不是RPC的场景。

  • 在多线程框架中,任何线程都可能被用户逻辑阻塞较长的时间,我们需要独立的线程实现timer,这种线程我们叫它TimerThread。一个非常自然的做法,就是使用用锁保护的小顶堆。当一个线程需要创建timer时,它先获得锁,然后把对应的时间插入堆,如果插入的元素成为了最早的,唤醒TimerThread。TimerThread中的逻辑和单线程类似,就是等着堆顶的元素超时,如果在等待过程中有更早的时间插入了,自己会被插入线程唤醒,而不会睡过头。这个方法的问题在于每个timer都需要竞争一把全局锁,操作一个全局小顶堆,就像在其他文章中反复谈到的那样,这会触发cache bouncing。同样数量的timer操作比单线程下的慢10倍是非常正常的,尴尬的是这些timer基本不触发。


一个惯例思路是把timer的需求散列到多个TimerThread,但这对TimerThread效果不好。注意我们上面提及到了那个“制约因素”:一旦插入的元素是最早的,要唤醒TimerThread。假设TimerThread足够多,以至于每个timer都散列到独立的TImerThread,那么每次它都要唤醒那个TimerThread。 “唤醒”意味着触发linux的调度函数,触发上下文切换。在非常流畅的系统中,这个开销大约是3-5微秒,这可比抢锁和同步cache还慢。这个因素是提高TimerThread扩展性的一个难点。多个TimerThread减少了对单个小顶堆的竞争压力,但同时也引入了更多唤醒。


第三个难点是TimerThread不应该经常醒。一个极端是TimerThread永远醒着或以较高频率醒过来(比如每1ms醒一次),这样插入timer的线程就不用负责唤醒了,然后我们把插入请求散列到多个堆降低竞争,问题看似解决了。但事实上这个方案提供的timer精度较差,一般高于2ms。你得想这个TimerThread怎么写逻辑,它是没法按堆顶元素的时间等待的,由于插入线程不唤醒,一旦有更早的元素插入,TimerThread就会睡过头。它唯一能做的是睡眠固定的时间,但这和现代OS scheduler的假设冲突:频繁sleep的线程的优先级最低。在linux下的结果就是,即使只sleep很短的时间,最终醒过来也可能超过2ms,因为在OS看来,这个线程不重要。一个高精度的TimerThread有唤醒机制,而不是定期醒。

另外,更并发的数据结构也难以奏效,感兴趣的同学可以去搜索”concurrent priority queue”或”concurrent skip list”,这些数据结构一般假设插入的数值较为散开,所以可以同时修改结构内的不同部分。但这在RPC场景中也不成立,相互竞争的线程设定的时间往往聚集在同一个区域,因为程序的超时大都是一个值,加上当前时间后都差不多。



  • 一个TimerThread而不是多个。
  • 创建的timer散列到多个Bucket以降低线程间的竞争,默认12个Bucket。
  • Bucket内不使用小顶堆管理时间,而是链表 + nearest_run_time字段,当插入的时间早于nearest_run_time时覆盖这个字段,之后去和全局nearest_run_time(和Bucket的nearest_run_time不同)比较,如果也早于这个时间,修改并唤醒TimerThread。链表节点在锁外使用ResourcePool分配。
  • 删除时通过id直接定位到timer内存结构,修改一个标志,timer结构总是由TimerThread释放。
  • TimerThread被唤醒后首先把全局nearest_run_time设置为几乎无限大(max of int64),然后取出所有Bucket内的链表,并把Bucket的nearest_run_time设置为几乎无限大(max of int64)。TimerThread把未删除的timer插入小顶堆中维护,这个堆就它一个线程用。在每次运行回调或准备睡眠前都会检查全局nearest_run_time, 如果全局更早,说明有更早的时间加入了,重复这个过程。


struct TimerThreadOptions {
    // Scheduling requests are hashed into different bucket to improve
    // scalability. However bigger num_buckets may NOT result in more scalable
    // schedule() because bigger values also make each buckets more sparse
    // and more likely to lock the global mutex. You better not change
    // this value, just leave it to us.
    // Default: 12
    size_t num_buckets;

    // If this field is not empty, some bvar for reporting stats of TimerThread
    // will be exposed with this prefix.
    // Default: ""
    std::string bvar_prefix;

    // Contruct with default options.

// TimerThread is a separate thread to run scheduled tasks at specific time.
// At most one task runs at any time, don't put time-consuming code in the
// callback otherwise the task may delay other tasks significantly.
class TimerThread {
    struct Task;
    class Bucket;

    typedef uint64_t TaskId;
    const static TaskId INVALID_TASK_ID;


    // Start the timer thread.
    // This method should only be called once.
    // return 0 if success, errno otherwise.
    int start(const TimerThreadOptions* options);

    // Stop the timer thread. Later schedule() will return INVALID_TASK_ID.
    void stop_and_join();

    // Schedule |fn(arg)| to run at realtime |abstime| approximately.
    // Returns: identifier of the scheduled task, INVALID_TASK_ID on error.
    TaskId schedule(void (*fn)(void*), void* arg, const timespec& abstime);

    // Prevent the task denoted by `task_id' from running. `task_id' must be
    // returned by schedule() ever.
    // Returns:
    //   0   -  Removed the task which does not run yet
    //  -1   -  The task does not exist.
    //   1   -  The task is just running.
    int unschedule(TaskId task_id);

    // Get identifier of internal pthread.
    // Returns (pthread_t)0 if start() is not called yet.
    pthread_t thread_id() const { return _thread; }

    // the timer thread will run this method.
    void run();
    static void* run_this(void* arg);

    bool _started;            // whether the timer thread was started successfully.
    base::atomic<bool> _stop;

    TimerThreadOptions _options;
    Bucket* _buckets;        // list of tasks to be run
    base::Mutex _mutex;      // protect _nearest_run_time

    int64_t _nearest_run_time;
    // the futex for wake up timer thread. can't use _nearest_run_time because
    // it's 64-bit.
    int _nsignals;
    pthread_t _thread;       // all scheduled task will be run on this thread

// Get the global TimerThread which never quits.
TimerThread* get_or_create_global_timer_thread();
TimerThread* get_global_timer_thread();
// Defined in task_control.cpp
void run_worker_startfn();

const TimerThread::TaskId TimerThread::INVALID_TASK_ID = 0;

    : num_buckets(12) {

// A task contains the necessary information for running fn(arg).
// Tasks are created in Bucket::schedule and destroyed in TimerThread::run
struct BAIDU_CACHELINE_ALIGNMENT TimerThread::Task {
    Task* next;                 // For linking tasks in a Bucket.
    int64_t run_time;           // run the task at this realtime
    void (*fn)(void*);          // the fn(arg) to run
    void* arg;
    // Current TaskId, checked against version in TimerThread::run to test
    // if this task is unscheduled.
    TaskId task_id;
    // initial_version:     not run yet
    // initial_version + 1: running
    // initial_version + 2: removed (also the version of next Task reused
    //                      this struct)
    base::atomic<uint32_t> version;

    Task() : version(2/*skip 0*/) {}

    // Run this task and delete this struct.
    // Returns true if fn(arg) did run.
    bool run_and_delete();

    // Delete this struct if this task was unscheduled.
    // Returns true on deletion.
    bool try_delete();

// Timer tasks are sharded into different Buckets to reduce contentions.
class BAIDU_CACHELINE_ALIGNMENT TimerThread::Bucket {
        : _nearest_run_time(std::numeric_limits<int64_t>::max())
        , _task_head(NULL) {}

    ~Bucket() {}

    struct ScheduleResult {
        TimerThread::TaskId task_id;
        bool earlier;

    // Schedule a task into this bucket.
    // Returns the TaskId and if it has the nearest run time.
    ScheduleResult schedule(
        void (*fn)(void*), void* arg, const timespec& abstime);

    // Pull all scheduled tasks.
    Task* consume_tasks();

    base::Mutex _mutex;
    int64_t _nearest_run_time;
    Task* _task_head;

// Utilies for making and extracting TaskId.
inline TimerThread::TaskId make_task_id(
    base::ResourceId<TimerThread::Task> slot, uint32_t version) {
    return TimerThread::TaskId((((uint64_t)version) << 32) | slot.value);

base::ResourceId<TimerThread::Task> slot_of_task_id(TimerThread::TaskId id) {
    base::ResourceId<TimerThread::Task> slot = { (id & 0xFFFFFFFFul) };
    return slot;

inline uint32_t version_of_task_id(TimerThread::TaskId id) {
    return (uint32_t)(id >> 32);

inline bool task_greater(const TimerThread::Task* a, const TimerThread::Task* b) {
    return a->run_time > b->run_time;

void* TimerThread::run_this(void* arg) {
    return NULL;

    : _started(false)
    , _stop(false)
    , _buckets(NULL)
    , _nearest_run_time(std::numeric_limits<int64_t>::max())
    , _nsignals(0)
    , _thread(0) {

TimerThread::~TimerThread() {
    delete [] _buckets;
    _buckets = NULL;

int TimerThread::start(const TimerThreadOptions* options_in) {
    if (_started) {
        return 0;
    if (options_in) {
        _options = *options_in;
    if (_options.num_buckets == 0) {
        LOG(ERROR) << "num_buckets can't be 0";
        return EINVAL;
    if (_options.num_buckets > 1024) {
        LOG(ERROR) << "num_buckets=" << _options.num_buckets << " is too big";
        return EINVAL;
    _buckets = new (std::nothrow) Bucket[_options.num_buckets];
    if (NULL == _buckets) {
        LOG(ERROR) << "Fail to new _buckets";
        return ENOMEM;
    const int ret = pthread_create(&_thread, NULL, TimerThread::run_this, this);
    if (ret) {
        return ret;
    _started = true;
    return 0;

TimerThread::Task* TimerThread::Bucket::consume_tasks() {
    Task* head = NULL;
    head = _task_head;
    _task_head = NULL;
    _nearest_run_time = std::numeric_limits<int64_t>::max();
    return head;

TimerThread::Bucket::ScheduleResult TimerThread::Bucket::schedule(
    void (*fn)(void*), void* arg, const timespec& abstime) {
    base::ResourceId<Task> slot_id;
    Task* task = base::get_resource<Task>(&slot_id);
    if (task == NULL) {
        ScheduleResult result = { INVALID_TASK_ID, false };
        return result;
    task->next = NULL;
    task->fn = fn;
    task->arg = arg;
    task->run_time = base::timespec_to_microseconds(abstime);
    uint32_t version = task->version.load(base::memory_order_relaxed);
    if (version == 0) {  // skip 0.
        task->version.fetch_add(2, base::memory_order_relaxed);
        version = 2;
    const TaskId id = make_task_id(slot_id, version);
    task->task_id = id;
    bool earlier = false;
        task->next = _task_head;
        _task_head = task;
        if (task->run_time < _nearest_run_time) {
            _nearest_run_time = task->run_time;
            earlier = true;
    ScheduleResult result = { id, earlier };
    return result;

inline uint64_t fmix64(uint64_t k) {
    k ^= k >> 33;
    k *= 0xff51afd7ed558ccdLLU;
    k ^= k >> 33;
    k *= 0xc4ceb9fe1a85ec53LLU;
    k ^= k >> 33;
    return k;

TimerThread::TaskId TimerThread::schedule(
    void (*fn)(void*), void* arg, const timespec& abstime) {
    if (_stop.load(base::memory_order_relaxed) || !_started) {
        // Not add tasks when TimerThread is about to stop.
        return INVALID_TASK_ID;
    // Hashing by pthread id is better for cache locality.
    const Bucket::ScheduleResult result =
        _buckets[fmix64(pthread_self()) % _options.num_buckets]
        .schedule(fn, arg, abstime);
    if (result.earlier) {
        bool earlier = false;
        const int64_t run_time = base::timespec_to_microseconds(abstime);
            if (run_time < _nearest_run_time) {
                _nearest_run_time = run_time;
                earlier = true;
        if (earlier) {
            futex_wake_private(&_nsignals, 1);
    return result.task_id;

// Notice that we don't recycle the Task in this function, let TimerThread::run
// do it. The side effect is that we may allocated many unscheduled tasks before
// TimerThread wakes up. The number is approximiately qps * timeout_s. Under the
// precondition that ResourcePool<Task> caches 128K for each thread, with some
// further calculations, we can conclude that in a RPC scenario:
//   when timeout / latency < 2730 (128K / sizeof(Task))
// unscheduled tasks do not occupy addititonal memory. 2730 is a large ratio
// between timeout and latency in most RPC scenarios, this is why we don't
// try to reuse tasks right now inside unschedule() with more complicated code.
int TimerThread::unschedule(TaskId task_id) {
    const base::ResourceId<Task> slot_id = slot_of_task_id(task_id);
    Task* const task = base::address_resource(slot_id);
    if (task == NULL) {
        LOG(ERROR) << "Invalid task_id=" << task_id;
        return -1;
    const uint32_t id_version = version_of_task_id(task_id);
    uint32_t expected_version = id_version;
    // This CAS is rarely contended, should be fast.
    // The acquire fence is paired with release fence in Task::run_and_delete
    // to make sure that we see all changes brought by fn(arg).
    if (task->version.compare_exchange_strong(
            expected_version, id_version + 2,
            base::memory_order_acquire)) {
        return 0;
    return (expected_version == id_version + 1) ? 1 : -1;

bool TimerThread::Task::run_and_delete() {
    const uint32_t id_version = version_of_task_id(task_id);
    uint32_t expected_version = id_version;
    // This CAS is rarely contended, should be fast.
    if (version.compare_exchange_strong(
            expected_version, id_version + 1, base::memory_order_relaxed)) {
        // The release fence is paired with acquire fence in
        // TimerThread::unschedule to make changes of fn(arg) visible.
        version.store(id_version + 2, base::memory_order_release);
        return true;
    } else if (expected_version == id_version + 2) {
        // already unscheduled.
        return false;
    } else {
        // Impossible.
        LOG(ERROR) << "Invalid version=" << expected_version
                   << ", expecting " << id_version + 2;
        return false;

bool TimerThread::Task::try_delete() {
    const uint32_t id_version = version_of_task_id(task_id);
    if (version.load(base::memory_order_relaxed) != id_version) {
        CHECK_EQ(version.load(base::memory_order_relaxed), id_version + 2);
        return true;
    return false;

template <typename T>
static T deref_value(void* arg) {
    return *(T*)arg;

void TimerThread::run() {
    ComlogInitializer comlog_initializer;

    int64_t last_sleep_time = base::gettimeofday_us();
    BT_VLOG << "Started TimerThread=" << pthread_self();

    // min heap of tasks (ordered by run_time)
    std::vector<Task*> tasks;

    // vars
    size_t nscheduled = 0;
    bvar::PassiveStatus<size_t> nscheduled_var(deref_value<size_t>, &nscheduled);
    bvar::PerSecond<bvar::PassiveStatus<size_t> > nscheduled_second(&nscheduled_var);
    size_t ntriggered = 0;
    bvar::PassiveStatus<size_t> ntriggered_var(deref_value<size_t>, &ntriggered);
    bvar::PerSecond<bvar::PassiveStatus<size_t> > ntriggered_second(&ntriggered_var);
    double busy_seconds = 0;
    bvar::PassiveStatus<double> busy_seconds_var(deref_value<double>, &busy_seconds);
    bvar::PerSecond<bvar::PassiveStatus<double> > busy_seconds_second(&busy_seconds_var);
    if (!_options.bvar_prefix.empty()) {
        nscheduled_second.expose_as(_options.bvar_prefix, "scheduled_second");
        ntriggered_second.expose_as(_options.bvar_prefix, "triggered_second");
        busy_seconds_second.expose_as(_options.bvar_prefix, "usage");

    while (!_stop.load(base::memory_order_relaxed)) {
        // Clear _nearest_run_time before consuming tasks from buckets.
        // This helps us to be aware of earliest task of the new tasks before we
        // would run the consumed tasks.
            _nearest_run_time = std::numeric_limits<int64_t>::max();

        // Pull tasks from buckets.
        for (size_t i = 0; i < _options.num_buckets; ++i) {
            Bucket& bucket = _buckets[i];
            for (Task* p = bucket.consume_tasks(); p != NULL;
                 p = p->next, ++nscheduled) {
                if (!p->try_delete()) { // remove the task if it's unscheduled
                    std::push_heap(tasks.begin(), tasks.end(), task_greater);

        bool pull_again = false;
        while (!tasks.empty()) {
            Task* task1 = tasks[0];  // the about-to-run task
            if (task1->try_delete()) { // already unscheduled
                std::pop_heap(tasks.begin(), tasks.end(), task_greater);
            if (base::gettimeofday_us() < task1->run_time) {  // not ready yet.
            // Each time before we run the earliest task (that we think),
            // check the globally shared _nearest_run_time. If a task earlier
            // than task1 was scheduled during pulling from buckets, we'll
            // know. In RPC scenarios, _nearest_run_time is not often changed by
            // threads because the task needs to be the earliest in its bucket,
            // since run_time of scheduled tasks are often in ascending order,
            // most tasks are unlikely to be "earliest". (If run_time of tasks
            // are in descending orders, all tasks are "earliest" after every
            // insertion, and they'll grab _mutex and change _nearest_run_time
            // frequently, fortunately this is not true at most of time).
                if (task1->run_time > _nearest_run_time) {
                    // a task is earlier than task1. We need to check buckets.
                    pull_again = true;
            std::pop_heap(tasks.begin(), tasks.end(), task_greater);
            if (task1->run_and_delete()) {
        if (pull_again) {
            BT_VLOG << "pull again, tasks=" << tasks.size();

        // The realtime to wait for.
        int64_t next_run_time = std::numeric_limits<int64_t>::max();
        if (tasks.empty()) {
            next_run_time = std::numeric_limits<int64_t>::max();
        } else {
            next_run_time = tasks[0]->run_time;
        // Similarly with the situation before running tasks, we check
        // _nearest_run_time to prevent us from waiting on a non-earliest
        // task. We also use the _nsignal to make sure that if new task
        // is earlier that the realtime that we wait for, we'll wake up.
        int expected_nsignals = 0;
            if (next_run_time > _nearest_run_time) {
                // a task is earlier that what we would wait for.
                // We need to check buckets.
            } else {
                _nearest_run_time = next_run_time;
                expected_nsignals = _nsignals;
        timespec* ptimeout = NULL;
        timespec next_timeout = { 0, 0 };
        const int64_t now = base::gettimeofday_us();
        if (next_run_time != std::numeric_limits<int64_t>::max()) {
            next_timeout = base::microseconds_to_timespec(next_run_time - now);
            ptimeout = &next_timeout;
        busy_seconds += (now - last_sleep_time) / 1000000.0;
        futex_wait_private(&_nsignals, expected_nsignals, ptimeout);
        last_sleep_time = base::gettimeofday_us();
    BT_VLOG << "Ended TimerThread=" << pthread_self();

void TimerThread::stop_and_join() {
    _stop.store(true, base::memory_order_relaxed);
    if (_started) {
             // trigger pull_again and wakeup TimerThread
            _nearest_run_time = 0;
        if (pthread_self() != _thread) {
            // stop_and_join was not called from a running task.
            // wake up the timer thread in case it is sleeping.
            futex_wake_private(&_nsignals, 1);
            pthread_join(_thread, NULL);

static pthread_once_t g_timer_thread_once = PTHREAD_ONCE_INIT;
static TimerThread* g_timer_thread = NULL;
static void init_global_timer_thread() {
    g_timer_thread = new (std::nothrow) TimerThread;
    if (g_timer_thread == NULL) {
        LOG(FATAL) << "Fail to new g_timer_thread";
    TimerThreadOptions options;
    options.bvar_prefix = "bthread_timer";
    const int rc = g_timer_thread->start(&options);
    if (rc != 0) {
        LOG(FATAL) << "Fail to start timer_thread, " << berror(rc);
        delete g_timer_thread;
        g_timer_thread = NULL;
TimerThread* get_or_create_global_timer_thread() {
    pthread_once(&g_timer_thread_once, init_global_timer_thread);
    return g_timer_thread;
TimerThread* get_global_timer_thread() {
    return g_timer_thread;


  • Bucket锁内的操作是O(1)的,就是插入一个链表节点,临界区很小。节点本身的内存分配是在锁外的。
  • 由于大部分插入的时间是递增的,早于Bucket::nearest_run_time而参与全局竞争的timer很少。
  • 参与全局竞争的timer也就是和全局nearest_run_time比一下,临界区很小。
  • 和Bucket内类似,极少数Timer会早于全局nearest_run_time并去唤醒TimerThread。唤醒也在全局锁外。
  • 删除不参与全局竞争。
  • TimerThread自己维护小顶堆,没有任何cache bouncing,效率很高。
  • TimerThread醒来的频率大约是RPC超时的倒数,比如超时=100ms,TimerThread一秒内大约醒10次,已经最优。


下面是一些和linux下时间管理相关的知识: - epoll_wait的超时精度是毫秒,较差。pthread_cond_timedwait的超时使用timespec,精度到纳秒,一般是60微秒左右的延时。 - 出于性能考虑,TimerThread使用wall-time,而不是单调时间,可能受到系统时间调整的影响。具体来说,如果在测试中把系统时间往前或往后调一个小时,程序行为将完全undefined。未来可能会让用户选择单调时间。 - 在cpu支持nonstop_tsc和constant_tsc的机器上,baidu-rpc和bthread会优先使用基于rdtsc的cpuwide_time_us。那两个flag表示rdtsc可作为wall-time使用,不支持的机器上会转而使用较慢的内核时间。我们的机器(Intel Xeon系列)大都有那两个flag。rdtsc作为wall-time使用时是否会受到系统调整时间的影响,未测试不清楚。