世至其美

Linux内核 | 内核同步

1. 定义

  1. 临界区和竞争条件

    临界区:访问和操作共享数据的代码段
    竞争条件:多个执行线程在一个临界区同时执行
  2. 死锁:每个线程都在互相等待,但它们永远也不会释放占用的资源。
  3. 自死锁:一个执行线程试图去获取一个自己已经持有的锁,它不得不等待锁释放,但因为它忙于等待这个锁,所以自己永远也不会有机会释放释放锁。

2. 设计规则

以下简单规则避免死锁

  1. 按顺序加锁
  2. 防止发生饥饿
  3. 不要重复请求一个锁
  4. 设计简单-复杂加锁机制,越可能造成死锁

注:锁的争用严重时,加锁太粗会降低可扩展性,而锁争用不明显时,加锁过细会加大系统开销,带来浪费。

3. 内核同步方法

原子操作:保证指令以原子的方式执行,过程不被打断。

原子整数操作引入特殊类型atomic_t,在不同体系结构,都能实现原子操作;同时确保编辑器优化,不针对原子操作。

3.1 原子整数操作

3.1.1 32位原子操作

atomic_t类型:<linux/types.h | 源代码 | v5.4>

typedef struct {
    int counter;
} atomic_t;

标准原子整数操作,所有体系结构都包含的操作,可以在特定的体系结构下找到atomic.h,<asm/atomic.h | 源代码 | v5.4>

原子整数操作描述
ATOMIC_INIT(int i)声明一个atomic_t变量,并将它初始化为i
int atomic_read(atomic_t *v)读取atomic_t变量,转化成int类型,原子操作
void atomic_set(atomic_t *v, int i)设置v值为i,原子操作
void atomic_add(int i, atomic_t *v)v=v+i,原子操作
void atomic_sub(int i, atomic_t *v)v=v-i,原子操作
void atomic_inc(atomic_t *v)v=v+1,原子操作
void atomic_dec(atomic_t *v)v=v-1,原子操作
int atomic_sub_and_test(int i, atomic_t *v)v=v-i,结果等于0,返回真;反之为假,原子操作
int atomic_add_negative(int i, atomic_t *v)v=v+i,结果是负数,返回真;反之为假,原子操作
int atomic_add_return(int i, atomic_t *v)v=v+i,且返回结果,原子操作
int atomic_sub_return(int i, atomic_t *v)v=v-i,且返回结果,原子操作
int atomic_inc_return(atomic_t *v)v=v+1,且返回结果,原子操作
int atomic_dec_return(atomic_t *v)v=v-1,且返回结果,原子操作
int atomic_dec_and_test(atomic_t *v)v=v-1,结果等于0,返回真;反之为假,原子操作
int atomic_inc_and_test(atomic_t *v)v=v+1,结果等于0,返回真;反之为假,原子操作
// example
atomic_t v;  //定义
atomic_t u = ATOMIC_INIT(0); //定义u,并初始化为0

atomic_set(&v, 4);//相当于v = 4,原子操作
atomic_add(4, &v);//相当于v=v+4,原子操作
atomic_inc(&v);//相当于v=v+1,原子操作

atomic_read(&v);//将atomic_t类型转化成

3.1.2 64位原子操作

atomic64_t类型:<linux/types.h | 源代码 | v5.4>

typedef struct {
    s64 counter;
} atomic64_t;
原子整数操作描述
ATOMIC64_INIT(long i)声明一个atomic64_t变量,并将它初始化为i
long atomic64_read(atomic64_t *v)读取atomic64_t变量,转化成long类型,原子操作
void atomic64_set(atomic64_t *v, int i)设置v值为i,原子操作
void atomic64_add(int i, atomic64_t *v)v=v+i,原子操作
void atomic64_sub(int i, atomic64_t *v)v=v-i,原子操作
void atomic64_inc(atomic64_t *v)v=v+1,原子操作
void atomic64_dec(atomic64_t *v)v=v-1,原子操作
int atomic64_sub_and_test(int i, atomic64_t *v)v=v-i,结果等于0,返回真;反之为假,原子操作
int atomic64_add_negative(int i, atomic64_t *v)v=v+i,结果是负数,返回真;反之为假,原子操作
long atomic64_add_return(int i, atomic64_t *v)v=v+i,且返回结果,原子操作
long atomic64_sub_return(int i, atomic64_t *v)v=v-i,且返回结果,原子操作
long atomic64_inc_return(atomic64_t *v)v=v+1,且返回结果,原子操作
long atomic64_dec_return(atomic64_t *v)v=v-1,且返回结果,原子操作
int atomic64_dec_and_test(atomic64_t *v)v=v-1,结果等于0,返回真;反之为假,原子操作
int atomic64_inc_and_test(atomic64_t *v)v=v+1,结果等于0,返回真;反之为假,原子操作

