diff options
Diffstat (limited to 'fs')
-rw-r--r-- | fs/dlm/ast.c | 3 | ||||
-rw-r--r-- | fs/dlm/dlm_internal.h | 8 | ||||
-rw-r--r-- | fs/dlm/lock.c | 286 | ||||
-rw-r--r-- | fs/dlm/lock.h | 4 | ||||
-rw-r--r-- | fs/dlm/lockspace.c | 20 | ||||
-rw-r--r-- | fs/dlm/rcom.c | 23 | ||||
-rw-r--r-- | fs/dlm/recover.c | 73 | ||||
-rw-r--r-- | fs/dlm/recoverd.c | 9 | ||||
-rw-r--r-- | fs/dlm/requestqueue.c | 39 | ||||
-rw-r--r-- | fs/gfs2/incore.h | 1 | ||||
-rw-r--r-- | fs/gfs2/lock_dlm.c | 2 | ||||
-rw-r--r-- | fs/gfs2/ops_fstype.c | 7 |
12 files changed, 304 insertions, 171 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index 90e5997262ea..63dc19c54d5a 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -310,6 +310,7 @@ void dlm_callback_resume(struct dlm_ls *ls) } mutex_unlock(&ls->ls_cb_mutex); - log_debug(ls, "dlm_callback_resume %d", count); + if (count) + log_debug(ls, "dlm_callback_resume %d", count); } diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 0e74832c021b..bc342f7ac3af 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -271,6 +271,8 @@ struct dlm_lkb { ktime_t lkb_last_cast_time; /* for debugging */ ktime_t lkb_last_bast_time; /* for debugging */ + uint64_t lkb_recover_seq; /* from ls_recover_seq */ + char *lkb_lvbptr; struct dlm_lksb *lkb_lksb; /* caller's status block */ void (*lkb_astfn) (void *astparam); @@ -325,7 +327,7 @@ enum rsb_flags { RSB_NEW_MASTER, RSB_NEW_MASTER2, RSB_RECOVER_CONVERT, - RSB_LOCKS_PURGED, + RSB_RECOVER_GRANT, }; static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) @@ -571,6 +573,7 @@ struct dlm_ls { struct mutex ls_requestqueue_mutex; struct dlm_rcom *ls_recover_buf; int ls_recover_nodeid; /* for debugging */ + unsigned int ls_recover_locks_in; /* for log info */ uint64_t ls_rcom_seq; spinlock_t ls_rcom_spin; struct list_head ls_recover_list; @@ -597,6 +600,7 @@ struct dlm_ls { #define LSFL_UEVENT_WAIT 5 #define LSFL_TIMEWARN 6 #define LSFL_CB_DELAY 7 +#define LSFL_NODIR 8 /* much of this is just saving user space pointers associated with the lock that we pass back to the user lib with an ast */ @@ -644,7 +648,7 @@ static inline int dlm_recovery_stopped(struct dlm_ls *ls) static inline int dlm_no_directory(struct dlm_ls *ls) { - return (ls->ls_exflags & DLM_LSFL_NODIR) ? 1 : 0; + return test_bit(LSFL_NODIR, &ls->ls_flags); } int dlm_netlink_init(void); diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index f3ba70301a45..bdafb65a5234 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -161,10 +161,11 @@ static const int __quecvt_compat_matrix[8][8] = { void dlm_print_lkb(struct dlm_lkb *lkb) { printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " - "sts %d rq %d gr %d wait_type %d wait_nodeid %d\n", + "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n", lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, - lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid); + lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid, + (unsigned long long)lkb->lkb_recover_seq); } static void dlm_print_rsb(struct dlm_rsb *r) @@ -251,8 +252,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb) static inline int is_master_copy(struct dlm_lkb *lkb) { - if (lkb->lkb_flags & DLM_IFL_MSTCPY) - DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb);); return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; } @@ -1519,13 +1518,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) } lkb->lkb_rqmode = DLM_LOCK_IV; + lkb->lkb_highbast = 0; } static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) { set_lvb_lock(r, lkb); _grant_lock(r, lkb); - lkb->lkb_highbast = 0; } static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, @@ -1887,7 +1886,8 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, /* Returns the highest requested mode of all blocked conversions; sets cw if there's a blocked conversion to DLM_LOCK_CW. */ -static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) +static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, + unsigned int *count) { struct dlm_lkb *lkb, *s; int hi, demoted, quit, grant_restart, demote_restart; @@ -1906,6 +1906,8 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) if (can_be_granted(r, lkb, 0, &deadlk)) { grant_lock_pending(r, lkb); grant_restart = 1; + if (count) + (*count)++; continue; } @@ -1939,14 +1941,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) return max_t(int, high, hi); } -static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw) +static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw, + unsigned int *count) { struct dlm_lkb *lkb, *s; list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { - if (can_be_granted(r, lkb, 0, NULL)) + if (can_be_granted(r, lkb, 0, NULL)) { grant_lock_pending(r, lkb); - else { + if (count) + (*count)++; + } else { high = max_t(int, lkb->lkb_rqmode, high); if (lkb->lkb_rqmode == DLM_LOCK_CW) *cw = 1; @@ -1975,16 +1980,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) return 0; } -static void grant_pending_locks(struct dlm_rsb *r) +static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count) { struct dlm_lkb *lkb, *s; int high = DLM_LOCK_IV; int cw = 0; - DLM_ASSERT(is_master(r), dlm_dump_rsb(r);); + if (!is_master(r)) { + log_print("grant_pending_locks r nodeid %d", r->res_nodeid); + dlm_dump_rsb(r); + return; + } - high = grant_pending_convert(r, high, &cw); - high = grant_pending_wait(r, high, &cw); + high = grant_pending_convert(r, high, &cw, count); + high = grant_pending_wait(r, high, &cw, count); if (high == DLM_LOCK_IV) return; @@ -2520,7 +2529,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) before we try again to grant this one. */ if (is_demoted(lkb)) { - grant_pending_convert(r, DLM_LOCK_IV, NULL); + grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL); if (_can_be_granted(r, lkb, 1)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); @@ -2548,7 +2557,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, { switch (error) { case 0: - grant_pending_locks(r); + grant_pending_locks(r, NULL); /* grant_pending_locks also sends basts */ break; case -EAGAIN: @@ -2571,7 +2580,7 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, int error) { - grant_pending_locks(r); + grant_pending_locks(r, NULL); } /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ @@ -2592,7 +2601,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, int error) { if (error) - grant_pending_locks(r); + grant_pending_locks(r, NULL); } /* @@ -3452,8 +3461,9 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) goto fail; if (lkb->lkb_remid != ms->m_lkid) { - log_error(ls, "receive_convert %x remid %x remote %d %x", - lkb->lkb_id, lkb->lkb_remid, + log_error(ls, "receive_convert %x remid %x recover_seq %llu " + "remote %d %x", lkb->lkb_id, lkb->lkb_remid, + (unsigned long long)lkb->lkb_recover_seq, ms->m_header.h_nodeid, ms->m_lkid); error = -ENOENT; goto fail; @@ -3631,6 +3641,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) goto out; queue_bast(r, lkb, ms->m_bastmode); + lkb->lkb_highbast = ms->m_bastmode; out: unlock_rsb(r); put_rsb(r); @@ -3710,8 +3721,13 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) mstype = lkb->lkb_wait_type; error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); - if (error) + if (error) { + log_error(ls, "receive_request_reply %x remote %d %x result %d", + lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid, + ms->m_result); + dlm_dump_rsb(r); goto out; + } /* Optimization: the dir node was also the master, so it took our lookup as a request and sent request reply instead of lookup reply */ @@ -4122,21 +4138,28 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, * happen in normal usage for the async messages and cancel, so * only use log_debug for them. * - * Other errors are expected and normal. + * Some errors are expected and normal. */ if (error == -ENOENT && noent) { - log_debug(ls, "receive %d no %x remote %d %x seq %u", + log_debug(ls, "receive %d no %x remote %d %x saved_seq %u", ms->m_type, ms->m_remid, ms->m_header.h_nodeid, ms->m_lkid, saved_seq); } else if (error == -ENOENT) { - log_error(ls, "receive %d no %x remote %d %x seq %u", + log_error(ls, "receive %d no %x remote %d %x saved_seq %u", ms->m_type, ms->m_remid, ms->m_header.h_nodeid, ms->m_lkid, saved_seq); if (ms->m_type == DLM_MSG_CONVERT) dlm_dump_rsb_hash(ls, ms->m_hash); } + + if (error == -EINVAL) { + log_error(ls, "receive %d inval from %d lkid %x remid %x " + "saved_seq %u", + ms->m_type, ms->m_header.h_nodeid, + ms->m_lkid, ms->m_remid, saved_seq); + } } /* If the lockspace is in recovery mode (locking stopped), then normal @@ -4200,9 +4223,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid) ls = dlm_find_lockspace_global(hd->h_lockspace); if (!ls) { - if (dlm_config.ci_log_debug) - log_print("invalid lockspace %x from %d cmd %d type %d", - hd->h_lockspace, nodeid, hd->h_cmd, type); + if (dlm_config.ci_log_debug) { + printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace " + "%u from %d cmd %d type %d\n", + hd->h_lockspace, nodeid, hd->h_cmd, type); + } if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) dlm_send_ls_not_ready(nodeid, &p->rcom); @@ -4253,16 +4278,10 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, int dir_nodeid) { - if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) - return 1; - - if (!dlm_no_directory(ls)) - return 0; - - if (dir_nodeid == dlm_our_nodeid()) + if (dlm_no_directory(ls)) return 1; - if (dir_nodeid != lkb->lkb_wait_nodeid) + if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) return 1; return 0; @@ -4519,112 +4538,177 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) return error; } -static void purge_queue(struct dlm_rsb *r, struct list_head *queue, - int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb)) +static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r, + struct list_head *list) { - struct dlm_ls *ls = r->res_ls; struct dlm_lkb *lkb, *safe; - list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) { - if (test(ls, lkb)) { - rsb_set_flag(r, RSB_LOCKS_PURGED); - del_lkb(r, lkb); - /* this put should free the lkb */ - if (!dlm_put_lkb(lkb)) - log_error(ls, "purged lkb not released"); - } + list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { + if (!is_master_copy(lkb)) + continue; + + /* don't purge lkbs we've added in recover_master_copy for + the current recovery seq */ + + if (lkb->lkb_recover_seq == ls->ls_recover_seq) + continue; + + del_lkb(r, lkb); + + /* this put should free the lkb */ + if (!dlm_put_lkb(lkb)) + log_error(ls, "purged mstcpy lkb not released"); } } -static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb) +void dlm_purge_mstcpy_locks(struct dlm_rsb *r) { - return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid)); -} + struct dlm_ls *ls = r->res_ls; -static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb) -{ - return is_master_copy(lkb); + purge_mstcpy_list(ls, r, &r->res_grantqueue); + purge_mstcpy_list(ls, r, &r->res_convertqueue); + purge_mstcpy_list(ls, r, &r->res_waitqueue); } -static void purge_dead_locks(struct dlm_rsb *r) +static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, + struct list_head *list, + int nodeid_gone, unsigned int *count) { - purge_queue(r, &r->res_grantqueue, &purge_dead_test); - purge_queue(r, &r->res_convertqueue, &purge_dead_test); - purge_queue(r, &r->res_waitqueue, &purge_dead_test); -} + struct dlm_lkb *lkb, *safe; -void dlm_purge_mstcpy_locks(struct dlm_rsb *r) -{ - purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test); - purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test); - purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test); + list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { + if (!is_master_copy(lkb)) + continue; + + if ((lkb->lkb_nodeid == nodeid_gone) || + dlm_is_removed(ls, lkb->lkb_nodeid)) { + + del_lkb(r, lkb); + + /* this put should free the lkb */ + if (!dlm_put_lkb(lkb)) + log_error(ls, "purged dead lkb not released"); + + rsb_set_flag(r, RSB_RECOVER_GRANT); + + (*count)++; + } + } } /* Get rid of locks held by nodes that are gone. */ -int dlm_purge_locks(struct dlm_ls *ls) +void dlm_recover_purge(struct dlm_ls *ls) { struct dlm_rsb *r; + struct dlm_member *memb; + int nodes_count = 0; + int nodeid_gone = 0; + unsigned int lkb_count = 0; + + /* cache one removed nodeid to optimize the common + case of a single node removed */ + + list_for_each_entry(memb, &ls->ls_nodes_gone, list) { + nodes_count++; + nodeid_gone = memb->nodeid; + } - log_debug(ls, "dlm_purge_locks"); + if (!nodes_count) + return; down_write(&ls->ls_root_sem); list_for_each_entry(r, &ls->ls_root_list, res_root_list) { hold_rsb(r); lock_rsb(r); - if (is_master(r)) - purge_dead_locks(r); + if (is_master(r)) { + purge_dead_list(ls, r, &r->res_grantqueue, + nodeid_gone, &lkb_count); + purge_dead_list(ls, r, &r->res_convertqueue, + nodeid_gone, &lkb_count); + purge_dead_list(ls, r, &r->res_waitqueue, + nodeid_gone, &lkb_count); + } unlock_rsb(r); unhold_rsb(r); - - schedule(); + cond_resched(); } up_write(&ls->ls_root_sem); - return 0; + if (lkb_count) + log_debug(ls, "dlm_recover_purge %u locks for %u nodes", + lkb_count, nodes_count); } -static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) +static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) { struct rb_node *n; - struct dlm_rsb *r, *r_ret = NULL; + struct dlm_rsb *r; spin_lock(&ls->ls_rsbtbl[bucket].lock); for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { r = rb_entry(n, struct dlm_rsb, res_hashnode); - if (!rsb_flag(r, RSB_LOCKS_PURGED)) + + if (!rsb_flag(r, RSB_RECOVER_GRANT)) + continue; + rsb_clear_flag(r, RSB_RECOVER_GRANT); + if (!is_master(r)) continue; hold_rsb(r); - rsb_clear_flag(r, RSB_LOCKS_PURGED); - r_ret = r; - break; + spin_unlock(&ls->ls_rsbtbl[bucket].lock); + return r; } spin_unlock(&ls->ls_rsbtbl[bucket].lock); - return r_ret; + return NULL; } -void dlm_grant_after_purge(struct dlm_ls *ls) +/* + * Attempt to grant locks on resources that we are the master of. + * Locks may have become grantable during recovery because locks + * from departed nodes have been purged (or not rebuilt), allowing + * previously blocked locks to now be granted. The subset of rsb's + * we are interested in are those with lkb's on either the convert or + * waiting queues. + * + * Simplest would be to go through each master rsb and check for non-empty + * convert or waiting queues, and attempt to grant on those rsbs. + * Checking the queues requires lock_rsb, though, for which we'd need + * to release the rsbtbl lock. This would make iterating through all + * rsb's very inefficient. So, we rely on earlier recovery routines + * to set RECOVER_GRANT on any rsb's that we should attempt to grant + * locks for. + */ + +void dlm_recover_grant(struct dlm_ls *ls) { struct dlm_rsb *r; int bucket = 0; + unsigned int count = 0; + unsigned int rsb_count = 0; + unsigned int lkb_count = 0; while (1) { - r = find_purged_rsb(ls, bucket); + r = find_grant_rsb(ls, bucket); if (!r) { if (bucket == ls->ls_rsbtbl_size - 1) break; bucket++; continue; } + rsb_count++; + count = 0; lock_rsb(r); - if (is_master(r)) { - grant_pending_locks(r); - confirm_master(r, 0); - } + grant_pending_locks(r, &count); + lkb_count += count; + confirm_master(r, 0); unlock_rsb(r); put_rsb(r); - schedule(); + cond_resched(); } + + if (lkb_count) + log_debug(ls, "dlm_recover_grant %u locks on %u resources", + lkb_count, rsb_count); } static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, @@ -4723,11 +4807,26 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) remid = le32_to_cpu(rl->rl_lkid); - error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), - R_MASTER, &r); + /* In general we expect the rsb returned to be R_MASTER, but we don't + have to require it. Recovery of masters on one node can overlap + recovery of locks on another node, so one node can send us MSTCPY + locks before we've made ourselves master of this rsb. We can still + add new MSTCPY locks that we receive here without any harm; when + we make ourselves master, dlm_recover_masters() won't touch the + MSTCPY locks we've received early. */ + + error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r); if (error) goto out; + if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { + log_error(ls, "dlm_recover_master_copy remote %d %x not dir", + rc->rc_header.h_nodeid, remid); + error = -EBADR; + put_rsb(r); + goto out; + } + lock_rsb(r); lkb = search_remid(r, rc->rc_header.h_nodeid, remid); @@ -4749,12 +4848,18 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) attach_lkb(r, lkb); add_lkb(r, lkb, rl->rl_status); error = 0; + ls->ls_recover_locks_in++; + + if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) + rsb_set_flag(r, RSB_RECOVER_GRANT); out_remid: /* this is the new value returned to the lock holder for saving in its process-copy lkb */ rl->rl_remid = cpu_to_le32(lkb->lkb_id); + lkb->lkb_recover_seq = ls->ls_recover_seq; + out_unlock: unlock_rsb(r); put_rsb(r); @@ -4786,17 +4891,20 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) return error; } + r = lkb->lkb_resource; + hold_rsb(r); + lock_rsb(r); + if (!is_process_copy(lkb)) { log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", lkid, rc->rc_header.h_nodeid, remid, result); - dlm_print_lkb(lkb); + dlm_dump_rsb(r); + unlock_rsb(r); + put_rsb(r); + dlm_put_lkb(lkb); return -EINVAL; } - r = lkb->lkb_resource; - hold_rsb(r); - lock_rsb(r); - switch (result) { case -EBADR: /* There's a chance the new master received our lock before diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index 56e2bc646565..c8b226c62807 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -32,9 +32,9 @@ void dlm_adjust_timeouts(struct dlm_ls *ls); int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, unsigned int flags, struct dlm_rsb **r_ret); -int dlm_purge_locks(struct dlm_ls *ls); +void dlm_recover_purge(struct dlm_ls *ls); void dlm_purge_mstcpy_locks(struct dlm_rsb *r); -void dlm_grant_after_purge(struct dlm_ls *ls); +void dlm_recover_grant(struct dlm_ls *ls); int dlm_recover_waiters_post(struct dlm_ls *ls); void dlm_recover_waiters_pre(struct dlm_ls *ls); int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc); diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index a1ea25face82..ca506abbdd3b 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -74,6 +74,19 @@ static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) return len; } +static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf) +{ + return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls)); +} + +static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len) +{ + int val = simple_strtoul(buf, NULL, 0); + if (val == 1) + set_bit(LSFL_NODIR, &ls->ls_flags); + return len; +} + static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf) { uint32_t status = dlm_recover_status(ls); @@ -107,6 +120,12 @@ static struct dlm_attr dlm_attr_id = { .store = dlm_id_store }; +static struct dlm_attr dlm_attr_nodir = { + .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR}, + .show = dlm_nodir_show, + .store = dlm_nodir_store +}; + static struct dlm_attr dlm_attr_recover_status = { .attr = {.name = "recover_status", .mode = S_IRUGO}, .show = dlm_recover_status_show @@ -121,6 +140,7 @@ static struct attribute *dlm_attrs[] = { &dlm_attr_control.attr, &dlm_attr_event.attr, &dlm_attr_id.attr, + &dlm_attr_nodir.attr, &dlm_attr_recover_status.attr, &dlm_attr_recover_nodeid.attr, NULL, diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index 6565fd5e28ef..64d3e2b958c7 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -492,30 +492,41 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) { int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); - int stop, reply = 0; + int stop, reply = 0, lock = 0; + uint32_t status; uint64_t seq; switch (rc->rc_type) { + case DLM_RCOM_LOCK: + lock = 1; + break; + case DLM_RCOM_LOCK_REPLY: + lock = 1; + reply = 1; + break; case DLM_RCOM_STATUS_REPLY: case DLM_RCOM_NAMES_REPLY: case DLM_RCOM_LOOKUP_REPLY: - case DLM_RCOM_LOCK_REPLY: reply = 1; }; spin_lock(&ls->ls_recover_lock); + status = ls->ls_recover_status; stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); seq = ls->ls_recover_seq; spin_unlock(&ls->ls_recover_lock); if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) || - (reply && (rc->rc_seq_reply != seq))) { + (reply && (rc->rc_seq_reply != seq)) || + (lock && !(status & DLM_RS_DIR))) { log_limit(ls, "dlm_receive_rcom ignore msg %d " - "from %d %llu %llu seq %llu", - rc->rc_type, nodeid, + "from %d %llu %llu recover seq %llu sts %x gen %u", + rc->rc_type, + nodeid, (unsigned long long)rc->rc_seq, (unsigned long long)rc->rc_seq_reply, - (unsigned long long)seq); + (unsigned long long)seq, + status, ls->ls_generation); goto out; } diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index 34d5adf1fce7..7554e4dac6bb 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -339,9 +339,12 @@ static void set_lock_master(struct list_head *queue, int nodeid) { struct dlm_lkb *lkb; - list_for_each_entry(lkb, queue, lkb_statequeue) - if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) + list_for_each_entry(lkb, queue, lkb_statequeue) { + if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) { lkb->lkb_nodeid = nodeid; + lkb->lkb_remid = 0; + } + } } static void set_master_lkbs(struct dlm_rsb *r) @@ -354,18 +357,16 @@ static void set_master_lkbs(struct dlm_rsb *r) /* * Propagate the new master nodeid to locks * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider. - * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which + * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which * rsb's to consider. */ static void set_new_master(struct dlm_rsb *r, int nodeid) { - lock_rsb(r); r->res_nodeid = nodeid; set_master_lkbs(r); rsb_set_flag(r, RSB_NEW_MASTER); rsb_set_flag(r, RSB_NEW_MASTER2); - unlock_rsb(r); } /* @@ -376,9 +377,9 @@ static void set_new_master(struct dlm_rsb *r, int nodeid) static int recover_master(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; - int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); - - dir_nodeid = dlm_dir_nodeid(r); + int error, ret_nodeid; + int our_nodeid = dlm_our_nodeid(); + int dir_nodeid = dlm_dir_nodeid(r); if (dir_nodeid == our_nodeid) { error = dlm_dir_lookup(ls, our_nodeid, r->res_name, @@ -388,7 +389,9 @@ static int recover_master(struct dlm_rsb *r) if (ret_nodeid == our_nodeid) ret_nodeid = 0; + lock_rsb(r); set_new_master(r, ret_nodeid); + unlock_rsb(r); } else { recover_list_add(r); error = dlm_send_rcom_lookup(r, dir_nodeid); @@ -398,24 +401,33 @@ static int recover_master(struct dlm_rsb *r) } /* - * When not using a directory, most resource names will hash to a new static - * master nodeid and the resource will need to be remastered. + * All MSTCPY locks are purged and rebuilt, even if the master stayed the same. + * This is necessary because recovery can be started, aborted and restarted, + * causing the master nodeid to briefly change during the aborted recovery, and + * change back to the original value in the second recovery. The MSTCPY locks + * may or may not have been purged during the aborted recovery. Another node + * with an outstanding request in waiters list and a request reply saved in the + * requestqueue, cannot know whether it should ignore the reply and resend the + * request, or accept the reply and complete the request. It must do the + * former if the remote node purged MSTCPY locks, and it must do the later if + * the remote node did not. This is solved by always purging MSTCPY locks, in + * which case, the request reply would always be ignored and the request + * resent. */ static int recover_master_static(struct dlm_rsb *r) { - int master = dlm_dir_nodeid(r); + int dir_nodeid = dlm_dir_nodeid(r); + int new_master = dir_nodeid; - if (master == dlm_our_nodeid()) - master = 0; + if (dir_nodeid == dlm_our_nodeid()) + new_master = 0; - if (r->res_nodeid != master) { - if (is_master(r)) - dlm_purge_mstcpy_locks(r); - set_new_master(r, master); - return 1; - } - return 0; + lock_rsb(r); + dlm_purge_mstcpy_locks(r); + set_new_master(r, new_master); + unlock_rsb(r); + return 1; } /* @@ -481,7 +493,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) if (nodeid == dlm_our_nodeid()) nodeid = 0; + lock_rsb(r); set_new_master(r, nodeid); + unlock_rsb(r); recover_list_del(r); if (recover_list_empty(ls)) @@ -556,8 +570,6 @@ int dlm_recover_locks(struct dlm_ls *ls) struct dlm_rsb *r; int error, count = 0; - log_debug(ls, "dlm_recover_locks"); - down_read(&ls->ls_root_sem); list_for_each_entry(r, &ls->ls_root_list, res_root_list) { if (is_master(r)) { @@ -584,7 +596,7 @@ int dlm_recover_locks(struct dlm_ls *ls) } up_read(&ls->ls_root_sem); - log_debug(ls, "dlm_recover_locks %d locks", count); + log_debug(ls, "dlm_recover_locks %d out", count); error = dlm_wait_function(ls, &recover_list_empty); out: @@ -721,21 +733,19 @@ static void recover_conversion(struct dlm_rsb *r) } /* We've become the new master for this rsb and waiting/converting locks may - need to be granted in dlm_grant_after_purge() due to locks that may have + need to be granted in dlm_recover_grant() due to locks that may have existed from a removed node. */ -static void set_locks_purged(struct dlm_rsb *r) +static void recover_grant(struct dlm_rsb *r) { if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) - rsb_set_flag(r, RSB_LOCKS_PURGED); + rsb_set_flag(r, RSB_RECOVER_GRANT); } void dlm_recover_rsbs(struct dlm_ls *ls) { struct dlm_rsb *r; - int count = 0; - - log_debug(ls, "dlm_recover_rsbs"); + unsigned int count = 0; down_read(&ls->ls_root_sem); list_for_each_entry(r, &ls->ls_root_list, res_root_list) { @@ -744,7 +754,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls) if (rsb_flag(r, RSB_RECOVER_CONVERT)) recover_conversion(r); if (rsb_flag(r, RSB_NEW_MASTER2)) - set_locks_purged(r); + recover_grant(r); recover_lvb(r); count++; } @@ -754,7 +764,8 @@ void dlm_recover_rsbs(struct dlm_ls *ls) } up_read(&ls->ls_root_sem); - log_debug(ls, "dlm_recover_rsbs %d rsbs", count); + if (count) + log_debug(ls, "dlm_recover_rsbs %d done", count); } /* Create a single list of all root rsb's to be used during recovery */ diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 11351b57c781..f1a9073c0835 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -84,6 +84,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) goto fail; } + ls->ls_recover_locks_in = 0; + dlm_set_recover_status(ls, DLM_RS_NODES); error = dlm_recover_members_wait(ls); @@ -130,7 +132,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * Clear lkb's for departed nodes. */ - dlm_purge_locks(ls); + dlm_recover_purge(ls); /* * Get new master nodeid's for rsb's that were mastered on @@ -161,6 +163,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) goto fail; } + log_debug(ls, "dlm_recover_locks %u in", + ls->ls_recover_locks_in); + /* * Finalize state in master rsb's now that all locks can be * checked. This includes conversion resolution and lvb @@ -225,7 +230,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) goto fail; } - dlm_grant_after_purge(ls); + dlm_recover_grant(ls); log_debug(ls, "dlm_recover %llu generation %u done: %u ms", (unsigned long long)rv->seq, ls->ls_generation, diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c index d3191bf03a68..1695f1b0dd45 100644 --- a/fs/dlm/requestqueue.c +++ b/fs/dlm/requestqueue.c @@ -65,6 +65,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms) int dlm_process_requestqueue(struct dlm_ls *ls) { struct rq_entry *e; + struct dlm_message *ms; int error = 0; mutex_lock(&ls->ls_requestqueue_mutex); @@ -78,6 +79,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls) e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); mutex_unlock(&ls->ls_requestqueue_mutex); + ms = &e->request; + + log_limit(ls, "dlm_process_requestqueue msg %d from %d " + "lkid %x remid %x result %d seq %u", + ms->m_type, ms->m_header.h_nodeid, + ms->m_lkid, ms->m_remid, ms->m_result, + e->recover_seq); + dlm_receive_message_saved(ls, &e->request, e->recover_seq); mutex_lock(&ls->ls_requestqueue_mutex); @@ -140,35 +149,7 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid) if (!dlm_no_directory(ls)) return 0; - /* with no directory, the master is likely to change as a part of - recovery; requests to/from the defunct master need to be purged */ - - switch (type) { - case DLM_MSG_REQUEST: - case DLM_MSG_CONVERT: - case DLM_MSG_UNLOCK: - case DLM_MSG_CANCEL: - /* we're no longer the master of this resource, the sender - will resend to the new master (see waiter_needs_recovery) */ - - if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid()) - return 1; - break; - - case DLM_MSG_REQUEST_REPLY: - case DLM_MSG_CONVERT_REPLY: - case DLM_MSG_UNLOCK_REPLY: - case DLM_MSG_CANCEL_REPLY: - case DLM_MSG_GRANT: - /* this reply is from the former master of the resource, - we'll resend to the new master if needed */ - - if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid) - return 1; - break; - } - - return 0; + return 1; } void dlm_purge_requestqueue(struct dlm_ls *ls) diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h index 47d0bda5ac2b..c7975bf4fd43 100644 --- a/fs/gfs2/incore.h +++ b/fs/gfs2/incore.h @@ -556,7 +556,6 @@ struct gfs2_sb_host { struct lm_lockstruct { int ls_jid; unsigned int ls_first; - unsigned int ls_nodir; const struct lm_lockops *ls_ops; dlm_lockspace_t *ls_dlm; diff --git a/fs/gfs2/lock_dlm.c b/fs/gfs2/lock_dlm.c index 5f5e70e047dc..4a38db739ca0 100644 --- a/fs/gfs2/lock_dlm.c +++ b/fs/gfs2/lock_dlm.c @@ -1209,8 +1209,6 @@ static int gdlm_mount(struct gfs2_sbd *sdp, const char *table) fsname++; flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL; - if (ls->ls_nodir) - flags |= DLM_LSFL_NODIR; /* * create/join lockspace diff --git a/fs/gfs2/ops_fstype.c b/fs/gfs2/ops_fstype.c index 6f3a18f9e176..018b4c13db92 100644 --- a/fs/gfs2/ops_fstype.c +++ b/fs/gfs2/ops_fstype.c @@ -994,6 +994,7 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent) ls->ls_jid = option; break; case Opt_id: + case Opt_nodir: /* Obsolete, but left for backward compat purposes */ break; case Opt_first: @@ -1002,12 +1003,6 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent) goto hostdata_error; ls->ls_first = option; break; - case Opt_nodir: - ret = match_int(&tmp[0], &option); - if (ret || (option != 0 && option != 1)) - goto hostdata_error; - ls->ls_nodir = option; - break; case Opt_err: default: hostdata_error: |