
futex同步机制分析之三内核实现
发布日期:2021-05-07 01:18:09
浏览次数:9
分类:技术文章
本文共 9544 字,大约阅读时间需要 31 分钟。
一、源码引入
前两篇从应用分析到了库,本篇到内核中看看,futex到底何方神圣?(Linux3.1.1) 先看一下futex.c和futex.h(kennel/futex.c linux/futex.h):/** * struct futex_q - The hashed futex queue entry, one per waiting task * @list: priority-sorted list of tasks waiting on this futex * @task: the task waiting on the futex * @lock_ptr: the hash bucket lock * @key: the key the futex is hashed on * @pi_state: optional priority inheritance state * @rt_waiter: rt_waiter storage for use with requeue_pi * @requeue_pi_key: the requeue_pi target futex key * @bitset: bitset for the optional bitmasked wakeup * * We use this hashed waitqueue, instead of a normal wait_queue_t, so * we can wake only the relevant ones (hashed queues may be shared). * * A futex_q has a woken state, just like tasks have TASK_RUNNING. * It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0. * The order of wakeup is always to make the first condition true, then * the second. * * PI futexes are typically woken before they are removed from the hash list via * the rt_mutex code. See unqueue_me_pi(). */struct futex_q { struct plist_node list; //内核中的链表节点,用于将数据进行管理 struct task_struct \*task;//futex相关联的指针队列-线程或者进程 spinlock_t \*lock_ptr;//自旋锁指针 union futex_key key;//查找futex的key struct futex_pi_state \*pi_state; //控制优先级继承 struct rt_mutex_waiter \*rt_waiter; union futex_key \*requeue_pi_key; u32 bitset; //常用的掩码匹配};英文注释写得非常清楚,画蛇添足,又增加了中文注释。
/* * Priority Inheritance state: */struct futex_pi_state { /* * list of 'owned' pi_state instances - these have to be * cleaned up in do_exit() if the task exits prematurely: */ struct list_head list; /* * The PI object: \*/ struct rt_mutex pi_mutex; struct task_struct \*owner; atomic_t refcount; union futex_key key;};//key是一个共用体union futex_key { struct { unsigned long pgoff; struct inode \*inode; int offset; } shared;//futex地址结构体--不同进程间区别用 struct { unsigned long address; struct mm_struct \*mm; int offset; } private;//地址结构体--同一进程间区别 struct { unsigned long word; void \*ptr; int offset; } both;//共用结构体};它们在使用时上面结构体中的相关task会被挂载到一个哈希桶中:
/* * Hash buckets are shared by all the futex_keys that hash to the same * location. Each key may have multiple futex_q structures, one for each task * waiting on a futex. */struct futex_hash_bucket { spinlock_t lock; struct plist_head chain;};static struct futex_hash_bucket futex_queues[1<区别它们的办法就是通过上面的key来查找相关的futex的。看一下相关的futex进程应用模型:
static int __init futex_init(void){ u32 curval; int i; /* * This will fail and we want it. Some arch implementations do * runtime detection of the futex_atomic_cmpxchg_inatomic() * functionality. We want to know that before we call in any * of the complex code paths. Also we want to prevent * registration of robust lists in that case. NULL is * guaranteed to fault and we get -EFAULT on functional * implementation, the non-functional ones will return * -ENOSYS. \*/ if (cmpxchg_futex_value_locked(&curval, NULL, 0, 0) == -EFAULT) futex_cmpxchg_enabled = 1; for (i = 0; i < ARRAY_SIZE(futex_queues); i++) { plist_head_init(&futex_queues[i].chain); spin_lock_init(&futex_queues[i].lock); } return 0;}在前面的第二篇glibc的分析中提到了三种futex,在这里都可以找到,主要说明pi-futex: pi-futex,也就是Priority Inheritance,优先级继承,如果学习过操作系统原理的一定知道“优先级反转”这个名词,它就是用来解决优先级反转的一种办法。 看一下相关的代码,这里不全列出,如有兴趣可参看内核的代码:
static int wake_futex_pi(u32 \__user *uaddr, u32 uval, struct futex_q *this){ struct task_struct *new_owner; struct futex_pi_state *pi_state = this->pi_state; u32 curval, newval; if (!pi_state) return -EINVAL; /* * If current does not own the pi_state then the futex is * inconsistent and user space fiddled with the futex value. */ if (pi_state->owner != current) return -EINVAL; raw_spin_lock(&pi_state->pi_mutex.wait_lock); new_owner = rt_mutex_next_owner(&pi_state->pi_mutex); /* * It is possible that the next waiter (the one that brought * this owner to the kernel) timed out and is no longer * waiting on the lock. */ if (!new_owner) new_owner = this->task; /* * We pass it to the next owner. (The WAITERS bit is always * kept enabled while there is PI state around. We must also * preserve the owner died bit.) */ if (!(uval & FUTEX_OWNER_DIED)) { int ret = 0; newval = FUTEX_WAITERS | task_pid_vnr(new_owner); if (cmpxchg_futex_value_locked(&curval, uaddr, uval, newval)) ret = -EFAULT; else if (curval != uval) ret = -EINVAL; if (ret) { raw_spin_unlock(&pi_state->pi_mutex.wait_lock); return ret; } } raw_spin_lock_irq(&pi_state->owner->pi_lock); WARN_ON(list_empty(&pi_state->list)); list_del_init(&pi_state->list); raw_spin_unlock_irq(&pi_state->owner->pi_lock); raw_spin_lock_irq(&new_owner->pi_lock); WARN_ON(!list_empty(&pi_state->list)); list_add(&pi_state->list, &new_owner->pi_state_list); pi_state->owner = new_owner; raw_spin_unlock_irq(&new_owner->pi_lock); raw_spin_unlock(&pi_state->pi_mutex.wait_lock); rt_mutex_unlock(&pi_state->pi_mutex); return 0;}static int unlock_futex_pi(u32 __user *uaddr, u32 uval){ u32 oldval; /* * There is no waiter, so we unlock the futex. The owner died * bit has not to be preserved here. We are the owner: */ if (cmpxchg_futex_value_locked(&oldval, uaddr, uval, 0)) return -EFAULT; if (oldval != uval) return -EAGAIN; return 0;}/** * requeue_pi_wake_futex() - Wake a task that acquired the lock during requeue * @q: the futex_q * @key: the key of the requeue target futex * @hb: the hash_bucket of the requeue target futex * * During futex_requeue, with requeue_pi=1, it is possible to acquire the * target futex if it is uncontended or via a lock steal. Set the futex_q key * to the requeue target futex so the waiter can detect the wakeup on the right * futex, but remove it from the hb and NULL the rt_waiter so it can detect * atomic lock acquisition. Set the q->lock_ptr to the requeue target hb->lock * to protect access to the pi_state to fixup the owner later. Must be called * with both q->lock_ptr and hb->lock held. \*/static inlinevoid requeue_pi_wake_futex(struct futex_q \*q, union futex_key \*key, struct futex_hash_bucket \*hb){ get_futex_key_refs(key); q->key = \*key; __unqueue_futex(q); WARN_ON(!q->rt_waiter); q->rt_waiter = NULL; q->lock_ptr = &hb->lock; wake_up_state(q->task, TASK_NORMAL);}从这里可以看出,内核针对不同的场景,设计了三种不同的futex,而上面的pi-futex正是用来处理优先级反转的一种,优先级反转是一种进程调用现象,在进程(线程)管理中,低优先级的进程(线程)掌握了高优先级的进程的同步资源,那么就需要处理一下优先级来达到尽快让高优先级的进程(线程)执行,解决优先级翻转问题有优先级天花板(priority ceiling)和优先级继承(priority inheritance)两种办法。内核中使用的是后者,即临时把低优先级的进程提高优先级,参与高优先级的共同执行,待释放资源后再将其降低到原来的优先级。 这里需要重点提一下requeue这个功能,这是为了更好的支持pthread_cond的broad_cast,用过条件变量的同学们可能都知道,这玩意儿一般会和一个mutex共同来使用,保证线程操作的顺序(底层通过cond和mutex的从属关系来达到),即它包含了两次等待,或两个队列等待。一个是cond信号的等待(队列),另一个是它所属的mutex锁的等待(队列)。这使得一次cond wait必须先释放mutex,然后依次等待cond信号和mutex。那这样的结果就是会造成两次的等待和唤醒,可是在广播唤醒时,只需唤醒一个线程就可以了,不需要惊醒所有的线程来竞争。所以这个requeue就是用来优化这部分的。其实就是将两个等待的唤醒队列优化成一次。 所以说,不读内核源码,你对线程的理解总是浮在上面的,无法掌握其本质。
二、调用方式
看一下对外的接口:
SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val, struct timespec __user *, utime, u32 __user *, uaddr2, u32, val3){ struct timespec ts; ktime_t t, *tp = NULL; u32 val2 = 0; int cmd = op & FUTEX_CMD_MASK; if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI || cmd == FUTEX_WAIT_BITSET || cmd == FUTEX_WAIT_REQUEUE_PI)) { if (copy_from_user(&ts, utime, sizeof(ts)) != 0) return -EFAULT; if (!timespec_valid(&ts)) return -EINVAL; t = timespec_to_ktime(ts); if (cmd == FUTEX_WAIT) t = ktime_add_safe(ktime_get(), t); tp = &t; } /* * requeue parameter in 'utime' if cmd == FUTEX_*_REQUEUE_*. * number of waiters to wake in 'utime' if cmd == FUTEX_WAKE_OP. */ if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE || cmd == FUTEX_CMP_REQUEUE_PI || cmd == FUTEX_WAKE_OP) val2 = (u32) (unsigned long) utime; return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);}long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout, u32 __user *uaddr2, u32 val2, u32 val3){ int ret = -ENOSYS, cmd = op & FUTEX_CMD_MASK; unsigned int flags = 0; if (!(op & FUTEX_PRIVATE_FLAG)) flags |= FLAGS_SHARED; if (op & FUTEX_CLOCK_REALTIME) { flags |= FLAGS_CLOCKRT; if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI) return -ENOSYS; } switch (cmd) { case FUTEX_WAIT: val3 = FUTEX_BITSET_MATCH_ANY; case FUTEX_WAIT_BITSET: ret = futex_wait(uaddr, flags, val, timeout, val3); break; case FUTEX_WAKE: val3 = FUTEX_BITSET_MATCH_ANY; case FUTEX_WAKE_BITSET: ret = futex_wake(uaddr, flags, val, val3); break; case FUTEX_REQUEUE: ret = futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0); break; case FUTEX_CMP_REQUEUE: ret = futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 0); break; case FUTEX_WAKE_OP: ret = futex_wake_op(uaddr, flags, uaddr2, val, val2, val3); break; case FUTEX_LOCK_PI: if (futex_cmpxchg_enabled) ret = futex_lock_pi(uaddr, flags, val, timeout, 0); break; case FUTEX_UNLOCK_PI: if (futex_cmpxchg_enabled) ret = futex_unlock_pi(uaddr, flags); break; case FUTEX_TRYLOCK_PI: if (futex_cmpxchg_enabled) ret = futex_lock_pi(uaddr, flags, 0, timeout, 1); break; case FUTEX_WAIT_REQUEUE_PI: val3 = FUTEX_BITSET_MATCH_ANY; ret = futex_wait_requeue_pi(uaddr, flags, val, timeout, val3, uaddr2); break; case FUTEX_CMP_REQUEUE_PI: ret = futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1); break; default: ret = -ENOSYS; } return ret;}通过代码可以看到,系统调用会调用do_futex,而它又会通过op来产生cmd来确定具体的调用的FUTEX的形式。因为此处不是分析内核源码,所以点到为止。 三篇futex的文章,写得有些粗糙,但是如果跟踪库和内核仔细分析下去,估计还得写好多,这就不是分析futex的本意了。
三、总结
前面分析了futex的各种优点,它有什么缺点没?有,而且很坑,搞JAVA的如果搞得深的都遇到过这个问题:
还有这个: 还有一个是拒绝服务的漏洞,好像跟它也有关系,好东西,不可能啥都好,是不是?发表评论
最新留言
感谢大佬
[***.8.128.20]2025年04月07日 02时07分21秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Mybatis-plus代码生成器模板(MySQL数据库)
2019-03-01
使用redis管理Mybatis的二级缓存
2019-03-01
使用redis管理Mybatis-Plus的二级缓存
2019-03-01
Mybatis中的SQL语句等于、不等于和模糊查询的语法
2019-03-01
使用 github 搜索
2019-03-01
java有包名的类访问没有包名的类
2019-03-01
整型关键字的散列映射
2019-03-03
多位水仙花数-python(出现运行超时?不妨用减法计算)
2019-03-03
地下迷宫探索(后两个测试点无法通过?这里有你想要的答案)
2019-03-03
小白看完都会了!阿里云大师深入拆解Java虚拟机,看完这一篇你就懂了
2019-03-03
VBA之正则表达式(19)-- 相对引用转绝对引用
2019-03-03
巧用VBA统一数字单位
2019-03-03
Transpose实现数组行列转置的限制
2019-03-03
用float/double作为中转类型的“雷区”
2019-03-03
golang中interface的一些语法缺陷的改进
2019-03-03
vue-router路由 学习笔记
2019-03-03
【数据库】第七章课后题
2019-03-03
第四章 串、数组和广义表 —— BF算法和KMP算法
2019-03-03
[选拔赛1]花园(矩阵快速幂),JM的月亮神树(最短路),保护出题人(斜率优化)
2019-03-03
DLA:一种深度网络特征融合方法
2019-03-03