前言
上一篇大概看了下mutex在glibc的实现,glibc中主要对不同类型的锁做了不同处理;如果锁空闲,则在glibc层就可以完成加锁,并返回给调用者;但是如果锁已被占用,再次lock时则需要陷入内核,使用futex系统调用,本篇主要看下futex内核实现
上篇主要内容
-
分析了mutex, semaphore glibc层面实现机制
-
mutex有多种不同的类型,可以通过
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL)
设置不同类型的锁 -
mutex, semaphore内核实现不相同
-
mutex内核由futex系统调用支撑,如果没有竞争不需要陷入内核;内核主要通过
futex_wait/wake
函数配合上层完成业务逻辑; -
semaphore内核由mach msg来支撑业务逻辑
-
读写锁
涉及glibc相关函数:pthread_rwlock_init, pthread_rwlock_rdlock, pthread_rwlock_tryrdlock, pthread_rwlock_wrlock, pthread_rwlock_trywrlock, pthread_rwlock_unlock, pthread_rwlock_destroy
条件锁
本篇主要内容
涉及相关glibc函数:pthread_cond_init, pthread_cond_wait, pthread_cond_signal, pthread_cond_timedwait, pthread_cond_reltimedwait_np, pthread_cond_broadcast, pthread_cond_destroy
代码分析
条件变量
__pthread_cond_wait_common
/*
 * Common implementation of the condition-variable wait path.
 * 1. Register as a waiter by advancing the condvar's __wseq (waiters are
 *    split into two groups, G1 and G2).
 * 2. Release the mutex.
 * 3. Spin while checking __g_signals; once the spin budget is used up, block
 *    in futex_wait_cancelable (or one of the timed variants when ABSTIME is
 *    not NULL).
 * 4. Re-acquire the mutex before returning.
 *
 * Returns 0 on success, ETIMEDOUT when the wait timed out, or the error
 * reported by unlocking / re-locking the mutex.
 */
