Linux内核 | 内核同步
1. 定义
临界区和竞争条件
临界区:访问和操作共享数据的代码段 竞争条件:多个执行线程在一个临界区同时执行
- 死锁:每个线程都在互相等待,但它们永远也不会释放占用的资源。
- 自死锁:一个执行线程试图去获取一个自己已经持有的锁,它不得不等待锁释放,但因为它忙于等待这个锁,所以自己永远也不会有机会释放释放锁。
2. 设计规则
以下简单规则避免死锁
- 按顺序加锁
- 防止发生饥饿
- 不要重复请求一个锁
- 设计简单-复杂加锁机制,越可能造成死锁
注:锁的争用严重时,加锁太粗会降低可扩展性,而锁争用不明显时,加锁过细会加大系统开销,带来浪费。
3. 内核同步方法
原子操作:保证指令以原子的方式执行,过程不被打断。
原子整数操作引入特殊类型atomic_t,在不同体系结构,都能实现原子操作;同时确保编辑器优化,不针对原子操作。
3.1 原子整数操作
3.1.1 32位原子操作
atomic_t类型:<linux/types.h | 源代码 | v5.4>
typedef struct {
int counter;
} atomic_t;
标准原子整数操作,所有体系结构都包含的操作,可以在特定的体系结构下找到atomic.h,<asm/atomic.h | 源代码 | v5.4>
原子整数操作 | 描述 |
---|---|
ATOMIC_INIT(int i) | 声明一个atomic_t变量,并将它初始化为i |
int atomic_read(atomic_t *v) | 读取atomic_t变量,转化成int类型,原子操作 |
void atomic_set(atomic_t *v, int i) | 设置v值为i,原子操作 |
void atomic_add(int i, atomic_t *v) | v=v+i,原子操作 |
void atomic_sub(int i, atomic_t *v) | v=v-i,原子操作 |
void atomic_inc(atomic_t *v) | v=v+1,原子操作 |
void atomic_dec(atomic_t *v) | v=v-1,原子操作 |
int atomic_sub_and_test(int i, atomic_t *v) | v=v-i,结果等于0,返回真;反之为假,原子操作 |
int atomic_add_negative(int i, atomic_t *v) | v=v+i,结果是负数,返回真;反之为假,原子操作 |
int atomic_add_return(int i, atomic_t *v) | v=v+i,且返回结果,原子操作 |
int atomic_sub_return(int i, atomic_t *v) | v=v-i,且返回结果,原子操作 |
int atomic_inc_return(atomic_t *v) | v=v+1,且返回结果,原子操作 |
int atomic_dec_return(atomic_t *v) | v=v-1,且返回结果,原子操作 |
int atomic_dec_and_test(atomic_t *v) | v=v-1,结果等于0,返回真;反之为假,原子操作 |
int atomic_inc_and_test(atomic_t *v) | v=v+1,结果等于0,返回真;反之为假,原子操作 |
// example
atomic_t v; //定义
atomic_t u = ATOMIC_INIT(0); //定义u,并初始化为0
atomic_set(&v, 4);//相当于v = 4,原子操作
atomic_add(4, &v);//相当于v=v+4,原子操作
atomic_inc(&v);//相当于v=v+1,原子操作
atomic_read(&v);//将atomic_t类型转化成
3.1.2 64位原子操作
atomic64_t类型:<linux/types.h | 源代码 | v5.4>
typedef struct {
s64 counter;
} atomic64_t;
原子整数操作 | 描述 |
---|---|
ATOMIC64_INIT(long i) | 声明一个atomic64_t变量,并将它初始化为i |
long atomic64_read(atomic64_t *v) | 读取atomic64_t变量,转化成long类型,原子操作 |
void atomic64_set(atomic64_t *v, int i) | 设置v值为i,原子操作 |
void atomic64_add(int i, atomic64_t *v) | v=v+i,原子操作 |
void atomic64_sub(int i, atomic64_t *v) | v=v-i,原子操作 |
void atomic64_inc(atomic64_t *v) | v=v+1,原子操作 |
void atomic64_dec(atomic64_t *v) | v=v-1,原子操作 |
int atomic64_sub_and_test(int i, atomic64_t *v) | v=v-i,结果等于0,返回真;反之为假,原子操作 |
int atomic64_add_negative(int i, atomic64_t *v) | v=v+i,结果是负数,返回真;反之为假,原子操作 |
long atomic64_add_return(int i, atomic64_t *v) | v=v+i,且返回结果,原子操作 |
long atomic64_sub_return(int i, atomic64_t *v) | v=v-i,且返回结果,原子操作 |
long atomic64_inc_return(atomic64_t *v) | v=v+1,且返回结果,原子操作 |
long atomic64_dec_return(atomic64_t *v) | v=v-1,且返回结果,原子操作 |
int atomic64_dec_and_test(atomic64_t *v) | v=v-1,结果等于0,返回真;反之为假,原子操作 |
int atomic64_inc_and_test(atomic64_t *v) | v=v+1,结果等于0,返回真;反之为假,原子操作 |
3.1.3 原子位操作
原子位操作列表<linux/bitops.h | 源代码 | v5.4>
原子位操作 | 描述 |
---|---|
void set_bit(int nr, void *addr) | 设置addr所指对象的第nr位,原子操作 |
void clear_bit(int nr, void *addr) | 清空addr所指对象的第nr位,原子操作 |
void change_bit(int nr, void *addr) | 翻转addr所指对象的第nr位,原子操作 |
void test_and_set_bit(int nr, void *addr) | 设置addr所指对象的第nr位,并返回原先的值,原子操作 |
void test_and_clear_bit(int nr, void *addr) | 清除addr所指对象的第nr位,并返回原先的值,原子操作 |
void test_and_change_bit(int nr, void *addr) | 翻转addr所指对象的第nr位,并返回原先的值,原子操作 |
void test_bit(int nr, void *addr) | 返回addr所指对象的第nr位,原子操作 |
非原子位操作也有一组位操作函数,在函数前面加两个下划线,如void __set_bit(int nr, void *addr)。
// 从指定地址开始搜索,第一个被被置位/没被置位的位
// 参数:指定地址,要搜索的位数
// 返回值:第一个被置位/没被置位的位号
int find_first_bit(unsigned long *addr, unsigned int size)
int find_first_zero_bit(unsigned long *addr, unsigned int size)
3.2 自旋锁(spin lock)
3.2.1 定义
多个线程访问临界区,可利用自旋锁进行保护临界区,自旋锁最多只能被一个可执行线程持有, 其他线程自旋等待锁释放。
注:linux内核实现的自旋锁是不可递归的。
锁争用处理
持有自旋锁的时间尽可能小于两次上下文切换的耗时。
- 其他线程自旋等待(自旋锁):特别耗费处理器时间
- 其他线程进入睡眠状态,锁释放后唤醒线程(信号量):两次上下文切换的开销
3.2.2 自旋锁用法
自旋锁接口定义<linux/spin_lock.h | 源代码 | v5.4>
DEFINE_SPINLOCK(sp_lock);
spin_lock(&sp_lock);
/*临界区*/
spin_unlock(&sp_lock);
自旋锁可以使用在中断处理程序,但是在获取锁之前需要禁用本地中断(当前处理器中的中断请求),否则会导致死锁。在多核处理器中,在不同的处理器自旋,并不会影响锁的持有和释放,注意,中断处理程序不能使用信号量。
DEFINE_SPINLOCK(sp_lock);
unsigned long flags;
//保存中断当前状态,禁用本地中断,获取自旋锁
spin_lock_irqsave(&sp_lock, flags);
/* 临界区 */
spin_unlock_irqrestore(&sp_lock, flags);
如果能确定中断在加锁前是激活的,不需要在解锁后恢复到中断以前的状态。一般不建议使用。
DEFINE_SPINLOCK(sp_lock);
//禁用本地中断,获取自旋锁
spin_lock_irq(&sp_lock);
/* 临界区 */
spin_unlock_irq(&sp_lock);
调试自旋锁:配置CONFIG_DEBUG_SPINLOCK,如果想进一步调试锁,还应该打开CONFIG_DEBUG_LOCK_ALLOC选项。
3.2.3 函数接口
函数 | 描述 |
---|---|
spin_lock() | 获取指定自旋锁 |
spin_lock_irq() | 禁止本地中断,并获取指定的自旋锁 |
spin_lock_bh() | 禁用所有下半部的执行,并获取指定的自旋锁 |
spin_lock_irqsave() | 保存本地中断的当前状态,禁止本地中断,并获取指定的自旋锁 |
spin_unlock() | 释放指定的自旋锁 |
spin_unlock_irq() | 释放指定的自旋锁,并激活本地中断 |
spin_unlock_bh() | 释放指定的自旋锁,并激活所有下半部的执行 |
spin_unlock_irqrestore() | 释放指定的自旋锁,并让本地中断恢复以前状态 |
spin_lock_init() | 动态初始化指定的spinlock_t |
spin_trylock() | 试图获取指定的自旋锁,如果获取到锁,则返回0,防止返回非0 |
spin_is_locked() | 如果指定的自旋锁当前正在被获取,则返回非0,反之返回0 |
3.2.4 下半部和自旋锁
- 由于下半部可以抢占进程上下文中的代码,下半部和进程上下文共享数据时,需要使用spin_lock_bh()进行禁止下半部,并获取自旋锁对临界区进行保护。
- 由于中断处理程序可以抢占下半部,如果中断处理程序和下半部共享数据时,需要使用spin_lock_irqsave()或者spin_lock_irq()进行禁止本地中断,并获取自旋锁对临界区进行保护。
- 由于同类的tasklet不可能同时运行,如果同类的tasklet共享数据,不需要加锁。
- 由于不同类的tasklet可以在同个处理器上运行,但同个处理器上tasklet不会互相抢占,如果不同类的tasklet共享数据,则需要获取普通的自旋锁对临界区进行保护。
- 由于同类型的软中断可以同时在多个处理器,如果两个软中断不管是否同类型进行共享数据,都需要获取普通的自旋锁对临界区进行保护。
3.3 读 - 写自旋锁
3.3.1 定义
读自旋锁:一个或多个读任务可以并发持有读自旋锁,写锁在自旋等待所有读者释放锁。
写自旋锁:写自旋锁最多只能被一个写任务持有,且不能存在并发的读操作。
3.3.2 读写锁用法
DEFINE_RWLOCK(sp_rwlock);
read_lock(&sp_rwlock);
/* 临界区(只读) */
read_unlock(&sp_rwlock);
write_lock(&sp_rwlock);
/* 临界区(读写) */
write_unlock(&sp_rwlock);
// 这样写,直接死锁
read_lock(&sp_rwlock);
write_lock(&sp_rwlock);
3.3.3 函数接口
函数 | 描述 |
---|---|
read_lock() | 获取指定的读自旋锁 |
read_lock_irq() | 禁止本地中断,并获取指定的读自旋锁 |
read_lock_irqsave() | 保存本地中断的当前状态,禁止本地中断,并获取指定的读自旋锁 |
read_unlock() | 释放指定的读自旋锁 |
read_unlock_irq() | 释放指定的读自旋锁,并激活本地中断 |
read_unlock_irqrestore() | 释放指定的读自旋锁,并让本地中断恢复以前状态 |
write_lock() | 获取指定的写自旋锁 |
write_lock_irq() | 禁止本地中断,并获取指定的写自旋锁 |
write_lock_irqsave() | 保存本地中断的当前状态,禁止本地中断,并获取指定的写自旋锁 |
write_unlock() | 释放指定的写自旋锁 |
write_unlock_irq() | 释放指定的写自旋锁,并激活本地中断 |
write_unlock_irqrestore() | 释放指定的写自旋锁,并让本地中断恢复以前状态 |
write_trylock() | 试图获取指定的写自旋锁,如果获取到锁,则返回0,防止返回非0 |
rwlock_init() | 动态初始化指定的rwlock_t |
3.3 信号量
3.3.1 定义
利用信号量进行保护临界区,当试图获取的信号量被占用,其他线程将进入等待队列,且进入睡眠状态。等待信号量释放后, 从等待队列中唤醒其他线程,并重新获取信号量。相对于自旋锁,提高CPU使用率,但是同时引入两次上下文切换和维护等待队列的开销。
互斥信号量:临界区只允许一个线程持有信号量。
计数信号量 :临界区允许至多有count个线程持有信号量,当count=1,就是互斥信号量。
3.3.2 创建和初始化信号量
// 计数信号量的定义和初始化
// count:临界区允许访问的最多线程数量
struct semphore name;
sema_init(&name,count);
// 互斥信号量的定义和初始化
static DECLARE_MUTEX(name);
//初始化一个动态创建的计数信号量
sema_init(sem,count);
//初始化一个动态创建的互斥信号量
init_MUTEX(sem);
3.3.3 使用信号量
count:信号量计数,临界区允许访问的最多线程数量
P()和down():通过信号量计数减1来请求获取一个信号量,如果结果大于等于0,则获取信号量锁,进入临界区。反之,线程进入等待队列,进入睡眠状态。
V()和up():通过信号量计数加1来释放一个信号量,唤醒等待队列中的线程。
3.3.4 函数接口
函数 | 描述 |
---|---|
sema_init(struct semaphore *, int) | 通过指定的信号量计数初始化信号量 |
init_MUTEX(struct semaphore *) | 通过信号量计数为1,初始化信号量 |
init_MUTEX_LOCKED(struct semaphore *) | 通过信号量计数为0,初始化信号量(初始为加锁状态) |
down_interruptible(struct semaphore *) | 尝试获取信号量,如果信号量被占用,则进入可中断睡眠状态 |
down(struct semaphore *) | 尝试获取信号量,如果信号量被占用,则进入不可中断睡眠状态 |
down_trylock(struct semaphore *) | 尝试获取信号量,如果信号量被占用,则返回非0值 |
up(struct semaphore *) | 释放信号量,如果等待队列不为空,则唤醒等待队列中的线程 |
3.4 读 - 写信号量
3.4.1 定义
读信号量:一个或多个读任务可以并发持有读信号量,写锁在等待队列睡眠,不可被打断,等待所有读者释放锁。
写信号量:写信号量最多只能被一个写任务持有,且不能存在并发的读操作。
注:所有的读写信号量都是互斥信号量,信号量计数为1,只对写者互斥,不对读者。所有的读-写锁睡眠都不可中断。
定义在文件<linux/rwsem.h | 源代码 | v5.4>中,读 - 写信号量在内核中是由rw_semaphore结构表示的。
struct rw_semaphore {
atomic_long_t count;
/*
* Write owner or one of the read owners as well flags regarding
* the current state of the rwsem. Can be used as a speculative
* check to see if the write owner is running on the cpu.
*/
atomic_long_t owner;
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* spinner MCS lock */
#endif
raw_spinlock_t wait_lock;
struct list_head wait_list;
#ifdef CONFIG_DEBUG_RWSEMS
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
3.4.2 读 - 写信号量用法
// 创建静态声明的读-写信号量
static DECLARE_RWSEM(name);
// 读信号量
down_read(&name);
// 临界区(只读)
up_read(&name);
// 写信号量
down_write(&name);
// 临界区(读和写)
up_write(&name)
3.4.3 函数接口
函数 | 描述 |
---|---|
static DECLARE_RWSEM(struct rw_semaphore); | 创建静态声明的读-写信号量 |
init_rwsem(struct rw_semaphore *) | 初始化读-写信号量 |
down_write(struct rw_semaphore *); | 尝试获取写信号量,如果信号量被占用,则进入不可中断睡眠状态 |
down_write_trylock(struct rw_semaphore *); | 尝试获取写信号量,如果成功获取锁,则返回非0,反之返回0。 |
down_read(struct rw_semaphore *); | 尝试获取读信号量,如果信号量被写任务占用,则进入不可中断睡眠状态。 |
down_read_trylock(struct rw_semaphore *); | 尝试获取读信号量,如果成功获取锁,则返回非0,反之返回0。 |
up_write(struct rw_semaphore *); | 释放写信号量,如果等待队列不为空,则唤醒等待队列中的线。 |
up_read(struct rw_semaphore *); | 释放读信号量,如果等待队列不为空,则唤醒等待队列中的线程。 |
downgrade_write(struct rw_semaphore *) | 动态地将写锁转化成读锁 |
3.5 互斥体
3.5.1 定义
一种实现互斥的特定睡眠锁,有以下特点:
- 任何时刻中只有一个任务可以持有mutex
- 需要在同一个上下文加解锁
- 不能递归加解锁
- 持有mutex的进程不能退出
- 不能在中断或者下半部使用mutex
- 只能通过官方API管理,不可被拷贝、手动初始化或者重复初始化。
3.5.2 函数接口
函数 | 描述 |
---|---|
DEFINE_MUTEX(struct mutex); | 静态地定义mutex |
mutex_init(struct mutex *); | 动态初始化mutex |
mutex_lock(struct mutex *); | 为指定的mutex上锁,如果锁不可用则睡眠 |
mutex_unlock(struct mutex *); | 为指定的mutex解锁 |
mutex_trylock(struct mutex *); | 尝试获取指定的mutex,如果成功则返回1,反之返回0。 |
mutex_is_lock(struct mutex *); | 如果锁被占用,则返回1,反之返回0。 |
打开内核配置CONFIG_DEBUG_MUTEXES,可以进行调试mutex。
3.6 完成变量
3.6.1 定义
内核中一个任务完成后,使用完成变量去唤醒正在等待的任务,完成变量(completion variable)定义在<linux/completion.h>中。
3.6.2 函数接口
函数 | 描述 |
---|---|
DECLARE_COMPLETION(mr_comp); | 静态地定义并初始化完成变量 |
init_completion(struct completion *); | 初始化指定的动态创建的完成变量 |
wait_for_completion(struct completion *); | 等待接收指定的完成变量信号 |
complete(struct completion *); | 发信号唤醒任何等待任务 |
3.7 BLK:大内核锁
3.7.1 定义
BLK是一个全局自旋锁。
- 任务持有BLK锁可以睡眠,当任务无法被调度时,锁自动丢弃。当任务被调度,锁又重新获得。睡眠不会导致死锁。
- BLK是递归锁,一个进程可以多次请求获取一个锁,不会导致死锁。自旋锁递归获取会导致死锁。
- BLK只能用在进程上下文,不能用于中断上下文。自旋锁可以用于进程上下文和中断上下文。
注意:目前不再推荐使用BLK,只是一直保留着接口。
3.7.2 函数接口
函数 | 描述 |
---|---|
lock_kernel(); | 获得BKL |
unlock_kernel(); | 释放BKL |
kernel_locked(); | 如果锁被持有,返回非0值,否则返回0(UP方式总是返回非0) |
3.8 顺序锁
3.8.1 定义
顺序锁(seq锁)对临界区进行读取数据不加锁,对写进程加锁,但是需要保证读取的过程中因为写进程修改了临界区的数据,导致读进程读取数据错误。
- 写进程在进行写入操作需要更新顺序锁中sequence值(序列计数器,初始值为0,递增)。
- 读进程在读操作之前读取顺序锁中sequence值,读操作之后再次读取顺序锁中sequence值,前后值进行对比是否相等,如果不相等,则说明本次读取操作过程中数据发生了更新,需要重新读取。
typedef struct {
struct seqcount seqcount;
spinlock_t lock;
} seqlock_t;
typedef struct seqcount {
unsigned sequence;
} seqcount_t;
顺序锁是优先写进程,适用于读者多,写者少,写优先读的场景。
读-写信号量是优先读进程,因为写进程进入时需要等待所有读进程退出临界区。
3.8.2 顺序锁用法
// 静态地定义并初始化seq锁
seqlock_t seq_lock = DEFINE_SEQLOCK(seq_lock);
// 获取写锁
write_seqlock(&seq_lock);
// 写操作
write_sequnlock(&seq_lock);
// 获取读锁
unsigned int seq;
do{
seq = read_seqbegin(&seq_lock);
// 读操作
}while(read_seqretry(&seq_lock,seq));
3.8.3 函数接口
函数 | 描述 |
---|---|
DEFINE_SEQLOCK(const seqlock_t); | 静态地定义并初始化seq锁 |
write_seqlock(const seqlock_t *); | 获取写锁 |
write_sequnlock(const seqlock_t *); | 释放写锁 |
read_seqbegin(const seqlock_t *); | 读取获取读锁之前的值,返回sequence的值。 |
read_seqretry(const seqlock_t *,unsigned ); | 将seq锁中sequence的前后值进行对比。如果前后值不相等,则返回值为1,读者需要重新进行读操作;反之,返回值为0,则读者成功完成了读操作。 |
3.9 禁止内核抢占
preempt_enable()和preempt_disable(),成对出现,可以嵌套调用。
禁止中断和禁止抢占区别
禁止中断,确保某个中断处理程序不会抢占当前的代码。如local_irq_disable()和 local_irq_enable()函数。
禁止内核抢占,防止内核中当前进程不会突然被另一个进程抢占。如preempt_disable()和preempt_enable()函数。
3.9.1 函数接口
函数 | 描述 |
---|---|
preempt_disable() | 增加抢占计数值,从而禁止内核抢占 |
preempt_enable() | 减少抢占计数值,并当该值降为0时,检查和执行被挂起的需要调度的任务 |
preempt_enable_no_rasched() | 激活内核抢占,但不再检查任何被挂起的需调度的任务 |
preempt_count() | 返回抢占计数 |
3.10 屏障
编译器为了提高效率,可能存在将读和写操作进行重新排序。屏障是一组确保代码的载入存储顺序设计的指令,指示编译器不要对周围的指令序列进行重排序。
函数 | 描述 |
---|---|
rmb() | 阻止跨越屏障的载入动作发生重排序 |
read_barrier_depends() | 阻止跨越屏障的具有数据依赖关系的载入动作发生重排序 |
wmb() | 阻止跨越屏障的存储动作发生重排序 |
mb() | 阻止跨越屏障的载入和存储动作发生重排序 |
smp_rmb() | 在SMP上提供rmb()功能,在UP()上提供barrier()功能 |
smp_read_barrier_depends() | 在SMP上提供read_barrier_depends()功能,在UP()上提供barrier()功能 |
smp_wmb() | 在SMP上提供wmb()功能,在UP()上提供barrier()功能 |
smp_mb() | 在SMP上提供mb()功能,在UP()上提供barrier()功能 |
barrier() | 阻止编译器跨屏障对载入或存储操作进行优化 |
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。