3.1.3 原子位操作

原子位操作列表<linux/bitops.h | 源代码 | v5.4>

原子位操作描述
void set_bit(int nr, void *addr)设置addr所指对象的第nr位,原子操作
void clear_bit(int nr, void *addr)清空addr所指对象的第nr位,原子操作
void change_bit(int nr, void *addr)翻转addr所指对象的第nr位,原子操作
void test_and_set_bit(int nr, void *addr)设置addr所指对象的第nr位,并返回原先的值,原子操作
void test_and_clear_bit(int nr, void *addr)清除addr所指对象的第nr位,并返回原先的值,原子操作
void test_and_change_bit(int nr, void *addr)翻转addr所指对象的第nr位,并返回原先的值,原子操作
void test_bit(int nr, void *addr)返回addr所指对象的第nr位,原子操作

非原子位操作也有一组位操作函数,在函数前面加两个下划线,如void __set_bit(int nr, void *addr)。

// 从指定地址开始搜索,第一个被被置位/没被置位的位
// 参数:指定地址,要搜索的位数
// 返回值:第一个被置位/没被置位的位号
int find_first_bit(unsigned long *addr, unsigned int size)
int find_first_zero_bit(unsigned long *addr, unsigned int size)

3.2 自旋锁(spin lock)

3.2.1 定义

多个线程访问临界区,可利用自旋锁进行保护临界区,自旋锁最多只能被一个可执行线程持有, 其他线程自旋等待锁释放。

注:linux内核实现的自旋锁是不可递归的。

锁争用处理

持有自旋锁的时间尽可能小于两次上下文切换的耗时。

  1. 其他线程自旋等待(自旋锁):特别耗费处理器时间
  2. 其他线程进入睡眠状态,锁释放后唤醒线程(信号量):两次上下文切换的开销

3.2.2 自旋锁用法

自旋锁接口定义<linux/spin_lock.h | 源代码 | v5.4>

DEFINE_SPINLOCK(sp_lock);
spin_lock(&sp_lock);
/*临界区*/
spin_unlock(&sp_lock);

自旋锁可以使用在中断处理程序,但是在获取锁之前需要禁用本地中断(当前处理器中的中断请求),否则会导致死锁。在多核处理器中,在不同的处理器自旋,并不会影响锁的持有和释放,注意,中断处理程序不能使用信号量。

DEFINE_SPINLOCK(sp_lock);
unsigned long flags;
//保存中断当前状态,禁用本地中断,获取自旋锁
spin_lock_irqsave(&sp_lock, flags);
/* 临界区 */
spin_unlock_irqrestore(&sp_lock, flags);

如果能确定中断在加锁前是激活的,不需要在解锁后恢复到中断以前的状态。一般不建议使用。

DEFINE_SPINLOCK(sp_lock);
//禁用本地中断,获取自旋锁
spin_lock_irq(&sp_lock);
/* 临界区 */
spin_unlock_irq(&sp_lock);

调试自旋锁:配置CONFIG_DEBUG_SPINLOCK,如果想进一步调试锁,还应该打开CONFIG_DEBUG_LOCK_ALLOC选项。

3.2.3 函数接口

函数描述
spin_lock()获取指定自旋锁
spin_lock_irq()禁止本地中断,并获取指定的自旋锁
spin_lock_bh()禁用所有下半部的执行,并获取指定的自旋锁
spin_lock_irqsave()保存本地中断的当前状态,禁止本地中断,并获取指定的自旋锁
spin_unlock()释放指定的自旋锁
spin_unlock_irq()释放指定的自旋锁,并激活本地中断
spin_unlock_bh()释放指定的自旋锁,并激活所有下半部的执行
spin_unlock_irqrestore()释放指定的自旋锁,并让本地中断恢复以前状态
spin_lock_init()动态初始化指定的spinlock_t
spin_trylock()试图获取指定的自旋锁,如果获取到锁,则返回0,防止返回非0
spin_is_locked()如果指定的自旋锁当前正在被获取,则返回非0,反之返回0

