linux锁实现

前沿

这两天有个面试问题问到linux常用的几种锁的实现,区别,以及如果用互斥锁api实现读写锁;没答上,现在补补功课

mutex, semaphore, spinlock比较

  • mutex: sleep-waiting型,底层通过futex实现互斥,进程、线程互斥都支持,但是用在进程间互斥时,需要手动创建进程间共享内存

    1. pthread_mutex_t mut; pthread_mutex_init(&mut, NULL); pthread_mutex_lock(&mut); pthread_mutex_unlock(&mut)
  • semaphore: 底层通过 mach msg进程间通信系统来完成等待锁、唤醒等待进程

    1. sem_t sem; sem_init(&sem, 0, 1); sem_wait(&sem); sem_post(&sem);

    2. 关于mach msg系统网上资料较少,未看清实现方式

  • spinlock: busy-waiting型,实现cpu间总线互斥,加的是总线锁(有限次while循环不sleep加锁);一般用在内核态

futex

  • futex是一种用户态和内核台混合的同步机制,同步的进程间通过共享一段内存,futex变量就位于这段共享的内存中且操作是原子的;

  • futex变量只有3种可能

    1. 0: 没人获得锁

    2. 1: 当只有一个线程调用pthread_mutex_lock;如果此后没有其它线程争用该mutex,则一直为1,直到unlock更新为0

    3. 2: 值为1时,有其它线程调用pthread_mutex_lock(),则被设置为2

  • 当值为1时,unlock时只需要原子的将值改为0,不需要调用sys_futex系统调用;否则需要调用sys_futex,唤醒等待该锁上的线程

代码分析

__pthread_mutex_lock

int __pthread_mutex_lock (pthread_mutex_t *mutex)
{
  unsigned int type = PTHREAD_MUTEX_TYPE_ELISION (mutex);

  LIBC_PROBE (mutex_entry, 1, mutex);

  if (__builtin_expect (type & ~(PTHREAD_MUTEX_KIND_MASK_NP
				 | PTHREAD_MUTEX_ELISION_FLAGS_NP), 0))
    return __pthread_mutex_lock_full (mutex);

  if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
    {
    // 如果为普通锁
      FORCE_ELISION (mutex, goto elision);
    simple:
      /* Normal mutex. 调用函数加锁  */
      LLL_MUTEX_LOCK (mutex);
      /*确保是本线程获取到了锁*/
      assert (mutex->__data.__owner == 0);
    }
#ifdef HAVE_ELISION
  else if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_ELISION_NP))
    {
  elision: __attribute__((unused))
      /* This case can never happen on a system without elision,
         as the mutex type initialization functions will not
	 allow to set the elision flags.  */
      /* Don't record owner or users for elision case.  This is a
         tail call.  */
      return LLL_MUTEX_LOCK_ELISION (mutex);
    }
#endif
  else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
			     == PTHREAD_MUTEX_RECURSIVE_NP, 1))
    {
      /* Recursive mutex. 如果是可重入锁,获取线程pid */
      pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

      /* 如果是本线程目前持有锁  */
      if (mutex->__data.__owner == id)
	{
	  /* Just bump the counter. 如果count计数器翻转,则加锁失败,返回错误码 */
	  if (__glibc_unlikely (mutex->__data.__count + 1 == 0))
	    /* Overflow of the counter.  */
	    return EAGAIN;
      /*计数器++*/
	  ++mutex->__data.__count;
      /*可重入锁如果是本线程持有锁,只需要计数器++,不需要真正再去获取锁*/
	  return 0;
	}

      /* We have to get the mutex. 非本线程持有锁,则走获取锁流程 */
      LLL_MUTEX_LOCK (mutex);
      /* 如果__owner!=0,表示已经有别的线程先抢占到了锁? */
      assert (mutex->__data.__owner == 0);
      mutex->__data.__count = 1;
    }
  else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
			  == PTHREAD_MUTEX_ADAPTIVE_NP, 1))
    {
    /*自适应锁, 是普通锁和自旋锁的结合体*/
    /*通过is_smp_system()函数判断是否为SMP系统*/
      if (! __is_smp)
	goto simple;
    /*如果为smp系统,首先尝试trylock*/
      if (LLL_MUTEX_TRYLOCK (mutex) != 0)
	{
      /*如果trylock不成功*/
	  int cnt = 0;
      /*计算最大的自旋次数*/
	  int max_cnt = MIN (MAX_ADAPTIVE_COUNT,
			     mutex->__data.__spins * 2 + 10);
	  do
	    {
	      if (cnt++ >= max_cnt)
		{
          /*如果自旋次数大于最大次数, 调用加锁函数*/
		  LLL_MUTEX_LOCK (mutex);
		  break;
		}
        /*如果没达到自旋次数则不停的循环trylock,sleep*/
	      atomic_spin_nop ();
	    }
	  while (LLL_MUTEX_TRYLOCK (mutex) != 0);
      /*记录已经自旋次数*/
	  mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8;
	}
      assert (mutex->__data.__owner == 0);
    }
  else
    {
    /*检错锁*/
      pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
      /*确保只有检错锁才会走到本分支*/
      assert (PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_ERRORCHECK_NP);
      /* Check whether we already hold the mutex. 如果本线程已经获取锁,则返回EDEADLK */
      if (__glibc_unlikely (mutex->__data.__owner == id))
	return EDEADLK;
    /*否则,走正常获取锁逻辑*/
      goto simple;
    }

  pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

  /* Record the ownership. 成功获取锁后,记录pid */
  mutex->__data.__owner = id;