static __always_inline int
__pthread_cond_wait_common(pthread_cond_t *cond, pthread_mutex_t *mutex,
const struct timespec *abstime)
{
/* Spin budget.  With a value of 0 the spin loop below never executes. */
const int maxspin = 0;
int err;
int result = 0;
LIBC_PROBE(cond_wait, 2, cond, mutex);
/* Advance __wseq by 2; the low bit of the returned value selects the group
and the remaining bits are our position in the waiter sequence. */
uint64_t wseq = __condvar_fetch_add_wseq_acquire(cond, 2);
/* Find our group's index. We always go into what was G2 when we acquired
our position. */
unsigned int g = wseq & 1;
uint64_t seq = wseq >> 1;
/* Increase the waiter reference count. Relaxed MO is sufficient because
we only need to synchronize when decrementing the reference count. */
unsigned int flags = atomic_fetch_add_relaxed(&cond->__data.__wrefs, 8);
int private = __condvar_get_private(flags);
/* After registering as a waiter, release the mutex and wait for a signal.
If unlocking fails, undo the waiter registration and return the error. */
err = __pthread_mutex_unlock_usercnt(mutex, 0);
if (__glibc_unlikely(err != 0))
{
__condvar_cancel_waiting(cond, seq, g, private);
__condvar_confirm_wakeup(cond, private);
return err;
}
unsigned int signals = atomic_load_acquire(cond->__data.__g_signals + g);
do
{
while (1)
{
/*
* Because maxspin is currently 0, the spin loop below is never entered.
*/
unsigned int spin = maxspin;
while (signals == 0 && spin > 0)
{
/* Check that we are not spinning on a group that's already
closed. */
if (seq < (__condvar_load_g1_start_relaxed(cond) >> 1))
goto done;
/* TODO Back off. */
/* Reload signals. See above for MO. */
signals = atomic_load_acquire(cond->__data.__g_signals + g);
spin--;
}
/* If our group will be closed as indicated by the flag on signals,
don't bother grabbing a signal. */
if (signals & 1)
goto done;
/* If there is an available signal, don't block. */
if (signals != 0)
break;
/* No signal is available after spinning, so we have to block.  Take a
group reference first, then re-check that the group is still open. */
atomic_fetch_add_acquire(cond->__data.__g_refs + g, 2);
if (((atomic_load_acquire(cond->__data.__g_signals + g) & 1) != 0) || (seq < (__condvar_load_g1_start_relaxed(cond) >> 1)))
{
/* Our group is closed. Wake up any signalers that might be
waiting. */
__condvar_dec_grefs(cond, g, private);
goto done;
}
// Block and wait.
struct _pthread_cleanup_buffer buffer;
struct _condvar_cleanup_buffer cbuffer;
cbuffer.wseq = wseq;
cbuffer.cond = cond;
cbuffer.mutex = mutex;
cbuffer.private = private;
/* Register __condvar_cleanup_waiting as the cleanup handler for the blocking
call below.  NOTE(review): presumably it runs only if the thread is cancelled
while blocked, undoing the waiter registration and re-acquiring the mutex;
its body is not shown here -- confirm. */
__pthread_cleanup_push(&buffer, __condvar_cleanup_waiting, &cbuffer);
/* No timeout was supplied. */
if (abstime == NULL)
{
/* Block on the futex word cond->__data.__g_signals + g while it is 0.
* NOTE(review): per the glibc sources (not shown here) futex_wait_cancelable
* is expected to do roughly the following -- confirm:
*   --> oldtype = __pthread_enable_asynccancel ();   // switch to asynchronous cancellation
*   --> int err = lll_futex_timed_wait (futex_word, expected, NULL, private);   // sleep in the kernel
*   --> __pthread_disable_asynccancel (oldtype);   // restore deferred cancellation
* Thread cancellation types: deferred and asynchronous.
*/
err = futex_wait_cancelable(
cond->__data.__g_signals + g, 0, private);
}
else/* A timeout was supplied. */
{
/* Block, but with a timeout.  A negative absolute tv_sec is treated as
already timed out. */
if (__glibc_unlikely(abstime->tv_sec < 0))
err = ETIMEDOUT;
else if ((flags & __PTHREAD_COND_CLOCK_MONOTONIC_MASK) != 0)
{
/* CLOCK_MONOTONIC is requested. */
struct timespec rt;
if (__clock_gettime(CLOCK_MONOTONIC, &rt) != 0)
__libc_fatal("clock_gettime does not support "
"CLOCK_MONOTONIC");
/* Convert the absolute timeout value to a relative
timeout. */
rt.tv_sec = abstime->tv_sec - rt.tv_sec;
rt.tv_nsec = abstime->tv_nsec - rt.tv_nsec;
if (rt.tv_nsec < 0)
{
rt.tv_nsec += 1000000000;
--rt.tv_sec;
}
/* The deadline has already passed: report a timeout. */
if (__glibc_unlikely(rt.tv_sec < 0))
err = ETIMEDOUT;
else/* Otherwise do a cancelable wait with the relative timeout. */
err = futex_reltimed_wait_cancelable(cond->__data.__g_signals + g, 0, &rt, private);
}
else
{
/* Use CLOCK_REALTIME. */
err = futex_abstimed_wait_cancelable(cond->__data.__g_signals + g, 0, abstime, private);
}
}
/* Pop the cleanup handler (second argument 0). */
__pthread_cleanup_pop(&buffer, 0);
if (__glibc_unlikely(err == ETIMEDOUT))
{
__condvar_dec_grefs(cond, g, private);
/* We left the wait because of a timeout, so cancel our waiter
registration. */
__condvar_cancel_waiting(cond, seq, g, private);
result = ETIMEDOUT;
goto done;
}
else
__condvar_dec_grefs(cond, g, private);
/* Reload signals. See above for MO. */
signals = atomic_load_acquire(cond->__data.__g_signals + g);
}
}
/* Try to grab a signal. Use acquire MO so that we see an up-to-date value
of __g1_start below (see spinning above for a similar case). In
particular, if we steal from a more recent group, we will also see a
more recent __g1_start below. */
while (!atomic_compare_exchange_weak_acquire(cond->__data.__g_signals + g,
&signals, signals - 2));
/* NOTE(review): the code below handles the case where our group is already
closed (seq is below __g1_start >> 1) while the current G1 occupies our slot
index g.  In that case the signal we just consumed may have belonged to the
newer group, so a signal is added back to the slot (unless its closed flag
is set) and one waiter is woken.  Interpretation to be confirmed against
the upstream glibc comments. */
uint64_t g1_start = __condvar_load_g1_start_relaxed(cond);
if (seq < (g1_start >> 1))
{
if (((g1_start & 1) ^ 1) == g)
{
unsigned int s = atomic_load_relaxed(cond->__data.__g_signals + g);
while (__condvar_load_g1_start_relaxed(cond) == g1_start)
{
if (((s & 1) != 0) || atomic_compare_exchange_weak_relaxed(cond->__data.__g_signals + g, &s, s + 2))
{
futex_wake(cond->__data.__g_signals + g, 1, private);
break;
}
/* TODO Back off. */
}
}
}
done:
/* Confirm that we have been woken. We do that before acquiring the mutex
to allow for execution of pthread_cond_destroy while having acquired the
mutex. */
__condvar_confirm_wakeup(cond, private);
/* Re-acquire the mutex. */
err = __pthread_mutex_cond_lock(mutex);
/* XXX Abort on errors that are disallowed by POSIX? */
return (err != 0) ? err : result;
}
__pthread_cond_signal
/*
 * Wake at most one thread waiting on COND.
 * 1. Inspect the waiter reference count in __wrefs; return immediately when
 *    nobody is waiting.
 * 2. Under the condvar-internal lock, check whether G1 still has waiters.  If
 *    it does not, try to switch groups (e.g. after the very first wait G1 is
 *    empty and G2 holds one waiter, so the first signal turns G2 into G1).
 * 3. Add one signal to __g_signals[g1], decrement __g_size[g1] (waiters not
 *    yet signalled), and call futex_wake after the lock has been released.
 * Always returns 0.
 */
