前沿
这两天有个面试问题问到linux常用的几种锁的实现,区别,以及如果用互斥锁api实现读写锁;没答上,现在补补功课
mutex, semaphore, spinlock比较
-
mutex: sleep-waiting型,底层通过futex实现互斥,进程、线程互斥都支持,但是用在进程间互斥时,需要手动创建进程间共享内存
- pthread_mutex_t mut; pthread_mutex_init(&mut, NULL); pthread_mutex_lock(&mut); pthread_mutex_unlock(&mut)
-
semaphore: 底层通过 mach msg进程间通信系统来完成等待锁、唤醒等待进程
-
sem_t sem; sem_init(&sem, 0, 1); sem_wait(&sem); sem_post(&sem);
-
关于mach msg系统网上资料较少,未看清实现方式
-
-
spinlock: busy-waiting型,实现cpu间总线互斥,加的是总线锁(有限次while循环不sleep加锁);一般用在内核态
futex
-
futex是一种用户态和内核台混合的同步机制,同步的进程间通过共享一段内存,futex变量就位于这段共享的内存中且操作是原子的;
-
futex变量只有3种可能
-
0: 没人获得锁
-
1: 当只有一个线程调用pthread_mutex_lock;如果此后没有其它线程争用该mutex,则一直为1,直到unlock更新为0
-
2: 值为1时,有其它线程调用pthread_mutex_lock(),则被设置为2
-
-
当值为1时,unlock时只需要原子的将值改为0,不需要调用sys_futex系统调用;否则需要调用sys_futex,唤醒等待该锁上的线程
代码分析
__pthread_mutex_lock
int __pthread_mutex_lock (pthread_mutex_t *mutex)
{
unsigned int type = PTHREAD_MUTEX_TYPE_ELISION (mutex);
LIBC_PROBE (mutex_entry, 1, mutex);
if (__builtin_expect (type & ~(PTHREAD_MUTEX_KIND_MASK_NP
| PTHREAD_MUTEX_ELISION_FLAGS_NP), 0))
return __pthread_mutex_lock_full (mutex);
if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
{
// 如果为普通锁
FORCE_ELISION (mutex, goto elision);
simple:
/* Normal mutex. 调用函数加锁 */
LLL_MUTEX_LOCK (mutex);
/*确保是本线程获取到了锁*/
assert (mutex->__data.__owner == 0);
}
#ifdef HAVE_ELISION
else if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_ELISION_NP))
{
elision: __attribute__((unused))
/* This case can never happen on a system without elision,
as the mutex type initialization functions will not
allow to set the elision flags. */
/* Don't record owner or users for elision case. This is a
tail call. */
return LLL_MUTEX_LOCK_ELISION (mutex);
}
#endif
else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
== PTHREAD_MUTEX_RECURSIVE_NP, 1))
{
/* Recursive mutex. 如果是可重入锁,获取线程pid */
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
/* 如果是本线程目前持有锁 */
if (mutex->__data.__owner == id)
{
/* Just bump the counter. 如果count计数器翻转,则加锁失败,返回错误码 */
if (__glibc_unlikely (mutex->__data.__count + 1 == 0))
/* Overflow of the counter. */
return EAGAIN;
/*计数器++*/
++mutex->__data.__count;
/*可重入锁如果是本线程持有锁,只需要计数器++,不需要真正再去获取锁*/
return 0;
}
/* We have to get the mutex. 非本线程持有锁,则走获取锁流程 */
LLL_MUTEX_LOCK (mutex);
/* 如果__owner!=0,表示已经有别的线程先抢占到了锁? */
assert (mutex->__data.__owner == 0);
mutex->__data.__count = 1;
}
else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
== PTHREAD_MUTEX_ADAPTIVE_NP, 1))
{
/*自适应锁, 是普通锁和自旋锁的结合体*/
/*通过is_smp_system()函数判断是否为SMP系统*/
if (! __is_smp)
goto simple;
/*如果为smp系统,首先尝试trylock*/
if (LLL_MUTEX_TRYLOCK (mutex) != 0)
{
/*如果trylock不成功*/
int cnt = 0;
/*计算最大的自旋次数*/
int max_cnt = MIN (MAX_ADAPTIVE_COUNT,
mutex->__data.__spins * 2 + 10);
do
{
if (cnt++ >= max_cnt)
{
/*如果自旋次数大于最大次数, 调用加锁函数*/
LLL_MUTEX_LOCK (mutex);
break;
}
/*如果没达到自旋次数则不停的循环trylock,sleep*/
atomic_spin_nop ();
}
while (LLL_MUTEX_TRYLOCK (mutex) != 0);
/*记录已经自旋次数*/
mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8;
}
assert (mutex->__data.__owner == 0);
}
else
{
/*检错锁*/
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
/*确保只有检错锁才会走到本分支*/
assert (PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_ERRORCHECK_NP);
/* Check whether we already hold the mutex. 如果本线程已经获取锁,则返回EDEADLK */
if (__glibc_unlikely (mutex->__data.__owner == id))
return EDEADLK;
/*否则,走正常获取锁逻辑*/
goto simple;
}
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
/* Record the ownership. 成功获取锁后,记录pid */
mutex->__data.__owner = id;
#ifndef NO_INCR
++mutex->__data.__nusers;
#endif
LIBC_PROBE (mutex_acquired, 1, mutex);
return 0;
}
LLL_MUTEX_LOCK
/* 实现mutex加锁、等待加锁
* 判断是否使用共享锁(实现进程间互斥),标志位为锁类型参数__kind的第一字节最高位
* 调用___lll_lock加锁
*/
# define LLL_MUTEX_LOCK(mutex) \
lll_lock ((mutex)->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex))
//__kind的第一字节最高位(第8个标志位)表示是否为共享锁
# define PTHREAD_MUTEX_PSHARED(m) \
(((m)->__data.__kind & 128) ? LLL_SHARED : LLL_PRIVATE)
/* 1. 首先尝试通过atomic_compare_and_exchange_bool_acq对_lock进行更改
* 如果_lock == 0, 则此处即会比较赋值成功,完成加锁
* 否则需要调用___lll_lock_wait陷入内核
*/
#define __lll_lock(futex, private) \
((void) \
({ \
int *__futex = (futex); \
/*更改mutex->_lock的值,如果更改成功,表示mutex->_lock原始值为0,上锁成功
* 3个参数: mem,newval,oldval, 如果oldval == *mem, 则设置*mem = newval,并返回0
* 如果*mem值没有改变则返回非0
*/
if (__glibc_unlikely \
(atomic_compare_and_exchange_bool_acq (__futex, 1, 0))) \
{ \
/*如果源自比较交换指令不成功,就调用__lll_lock_wait,__lll_lock_wait_privete */
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
//如果是私有锁走本分支
__lll_lock_wait_private (__futex); \
else \
//共享锁
__lll_lock_wait (__futex, private); \
} \
}))
#define lll_lock(futex, private) \
__lll_lock (&(futex), private)
__lll_lock_wait
/*如果futex本来就是2,则直接调用futex_wait系统调用陷入内核等待
* 否则,先将futex设为2,再调用futex_wait陷入内核
*/
void __lll_lock_wait (int *futex, int private)
{
/*如果futex==2,表示锁已经被占用,调用wait函数,futex系统调用*/
if (*futex == 2)
lll_futex_wait (futex, 2, private); /* Wait if *futex == 2. */
/*automic_exchange_acq是没有比较的原子操作, 返回 old *mem,不同硬件平台对应的实现不同
* *futex = 2
* return old *mem
*/
while (atomic_exchange_acq (futex, 2) != 0) //强制给*futex置2,并返回*futex原始值,如果非0则需要调用sys_futex陷入内核
lll_futex_wait (futex, 2, private); /* Wait if *futex == 2. */
}
lll_futex_timed_wait
/* Wait while *FUTEXP == VAL for an lll_futex_wake call on FUTEXP.
* 调用系统futex函数
*/
#define lll_futex_wait(futexp, val, private) \
lll_futex_timed_wait (futexp, val, NULL, private)
#define lll_futex_timed_wait(futexp, val, timeout, private) \
lll_futex_syscall (4, futexp, \
__lll_private_flag (FUTEX_WAIT, private), \
val, timeout)
#define lll_futex_syscall(nargs, futexp, op, ...) \
({ \
INTERNAL_SYSCALL_DECL (__err); \
long int __ret = INTERNAL_SYSCALL (futex, __err, nargs, futexp, op, \
__VA_ARGS__); \
(__glibc_unlikely (INTERNAL_SYSCALL_ERROR_P (__ret, __err)) \
? -INTERNAL_SYSCALL_ERRNO (__ret, __err) : 0); \
})
lll_unlock
/* 释放锁核心函数
* 1. 如果__lock(futex)==2,则需要陷入内核wake其它等待的线程
* 2. 否则直接将__lock(futex)修改为0,返回即可
*/
#define lll_unlock(ptr, flags) \
({ \
int *__iptr = (int *)(ptr); \
if (atomic_exchange_rel (__iptr, 0) == 2) \
lll_wake (__iptr, (flags)); \
(void)0; \
})
semphore信号量
__sem_init
/* 主要是初始化sem_t结构体 */
int __sem_init (sem_t *sem, int pshared, unsigned value)
{
if (pshared != 0)
{
errno = EOPNOTSUPP;
return -1;
}
#ifdef SEM_VALUE_MAX
if (value > SEM_VALUE_MAX)
{
errno = EINVAL;
return -1;
}
#endif
*sem = (sem_t) __SEMAPHORE_INITIALIZER (pshared, value);
return 0;
}
附录
__pthread_mutex_s
struct __pthread_mutex_s
{
/* mutex状态,0表示未占用,1表示占用 */
int __lock __LOCK_ALIGNMENT;
unsigned int __count;/* 可重入锁,记录owner线程持有锁的次数 */
int __owner;/* owner 线程id */
#if !__PTHREAD_MUTEX_NUSERS_AFTER_KIND
unsigned int __nusers;
#endif
/* 锁类型:
* 1. PTHREAD_MUTEX_TIMED_NP: 缺省值,普通互斥锁
* 2. PTHREAD_MUTEX_RECURSIVE_NP: 可重入锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁
* 3. PTHREAD_MUTEX_ERRORCHECK_NP: 检错锁,如果同一个线程重复请求同一个锁,则返回EDEADLK, 否则与普通锁一致
* 4. PTHREAD_MUTEX_ADAPTIVE_NP: 自适应锁,自旋锁和普通锁的混合
*/
int __kind;/*记录mutex类型 */
__PTHREAD_COMPAT_PADDING_MID
#if __PTHREAD_MUTEX_NUSERS_AFTER_KIND
unsigned int __nusers;
#endif
#if !__PTHREAD_MUTEX_USE_UNION
__PTHREAD_SPINS_DATA;
__pthread_list_t __list;
# define __PTHREAD_MUTEX_HAVE_PREV 1
#else
__extension__ union
{
__PTHREAD_SPINS_DATA;
__pthread_slist_t __list;
};
# define __PTHREAD_MUTEX_HAVE_PREV 0
#endif
__PTHREAD_COMPAT_PADDING_END
};
原子操作宏
- atomic_compare_and_exchange_bool_acq(mem, newval, oldval):
-
if(*mem == oldval): *mem = newval; return 0
-
else: return !0
- atomic_compare_and_exchange_val_acq(mem, newval, oldval):
-
if(*mem == oldval): *mem = newval; return(old *mem)
-
else: return(old *mem)
__semaphore
/* User visible part of a semaphore. */
struct __semaphore
{
__pthread_spinlock_t __lock;
struct __pthread *__queue;
int __pshared;
int __value;
void *__data;
};