summaryrefslogtreecommitdiff
path: root/net/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/auth_x.c13
-rw-r--r--net/ceph/ceph_common.c14
-rw-r--r--net/ceph/crush/crush.c3
-rw-r--r--net/ceph/crush/mapper.c81
-rw-r--r--net/ceph/debugfs.c112
-rw-r--r--net/ceph/messenger.c36
-rw-r--r--net/ceph/mon_client.c12
-rw-r--r--net/ceph/osd_client.c905
-rw-r--r--net/ceph/osdmap.c841
9 files changed, 1683 insertions, 334 deletions
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index 2034fb926670..8757fb87dab8 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -151,7 +151,7 @@ static int process_one_ticket(struct ceph_auth_client *ac,
struct timespec validity;
void *tp, *tpend;
void **ptp;
- struct ceph_crypto_key new_session_key;
+ struct ceph_crypto_key new_session_key = { 0 };
struct ceph_buffer *new_ticket_blob;
unsigned long new_expires, new_renew_after;
u64 new_secret_id;
@@ -215,6 +215,9 @@ static int process_one_ticket(struct ceph_auth_client *ac,
dout(" ticket blob is %d bytes\n", dlen);
ceph_decode_need(ptp, tpend, 1 + sizeof(u64), bad);
blob_struct_v = ceph_decode_8(ptp);
+ if (blob_struct_v != 1)
+ goto bad;
+
new_secret_id = ceph_decode_64(ptp);
ret = ceph_decode_buffer(&new_ticket_blob, ptp, tpend);
if (ret)
@@ -234,13 +237,13 @@ static int process_one_ticket(struct ceph_auth_client *ac,
type, ceph_entity_type_name(type), th->secret_id,
(int)th->ticket_blob->vec.iov_len);
xi->have_keys |= th->service;
-
-out:
- return ret;
+ return 0;
bad:
ret = -EINVAL;
- goto out;
+out:
+ ceph_crypto_key_destroy(&new_session_key);
+ return ret;
}
static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac,
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 26ab58665f77..5c036d2f401e 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -56,19 +56,6 @@ static const struct kernel_param_ops param_ops_supported_features = {
module_param_cb(supported_features, &param_ops_supported_features, NULL,
S_IRUGO);
-/*
- * find filename portion of a path (/foo/bar/baz -> baz)
- */
-const char *ceph_file_part(const char *s, int len)
-{
- const char *e = s + len;
-
- while (e != s && *(e-1) != '/')
- e--;
- return e;
-}
-EXPORT_SYMBOL(ceph_file_part);
-
const char *ceph_msg_type_name(int type)
{
switch (type) {
@@ -98,6 +85,7 @@ const char *ceph_msg_type_name(int type)
case CEPH_MSG_OSD_OP: return "osd_op";
case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
+ case CEPH_MSG_OSD_BACKOFF: return "osd_backoff";
default: return "unknown";
}
}
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 5bf94c04f645..4b428f46a8ca 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -1,6 +1,7 @@
#ifdef __KERNEL__
# include <linux/slab.h>
# include <linux/crush/crush.h>
+void clear_choose_args(struct crush_map *c);
#else
# include "crush_compat.h"
# include "crush.h"
@@ -127,6 +128,8 @@ void crush_destroy(struct crush_map *map)
#ifndef __KERNEL__
kfree(map->choose_tries);
+#else
+ clear_choose_args(map);
#endif
kfree(map);
}
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index b5cd8c21bfdf..746b145bfd11 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -302,19 +302,42 @@ static __u64 crush_ln(unsigned int xin)
*
*/
+static __u32 *get_choose_arg_weights(const struct crush_bucket_straw2 *bucket,
+ const struct crush_choose_arg *arg,
+ int position)
+{
+ if (!arg || !arg->weight_set || arg->weight_set_size == 0)
+ return bucket->item_weights;
+
+ if (position >= arg->weight_set_size)
+ position = arg->weight_set_size - 1;
+ return arg->weight_set[position].weights;
+}
+
+static __s32 *get_choose_arg_ids(const struct crush_bucket_straw2 *bucket,
+ const struct crush_choose_arg *arg)
+{
+ if (!arg || !arg->ids)
+ return bucket->h.items;
+
+ return arg->ids;
+}
+
static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
- int x, int r)
+ int x, int r,
+ const struct crush_choose_arg *arg,
+ int position)
{
unsigned int i, high = 0;
unsigned int u;
- unsigned int w;
__s64 ln, draw, high_draw = 0;
+ __u32 *weights = get_choose_arg_weights(bucket, arg, position);
+ __s32 *ids = get_choose_arg_ids(bucket, arg);
for (i = 0; i < bucket->h.size; i++) {
- w = bucket->item_weights[i];
- if (w) {
- u = crush_hash32_3(bucket->h.hash, x,
- bucket->h.items[i], r);
+ dprintk("weight 0x%x item %d\n", weights[i], ids[i]);
+ if (weights[i]) {
+ u = crush_hash32_3(bucket->h.hash, x, ids[i], r);
u &= 0xffff;
/*
@@ -335,7 +358,7 @@ static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
* weight means a larger (less negative) value
* for draw.
*/
- draw = div64_s64(ln, w);
+ draw = div64_s64(ln, weights[i]);
} else {
draw = S64_MIN;
}
@@ -352,7 +375,9 @@ static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
static int crush_bucket_choose(const struct crush_bucket *in,
struct crush_work_bucket *work,
- int x, int r)
+ int x, int r,
+ const struct crush_choose_arg *arg,
+ int position)
{
dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
BUG_ON(in->size == 0);
@@ -374,7 +399,7 @@ static int crush_bucket_choose(const struct crush_bucket *in,
case CRUSH_BUCKET_STRAW2:
return bucket_straw2_choose(
(const struct crush_bucket_straw2 *)in,
- x, r);
+ x, r, arg, position);
default:
dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
return in->items[0];
@@ -436,7 +461,8 @@ static int crush_choose_firstn(const struct crush_map *map,
unsigned int vary_r,
unsigned int stable,
int *out2,
- int parent_r)
+ int parent_r,
+ const struct crush_choose_arg *choose_args)
{
int rep;
unsigned int ftotal, flocal;
@@ -486,7 +512,10 @@ static int crush_choose_firstn(const struct crush_map *map,
else
item = crush_bucket_choose(
in, work->work[-1-in->id],
- x, r);
+ x, r,
+ (choose_args ?
+ &choose_args[-1-in->id] : 0),
+ outpos);
if (item >= map->max_devices) {
dprintk(" bad item %d\n", item);
skip_rep = 1;
@@ -543,7 +572,8 @@ static int crush_choose_firstn(const struct crush_map *map,
vary_r,
stable,
NULL,
- sub_r) <= outpos)
+ sub_r,
+ choose_args) <= outpos)
/* didn't get leaf */
reject = 1;
} else {
@@ -620,7 +650,8 @@ static void crush_choose_indep(const struct crush_map *map,
unsigned int recurse_tries,
int recurse_to_leaf,
int *out2,
- int parent_r)
+ int parent_r,
+ const struct crush_choose_arg *choose_args)
{
const struct crush_bucket *in = bucket;
int endpos = outpos + left;
@@ -692,7 +723,10 @@ static void crush_choose_indep(const struct crush_map *map,
item = crush_bucket_choose(
in, work->work[-1-in->id],
- x, r);
+ x, r,
+ (choose_args ?
+ &choose_args[-1-in->id] : 0),
+ outpos);
if (item >= map->max_devices) {
dprintk(" bad item %d\n", item);
out[rep] = CRUSH_ITEM_NONE;
@@ -746,7 +780,8 @@ static void crush_choose_indep(const struct crush_map *map,
x, 1, numrep, 0,
out2, rep,
recurse_tries, 0,
- 0, NULL, r);
+ 0, NULL, r,
+ choose_args);
if (out2[rep] == CRUSH_ITEM_NONE) {
/* placed nothing; no leaf */
break;
@@ -823,7 +858,7 @@ void crush_init_workspace(const struct crush_map *map, void *v)
* set the pointer first and then reserve the space for it to
* point to by incrementing the point.
*/
- v += sizeof(struct crush_work *);
+ v += sizeof(struct crush_work);
w->work = v;
v += map->max_buckets * sizeof(struct crush_work_bucket *);
for (b = 0; b < map->max_buckets; ++b) {
@@ -854,11 +889,12 @@ void crush_init_workspace(const struct crush_map *map, void *v)
* @weight: weight vector (for map leaves)
* @weight_max: size of weight vector
* @cwin: pointer to at least crush_work_size() bytes of memory
+ * @choose_args: weights and ids for each known bucket
*/
int crush_do_rule(const struct crush_map *map,
int ruleno, int x, int *result, int result_max,
const __u32 *weight, int weight_max,
- void *cwin)
+ void *cwin, const struct crush_choose_arg *choose_args)
{
int result_len;
struct crush_work *cw = cwin;
@@ -968,11 +1004,6 @@ int crush_do_rule(const struct crush_map *map,
for (i = 0; i < wsize; i++) {
int bno;
- /*
- * see CRUSH_N, CRUSH_N_MINUS macros.
- * basically, numrep <= 0 means relative to
- * the provided result_max
- */
numrep = curstep->arg1;
if (numrep <= 0) {
numrep += result_max;
@@ -1013,7 +1044,8 @@ int crush_do_rule(const struct crush_map *map,
vary_r,
stable,
c+osize,
- 0);
+ 0,
+ choose_args);
} else {
out_size = ((numrep < (result_max-osize)) ?
numrep : (result_max-osize));
@@ -1030,7 +1062,8 @@ int crush_do_rule(const struct crush_map *map,
choose_leaf_tries : 1,
recurse_to_leaf,
c+osize,
- 0);
+ 0,
+ choose_args);
osize += out_size;
}
}
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 71ba13927b3d..fa5233e0d01c 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -77,7 +77,7 @@ static int osdmap_show(struct seq_file *s, void *p)
}
for (i = 0; i < map->max_osd; i++) {
struct ceph_entity_addr *addr = &map->osd_addr[i];
- int state = map->osd_state[i];
+ u32 state = map->osd_state[i];
char sb[64];
seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",
@@ -104,6 +104,29 @@ static int osdmap_show(struct seq_file *s, void *p)
seq_printf(s, "primary_temp %llu.%x %d\n", pg->pgid.pool,
pg->pgid.seed, pg->primary_temp.osd);
}
+ for (n = rb_first(&map->pg_upmap); n; n = rb_next(n)) {
+ struct ceph_pg_mapping *pg =
+ rb_entry(n, struct ceph_pg_mapping, node);
+
+ seq_printf(s, "pg_upmap %llu.%x [", pg->pgid.pool,
+ pg->pgid.seed);
+ for (i = 0; i < pg->pg_upmap.len; i++)
+ seq_printf(s, "%s%d", (i == 0 ? "" : ","),
+ pg->pg_upmap.osds[i]);
+ seq_printf(s, "]\n");
+ }
+ for (n = rb_first(&map->pg_upmap_items); n; n = rb_next(n)) {
+ struct ceph_pg_mapping *pg =
+ rb_entry(n, struct ceph_pg_mapping, node);
+
+ seq_printf(s, "pg_upmap_items %llu.%x [", pg->pgid.pool,
+ pg->pgid.seed);
+ for (i = 0; i < pg->pg_upmap_items.len; i++)
+ seq_printf(s, "%s%d->%d", (i == 0 ? "" : ","),
+ pg->pg_upmap_items.from_to[i][0],
+ pg->pg_upmap_items.from_to[i][1]);
+ seq_printf(s, "]\n");
+ }
up_read(&osdc->lock);
return 0;
@@ -147,17 +170,26 @@ static int monc_show(struct seq_file *s, void *p)
return 0;
}
+static void dump_spgid(struct seq_file *s, const struct ceph_spg *spgid)
+{
+ seq_printf(s, "%llu.%x", spgid->pgid.pool, spgid->pgid.seed);
+ if (spgid->shard != CEPH_SPG_NOSHARD)
+ seq_printf(s, "s%d", spgid->shard);
+}
+
static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t)
{
int i;
- seq_printf(s, "osd%d\t%llu.%x\t[", t->osd, t->pgid.pool, t->pgid.seed);
+ seq_printf(s, "osd%d\t%llu.%x\t", t->osd, t->pgid.pool, t->pgid.seed);
+ dump_spgid(s, &t->spgid);
+ seq_puts(s, "\t[");
for (i = 0; i < t->up.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->up.osds[i]);
seq_printf(s, "]/%d\t[", t->up.primary);
for (i = 0; i < t->acting.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
- seq_printf(s, "]/%d\t", t->acting.primary);
+ seq_printf(s, "]/%d\te%u\t", t->acting.primary, t->epoch);
if (t->target_oloc.pool_ns) {
seq_printf(s, "%*pE/%*pE\t0x%x",
(int)t->target_oloc.pool_ns->len,
@@ -234,6 +266,73 @@ static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd)
mutex_unlock(&osd->lock);
}
+static void dump_snapid(struct seq_file *s, u64 snapid)
+{
+ if (snapid == CEPH_NOSNAP)
+ seq_puts(s, "head");
+ else if (snapid == CEPH_SNAPDIR)
+ seq_puts(s, "snapdir");
+ else
+ seq_printf(s, "%llx", snapid);
+}
+
+static void dump_name_escaped(struct seq_file *s, unsigned char *name,
+ size_t len)
+{
+ size_t i;
+
+ for (i = 0; i < len; i++) {
+ if (name[i] == '%' || name[i] == ':' || name[i] == '/' ||
+ name[i] < 32 || name[i] >= 127) {
+ seq_printf(s, "%%%02x", name[i]);
+ } else {
+ seq_putc(s, name[i]);
+ }
+ }
+}
+
+static void dump_hoid(struct seq_file *s, const struct ceph_hobject_id *hoid)
+{
+ if (hoid->snapid == 0 && hoid->hash == 0 && !hoid->is_max &&
+ hoid->pool == S64_MIN) {
+ seq_puts(s, "MIN");
+ return;
+ }
+ if (hoid->is_max) {
+ seq_puts(s, "MAX");
+ return;
+ }
+ seq_printf(s, "%lld:%08x:", hoid->pool, hoid->hash_reverse_bits);
+ dump_name_escaped(s, hoid->nspace, hoid->nspace_len);
+ seq_putc(s, ':');
+ dump_name_escaped(s, hoid->key, hoid->key_len);
+ seq_putc(s, ':');
+ dump_name_escaped(s, hoid->oid, hoid->oid_len);
+ seq_putc(s, ':');
+ dump_snapid(s, hoid->snapid);
+}
+
+static void dump_backoffs(struct seq_file *s, struct ceph_osd *osd)
+{
+ struct rb_node *n;
+
+ mutex_lock(&osd->lock);
+ for (n = rb_first(&osd->o_backoffs_by_id); n; n = rb_next(n)) {
+ struct ceph_osd_backoff *backoff =
+ rb_entry(n, struct ceph_osd_backoff, id_node);
+
+ seq_printf(s, "osd%d\t", osd->o_osd);
+ dump_spgid(s, &backoff->spgid);
+ seq_printf(s, "\t%llu\t", backoff->id);
+ dump_hoid(s, backoff->begin);
+ seq_putc(s, '\t');
+ dump_hoid(s, backoff->end);
+ seq_putc(s, '\n');
+ }
+
+ mutex_unlock(&osd->lock);
+}
+
static int osdc_show(struct seq_file *s, void *pp)
{
struct ceph_client *client = s->private;
@@ -259,6 +358,13 @@ static int osdc_show(struct seq_file *s, void *pp)
}
dump_linger_requests(s, &osdc->homeless_osd);
+ seq_puts(s, "BACKOFFS\n");
+ for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+ struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+ dump_backoffs(s, osd);
+ }
+
up_read(&osdc->lock);
return 0;
}
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 5766a6c896c4..0c31035bbfee 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1174,8 +1174,8 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
* Returns true if the result moves the cursor on to the next piece
* of the data item.
*/
-static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
- size_t bytes)
+static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
{
bool new_piece;
@@ -1207,8 +1207,6 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
new_piece = true;
}
cursor->need_crc = new_piece;
-
- return new_piece;
}
static size_t sizeof_footer(struct ceph_connection *con)
@@ -1290,13 +1288,16 @@ static void prepare_write_message(struct ceph_connection *con)
m->hdr.seq = cpu_to_le64(++con->out_seq);
m->needs_out_seq = false;
}
- WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
+
+ if (con->ops->reencode_message)
+ con->ops->reencode_message(m);
dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
m->data_length);
- BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
+ WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
+ WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
/* tag + hdr + front + middle */
con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
@@ -1577,7 +1578,6 @@ static int write_partial_message_data(struct ceph_connection *con)
size_t page_offset;
size_t length;
bool last_piece;
- bool need_crc;
int ret;
page = ceph_msg_data_next(cursor, &page_offset, &length,
@@ -1592,7 +1592,7 @@ static int write_partial_message_data(struct ceph_connection *con)
}
if (do_datacrc && cursor->need_crc)
crc = ceph_crc32c_page(crc, page, page_offset, length);
- need_crc = ceph_msg_data_advance(cursor, (size_t)ret);
+ ceph_msg_data_advance(cursor, (size_t)ret);
}
dout("%s %p msg %p done\n", __func__, con, msg);
@@ -2036,8 +2036,7 @@ static int process_connect(struct ceph_connection *con)
{
u64 sup_feat = from_msgr(con->msgr)->supported_features;
u64 req_feat = from_msgr(con->msgr)->required_features;
- u64 server_feat = ceph_sanitize_features(
- le64_to_cpu(con->in_reply.features));
+ u64 server_feat = le64_to_cpu(con->in_reply.features);
int ret;
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -2231,10 +2230,18 @@ static void process_ack(struct ceph_connection *con)
struct ceph_msg *m;
u64 ack = le64_to_cpu(con->in_temp_ack);
u64 seq;
+ bool reconnect = (con->in_tag == CEPH_MSGR_TAG_SEQ);
+ struct list_head *list = reconnect ? &con->out_queue : &con->out_sent;
- while (!list_empty(&con->out_sent)) {
- m = list_first_entry(&con->out_sent, struct ceph_msg,
- list_head);
+ /*
+ * In the reconnect case, con_fault() has requeued messages
+ * in out_sent. We should cleanup old messages according to
+ * the reconnect seq.
+ */
+ while (!list_empty(list)) {
+ m = list_first_entry(list, struct ceph_msg, list_head);
+ if (reconnect && m->needs_out_seq)
+ break;
seq = le64_to_cpu(m->hdr.seq);
if (seq > ack)
break;
@@ -2243,6 +2250,7 @@ static void process_ack(struct ceph_connection *con)
m->ack_stamp = jiffies;
ceph_msg_remove(m);
}
+
prepare_read_tag(con);
}
@@ -2299,7 +2307,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
if (do_datacrc)
crc = ceph_crc32c_page(crc, page, page_offset, ret);
- (void) ceph_msg_data_advance(cursor, (size_t)ret);
+ ceph_msg_data_advance(cursor, (size_t)ret);
}
if (do_datacrc)
con->in_data_crc = crc;
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 29a0ef351c5e..875675765531 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -6,6 +6,7 @@
#include <linux/random.h>
#include <linux/sched.h>
+#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
@@ -43,15 +44,13 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
int i, err = -EINVAL;
struct ceph_fsid fsid;
u32 epoch, num_mon;
- u16 version;
u32 len;
ceph_decode_32_safe(&p, end, len, bad);
ceph_decode_need(&p, end, len, bad);
dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
-
- ceph_decode_16_safe(&p, end, version, bad);
+ p += sizeof(u16); /* skip version */
ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
ceph_decode_copy(&p, &fsid, sizeof(fsid));
@@ -299,6 +298,10 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
mutex_lock(&monc->mutex);
if (monc->sub_renew_sent) {
+ /*
+ * This is only needed for legacy (infernalis or older)
+ * MONs -- see delayed_work().
+ */
monc->sub_renew_after = monc->sub_renew_sent +
(seconds >> 1) * HZ - 1;
dout("%s sent %lu duration %d renew after %lu\n", __func__,
@@ -957,7 +960,8 @@ static void delayed_work(struct work_struct *work)
__validate_auth(monc);
}
- if (is_auth) {
+ if (is_auth &&
+ !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
unsigned long now = jiffies;
dout("%s renew subs? now %lu renew after %lu\n",
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 924f07c36ddb..86a9737d8e3f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -12,6 +12,7 @@
#include <linux/bio.h>
#endif
+#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
@@ -49,6 +50,7 @@ static void link_linger(struct ceph_osd *osd,
struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
struct ceph_osd_linger_request *lreq);
+static void clear_backoffs(struct ceph_osd *osd);
#if 1
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
@@ -373,6 +375,7 @@ static void target_copy(struct ceph_osd_request_target *dest,
ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
dest->pgid = src->pgid; /* struct */
+ dest->spgid = src->spgid; /* struct */
dest->pg_num = src->pg_num;
dest->pg_num_mask = src->pg_num_mask;
ceph_osds_copy(&dest->acting, &src->acting);
@@ -384,6 +387,9 @@ static void target_copy(struct ceph_osd_request_target *dest,
dest->flags = src->flags;
dest->paused = src->paused;
+ dest->epoch = src->epoch;
+ dest->last_force_resend = src->last_force_resend;
+
dest->osd = src->osd;
}
@@ -537,7 +543,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);
-static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc)
+static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
{
return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}
@@ -552,17 +558,21 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
/* create request message */
- msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
- msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
+ msg_size = CEPH_ENCODING_START_BLK_LEN +
+ CEPH_PGID_ENCODING_LEN + 1; /* spgid */
+ msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
+ msg_size += CEPH_ENCODING_START_BLK_LEN +
+ sizeof(struct ceph_osd_reqid); /* reqid */
+ msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
+ msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
msg_size += CEPH_ENCODING_START_BLK_LEN +
ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
- msg_size += 1 + 8 + 4 + 4; /* pgid */
msg_size += 4 + req->r_base_oid.name_len; /* oid */
msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
msg_size += 8; /* snapid */
msg_size += 8; /* snap_seq */
msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
- msg_size += 4; /* retry_attempt */
+ msg_size += 4 + 8; /* retry_attempt, features */
if (req->r_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -1010,6 +1020,8 @@ static void osd_init(struct ceph_osd *osd)
RB_CLEAR_NODE(&osd->o_node);
osd->o_requests = RB_ROOT;
osd->o_linger_requests = RB_ROOT;
+ osd->o_backoff_mappings = RB_ROOT;
+ osd->o_backoffs_by_id = RB_ROOT;
INIT_LIST_HEAD(&osd->o_osd_lru);
INIT_LIST_HEAD(&osd->o_keepalive_item);
osd->o_incarnation = 1;
@@ -1021,6 +1033,8 @@ static void osd_cleanup(struct ceph_osd *osd)
WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
+ WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
+ WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
WARN_ON(!list_empty(&osd->o_osd_lru));
WARN_ON(!list_empty(&osd->o_keepalive_item));
@@ -1141,6 +1155,7 @@ static void close_osd(struct ceph_osd *osd)
unlink_linger(osd, lreq);
link_linger(&osdc->homeless_osd, lreq);
}
+ clear_backoffs(osd);
__remove_osd_from_lru(osd);
erase_osd(&osdc->osds, osd);
@@ -1297,7 +1312,7 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
__pool_full(pi);
- WARN_ON(pi->id != t->base_oloc.pool);
+ WARN_ON(pi->id != t->target_oloc.pool);
return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
(osdc->osdmap->epoch < osdc->epoch_barrier);
@@ -1311,19 +1326,21 @@ enum calc_target_result {
static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
struct ceph_osd_request_target *t,
- u32 *last_force_resend,
+ struct ceph_connection *con,
bool any_change)
{
struct ceph_pg_pool_info *pi;
struct ceph_pg pgid, last_pgid;
struct ceph_osds up, acting;
bool force_resend = false;
- bool need_check_tiering = false;
- bool need_resend = false;
+ bool unpaused = false;
+ bool legacy_change;
+ bool split = false;
bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
enum calc_target_result ct_res;
int ret;
+ t->epoch = osdc->osdmap->epoch;
pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
if (!pi) {
t->osd = CEPH_HOMELESS_OSD;
@@ -1332,33 +1349,33 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
}
if (osdc->osdmap->epoch == pi->last_force_request_resend) {
- if (last_force_resend &&
- *last_force_resend < pi->last_force_request_resend) {
- *last_force_resend = pi->last_force_request_resend;
+ if (t->last_force_resend < pi->last_force_request_resend) {
+ t->last_force_resend = pi->last_force_request_resend;
force_resend = true;
- } else if (!last_force_resend) {
+ } else if (t->last_force_resend == 0) {
force_resend = true;
}
}
- if (ceph_oid_empty(&t->target_oid) || force_resend) {
- ceph_oid_copy(&t->target_oid, &t->base_oid);
- need_check_tiering = true;
- }
- if (ceph_oloc_empty(&t->target_oloc) || force_resend) {
- ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
- need_check_tiering = true;
- }
- if (need_check_tiering &&
- (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+ /* apply tiering */
+ ceph_oid_copy(&t->target_oid, &t->base_oid);
+ ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
+ if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
t->target_oloc.pool = pi->read_tier;
if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
t->target_oloc.pool = pi->write_tier;
+
+ pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
+ if (!pi) {
+ t->osd = CEPH_HOMELESS_OSD;
+ ct_res = CALC_TARGET_POOL_DNE;
+ goto out;
+ }
}
- ret = ceph_object_locator_to_pg(osdc->osdmap, &t->target_oid,
- &t->target_oloc, &pgid);
+ ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc,
+ &pgid);
if (ret) {
WARN_ON(ret != -ENOENT);
t->osd = CEPH_HOMELESS_OSD;
@@ -1368,7 +1385,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
last_pgid.pool = pgid.pool;
last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
- ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting);
+ ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
if (any_change &&
ceph_is_new_interval(&t->acting,
&acting,
@@ -1387,13 +1404,16 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
if (t->paused && !target_should_be_paused(osdc, t, pi)) {
t->paused = false;
- need_resend = true;
+ unpaused = true;
}
+ legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
+ ceph_osds_changed(&t->acting, &acting, any_change);
+ if (t->pg_num)
+ split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
- if (ceph_pg_compare(&t->pgid, &pgid) ||
- ceph_osds_changed(&t->acting, &acting, any_change) ||
- force_resend) {
+ if (legacy_change || force_resend || split) {
t->pgid = pgid; /* struct */
+ ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
ceph_osds_copy(&t->acting, &acting);
ceph_osds_copy(&t->up, &up);
t->size = pi->size;
@@ -1403,15 +1423,342 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
t->sort_bitwise = sort_bitwise;
t->osd = acting.primary;
- need_resend = true;
}
- ct_res = need_resend ? CALC_TARGET_NEED_RESEND : CALC_TARGET_NO_ACTION;
+ if (unpaused || legacy_change || force_resend ||
+ (split && con && CEPH_HAVE_FEATURE(con->peer_features,
+ RESEND_ON_SPLIT)))
+ ct_res = CALC_TARGET_NEED_RESEND;
+ else
+ ct_res = CALC_TARGET_NO_ACTION;
+
out:
dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
return ct_res;
}
+static struct ceph_spg_mapping *alloc_spg_mapping(void)
+{
+ struct ceph_spg_mapping *spg;
+
+ spg = kmalloc(sizeof(*spg), GFP_NOIO);
+ if (!spg)
+ return NULL;
+
+ RB_CLEAR_NODE(&spg->node);
+ spg->backoffs = RB_ROOT;
+ return spg;
+}
+
+static void free_spg_mapping(struct ceph_spg_mapping *spg)
+{
+ WARN_ON(!RB_EMPTY_NODE(&spg->node));
+ WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
+
+ kfree(spg);
+}
+
+/*
+ * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
+ * ceph_pg_mapping. Used to track OSD backoffs -- a backoff [range] is
+ * defined only within a specific spgid; it does not pass anything to
+ * children on split, or to another primary.
+ */
+DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
+ RB_BYPTR, const struct ceph_spg *, node)
+
+static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
+{
+ return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
+}
+
+static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
+ void **pkey, size_t *pkey_len)
+{
+ if (hoid->key_len) {
+ *pkey = hoid->key;
+ *pkey_len = hoid->key_len;
+ } else {
+ *pkey = hoid->oid;
+ *pkey_len = hoid->oid_len;
+ }
+}
+
+static int compare_names(const void *name1, size_t name1_len,
+ const void *name2, size_t name2_len)
+{
+ int ret;
+
+ ret = memcmp(name1, name2, min(name1_len, name2_len));
+ if (!ret) {
+ if (name1_len < name2_len)
+ ret = -1;
+ else if (name1_len > name2_len)
+ ret = 1;
+ }
+ return ret;
+}
+
+static int hoid_compare(const struct ceph_hobject_id *lhs,
+ const struct ceph_hobject_id *rhs)
+{
+ void *effective_key1, *effective_key2;
+ size_t effective_key1_len, effective_key2_len;
+ int ret;
+
+ if (lhs->is_max < rhs->is_max)
+ return -1;
+ if (lhs->is_max > rhs->is_max)
+ return 1;
+
+ if (lhs->pool < rhs->pool)
+ return -1;
+ if (lhs->pool > rhs->pool)
+ return 1;
+
+ if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
+ return -1;
+ if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
+ return 1;
+
+ ret = compare_names(lhs->nspace, lhs->nspace_len,
+ rhs->nspace, rhs->nspace_len);
+ if (ret)
+ return ret;
+
+ hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
+ hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
+ ret = compare_names(effective_key1, effective_key1_len,
+ effective_key2, effective_key2_len);
+ if (ret)
+ return ret;
+
+ ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
+ if (ret)
+ return ret;
+
+ if (lhs->snapid < rhs->snapid)
+ return -1;
+ if (lhs->snapid > rhs->snapid)
+ return 1;
+
+ return 0;
+}
+
+/*
+ * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
+ * compat stuff here.
+ *
+ * Assumes @hoid is zero-initialized.
+ */
+static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
+{
+ u8 struct_v;
+ u32 struct_len;
+ int ret;
+
+ ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
+ &struct_len);
+ if (ret)
+ return ret;
+
+ if (struct_v < 4) {
+ pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
+ goto e_inval;
+ }
+
+ hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
+ GFP_NOIO);
+ if (IS_ERR(hoid->key)) {
+ ret = PTR_ERR(hoid->key);
+ hoid->key = NULL;
+ return ret;
+ }
+
+ hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
+ GFP_NOIO);
+ if (IS_ERR(hoid->oid)) {
+ ret = PTR_ERR(hoid->oid);
+ hoid->oid = NULL;
+ return ret;
+ }
+
+ ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
+ ceph_decode_32_safe(p, end, hoid->hash, e_inval);
+ ceph_decode_8_safe(p, end, hoid->is_max, e_inval);
+
+ hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
+ GFP_NOIO);
+ if (IS_ERR(hoid->nspace)) {
+ ret = PTR_ERR(hoid->nspace);
+ hoid->nspace = NULL;
+ return ret;
+ }
+
+ ceph_decode_64_safe(p, end, hoid->pool, e_inval);
+
+ ceph_hoid_build_hash_cache(hoid);
+ return 0;
+
+e_inval:
+ return -EINVAL;
+}
+
+static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
+{
+ return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
+ 4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
+}
+
+static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
+{
+ ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
+ ceph_encode_string(p, end, hoid->key, hoid->key_len);
+ ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
+ ceph_encode_64(p, hoid->snapid);
+ ceph_encode_32(p, hoid->hash);
+ ceph_encode_8(p, hoid->is_max);
+ ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
+ ceph_encode_64(p, hoid->pool);
+}
+
+static void free_hoid(struct ceph_hobject_id *hoid)
+{
+ if (hoid) {
+ kfree(hoid->key);
+ kfree(hoid->oid);
+ kfree(hoid->nspace);
+ kfree(hoid);
+ }
+}
+
+static struct ceph_osd_backoff *alloc_backoff(void)
+{
+ struct ceph_osd_backoff *backoff;
+
+ backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
+ if (!backoff)
+ return NULL;
+
+ RB_CLEAR_NODE(&backoff->spg_node);
+ RB_CLEAR_NODE(&backoff->id_node);
+ return backoff;
+}
+
+static void free_backoff(struct ceph_osd_backoff *backoff)
+{
+ WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
+ WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
+
+ free_hoid(backoff->begin);
+ free_hoid(backoff->end);
+ kfree(backoff);
+}
+
+/*
+ * Within a specific spgid, backoffs are managed by ->begin hoid.
+ */
+DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
+ RB_BYVAL, spg_node);
+
+static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
+ const struct ceph_hobject_id *hoid)
+{
+ struct rb_node *n = root->rb_node;
+
+ while (n) {
+ struct ceph_osd_backoff *cur =
+ rb_entry(n, struct ceph_osd_backoff, spg_node);
+ int cmp;
+
+ cmp = hoid_compare(hoid, cur->begin);
+ if (cmp < 0) {
+ n = n->rb_left;
+ } else if (cmp > 0) {
+ if (hoid_compare(hoid, cur->end) < 0)
+ return cur;
+
+ n = n->rb_right;
+ } else {
+ return cur;
+ }
+ }
+
+ return NULL;
+}
+
+/*
+ * Each backoff has a unique id within its OSD session.
+ */
+DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
+
+static void clear_backoffs(struct ceph_osd *osd)
+{
+ while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
+ struct ceph_spg_mapping *spg =
+ rb_entry(rb_first(&osd->o_backoff_mappings),
+ struct ceph_spg_mapping, node);
+
+ while (!RB_EMPTY_ROOT(&spg->backoffs)) {
+ struct ceph_osd_backoff *backoff =
+ rb_entry(rb_first(&spg->backoffs),
+ struct ceph_osd_backoff, spg_node);
+
+ erase_backoff(&spg->backoffs, backoff);
+ erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
+ free_backoff(backoff);
+ }
+ erase_spg_mapping(&osd->o_backoff_mappings, spg);
+ free_spg_mapping(spg);
+ }
+}
+
+/*
+ * Set up a temporary, non-owning view into @t.
+ */
+static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
+ const struct ceph_osd_request_target *t)
+{
+ hoid->key = NULL;
+ hoid->key_len = 0;
+ hoid->oid = t->target_oid.name;
+ hoid->oid_len = t->target_oid.name_len;
+ hoid->snapid = CEPH_NOSNAP;
+ hoid->hash = t->pgid.seed;
+ hoid->is_max = false;
+ if (t->target_oloc.pool_ns) {
+ hoid->nspace = t->target_oloc.pool_ns->str;
+ hoid->nspace_len = t->target_oloc.pool_ns->len;
+ } else {
+ hoid->nspace = NULL;
+ hoid->nspace_len = 0;
+ }
+ hoid->pool = t->target_oloc.pool;
+ ceph_hoid_build_hash_cache(hoid);
+}
+
+static bool should_plug_request(struct ceph_osd_request *req)
+{
+ struct ceph_osd *osd = req->r_osd;
+ struct ceph_spg_mapping *spg;
+ struct ceph_osd_backoff *backoff;
+ struct ceph_hobject_id hoid;
+
+ spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
+ if (!spg)
+ return false;
+
+ hoid_fill_from_target(&hoid, &req->r_t);
+ backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
+ if (!backoff)
+ return false;
+
+ dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
+ __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
+ backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
+ return true;
+}
+
static void setup_request_data(struct ceph_osd_request *req,
struct ceph_msg *msg)
{
@@ -1483,7 +1830,37 @@ static void setup_request_data(struct ceph_osd_request *req,
WARN_ON(data_len != msg->data_length);
}
-static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
+static void encode_pgid(void **p, const struct ceph_pg *pgid)
+{
+ ceph_encode_8(p, 1);
+ ceph_encode_64(p, pgid->pool);
+ ceph_encode_32(p, pgid->seed);
+ ceph_encode_32(p, -1); /* preferred */
+}
+
+static void encode_spgid(void **p, const struct ceph_spg *spgid)
+{
+ ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
+ encode_pgid(p, &spgid->pgid);
+ ceph_encode_8(p, spgid->shard);
+}
+
+static void encode_oloc(void **p, void *end,
+ const struct ceph_object_locator *oloc)
+{
+ ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
+ ceph_encode_64(p, oloc->pool);
+ ceph_encode_32(p, -1); /* preferred */
+ ceph_encode_32(p, 0); /* key len */
+ if (oloc->pool_ns)
+ ceph_encode_string(p, end, oloc->pool_ns->str,
+ oloc->pool_ns->len);
+ else
+ ceph_encode_32(p, 0);
+}
+
+static void encode_request_partial(struct ceph_osd_request *req,
+ struct ceph_msg *msg)
{
void *p = msg->front.iov_base;
void *const end = p + msg->front_alloc_len;
@@ -1500,38 +1877,27 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
setup_request_data(req, msg);
- ceph_encode_32(&p, 1); /* client_inc, always 1 */
+ encode_spgid(&p, &req->r_t.spgid); /* actual spg */
+ ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
ceph_encode_32(&p, req->r_flags);
- ceph_encode_timespec(p, &req->r_mtime);
- p += sizeof(struct ceph_timespec);
- /* reassert_version */
- memset(p, 0, sizeof(struct ceph_eversion));
- p += sizeof(struct ceph_eversion);
-
- /* oloc */
- ceph_start_encoding(&p, 5, 4,
- ceph_oloc_encoding_size(&req->r_t.target_oloc));
- ceph_encode_64(&p, req->r_t.target_oloc.pool);
- ceph_encode_32(&p, -1); /* preferred */
- ceph_encode_32(&p, 0); /* key len */
- if (req->r_t.target_oloc.pool_ns)
- ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str,
- req->r_t.target_oloc.pool_ns->len);
- else
- ceph_encode_32(&p, 0);
+ /* reqid */
+ ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
+ memset(p, 0, sizeof(struct ceph_osd_reqid));
+ p += sizeof(struct ceph_osd_reqid);
- /* pgid */
- ceph_encode_8(&p, 1);
- ceph_encode_64(&p, req->r_t.pgid.pool);
- ceph_encode_32(&p, req->r_t.pgid.seed);
- ceph_encode_32(&p, -1); /* preferred */
+ /* trace */
+ memset(p, 0, sizeof(struct ceph_blkin_trace_info));
+ p += sizeof(struct ceph_blkin_trace_info);
+
+ ceph_encode_32(&p, 0); /* client_inc, always 0 */
+ ceph_encode_timespec(p, &req->r_mtime);
+ p += sizeof(struct ceph_timespec);
- /* oid */
- ceph_encode_32(&p, req->r_t.target_oid.name_len);
- memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len);
- p += req->r_t.target_oid.name_len;
+ encode_oloc(&p, end, &req->r_t.target_oloc);
+ ceph_encode_string(&p, end, req->r_t.target_oid.name,
+ req->r_t.target_oid.name_len);
/* ops, can imply data */
ceph_encode_16(&p, req->r_num_ops);
@@ -1552,11 +1918,10 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
}
ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
+ BUG_ON(p != end - 8); /* space for features */
- BUG_ON(p > end);
- msg->front.iov_len = p - msg->front.iov_base;
- msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
- msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+ msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
+ /* front_len is finalized in encode_request_finish() */
msg->hdr.data_len = cpu_to_le32(data_len);
/*
* The header "data_off" is a hint to the receiver allowing it
@@ -1565,9 +1930,99 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
*/
msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
- dout("%s req %p oid %s oid_len %d front %zu data %u\n", __func__,
- req, req->r_t.target_oid.name, req->r_t.target_oid.name_len,
- msg->front.iov_len, data_len);
+ dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
+ req->r_t.target_oid.name, req->r_t.target_oid.name_len);
+}
+
+static void encode_request_finish(struct ceph_msg *msg)
+{
+ void *p = msg->front.iov_base;
+ void *const end = p + msg->front_alloc_len;
+
+ if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
+ /* luminous OSD -- encode features and be done */
+ p = end - 8;
+ ceph_encode_64(&p, msg->con->peer_features);
+ } else {
+ struct {
+ char spgid[CEPH_ENCODING_START_BLK_LEN +
+ CEPH_PGID_ENCODING_LEN + 1];
+ __le32 hash;
+ __le32 epoch;
+ __le32 flags;
+ char reqid[CEPH_ENCODING_START_BLK_LEN +
+ sizeof(struct ceph_osd_reqid)];
+ char trace[sizeof(struct ceph_blkin_trace_info)];
+ __le32 client_inc;
+ struct ceph_timespec mtime;
+ } __packed head;
+ struct ceph_pg pgid;
+ void *oloc, *oid, *tail;
+ int oloc_len, oid_len, tail_len;
+ int len;
+
+ /*
+ * Pre-luminous OSD -- reencode v8 into v4 using @head
+ * as a temporary buffer. Encode the raw PG; the rest
+ * is just a matter of moving oloc, oid and tail blobs
+ * around.
+ */
+ memcpy(&head, p, sizeof(head));
+ p += sizeof(head);
+
+ oloc = p;
+ p += CEPH_ENCODING_START_BLK_LEN;
+ pgid.pool = ceph_decode_64(&p);
+ p += 4 + 4; /* preferred, key len */
+ len = ceph_decode_32(&p);
+ p += len; /* nspace */
+ oloc_len = p - oloc;
+
+ oid = p;
+ len = ceph_decode_32(&p);
+ p += len;
+ oid_len = p - oid;
+
+ tail = p;
+ tail_len = (end - p) - 8;
+
+ p = msg->front.iov_base;
+ ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
+ ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
+ ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
+ ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));
+
+ /* reassert_version */
+ memset(p, 0, sizeof(struct ceph_eversion));
+ p += sizeof(struct ceph_eversion);
+
+ BUG_ON(p >= oloc);
+ memmove(p, oloc, oloc_len);
+ p += oloc_len;
+
+ pgid.seed = le32_to_cpu(head.hash);
+ encode_pgid(&p, &pgid); /* raw pg */
+
+ BUG_ON(p >= oid);
+ memmove(p, oid, oid_len);
+ p += oid_len;
+
+ /* tail -- ops, snapid, snapc, retry_attempt */
+ BUG_ON(p >= tail);
+ memmove(p, tail, tail_len);
+ p += tail_len;
+
+ msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
+ }
+
+ BUG_ON(p > end);
+ msg->front.iov_len = p - msg->front.iov_base;
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+
+ dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
+ le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
+ le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
+ le16_to_cpu(msg->hdr.version));
}
/*
@@ -1580,6 +2035,10 @@ static void send_request(struct ceph_osd_request *req)
verify_osd_locked(osd);
WARN_ON(osd->o_osd != req->r_t.osd);
+ /* backoff? */
+ if (should_plug_request(req))
+ return;
+
/*
* We may have a previously queued request message hanging
* around. Cancel it to avoid corrupting the msgr.
@@ -1593,11 +2052,13 @@ static void send_request(struct ceph_osd_request *req)
else
WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
- encode_request(req, req->r_request);
+ encode_request_partial(req, req->r_request);
- dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n",
+ dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
__func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
- req->r_t.osd, req->r_flags, req->r_attempts);
+ req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
+ req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
+ req->r_attempts);
req->r_t.paused = false;
req->r_stamp = jiffies;
@@ -1645,7 +2106,7 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
again:
- ct_res = calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+ ct_res = calc_target(osdc, &req->r_t, NULL, false);
if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
goto promote;
@@ -1737,13 +2198,12 @@ static void submit_request(struct ceph_osd_request *req, bool wrlocked)
static void finish_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
- struct ceph_osd *osd = req->r_osd;
- verify_osd_locked(osd);
+ WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
- WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
- unlink_request(osd, req);
+ if (req->r_osd)
+ unlink_request(req->r_osd, req);
atomic_dec(&osdc->num_requests);
/*
@@ -2441,7 +2901,7 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
struct ceph_osd_client *osdc = lreq->osdc;
struct ceph_osd *osd;
- calc_target(osdc, &lreq->t, &lreq->last_force_resend, false);
+ calc_target(osdc, &lreq->t, NULL, false);
osd = lookup_create_osd(osdc, lreq->t.osd, true);
link_linger(osd, lreq);
@@ -3059,7 +3519,7 @@ recalc_linger_target(struct ceph_osd_linger_request *lreq)
struct ceph_osd_client *osdc = lreq->osdc;
enum calc_target_result ct_res;
- ct_res = calc_target(osdc, &lreq->t, &lreq->last_force_resend, true);
+ ct_res = calc_target(osdc, &lreq->t, NULL, true);
if (ct_res == CALC_TARGET_NEED_RESEND) {
struct ceph_osd *osd;
@@ -3117,6 +3577,7 @@ static void scan_requests(struct ceph_osd *osd,
list_add_tail(&lreq->scan_item, need_resend_linger);
break;
case CALC_TARGET_POOL_DNE:
+ list_del_init(&lreq->scan_item);
check_linger_pool_dne(lreq);
break;
}
@@ -3130,8 +3591,8 @@ static void scan_requests(struct ceph_osd *osd,
n = rb_next(n); /* unlink_request(), check_pool_dne() */
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
- ct_res = calc_target(osdc, &req->r_t,
- &req->r_last_force_resend, false);
+ ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con,
+ false);
switch (ct_res) {
case CALC_TARGET_NO_ACTION:
force_resend_writes = cleared_full ||
@@ -3229,8 +3690,25 @@ static void kick_requests(struct ceph_osd_client *osdc,
struct list_head *need_resend_linger)
{
struct ceph_osd_linger_request *lreq, *nlreq;
+ enum calc_target_result ct_res;
struct rb_node *n;
+ /* make sure need_resend targets reflect latest map */
+ for (n = rb_first(need_resend); n; ) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
+
+ n = rb_next(n);
+
+ if (req->r_t.epoch < osdc->osdmap->epoch) {
+ ct_res = calc_target(osdc, &req->r_t, NULL, false);
+ if (ct_res == CALC_TARGET_POOL_DNE) {
+ erase_request(need_resend, req);
+ check_pool_dne(req);
+ }
+ }
+ }
+
for (n = rb_first(need_resend); n; ) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
@@ -3239,8 +3717,6 @@ static void kick_requests(struct ceph_osd_client *osdc,
n = rb_next(n);
erase_request(need_resend, req); /* before link_request() */
- WARN_ON(req->r_osd);
- calc_target(osdc, &req->r_t, NULL, false);
osd = lookup_create_osd(osdc, req->r_t.osd, true);
link_request(osd, req);
if (!req->r_linger) {
@@ -3383,6 +3859,8 @@ static void kick_osd_requests(struct ceph_osd *osd)
{
struct rb_node *n;
+ clear_backoffs(osd);
+
for (n = rb_first(&osd->o_requests); n; ) {
struct ceph_osd_request *req =
rb_entry(n, struct ceph_osd_request, r_node);
@@ -3428,6 +3906,261 @@ out_unlock:
up_write(&osdc->lock);
}
+struct MOSDBackoff {
+ struct ceph_spg spgid;
+ u32 map_epoch;
+ u8 op;
+ u64 id;
+ struct ceph_hobject_id *begin;
+ struct ceph_hobject_id *end;
+};
+
+static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
+{
+ void *p = msg->front.iov_base;
+ void *const end = p + msg->front.iov_len;
+ u8 struct_v;
+ u32 struct_len;
+ int ret;
+
+ ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
+ if (ret)
+ return ret;
+
+ ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
+ if (ret)
+ return ret;
+
+ ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
+ ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
+ ceph_decode_8_safe(&p, end, m->op, e_inval);
+ ceph_decode_64_safe(&p, end, m->id, e_inval);
+
+ m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
+ if (!m->begin)
+ return -ENOMEM;
+
+ ret = decode_hoid(&p, end, m->begin);
+ if (ret) {
+ free_hoid(m->begin);
+ return ret;
+ }
+
+ m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
+ if (!m->end) {
+ free_hoid(m->begin);
+ return -ENOMEM;
+ }
+
+ ret = decode_hoid(&p, end, m->end);
+ if (ret) {
+ free_hoid(m->begin);
+ free_hoid(m->end);
+ return ret;
+ }
+
+ return 0;
+
+e_inval:
+ return -EINVAL;
+}
+
+static struct ceph_msg *create_backoff_message(
+ const struct ceph_osd_backoff *backoff,
+ u32 map_epoch)
+{
+ struct ceph_msg *msg;
+ void *p, *end;
+ int msg_size;
+
+ msg_size = CEPH_ENCODING_START_BLK_LEN +
+ CEPH_PGID_ENCODING_LEN + 1; /* spgid */
+ msg_size += 4 + 1 + 8; /* map_epoch, op, id */
+ msg_size += CEPH_ENCODING_START_BLK_LEN +
+ hoid_encoding_size(backoff->begin);
+ msg_size += CEPH_ENCODING_START_BLK_LEN +
+ hoid_encoding_size(backoff->end);
+
+ msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
+ if (!msg)
+ return NULL;
+
+ p = msg->front.iov_base;
+ end = p + msg->front_alloc_len;
+
+ encode_spgid(&p, &backoff->spgid);
+ ceph_encode_32(&p, map_epoch);
+ ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
+ ceph_encode_64(&p, backoff->id);
+ encode_hoid(&p, end, backoff->begin);
+ encode_hoid(&p, end, backoff->end);
+ BUG_ON(p != end);
+
+ msg->front.iov_len = p - msg->front.iov_base;
+ msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+
+ return msg;
+}
+
+static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
+{
+ struct ceph_spg_mapping *spg;
+ struct ceph_osd_backoff *backoff;
+ struct ceph_msg *msg;
+
+ dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
+ m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
+
+ spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
+ if (!spg) {
+ spg = alloc_spg_mapping();
+ if (!spg) {
+ pr_err("%s failed to allocate spg\n", __func__);
+ return;
+ }
+ spg->spgid = m->spgid; /* struct */
+ insert_spg_mapping(&osd->o_backoff_mappings, spg);
+ }
+
+ backoff = alloc_backoff();
+ if (!backoff) {
+ pr_err("%s failed to allocate backoff\n", __func__);
+ return;
+ }
+ backoff->spgid = m->spgid; /* struct */
+ backoff->id = m->id;
+ backoff->begin = m->begin;
+ m->begin = NULL; /* backoff now owns this */
+ backoff->end = m->end;
+ m->end = NULL; /* ditto */
+
+ insert_backoff(&spg->backoffs, backoff);
+ insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
+
+ /*
+ * Ack with original backoff's epoch so that the OSD can
+ * discard this if there was a PG split.
+ */
+ msg = create_backoff_message(backoff, m->map_epoch);
+ if (!msg) {
+ pr_err("%s failed to allocate msg\n", __func__);
+ return;
+ }
+ ceph_con_send(&osd->o_con, msg);
+}
+
+static bool target_contained_by(const struct ceph_osd_request_target *t,
+ const struct ceph_hobject_id *begin,
+ const struct ceph_hobject_id *end)
+{
+ struct ceph_hobject_id hoid;
+ int cmp;
+
+ hoid_fill_from_target(&hoid, t);
+ cmp = hoid_compare(&hoid, begin);
+ return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
+}
+
+static void handle_backoff_unblock(struct ceph_osd *osd,
+ const struct MOSDBackoff *m)
+{
+ struct ceph_spg_mapping *spg;
+ struct ceph_osd_backoff *backoff;
+ struct rb_node *n;
+
+ dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
+ m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
+
+ backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
+ if (!backoff) {
+ pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
+ __func__, osd->o_osd, m->spgid.pgid.pool,
+ m->spgid.pgid.seed, m->spgid.shard, m->id);
+ return;
+ }
+
+ if (hoid_compare(backoff->begin, m->begin) &&
+ hoid_compare(backoff->end, m->end)) {
+ pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
+ __func__, osd->o_osd, m->spgid.pgid.pool,
+ m->spgid.pgid.seed, m->spgid.shard, m->id);
+ /* unblock it anyway... */
+ }
+
+ spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
+ BUG_ON(!spg);
+
+ erase_backoff(&spg->backoffs, backoff);
+ erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
+ free_backoff(backoff);
+
+ if (RB_EMPTY_ROOT(&spg->backoffs)) {
+ erase_spg_mapping(&osd->o_backoff_mappings, spg);
+ free_spg_mapping(spg);
+ }
+
+ for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+ struct ceph_osd_request *req =
+ rb_entry(n, struct ceph_osd_request, r_node);
+
+ if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
+ /*
+ * Match against @m, not @backoff -- the PG may
+ * have split on the OSD.
+ */
+ if (target_contained_by(&req->r_t, m->begin, m->end)) {
+ /*
+ * If no other installed backoff applies,
+ * resend.
+ */
+ send_request(req);
+ }
+ }
+ }
+}
+
+static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
+{
+ struct ceph_osd_client *osdc = osd->o_osdc;
+ struct MOSDBackoff m;
+ int ret;
+
+ down_read(&osdc->lock);
+ if (!osd_registered(osd)) {
+ dout("%s osd%d unknown\n", __func__, osd->o_osd);
+ up_read(&osdc->lock);
+ return;
+ }
+ WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
+
+ mutex_lock(&osd->lock);
+ ret = decode_MOSDBackoff(msg, &m);
+ if (ret) {
+ pr_err("failed to decode MOSDBackoff: %d\n", ret);
+ ceph_msg_dump(msg);
+ goto out_unlock;
+ }
+
+ switch (m.op) {
+ case CEPH_OSD_BACKOFF_OP_BLOCK:
+ handle_backoff_block(osd, &m);
+ break;
+ case CEPH_OSD_BACKOFF_OP_UNBLOCK:
+ handle_backoff_unblock(osd, &m);
+ break;
+ default:
+ pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
+ }
+
+ free_hoid(m.begin);
+ free_hoid(m.end);
+
+out_unlock:
+ mutex_unlock(&osd->lock);
+ up_read(&osdc->lock);
+}
+
/*
* Process osd watch notifications
*/
@@ -4365,6 +5098,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
case CEPH_MSG_OSD_OPREPLY:
handle_reply(osd, msg);
break;
+ case CEPH_MSG_OSD_BACKOFF:
+ handle_backoff(osd, msg);
+ break;
case CEPH_MSG_WATCH_NOTIFY:
handle_watch_notify(osdc, msg);
break;
@@ -4487,6 +5223,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
*skip = 0;
switch (type) {
case CEPH_MSG_OSD_MAP:
+ case CEPH_MSG_OSD_BACKOFF:
case CEPH_MSG_WATCH_NOTIFY:
return alloc_msg_with_page_vector(hdr);
case CEPH_MSG_OSD_OPREPLY:
@@ -4571,6 +5308,11 @@ static int invalidate_authorizer(struct ceph_connection *con)
return ceph_monc_validate_auth(&osdc->client->monc);
}
+static void osd_reencode_message(struct ceph_msg *msg)
+{
+ encode_request_finish(msg);
+}
+
static int osd_sign_message(struct ceph_msg *msg)
{
struct ceph_osd *o = msg->con->private;
@@ -4595,6 +5337,7 @@ static const struct ceph_connection_operations osd_con_ops = {
.verify_authorizer_reply = verify_authorizer_reply,
.invalidate_authorizer = invalidate_authorizer,
.alloc_msg = alloc_msg,
+ .reencode_message = osd_reencode_message,
.sign_message = osd_sign_message,
.check_message_signature = osd_check_message_signature,
.fault = osd_fault,
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index ffe9e904d4d1..864789c5974e 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -11,7 +11,7 @@
#include <linux/crush/hash.h>
#include <linux/crush/mapper.h>
-char *ceph_osdmap_state_str(char *str, int len, int state)
+char *ceph_osdmap_state_str(char *str, int len, u32 state)
{
if (!len)
return str;
@@ -138,19 +138,175 @@ bad:
return -EINVAL;
}
-static int skip_name_map(void **p, void *end)
+static struct crush_choose_arg_map *alloc_choose_arg_map(void)
{
- int len;
- ceph_decode_32_safe(p, end, len ,bad);
- while (len--) {
- int strlen;
- *p += sizeof(u32);
- ceph_decode_32_safe(p, end, strlen, bad);
- *p += strlen;
+ struct crush_choose_arg_map *arg_map;
+
+ arg_map = kzalloc(sizeof(*arg_map), GFP_NOIO);
+ if (!arg_map)
+ return NULL;
+
+ RB_CLEAR_NODE(&arg_map->node);
+ return arg_map;
}
- return 0;
-bad:
- return -EINVAL;
+
+static void free_choose_arg_map(struct crush_choose_arg_map *arg_map)
+{
+ if (arg_map) {
+ int i, j;
+
+ WARN_ON(!RB_EMPTY_NODE(&arg_map->node));
+
+ for (i = 0; i < arg_map->size; i++) {
+ struct crush_choose_arg *arg = &arg_map->args[i];
+
+ for (j = 0; j < arg->weight_set_size; j++)
+ kfree(arg->weight_set[j].weights);
+ kfree(arg->weight_set);
+ kfree(arg->ids);
+ }
+ kfree(arg_map->args);
+ kfree(arg_map);
+ }
+}
+
+DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index,
+ node);
+
+void clear_choose_args(struct crush_map *c)
+{
+ while (!RB_EMPTY_ROOT(&c->choose_args)) {
+ struct crush_choose_arg_map *arg_map =
+ rb_entry(rb_first(&c->choose_args),
+ struct crush_choose_arg_map, node);
+
+ erase_choose_arg_map(&c->choose_args, arg_map);
+ free_choose_arg_map(arg_map);
+ }
+}
+
+static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen)
+{
+ u32 *a = NULL;
+ u32 len;
+ int ret;
+
+ ceph_decode_32_safe(p, end, len, e_inval);
+ if (len) {
+ u32 i;
+
+ a = kmalloc_array(len, sizeof(u32), GFP_NOIO);
+ if (!a) {
+ ret = -ENOMEM;
+ goto fail;
+ }
+
+ ceph_decode_need(p, end, len * sizeof(u32), e_inval);
+ for (i = 0; i < len; i++)
+ a[i] = ceph_decode_32(p);
+ }
+
+ *plen = len;
+ return a;
+
+e_inval:
+ ret = -EINVAL;
+fail:
+ kfree(a);
+ return ERR_PTR(ret);
+}
+
+/*
+ * Assumes @arg is zero-initialized.
+ */
+static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg)
+{
+ int ret;
+
+ ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval);
+ if (arg->weight_set_size) {
+ u32 i;
+
+ arg->weight_set = kmalloc_array(arg->weight_set_size,
+ sizeof(*arg->weight_set),
+ GFP_NOIO);
+ if (!arg->weight_set)
+ return -ENOMEM;
+
+ for (i = 0; i < arg->weight_set_size; i++) {
+ struct crush_weight_set *w = &arg->weight_set[i];
+
+ w->weights = decode_array_32_alloc(p, end, &w->size);
+ if (IS_ERR(w->weights)) {
+ ret = PTR_ERR(w->weights);
+ w->weights = NULL;
+ return ret;
+ }
+ }
+ }
+
+ arg->ids = decode_array_32_alloc(p, end, &arg->ids_size);
+ if (IS_ERR(arg->ids)) {
+ ret = PTR_ERR(arg->ids);
+ arg->ids = NULL;
+ return ret;
+ }
+
+ return 0;
+
+e_inval:
+ return -EINVAL;
+}
+
+static int decode_choose_args(void **p, void *end, struct crush_map *c)
+{
+ struct crush_choose_arg_map *arg_map = NULL;
+ u32 num_choose_arg_maps, num_buckets;
+ int ret;
+
+ ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval);
+ while (num_choose_arg_maps--) {
+ arg_map = alloc_choose_arg_map();
+ if (!arg_map) {
+ ret = -ENOMEM;
+ goto fail;
+ }
+
+ ceph_decode_64_safe(p, end, arg_map->choose_args_index,
+ e_inval);
+ arg_map->size = c->max_buckets;
+ arg_map->args = kcalloc(arg_map->size, sizeof(*arg_map->args),
+ GFP_NOIO);
+ if (!arg_map->args) {
+ ret = -ENOMEM;
+ goto fail;
+ }
+
+ ceph_decode_32_safe(p, end, num_buckets, e_inval);
+ while (num_buckets--) {
+ struct crush_choose_arg *arg;
+ u32 bucket_index;
+
+ ceph_decode_32_safe(p, end, bucket_index, e_inval);
+ if (bucket_index >= arg_map->size)
+ goto e_inval;
+
+ arg = &arg_map->args[bucket_index];
+ ret = decode_choose_arg(p, end, arg);
+ if (ret)
+ goto fail;
+ }
+
+ insert_choose_arg_map(&c->choose_args, arg_map);
+ }
+
+ return 0;
+
+e_inval:
+ ret = -EINVAL;
+fail:
+ free_choose_arg_map(arg_map);
+ return ret;
}
static void crush_finalize(struct crush_map *c)
@@ -187,7 +343,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
void **p = &pbyval;
void *start = pbyval;
u32 magic;
- u32 num_name_maps;
dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
@@ -195,6 +350,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
if (c == NULL)
return ERR_PTR(-ENOMEM);
+ c->choose_args = RB_ROOT;
+
/* set tunables to default values */
c->choose_local_tries = 2;
c->choose_local_fallback_tries = 5;
@@ -317,6 +474,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
u32 yes;
struct crush_rule *r;
+ err = -EINVAL;
ceph_decode_32_safe(p, end, yes, bad);
if (!yes) {
dout("crush_decode NO rule %d off %x %p to %p\n",
@@ -352,12 +510,9 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
}
}
- /* ignore trailing name maps. */
- for (num_name_maps = 0; num_name_maps < 3; num_name_maps++) {
- err = skip_name_map(p, end);
- if (err < 0)
- goto done;
- }
+ ceph_decode_skip_map(p, end, 32, string, bad); /* type_map */
+ ceph_decode_skip_map(p, end, 32, string, bad); /* name_map */
+ ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */
/* tunables */
ceph_decode_need(p, end, 3*sizeof(u32), done);
@@ -390,6 +545,21 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
dout("crush decode tunable chooseleaf_stable = %d\n",
c->chooseleaf_stable);
+ if (*p != end) {
+ /* class_map */
+ ceph_decode_skip_map(p, end, 32, 32, bad);
+ /* class_name */
+ ceph_decode_skip_map(p, end, 32, string, bad);
+ /* class_bucket */
+ ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad);
+ }
+
+ if (*p != end) {
+ err = decode_choose_args(p, end, c);
+ if (err)
+ goto bad;
+ }
+
done:
crush_finalize(c);
dout("crush_decode success\n");
@@ -417,75 +587,49 @@ int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
return 0;
}
-/*
- * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
- * to a set of osds) and primary_temp (explicit primary setting)
- */
-static int __insert_pg_mapping(struct ceph_pg_mapping *new,
- struct rb_root *root)
+int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
{
- struct rb_node **p = &root->rb_node;
- struct rb_node *parent = NULL;
- struct ceph_pg_mapping *pg = NULL;
- int c;
+ int ret;
- dout("__insert_pg_mapping %llx %p\n", *(u64 *)&new->pgid, new);
- while (*p) {
- parent = *p;
- pg = rb_entry(parent, struct ceph_pg_mapping, node);
- c = ceph_pg_compare(&new->pgid, &pg->pgid);
- if (c < 0)
- p = &(*p)->rb_left;
- else if (c > 0)
- p = &(*p)->rb_right;
- else
- return -EEXIST;
- }
+ ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
+ if (ret)
+ return ret;
+
+ if (lhs->shard < rhs->shard)
+ return -1;
+ if (lhs->shard > rhs->shard)
+ return 1;
- rb_link_node(&new->node, parent, p);
- rb_insert_color(&new->node, root);
return 0;
}
-static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root,
- struct ceph_pg pgid)
+static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len)
{
- struct rb_node *n = root->rb_node;
struct ceph_pg_mapping *pg;
- int c;
- while (n) {
- pg = rb_entry(n, struct ceph_pg_mapping, node);
- c = ceph_pg_compare(&pgid, &pg->pgid);
- if (c < 0) {
- n = n->rb_left;
- } else if (c > 0) {
- n = n->rb_right;
- } else {
- dout("__lookup_pg_mapping %lld.%x got %p\n",
- pgid.pool, pgid.seed, pg);
- return pg;
- }
- }
- return NULL;
+ pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO);
+ if (!pg)
+ return NULL;
+
+ RB_CLEAR_NODE(&pg->node);
+ return pg;
}
-static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid)
+static void free_pg_mapping(struct ceph_pg_mapping *pg)
{
- struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid);
+ WARN_ON(!RB_EMPTY_NODE(&pg->node));
- if (pg) {
- dout("__remove_pg_mapping %lld.%x %p\n", pgid.pool, pgid.seed,
- pg);
- rb_erase(&pg->node, root);
- kfree(pg);
- return 0;
- }
- dout("__remove_pg_mapping %lld.%x dne\n", pgid.pool, pgid.seed);
- return -ENOENT;
+ kfree(pg);
}
/*
+ * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
+ * to a set of osds) and primary_temp (explicit primary setting)
+ */
+DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare,
+ RB_BYPTR, const struct ceph_pg *, node)
+
+/*
* rbtree of pg pool info
*/
static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
@@ -681,11 +825,48 @@ static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
*p += len;
}
+ /*
+ * last_force_op_resend_preluminous, will be overridden if the
+ * map was encoded with RESEND_ON_SPLIT
+ */
if (ev >= 15)
pi->last_force_request_resend = ceph_decode_32(p);
else
pi->last_force_request_resend = 0;
+ if (ev >= 16)
+ *p += 4; /* skip min_read_recency_for_promote */
+
+ if (ev >= 17)
+ *p += 8; /* skip expected_num_objects */
+
+ if (ev >= 19)
+ *p += 4; /* skip cache_target_dirty_high_ratio_micro */
+
+ if (ev >= 20)
+ *p += 4; /* skip min_write_recency_for_promote */
+
+ if (ev >= 21)
+ *p += 1; /* skip use_gmt_hitset */
+
+ if (ev >= 22)
+ *p += 1; /* skip fast_read */
+
+ if (ev >= 23) {
+ *p += 4; /* skip hit_set_grade_decay_rate */
+ *p += 4; /* skip hit_set_search_last_n */
+ }
+
+ if (ev >= 24) {
+ /* skip opts */
+ *p += 1 + 1; /* versions */
+ len = ceph_decode_32(p);
+ *p += len;
+ }
+
+ if (ev >= 25)
+ pi->last_force_request_resend = ceph_decode_32(p);
+
/* ignore the rest */
*p = pool_end;
@@ -742,6 +923,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
map->pool_max = -1;
map->pg_temp = RB_ROOT;
map->primary_temp = RB_ROOT;
+ map->pg_upmap = RB_ROOT;
+ map->pg_upmap_items = RB_ROOT;
mutex_init(&map->crush_workspace_mutex);
return map;
@@ -756,14 +939,28 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
struct ceph_pg_mapping *pg =
rb_entry(rb_first(&map->pg_temp),
struct ceph_pg_mapping, node);
- rb_erase(&pg->node, &map->pg_temp);
- kfree(pg);
+ erase_pg_mapping(&map->pg_temp, pg);
+ free_pg_mapping(pg);
}
while (!RB_EMPTY_ROOT(&map->primary_temp)) {
struct ceph_pg_mapping *pg =
rb_entry(rb_first(&map->primary_temp),
struct ceph_pg_mapping, node);
- rb_erase(&pg->node, &map->primary_temp);
+ erase_pg_mapping(&map->primary_temp, pg);
+ free_pg_mapping(pg);
+ }
+ while (!RB_EMPTY_ROOT(&map->pg_upmap)) {
+ struct ceph_pg_mapping *pg =
+ rb_entry(rb_first(&map->pg_upmap),
+ struct ceph_pg_mapping, node);
+ rb_erase(&pg->node, &map->pg_upmap);
+ kfree(pg);
+ }
+ while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) {
+ struct ceph_pg_mapping *pg =
+ rb_entry(rb_first(&map->pg_upmap_items),
+ struct ceph_pg_mapping, node);
+ rb_erase(&pg->node, &map->pg_upmap_items);
kfree(pg);
}
while (!RB_EMPTY_ROOT(&map->pg_pools)) {
@@ -787,7 +984,7 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
*/
static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
{
- u8 *state;
+ u32 *state;
u32 *weight;
struct ceph_entity_addr *addr;
int i;
@@ -963,47 +1160,40 @@ static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map)
return __decode_pools(p, end, map, true);
}
-static int __decode_pg_temp(void **p, void *end, struct ceph_osdmap *map,
- bool incremental)
+typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool);
+
+static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root,
+ decode_mapping_fn_t fn, bool incremental)
{
u32 n;
+ WARN_ON(!incremental && !fn);
+
ceph_decode_32_safe(p, end, n, e_inval);
while (n--) {
+ struct ceph_pg_mapping *pg;
struct ceph_pg pgid;
- u32 len, i;
int ret;
ret = ceph_decode_pgid(p, end, &pgid);
if (ret)
return ret;
- ceph_decode_32_safe(p, end, len, e_inval);
-
- ret = __remove_pg_mapping(&map->pg_temp, pgid);
- BUG_ON(!incremental && ret != -ENOENT);
-
- if (!incremental || len > 0) {
- struct ceph_pg_mapping *pg;
-
- ceph_decode_need(p, end, len*sizeof(u32), e_inval);
-
- if (len > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
- return -EINVAL;
-
- pg = kzalloc(sizeof(*pg) + len*sizeof(u32), GFP_NOFS);
- if (!pg)
- return -ENOMEM;
+ pg = lookup_pg_mapping(mapping_root, &pgid);
+ if (pg) {
+ WARN_ON(!incremental);
+ erase_pg_mapping(mapping_root, pg);
+ free_pg_mapping(pg);
+ }
- pg->pgid = pgid;
- pg->pg_temp.len = len;
- for (i = 0; i < len; i++)
- pg->pg_temp.osds[i] = ceph_decode_32(p);
+ if (fn) {
+ pg = fn(p, end, incremental);
+ if (IS_ERR(pg))
+ return PTR_ERR(pg);
- ret = __insert_pg_mapping(pg, &map->pg_temp);
- if (ret) {
- kfree(pg);
- return ret;
+ if (pg) {
+ pg->pgid = pgid; /* struct */
+ insert_pg_mapping(mapping_root, pg);
}
}
}
@@ -1014,69 +1204,77 @@ e_inval:
return -EINVAL;
}
+static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end,
+ bool incremental)
+{
+ struct ceph_pg_mapping *pg;
+ u32 len, i;
+
+ ceph_decode_32_safe(p, end, len, e_inval);
+ if (len == 0 && incremental)
+ return NULL; /* new_pg_temp: [] to remove */
+ if (len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32))
+ return ERR_PTR(-EINVAL);
+
+ ceph_decode_need(p, end, len * sizeof(u32), e_inval);
+ pg = alloc_pg_mapping(len * sizeof(u32));
+ if (!pg)
+ return ERR_PTR(-ENOMEM);
+
+ pg->pg_temp.len = len;
+ for (i = 0; i < len; i++)
+ pg->pg_temp.osds[i] = ceph_decode_32(p);
+
+ return pg;
+
+e_inval:
+ return ERR_PTR(-EINVAL);
+}
+
static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
- return __decode_pg_temp(p, end, map, false);
+ return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
+ false);
}
static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map)
{
- return __decode_pg_temp(p, end, map, true);
+ return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp,
+ true);
}
-static int __decode_primary_temp(void **p, void *end, struct ceph_osdmap *map,
- bool incremental)
+static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end,
+ bool incremental)
{
- u32 n;
-
- ceph_decode_32_safe(p, end, n, e_inval);
- while (n--) {
- struct ceph_pg pgid;
- u32 osd;
- int ret;
-
- ret = ceph_decode_pgid(p, end, &pgid);
- if (ret)
- return ret;
-
- ceph_decode_32_safe(p, end, osd, e_inval);
-
- ret = __remove_pg_mapping(&map->primary_temp, pgid);
- BUG_ON(!incremental && ret != -ENOENT);
-
- if (!incremental || osd != (u32)-1) {
- struct ceph_pg_mapping *pg;
-
- pg = kzalloc(sizeof(*pg), GFP_NOFS);
- if (!pg)
- return -ENOMEM;
+ struct ceph_pg_mapping *pg;
+ u32 osd;
- pg->pgid = pgid;
- pg->primary_temp.osd = osd;
+ ceph_decode_32_safe(p, end, osd, e_inval);
+ if (osd == (u32)-1 && incremental)
+ return NULL; /* new_primary_temp: -1 to remove */
- ret = __insert_pg_mapping(pg, &map->primary_temp);
- if (ret) {
- kfree(pg);
- return ret;
- }
- }
- }
+ pg = alloc_pg_mapping(0);
+ if (!pg)
+ return ERR_PTR(-ENOMEM);
- return 0;
+ pg->primary_temp.osd = osd;
+ return pg;
e_inval:
- return -EINVAL;
+ return ERR_PTR(-EINVAL);
}
static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map)
{
- return __decode_primary_temp(p, end, map, false);
+ return decode_pg_mapping(p, end, &map->primary_temp,
+ __decode_primary_temp, false);
}
static int decode_new_primary_temp(void **p, void *end,
struct ceph_osdmap *map)
{
- return __decode_primary_temp(p, end, map, true);
+ return decode_pg_mapping(p, end, &map->primary_temp,
+ __decode_primary_temp, true);
}
u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd)
@@ -1167,6 +1365,75 @@ e_inval:
return -EINVAL;
}
+static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end,
+ bool __unused)
+{
+ return __decode_pg_temp(p, end, false);
+}
+
+static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
+ false);
+}
+
+static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap,
+ true);
+}
+
+static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true);
+}
+
+static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end,
+ bool __unused)
+{
+ struct ceph_pg_mapping *pg;
+ u32 len, i;
+
+ ceph_decode_32_safe(p, end, len, e_inval);
+ if (len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32)))
+ return ERR_PTR(-EINVAL);
+
+ ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval);
+ pg = kzalloc(sizeof(*pg) + 2 * len * sizeof(u32), GFP_NOIO);
+ if (!pg)
+ return ERR_PTR(-ENOMEM);
+
+ pg->pg_upmap_items.len = len;
+ for (i = 0; i < len; i++) {
+ pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p);
+ pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p);
+ }
+
+ return pg;
+
+e_inval:
+ return ERR_PTR(-EINVAL);
+}
+
+static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap_items,
+ __decode_pg_upmap_items, false);
+}
+
+static int decode_new_pg_upmap_items(void **p, void *end,
+ struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap_items,
+ __decode_pg_upmap_items, true);
+}
+
+static int decode_old_pg_upmap_items(void **p, void *end,
+ struct ceph_osdmap *map)
+{
+ return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true);
+}
+
/*
* decode a full map.
*/
@@ -1217,13 +1484,21 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
/* osd_state, osd_weight, osd_addrs->client_addr */
ceph_decode_need(p, end, 3*sizeof(u32) +
- map->max_osd*(1 + sizeof(*map->osd_weight) +
+ map->max_osd*((struct_v >= 5 ? sizeof(u32) :
+ sizeof(u8)) +
+ sizeof(*map->osd_weight) +
sizeof(*map->osd_addr)), e_inval);
if (ceph_decode_32(p) != map->max_osd)
goto e_inval;
- ceph_decode_copy(p, map->osd_state, map->max_osd);
+ if (struct_v >= 5) {
+ for (i = 0; i < map->max_osd; i++)
+ map->osd_state[i] = ceph_decode_32(p);
+ } else {
+ for (i = 0; i < map->max_osd; i++)
+ map->osd_state[i] = ceph_decode_8(p);
+ }
if (ceph_decode_32(p) != map->max_osd)
goto e_inval;
@@ -1256,9 +1531,7 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
if (err)
goto bad;
} else {
- /* XXX can this happen? */
- kfree(map->osd_primary_affinity);
- map->osd_primary_affinity = NULL;
+ WARN_ON(map->osd_primary_affinity);
}
/* crush */
@@ -1267,6 +1540,26 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
if (err)
goto bad;
+ *p += len;
+ if (struct_v >= 3) {
+ /* erasure_code_profiles */
+ ceph_decode_skip_map_of_map(p, end, string, string, string,
+ bad);
+ }
+
+ if (struct_v >= 4) {
+ err = decode_pg_upmap(p, end, map);
+ if (err)
+ goto bad;
+
+ err = decode_pg_upmap_items(p, end, map);
+ if (err)
+ goto bad;
+ } else {
+ WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap));
+ WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items));
+ }
+
/* ignore the rest */
*p = end;
@@ -1313,7 +1606,7 @@ struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
* new_up_client: { osd=6, addr=... } # set osd_state and addr
* new_state: { osd=6, xorstate=EXISTS } # clear osd_state
*/
-static int decode_new_up_state_weight(void **p, void *end,
+static int decode_new_up_state_weight(void **p, void *end, u8 struct_v,
struct ceph_osdmap *map)
{
void *new_up_client;
@@ -1329,7 +1622,7 @@ static int decode_new_up_state_weight(void **p, void *end,
new_state = *p;
ceph_decode_32_safe(p, end, len, e_inval);
- len *= sizeof(u32) + sizeof(u8);
+ len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8));
ceph_decode_need(p, end, len, e_inval);
*p += len;
@@ -1365,11 +1658,14 @@ static int decode_new_up_state_weight(void **p, void *end,
len = ceph_decode_32(p);
while (len--) {
s32 osd;
- u8 xorstate;
+ u32 xorstate;
int ret;
osd = ceph_decode_32(p);
- xorstate = ceph_decode_8(p);
+ if (struct_v >= 5)
+ xorstate = ceph_decode_32(p);
+ else
+ xorstate = ceph_decode_8(p);
if (xorstate == 0)
xorstate = CEPH_OSD_UP;
BUG_ON(osd >= map->max_osd);
@@ -1503,7 +1799,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
}
/* new_up_client, new_state, new_weight */
- err = decode_new_up_state_weight(p, end, map);
+ err = decode_new_up_state_weight(p, end, struct_v, map);
if (err)
goto bad;
@@ -1526,6 +1822,32 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
goto bad;
}
+ if (struct_v >= 3) {
+ /* new_erasure_code_profiles */
+ ceph_decode_skip_map_of_map(p, end, string, string, string,
+ bad);
+ /* old_erasure_code_profiles */
+ ceph_decode_skip_set(p, end, string, bad);
+ }
+
+ if (struct_v >= 4) {
+ err = decode_new_pg_upmap(p, end, map);
+ if (err)
+ goto bad;
+
+ err = decode_old_pg_upmap(p, end, map);
+ if (err)
+ goto bad;
+
+ err = decode_new_pg_upmap_items(p, end, map);
+ if (err)
+ goto bad;
+
+ err = decode_old_pg_upmap_items(p, end, map);
+ if (err)
+ goto bad;
+ }
+
/* ignore the rest */
*p = end;
@@ -1546,12 +1868,13 @@ bad:
void ceph_oloc_copy(struct ceph_object_locator *dest,
const struct ceph_object_locator *src)
{
- WARN_ON(!ceph_oloc_empty(dest));
- WARN_ON(dest->pool_ns); /* empty() only covers ->pool */
+ ceph_oloc_destroy(dest);
dest->pool = src->pool;
if (src->pool_ns)
dest->pool_ns = ceph_get_string(src->pool_ns);
+ else
+ dest->pool_ns = NULL;
}
EXPORT_SYMBOL(ceph_oloc_copy);
@@ -1564,14 +1887,15 @@ EXPORT_SYMBOL(ceph_oloc_destroy);
void ceph_oid_copy(struct ceph_object_id *dest,
const struct ceph_object_id *src)
{
- WARN_ON(!ceph_oid_empty(dest));
+ ceph_oid_destroy(dest);
if (src->name != src->inline_name) {
/* very rare, see ceph_object_id definition */
dest->name = kmalloc(src->name_len + 1,
GFP_NOIO | __GFP_NOFAIL);
+ } else {
+ dest->name = dest->inline_name;
}
-
memcpy(dest->name, src->name, src->name_len + 1);
dest->name_len = src->name_len;
}
@@ -1713,9 +2037,8 @@ void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
dest->primary = src->primary;
}
-static bool is_split(const struct ceph_pg *pgid,
- u32 old_pg_num,
- u32 new_pg_num)
+bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num,
+ u32 new_pg_num)
{
int old_bits = calc_bits_of(old_pg_num);
int old_mask = (1 << old_bits) - 1;
@@ -1760,7 +2083,7 @@ bool ceph_is_new_interval(const struct ceph_osds *old_acting,
!osds_equal(old_up, new_up) ||
old_size != new_size ||
old_min_size != new_min_size ||
- is_split(pgid, old_pg_num, new_pg_num) ||
+ ceph_pg_is_split(pgid, old_pg_num, new_pg_num) ||
old_sort_bitwise != new_sort_bitwise;
}
@@ -1884,16 +2207,12 @@ EXPORT_SYMBOL(ceph_calc_file_object_mapping);
* Should only be called with target_oid and target_oloc (as opposed to
* base_oid and base_oloc), since tiering isn't taken into account.
*/
-int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
- struct ceph_object_id *oid,
- struct ceph_object_locator *oloc,
- struct ceph_pg *raw_pgid)
+int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
+ const struct ceph_object_id *oid,
+ const struct ceph_object_locator *oloc,
+ struct ceph_pg *raw_pgid)
{
- struct ceph_pg_pool_info *pi;
-
- pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
- if (!pi)
- return -ENOENT;
+ WARN_ON(pi->id != oloc->pool);
if (!oloc->pool_ns) {
raw_pgid->pool = oloc->pool;
@@ -1925,6 +2244,20 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
}
return 0;
}
+
+int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
+ const struct ceph_object_id *oid,
+ const struct ceph_object_locator *oloc,
+ struct ceph_pg *raw_pgid)
+{
+ struct ceph_pg_pool_info *pi;
+
+ pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
+ if (!pi)
+ return -ENOENT;
+
+ return __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
+}
EXPORT_SYMBOL(ceph_object_locator_to_pg);
/*
@@ -1969,23 +2302,57 @@ static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
int *result, int result_max,
- const __u32 *weight, int weight_max)
+ const __u32 *weight, int weight_max,
+ u64 choose_args_index)
{
+ struct crush_choose_arg_map *arg_map;
int r;
BUG_ON(result_max > CEPH_PG_MAX_SIZE);
+ arg_map = lookup_choose_arg_map(&map->crush->choose_args,
+ choose_args_index);
+
mutex_lock(&map->crush_workspace_mutex);
r = crush_do_rule(map->crush, ruleno, x, result, result_max,
- weight, weight_max, map->crush_workspace);
+ weight, weight_max, map->crush_workspace,
+ arg_map ? arg_map->args : NULL);
mutex_unlock(&map->crush_workspace_mutex);
return r;
}
+static void remove_nonexistent_osds(struct ceph_osdmap *osdmap,
+ struct ceph_pg_pool_info *pi,
+ struct ceph_osds *set)
+{
+ int i;
+
+ if (ceph_can_shift_osds(pi)) {
+ int removed = 0;
+
+ /* shift left */
+ for (i = 0; i < set->size; i++) {
+ if (!ceph_osd_exists(osdmap, set->osds[i])) {
+ removed++;
+ continue;
+ }
+ if (removed)
+ set->osds[i - removed] = set->osds[i];
+ }
+ set->size -= removed;
+ } else {
+ /* set dne devices to NONE */
+ for (i = 0; i < set->size; i++) {
+ if (!ceph_osd_exists(osdmap, set->osds[i]))
+ set->osds[i] = CRUSH_ITEM_NONE;
+ }
+ }
+}
+
/*
- * Calculate raw set (CRUSH output) for given PG. The result may
- * contain nonexistent OSDs. ->primary is undefined for a raw set.
+ * Calculate raw set (CRUSH output) for given PG and filter out
+ * nonexistent OSDs. ->primary is undefined for a raw set.
*
* Placement seed (CRUSH input) is returned through @ppps.
*/
@@ -2019,7 +2386,7 @@ static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
}
len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
- osdmap->osd_weight, osdmap->max_osd);
+ osdmap->osd_weight, osdmap->max_osd, pi->id);
if (len < 0) {
pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
len, ruleno, pi->id, pi->crush_ruleset, pi->type,
@@ -2028,6 +2395,70 @@ static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
}
raw->size = len;
+ remove_nonexistent_osds(osdmap, pi, raw);
+}
+
+/* apply pg_upmap[_items] mappings */
+static void apply_upmap(struct ceph_osdmap *osdmap,
+ const struct ceph_pg *pgid,
+ struct ceph_osds *raw)
+{
+ struct ceph_pg_mapping *pg;
+ int i, j;
+
+ pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid);
+ if (pg) {
+ /* make sure targets aren't marked out */
+ for (i = 0; i < pg->pg_upmap.len; i++) {
+ int osd = pg->pg_upmap.osds[i];
+
+ if (osd != CRUSH_ITEM_NONE &&
+ osd < osdmap->max_osd &&
+ osdmap->osd_weight[osd] == 0) {
+ /* reject/ignore explicit mapping */
+ return;
+ }
+ }
+ for (i = 0; i < pg->pg_upmap.len; i++)
+ raw->osds[i] = pg->pg_upmap.osds[i];
+ raw->size = pg->pg_upmap.len;
+ return;
+ }
+
+ pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid);
+ if (pg) {
+ /*
+ * Note: this approach does not allow a bidirectional swap,
+ * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
+ */
+ for (i = 0; i < pg->pg_upmap_items.len; i++) {
+ int from = pg->pg_upmap_items.from_to[i][0];
+ int to = pg->pg_upmap_items.from_to[i][1];
+ int pos = -1;
+ bool exists = false;
+
+ /* make sure replacement doesn't already appear */
+ for (j = 0; j < raw->size; j++) {
+ int osd = raw->osds[j];
+
+ if (osd == to) {
+ exists = true;
+ break;
+ }
+ /* ignore mapping if target is marked out */
+ if (osd == from && pos < 0 &&
+ !(to != CRUSH_ITEM_NONE &&
+ to < osdmap->max_osd &&
+ osdmap->osd_weight[to] == 0)) {
+ pos = j;
+ }
+ }
+ if (!exists && pos >= 0) {
+ raw->osds[pos] = to;
+ return;
+ }
+ }
+ }
}
/*
@@ -2150,18 +2581,16 @@ static void apply_primary_affinity(struct ceph_osdmap *osdmap,
*/
static void get_temp_osds(struct ceph_osdmap *osdmap,
struct ceph_pg_pool_info *pi,
- const struct ceph_pg *raw_pgid,
+ const struct ceph_pg *pgid,
struct ceph_osds *temp)
{
- struct ceph_pg pgid;
struct ceph_pg_mapping *pg;
int i;
- raw_pg_to_pg(pi, raw_pgid, &pgid);
ceph_osds_init(temp);
/* pg_temp? */
- pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
+ pg = lookup_pg_mapping(&osdmap->pg_temp, pgid);
if (pg) {
for (i = 0; i < pg->pg_temp.len; i++) {
if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
@@ -2184,7 +2613,7 @@ static void get_temp_osds(struct ceph_osdmap *osdmap,
}
/* primary_temp? */
- pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
+ pg = lookup_pg_mapping(&osdmap->primary_temp, pgid);
if (pg)
temp->primary = pg->primary_temp.osd;
}
@@ -2197,43 +2626,75 @@ static void get_temp_osds(struct ceph_osdmap *osdmap,
* resend a request.
*/
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
+ struct ceph_pg_pool_info *pi,
const struct ceph_pg *raw_pgid,
struct ceph_osds *up,
struct ceph_osds *acting)
{
- struct ceph_pg_pool_info *pi;
+ struct ceph_pg pgid;
u32 pps;
- pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
- if (!pi) {
- ceph_osds_init(up);
- ceph_osds_init(acting);
- goto out;
- }
+ WARN_ON(pi->id != raw_pgid->pool);
+ raw_pg_to_pg(pi, raw_pgid, &pgid);
pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
+ apply_upmap(osdmap, &pgid, up);
raw_to_up_osds(osdmap, pi, up);
apply_primary_affinity(osdmap, pi, pps, up);
- get_temp_osds(osdmap, pi, raw_pgid, acting);
+ get_temp_osds(osdmap, pi, &pgid, acting);
if (!acting->size) {
memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
acting->size = up->size;
if (acting->primary == -1)
acting->primary = up->primary;
}
-out:
WARN_ON(!osds_valid(up) || !osds_valid(acting));
}
+bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
+ struct ceph_pg_pool_info *pi,
+ const struct ceph_pg *raw_pgid,
+ struct ceph_spg *spgid)
+{
+ struct ceph_pg pgid;
+ struct ceph_osds up, acting;
+ int i;
+
+ WARN_ON(pi->id != raw_pgid->pool);
+ raw_pg_to_pg(pi, raw_pgid, &pgid);
+
+ if (ceph_can_shift_osds(pi)) {
+ spgid->pgid = pgid; /* struct */
+ spgid->shard = CEPH_SPG_NOSHARD;
+ return true;
+ }
+
+ ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting);
+ for (i = 0; i < acting.size; i++) {
+ if (acting.osds[i] == acting.primary) {
+ spgid->pgid = pgid; /* struct */
+ spgid->shard = i;
+ return true;
+ }
+ }
+
+ return false;
+}
+
/*
* Return acting primary for given PG, or -1 if none.
*/
int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
const struct ceph_pg *raw_pgid)
{
+ struct ceph_pg_pool_info *pi;
struct ceph_osds up, acting;
- ceph_pg_to_up_acting_osds(osdmap, raw_pgid, &up, &acting);
+ pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
+ if (!pi)
+ return -1;
+
+ ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting);
return acting.primary;
}
EXPORT_SYMBOL(ceph_pg_to_acting_primary);