summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorIlya Dryomov <idryomov@gmail.com>2016-04-28 16:07:25 +0200
committerIlya Dryomov <idryomov@gmail.com>2016-05-26 01:12:27 +0200
commit42c1b1240326cbea86f15f5d4ce565d8b54be31f (patch)
tree0bae037f5d57712f1bfb843d7840b5f566aff369 /net
parente5253a7bde13788d9dc75f42eb47ea119af5609f (diff)
libceph: handle_one_map()
Separate osdmap handling from decoding and iterating over a bag of maps in a fresh MOSDMap message. This sets up the scene for the updated OSD client. Of particular importance here is the addition of pi->was_full, which can be used to answer "did this pool go full -> not-full in this map?". This is the key bit for supporting pool quotas. We won't be able to downgrade map_sem for much longer, so drop downgrade_write(). Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'net')
-rw-r--r--net/ceph/mon_client.c8
-rw-r--r--net/ceph/osd_client.c186
2 files changed, 138 insertions, 56 deletions
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index a426a4b03e75..98bfbe1f6807 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -376,6 +376,14 @@ void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
}
EXPORT_SYMBOL(ceph_monc_got_map);
+void ceph_monc_renew_subs(struct ceph_mon_client *monc)
+{
+ mutex_lock(&monc->mutex);
+ __send_subscribe(monc);
+ mutex_unlock(&monc->mutex);
+}
+EXPORT_SYMBOL(ceph_monc_renew_subs);
+
/*
* Register interest in the next osdmap
*/
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 9c35fd84a410..4227c55226c3 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1245,6 +1245,21 @@ static bool __pool_full(struct ceph_pg_pool_info *pi)
return pi->flags & CEPH_POOL_FLAG_FULL;
}
+static bool have_pool_full(struct ceph_osd_client *osdc)
+{
+ struct rb_node *n;
+
+ for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
+ struct ceph_pg_pool_info *pi =
+ rb_entry(n, struct ceph_pg_pool_info, node);
+
+ if (__pool_full(pi))
+ return true;
+ }
+
+ return false;
+}
+
/*
* Returns whether a request should be blocked from being sent
* based on the current osdmap and osd_client settings.
@@ -1639,6 +1654,26 @@ static void __send_queued(struct ceph_osd_client *osdc)
}
}
+static void maybe_request_map(struct ceph_osd_client *osdc)
+{
+ bool continuous = false;
+
+ WARN_ON(!osdc->osdmap->epoch);
+
+ if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+ ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
+ ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
+ dout("%s osdc %p continuous\n", __func__, osdc);
+ continuous = true;
+ } else {
+ dout("%s osdc %p onetime\n", __func__, osdc);
+ }
+
+ if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
+ osdc->osdmap->epoch + 1, continuous))
+ ceph_monc_renew_subs(&osdc->client->monc);
+}
+
/*
* Caller should hold map_sem for read and request_mutex.
*/
@@ -2119,6 +2154,18 @@ out_unlock:
up_read(&osdc->map_sem);
}
+static void set_pool_was_full(struct ceph_osd_client *osdc)
+{
+ struct rb_node *n;
+
+ for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
+ struct ceph_pg_pool_info *pi =
+ rb_entry(n, struct ceph_pg_pool_info, node);
+
+ pi->was_full = __pool_full(pi);
+ }
+}
+
static void reset_changed_osds(struct ceph_osd_client *osdc)
{
struct rb_node *p, *n;
@@ -2237,6 +2284,57 @@ static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
}
}
+static int handle_one_map(struct ceph_osd_client *osdc,
+ void *p, void *end, bool incremental)
+{
+ struct ceph_osdmap *newmap;
+ struct rb_node *n;
+ bool skipped_map = false;
+ bool was_full;
+
+ was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+ set_pool_was_full(osdc);
+
+ if (incremental)
+ newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
+ else
+ newmap = ceph_osdmap_decode(&p, end);
+ if (IS_ERR(newmap))
+ return PTR_ERR(newmap);
+
+ if (newmap != osdc->osdmap) {
+ /*
+ * Preserve ->was_full before destroying the old map.
+ * For pools that weren't in the old map, ->was_full
+ * should be false.
+ */
+ for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
+ struct ceph_pg_pool_info *pi =
+ rb_entry(n, struct ceph_pg_pool_info, node);
+ struct ceph_pg_pool_info *old_pi;
+
+ old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
+ if (old_pi)
+ pi->was_full = old_pi->was_full;
+ else
+ WARN_ON(pi->was_full);
+ }
+
+ if (osdc->osdmap->epoch &&
+ osdc->osdmap->epoch + 1 < newmap->epoch) {
+ WARN_ON(incremental);
+ skipped_map = true;
+ }
+
+ ceph_osdmap_destroy(osdc->osdmap);
+ osdc->osdmap = newmap;
+ }
+
+ was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+ kick_requests(osdc, skipped_map, was_full);
+
+ return 0;
+}
/*
* Process updated osd map.
@@ -2247,27 +2345,29 @@ static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
*/
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
- void *p, *end, *next;
+ void *p = msg->front.iov_base;
+ void *const end = p + msg->front.iov_len;
u32 nr_maps, maplen;
u32 epoch;
- struct ceph_osdmap *newmap = NULL, *oldmap;
- int err;
struct ceph_fsid fsid;
- bool was_full;
+ bool handled_incremental = false;
+ bool was_pauserd, was_pausewr;
+ bool pauserd, pausewr;
+ int err;
- dout("handle_map have %u\n", osdc->osdmap->epoch);
- p = msg->front.iov_base;
- end = p + msg->front.iov_len;
+ dout("%s have %u\n", __func__, osdc->osdmap->epoch);
+ down_write(&osdc->map_sem);
/* verify fsid */
ceph_decode_need(&p, end, sizeof(fsid), bad);
ceph_decode_copy(&p, &fsid, sizeof(fsid));
if (ceph_check_fsid(osdc->client, &fsid) < 0)
- return;
-
- down_write(&osdc->map_sem);
+ goto bad;
- was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+ was_pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
+ was_pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
+ ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+ have_pool_full(osdc);
/* incremental maps */
ceph_decode_32_safe(&p, end, nr_maps, bad);
@@ -2277,33 +2377,22 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
epoch = ceph_decode_32(&p);
maplen = ceph_decode_32(&p);
ceph_decode_need(&p, end, maplen, bad);
- next = p + maplen;
- if (osdc->osdmap->epoch+1 == epoch) {
+ if (osdc->osdmap->epoch &&
+ osdc->osdmap->epoch + 1 == epoch) {
dout("applying incremental map %u len %d\n",
epoch, maplen);
- newmap = osdmap_apply_incremental(&p, next,
- osdc->osdmap);
- if (IS_ERR(newmap)) {
- err = PTR_ERR(newmap);
+ err = handle_one_map(osdc, p, p + maplen, true);
+ if (err)
goto bad;
- }
- BUG_ON(!newmap);
- if (newmap != osdc->osdmap) {
- ceph_osdmap_destroy(osdc->osdmap);
- osdc->osdmap = newmap;
- }
- was_full = was_full ||
- ceph_osdmap_flag(osdc->osdmap,
- CEPH_OSDMAP_FULL);
- kick_requests(osdc, 0, was_full);
+ handled_incremental = true;
} else {
dout("ignoring incremental map %u len %d\n",
epoch, maplen);
}
- p = next;
+ p += maplen;
nr_maps--;
}
- if (newmap)
+ if (handled_incremental)
goto done;
/* full maps */
@@ -2322,50 +2411,35 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
"older than our %u\n", epoch, maplen,
osdc->osdmap->epoch);
} else {
- int skipped_map = 0;
-
dout("taking full map %u len %d\n", epoch, maplen);
- newmap = ceph_osdmap_decode(&p, p+maplen);
- if (IS_ERR(newmap)) {
- err = PTR_ERR(newmap);
+ err = handle_one_map(osdc, p, p + maplen, false);
+ if (err)
goto bad;
- }
- BUG_ON(!newmap);
- oldmap = osdc->osdmap;
- osdc->osdmap = newmap;
- if (oldmap) {
- if (oldmap->epoch + 1 < newmap->epoch)
- skipped_map = 1;
- ceph_osdmap_destroy(oldmap);
- }
- was_full = was_full ||
- ceph_osdmap_flag(osdc->osdmap,
- CEPH_OSDMAP_FULL);
- kick_requests(osdc, skipped_map, was_full);
}
p += maplen;
nr_maps--;
}
done:
- downgrade_write(&osdc->map_sem);
- ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
- osdc->osdmap->epoch);
-
/*
* subscribe to subsequent osdmap updates if full to ensure
* we find out when we are no longer full and stop returning
* ENOSPC.
*/
- if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
- ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
- ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
- ceph_monc_request_next_osdmap(&osdc->client->monc);
+ pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
+ pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
+ ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+ have_pool_full(osdc);
+ if (was_pauserd || was_pausewr || pauserd || pausewr)
+ maybe_request_map(osdc);
mutex_lock(&osdc->request_mutex);
__send_queued(osdc);
mutex_unlock(&osdc->request_mutex);
- up_read(&osdc->map_sem);
+
+ ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
+ osdc->osdmap->epoch);
+ up_write(&osdc->map_sem);
wake_up_all(&osdc->client->auth_wq);
return;