summaryrefslogtreecommitdiff
path: root/drivers/block/rbd.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r--drivers/block/rbd.c149
1 files changed, 137 insertions, 12 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 639dd91e7dab..c34719c917b1 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -218,6 +218,7 @@ struct rbd_obj_request {
u32 page_count;
};
};
+ struct page **copyup_pages;
struct ceph_osd_request *osd_req;
@@ -1498,7 +1499,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
obj_request->result = osd_req->r_result;
obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
- WARN_ON(osd_req->r_num_ops != 1); /* For now */
+ BUG_ON(osd_req->r_num_ops > 2);
/*
* We support a 64-bit length, but ultimately it has to be
@@ -1601,6 +1602,48 @@ static struct ceph_osd_request *rbd_osd_req_create(
return osd_req;
}
+/*
+ * Create a copyup osd request based on the information in the
+ * object request supplied. A copyup request has two osd ops,
+ * a copyup method call, and a "normal" write request.
+ */
+static struct ceph_osd_request *
+rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
+{
+ struct rbd_img_request *img_request;
+ struct ceph_snap_context *snapc;
+ struct rbd_device *rbd_dev;
+ struct ceph_osd_client *osdc;
+ struct ceph_osd_request *osd_req;
+
+ rbd_assert(obj_request_img_data_test(obj_request));
+ img_request = obj_request->img_request;
+ rbd_assert(img_request);
+ rbd_assert(img_request_write_test(img_request));
+
+ /* Allocate and initialize the request, for the two ops */
+
+ snapc = img_request->snapc;
+ rbd_dev = img_request->rbd_dev;
+ osdc = &rbd_dev->rbd_client->client->osdc;
+ osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
+ if (!osd_req)
+ return NULL; /* ENOMEM */
+
+ osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+ osd_req->r_callback = rbd_osd_req_callback;
+ osd_req->r_priv = obj_request;
+
+ osd_req->r_oid_len = strlen(obj_request->object_name);
+ rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
+ memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
+
+ osd_req->r_file_layout = rbd_dev->layout; /* struct */
+
+ return osd_req;
+}
+
+
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
ceph_osdc_put_request(osd_req);
@@ -1960,11 +2003,49 @@ out_unwind:
}
static void
+rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
+{
+ struct rbd_img_request *img_request;
+ struct rbd_device *rbd_dev;
+ u64 length;
+ u32 page_count;
+
+ rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+ rbd_assert(obj_request_img_data_test(obj_request));
+ img_request = obj_request->img_request;
+ rbd_assert(img_request);
+
+ rbd_dev = img_request->rbd_dev;
+ rbd_assert(rbd_dev);
+ length = (u64)1 << rbd_dev->header.obj_order;
+ page_count = (u32)calc_pages_for(0, length);
+
+ rbd_assert(obj_request->copyup_pages);
+ ceph_release_page_vector(obj_request->copyup_pages, page_count);
+ obj_request->copyup_pages = NULL;
+
+ /*
+ * We want the transfer count to reflect the size of the
+ * original write request. There is no such thing as a
+ * successful short write, so if the request was successful
+ * we can just set it to the originally-requested length.
+ */
+ if (!obj_request->result)
+ obj_request->xferred = obj_request->length;
+
+ /* Finish up with the normal image object callback */
+
+ rbd_img_obj_callback(obj_request);
+}
+
+static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
struct rbd_obj_request *orig_request;
+ struct ceph_osd_request *osd_req;
+ struct ceph_osd_client *osdc;
+ struct rbd_device *rbd_dev;
struct page **pages;
- u32 page_count;
int result;
u64 obj_size;
u64 xferred;
@@ -1979,25 +2060,60 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
orig_request = img_request->obj_request;
rbd_assert(orig_request != NULL);
-
+ rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
result = img_request->result;
obj_size = img_request->length;
xferred = img_request->xferred;
+ rbd_dev = img_request->rbd_dev;
+ rbd_assert(rbd_dev);
+ rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
+
rbd_img_request_put(img_request);
- obj_request_existence_set(orig_request, true);
+ if (result)
+ goto out_err;
+
+ /* Allocate the new copyup osd request for the original request */
- page_count = (u32)calc_pages_for(0, obj_size);
- ceph_release_page_vector(pages, page_count);
+ result = -ENOMEM;
+ rbd_assert(!orig_request->osd_req);
+ osd_req = rbd_osd_req_create_copyup(orig_request);
+ if (!osd_req)
+ goto out_err;
+ orig_request->osd_req = osd_req;
+ orig_request->copyup_pages = pages;
- /* Resubmit the original request (for now). */
+ /* Initialize the copyup op */
- orig_request->result = rbd_img_obj_request_submit(orig_request);
- if (orig_request->result) {
- obj_request_done_set(orig_request);
- rbd_obj_request_complete(orig_request);
- }
+ osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
+ osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+ false, false);
+
+ /* Then the original write request op */
+
+ osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
+ orig_request->offset,
+ orig_request->length, 0, 0);
+ osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
+ orig_request->length);
+
+ rbd_osd_req_format_write(orig_request);
+
+ /* All set, send it off. */
+
+ orig_request->callback = rbd_img_obj_copyup_callback;
+ osdc = &rbd_dev->rbd_client->client->osdc;
+ result = rbd_obj_request_submit(osdc, orig_request);
+ if (!result)
+ return;
+out_err:
+ /* Record the error code and complete the request */
+
+ orig_request->result = result;
+ orig_request->xferred = 0;
+ obj_request_done_set(orig_request);
+ rbd_obj_request_complete(orig_request);
}
/*
@@ -2034,6 +2150,15 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
rbd_assert(rbd_dev->parent != NULL);
/*
+ * First things first. The original osd request is of no
+ * use to use any more, we'll need a new one that can hold
+ * the two ops in a copyup request. We'll get that later,
+ * but for now we can release the old one.
+ */
+ rbd_osd_req_destroy(obj_request->osd_req);
+ obj_request->osd_req = NULL;
+
+ /*
* Determine the byte range covered by the object in the
* child image to which the original request was to be sent.
*/