#ifndef NO_INCR
  ++mutex->__data.__nusers;
#endif

  LIBC_PROBE (mutex_acquired, 1, mutex);

  return 0;
}

LLL_MUTEX_LOCK

/* 实现mutex加锁、等待加锁
 * 判断是否使用共享锁(实现进程间互斥),标志位为锁类型参数__kind的第一字节最高位
 * 调用___lll_lock加锁
 */
# define LLL_MUTEX_LOCK(mutex) \
  lll_lock ((mutex)->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex))
  
//__kind的第一字节最高位(第8个标志位)表示是否为共享锁
# define PTHREAD_MUTEX_PSHARED(m) \
  (((m)->__data.__kind & 128) ? LLL_SHARED : LLL_PRIVATE)

/* 1. 首先尝试通过atomic_compare_and_exchange_bool_acq对_lock进行更改
 *    如果_lock == 0, 则此处即会比较赋值成功,完成加锁
 * 否则需要调用___lll_lock_wait陷入内核
 */
#define __lll_lock(futex, private)                                      \
  ((void)                                                               \
   ({                                                                   \
     int *__futex = (futex);                                            \
     /*更改mutex->_lock的值,如果更改成功,表示mutex->_lock原始值为0,上锁成功 
      * 3个参数: mem,newval,oldval, 如果oldval == *mem, 则设置*mem = newval,并返回0
      * 如果*mem值没有改变则返回非0
      */
     if (__glibc_unlikely                                               \
         (atomic_compare_and_exchange_bool_acq (__futex, 1, 0)))        \
       {                                                                \
       /*如果源自比较交换指令不成功,就调用__lll_lock_wait,__lll_lock_wait_privete */
         if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
         //如果是私有锁走本分支
           __lll_lock_wait_private (__futex);                           \
         else                                                           \
         //共享锁
           __lll_lock_wait (__futex, private);                          \
       }                                                                \
   }))
#define lll_lock(futex, private)	\
  __lll_lock (&(futex), private)

__lll_lock_wait

/*如果futex本来就是2,则直接调用futex_wait系统调用陷入内核等待
 * 否则,先将futex设为2,再调用futex_wait陷入内核
*/
void __lll_lock_wait (int *futex, int private)
{
/*如果futex==2,表示锁已经被占用,调用wait函数,futex系统调用*/
  if (*futex == 2)
    lll_futex_wait (futex, 2, private); /* Wait if *futex == 2.  */

/*automic_exchange_acq是没有比较的原子操作, 返回 old *mem,不同硬件平台对应的实现不同
 * *futex = 2
 * return old *mem
 */
  while (atomic_exchange_acq (futex, 2) != 0) //强制给*futex置2,并返回*futex原始值,如果非0则需要调用sys_futex陷入内核
    lll_futex_wait (futex, 2, private); /* Wait if *futex == 2.  */
}

lll_futex_timed_wait