3.2.4 下半部和自旋锁

  1. 由于下半部可以抢占进程上下文中的代码,下半部和进程上下文共享数据时,需要使用spin_lock_bh()进行禁止下半部,并获取自旋锁对临界区进行保护。
  2. 由于中断处理程序可以抢占下半部,如果中断处理程序和下半部共享数据时,需要使用spin_lock_irqsave()或者spin_lock_irq()进行禁止本地中断,并获取自旋锁对临界区进行保护。
  3. 由于同类的tasklet不可能同时运行,如果同类的tasklet共享数据,不需要加锁。
  4. 由于不同类的tasklet可以在同个处理器上运行,但同个处理器上tasklet不会互相抢占,如果不同类的tasklet共享数据,则需要获取普通的自旋锁对临界区进行保护。
  5. 由于同类型的软中断可以同时在多个处理器,如果两个软中断不管是否同类型进行共享数据,都需要获取普通的自旋锁对临界区进行保护。

3.3 读 - 写自旋锁

3.3.1 定义

读自旋锁:一个或多个读任务可以并发持有读自旋锁,写锁在自旋等待所有读者释放锁。

写自旋锁:写自旋锁最多只能被一个写任务持有,且不能存在并发的读操作。

3.3.2 读写锁用法

DEFINE_RWLOCK(sp_rwlock);

read_lock(&sp_rwlock);
/* 临界区(只读) */
read_unlock(&sp_rwlock);

write_lock(&sp_rwlock);
/* 临界区(读写) */
write_unlock(&sp_rwlock);

// 这样写,直接死锁
read_lock(&sp_rwlock);
write_lock(&sp_rwlock);

3.3.3 函数接口

函数描述
read_lock()获取指定的读自旋锁
read_lock_irq()禁止本地中断,并获取指定的读自旋锁
read_lock_irqsave()保存本地中断的当前状态,禁止本地中断,并获取指定的读自旋锁
read_unlock()释放指定的读自旋锁
read_unlock_irq()释放指定的读自旋锁,并激活本地中断
read_unlock_irqrestore()释放指定的读自旋锁,并让本地中断恢复以前状态
write_lock()获取指定的写自旋锁
write_lock_irq()禁止本地中断,并获取指定的写自旋锁
write_lock_irqsave()保存本地中断的当前状态,禁止本地中断,并获取指定的写自旋锁
write_unlock()释放指定的写自旋锁
write_unlock_irq()释放指定的写自旋锁,并激活本地中断
write_unlock_irqrestore()释放指定的写自旋锁,并让本地中断恢复以前状态
write_trylock()试图获取指定的写自旋锁,如果获取到锁,则返回0,防止返回非0
rwlock_init()动态初始化指定的rwlock_t

3.3 信号量

3.3.1 定义

利用信号量进行保护临界区,当试图获取的信号量被占用,其他线程将进入等待队列,且进入睡眠状态。等待信号量释放后, 从等待队列中唤醒其他线程,并重新获取信号量。相对于自旋锁,提高CPU使用率,但是同时引入两次上下文切换和维护等待队列的开销。

互斥信号量:临界区只允许一个线程持有信号量。

计数信号量 :临界区允许至多有count个线程持有信号量,当count=1,就是互斥信号量。

3.3.2 创建和初始化信号量

// 计数信号量的定义和初始化
// count:临界区允许访问的最多线程数量
struct semphore name;
sema_init(&name,count);

// 互斥信号量的定义和初始化
static DECLARE_MUTEX(name);

//初始化一个动态创建的计数信号量
sema_init(sem,count);
//初始化一个动态创建的互斥信号量
init_MUTEX(sem);

3.3.3 使用信号量

count:信号量计数,临界区允许访问的最多线程数量

P()和down():通过信号量计数减1来请求获取一个信号量,如果结果大于等于0,则获取信号量锁,进入临界区。反之,线程进入等待队列,进入睡眠状态。

V()和up():通过信号量计数加1来释放一个信号量,唤醒等待队列中的线程。

3.3.4 函数接口

