diff options
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 40 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 6 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 195 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 38 |
4 files changed, 247 insertions, 32 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 7fd102960a81..9951c81b82ed 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -512,23 +512,26 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, } /** - * rpcrdma_unmap_sges - DMA-unmap Send buffers - * @ia: interface adapter (device) - * @req: req with possibly some SGEs to be DMA unmapped + * rpcrdma_unmap_sendctx - DMA-unmap Send buffers + * @sc: sendctx containing SGEs to unmap * */ void -rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req) +rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc) { + struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia; struct ib_sge *sge; unsigned int count; + dprintk("RPC: %s: unmapping %u sges for sc=%p\n", + __func__, sc->sc_unmap_count, sc); + /* The first two SGEs contain the transport header and * the inline buffer. These are always left mapped so * they can be cheaply re-used. */ - sge = &req->rl_send_sge[2]; - for (count = req->rl_mapped_sges; count--; sge++) + sge = &sc->sc_sges[2]; + for (count = sc->sc_unmap_count; count; ++sge, --count) ib_dma_unmap_page(ia->ri_device, sge->addr, sge->length, DMA_TO_DEVICE); } @@ -539,8 +542,9 @@ static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req, u32 len) { + struct rpcrdma_sendctx *sc = req->rl_sendctx; struct rpcrdma_regbuf *rb = req->rl_rdmabuf; - struct ib_sge *sge = &req->rl_send_sge[0]; + struct ib_sge *sge = sc->sc_sges; if (!rpcrdma_dma_map_regbuf(ia, rb)) goto out_regbuf; @@ -550,7 +554,7 @@ rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req, ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length, DMA_TO_DEVICE); - req->rl_send_wr.num_sge++; + sc->sc_wr.num_sge++; return true; out_regbuf: @@ -565,10 +569,11 @@ static bool rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) { + struct rpcrdma_sendctx *sc = req->rl_sendctx; unsigned int sge_no, page_base, len, remaining; struct rpcrdma_regbuf *rb = req->rl_sendbuf; struct ib_device *device = ia->ri_device; - struct ib_sge *sge = req->rl_send_sge; + struct ib_sge *sge = sc->sc_sges; u32 lkey = ia->ri_pd->local_dma_lkey; struct page *page, **ppages; @@ -631,7 +636,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req, sge[sge_no].length = len; sge[sge_no].lkey = lkey; - req->rl_mapped_sges++; + sc->sc_unmap_count++; ppages++; remaining -= len; page_base = 0; @@ -657,11 +662,11 @@ map_tail: goto out_mapping_err; sge[sge_no].length = len; sge[sge_no].lkey = lkey; - req->rl_mapped_sges++; + sc->sc_unmap_count++; } out: - req->rl_send_wr.num_sge += sge_no; + sc->sc_wr.num_sge += sge_no; return true; out_regbuf: @@ -669,12 +674,12 @@ out_regbuf: return false; out_mapping_overflow: - rpcrdma_unmap_sges(ia, req); + rpcrdma_unmap_sendctx(sc); pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no); return false; out_mapping_err: - rpcrdma_unmap_sges(ia, req); + rpcrdma_unmap_sendctx(sc); pr_err("rpcrdma: Send mapping error\n"); return false; } @@ -694,8 +699,11 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, u32 hdrlen, struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) { - req->rl_send_wr.num_sge = 0; - req->rl_mapped_sges = 0; + req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf); + if (!req->rl_sendctx) + return -ENOBUFS; + req->rl_sendctx->sc_wr.num_sge = 0; + req->rl_sendctx->sc_unmap_count = 0; if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen)) return -EIO; diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index eb46d2479b09..7be6e2519197 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -687,7 +687,6 @@ xprt_rdma_free(struct rpc_task *task) if (!list_empty(&req->rl_registered)) ia->ri_ops->ro_unmap_sync(r_xprt, &req->rl_registered); - rpcrdma_unmap_sges(ia, req); rpcrdma_buffer_put(req); } @@ -790,11 +789,12 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) r_xprt->rx_stats.failed_marshal_count, r_xprt->rx_stats.bad_reply_count, r_xprt->rx_stats.nomsg_call_count); - seq_printf(seq, "%lu %lu %lu %lu\n", + seq_printf(seq, "%lu %lu %lu %lu %lu\n", r_xprt->rx_stats.mrs_recovered, r_xprt->rx_stats.mrs_orphaned, r_xprt->rx_stats.mrs_allocated, - r_xprt->rx_stats.local_inv_needed); + r_xprt->rx_stats.local_inv_needed, + r_xprt->rx_stats.empty_sendctx_q); } static int diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 247b00b715c2..1bf7b1ee5699 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -52,6 +52,8 @@ #include <linux/prefetch.h> #include <linux/sunrpc/addr.h> #include <linux/sunrpc/svc_rdma.h> + +#include <asm-generic/barrier.h> #include <asm/bitops.h> #include <rdma/ib_cm.h> @@ -126,11 +128,17 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) { + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_sendctx *sc = + container_of(cqe, struct rpcrdma_sendctx, sc_cqe); + /* WARNING: Only wr_cqe and status are reliable at this point */ if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR) pr_err("rpcrdma: Send: %s (%u/0x%x)\n", ib_wc_status_msg(wc->status), wc->status, wc->vendor_err); + + rpcrdma_sendctx_put_locked(sc); } /** @@ -542,6 +550,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.cap.max_recv_sge); /* set trigger for requesting send completion */ + ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH, + cdata->max_requests >> 2); + ep->rep_send_count = ep->rep_send_batch; ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1; if (ep->rep_cqinit <= 2) ep->rep_cqinit = 0; /* always signal? */ @@ -824,6 +835,168 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ib_drain_qp(ia->ri_id->qp); } +/* Fixed-size circular FIFO queue. This implementation is wait-free and + * lock-free. + * + * Consumer is the code path that posts Sends. This path dequeues a + * sendctx for use by a Send operation. Multiple consumer threads + * are serialized by the RPC transport lock, which allows only one + * ->send_request call at a time. + * + * Producer is the code path that handles Send completions. This path + * enqueues a sendctx that has been completed. Multiple producer + * threads are serialized by the ib_poll_cq() function. + */ + +/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced + * queue activity, and ib_drain_qp has flushed all remaining Send + * requests. + */ +static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf) +{ + unsigned long i; + + for (i = 0; i <= buf->rb_sc_last; i++) + kfree(buf->rb_sc_ctxs[i]); + kfree(buf->rb_sc_ctxs); +} + +static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia) +{ + struct rpcrdma_sendctx *sc; + + sc = kzalloc(sizeof(*sc) + + ia->ri_max_send_sges * sizeof(struct ib_sge), + GFP_KERNEL); + if (!sc) + return NULL; + + sc->sc_wr.wr_cqe = &sc->sc_cqe; + sc->sc_wr.sg_list = sc->sc_sges; + sc->sc_wr.opcode = IB_WR_SEND; + sc->sc_cqe.done = rpcrdma_wc_send; + return sc; +} + +static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_sendctx *sc; + unsigned long i; + + /* Maximum number of concurrent outstanding Send WRs. Capping + * the circular queue size stops Send Queue overflow by causing + * the ->send_request call to fail temporarily before too many + * Sends are posted. + */ + i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; + dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i); + buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL); + if (!buf->rb_sc_ctxs) + return -ENOMEM; + + buf->rb_sc_last = i - 1; + for (i = 0; i <= buf->rb_sc_last; i++) { + sc = rpcrdma_sendctx_create(&r_xprt->rx_ia); + if (!sc) + goto out_destroy; + + sc->sc_xprt = r_xprt; + buf->rb_sc_ctxs[i] = sc; + } + + return 0; + +out_destroy: + rpcrdma_sendctxs_destroy(buf); + return -ENOMEM; +} + +/* The sendctx queue is not guaranteed to have a size that is a + * power of two, thus the helpers in circ_buf.h cannot be used. + * The other option is to use modulus (%), which can be expensive. + */ +static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf, + unsigned long item) +{ + return likely(item < buf->rb_sc_last) ? item + 1 : 0; +} + +/** + * rpcrdma_sendctx_get_locked - Acquire a send context + * @buf: transport buffers from which to acquire an unused context + * + * Returns pointer to a free send completion context; or NULL if + * the queue is empty. + * + * Usage: Called to acquire an SGE array before preparing a Send WR. + * + * The caller serializes calls to this function (per rpcrdma_buffer), + * and provides an effective memory barrier that flushes the new value + * of rb_sc_head. + */ +struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_xprt *r_xprt; + struct rpcrdma_sendctx *sc; + unsigned long next_head; + + next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head); + + if (next_head == READ_ONCE(buf->rb_sc_tail)) + goto out_emptyq; + + /* ORDER: item must be accessed _before_ head is updated */ + sc = buf->rb_sc_ctxs[next_head]; + + /* Releasing the lock in the caller acts as a memory + * barrier that flushes rb_sc_head. + */ + buf->rb_sc_head = next_head; + + return sc; + +out_emptyq: + /* The queue is "empty" if there have not been enough Send + * completions recently. This is a sign the Send Queue is + * backing up. Cause the caller to pause and try again. + */ + dprintk("RPC: %s: empty sendctx queue\n", __func__); + r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf); + r_xprt->rx_stats.empty_sendctx_q++; + return NULL; +} + +/** + * rpcrdma_sendctx_put_locked - Release a send context + * @sc: send context to release + * + * Usage: Called from Send completion to return a sendctxt + * to the queue. + * + * The caller serializes calls to this function (per rpcrdma_buffer). + */ +void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) +{ + struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf; + unsigned long next_tail; + + /* Unmap SGEs of previously completed by unsignaled + * Sends by walking up the queue until @sc is found. + */ + next_tail = buf->rb_sc_tail; + do { + next_tail = rpcrdma_sendctx_next(buf, next_tail); + + /* ORDER: item must be accessed _before_ tail is updated */ + rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]); + + } while (buf->rb_sc_ctxs[next_tail] != sc); + + /* Paired with READ_ONCE */ + smp_store_release(&buf->rb_sc_tail, next_tail); +} + static void rpcrdma_mr_recovery_worker(struct work_struct *work) { @@ -919,13 +1092,8 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) spin_lock(&buffer->rb_reqslock); list_add(&req->rl_all, &buffer->rb_allreqs); spin_unlock(&buffer->rb_reqslock); - req->rl_cqe.done = rpcrdma_wc_send; req->rl_buffer = &r_xprt->rx_buf; INIT_LIST_HEAD(&req->rl_registered); - req->rl_send_wr.next = NULL; - req->rl_send_wr.wr_cqe = &req->rl_cqe; - req->rl_send_wr.sg_list = req->rl_send_sge; - req->rl_send_wr.opcode = IB_WR_SEND; return req; } @@ -1017,6 +1185,10 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) list_add(&rep->rr_list, &buf->rb_recv_bufs); } + rc = rpcrdma_sendctxs_create(r_xprt); + if (rc) + goto out; + return 0; out: rpcrdma_buffer_destroy(buf); @@ -1093,6 +1265,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) cancel_delayed_work_sync(&buf->rb_recovery_worker); cancel_delayed_work_sync(&buf->rb_refresh_worker); + rpcrdma_sendctxs_destroy(buf); + while (!list_empty(&buf->rb_recv_bufs)) { struct rpcrdma_rep *rep; @@ -1208,7 +1382,6 @@ rpcrdma_buffer_put(struct rpcrdma_req *req) struct rpcrdma_buffer *buffers = req->rl_buffer; struct rpcrdma_rep *rep = req->rl_reply; - req->rl_send_wr.num_sge = 0; req->rl_reply = NULL; spin_lock(&buffers->rb_lock); @@ -1340,7 +1513,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_req *req) { - struct ib_send_wr *send_wr = &req->rl_send_wr; + struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; struct ib_send_wr *send_wr_fail; int rc; @@ -1354,7 +1527,13 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, dprintk("RPC: %s: posting %d s/g entries\n", __func__, send_wr->num_sge); - rpcrdma_set_signaled(ep, send_wr); + if (!ep->rep_send_count) { + send_wr->send_flags |= IB_SEND_SIGNALED; + ep->rep_send_count = ep->rep_send_batch; + } else { + send_wr->send_flags &= ~IB_SEND_SIGNALED; + --ep->rep_send_count; + } rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail); if (rc) goto out_postsend_err; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 0b8ca5e5c706..537cfabe47d1 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -93,6 +93,8 @@ enum { */ struct rpcrdma_ep { + unsigned int rep_send_count; + unsigned int rep_send_batch; atomic_t rep_cqcount; int rep_cqinit; int rep_connected; @@ -232,6 +234,27 @@ struct rpcrdma_rep { struct ib_recv_wr rr_recv_wr; }; +/* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes + */ +struct rpcrdma_xprt; +struct rpcrdma_sendctx { + struct ib_send_wr sc_wr; + struct ib_cqe sc_cqe; + struct rpcrdma_xprt *sc_xprt; + unsigned int sc_unmap_count; + struct ib_sge sc_sges[]; +}; + +/* Limit the number of SGEs that can be unmapped during one + * Send completion. This caps the amount of work a single + * completion can do before returning to the provider. + * + * Setting this to zero disables Send completion batching. + */ +enum { + RPCRDMA_MAX_SEND_BATCH = 7, +}; + /* * struct rpcrdma_mw - external memory region metadata * @@ -343,19 +366,16 @@ enum { struct rpcrdma_buffer; struct rpcrdma_req { struct list_head rl_list; - unsigned int rl_mapped_sges; unsigned int rl_connect_cookie; struct rpcrdma_buffer *rl_buffer; struct rpcrdma_rep *rl_reply; struct xdr_stream rl_stream; struct xdr_buf rl_hdrbuf; - struct ib_send_wr rl_send_wr; - struct ib_sge rl_send_sge[RPCRDMA_MAX_SEND_SGES]; + struct rpcrdma_sendctx *rl_sendctx; struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */ struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */ struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */ - struct ib_cqe rl_cqe; struct list_head rl_all; bool rl_backchannel; @@ -402,6 +422,11 @@ struct rpcrdma_buffer { struct list_head rb_mws; struct list_head rb_all; + unsigned long rb_sc_head; + unsigned long rb_sc_tail; + unsigned long rb_sc_last; + struct rpcrdma_sendctx **rb_sc_ctxs; + spinlock_t rb_lock; /* protect buf lists */ int rb_send_count, rb_recv_count; struct list_head rb_send_bufs; @@ -456,6 +481,7 @@ struct rpcrdma_stats { unsigned long mrs_recovered; unsigned long mrs_orphaned; unsigned long mrs_allocated; + unsigned long empty_sendctx_q; /* accessed when receiving a reply */ unsigned long long total_rdma_reply; @@ -557,6 +583,8 @@ struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *); void rpcrdma_destroy_req(struct rpcrdma_req *); int rpcrdma_buffer_create(struct rpcrdma_xprt *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); +struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf); +void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc); struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *); void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *); @@ -617,7 +645,7 @@ int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, u32 hdrlen, struct xdr_buf *xdr, enum rpcrdma_chunktype rtype); -void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *); +void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc); int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); |