diff options
author | Dave Airlie <airlied@redhat.com> | 2018-07-06 06:30:32 +1000 |
---|---|---|
committer | Dave Airlie <airlied@redhat.com> | 2018-07-06 08:47:14 +1000 |
commit | c5be9b54034339a7983a1167cdc80dc27fea1799 (patch) | |
tree | a73128c5a42a8338fa7ae76f89069aaadfe57563 /kernel/locking | |
parent | 96b2bb0b9637df1a68bb5b6853903a207fabcefd (diff) | |
parent | 07c13bb78c8b8a9cb6ee169659528945038d5e85 (diff) |
Merge branch 'vmwgfx-next' of git://people.freedesktop.org/~thomash/linux into drm-next
A patchset worked out together with Peter Zijlstra. Ingo is OK with taking
it through the DRM tree:
This is a small fallout from a work to allow batching WW mutex locks and
unlocks.
Our Wound-Wait mutexes actually don't use the Wound-Wait algorithm but
the Wait-Die algorithm. One could perhaps rename those mutexes tree-wide to
"Wait-Die mutexes" or "Deadlock Avoidance mutexes". Another approach suggested
here is to implement also the "Wound-Wait" algorithm as a per-WW-class
choice, as it has advantages in some cases. See for example
http://www.mathcs.emory.edu/~cheung/Courses/554/Syllabus/8-recv+serial/deadlock-compare.html
Now Wound-Wait is a preemptive algorithm, and the preemption is implemented
using a lazy scheme: If a wounded transaction is about to go to sleep on
a contended WW mutex, we return -EDEADLK. That is sufficient for deadlock
prevention. Since with WW mutexes we also require the aborted transaction to
sleep waiting to lock the WW mutex it was aborted on, this choice also provides
a suitable WW mutex to sleep on. If we were to return -EDEADLK on the first
WW mutex lock after the transaction was wounded whether the WW mutex was
contended or not, the transaction might frequently be restarted without a wait,
which is far from optimal. Note also that with the lazy preemption scheme,
contrary to Wait-Die there will be no rollbacks on lock contention of locks
held by a transaction that has completed its locking sequence.
The modeset locks are then changed from Wait-Die to Wound-Wait since the
typical locking pattern of those locks very well matches the criterion for
a substantial reduction in the number of rollbacks. For reservation objects,
the benefit is more unclear at this point and they remain using Wait-Die.
Signed-off-by: Dave Airlie <airlied@redhat.com>
Link: https://patchwork.freedesktop.org/patch/msgid/20180703105339.4461-1-thellstrom@vmware.com
Diffstat (limited to 'kernel/locking')
-rw-r--r-- | kernel/locking/locktorture.c | 2 | ||||
-rw-r--r-- | kernel/locking/mutex.c | 345 | ||||
-rw-r--r-- | kernel/locking/test-ww_mutex.c | 2 |
3 files changed, 264 insertions, 85 deletions
diff --git a/kernel/locking/locktorture.c b/kernel/locking/locktorture.c index 8402b3349dca..c28224347d69 100644 --- a/kernel/locking/locktorture.c +++ b/kernel/locking/locktorture.c @@ -365,7 +365,7 @@ static struct lock_torture_ops mutex_lock_ops = { }; #include <linux/ww_mutex.h> -static DEFINE_WW_CLASS(torture_ww_class); +static DEFINE_WD_CLASS(torture_ww_class); static DEFINE_WW_MUTEX(torture_ww_mutex_0, &torture_ww_class); static DEFINE_WW_MUTEX(torture_ww_mutex_1, &torture_ww_class); static DEFINE_WW_MUTEX(torture_ww_mutex_2, &torture_ww_class); diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c index f44f658ae629..1a81a1257b3f 100644 --- a/kernel/locking/mutex.c +++ b/kernel/locking/mutex.c @@ -174,6 +174,21 @@ static inline bool __mutex_waiter_is_first(struct mutex *lock, struct mutex_wait } /* + * Add @waiter to a given location in the lock wait_list and set the + * FLAG_WAITERS flag if it's the first waiter. + */ +static void __sched +__mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter, + struct list_head *list) +{ + debug_mutex_add_waiter(lock, waiter, current); + + list_add_tail(&waiter->list, list); + if (__mutex_waiter_is_first(lock, waiter)) + __mutex_set_flag(lock, MUTEX_FLAG_WAITERS); +} + +/* * Give up ownership to a specific task, when @task = NULL, this is equivalent * to a regular unlock. Sets PICKUP on a handoff, clears HANDOF, preserves * WAITERS. Provides RELEASE semantics like a regular unlock, the @@ -244,6 +259,22 @@ void __sched mutex_lock(struct mutex *lock) EXPORT_SYMBOL(mutex_lock); #endif +/* + * Wait-Die: + * The newer transactions are killed when: + * It (the new transaction) makes a request for a lock being held + * by an older transaction. + * + * Wound-Wait: + * The newer transactions are wounded when: + * An older transaction makes a request for a lock being held by + * the newer transaction. + */ + +/* + * Associate the ww_mutex @ww with the context @ww_ctx under which we acquired + * it. + */ static __always_inline void ww_mutex_lock_acquired(struct ww_mutex *ww, struct ww_acquire_ctx *ww_ctx) { @@ -282,26 +313,108 @@ ww_mutex_lock_acquired(struct ww_mutex *ww, struct ww_acquire_ctx *ww_ctx) DEBUG_LOCKS_WARN_ON(ww_ctx->ww_class != ww->ww_class); #endif ww_ctx->acquired++; + ww->ctx = ww_ctx; } +/* + * Determine if context @a is 'after' context @b. IOW, @a is a younger + * transaction than @b and depending on algorithm either needs to wait for + * @b or die. + */ static inline bool __sched __ww_ctx_stamp_after(struct ww_acquire_ctx *a, struct ww_acquire_ctx *b) { - return a->stamp - b->stamp <= LONG_MAX && - (a->stamp != b->stamp || a > b); + + return (signed long)(a->stamp - b->stamp) > 0; +} + +/* + * Wait-Die; wake a younger waiter context (when locks held) such that it can + * die. + * + * Among waiters with context, only the first one can have other locks acquired + * already (ctx->acquired > 0), because __ww_mutex_add_waiter() and + * __ww_mutex_check_kill() wake any but the earliest context. + */ +static bool __sched +__ww_mutex_die(struct mutex *lock, struct mutex_waiter *waiter, + struct ww_acquire_ctx *ww_ctx) +{ + if (!ww_ctx->is_wait_die) + return false; + + if (waiter->ww_ctx->acquired > 0 && + __ww_ctx_stamp_after(waiter->ww_ctx, ww_ctx)) { + debug_mutex_wake_waiter(lock, waiter); + wake_up_process(waiter->task); + } + + return true; +} + +/* + * Wound-Wait; wound a younger @hold_ctx if it holds the lock. + * + * Wound the lock holder if there are waiters with older transactions than + * the lock holders. Even if multiple waiters may wound the lock holder, + * it's sufficient that only one does. + */ +static bool __ww_mutex_wound(struct mutex *lock, + struct ww_acquire_ctx *ww_ctx, + struct ww_acquire_ctx *hold_ctx) +{ + struct task_struct *owner = __mutex_owner(lock); + + lockdep_assert_held(&lock->wait_lock); + + /* + * Possible through __ww_mutex_add_waiter() when we race with + * ww_mutex_set_context_fastpath(). In that case we'll get here again + * through __ww_mutex_check_waiters(). + */ + if (!hold_ctx) + return false; + + /* + * Can have !owner because of __mutex_unlock_slowpath(), but if owner, + * it cannot go away because we'll have FLAG_WAITERS set and hold + * wait_lock. + */ + if (!owner) + return false; + + if (ww_ctx->acquired > 0 && __ww_ctx_stamp_after(hold_ctx, ww_ctx)) { + hold_ctx->wounded = 1; + + /* + * wake_up_process() paired with set_current_state() + * inserts sufficient barriers to make sure @owner either sees + * it's wounded in __ww_mutex_lock_check_stamp() or has a + * wakeup pending to re-read the wounded state. + */ + if (owner != current) + wake_up_process(owner); + + return true; + } + + return false; } /* - * Wake up any waiters that may have to back off when the lock is held by the - * given context. + * We just acquired @lock under @ww_ctx, if there are later contexts waiting + * behind us on the wait-list, check if they need to die, or wound us. * - * Due to the invariants on the wait list, this can only affect the first - * waiter with a context. + * See __ww_mutex_add_waiter() for the list-order construction; basically the + * list is ordered by stamp, smallest (oldest) first. + * + * This relies on never mixing wait-die/wound-wait on the same wait-list; + * which is currently ensured by that being a ww_class property. * * The current task must not be on the wait list. */ static void __sched -__ww_mutex_wakeup_for_backoff(struct mutex *lock, struct ww_acquire_ctx *ww_ctx) +__ww_mutex_check_waiters(struct mutex *lock, struct ww_acquire_ctx *ww_ctx) { struct mutex_waiter *cur; @@ -311,66 +424,51 @@ __ww_mutex_wakeup_for_backoff(struct mutex *lock, struct ww_acquire_ctx *ww_ctx) if (!cur->ww_ctx) continue; - if (cur->ww_ctx->acquired > 0 && - __ww_ctx_stamp_after(cur->ww_ctx, ww_ctx)) { - debug_mutex_wake_waiter(lock, cur); - wake_up_process(cur->task); - } - - break; + if (__ww_mutex_die(lock, cur, ww_ctx) || + __ww_mutex_wound(lock, cur->ww_ctx, ww_ctx)) + break; } } /* - * After acquiring lock with fastpath or when we lost out in contested - * slowpath, set ctx and wake up any waiters so they can recheck. + * After acquiring lock with fastpath, where we do not hold wait_lock, set ctx + * and wake up any waiters so they can recheck. */ static __always_inline void ww_mutex_set_context_fastpath(struct ww_mutex *lock, struct ww_acquire_ctx *ctx) { ww_mutex_lock_acquired(lock, ctx); - lock->ctx = ctx; - /* * The lock->ctx update should be visible on all cores before - * the atomic read is done, otherwise contended waiters might be + * the WAITERS check is done, otherwise contended waiters might be * missed. The contended waiters will either see ww_ctx == NULL * and keep spinning, or it will acquire wait_lock, add itself * to waiter list and sleep. */ - smp_mb(); /* ^^^ */ + smp_mb(); /* See comments above and below. */ /* - * Check if lock is contended, if not there is nobody to wake up + * [W] ww->ctx = ctx [W] MUTEX_FLAG_WAITERS + * MB MB + * [R] MUTEX_FLAG_WAITERS [R] ww->ctx + * + * The memory barrier above pairs with the memory barrier in + * __ww_mutex_add_waiter() and makes sure we either observe ww->ctx + * and/or !empty list. */ if (likely(!(atomic_long_read(&lock->base.owner) & MUTEX_FLAG_WAITERS))) return; /* - * Uh oh, we raced in fastpath, wake up everyone in this case, - * so they can see the new lock->ctx. + * Uh oh, we raced in fastpath, check if any of the waiters need to + * die or wound us. */ spin_lock(&lock->base.wait_lock); - __ww_mutex_wakeup_for_backoff(&lock->base, ctx); + __ww_mutex_check_waiters(&lock->base, ctx); spin_unlock(&lock->base.wait_lock); } -/* - * After acquiring lock in the slowpath set ctx. - * - * Unlike for the fast path, the caller ensures that waiters are woken up where - * necessary. - * - * Callers must hold the mutex wait_lock. - */ -static __always_inline void -ww_mutex_set_context_slowpath(struct ww_mutex *lock, struct ww_acquire_ctx *ctx) -{ - ww_mutex_lock_acquired(lock, ctx); - lock->ctx = ctx; -} - #ifdef CONFIG_MUTEX_SPIN_ON_OWNER static inline @@ -646,37 +744,83 @@ void __sched ww_mutex_unlock(struct ww_mutex *lock) } EXPORT_SYMBOL(ww_mutex_unlock); + +static __always_inline int __sched +__ww_mutex_kill(struct mutex *lock, struct ww_acquire_ctx *ww_ctx) +{ + if (ww_ctx->acquired > 0) { +#ifdef CONFIG_DEBUG_MUTEXES + struct ww_mutex *ww; + + ww = container_of(lock, struct ww_mutex, base); + DEBUG_LOCKS_WARN_ON(ww_ctx->contending_lock); + ww_ctx->contending_lock = ww; +#endif + return -EDEADLK; + } + + return 0; +} + + +/* + * Check the wound condition for the current lock acquire. + * + * Wound-Wait: If we're wounded, kill ourself. + * + * Wait-Die: If we're trying to acquire a lock already held by an older + * context, kill ourselves. + * + * Since __ww_mutex_add_waiter() orders the wait-list on stamp, we only have to + * look at waiters before us in the wait-list. + */ static inline int __sched -__ww_mutex_lock_check_stamp(struct mutex *lock, struct mutex_waiter *waiter, - struct ww_acquire_ctx *ctx) +__ww_mutex_check_kill(struct mutex *lock, struct mutex_waiter *waiter, + struct ww_acquire_ctx *ctx) { struct ww_mutex *ww = container_of(lock, struct ww_mutex, base); struct ww_acquire_ctx *hold_ctx = READ_ONCE(ww->ctx); struct mutex_waiter *cur; + if (ctx->acquired == 0) + return 0; + + if (!ctx->is_wait_die) { + if (ctx->wounded) + return __ww_mutex_kill(lock, ctx); + + return 0; + } + if (hold_ctx && __ww_ctx_stamp_after(ctx, hold_ctx)) - goto deadlock; + return __ww_mutex_kill(lock, ctx); /* * If there is a waiter in front of us that has a context, then its - * stamp is earlier than ours and we must back off. + * stamp is earlier than ours and we must kill ourself. */ cur = waiter; list_for_each_entry_continue_reverse(cur, &lock->wait_list, list) { - if (cur->ww_ctx) - goto deadlock; + if (!cur->ww_ctx) + continue; + + return __ww_mutex_kill(lock, ctx); } return 0; - -deadlock: -#ifdef CONFIG_DEBUG_MUTEXES - DEBUG_LOCKS_WARN_ON(ctx->contending_lock); - ctx->contending_lock = ww; -#endif - return -EDEADLK; } +/* + * Add @waiter to the wait-list, keep the wait-list ordered by stamp, smallest + * first. Such that older contexts are preferred to acquire the lock over + * younger contexts. + * + * Waiters without context are interspersed in FIFO order. + * + * Furthermore, for Wait-Die kill ourself immediately when possible (there are + * older contexts already waiting) to avoid unnecessary waiting and for + * Wound-Wait ensure we wound the owning context when it is younger. + */ static inline int __sched __ww_mutex_add_waiter(struct mutex_waiter *waiter, struct mutex *lock, @@ -684,16 +828,21 @@ __ww_mutex_add_waiter(struct mutex_waiter *waiter, { struct mutex_waiter *cur; struct list_head *pos; + bool is_wait_die; if (!ww_ctx) { - list_add_tail(&waiter->list, &lock->wait_list); + __mutex_add_waiter(lock, waiter, &lock->wait_list); return 0; } + is_wait_die = ww_ctx->is_wait_die; + /* * Add the waiter before the first waiter with a higher stamp. * Waiters without a context are skipped to avoid starving - * them. + * them. Wait-Die waiters may die here. Wound-Wait waiters + * never die here, but they are sorted in stamp order and + * may wound the lock holder. */ pos = &lock->wait_list; list_for_each_entry_reverse(cur, &lock->wait_list, list) { @@ -701,16 +850,16 @@ __ww_mutex_add_waiter(struct mutex_waiter *waiter, continue; if (__ww_ctx_stamp_after(ww_ctx, cur->ww_ctx)) { - /* Back off immediately if necessary. */ - if (ww_ctx->acquired > 0) { -#ifdef CONFIG_DEBUG_MUTEXES - struct ww_mutex *ww; - - ww = container_of(lock, struct ww_mutex, base); - DEBUG_LOCKS_WARN_ON(ww_ctx->contending_lock); - ww_ctx->contending_lock = ww; -#endif - return -EDEADLK; + /* + * Wait-Die: if we find an older context waiting, there + * is no point in queueing behind it, as we'd have to + * die the moment it would acquire the lock. + */ + if (is_wait_die) { + int ret = __ww_mutex_kill(lock, ww_ctx); + + if (ret) + return ret; } break; @@ -718,17 +867,28 @@ __ww_mutex_add_waiter(struct mutex_waiter *waiter, pos = &cur->list; + /* Wait-Die: ensure younger waiters die. */ + __ww_mutex_die(lock, cur, ww_ctx); + } + + __mutex_add_waiter(lock, waiter, pos); + + /* + * Wound-Wait: if we're blocking on a mutex owned by a younger context, + * wound that such that we might proceed. + */ + if (!is_wait_die) { + struct ww_mutex *ww = container_of(lock, struct ww_mutex, base); + /* - * Wake up the waiter so that it gets a chance to back - * off. + * See ww_mutex_set_context_fastpath(). Orders setting + * MUTEX_FLAG_WAITERS vs the ww->ctx load, + * such that either we or the fastpath will wound @ww->ctx. */ - if (cur->ww_ctx->acquired > 0) { - debug_mutex_wake_waiter(lock, cur); - wake_up_process(cur->task); - } + smp_mb(); + __ww_mutex_wound(lock, ww_ctx, ww->ctx); } - list_add_tail(&waiter->list, pos); return 0; } @@ -751,6 +911,14 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, if (use_ww_ctx && ww_ctx) { if (unlikely(ww_ctx == READ_ONCE(ww->ctx))) return -EALREADY; + + /* + * Reset the wounded flag after a kill. No other process can + * race and wound us here since they can't have a valid owner + * pointer if we don't have any locks held. + */ + if (ww_ctx->acquired == 0) + ww_ctx->wounded = 0; } preempt_disable(); @@ -772,7 +940,7 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, */ if (__mutex_trylock(lock)) { if (use_ww_ctx && ww_ctx) - __ww_mutex_wakeup_for_backoff(lock, ww_ctx); + __ww_mutex_check_waiters(lock, ww_ctx); goto skip_wait; } @@ -784,25 +952,26 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, if (!use_ww_ctx) { /* add waiting tasks to the end of the waitqueue (FIFO): */ - list_add_tail(&waiter.list, &lock->wait_list); + __mutex_add_waiter(lock, &waiter, &lock->wait_list); + #ifdef CONFIG_DEBUG_MUTEXES waiter.ww_ctx = MUTEX_POISON_WW_CTX; #endif } else { - /* Add in stamp order, waking up waiters that must back off. */ + /* + * Add in stamp order, waking up waiters that must kill + * themselves. + */ ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx); if (ret) - goto err_early_backoff; + goto err_early_kill; waiter.ww_ctx = ww_ctx; } waiter.task = current; - if (__mutex_waiter_is_first(lock, &waiter)) - __mutex_set_flag(lock, MUTEX_FLAG_WAITERS); - set_current_state(state); for (;;) { /* @@ -815,7 +984,7 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, goto acquired; /* - * Check for signals and wound conditions while holding + * Check for signals and kill conditions while holding * wait_lock. This ensures the lock cancellation is ordered * against mutex_unlock() and wake-ups do not go missing. */ @@ -824,8 +993,8 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, goto err; } - if (use_ww_ctx && ww_ctx && ww_ctx->acquired > 0) { - ret = __ww_mutex_lock_check_stamp(lock, &waiter, ww_ctx); + if (use_ww_ctx && ww_ctx) { + ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx); if (ret) goto err; } @@ -859,6 +1028,16 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, acquired: __set_current_state(TASK_RUNNING); + if (use_ww_ctx && ww_ctx) { + /* + * Wound-Wait; we stole the lock (!first_waiter), check the + * waiters as anyone might want to wound us. + */ + if (!ww_ctx->is_wait_die && + !__mutex_waiter_is_first(lock, &waiter)) + __ww_mutex_check_waiters(lock, ww_ctx); + } + mutex_remove_waiter(lock, &waiter, current); if (likely(list_empty(&lock->wait_list))) __mutex_clear_flag(lock, MUTEX_FLAGS); @@ -870,7 +1049,7 @@ skip_wait: lock_acquired(&lock->dep_map, ip); if (use_ww_ctx && ww_ctx) - ww_mutex_set_context_slowpath(ww, ww_ctx); + ww_mutex_lock_acquired(ww, ww_ctx); spin_unlock(&lock->wait_lock); preempt_enable(); @@ -879,7 +1058,7 @@ skip_wait: err: __set_current_state(TASK_RUNNING); mutex_remove_waiter(lock, &waiter, current); -err_early_backoff: +err_early_kill: spin_unlock(&lock->wait_lock); debug_mutex_free_waiter(&waiter); mutex_release(&lock->dep_map, 1, ip); diff --git a/kernel/locking/test-ww_mutex.c b/kernel/locking/test-ww_mutex.c index 0e4cd64ad2c0..5b915b370d5a 100644 --- a/kernel/locking/test-ww_mutex.c +++ b/kernel/locking/test-ww_mutex.c @@ -26,7 +26,7 @@ #include <linux/slab.h> #include <linux/ww_mutex.h> -static DEFINE_WW_CLASS(ww_class); +static DEFINE_WD_CLASS(ww_class); struct workqueue_struct *wq; struct test_mutex { |