介绍
RCU(Read-Copy Update),用于实现在并发访问共享数据时的读操作优化。它通过一种延迟内存释放的机制来实现高效的读操作,同时保证数据的一致性。
对于读操作,相当于是免锁的操作,是一定能获取到读锁的(api层面),且是可以并发获取到,因为读操作并不会改变共享资源。 而对于写操作,它采用副本创建技术copy+update,并不是直接在原本的共享资源上直接操作,而是创建一个副本,然后更新副本的内容, 对原本的资源没有影响,即不会产生阻塞,更新副本后,将原本的资源指向新的副本。
rcu还采用了延迟回收机制,即写入的值更新后,不会立即删除旧的值,这样正在读的线程可以继续仍可以访问到原来的数据。在确保没有线程读取老的值后,再把老的值删除。 rcu机制,实现了:即使同时有高并发的读取和写入,读取的值,统一的,要么是写入者修改之前,要么是修改之后的,不会出现新旧值交叉。
这种特点,使得rcu可以实现无阻塞的读和写。但有利有弊,rcu的这种特性,要求rcu保护的共享资源通常是指针,使用指针指向实际的资源,不像其他的锁,保护实际资源。 rcu锁也分读侧和写侧,读侧相对简单,主要工作在写侧,因为需要完成更新共享资源,替换老指针,释放老指针内存等。
详细使用参考 示例
使用场景:多读少写,要求读很多,写很少,由于机制特性,rcu的写操作其实是比其他锁更加费时间的,但它的优点是读操作几乎就是无锁操作,并发读效率高。
相关API
内核源码头文件:/include/linux/rcupdate.h
1
2
3
4
5
6
7
8
9
10
rcu_read_lock()
rcu_read_unlock() // 这两个函数分别用于获取和释放RCU读锁。在获取RCU读锁后,可以进行并发的读操作,而无需担心数据的一致性。
rcu_dereference() // 用于获取访问共享数据的指针。它提供了一种安全的方式来读取数据,确保在读操作期间数据不会被修改或释放。
//回收,一个同步,一个异步
synchronize_rcu() //等待所有的老指针失效,会阻塞,返回后,就可以删除老指针使用的内存了,是一种同步方式
call_rcu() //绑定回收回调函数,立即返回,等所有老指针失效后,自动回调绑定的函数,以释放老指针占用的空间。这个是异步的,对应rcu的延迟回收机制。
rcu_barrier() //该函数用于确保在RCU更新完成之前,先前的所有RCU读操作都已完成。它会阻塞当前线程,直到先前的RCU读取操作全部完成。
以上为常用的rcu的API,具体使用参考下文示例。
示例
创建NUM_THREADS-1个线程,读取者,最后一个线程作为写入者。主要是写入者里面,需要自己负责申请新值,并代替老值,并释放老值。 另外,共享数据结构体中需要包含一个 struct rcu_head rcu
结构。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#include <linux/init.h>
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/delay.h>
#include <linux/rcupdate.h>
#define NUM_THREADS 5
struct shared_data {
int value;
struct rcu_head rcu; // RCU头部,给RCU使用
};
static struct shared_data *data = NULL; // 共享数据指针
static struct task_struct *threads[NUM_THREADS]; // 线程数组
static int reader_thread(void *arg)
{
int value;
struct shared_data *p;
while (!kthread_should_stop()) {
//这里为读取测临界区,在临界区中不能睡眠
rcu_read_lock(); // 获取RCU读锁
p = rcu_dereference(data); //先获取共享资源的指针
value = p->value; //再获取里面的实际对象数据
rcu_read_unlock(); // 释放RCU读锁
// 打印读取的数据
printk(KERN_INFO "Reader Thread: Read data: %d\n", value);
msleep(1000); // 延迟一段时间,模拟其他操作
}
return 0;
}
// 回调函数,用于释放旧的共享数据内存
static void release_data(struct rcu_head *rcu)
{
struct shared_data *old_data = container_of(rcu, struct shared_data, rcu);
printk(KERN_INFO "Free old data: %d\n", old_data->value);
kfree(old_data);
}
static int writer_thread(void *arg)
{
int i=0;
struct shared_data *new_data;
struct shared_data *old_data;
while (!kthread_should_stop()) {
// 创建新的共享数据
new_data = kmalloc(sizeof(struct shared_data), GFP_KERNEL);
if (!new_data) {
printk(KERN_ERR "Failed to allocate memory\n");
do_exit(1);
}
new_data->value = i;
// 更新共享数据指针并进行延迟内存释放
old_data = rcu_dereference(data); //获取旧共享资源指针
rcu_assign_pointer(data, new_data); //更新共享资源指针为新的
call_rcu(&old_data->rcu, release_data); //绑定延迟回收函数,用于回收旧的指针的内存,是异步的
// synchronize_rcu(); //也可以使用这个,这个会同步等待老指针失效,全部失效后返回,然后就可以同步手动释放老指针的内存空间了,一般用异步的方式。
// 打印写入的数据
printk(KERN_INFO "Writer Thread: Wrote data: %d\n", i);
i+=1;
msleep(1000); // 延迟一段时间,模拟其他操作
}
return 0;
}
static int __init rcu_test_init(void)
{
int i;
printk(KERN_INFO "RCU Example: Module initialized\n");
// 共享资源指针,最开始是NULL的,先给它申请一个对象
data = kmalloc(sizeof(struct shared_data), GFP_KERNEL);
if (!data) {
printk(KERN_ERR "Failed to allocate memory\n");
do_exit(1);
}
data->value = 123;
// 创建读线程
for (i = 0; i < NUM_THREADS - 1 ; ++i) {
threads[i] = kthread_run(reader_thread, NULL, "reader_thread");
if (IS_ERR(threads[i])) {
printk(KERN_ERR "Failed to create reader thread\n");
return PTR_ERR(threads[i]);
}
}
// 创建写线程
threads[NUM_THREADS - 1] = kthread_run(writer_thread, NULL, "writer_thread");
if (IS_ERR(threads[NUM_THREADS - 1])) {
printk(KERN_ERR "Failed to create writer thread\n");
return PTR_ERR(threads[NUM_THREADS - 1]);
}
return 0;
}
static void __exit rcu_test_exit(void)
{
int i;
// 等待所有线程退出
for (i = 0; i < NUM_THREADS; ++i) {
if (threads[i]) {
kthread_stop(threads[i]);
}
}
release_data(&data->rcu); //最后一个更新的值,没有绑定释放函数,自己释放。
printk(KERN_INFO "RCU Example: Module exited\n");
}
module_init(rcu_test_init);
module_exit(rcu_test_exit);
MODULE_LICENSE("GPL");
MODULE_AUTHOR("joy pan");
MODULE_DESCRIPTION("Sample driver to demonstrate rcu usage");