diff options
Diffstat (limited to 'fs')
-rw-r--r-- | fs/ceph/addr.c | 28 | ||||
-rw-r--r-- | fs/ceph/caps.c | 170 | ||||
-rw-r--r-- | fs/ceph/dir.c | 79 | ||||
-rw-r--r-- | fs/ceph/file.c | 12 | ||||
-rw-r--r-- | fs/ceph/inode.c | 56 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 33 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 3 | ||||
-rw-r--r-- | fs/ceph/snap.c | 8 | ||||
-rw-r--r-- | fs/ceph/super.h | 53 |
9 files changed, 313 insertions, 129 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index dbf07051aacd..b4336b42ce3b 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -299,7 +299,8 @@ unlock: * start an async read(ahead) operation. return nr_pages we submitted * a read for on success, or negative error code. */ -static int start_read(struct inode *inode, struct list_head *page_list, int max) +static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx, + struct list_head *page_list, int max) { struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; @@ -316,7 +317,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) int got = 0; int ret = 0; - if (!current->journal_info) { + if (!rw_ctx) { /* caller of readpages does not hold buffer and read caps * (fadvise, madvise and readahead cases) */ int want = CEPH_CAP_FILE_CACHE; @@ -437,6 +438,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, { struct inode *inode = file_inode(file); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_file_info *ci = file->private_data; + struct ceph_rw_context *rw_ctx; int rc = 0; int max = 0; @@ -449,11 +452,12 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, if (rc == 0) goto out; + rw_ctx = ceph_find_rw_context(ci); max = fsc->mount_options->rsize >> PAGE_SHIFT; - dout("readpages %p file %p nr_pages %d max %d\n", - inode, file, nr_pages, max); + dout("readpages %p file %p ctx %p nr_pages %d max %d\n", + inode, file, rw_ctx, nr_pages, max); while (!list_empty(page_list)) { - rc = start_read(inode, page_list, max); + rc = start_read(inode, rw_ctx, page_list, max); if (rc < 0) goto out; } @@ -574,7 +578,6 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) struct ceph_fs_client *fsc; struct ceph_snap_context *snapc, *oldest; loff_t page_off = page_offset(page); - long writeback_stat; int err, len = PAGE_SIZE; struct ceph_writeback_ctl ceph_wbc; @@ -615,8 +618,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n", inode, page, page->index, page_off, len, snapc, snapc->seq); - writeback_stat = atomic_long_inc_return(&fsc->writeback_count); - if (writeback_stat > + if (atomic_long_inc_return(&fsc->writeback_count) > CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb)) set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC); @@ -651,6 +653,11 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) end_page_writeback(page); ceph_put_wrbuffer_cap_refs(ci, 1, snapc); ceph_put_snap_context(snapc); /* page's reference */ + + if (atomic_long_dec_return(&fsc->writeback_count) < + CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb)) + clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC); + return err; } @@ -1450,9 +1457,10 @@ static int ceph_filemap_fault(struct vm_fault *vmf) if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) || ci->i_inline_version == CEPH_INLINE_NONE) { - current->journal_info = vma->vm_file; + CEPH_DEFINE_RW_CONTEXT(rw_ctx, got); + ceph_add_rw_context(fi, &rw_ctx); ret = filemap_fault(vmf); - current->journal_info = NULL; + ceph_del_rw_context(fi, &rw_ctx); } else ret = -EAGAIN; diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index a14b2c974c9e..6582c4507e6c 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -154,13 +154,19 @@ void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta) spin_unlock(&mdsc->caps_list_lock); } -void ceph_reserve_caps(struct ceph_mds_client *mdsc, +/* + * Called under mdsc->mutex. + */ +int ceph_reserve_caps(struct ceph_mds_client *mdsc, struct ceph_cap_reservation *ctx, int need) { - int i; + int i, j; struct ceph_cap *cap; int have; int alloc = 0; + int max_caps; + bool trimmed = false; + struct ceph_mds_session *s; LIST_HEAD(newcaps); dout("reserve caps ctx=%p need=%d\n", ctx, need); @@ -179,16 +185,37 @@ void ceph_reserve_caps(struct ceph_mds_client *mdsc, spin_unlock(&mdsc->caps_list_lock); for (i = have; i < need; i++) { +retry: cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); - if (!cap) - break; + if (!cap) { + if (!trimmed) { + for (j = 0; j < mdsc->max_sessions; j++) { + s = __ceph_lookup_mds_session(mdsc, j); + if (!s) + continue; + mutex_unlock(&mdsc->mutex); + + mutex_lock(&s->s_mutex); + max_caps = s->s_nr_caps - (need - i); + ceph_trim_caps(mdsc, s, max_caps); + mutex_unlock(&s->s_mutex); + + ceph_put_mds_session(s); + mutex_lock(&mdsc->mutex); + } + trimmed = true; + goto retry; + } else { + pr_warn("reserve caps ctx=%p ENOMEM " + "need=%d got=%d\n", + ctx, need, have + alloc); + goto out_nomem; + } + } list_add(&cap->caps_item, &newcaps); alloc++; } - /* we didn't manage to reserve as much as we needed */ - if (have + alloc != need) - pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n", - ctx, need, have + alloc); + BUG_ON(have + alloc != need); spin_lock(&mdsc->caps_list_lock); mdsc->caps_total_count += alloc; @@ -204,6 +231,24 @@ void ceph_reserve_caps(struct ceph_mds_client *mdsc, dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n", ctx, mdsc->caps_total_count, mdsc->caps_use_count, mdsc->caps_reserve_count, mdsc->caps_avail_count); + return 0; + +out_nomem: + while (!list_empty(&newcaps)) { + cap = list_first_entry(&newcaps, + struct ceph_cap, caps_item); + list_del(&cap->caps_item); + kmem_cache_free(ceph_cap_cachep, cap); + } + + spin_lock(&mdsc->caps_list_lock); + mdsc->caps_avail_count += have; + mdsc->caps_reserve_count -= have; + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + + mdsc->caps_avail_count); + spin_unlock(&mdsc->caps_list_lock); + return -ENOMEM; } int ceph_unreserve_caps(struct ceph_mds_client *mdsc, @@ -498,7 +543,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, */ if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) { if (issued & CEPH_CAP_FILE_SHARED) - ci->i_shared_gen++; + atomic_inc(&ci->i_shared_gen); if (S_ISDIR(ci->vfs_inode.i_mode)) { dout(" marking %p NOT complete\n", &ci->vfs_inode); __ceph_dir_clear_complete(ci); @@ -577,18 +622,30 @@ void ceph_add_cap(struct inode *inode, } } - if (!ci->i_snap_realm) { + if (!ci->i_snap_realm || + ((flags & CEPH_CAP_FLAG_AUTH) && + realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) { /* * add this inode to the appropriate snap realm */ struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc, realmino); if (realm) { + struct ceph_snap_realm *oldrealm = ci->i_snap_realm; + if (oldrealm) { + spin_lock(&oldrealm->inodes_with_caps_lock); + list_del_init(&ci->i_snap_realm_item); + spin_unlock(&oldrealm->inodes_with_caps_lock); + } + spin_lock(&realm->inodes_with_caps_lock); ci->i_snap_realm = realm; list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps); spin_unlock(&realm->inodes_with_caps_lock); + + if (oldrealm) + ceph_put_snap_realm(mdsc, oldrealm); } else { pr_err("ceph_add_cap: couldn't find snap realm %llx\n", realmino); @@ -890,6 +947,11 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check) /* * called under i_ceph_lock */ +static int __ceph_is_single_caps(struct ceph_inode_info *ci) +{ + return rb_first(&ci->i_caps) == rb_last(&ci->i_caps); +} + static int __ceph_is_any_caps(struct ceph_inode_info *ci) { return !RB_EMPTY_ROOT(&ci->i_caps); @@ -1703,21 +1765,24 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, int mds = -1; /* keep track of how far we've gone through i_caps list to avoid an infinite loop on retry */ struct rb_node *p; - int delayed = 0, sent = 0, num; - bool is_delayed = flags & CHECK_CAPS_NODELAY; + int delayed = 0, sent = 0; + bool no_delay = flags & CHECK_CAPS_NODELAY; bool queue_invalidate = false; - bool force_requeue = false; bool tried_invalidate = false; /* if we are unmounting, flush any unused caps immediately. */ if (mdsc->stopping) - is_delayed = true; + no_delay = true; spin_lock(&ci->i_ceph_lock); if (ci->i_ceph_flags & CEPH_I_FLUSH) flags |= CHECK_CAPS_FLUSH; + if (!(flags & CHECK_CAPS_AUTHONLY) || + (ci->i_auth_cap && __ceph_is_single_caps(ci))) + __cap_delay_cancel(mdsc, ci); + goto retry_locked; retry: spin_lock(&ci->i_ceph_lock); @@ -1772,7 +1837,7 @@ retry_locked: * have cached pages, but don't want them, then try to invalidate. * If we fail, it's because pages are locked.... try again later. */ - if ((!is_delayed || mdsc->stopping) && + if ((!no_delay || mdsc->stopping) && !S_ISDIR(inode->i_mode) && /* ignore readdir cache */ !(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */ inode->i_data.nrpages && /* have cached pages */ @@ -1781,27 +1846,16 @@ retry_locked: !tried_invalidate) { dout("check_caps trying to invalidate on %p\n", inode); if (try_nonblocking_invalidate(inode) < 0) { - if (revoking & (CEPH_CAP_FILE_CACHE| - CEPH_CAP_FILE_LAZYIO)) { - dout("check_caps queuing invalidate\n"); - queue_invalidate = true; - ci->i_rdcache_revoking = ci->i_rdcache_gen; - } else { - dout("check_caps failed to invalidate pages\n"); - /* we failed to invalidate pages. check these - caps again later. */ - force_requeue = true; - __cap_set_timeouts(mdsc, ci); - } + dout("check_caps queuing invalidate\n"); + queue_invalidate = true; + ci->i_rdcache_revoking = ci->i_rdcache_gen; } tried_invalidate = true; goto retry_locked; } - num = 0; for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); - num++; /* avoid looping forever */ if (mds >= cap->mds || @@ -1864,7 +1918,7 @@ retry_locked: cap->mds_wanted == want) continue; /* nope, all good */ - if (is_delayed) + if (no_delay) goto ack; /* delay? */ @@ -1955,15 +2009,8 @@ ack: goto retry; /* retake i_ceph_lock and restart our cap scan. */ } - /* - * Reschedule delayed caps release if we delayed anything, - * otherwise cancel. - */ - if (delayed && is_delayed) - force_requeue = true; /* __send_cap delayed release; requeue */ - if (!delayed && !is_delayed) - __cap_delay_cancel(mdsc, ci); - else if (!is_delayed || force_requeue) + /* Reschedule delayed caps release if we delayed anything */ + if (delayed) __cap_delay_requeue(mdsc, ci); spin_unlock(&ci->i_ceph_lock); @@ -2160,7 +2207,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) u64 flush_tid; int err = 0; int dirty; - int wait = wbc->sync_mode == WB_SYNC_ALL; + int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync); dout("write_inode %p wait=%d\n", inode, wait); if (wait) { @@ -3426,7 +3473,14 @@ retry: */ issued = cap->issued; - WARN_ON(issued != cap->implemented); + if (issued != cap->implemented) + pr_err_ratelimited("handle_cap_export: issued != implemented: " + "ino (%llx.%llx) mds%d seq %d mseq %d " + "issued %s implemented %s\n", + ceph_vinop(inode), mds, cap->seq, cap->mseq, + ceph_cap_string(issued), + ceph_cap_string(cap->implemented)); + tcap = __get_cap_for_mds(ci, target); if (tcap) { @@ -3572,12 +3626,13 @@ retry: if ((ph->flags & CEPH_CAP_FLAG_AUTH) && (ocap->seq != le32_to_cpu(ph->seq) || ocap->mseq != le32_to_cpu(ph->mseq))) { - pr_err("handle_cap_import: mismatched seq/mseq: " - "ino (%llx.%llx) mds%d seq %d mseq %d " - "importer mds%d has peer seq %d mseq %d\n", - ceph_vinop(inode), peer, ocap->seq, - ocap->mseq, mds, le32_to_cpu(ph->seq), - le32_to_cpu(ph->mseq)); + pr_err_ratelimited("handle_cap_import: " + "mismatched seq/mseq: ino (%llx.%llx) " + "mds%d seq %d mseq %d importer mds%d " + "has peer seq %d mseq %d\n", + ceph_vinop(inode), peer, ocap->seq, + ocap->mseq, mds, le32_to_cpu(ph->seq), + le32_to_cpu(ph->mseq)); } __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); } @@ -3939,11 +3994,20 @@ int ceph_encode_inode_release(void **p, struct inode *inode, cap = __get_cap_for_mds(ci, mds); if (cap && __cap_is_valid(cap)) { - if (force || - ((cap->issued & drop) && - (cap->issued & unless) == 0)) { - if ((cap->issued & drop) && - (cap->issued & unless) == 0) { + unless &= cap->issued; + if (unless) { + if (unless & CEPH_CAP_AUTH_EXCL) + drop &= ~CEPH_CAP_AUTH_SHARED; + if (unless & CEPH_CAP_LINK_EXCL) + drop &= ~CEPH_CAP_LINK_SHARED; + if (unless & CEPH_CAP_XATTR_EXCL) + drop &= ~CEPH_CAP_XATTR_SHARED; + if (unless & CEPH_CAP_FILE_EXCL) + drop &= ~CEPH_CAP_FILE_SHARED; + } + + if (force || (cap->issued & drop)) { + if (cap->issued & drop) { int wanted = __ceph_caps_wanted(ci); if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0) wanted |= cap->mds_wanted; @@ -3975,7 +4039,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode, *p += sizeof(*rel); ret = 1; } else { - dout("encode_inode_release %p cap %p %s\n", + dout("encode_inode_release %p cap %p %s (noop)\n", inode, cap, ceph_cap_string(cap->issued)); } } diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 8a5266699b67..0c4346806e17 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -173,7 +173,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx, * the MDS if/when the directory is modified). */ static int __dcache_readdir(struct file *file, struct dir_context *ctx, - u32 shared_gen) + int shared_gen) { struct ceph_file_info *fi = file->private_data; struct dentry *parent = file->f_path.dentry; @@ -184,7 +184,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, u64 idx = 0; int err = 0; - dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos); + dout("__dcache_readdir %p v%u at %llx\n", dir, (unsigned)shared_gen, ctx->pos); /* search start position */ if (ctx->pos > 2) { @@ -231,11 +231,17 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, goto out; } - di = ceph_dentry(dentry); spin_lock(&dentry->d_lock); - if (di->lease_shared_gen == shared_gen && - d_really_is_positive(dentry) && - fpos_cmp(ctx->pos, di->offset) <= 0) { + di = ceph_dentry(dentry); + if (d_unhashed(dentry) || + d_really_is_negative(dentry) || + di->lease_shared_gen != shared_gen) { + spin_unlock(&dentry->d_lock); + dput(dentry); + err = -EAGAIN; + goto out; + } + if (fpos_cmp(ctx->pos, di->offset) <= 0) { emit_dentry = true; } spin_unlock(&dentry->d_lock); @@ -333,7 +339,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ceph_snap(inode) != CEPH_SNAPDIR && __ceph_dir_is_complete_ordered(ci) && __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { - u32 shared_gen = ci->i_shared_gen; + int shared_gen = atomic_read(&ci->i_shared_gen); spin_unlock(&ci->i_ceph_lock); err = __dcache_readdir(file, ctx, shared_gen); if (err != -EAGAIN) @@ -381,6 +387,7 @@ more: if (op == CEPH_MDS_OP_READDIR) { req->r_direct_hash = ceph_frag_value(frag); __set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); + req->r_inode_drop = CEPH_CAP_FILE_EXCL; } if (fi->last_name) { req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL); @@ -750,7 +757,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, spin_unlock(&ci->i_ceph_lock); dout(" dir %p complete, -ENOENT\n", dir); d_add(dentry, NULL); - di->lease_shared_gen = ci->i_shared_gen; + di->lease_shared_gen = atomic_read(&ci->i_shared_gen); return NULL; } spin_unlock(&ci->i_ceph_lock); @@ -835,7 +842,7 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_args.mknod.mode = cpu_to_le32(mode); req->r_args.mknod.rdev = cpu_to_le32(rdev); - req->r_dentry_drop = CEPH_CAP_FILE_SHARED; + req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; if (acls.pagelist) { req->r_pagelist = acls.pagelist; @@ -887,7 +894,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_dentry = dget(dentry); req->r_num_caps = 2; - req->r_dentry_drop = CEPH_CAP_FILE_SHARED; + req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && !req->r_reply_info.head->is_dentry) @@ -936,7 +943,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) req->r_parent = dir; set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_args.mkdir.mode = cpu_to_le32(mode); - req->r_dentry_drop = CEPH_CAP_FILE_SHARED; + req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; if (acls.pagelist) { req->r_pagelist = acls.pagelist; @@ -983,7 +990,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; /* release LINK_SHARED on source inode (mds will lock it) */ - req->r_old_inode_drop = CEPH_CAP_LINK_SHARED; + req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL; err = ceph_mdsc_do_request(mdsc, dir, req); if (err) { d_drop(dentry); @@ -1096,7 +1103,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; /* release LINK_RDCACHE on source inode (mds will lock it) */ - req->r_old_inode_drop = CEPH_CAP_LINK_SHARED; + req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL; if (d_really_is_positive(new_dentry)) req->r_inode_drop = drop_caps_for_unlink(d_inode(new_dentry)); err = ceph_mdsc_do_request(mdsc, old_dir, req); @@ -1106,16 +1113,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, * do_request, above). If there is no trace, we need * to do it here. */ - - /* d_move screws up sibling dentries' offsets */ - ceph_dir_clear_complete(old_dir); - ceph_dir_clear_complete(new_dir); - d_move(old_dentry, new_dentry); - - /* ensure target dentry is invalidated, despite - rehashing bug in vfs_rename_dir */ - ceph_invalidate_dentry_lease(new_dentry); } ceph_mdsc_put_request(req); return err; @@ -1199,12 +1197,12 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) int valid = 0; spin_lock(&ci->i_ceph_lock); - if (ci->i_shared_gen == di->lease_shared_gen) + if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen) valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1); spin_unlock(&ci->i_ceph_lock); dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n", - dir, (unsigned)ci->i_shared_gen, dentry, - (unsigned)di->lease_shared_gen, valid); + dir, (unsigned)atomic_read(&ci->i_shared_gen), + dentry, (unsigned)di->lease_shared_gen, valid); return valid; } @@ -1332,24 +1330,37 @@ static void ceph_d_release(struct dentry *dentry) */ static void ceph_d_prune(struct dentry *dentry) { - dout("ceph_d_prune %p\n", dentry); + struct ceph_inode_info *dir_ci; + struct ceph_dentry_info *di; + + dout("ceph_d_prune %pd %p\n", dentry, dentry); /* do we have a valid parent? */ if (IS_ROOT(dentry)) return; - /* if we are not hashed, we don't affect dir's completeness */ - if (d_unhashed(dentry)) + /* we hold d_lock, so d_parent is stable */ + dir_ci = ceph_inode(d_inode(dentry->d_parent)); + if (dir_ci->i_vino.snap == CEPH_SNAPDIR) return; - if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_SNAPDIR) + /* who calls d_delete() should also disable dcache readdir */ + if (d_really_is_negative(dentry)) return; - /* - * we hold d_lock, so d_parent is stable, and d_fsdata is never - * cleared until d_release - */ - ceph_dir_clear_complete(d_inode(dentry->d_parent)); + /* d_fsdata does not get cleared until d_release */ + if (!d_unhashed(dentry)) { + __ceph_dir_clear_complete(dir_ci); + return; + } + + /* Disable dcache readdir just in case that someone called d_drop() + * or d_invalidate(), but MDS didn't revoke CEPH_CAP_FILE_SHARED + * properly (dcache readdir is still enabled) */ + di = ceph_dentry(dentry); + if (di->offset > 0 && + di->lease_shared_gen == atomic_read(&dir_ci->i_shared_gen)) + __ceph_dir_clear_ordered(dir_ci); } /* diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 5c17125f45c7..6639926eed4e 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -181,6 +181,10 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) return -ENOMEM; } cf->fmode = fmode; + + spin_lock_init(&cf->rw_contexts_lock); + INIT_LIST_HEAD(&cf->rw_contexts); + cf->next_offset = 2; cf->readdir_cache_idx = -1; file->private_data = cf; @@ -396,7 +400,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, req->r_dentry = dget(dentry); req->r_num_caps = 2; if (flags & O_CREAT) { - req->r_dentry_drop = CEPH_CAP_FILE_SHARED; + req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; if (acls.pagelist) { req->r_pagelist = acls.pagelist; @@ -464,6 +468,7 @@ int ceph_release(struct inode *inode, struct file *file) ceph_mdsc_put_request(cf->last_readdir); kfree(cf->last_name); kfree(cf->dir_info); + WARN_ON(!list_empty(&cf->rw_contexts)); kmem_cache_free(ceph_file_cachep, cf); /* wake up anyone waiting for caps on this inode */ @@ -1199,12 +1204,13 @@ again: retry_op = READ_INLINE; } } else { + CEPH_DEFINE_RW_CONTEXT(rw_ctx, got); dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, ceph_cap_string(got)); - current->journal_info = filp; + ceph_add_rw_context(fi, &rw_ctx); ret = generic_file_read_iter(iocb, to); - current->journal_info = NULL; + ceph_del_rw_context(fi, &rw_ctx); } dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index ab81652198c4..c6ec5aa46100 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -494,7 +494,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_wrbuffer_ref = 0; ci->i_wrbuffer_ref_head = 0; atomic_set(&ci->i_filelock_ref, 0); - ci->i_shared_gen = 0; + atomic_set(&ci->i_shared_gen, 0); ci->i_rdcache_gen = 0; ci->i_rdcache_revoking = 0; @@ -1041,7 +1041,7 @@ static void update_dentry_lease(struct dentry *dentry, if (ceph_snap(dir) != CEPH_NOSNAP) goto out_unlock; - di->lease_shared_gen = ceph_inode(dir)->i_shared_gen; + di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen); if (duration == 0) goto out_unlock; @@ -1080,6 +1080,27 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in) BUG_ON(d_inode(dn)); + if (S_ISDIR(in->i_mode)) { + /* If inode is directory, d_splice_alias() below will remove + * 'realdn' from its origin parent. We need to ensure that + * origin parent's readdir cache will not reference 'realdn' + */ + realdn = d_find_any_alias(in); + if (realdn) { + struct ceph_dentry_info *di = ceph_dentry(realdn); + spin_lock(&realdn->d_lock); + + realdn->d_op->d_prune(realdn); + + di->time = jiffies; + di->lease_shared_gen = 0; + di->offset = 0; + + spin_unlock(&realdn->d_lock); + dput(realdn); + } + } + /* dn must be unhashed */ if (!d_unhashed(dn)) d_drop(dn); @@ -1295,8 +1316,8 @@ retry_lookup: if (!rinfo->head->is_target) { dout("fill_trace null dentry\n"); if (d_really_is_positive(dn)) { - ceph_dir_clear_ordered(dir); dout("d_delete %p\n", dn); + ceph_dir_clear_ordered(dir); d_delete(dn); } else if (have_lease) { if (d_unhashed(dn)) @@ -1323,7 +1344,6 @@ retry_lookup: dout(" %p links to %p %llx.%llx, not %llx.%llx\n", dn, d_inode(dn), ceph_vinop(d_inode(dn)), ceph_vinop(in)); - ceph_dir_clear_ordered(dir); d_invalidate(dn); have_lease = false; } @@ -1573,9 +1593,19 @@ retry_lookup: } else if (d_really_is_positive(dn) && (ceph_ino(d_inode(dn)) != tvino.ino || ceph_snap(d_inode(dn)) != tvino.snap)) { + struct ceph_dentry_info *di = ceph_dentry(dn); dout(" dn %p points to wrong inode %p\n", dn, d_inode(dn)); - __ceph_dir_clear_ordered(ci); + + spin_lock(&dn->d_lock); + if (di->offset > 0 && + di->lease_shared_gen == + atomic_read(&ci->i_shared_gen)) { + __ceph_dir_clear_ordered(ci); + di->offset = 0; + } + spin_unlock(&dn->d_lock); + d_delete(dn); dput(dn); goto retry_lookup; @@ -1600,9 +1630,7 @@ retry_lookup: &req->r_caps_reservation); if (ret < 0) { pr_err("fill_inode badness on %p\n", in); - if (d_really_is_positive(dn)) - __ceph_dir_clear_ordered(ci); - else + if (d_really_is_negative(dn)) iput(in); d_drop(dn); err = ret; @@ -2000,8 +2028,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr) ceph_encode_timespec(&req->r_args.setattr.atime, &attr->ia_atime); mask |= CEPH_SETATTR_ATIME; - release |= CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_RD | - CEPH_CAP_FILE_WR; + release |= CEPH_CAP_FILE_SHARED | + CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR; } } if (ia_valid & ATTR_MTIME) { @@ -2022,8 +2050,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr) ceph_encode_timespec(&req->r_args.setattr.mtime, &attr->ia_mtime); mask |= CEPH_SETATTR_MTIME; - release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD | - CEPH_CAP_FILE_WR; + release |= CEPH_CAP_FILE_SHARED | + CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR; } } if (ia_valid & ATTR_SIZE) { @@ -2041,8 +2069,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr) req->r_args.setattr.old_size = cpu_to_le64(inode->i_size); mask |= CEPH_SETATTR_SIZE; - release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD | - CEPH_CAP_FILE_WR; + release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL | + CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR; } } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 1b468250e947..2e8f90f96540 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -604,10 +604,20 @@ static void __register_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req, struct inode *dir) { + int ret = 0; + req->r_tid = ++mdsc->last_tid; - if (req->r_num_caps) - ceph_reserve_caps(mdsc, &req->r_caps_reservation, - req->r_num_caps); + if (req->r_num_caps) { + ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, + req->r_num_caps); + if (ret < 0) { + pr_err("__register_request %p " + "failed to reserve caps: %d\n", req, ret); + /* set req->r_err to fail early from __do_request */ + req->r_err = ret; + return; + } + } dout("__register_request %p tid %lld\n", req, req->r_tid); ceph_mdsc_get_request(req); insert_request(&mdsc->request_tree, req); @@ -1545,9 +1555,9 @@ out: /* * Trim session cap count down to some max number. */ -static int trim_caps(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session, - int max_caps) +int ceph_trim_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + int max_caps) { int trim_caps = session->s_nr_caps - max_caps; @@ -2438,11 +2448,14 @@ out: */ void ceph_invalidate_dir_request(struct ceph_mds_request *req) { - struct inode *inode = req->r_parent; + struct inode *dir = req->r_parent; + struct inode *old_dir = req->r_old_dentry_dir; - dout("invalidate_dir_request %p (complete, lease(s))\n", inode); + dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); - ceph_dir_clear_complete(inode); + ceph_dir_clear_complete(dir); + if (old_dir) + ceph_dir_clear_complete(old_dir); if (req->r_dentry) ceph_invalidate_dentry_lease(req->r_dentry); if (req->r_old_dentry) @@ -2773,7 +2786,7 @@ static void handle_session(struct ceph_mds_session *session, break; case CEPH_SESSION_RECALL_STATE: - trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); + ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); break; case CEPH_SESSION_FLUSHMSG: diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 837ac4b087a0..71e3b783ee6f 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -444,4 +444,7 @@ ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target); extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); +extern int ceph_trim_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + int max_caps); #endif diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 8a2ca41e4b97..07cf95e6413d 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -922,13 +922,17 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, /* * Move the inode to the new realm */ - spin_lock(&realm->inodes_with_caps_lock); + oldrealm = ci->i_snap_realm; + spin_lock(&oldrealm->inodes_with_caps_lock); list_del_init(&ci->i_snap_realm_item); + spin_unlock(&oldrealm->inodes_with_caps_lock); + + spin_lock(&realm->inodes_with_caps_lock); list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps); - oldrealm = ci->i_snap_realm; ci->i_snap_realm = realm; spin_unlock(&realm->inodes_with_caps_lock); + spin_unlock(&ci->i_ceph_lock); ceph_get_snap_realm(mdsc, realm); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 2beeec07fa76..21b2e5b004eb 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -256,7 +256,8 @@ struct ceph_inode_xattr { */ struct ceph_dentry_info { struct ceph_mds_session *lease_session; - u32 lease_gen, lease_shared_gen; + int lease_shared_gen; + u32 lease_gen; u32 lease_seq; unsigned long lease_renew_after, lease_renew_from; struct list_head lru; @@ -353,7 +354,7 @@ struct ceph_inode_info { int i_rd_ref, i_rdcache_ref, i_wr_ref, i_wb_ref; int i_wrbuffer_ref, i_wrbuffer_ref_head; atomic_t i_filelock_ref; - u32 i_shared_gen; /* increment each time we get FILE_SHARED */ + atomic_t i_shared_gen; /* increment each time we get FILE_SHARED */ u32 i_rdcache_gen; /* incremented each time we get FILE_CACHE. */ u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */ @@ -648,7 +649,7 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check); extern void ceph_caps_init(struct ceph_mds_client *mdsc); extern void ceph_caps_finalize(struct ceph_mds_client *mdsc); extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta); -extern void ceph_reserve_caps(struct ceph_mds_client *mdsc, +extern int ceph_reserve_caps(struct ceph_mds_client *mdsc, struct ceph_cap_reservation *ctx, int need); extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc, struct ceph_cap_reservation *ctx); @@ -668,6 +669,9 @@ struct ceph_file_info { short fmode; /* initialized on open */ short flags; /* CEPH_F_* */ + spinlock_t rw_contexts_lock; + struct list_head rw_contexts; + /* readdir: position within the dir */ u32 frag; struct ceph_mds_request *last_readdir; @@ -684,6 +688,49 @@ struct ceph_file_info { int dir_info_len; }; +struct ceph_rw_context { + struct list_head list; + struct task_struct *thread; + int caps; +}; + +#define CEPH_DEFINE_RW_CONTEXT(_name, _caps) \ + struct ceph_rw_context _name = { \ + .thread = current, \ + .caps = _caps, \ + } + +static inline void ceph_add_rw_context(struct ceph_file_info *cf, + struct ceph_rw_context *ctx) +{ + spin_lock(&cf->rw_contexts_lock); + list_add(&ctx->list, &cf->rw_contexts); + spin_unlock(&cf->rw_contexts_lock); +} + +static inline void ceph_del_rw_context(struct ceph_file_info *cf, + struct ceph_rw_context *ctx) +{ + spin_lock(&cf->rw_contexts_lock); + list_del(&ctx->list); + spin_unlock(&cf->rw_contexts_lock); +} + +static inline struct ceph_rw_context* +ceph_find_rw_context(struct ceph_file_info *cf) +{ + struct ceph_rw_context *ctx, *found = NULL; + spin_lock(&cf->rw_contexts_lock); + list_for_each_entry(ctx, &cf->rw_contexts, list) { + if (ctx->thread == current) { + found = ctx; + break; + } + } + spin_unlock(&cf->rw_contexts_lock); + return found; +} + struct ceph_readdir_cache_control { struct page *page; struct dentry **dentries; |