函数描述
sema_init(struct semaphore *, int)通过指定的信号量计数初始化信号量
init_MUTEX(struct semaphore *)通过信号量计数为1,初始化信号量
init_MUTEX_LOCKED(struct semaphore *)通过信号量计数为0,初始化信号量(初始为加锁状态)
down_interruptible(struct semaphore *)尝试获取信号量,如果信号量被占用,则进入可中断睡眠状态
down(struct semaphore *)尝试获取信号量,如果信号量被占用,则进入不可中断睡眠状态
down_trylock(struct semaphore *)尝试获取信号量,如果信号量被占用,则返回非0值
up(struct semaphore *)释放信号量,如果等待队列不为空,则唤醒等待队列中的线程

3.4 读 - 写信号量

3.4.1 定义

读信号量:一个或多个读任务可以并发持有读信号量,写锁在等待队列睡眠,不可被打断,等待所有读者释放锁。

写信号量:写信号量最多只能被一个写任务持有,且不能存在并发的读操作。

注:所有的读写信号量都是互斥信号量,信号量计数为1,只对写者互斥,不对读者。所有的读-写锁睡眠都不可中断。

定义在文件<linux/rwsem.h | 源代码 | v5.4>中,读 - 写信号量在内核中是由rw_semaphore结构表示的。

struct rw_semaphore {
    atomic_long_t count;
    /*
     * Write owner or one of the read owners as well flags regarding
     * the current state of the rwsem. Can be used as a speculative
     * check to see if the write owner is running on the cpu.
     */
    atomic_long_t owner;
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
    struct optimistic_spin_queue osq; /* spinner MCS lock */
#endif
    raw_spinlock_t wait_lock;
    struct list_head wait_list;
#ifdef CONFIG_DEBUG_RWSEMS
    void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map    dep_map;
#endif
};

3.4.2 读 - 写信号量用法

// 创建静态声明的读-写信号量
static DECLARE_RWSEM(name);

// 读信号量
down_read(&name);
// 临界区(只读)
up_read(&name);

// 写信号量
down_write(&name);
// 临界区(读和写)
up_write(&name)

3.4.3 函数接口

函数描述
static DECLARE_RWSEM(struct rw_semaphore);创建静态声明的读-写信号量
init_rwsem(struct rw_semaphore *)初始化读-写信号量
down_write(struct rw_semaphore *);尝试获取写信号量,如果信号量被占用,则进入不可中断睡眠状态
down_write_trylock(struct rw_semaphore *);尝试获取写信号量,如果成功获取锁,则返回非0,反之返回0。
down_read(struct rw_semaphore *);尝试获取读信号量,如果信号量被写任务占用,则进入不可中断睡眠状态。
down_read_trylock(struct rw_semaphore *);尝试获取读信号量,如果成功获取锁,则返回非0,反之返回0。
up_write(struct rw_semaphore *);释放写信号量,如果等待队列不为空,则唤醒等待队列中的线。
up_read(struct rw_semaphore *);释放读信号量,如果等待队列不为空,则唤醒等待队列中的线程。
downgrade_write(struct rw_semaphore *)动态地将写锁转化成读锁

3.5 互斥体

3.5.1 定义

一种实现互斥的特定睡眠锁,有以下特点:

  1. 任何时刻中只有一个任务可以持有mutex
  2. 需要在同一个上下文加解锁
  3. 不能递归加解锁
  4. 持有mutex的进程不能退出
  5. 不能在中断或者下半部使用mutex
  6. 只能通过官方API管理,不可被拷贝、手动初始化或者重复初始化。

3.5.2 函数接口

函数描述
DEFINE_MUTEX(struct mutex);静态地定义mutex
mutex_init(struct mutex *);动态初始化mutex
mutex_lock(struct mutex *);为指定的mutex上锁,如果锁不可用则睡眠
mutex_unlock(struct mutex *);为指定的mutex解锁
mutex_trylock(struct mutex *);尝试获取指定的mutex,如果成功则返回1,反之返回0。
mutex_is_lock(struct mutex *);如果锁被占用,则返回1,反之返回0。

打开内核配置CONFIG_DEBUG_MUTEXES,可以进行调试mutex。

3.6 完成变量

3.6.1 定义

内核中一个任务完成后,使用完成变量去唤醒正在等待的任务,完成变量(completion variable)定义在<linux/completion.h>中。

3.6.2 函数接口