int __pthread_cond_signal (pthread_cond_t *cond)
{
  LIBC_PROBE (cond_signal, 1, cond);

  /* First check whether there are waiters.  Relaxed MO is fine for that for
     the same reasons that relaxed MO is fine when observing __wseq.  */
  unsigned int waiter_refs = atomic_load_relaxed (&cond->__data.__wrefs);
  if ((waiter_refs >> 3) == 0)
    return 0;

  int private = __condvar_get_private (waiter_refs);
  __condvar_acquire_lock (cond, private);

  /* The low bit of __wseq names G2; G1 is the other slot.  */
  unsigned long long int raw_wseq = __condvar_load_wseq_relaxed (cond);
  unsigned int g1 = (raw_wseq & 1) ^ 1;
  unsigned long long int wseq = raw_wseq >> 1;

  /* Only attempt the group switch when G1 has no waiter left.  */
  bool signal_sent = (cond->__data.__g_size[g1] != 0);
  if (!signal_sent)
    signal_sent = __condvar_quiesce_and_switch_g1 (cond, wseq, &g1, private);

  if (signal_sent)
    {
      /* Publish one signal and account for the waiter that will consume it.
         TODO Only wake if there are indeed futex waiters.  */
      atomic_fetch_add_relaxed (cond->__data.__g_signals + g1, 2);
      cond->__data.__g_size[g1] -= 1;
    }

  __condvar_release_lock (cond, private);

  /* The wake-up itself happens outside the critical section.  */
  if (signal_sent)
    futex_wake (cond->__data.__g_signals + g1, 1, private);

  return 0;
}
__pthread_cond_broadcast
/* Wake every thread waiting on COND.

   The steps of __pthread_cond_signal are performed in one critical section:
   (1) signal all waiters in G1, (2) close G1 so that it can become the new
   G2 and make G2 the new G1, and (3) signal all waiters in the new G1.  Not
   all steps are needed if there are no waiters in G1 and/or G2.  See
   __pthread_cond_signal for further details.  Always returns 0.  */
int
__pthread_cond_broadcast (pthread_cond_t *cond)
{
  LIBC_PROBE (cond_broadcast, 1, cond);

  /* Nothing to do when no waiter holds a reference.  */
  unsigned int waiter_refs = atomic_load_relaxed (&cond->__data.__wrefs);
  if ((waiter_refs >> 3) == 0)
    return 0;

  int private = __condvar_get_private (waiter_refs);
  __condvar_acquire_lock (cond, private);

  /* The low bit of __wseq names G2; G1 is the other slot.  */
  unsigned long long int raw_wseq = __condvar_load_wseq_relaxed (cond);
  unsigned int g1 = (raw_wseq & 1) ^ 1;
  unsigned long long int wseq = raw_wseq >> 1;
  bool wake_new_g1 = false;

  /* Step (1): signal all waiters remaining in G1.  */
  unsigned int remaining = cond->__data.__g_size[g1];
  if (remaining != 0)
    {
      /* Add as many signals as the remaining size of the group.  */
      atomic_fetch_add_relaxed (cond->__data.__g_signals + g1, remaining << 1);
      cond->__data.__g_size[g1] = 0;
      /* G1 waiters must be woken before G1 is quiesced below.
         TODO Only wake if there are indeed futex waiters.  This could also
         move out of the critical section when G2 is empty (no quiesce).  */
      futex_wake (cond->__data.__g_signals + g1, INT_MAX, private);
    }

  /* G1 is complete.  Step (2) is next unless there are no waiters in G2, in
     which case we can stop.  */
  wake_new_g1 = __condvar_quiesce_and_switch_g1 (cond, wseq, &g1, private);
  if (wake_new_g1)
    {
      /* Step (3): send signals to all waiters in the old G2 / new G1.
         TODO Only wake if there are indeed futex waiters.  */
      unsigned int switched = cond->__data.__g_size[g1];
      atomic_fetch_add_relaxed (cond->__data.__g_signals + g1, switched << 1);
      cond->__data.__g_size[g1] = 0;
    }

  __condvar_release_lock (cond, private);

  if (wake_new_g1)
    futex_wake (cond->__data.__g_signals + g1, INT_MAX, private);

  return 0;
}
读写锁
__pthread_rwlock_init
/* Initialize RWLOCK.  When ATTR is NULL the settings come from
   default_rwlockattr.  The whole object is zeroed first, then the lock kind
   and the process-shared indicator are stored.  Always returns 0.  */
int
__pthread_rwlock_init (pthread_rwlock_t *rwlock,
                       const pthread_rwlockattr_t *attr)
{
  ASSERT_TYPE_SIZE (pthread_rwlock_t, __SIZEOF_PTHREAD_RWLOCK_T);

  /* Pick the caller's attributes if given, the defaults otherwise.  */
  const struct pthread_rwlockattr *settings = &default_rwlockattr;
  if (attr != NULL)
    settings = (const struct pthread_rwlockattr *) attr;

  memset (rwlock, 0, sizeof *rwlock);

  rwlock->__data.__flags = settings->lockkind;

  /* The value of __SHARED in a private rwlock must be zero.  */
  if (settings->pshared != PTHREAD_PROCESS_PRIVATE)
    rwlock->__data.__shared = 1;
  else
    rwlock->__data.__shared = 0;

  return 0;
}
__pthread_rwlock_rdlock_full
/* Acquire RWLOCK for reading, optionally bounded by the absolute timeout
   ABSTIME (passed through unchanged to futex_abstimed_wait).

   Outline of the steps visible below:
   1. Return EDEADLK if the calling thread is recorded as the current writer.
   2. If the lock kind is PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP and we
      are in a read phase with WRLOCKED set and other readers present, set
      RWAITING and block on __readers instead of extending the read phase.
   3. Register as a reader by adding 1 << PTHREAD_RWLOCK_READER_SHIFT to
      __readers; on reader-count overflow undo the registration and return
      EAGAIN.
   4. If we are not in a write phase, the read lock is held: return 0.
   5. Otherwise, if there is no primary writer, try to switch to a read phase
      ourselves; failing that, wait on __wrphase_futex for explicit
      hand-over.

   Returns 0 on success, or EDEADLK, EAGAIN or ETIMEDOUT as described.  */