/* Wait while *FUTEXP == VAL for an lll_futex_wake call on FUTEXP.  
 * 调用系统futex函数
 */
#define lll_futex_wait(futexp, val, private) \
  lll_futex_timed_wait (futexp, val, NULL, private)

#define lll_futex_timed_wait(futexp, val, timeout, private)     \
  lll_futex_syscall (4, futexp,                                 \
		     __lll_private_flag (FUTEX_WAIT, private),  \
		     val, timeout)
             
#define lll_futex_syscall(nargs, futexp, op, ...)                       \
  ({                                                                    \
    INTERNAL_SYSCALL_DECL (__err);                                      \
    long int __ret = INTERNAL_SYSCALL (futex, __err, nargs, futexp, op, \
				       __VA_ARGS__);                    \
    (__glibc_unlikely (INTERNAL_SYSCALL_ERROR_P (__ret, __err))         \
     ? -INTERNAL_SYSCALL_ERRNO (__ret, __err) : 0);                     \
  })

lll_unlock

/* 释放锁核心函数 
 * 1. 如果__lock(futex)==2,则需要陷入内核wake其它等待的线程
 * 2. 否则直接将__lock(futex)修改为0,返回即可
 */
#define lll_unlock(ptr, flags)   \
  ({   \
     int *__iptr = (int *)(ptr);   \
     
     if (atomic_exchange_rel (__iptr, 0) == 2)   \
       lll_wake (__iptr, (flags));   \
     (void)0;   \
   })

semphore信号量

__sem_init

/* 主要是初始化sem_t结构体 */
int __sem_init (sem_t *sem, int pshared, unsigned value)
{
  if (pshared != 0)
    {
      errno = EOPNOTSUPP;
      return -1;
    }

#ifdef SEM_VALUE_MAX
  if (value > SEM_VALUE_MAX)
    {
      errno = EINVAL;
      return -1;
    }
#endif

  *sem = (sem_t) __SEMAPHORE_INITIALIZER (pshared, value);
  return 0;
}

附录

__pthread_mutex_s

struct __pthread_mutex_s
{
  /* mutex状态,0表示未占用,1表示占用 */
  int __lock __LOCK_ALIGNMENT;
  unsigned int __count;/* 可重入锁,记录owner线程持有锁的次数 */
  int __owner;/* owner 线程id */
#if !__PTHREAD_MUTEX_NUSERS_AFTER_KIND
  unsigned int __nusers;
#endif
  /* 锁类型:
   * 1. PTHREAD_MUTEX_TIMED_NP: 缺省值,普通互斥锁
   * 2. PTHREAD_MUTEX_RECURSIVE_NP: 可重入锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁
   * 3. PTHREAD_MUTEX_ERRORCHECK_NP: 检错锁,如果同一个线程重复请求同一个锁,则返回EDEADLK, 否则与普通锁一致 
   * 4. PTHREAD_MUTEX_ADAPTIVE_NP: 自适应锁,自旋锁和普通锁的混合
   */
  int __kind;/*记录mutex类型 */
  __PTHREAD_COMPAT_PADDING_MID
#if __PTHREAD_MUTEX_NUSERS_AFTER_KIND
  unsigned int __nusers;
#endif
#if !__PTHREAD_MUTEX_USE_UNION
  __PTHREAD_SPINS_DATA;
  __pthread_list_t __list;
# define __PTHREAD_MUTEX_HAVE_PREV      1
#else
  __extension__ union
  {
    __PTHREAD_SPINS_DATA;
    __pthread_slist_t __list;
  };
# define __PTHREAD_MUTEX_HAVE_PREV      0
#endif
  __PTHREAD_COMPAT_PADDING_END
};

原子操作宏

  1. atomic_compare_and_exchange_bool_acq(mem, newval, oldval):
  • if(*mem == oldval): *mem = newval; return 0

  • else: return !0

  1. atomic_compare_and_exchange_val_acq(mem, newval, oldval):
  • if(*mem == oldval): *mem = newval; return(old *mem)

  • else: return(old *mem)

__semaphore

/* User visible part of a semaphore.  */
struct __semaphore
{
  __pthread_spinlock_t __lock;
  struct __pthread *__queue;
  int __pshared;
  int __value;
  void *__data;
};

参考资料

___lll_lock

锁介绍