diff options
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 283 |
1 files changed, 255 insertions, 28 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 2b4b32aaa893..010ff3bd58ad 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, msg_size = 4 + 4 + 8 + 8 + 4+8; msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ msg_size += 1 + 8 + 4 + 4; /* pg_t */ - msg_size += 4 + MAX_OBJ_NAME_SIZE; + msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */ msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); msg_size += 8; /* snapid */ msg_size += 8; /* snap_seq */ @@ -368,6 +368,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); + req->r_base_oloc.pool = -1; + req->r_target_oloc.pool = -1; + /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); @@ -761,11 +764,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (num_ops > 1) osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); - req->r_file_layout = *layout; /* keep a copy */ + req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); - snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", - vino.ino, objnum); - req->r_oid_len = strlen(req->r_oid); + snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name), + "%llx.%08llx", vino.ino, objnum); + req->r_base_oid.name_len = strlen(req->r_base_oid.name); return req; } @@ -1044,8 +1047,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) !ceph_con_opened(&osd->o_con)) { struct ceph_osd_request *req; - dout(" osd addr hasn't changed and connection never opened," - " letting msgr retry"); + dout("osd addr hasn't changed and connection never opened, " + "letting msgr retry\n"); /* touch each r_stamp for handle_timeout()'s benfit */ list_for_each_entry(req, &osd->o_requests, r_osd_item) req->r_stamp = jiffies; @@ -1232,6 +1235,61 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, EXPORT_SYMBOL(ceph_osdc_set_request_linger); /* + * Returns whether a request should be blocked from being sent + * based on the current osdmap and osd_client settings. + * + * Caller should hold map_sem for read. + */ +static bool __req_should_be_paused(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD); + bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); + return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) || + (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr); +} + +/* + * Calculate mapping of a request to a PG. Takes tiering into account. + */ +static int __calc_request_pg(struct ceph_osdmap *osdmap, + struct ceph_osd_request *req, + struct ceph_pg *pg_out) +{ + bool need_check_tiering; + + need_check_tiering = false; + if (req->r_target_oloc.pool == -1) { + req->r_target_oloc = req->r_base_oloc; /* struct */ + need_check_tiering = true; + } + if (req->r_target_oid.name_len == 0) { + ceph_oid_copy(&req->r_target_oid, &req->r_base_oid); + need_check_tiering = true; + } + + if (need_check_tiering && + (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { + struct ceph_pg_pool_info *pi; + + pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool); + if (pi) { + if ((req->r_flags & CEPH_OSD_FLAG_READ) && + pi->read_tier >= 0) + req->r_target_oloc.pool = pi->read_tier; + if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && + pi->write_tier >= 0) + req->r_target_oloc.pool = pi->write_tier; + } + /* !pi is caught in ceph_oloc_oid_to_pg() */ + } + + return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc, + &req->r_target_oid, pg_out); +} + +/* * Pick an osd (the first 'up' osd in the pg), allocate the osd struct * (as needed), and set the request r_osd appropriately. If there is * no up osd, set r_osd to NULL. Move the request to the appropriate list @@ -1248,10 +1306,11 @@ static int __map_request(struct ceph_osd_client *osdc, int acting[CEPH_PG_MAX_SIZE]; int o = -1, num = 0; int err; + bool was_paused; dout("map_request %p tid %lld\n", req, req->r_tid); - err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, - ceph_file_layout_pg_pool(req->r_file_layout)); + + err = __calc_request_pg(osdc->osdmap, req, &pgid); if (err) { list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; @@ -1264,12 +1323,18 @@ static int __map_request(struct ceph_osd_client *osdc, num = err; } + was_paused = req->r_paused; + req->r_paused = __req_should_be_paused(osdc, req); + if (was_paused && !req->r_paused) + force_resend = 1; + if ((!force_resend && req->r_osd && req->r_osd->o_osd == o && req->r_sent >= req->r_osd->o_incarnation && req->r_num_pg_osds == num && memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || - (req->r_osd == NULL && o == -1)) + (req->r_osd == NULL && o == -1) || + req->r_paused) return 0; /* no change */ dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", @@ -1331,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc, /* fill in message content that changes each time we send it */ put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); put_unaligned_le32(req->r_flags, req->r_request_flags); - put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); + put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool); p = req->r_request_pgid; ceph_encode_64(&p, req->r_pgid.pool); ceph_encode_32(&p, req->r_pgid.seed); @@ -1432,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work) round_jiffies_relative(delay)); } +static int ceph_oloc_decode(void **p, void *end, + struct ceph_object_locator *oloc) +{ + u8 struct_v, struct_cv; + u32 len; + void *struct_end; + int ret = 0; + + ceph_decode_need(p, end, 1 + 1 + 4, e_inval); + struct_v = ceph_decode_8(p); + struct_cv = ceph_decode_8(p); + if (struct_v < 3) { + pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", + struct_v, struct_cv); + goto e_inval; + } + if (struct_cv > 6) { + pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", + struct_v, struct_cv); + goto e_inval; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, e_inval); + struct_end = *p + len; + + oloc->pool = ceph_decode_64(p); + *p += 4; /* skip preferred */ + + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_object_locator::key is set\n"); + goto e_inval; + } + + if (struct_v >= 5) { + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_object_locator::nspace is set\n"); + goto e_inval; + } + } + + if (struct_v >= 6) { + s64 hash = ceph_decode_64(p); + if (hash != -1) { + pr_warn("ceph_object_locator::hash is set\n"); + goto e_inval; + } + } + + /* skip the rest */ + *p = struct_end; +out: + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + +static int ceph_redirect_decode(void **p, void *end, + struct ceph_request_redirect *redir) +{ + u8 struct_v, struct_cv; + u32 len; + void *struct_end; + int ret; + + ceph_decode_need(p, end, 1 + 1 + 4, e_inval); + struct_v = ceph_decode_8(p); + struct_cv = ceph_decode_8(p); + if (struct_cv > 1) { + pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", + struct_v, struct_cv); + goto e_inval; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, e_inval); + struct_end = *p + len; + + ret = ceph_oloc_decode(p, end, &redir->oloc); + if (ret) + goto out; + + len = ceph_decode_32(p); + if (len > 0) { + pr_warn("ceph_request_redirect::object_name is set\n"); + goto e_inval; + } + + len = ceph_decode_32(p); + *p += len; /* skip osd_instructions */ + + /* skip the rest */ + *p = struct_end; +out: + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + static void complete_request(struct ceph_osd_request *req) { complete_all(&req->r_safe_completion); /* fsync waiter */ @@ -1446,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, { void *p, *end; struct ceph_osd_request *req; + struct ceph_request_redirect redir; u64 tid; int object_len; unsigned int numops; @@ -1525,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, for (i = 0; i < numops; i++) req->r_reply_op_result[i] = ceph_decode_32(&p); - already_completed = req->r_got_reply; + if (le16_to_cpu(msg->hdr.version) >= 6) { + p += 8 + 4; /* skip replay_version */ + p += 8; /* skip user_version */ - if (!req->r_got_reply) { + err = ceph_redirect_decode(&p, end, &redir); + if (err) + goto bad_put; + } else { + redir.oloc.pool = -1; + } + + if (redir.oloc.pool != -1) { + dout("redirect pool %lld\n", redir.oloc.pool); + + __unregister_request(osdc, req); + mutex_unlock(&osdc->request_mutex); + + req->r_target_oloc = redir.oloc; /* struct */ + + /* + * Start redirect requests with nofail=true. If + * mapping fails, request will end up on the notarget + * list, waiting for the new osdmap (which can take + * a while), even though the original request mapped + * successfully. In the future we might want to follow + * original request's nofail setting here. + */ + err = ceph_osdc_start_request(osdc, req, true); + BUG_ON(err); + goto done; + } + + already_completed = req->r_got_reply; + if (!req->r_got_reply) { req->r_result = result; dout("handle_reply result %d bytes %d\n", req->r_result, bytes); @@ -1581,6 +1781,13 @@ done: return; bad_put: + req->r_result = -EIO; + __unregister_request(osdc, req); + if (req->r_callback) + req->r_callback(req, msg); + else + complete_all(&req->r_completion); + complete_request(req); ceph_osdc_put_request(req); bad_mutex: mutex_unlock(&osdc->request_mutex); @@ -1613,14 +1820,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc) * * Caller should hold map_sem for read. */ -static void kick_requests(struct ceph_osd_client *osdc, int force_resend) +static void kick_requests(struct ceph_osd_client *osdc, bool force_resend, + bool force_resend_writes) { struct ceph_osd_request *req, *nreq; struct rb_node *p; int needmap = 0; int err; + bool force_resend_req; - dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); + dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "", + force_resend_writes ? " (force resend writes)" : ""); mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; ) { req = rb_entry(p, struct ceph_osd_request, r_node); @@ -1645,7 +1855,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) continue; } - err = __map_request(osdc, req, force_resend); + force_resend_req = force_resend || + (force_resend_writes && + req->r_flags & CEPH_OSD_FLAG_WRITE); + err = __map_request(osdc, req, force_resend_req); if (err < 0) continue; /* error */ if (req->r_osd == NULL) { @@ -1665,7 +1878,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) r_linger_item) { dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); - err = __map_request(osdc, req, force_resend); + err = __map_request(osdc, req, + force_resend || force_resend_writes); dout("__map_request returned %d\n", err); if (err == 0) continue; /* no change and no osd was specified */ @@ -1707,6 +1921,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) struct ceph_osdmap *newmap = NULL, *oldmap; int err; struct ceph_fsid fsid; + bool was_full; dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); p = msg->front.iov_base; @@ -1720,6 +1935,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) down_write(&osdc->map_sem); + was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); + /* incremental maps */ ceph_decode_32_safe(&p, end, nr_maps, bad); dout(" %d inc maps\n", nr_maps); @@ -1744,7 +1961,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ceph_osdmap_destroy(osdc->osdmap); osdc->osdmap = newmap; } - kick_requests(osdc, 0); + was_full = was_full || + ceph_osdmap_flag(osdc->osdmap, + CEPH_OSDMAP_FULL); + kick_requests(osdc, 0, was_full); } else { dout("ignoring incremental map %u len %d\n", epoch, maplen); @@ -1787,7 +2007,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) skipped_map = 1; ceph_osdmap_destroy(oldmap); } - kick_requests(osdc, skipped_map); + was_full = was_full || + ceph_osdmap_flag(osdc->osdmap, + CEPH_OSDMAP_FULL); + kick_requests(osdc, skipped_map, was_full); } p += maplen; nr_maps--; @@ -1804,7 +2027,9 @@ done: * we find out when we are no longer full and stop returning * ENOSPC. */ - if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) || + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) ceph_monc_request_next_osdmap(&osdc->client->monc); mutex_lock(&osdc->request_mutex); @@ -2068,10 +2293,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, ceph_encode_32(&p, -1); /* preferred */ /* oid */ - ceph_encode_32(&p, req->r_oid_len); - memcpy(p, req->r_oid, req->r_oid_len); - dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); - p += req->r_oid_len; + ceph_encode_32(&p, req->r_base_oid.name_len); + memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len); + dout("oid '%.*s' len %d\n", req->r_base_oid.name_len, + req->r_base_oid.name, req->r_base_oid.name_len); + p += req->r_base_oid.name_len; /* ops--can imply data */ ceph_encode_16(&p, (u16)req->r_num_ops); @@ -2454,7 +2680,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, struct ceph_osd_client *osdc = osd->o_osdc; struct ceph_msg *m; struct ceph_osd_request *req; - int front = le32_to_cpu(hdr->front_len); + int front_len = le32_to_cpu(hdr->front_len); int data_len = le32_to_cpu(hdr->data_len); u64 tid; @@ -2474,12 +2700,13 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, req->r_reply, req->r_reply->con); ceph_msg_revoke_incoming(req->r_reply); - if (front > req->r_reply->front.iov_len) { + if (front_len > req->r_reply->front_alloc_len) { pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n", - front, (int)req->r_reply->front.iov_len, + front_len, req->r_reply->front_alloc_len, (unsigned int)con->peer_name.type, le64_to_cpu(con->peer_name.num)); - m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); + m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, + false); if (!m) goto out; ceph_msg_put(req->r_reply); |