函数描述
DECLARE_COMPLETION(mr_comp);静态地定义并初始化完成变量
init_completion(struct completion *);初始化指定的动态创建的完成变量
wait_for_completion(struct completion *);等待接收指定的完成变量信号
complete(struct completion *);发信号唤醒任何等待任务

3.7 BLK:大内核锁

3.7.1 定义

BLK是一个全局自旋锁。

  1. 任务持有BLK锁可以睡眠,当任务无法被调度时,锁自动丢弃。当任务被调度,锁又重新获得。睡眠不会导致死锁。
  2. BLK是递归锁,一个进程可以多次请求获取一个锁,不会导致死锁。自旋锁递归获取会导致死锁。
  3. BLK只能用在进程上下文,不能用于中断上下文。自旋锁可以用于进程上下文和中断上下文。

注意:目前不再推荐使用BLK,只是一直保留着接口。

3.7.2 函数接口

函数描述
lock_kernel();获得BKL
unlock_kernel();释放BKL
kernel_locked();如果锁被持有,返回非0值,否则返回0(UP方式总是返回非0)

3.8 顺序锁

3.8.1 定义

顺序锁(seq锁)对临界区进行读取数据不加锁,对写进程加锁,但是需要保证读取的过程中因为写进程修改了临界区的数据,导致读进程读取数据错误。

typedef struct {
    struct seqcount seqcount;
    spinlock_t lock;
} seqlock_t;

typedef struct seqcount {
    unsigned sequence;
} seqcount_t;

顺序锁是优先写进程,适用于读者多,写者少,写优先读的场景。

读-写信号量是优先读进程,因为写进程进入时需要等待所有读进程退出临界区。

3.8.2 顺序锁用法

// 静态地定义并初始化seq锁
seqlock_t seq_lock =  DEFINE_SEQLOCK(seq_lock);

// 获取写锁
write_seqlock(&seq_lock);
// 写操作
write_sequnlock(&seq_lock);

// 获取读锁
unsigned int seq;
do{
    seq = read_seqbegin(&seq_lock);
    // 读操作
}while(read_seqretry(&seq_lock,seq));

3.8.3 函数接口

函数描述
DEFINE_SEQLOCK(const seqlock_t);静态地定义并初始化seq锁
write_seqlock(const seqlock_t *);获取写锁
write_sequnlock(const seqlock_t *);释放写锁
read_seqbegin(const seqlock_t *);读取获取读锁之前的值,返回sequence的值。
read_seqretry(const seqlock_t *,unsigned );将seq锁中sequence的前后值进行对比。如果前后值不相等,则返回值为1,读者需要重新进行读操作;反之,返回值为0,则读者成功完成了读操作。

3.9 禁止内核抢占

preempt_enable()和preempt_disable(),成对出现,可以嵌套调用。

禁止中断和禁止抢占区别

禁止中断,确保某个中断处理程序不会抢占当前的代码。如local_irq_disable()和 local_irq_enable()函数。

禁止内核抢占,防止内核中当前进程不会突然被另一个进程抢占。如preempt_disable()和preempt_enable()函数。

3.9.1 函数接口

函数描述
preempt_disable()增加抢占计数值,从而禁止内核抢占
preempt_enable()减少抢占计数值,并当该值降为0时,检查和执行被挂起的需要调度的任务
preempt_enable_no_rasched()激活内核抢占,但不再检查任何被挂起的需调度的任务
preempt_count()返回抢占计数

3.10 屏障

编译器为了提高效率,可能存在将读和写操作进行重新排序。屏障是一组确保代码的载入存储顺序设计的指令,指示编译器不要对周围的指令序列进行重排序。

函数描述
rmb()阻止跨越屏障的载入动作发生重排序
read_barrier_depends()阻止跨越屏障的具有数据依赖关系的载入动作发生重排序
wmb()阻止跨越屏障的存储动作发生重排序
mb()阻止跨越屏障的载入和存储动作发生重排序
smp_rmb()在SMP上提供rmb()功能,在UP()上提供barrier()功能
smp_read_barrier_depends()在SMP上提供read_barrier_depends()功能,在UP()上提供barrier()功能
smp_wmb()在SMP上提供wmb()功能,在UP()上提供barrier()功能
smp_mb()在SMP上提供mb()功能,在UP()上提供barrier()功能
barrier()阻止编译器跨屏障对载入或存储操作进行优化

当前页面是本站的「Google AMP」版。查看和发表评论请点击:完整版 »