static __always_inline int
__pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
const struct timespec *abstime)
{
unsigned int r;
/* Make sure we are not holding the rwlock as a writer. This is a deadlock
situation we recognize and report. */
if (__glibc_unlikely (atomic_load_relaxed (&rwlock->__data.__cur_writer)
== THREAD_GETMEM (THREAD_SELF, tid)))
return EDEADLK;
/* If we prefer writers, recursive rdlock is disallowed, we are in a read
phase, and there are other readers present, we try to wait without
extending the read phase. We will be unblocked by either one of the
other active readers, or if the writer gives up WRLOCKED (e.g., on
timeout).
If there are no other readers, we simply race with any existing primary
writer; it would have been a race anyway, and changing the odds slightly
will likely not make a big difference. */
if (rwlock->__data.__flags == PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
{
r = atomic_load_relaxed (&rwlock->__data.__readers);
while (((r & PTHREAD_RWLOCK_WRPHASE) == 0)
&& ((r & PTHREAD_RWLOCK_WRLOCKED) != 0)
&& ((r >> PTHREAD_RWLOCK_READER_SHIFT) > 0))
{
/* TODO Spin first. */
/* Try setting the flag signaling that we are waiting without having
incremented the number of readers. Relaxed MO is fine because
this is just about waiting for a state change in __readers. */
if (atomic_compare_exchange_weak_relaxed
(&rwlock->__data.__readers, &r, r | PTHREAD_RWLOCK_RWAITING))
{
/* Wait for as long as the flag is set. An ABA situation is
harmless because the flag is just about the state of
__readers, and all threads set the flag under the same
conditions. */
while ((atomic_load_relaxed (&rwlock->__data.__readers)
& PTHREAD_RWLOCK_RWAITING) != 0)
{
int private = __pthread_rwlock_get_private (rwlock);
int err = futex_abstimed_wait (&rwlock->__data.__readers,
r, abstime, private);
/* We ignore EAGAIN and EINTR. On time-outs, we can just
return because we don't need to clean up anything. */
if (err == ETIMEDOUT)
return err;
}
/* It makes sense to not break out of the outer loop here
because we might be in the same situation again. */
}
else
{
/* TODO Back-off. */
}
}
}
/* Register as a reader, using an add-and-fetch so that R can be used as
expected value for future operations. Acquire MO so we synchronize with
prior writers as well as the last reader of the previous read phase (see
below). */
r = atomic_fetch_add_acquire (&rwlock->__data.__readers,
(1 << PTHREAD_RWLOCK_READER_SHIFT)) + (1 << PTHREAD_RWLOCK_READER_SHIFT);
/* Check whether there is an overflow in the number of readers. We assume
that the total number of threads is less than half the maximum number
of readers that we have bits for in __readers (i.e., with 32-bit int and
PTHREAD_RWLOCK_READER_SHIFT of 3, we assume there are less than
1 << (32-3-1) concurrent threads).
If there is an overflow, we use a CAS to try to decrement the number of
readers if there still is an overflow situation. If so, we return
EAGAIN; if not, we are not a thread causing an overflow situation, and so
we just continue. Using a fetch-add instead of the CAS isn't possible
because other readers might release the lock concurrently, which could
make us the last reader and thus responsible for handing ownership over
to writers (which requires a CAS too to make the decrement and ownership
transfer indivisible). */
while (__glibc_unlikely (r >= PTHREAD_RWLOCK_READER_OVERFLOW))
{
/* Relaxed MO is okay because we just want to undo our registration and
cannot have changed the rwlock state substantially if the CAS
succeeds. */
if (atomic_compare_exchange_weak_relaxed (&rwlock->__data.__readers, &r,
r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
return EAGAIN;
}
/* We have registered as a reader, so if we are in a read phase, we have
acquired a read lock. This is also the reader--reader fast-path.
Even if there is a primary writer, we just return. If writers are to
be preferred and we are the only active reader, we could try to enter a
write phase to let the writer proceed. This would be okay because we
cannot have acquired the lock previously as a reader (which could result
in deadlock if we would wait for the primary writer to run). However,
this seems to be a corner case and handling it specially not be worth the
complexity. */
if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
return 0;
/* Otherwise, if we were in a write phase (states #6 or #8), we must wait
for explicit hand-over of the read phase; the only exception is if we
can start a read phase if there is no primary writer currently. */
while (((r & PTHREAD_RWLOCK_WRPHASE) != 0)
&& ((r & PTHREAD_RWLOCK_WRLOCKED) == 0))
{
/* Try to enter a read phase: If the CAS below succeeds, we have
ownership; if it fails, we will simply retry and reassess the
situation.
Acquire MO so we synchronize with prior writers. */
if (atomic_compare_exchange_weak_acquire (&rwlock->__data.__readers, &r,
r ^ PTHREAD_RWLOCK_WRPHASE))
{
/* We started the read phase, so we are also responsible for
updating the write-phase futex. Relaxed MO is sufficient.
We have to do the same steps as a writer would when handing
over the read phase to us because other readers cannot
distinguish between us and the writer; this includes
explicit hand-over and potentially having to wake other readers
(but we can pretend to do the setting and unsetting of WRLOCKED
atomically, and thus can skip this step). */
if ((atomic_exchange_relaxed (&rwlock->__data.__wrphase_futex, 0)
& PTHREAD_RWLOCK_FUTEX_USED) != 0)
{
int private = __pthread_rwlock_get_private (rwlock);
futex_wake (&rwlock->__data.__wrphase_futex, INT_MAX, private);
}
return 0;
}
else
{
/* TODO Back off before retrying. Also see above. */
}
}
/* We were in a write phase but did not install the read phase. We cannot
distinguish between a writer and another reader starting the read phase,
so we must wait for explicit hand-over via __wrphase_futex.
However, __wrphase_futex might not have been set to 1 yet (either
because explicit hand-over to the writer is still ongoing, or because
the writer has started the write phase but has not yet updated
__wrphase_futex). The least recent value of __wrphase_futex we can
read from here is the modification of the last read phase (because
we synchronize with the last reader in this read phase through
__readers; see the use of acquire MO on the fetch_add above).
Therefore, if we observe a value of 0 for __wrphase_futex, we need
to subsequently check that __readers now indicates a read phase; we
need to use acquire MO for this so that if we observe a read phase,
we will also see the modification of __wrphase_futex by the previous
writer. We then need to load __wrphase_futex again and continue to
wait if it is not 0, so that we do not skip explicit hand-over.
Relaxed MO is sufficient for the load from __wrphase_futex because
we just use it as an indicator for when we can proceed; we use
__readers and the acquire MO accesses to it to eventually read from
the proper stores to __wrphase_futex. */
unsigned int wpf;
bool ready = false;
for (;;)
{
/* Loop while __wrphase_futex reads 1, ignoring the FUTEX_USED flag. */
while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
| PTHREAD_RWLOCK_FUTEX_USED) == (1 | PTHREAD_RWLOCK_FUTEX_USED))
{
int private = __pthread_rwlock_get_private (rwlock);
/* Make sure FUTEX_USED is set before blocking; retry the outer check if
the CAS that sets it fails. */
if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
&& !atomic_compare_exchange_weak_relaxed
(&rwlock->__data.__wrphase_futex,
&wpf, wpf | PTHREAD_RWLOCK_FUTEX_USED))
continue;
int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
1 | PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
if (err == ETIMEDOUT)
{
/* If we timed out, we need to unregister. If no read phase
has been installed while we waited, we can just decrement
the number of readers. Otherwise, we just acquire the
lock, which is allowed because we give no precise timing
guarantees, and because the timeout is only required to
be in effect if we would have had to wait for other
threads (e.g., if futex_wait would time-out immediately
because the given absolute time is in the past). */
r = atomic_load_relaxed (&rwlock->__data.__readers);
while ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
{
/* We don't need to make anything else visible to
others besides unregistering, so relaxed MO is
sufficient. */
if (atomic_compare_exchange_weak_relaxed
(&rwlock->__data.__readers, &r,
r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
return ETIMEDOUT;
/* TODO Back-off. */
}
/* Use the acquire MO fence to mirror the steps taken in the
non-timeout case. Note that the read can happen both
in the atomic_load above as well as in the failure case
of the CAS operation. */
atomic_thread_fence_acquire ();
/* We still need to wait for explicit hand-over, but we must
not use futex_wait anymore because we would just time out
in this case and thus make the spin-waiting we need
unnecessarily expensive. */
while ((atomic_load_relaxed (&rwlock->__data.__wrphase_futex)
| PTHREAD_RWLOCK_FUTEX_USED)
== (1 | PTHREAD_RWLOCK_FUTEX_USED))
{
/* TODO Back-off? */
}
ready = true;
break;
}
/* If we got interrupted (EINTR) or the futex word does not have the
expected value (EAGAIN), retry. */
}
if (ready)
/* See below. */
break;
/* We need acquire MO here so that we synchronize with the lock
release of the writer, and so that we observe a recent value of
__wrphase_futex (see below). */
if ((atomic_load_acquire (&rwlock->__data.__readers)
& PTHREAD_RWLOCK_WRPHASE) == 0)
/* We are in a read phase now, so the least recent modification of
__wrphase_futex we can read from is the store by the writer
with value 1. Thus, only now we can assume that if we observe
a value of 0, explicit hand-over is finished. Retry the loop
above one more time. */
ready = true;
}
return 0;
}
__pthread_rwlock_wrlock_full
/* Acquire RWLOCK for writing, optionally bounded by the absolute timeout
   ABSTIME (passed through unchanged to futex_abstimed_wait).

   Outline of the steps visible below:
   1. Return EDEADLK if the calling thread is recorded as the current writer.
   2. Become the primary writer by setting WRLOCKED in __readers.  If another
      primary writer exists, register in __writers (unless readers are
      preferred) and loop: retry the CAS, take a WRHANDOVER token, or block
      on __writers_futex.
   3. Store 1 into __writers_futex (keeping FUTEX_USED if it may be shared).
   4. If already in a write phase we are done; if in a read phase without
      readers, try to start a write phase ourselves.
   5. Otherwise wait on __wrphase_futex for explicit hand-over from readers.
      On timeout either hand WRLOCKED to another writer, give up WRLOCKED, or
      (if a write phase started meanwhile) take the lock anyway.
   6. On success record our TID in __cur_writer.

   Returns 0 on success, or EDEADLK or ETIMEDOUT as described.  */
static __always_inline int
__pthread_rwlock_wrlock_full (pthread_rwlock_t *rwlock,
const struct timespec *abstime)
{
/* Make sure we are not holding the rwlock as a writer. This is a deadlock
situation we recognize and report. */
if (__glibc_unlikely (atomic_load_relaxed (&rwlock->__data.__cur_writer)
== THREAD_GETMEM (THREAD_SELF, tid)))
return EDEADLK;
/* First we try to acquire the role of primary writer by setting WRLOCKED;
if it was set before, there already is a primary writer. Acquire MO so
that we synchronize with previous primary writers.
We do not try to change to a write phase right away using a fetch_or
because we would have to reset it again and wake readers if there are
readers present (some readers could try to acquire the lock more than
once, so setting a write phase in the middle of this could cause
deadlock). Changing to a write phase eagerly would only speed up the
transition from a read phase to a write phase in the uncontended case,
but it would slow down the contended case if readers are preferred (which
is the default).
We could try to CAS from a state with no readers to a write phase, but
this could be less scalable if readers arrive and leave frequently. */
bool may_share_futex_used_flag = false;
unsigned int r = atomic_fetch_or_acquire (&rwlock->__data.__readers,
PTHREAD_RWLOCK_WRLOCKED);
if (__glibc_unlikely ((r & PTHREAD_RWLOCK_WRLOCKED) != 0))
{
/* There is another primary writer. */
bool prefer_writer =
(rwlock->__data.__flags != PTHREAD_RWLOCK_PREFER_READER_NP);
if (prefer_writer)
{
/* We register as a waiting writer, so that we can make use of
writer--writer hand-over. Relaxed MO is fine because we just
want to register. We assume that the maximum number of threads
is less than the capacity in __writers. */
atomic_fetch_add_relaxed (&rwlock->__data.__writers, 1);
}
for (;;)
{
/* TODO Spin until WRLOCKED is 0 before trying the CAS below.
But pay attention to not delay trying writer--writer hand-over
for too long (which we must try eventually anyway). */
if ((r & PTHREAD_RWLOCK_WRLOCKED) == 0)
{
/* Try to become the primary writer or retry. Acquire MO as in
the fetch_or above. */
if (atomic_compare_exchange_weak_acquire
(&rwlock->__data.__readers, &r,
r | PTHREAD_RWLOCK_WRLOCKED))
{
if (prefer_writer)
{
/* Unregister as a waiting writer. Note that because we
acquired WRLOCKED, WRHANDOVER will not be set.
Acquire MO on the CAS above ensures that
unregistering happens after the previous writer;
this sorts the accesses to __writers by all
primary writers in a useful way (e.g., any other
primary writer acquiring after us or getting it from
us through WRHANDOVER will see both our changes to
__writers).
??? Perhaps this is not strictly necessary for
reasons we do not yet know of. */
atomic_fetch_add_relaxed (&rwlock->__data.__writers,
-1);
}
break;
}
/* Retry if the CAS fails (r will have been updated). */
continue;
}
/* If writer--writer hand-over is available, try to become the
primary writer this way by grabbing the WRHANDOVER token. If we
succeed, we own WRLOCKED. */
if (prefer_writer)
{
unsigned int w = atomic_load_relaxed
(&rwlock->__data.__writers);
if ((w & PTHREAD_RWLOCK_WRHANDOVER) != 0)
{
/* Acquire MO is required here so that we synchronize with
the writer that handed over WRLOCKED. We also need this
for the reload of __readers below because our view of
__readers must be at least as recent as the view of the
writer that handed over WRLOCKED; we must avoid an ABA
through WRHANDOVER, which could, for example, lead to us
assuming we are still in a write phase when in fact we
are not. */
if (atomic_compare_exchange_weak_acquire
(&rwlock->__data.__writers,
&w, (w - PTHREAD_RWLOCK_WRHANDOVER - 1)))
{
/* Reload so our view is consistent with the view of
the previous owner of WRLOCKED. See above. */
r = atomic_load_relaxed (&rwlock->__data.__readers);
break;
}
/* We do not need to reload __readers here. We should try
to perform writer--writer hand-over if possible; if it
is not possible anymore, we will reload __readers
elsewhere in this loop. */
continue;
}
}
/* We did not acquire WRLOCKED nor were able to use writer--writer
hand-over, so we block on __writers_futex. */
int private = __pthread_rwlock_get_private (rwlock);
unsigned int wf = atomic_load_relaxed
(&rwlock->__data.__writers_futex);
if (((wf & ~(unsigned int) PTHREAD_RWLOCK_FUTEX_USED) != 1)
|| ((wf != (1 | PTHREAD_RWLOCK_FUTEX_USED))
&& !atomic_compare_exchange_weak_relaxed
(&rwlock->__data.__writers_futex, &wf,
1 | PTHREAD_RWLOCK_FUTEX_USED)))
{
/* If we cannot block on __writers_futex because there is no
primary writer, or we cannot set PTHREAD_RWLOCK_FUTEX_USED,
we retry. We must reload __readers here in case we cannot
block on __writers_futex so that we can become the primary
writer and are not stuck in a loop that just continuously
fails to block on __writers_futex. */
r = atomic_load_relaxed (&rwlock->__data.__readers);
continue;
}
/* We set the flag that signals that the futex is used, or we could
have set it if we had been faster than other waiters. As a
result, we may share the flag with an unknown number of other
writers. Therefore, we must keep this flag set when we acquire
the lock. We do not need to do this when we do not reach this
point here because then we are not part of the group that may
share the flag, and another writer will wake one of the writers
in this group. */
may_share_futex_used_flag = true;
int err = futex_abstimed_wait (&rwlock->__data.__writers_futex,
1 | PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
if (err == ETIMEDOUT)
{
if (prefer_writer)
{
/* We need to unregister as a waiting writer. If we are the
last writer and writer--writer hand-over is available,
we must make use of it because nobody else will reset
WRLOCKED otherwise. (If we use it, we simply pretend
that this happened before the timeout; see
pthread_rwlock_rdlock_full for the full reasoning.)
Also see the similar code above. */
unsigned int w = atomic_load_relaxed
(&rwlock->__data.__writers);
while (!atomic_compare_exchange_weak_acquire
(&rwlock->__data.__writers, &w,
(w == PTHREAD_RWLOCK_WRHANDOVER + 1 ? 0 : w - 1)))
{
/* TODO Back-off. */
}
if (w == PTHREAD_RWLOCK_WRHANDOVER + 1)
{
/* We must continue as primary writer. See above. */
r = atomic_load_relaxed (&rwlock->__data.__readers);
break;
}
}
/* We cleaned up and cannot have stolen another waiting writer's
futex wake-up, so just return. */
return ETIMEDOUT;
}
/* If we got interrupted (EINTR) or the futex word does not have the
expected value (EAGAIN), retry after reloading __readers. */
r = atomic_load_relaxed (&rwlock->__data.__readers);
}
/* Our snapshot of __readers is up-to-date at this point because we
either set WRLOCKED using a CAS (and update r accordingly below,
which was used as expected value for the CAS) or got WRLOCKED from
another writer whose snapshot of __readers we inherit. */
r |= PTHREAD_RWLOCK_WRLOCKED;
}
/* We are the primary writer; enable blocking on __writers_futex. Relaxed
MO is sufficient for futex words; acquire MO on the previous
modifications of __readers ensures that this store happens after the
store of value 0 by the previous primary writer. */
atomic_store_relaxed (&rwlock->__data.__writers_futex,
1 | (may_share_futex_used_flag ? PTHREAD_RWLOCK_FUTEX_USED : 0));
/* If we are in a write phase, we have acquired the lock. */
if ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
goto done;
/* If we are in a read phase and there are no readers, try to start a write
phase. */
while (((r & PTHREAD_RWLOCK_WRPHASE) == 0)
&& ((r >> PTHREAD_RWLOCK_READER_SHIFT) == 0))
{
/* Acquire MO so that we synchronize with prior writers and do
not interfere with their updates to __writers_futex, as well
as regarding prior readers and their updates to __wrphase_futex,
respectively. */
if (atomic_compare_exchange_weak_acquire (&rwlock->__data.__readers,
&r, r | PTHREAD_RWLOCK_WRPHASE))
{
/* We have started a write phase, so need to enable readers to wait.
See the similar case in __pthread_rwlock_rdlock_full. Unlike in
that similar case, we are the (only) primary writer and so do
not need to wake another writer. */
atomic_store_relaxed (&rwlock->__data.__wrphase_futex, 1);
goto done;
}
/* TODO Back-off. */
}
/* We became the primary writer in a read phase and there were readers when
we did (because of the previous loop). Thus, we have to wait for
explicit hand-over from one of these readers.
We basically do the same steps as for the similar case in
__pthread_rwlock_rdlock_full, except that we additionally might try
to directly hand over to another writer and need to wake up
other writers or waiting readers (i.e., PTHREAD_RWLOCK_RWAITING). */
unsigned int wpf;
bool ready = false;
for (;;)
{
/* Loop while __wrphase_futex reads 0, ignoring the FUTEX_USED flag. */
while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
| PTHREAD_RWLOCK_FUTEX_USED) == PTHREAD_RWLOCK_FUTEX_USED)
{
int private = __pthread_rwlock_get_private (rwlock);
/* Make sure FUTEX_USED is set before blocking; retry the outer check if
the CAS that sets it fails. */
if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
&& !atomic_compare_exchange_weak_relaxed
(&rwlock->__data.__wrphase_futex, &wpf,
PTHREAD_RWLOCK_FUTEX_USED))
continue;
int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
PTHREAD_RWLOCK_FUTEX_USED, abstime, private);
if (err == ETIMEDOUT)
{
if (rwlock->__data.__flags
!= PTHREAD_RWLOCK_PREFER_READER_NP)
{
/* We try writer--writer hand-over. */
unsigned int w = atomic_load_relaxed
(&rwlock->__data.__writers);
if (w != 0)
{
/* We are about to hand over WRLOCKED, so we must
release __writers_futex too; otherwise, we'd have
a pending store, which could at least prevent
other threads from waiting using the futex
because it could interleave with the stores
by subsequent writers. In turn, this means that
we have to clean up when we do not hand over
WRLOCKED.
Release MO so that another writer that gets
WRLOCKED from us can take over our view of
__readers. */
unsigned int wf = atomic_exchange_relaxed
(&rwlock->__data.__writers_futex, 0);
while (w != 0)
{
if (atomic_compare_exchange_weak_release
(&rwlock->__data.__writers, &w,
w | PTHREAD_RWLOCK_WRHANDOVER))
{
/* Wake other writers. */
if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
futex_wake (&rwlock->__data.__writers_futex,
1, private);
return ETIMEDOUT;
}
/* TODO Back-off. */
}
/* We still own WRLOCKED and someone else might set
a write phase concurrently, so enable waiting
again. Make sure we don't loose the flag that
signals whether there are threads waiting on
this futex. */
atomic_store_relaxed
(&rwlock->__data.__writers_futex, wf);
}
}
/* If we timed out and we are not in a write phase, we can
just stop being a primary writer. Otherwise, we just
acquire the lock. */
r = atomic_load_relaxed (&rwlock->__data.__readers);
if ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
{
/* We are about to release WRLOCKED, so we must release
__writers_futex too; see the handling of
writer--writer hand-over above. */
unsigned int wf = atomic_exchange_relaxed
(&rwlock->__data.__writers_futex, 0);
while ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
{
/* While we don't need to make anything from a
caller's critical section visible to other
threads, we need to ensure that our changes to
__writers_futex are properly ordered.
Therefore, use release MO to synchronize with
subsequent primary writers. Also wake up any
waiting readers as they are waiting because of
us. */
if (atomic_compare_exchange_weak_release
(&rwlock->__data.__readers, &r,
(r ^ PTHREAD_RWLOCK_WRLOCKED)
& ~(unsigned int) PTHREAD_RWLOCK_RWAITING))
{
/* Wake other writers. */
if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
futex_wake (&rwlock->__data.__writers_futex,
1, private);
/* Wake waiting readers. */
if ((r & PTHREAD_RWLOCK_RWAITING) != 0)
futex_wake (&rwlock->__data.__readers,
INT_MAX, private);
return ETIMEDOUT;
}
}
/* We still own WRLOCKED and someone else might set a
write phase concurrently, so enable waiting again.
Make sure we don't loose the flag that signals
whether there are threads waiting on this futex. */
atomic_store_relaxed (&rwlock->__data.__writers_futex, wf);
}
/* Use the acquire MO fence to mirror the steps taken in the
non-timeout case. Note that the read can happen both
in the atomic_load above as well as in the failure case
of the CAS operation. */
atomic_thread_fence_acquire ();
/* We still need to wait for explicit hand-over, but we must
not use futex_wait anymore. */
while ((atomic_load_relaxed
(&rwlock->__data.__wrphase_futex)
| PTHREAD_RWLOCK_FUTEX_USED)
== PTHREAD_RWLOCK_FUTEX_USED)
{
/* TODO Back-off. */
}
ready = true;
break;
}
/* If we got interrupted (EINTR) or the futex word does not have
the expected value (EAGAIN), retry. */
}
/* See pthread_rwlock_rdlock_full. */
if (ready)
break;
if ((atomic_load_acquire (&rwlock->__data.__readers)
& PTHREAD_RWLOCK_WRPHASE) != 0)
ready = true;
}
done:
/* Record the calling thread as the current writer; this is the value the
EDEADLK check at the top of the lock functions compares against. */
atomic_store_relaxed (&rwlock->__data.__cur_writer,
THREAD_GETMEM (THREAD_SELF, tid));
return 0;
}
####
####
####
####
附录
__pthread_cond_s
/* Common definition of pthread_cond_t. */
struct __pthread_cond_s
{
/* Waiter sequence counter.  Waiters advance it by 2; its low bit selects the
group index and the remaining bits give the waiter's position.  The 32-bit
halves are exposed through __wseq32. */
__extension__ union
{
__extension__ unsigned long long int __wseq;
struct
{
unsigned int __low;
unsigned int __high;
} __wseq32;
};
/* Start position of group G1.  Waiters compare their position against
__g1_start >> 1 to detect that their group has been closed. */
__extension__ union
{
__extension__ unsigned long long int __g1_start;
struct
{
unsigned int __low;
unsigned int __high;
} __g1_start32;
};
/* Per-group reference count; a waiter adds 2 before blocking on the futex. */
unsigned int __g_refs[2] __LOCK_ALIGNMENT;
unsigned int __g_size[2]; /* Per-group number of waiters not yet signalled. */
unsigned int __g1_orig_size;
/* Waiter reference count, incremented by 8 per waiter; the low bits carry
flags (presumably the process-shared and clock selection -- confirm). */
unsigned int __wrefs;
unsigned int __g_signals[2];/* Per-group futex word: each available signal adds 2; the low bit flags a closed group. */
};
__pthread_rwlock_arch_t
/* Architecture-specific layout behind pthread_rwlock_t.  The field order and
   sizes must not change (see the note on __flags).  */
struct __pthread_rwlock_arch_t
{
  /* Reader count stored above PTHREAD_RWLOCK_READER_SHIFT; the low bits hold
     the WRPHASE, WRLOCKED and RWAITING state flags.  */
  unsigned int __readers;
  /* Number of registered waiting writers, plus the WRHANDOVER flag used for
     writer--writer hand-over.  */
  unsigned int __writers;
  /* Futex word for phase hand-over: 1 during a write phase, 0 otherwise,
     optionally or-ed with PTHREAD_RWLOCK_FUTEX_USED when threads block.  */
  unsigned int __wrphase_futex;
  /* Futex word on which non-primary writers block: 1 while a primary writer
     exists, optionally or-ed with PTHREAD_RWLOCK_FUTEX_USED.  */
  unsigned int __writers_futex;
  unsigned int __pad3;
  unsigned int __pad4;
  /* TID of the thread currently holding the write lock.  */
  int __cur_writer;
  /* Non-zero when the lock is process-shared; must be zero for a private
     rwlock.  */
  int __shared;
  unsigned long int __pad1;
  unsigned long int __pad2;
  /* FLAGS must stay at this position in the structure to maintain
     binary compatibility.  Holds the lock kind (reader/writer preference).  */
  unsigned int __flags;
};
####
####
线程取消
线程取消有两个属性:cancelstate, canceltype
-
cancelstate: enabled, disabled; enabled时根据canceltype的值进行后续操作;disabled时cancel请求会被挂起,等待cancelstate变为enabled后再进行后续操作
-
canceltype: deferred(延迟取消), asynchronous(异步取消)
-
deferred: 线程收到取消请求后,不立即作出任何响应,而是要等运行到取消点时才执行取消
-
asynchronous: 可以在任何时候执行取消动作
-
-
线程取消动作:
-
将pthread_cleanup_push压栈的函数出栈执行
-
调用线程的析构函数
-
调用pthread_exit
-