diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/clnt.c | 21 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 3 | ||||
-rw-r--r-- | net/sunrpc/xdr.c | 11 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 22 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 53 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 413 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 33 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 194 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 18 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 2 |
11 files changed, 435 insertions, 337 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index f7f78566be46..a3379765605d 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -591,6 +591,9 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args) xprt->resvport = 1; if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT) xprt->resvport = 0; + xprt->reuseport = 0; + if (args->flags & RPC_CLNT_CREATE_REUSEPORT) + xprt->reuseport = 1; clnt = rpc_create_xprt(args, xprt); if (IS_ERR(clnt) || args->nconnect <= 1) @@ -1676,8 +1679,6 @@ call_reserveresult(struct rpc_task *task) return; } - printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n", - __func__, status); rpc_call_rpcerror(task, -EIO); return; } @@ -1686,11 +1687,8 @@ call_reserveresult(struct rpc_task *task) * Even though there was an error, we may have acquired * a request slot somehow. Make sure not to leak it. */ - if (task->tk_rqstp) { - printk(KERN_ERR "%s: status=%d, request allocated anyway\n", - __func__, status); + if (task->tk_rqstp) xprt_release(task); - } switch (status) { case -ENOMEM: @@ -1699,14 +1697,9 @@ call_reserveresult(struct rpc_task *task) case -EAGAIN: /* woken up; retry */ task->tk_action = call_retry_reserve; return; - case -EIO: /* probably a shutdown */ - break; default: - printk(KERN_ERR "%s: unrecognized error %d, exiting\n", - __func__, status); - break; + rpc_call_rpcerror(task, status); } - rpc_call_rpcerror(task, status); } /* @@ -2906,7 +2899,7 @@ int rpc_clnt_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt; unsigned long connect_timeout; unsigned long reconnect_timeout; - unsigned char resvport; + unsigned char resvport, reuseport; int ret = 0; rcu_read_lock(); @@ -2918,6 +2911,7 @@ int rpc_clnt_add_xprt(struct rpc_clnt *clnt, return -EAGAIN; } resvport = xprt->resvport; + reuseport = xprt->reuseport; connect_timeout = xprt->connect_timeout; reconnect_timeout = xprt->max_reconnect_timeout; rcu_read_unlock(); @@ -2928,6 +2922,7 @@ int rpc_clnt_add_xprt(struct rpc_clnt *clnt, goto out_put_switch; } xprt->resvport = resvport; + xprt->reuseport = reuseport; if (xprt->ops->set_connect_timeout != NULL) xprt->ops->set_connect_timeout(xprt, connect_timeout, diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 360afe153193..9c79548c6847 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -260,7 +260,7 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c rpc_reset_waitqueue_priority(queue); queue->qlen = 0; queue->timer_list.expires = 0; - INIT_DEFERRABLE_WORK(&queue->timer_list.dwork, __rpc_queue_timer_fn); + INIT_DELAYED_WORK(&queue->timer_list.dwork, __rpc_queue_timer_fn); INIT_LIST_HEAD(&queue->timer_list.list); rpc_assign_waitqueue_name(queue, qname); } @@ -824,6 +824,7 @@ rpc_reset_task_statistics(struct rpc_task *task) */ void rpc_exit_task(struct rpc_task *task) { + trace_rpc_task_end(task, task->tk_action); task->tk_action = NULL; if (task->tk_ops->rpc_count_stats) task->tk_ops->rpc_count_stats(task, task->tk_calldata); diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c index 14ba9e72a204..f3104be8ff5d 100644 --- a/net/sunrpc/xdr.c +++ b/net/sunrpc/xdr.c @@ -436,13 +436,12 @@ xdr_shrink_bufhead(struct xdr_buf *buf, size_t len) } /** - * xdr_shrink_pagelen + * xdr_shrink_pagelen - shrinks buf->pages by up to @len bytes * @buf: xdr_buf * @len: bytes to remove from buf->pages * - * Shrinks XDR buffer's page array buf->pages by - * 'len' bytes. The extra data is not lost, but is instead - * moved into the tail. + * The extra data is not lost, but is instead moved into buf->tail. + * Returns the actual number of bytes moved. */ static unsigned int xdr_shrink_pagelen(struct xdr_buf *buf, size_t len) @@ -455,8 +454,8 @@ xdr_shrink_pagelen(struct xdr_buf *buf, size_t len) result = 0; tail = buf->tail; - BUG_ON (len > pglen); - + if (len > buf->page_len) + len = buf-> page_len; tailbuf_len = buf->buflen - buf->head->iov_len - buf->page_len; /* Shift the tail first */ diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 41df4c507193..1aafe8d3f3f4 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -205,20 +205,20 @@ int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) - return 1; + goto out_locked; goto out_sleep; } if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) goto out_unlock; xprt->snd_task = task; +out_locked: + trace_xprt_reserve_xprt(xprt, task); return 1; out_unlock: xprt_clear_locked(xprt); out_sleep: - dprintk("RPC: %5u failed to lock transport %p\n", - task->tk_pid, xprt); task->tk_status = -EAGAIN; if (RPC_IS_SOFT(task)) rpc_sleep_on_timeout(&xprt->sending, task, NULL, @@ -269,23 +269,22 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) - return 1; + goto out_locked; goto out_sleep; } if (req == NULL) { xprt->snd_task = task; - return 1; + goto out_locked; } if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) goto out_unlock; if (!xprt_need_congestion_window_wait(xprt)) { xprt->snd_task = task; - return 1; + goto out_locked; } out_unlock: xprt_clear_locked(xprt); out_sleep: - dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); task->tk_status = -EAGAIN; if (RPC_IS_SOFT(task)) rpc_sleep_on_timeout(&xprt->sending, task, NULL, @@ -293,6 +292,9 @@ out_sleep: else rpc_sleep_on(&xprt->sending, task, NULL); return 0; +out_locked: + trace_xprt_reserve_cong(xprt, task); + return 1; } EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); @@ -357,6 +359,7 @@ void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) xprt_clear_locked(xprt); __xprt_lock_write_next(xprt); } + trace_xprt_release_xprt(xprt, task); } EXPORT_SYMBOL_GPL(xprt_release_xprt); @@ -374,6 +377,7 @@ void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) xprt_clear_locked(xprt); __xprt_lock_write_next_cong(xprt); } + trace_xprt_release_cong(xprt, task); } EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); @@ -395,8 +399,7 @@ __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) { if (req->rq_cong) return 1; - dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", - req->rq_task->tk_pid, xprt->cong, xprt->cwnd); + trace_xprt_get_cong(xprt, req->rq_task); if (RPCXPRT_CONGESTED(xprt)) { xprt_set_congestion_window_wait(xprt); return 0; @@ -418,6 +421,7 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) req->rq_cong = 0; xprt->cong -= RPC_CWNDSCALE; xprt_test_and_clear_congestion_window_wait(xprt); + trace_xprt_put_cong(xprt, req->rq_task); __xprt_lock_write_next_cong(xprt); } diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index b458bf53ca69..9d02eae353c6 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -79,7 +79,7 @@ static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) *p = xdr_zero; if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN, - &rqst->rq_snd_buf, rpcrdma_noch)) + &rqst->rq_snd_buf, rpcrdma_noch_pullup)) return -EIO; trace_xprtrdma_cb_reply(rqst); diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 30065a28628c..523722be6a16 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -36,8 +36,8 @@ * connect worker from running concurrently. * * When the underlying transport disconnects, MRs that are in flight - * are flushed and are likely unusable. Thus all flushed MRs are - * destroyed. New MRs are created on demand. + * are flushed and are likely unusable. Thus all MRs are destroyed. + * New MRs are created on demand. */ #include <linux/sunrpc/rpc_rdma.h> @@ -88,8 +88,10 @@ void frwr_release_mr(struct rpcrdma_mr *mr) kfree(mr); } -static void frwr_mr_recycle(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) +static void frwr_mr_recycle(struct rpcrdma_mr *mr) { + struct rpcrdma_xprt *r_xprt = mr->mr_xprt; + trace_xprtrdma_mr_recycle(mr); if (mr->mr_dir != DMA_NONE) { @@ -107,32 +109,6 @@ static void frwr_mr_recycle(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) frwr_release_mr(mr); } -/* MRs are dynamically allocated, so simply clean up and release the MR. - * A replacement MR will subsequently be allocated on demand. - */ -static void -frwr_mr_recycle_worker(struct work_struct *work) -{ - struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, - mr_recycle); - - frwr_mr_recycle(mr->mr_xprt, mr); -} - -/* frwr_recycle - Discard MRs - * @req: request to reset - * - * Used after a reconnect. These MRs could be in flight, we can't - * tell. Safe thing to do is release them. - */ -void frwr_recycle(struct rpcrdma_req *req) -{ - struct rpcrdma_mr *mr; - - while ((mr = rpcrdma_mr_pop(&req->rl_registered))) - frwr_mr_recycle(mr->mr_xprt, mr); -} - /* frwr_reset - Place MRs back on the free list * @req: request to reset * @@ -166,9 +142,6 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) struct ib_mr *frmr; int rc; - /* NB: ib_alloc_mr and device drivers typically allocate - * memory with GFP_KERNEL. - */ frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth); if (IS_ERR(frmr)) goto out_mr_err; @@ -180,7 +153,6 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) mr->frwr.fr_mr = frmr; mr->mr_dir = DMA_NONE; INIT_LIST_HEAD(&mr->mr_list); - INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker); init_completion(&mr->frwr.fr_linv_done); sg_init_table(sg, depth); @@ -424,7 +396,7 @@ int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req) struct ib_send_wr *post_wr; struct rpcrdma_mr *mr; - post_wr = &req->rl_sendctx->sc_wr; + post_wr = &req->rl_wr; list_for_each_entry(mr, &req->rl_registered, mr_list) { struct rpcrdma_frwr *frwr; @@ -440,9 +412,6 @@ int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req) post_wr = &frwr->fr_regwr.wr; } - /* If ib_post_send fails, the next ->send_request for - * @req will queue these MRs for recovery. - */ return ib_post_send(ia->ri_id->qp, post_wr, NULL); } @@ -468,7 +437,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr) { if (wc->status != IB_WC_SUCCESS) - rpcrdma_mr_recycle(mr); + frwr_mr_recycle(mr); else rpcrdma_mr_put(mr); } @@ -570,7 +539,6 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ bad_wr = NULL; rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr); - trace_xprtrdma_post_send(req, rc); /* The final LOCAL_INV WR in the chain is supposed to * do the wake. If it was never posted, the wake will @@ -583,6 +551,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) /* Recycle MRs in the LOCAL_INV chain that did not get posted. */ + trace_xprtrdma_post_linv(req, rc); while (bad_wr) { frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr); @@ -590,7 +559,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) bad_wr = bad_wr->next; list_del_init(&mr->mr_list); - rpcrdma_mr_recycle(mr); + frwr_mr_recycle(mr); } } @@ -673,18 +642,18 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ bad_wr = NULL; rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr); - trace_xprtrdma_post_send(req, rc); if (!rc) return; /* Recycle MRs in the LOCAL_INV chain that did not get posted. */ + trace_xprtrdma_post_linv(req, rc); while (bad_wr) { frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr); mr = container_of(frwr, struct rpcrdma_mr, frwr); bad_wr = bad_wr->next; - rpcrdma_mr_recycle(mr); + frwr_mr_recycle(mr); } /* The final LOCAL_INV WR in the chain is supposed to diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index b86b5fd62d9f..aec3beb93b25 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -78,8 +78,6 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs) size += rpcrdma_segment_maxsz * sizeof(__be32); size += sizeof(__be32); /* list discriminator */ - dprintk("RPC: %s: max call header size = %u\n", - __func__, size); return size; } @@ -100,8 +98,6 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32); size += sizeof(__be32); /* list discriminator */ - dprintk("RPC: %s: max reply header size = %u\n", - __func__, size); return size; } @@ -363,8 +359,7 @@ static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt, out_getmr_err: trace_xprtrdma_nomrs(req); xprt_wait_for_buffer_space(&r_xprt->rx_xprt); - if (r_xprt->rx_ep.rep_connected != -ENODEV) - schedule_work(&r_xprt->rx_buf.rb_refresh_worker); + rpcrdma_mrs_refresh(r_xprt); return ERR_PTR(-EAGAIN); } @@ -393,7 +388,7 @@ static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, unsigned int pos; int nsegs; - if (rtype == rpcrdma_noch) + if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped) goto done; pos = rqst->rq_snd_buf.head[0].iov_len; @@ -565,6 +560,7 @@ static void rpcrdma_sendctx_done(struct kref *kref) */ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) { + struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf; struct ib_sge *sge; if (!sc->sc_unmap_count) @@ -576,7 +572,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) */ for (sge = &sc->sc_sges[2]; sc->sc_unmap_count; ++sge, --sc->sc_unmap_count) - ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length, + ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length, DMA_TO_DEVICE); kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done); @@ -589,149 +585,228 @@ static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt, { struct rpcrdma_sendctx *sc = req->rl_sendctx; struct rpcrdma_regbuf *rb = req->rl_rdmabuf; - struct ib_sge *sge = sc->sc_sges; + struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++]; if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) - goto out_regbuf; + return false; sge->addr = rdmab_addr(rb); sge->length = len; sge->lkey = rdmab_lkey(rb); ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length, DMA_TO_DEVICE); - sc->sc_wr.num_sge++; return true; - -out_regbuf: - pr_err("rpcrdma: failed to DMA map a Send buffer\n"); - return false; } -/* Prepare the Send SGEs. The head and tail iovec, and each entry - * in the page list, gets its own SGE. +/* The head iovec is straightforward, as it is usually already + * DMA-mapped. Sync the content that has changed. */ -static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req, - struct xdr_buf *xdr, - enum rpcrdma_chunktype rtype) +static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, unsigned int len) { struct rpcrdma_sendctx *sc = req->rl_sendctx; - unsigned int sge_no, page_base, len, remaining; + struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++]; struct rpcrdma_regbuf *rb = req->rl_sendbuf; - struct ib_sge *sge = sc->sc_sges; - struct page *page, **ppages; - /* The head iovec is straightforward, as it is already - * DMA-mapped. Sync the content that has changed. - */ if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) - goto out_regbuf; - sc->sc_device = rdmab_device(rb); - sge_no = 1; - sge[sge_no].addr = rdmab_addr(rb); - sge[sge_no].length = xdr->head[0].iov_len; - sge[sge_no].lkey = rdmab_lkey(rb); - ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr, - sge[sge_no].length, DMA_TO_DEVICE); - - /* If there is a Read chunk, the page list is being handled - * via explicit RDMA, and thus is skipped here. However, the - * tail iovec may include an XDR pad for the page list, as - * well as additional content, and may not reside in the - * same page as the head iovec. - */ - if (rtype == rpcrdma_readch) { - len = xdr->tail[0].iov_len; + return false; - /* Do not include the tail if it is only an XDR pad */ - if (len < 4) - goto out; + sge->addr = rdmab_addr(rb); + sge->length = len; + sge->lkey = rdmab_lkey(rb); - page = virt_to_page(xdr->tail[0].iov_base); - page_base = offset_in_page(xdr->tail[0].iov_base); + ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length, + DMA_TO_DEVICE); + return true; +} - /* If the content in the page list is an odd length, - * xdr_write_pages() has added a pad at the beginning - * of the tail iovec. Force the tail's non-pad content - * to land at the next XDR position in the Send message. - */ - page_base += len & 3; - len -= len & 3; - goto map_tail; - } +/* If there is a page list present, DMA map and prepare an + * SGE for each page to be sent. + */ +static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + struct rpcrdma_sendctx *sc = req->rl_sendctx; + struct rpcrdma_regbuf *rb = req->rl_sendbuf; + unsigned int page_base, len, remaining; + struct page **ppages; + struct ib_sge *sge; - /* If there is a page list present, temporarily DMA map - * and prepare an SGE for each page to be sent. - */ - if (xdr->page_len) { - ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); - page_base = offset_in_page(xdr->page_base); - remaining = xdr->page_len; - while (remaining) { - sge_no++; - if (sge_no > RPCRDMA_MAX_SEND_SGES - 2) - goto out_mapping_overflow; - - len = min_t(u32, PAGE_SIZE - page_base, remaining); - sge[sge_no].addr = - ib_dma_map_page(rdmab_device(rb), *ppages, - page_base, len, DMA_TO_DEVICE); - if (ib_dma_mapping_error(rdmab_device(rb), - sge[sge_no].addr)) - goto out_mapping_err; - sge[sge_no].length = len; - sge[sge_no].lkey = rdmab_lkey(rb); - - sc->sc_unmap_count++; - ppages++; - remaining -= len; - page_base = 0; - } - } + ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); + page_base = offset_in_page(xdr->page_base); + remaining = xdr->page_len; + while (remaining) { + sge = &sc->sc_sges[req->rl_wr.num_sge++]; + len = min_t(unsigned int, PAGE_SIZE - page_base, remaining); + sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages, + page_base, len, DMA_TO_DEVICE); + if (ib_dma_mapping_error(rdmab_device(rb), sge->addr)) + goto out_mapping_err; - /* The tail iovec is not always constructed in the same - * page where the head iovec resides (see, for example, - * gss_wrap_req_priv). To neatly accommodate that case, - * DMA map it separately. - */ - if (xdr->tail[0].iov_len) { - page = virt_to_page(xdr->tail[0].iov_base); - page_base = offset_in_page(xdr->tail[0].iov_base); - len = xdr->tail[0].iov_len; + sge->length = len; + sge->lkey = rdmab_lkey(rb); -map_tail: - sge_no++; - sge[sge_no].addr = - ib_dma_map_page(rdmab_device(rb), page, page_base, len, - DMA_TO_DEVICE); - if (ib_dma_mapping_error(rdmab_device(rb), sge[sge_no].addr)) - goto out_mapping_err; - sge[sge_no].length = len; - sge[sge_no].lkey = rdmab_lkey(rb); sc->sc_unmap_count++; + ppages++; + remaining -= len; + page_base = 0; } -out: - sc->sc_wr.num_sge += sge_no; - if (sc->sc_unmap_count) - kref_get(&req->rl_kref); return true; -out_regbuf: - pr_err("rpcrdma: failed to DMA map a Send buffer\n"); +out_mapping_err: + trace_xprtrdma_dma_maperr(sge->addr); return false; +} -out_mapping_overflow: - rpcrdma_sendctx_unmap(sc); - pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no); - return false; +/* The tail iovec may include an XDR pad for the page list, + * as well as additional content, and may not reside in the + * same page as the head iovec. + */ +static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req, + struct xdr_buf *xdr, + unsigned int page_base, unsigned int len) +{ + struct rpcrdma_sendctx *sc = req->rl_sendctx; + struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++]; + struct rpcrdma_regbuf *rb = req->rl_sendbuf; + struct page *page = virt_to_page(xdr->tail[0].iov_base); + + sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len, + DMA_TO_DEVICE); + if (ib_dma_mapping_error(rdmab_device(rb), sge->addr)) + goto out_mapping_err; + + sge->length = len; + sge->lkey = rdmab_lkey(rb); + ++sc->sc_unmap_count; + return true; out_mapping_err: - rpcrdma_sendctx_unmap(sc); - trace_xprtrdma_dma_maperr(sge[sge_no].addr); + trace_xprtrdma_dma_maperr(sge->addr); return false; } +/* Copy the tail to the end of the head buffer. + */ +static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + unsigned char *dst; + + dst = (unsigned char *)xdr->head[0].iov_base; + dst += xdr->head[0].iov_len + xdr->page_len; + memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len); + r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len; +} + +/* Copy pagelist content into the head buffer. + */ +static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + unsigned int len, page_base, remaining; + struct page **ppages; + unsigned char *src, *dst; + + dst = (unsigned char *)xdr->head[0].iov_base; + dst += xdr->head[0].iov_len; + ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); + page_base = offset_in_page(xdr->page_base); + remaining = xdr->page_len; + while (remaining) { + src = page_address(*ppages); + src += page_base; + len = min_t(unsigned int, PAGE_SIZE - page_base, remaining); + memcpy(dst, src, len); + r_xprt->rx_stats.pullup_copy_count += len; + + ppages++; + dst += len; + remaining -= len; + page_base = 0; + } +} + +/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it. + * When the head, pagelist, and tail are small, a pull-up copy + * is considerably less costly than DMA mapping the components + * of @xdr. + * + * Assumptions: + * - the caller has already verified that the total length + * of the RPC Call body will fit into @rl_sendbuf. + */ +static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + if (unlikely(xdr->tail[0].iov_len)) + rpcrdma_pullup_tail_iov(r_xprt, req, xdr); + + if (unlikely(xdr->page_len)) + rpcrdma_pullup_pagelist(r_xprt, req, xdr); + + /* The whole RPC message resides in the head iovec now */ + return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len); +} + +static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + struct kvec *tail = &xdr->tail[0]; + + if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len)) + return false; + if (xdr->page_len) + if (!rpcrdma_prepare_pagelist(req, xdr)) + return false; + if (tail->iov_len) + if (!rpcrdma_prepare_tail_iov(req, xdr, + offset_in_page(tail->iov_base), + tail->iov_len)) + return false; + + if (req->rl_sendctx->sc_unmap_count) + kref_get(&req->rl_kref); + return true; +} + +static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len)) + return false; + + /* If there is a Read chunk, the page list is being handled + * via explicit RDMA, and thus is skipped here. + */ + + /* Do not include the tail if it is only an XDR pad */ + if (xdr->tail[0].iov_len > 3) { + unsigned int page_base, len; + + /* If the content in the page list is an odd length, + * xdr_write_pages() adds a pad at the beginning of + * the tail iovec. Force the tail's non-pad content to + * land at the next XDR position in the Send message. + */ + page_base = offset_in_page(xdr->tail[0].iov_base); + len = xdr->tail[0].iov_len; + page_base += len & 3; + len -= len & 3; + if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len)) + return false; + kref_get(&req->rl_kref); + } + + return true; +} + /** * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR * @r_xprt: controlling transport @@ -742,31 +817,53 @@ out_mapping_err: * * Returns 0 on success; otherwise a negative errno is returned. */ -int -rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req, u32 hdrlen, - struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) +inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, u32 hdrlen, + struct xdr_buf *xdr, + enum rpcrdma_chunktype rtype) { int ret; ret = -EAGAIN; req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt); if (!req->rl_sendctx) - goto err; - req->rl_sendctx->sc_wr.num_sge = 0; + goto out_nosc; req->rl_sendctx->sc_unmap_count = 0; req->rl_sendctx->sc_req = req; kref_init(&req->rl_kref); + req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe; + req->rl_wr.sg_list = req->rl_sendctx->sc_sges; + req->rl_wr.num_sge = 0; + req->rl_wr.opcode = IB_WR_SEND; ret = -EIO; if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen)) - goto err; - if (rtype != rpcrdma_areadch) - if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype)) - goto err; + goto out_unmap; + + switch (rtype) { + case rpcrdma_noch_pullup: + if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr)) + goto out_unmap; + break; + case rpcrdma_noch_mapped: + if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr)) + goto out_unmap; + break; + case rpcrdma_readch: + if (!rpcrdma_prepare_readch(r_xprt, req, xdr)) + goto out_unmap; + break; + case rpcrdma_areadch: + break; + default: + goto out_unmap; + } + return 0; -err: +out_unmap: + rpcrdma_sendctx_unmap(req->rl_sendctx); +out_nosc: trace_xprtrdma_prepsend_failed(&req->rl_slot, ret); return ret; } @@ -796,6 +893,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct xdr_stream *xdr = &req->rl_stream; enum rpcrdma_chunktype rtype, wtype; + struct xdr_buf *buf = &rqst->rq_snd_buf; bool ddp_allowed; __be32 *p; int ret; @@ -853,8 +951,9 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) */ if (rpcrdma_args_inline(r_xprt, rqst)) { *p++ = rdma_msg; - rtype = rpcrdma_noch; - } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) { + rtype = buf->len < rdmab_length(req->rl_sendbuf) ? + rpcrdma_noch_pullup : rpcrdma_noch_mapped; + } else if (ddp_allowed && buf->flags & XDRBUF_WRITE) { *p++ = rdma_msg; rtype = rpcrdma_readch; } else { @@ -863,12 +962,6 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) rtype = rpcrdma_areadch; } - /* If this is a retransmit, discard previously registered - * chunks. Very likely the connection has been replaced, - * so these registrations are invalid and unusable. - */ - frwr_recycle(req); - /* This implementation supports the following combinations * of chunk lists in one RPC-over-RDMA Call message: * @@ -902,7 +995,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) goto out_err; ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len, - &rqst->rq_snd_buf, rtype); + buf, rtype); if (ret) goto out_err; @@ -916,6 +1009,40 @@ out_err: return ret; } +static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt, + struct rpcrdma_buffer *buf, + u32 grant) +{ + buf->rb_credits = grant; + xprt->cwnd = grant << RPC_CWNDSHIFT; +} + +static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant) +{ + struct rpc_xprt *xprt = &r_xprt->rx_xprt; + + spin_lock(&xprt->transport_lock); + __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant); + spin_unlock(&xprt->transport_lock); +} + +/** + * rpcrdma_reset_cwnd - Reset the xprt's congestion window + * @r_xprt: controlling transport instance + * + * Prepare @r_xprt for the next connection by reinitializing + * its credit grant to one (see RFC 8166, Section 3.3.3). + */ +void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt) +{ + struct rpc_xprt *xprt = &r_xprt->rx_xprt; + + spin_lock(&xprt->transport_lock); + xprt->cong = 0; + __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1); + spin_unlock(&xprt->transport_lock); +} + /** * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs * @rqst: controlling RPC request @@ -955,7 +1082,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) curlen = rqst->rq_rcv_buf.head[0].iov_len; if (curlen > copy_len) curlen = copy_len; - trace_xprtrdma_fixup(rqst, copy_len, curlen); srcp += curlen; copy_len -= curlen; @@ -975,8 +1101,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) if (curlen > pagelist_len) curlen = pagelist_len; - trace_xprtrdma_fixup_pg(rqst, i, srcp, - copy_len, curlen); destp = kmap_atomic(ppages[i]); memcpy(destp + page_base, srcp, curlen); flush_dcache_page(ppages[i]); @@ -1008,6 +1132,8 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) rqst->rq_private_buf.tail[0].iov_base = srcp; } + if (fixup_copy_count) + trace_xprtrdma_fixup(rqst, fixup_copy_count); return fixup_copy_count; } @@ -1356,12 +1482,9 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) credits = 1; /* don't deadlock */ else if (credits > buf->rb_max_requests) credits = buf->rb_max_requests; - if (buf->rb_credits != credits) { - spin_lock(&xprt->transport_lock); - buf->rb_credits = credits; - xprt->cwnd = credits << RPC_CWNDSHIFT; - spin_unlock(&xprt->transport_lock); - } + if (buf->rb_credits != credits) + rpcrdma_update_cwnd(r_xprt, credits); + rpcrdma_post_recvs(r_xprt, false); req = rpcr_to_rdmar(rqst); if (req->rl_reply) { diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 160558b4135e..7395eb2cfdeb 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -243,16 +243,13 @@ xprt_rdma_connect_worker(struct work_struct *work) rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia); xprt_clear_connecting(xprt); if (r_xprt->rx_ep.rep_connected > 0) { - if (!xprt_test_and_set_connected(xprt)) { - xprt->stat.connect_count++; - xprt->stat.connect_time += (long)jiffies - - xprt->stat.connect_start; - xprt_wake_pending_tasks(xprt, -EAGAIN); - } - } else { - if (xprt_test_and_clear_connected(xprt)) - xprt_wake_pending_tasks(xprt, rc); + xprt->stat.connect_count++; + xprt->stat.connect_time += (long)jiffies - + xprt->stat.connect_start; + xprt_set_connected(xprt); + rc = -EAGAIN; } + xprt_wake_pending_tasks(xprt, rc); } /** @@ -425,12 +422,6 @@ void xprt_rdma_close(struct rpc_xprt *xprt) return; rpcrdma_ep_disconnect(ep, ia); - /* Prepare @xprt for the next connection by reinitializing - * its credit grant to one (see RFC 8166, Section 3.3.3). - */ - r_xprt->rx_buf.rb_credits = 1; - xprt->cwnd = RPC_CWNDSHIFT; - out: xprt->reestablish_timeout = 0; ++xprt->connect_cookie; @@ -450,12 +441,6 @@ xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port) struct sockaddr *sap = (struct sockaddr *)&xprt->addr; char buf[8]; - dprintk("RPC: %s: setting port for xprt %p (%s:%s) to %u\n", - __func__, xprt, - xprt->address_strings[RPC_DISPLAY_ADDR], - xprt->address_strings[RPC_DISPLAY_PORT], - port); - rpc_set_port(sap, port); kfree(xprt->address_strings[RPC_DISPLAY_PORT]); @@ -465,6 +450,9 @@ xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port) kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]); snprintf(buf, sizeof(buf), "%4hx", port); xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL); + + trace_xprtrdma_op_setport(container_of(xprt, struct rpcrdma_xprt, + rx_xprt)); } /** @@ -536,13 +524,12 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); unsigned long delay; - trace_xprtrdma_op_connect(r_xprt); - delay = 0; if (r_xprt->rx_ep.rep_connected != 0) { delay = xprt_reconnect_delay(xprt); xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO); } + trace_xprtrdma_op_connect(r_xprt, delay); queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker, delay); } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 3a907537e2cf..77c7dd7f05e8 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -74,17 +74,17 @@ /* * internal functions */ -static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc); +static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_sendctx *sc); +static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt); static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf); static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt); -static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf); -static void rpcrdma_mr_free(struct rpcrdma_mr *mr); +static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt); static struct rpcrdma_regbuf * rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction, gfp_t flags); static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb); static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb); -static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp); /* Wait for outstanding transport work to finish. ib_drain_qp * handles the drains in the wrong order for us, so open code @@ -125,7 +125,7 @@ rpcrdma_qp_event_handler(struct ib_event *event, void *context) /** * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC - * @cq: completion queue (ignored) + * @cq: completion queue * @wc: completed WR * */ @@ -138,7 +138,7 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) /* WARNING: Only wr_cqe and status are reliable at this point */ trace_xprtrdma_wc_send(sc, wc); - rpcrdma_sendctx_put_locked(sc); + rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc); } /** @@ -170,7 +170,6 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) rdmab_addr(rep->rr_rdmabuf), wc->byte_len, DMA_FROM_DEVICE); - rpcrdma_post_recvs(r_xprt, false); rpcrdma_reply_handler(rep); return; @@ -178,11 +177,11 @@ out_flushed: rpcrdma_recv_buffer_put(rep); } -static void -rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt, - struct rdma_conn_param *param) +static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt, + struct rdma_conn_param *param) { const struct rpcrdma_connect_private *pmsg = param->private_data; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; unsigned int rsize, wsize; /* Default settings for RPC-over-RDMA Version One */ @@ -198,13 +197,11 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt, wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size); } - if (rsize < r_xprt->rx_ep.rep_inline_recv) - r_xprt->rx_ep.rep_inline_recv = rsize; - if (wsize < r_xprt->rx_ep.rep_inline_send) - r_xprt->rx_ep.rep_inline_send = wsize; - dprintk("RPC: %s: max send %u, max recv %u\n", __func__, - r_xprt->rx_ep.rep_inline_send, - r_xprt->rx_ep.rep_inline_recv); + if (rsize < ep->rep_inline_recv) + ep->rep_inline_recv = rsize; + if (wsize < ep->rep_inline_send) + ep->rep_inline_send = wsize; + rpcrdma_set_max_header_sizes(r_xprt); } @@ -258,7 +255,8 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) case RDMA_CM_EVENT_ESTABLISHED: ++xprt->connect_cookie; ep->rep_connected = 1; - rpcrdma_update_connect_private(r_xprt, &event->param.conn); + rpcrdma_update_cm_private(r_xprt, &event->param.conn); + trace_xprtrdma_inline_thresh(r_xprt); wake_up_all(&ep->rep_connect_wait); break; case RDMA_CM_EVENT_CONNECT_ERROR: @@ -298,8 +296,6 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) struct rdma_cm_id *id; int rc; - trace_xprtrdma_conn_start(xprt); - init_completion(&ia->ri_done); init_completion(&ia->ri_remove_done); @@ -315,10 +311,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) if (rc) goto out; rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); - if (rc < 0) { - trace_xprtrdma_conn_tout(xprt); + if (rc < 0) goto out; - } rc = ia->ri_async_rc; if (rc) @@ -329,10 +323,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) if (rc) goto out; rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); - if (rc < 0) { - trace_xprtrdma_conn_tout(xprt); + if (rc < 0) goto out; - } rc = ia->ri_async_rc; if (rc) goto out; @@ -409,8 +401,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_req *req; - cancel_work_sync(&buf->rb_refresh_worker); - /* This is similar to rpcrdma_ep_destroy, but: * - Don't cancel the connect worker. * - Don't call rpcrdma_ep_disconnect, which waits @@ -437,7 +427,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) rpcrdma_regbuf_dma_unmap(req->rl_sendbuf); rpcrdma_regbuf_dma_unmap(req->rl_recvbuf); } - rpcrdma_mrs_destroy(buf); + rpcrdma_mrs_destroy(r_xprt); ib_dealloc_pd(ia->ri_pd); ia->ri_pd = NULL; @@ -522,7 +512,7 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt) init_waitqueue_head(&ep->rep_connect_wait); ep->rep_receive_count = 0; - sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL, + sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt, ep->rep_attr.cap.max_send_wr + 1, IB_POLL_WORKQUEUE); if (IS_ERR(sendcq)) { @@ -630,8 +620,6 @@ static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, pr_err("rpcrdma: rdma_create_qp returned %d\n", err); goto out3; } - - rpcrdma_mrs_create(r_xprt); return 0; out3: @@ -649,8 +637,6 @@ static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rdma_cm_id *id, *old; int err, rc; - trace_xprtrdma_reconnect(r_xprt); - rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia); rc = -EHOSTUNREACH; @@ -705,7 +691,6 @@ retry: memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr)); switch (ep->rep_connected) { case 0: - dprintk("RPC: %s: connecting...\n", __func__); rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr); if (rc) { rc = -ENETUNREACH; @@ -726,6 +711,7 @@ retry: ep->rep_connected = 0; xprt_clear_connected(xprt); + rpcrdma_reset_cwnd(r_xprt); rpcrdma_post_recvs(r_xprt, true); rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma); @@ -742,13 +728,14 @@ retry: goto out; } - dprintk("RPC: %s: connected\n", __func__); + rpcrdma_mrs_create(r_xprt); out: if (rc) ep->rep_connected = rc; out_noupdate: + trace_xprtrdma_connect(r_xprt, rc); return rc; } @@ -757,11 +744,8 @@ out_noupdate: * @ep: endpoint to disconnect * @ia: associated interface adapter * - * This is separate from destroy to facilitate the ability - * to reconnect without recreating the endpoint. - * - * This call is not reentrant, and must not be made in parallel - * on the same endpoint. + * Caller serializes. Either the transport send lock is held, + * or we're being called to destroy the transport. */ void rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) @@ -780,6 +764,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) trace_xprtrdma_disconnect(r_xprt, rc); rpcrdma_xprt_drain(r_xprt); + rpcrdma_reqs_reset(r_xprt); + rpcrdma_mrs_destroy(r_xprt); } /* Fixed-size circular FIFO queue. This implementation is wait-free and @@ -817,9 +803,6 @@ static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia) if (!sc) return NULL; - sc->sc_wr.wr_cqe = &sc->sc_cqe; - sc->sc_wr.sg_list = sc->sc_sges; - sc->sc_wr.opcode = IB_WR_SEND; sc->sc_cqe.done = rpcrdma_wc_send; return sc; } @@ -847,7 +830,6 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt) if (!sc) return -ENOMEM; - sc->sc_xprt = r_xprt; buf->rb_sc_ctxs[i] = sc; } @@ -910,6 +892,7 @@ out_emptyq: /** * rpcrdma_sendctx_put_locked - Release a send context + * @r_xprt: controlling transport instance * @sc: send context to release * * Usage: Called from Send completion to return a sendctxt @@ -917,10 +900,10 @@ out_emptyq: * * The caller serializes calls to this function (per transport). */ -static void -rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) +static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_sendctx *sc) { - struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; unsigned long next_tail; /* Unmap SGEs of previously completed but unsignaled @@ -938,7 +921,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) /* Paired with READ_ONCE */ smp_store_release(&buf->rb_sc_tail, next_tail); - xprt_write_space(&sc->sc_xprt->rx_xprt); + xprt_write_space(&r_xprt->rx_xprt); } static void @@ -965,7 +948,7 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) mr->mr_xprt = r_xprt; spin_lock(&buf->rb_lock); - list_add(&mr->mr_list, &buf->rb_mrs); + rpcrdma_mr_push(mr, &buf->rb_mrs); list_add(&mr->mr_all, &buf->rb_all_mrs); spin_unlock(&buf->rb_lock); } @@ -987,6 +970,28 @@ rpcrdma_mr_refresh_worker(struct work_struct *work) } /** + * rpcrdma_mrs_refresh - Wake the MR refresh worker + * @r_xprt: controlling transport instance + * + */ +void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; + + /* If there is no underlying device, it's no use to + * wake the refresh worker. + */ + if (ep->rep_connected != -ENODEV) { + /* The work is scheduled on a WQ_MEM_RECLAIM + * workqueue in order to prevent MR allocation + * from recursing into NFS during direct reclaim. + */ + queue_work(xprtiod_workqueue, &buf->rb_refresh_worker); + } +} + +/** * rpcrdma_req_create - Allocate an rpcrdma_req object * @r_xprt: controlling r_xprt * @size: initial size, in bytes, of send and receive buffers @@ -1042,6 +1047,26 @@ out1: return NULL; } +/** + * rpcrdma_reqs_reset - Reset all reqs owned by a transport + * @r_xprt: controlling transport instance + * + * ASSUMPTION: the rb_allreqs list is stable for the duration, + * and thus can be walked without holding rb_lock. Eg. the + * caller is holding the transport send lock to exclude + * device removal or disconnection. + */ +static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_req *req; + + list_for_each_entry(req, &buf->rb_allreqs, rl_all) { + /* Credits are valid only for one connection */ + req->rl_slot.rq_cong = 0; + } +} + static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) { @@ -1125,8 +1150,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) INIT_LIST_HEAD(&buf->rb_all_mrs); INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker); - rpcrdma_mrs_create(r_xprt); - INIT_LIST_HEAD(&buf->rb_send_bufs); INIT_LIST_HEAD(&buf->rb_allreqs); @@ -1134,14 +1157,13 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) for (i = 0; i < buf->rb_max_requests; i++) { struct rpcrdma_req *req; - req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE, + req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2, GFP_KERNEL); if (!req) goto out; list_add(&req->rl_list, &buf->rb_send_bufs); } - buf->rb_credits = 1; init_llist_head(&buf->rb_free_reps); rc = rpcrdma_sendctxs_create(r_xprt); @@ -1158,15 +1180,24 @@ out: * rpcrdma_req_destroy - Destroy an rpcrdma_req object * @req: unused object to be destroyed * - * This function assumes that the caller prevents concurrent device - * unload and transport tear-down. + * Relies on caller holding the transport send lock to protect + * removing req->rl_all from buf->rb_all_reqs safely. */ void rpcrdma_req_destroy(struct rpcrdma_req *req) { + struct rpcrdma_mr *mr; + list_del(&req->rl_all); - while (!list_empty(&req->rl_free_mrs)) - rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs)); + while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) { + struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf; + + spin_lock(&buf->rb_lock); + list_del(&mr->mr_all); + spin_unlock(&buf->rb_lock); + + frwr_release_mr(mr); + } rpcrdma_regbuf_free(req->rl_recvbuf); rpcrdma_regbuf_free(req->rl_sendbuf); @@ -1174,28 +1205,33 @@ void rpcrdma_req_destroy(struct rpcrdma_req *req) kfree(req); } -static void -rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf) +/** + * rpcrdma_mrs_destroy - Release all of a transport's MRs + * @r_xprt: controlling transport instance + * + * Relies on caller holding the transport send lock to protect + * removing mr->mr_list from req->rl_free_mrs safely. + */ +static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt) { - struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, - rx_buf); + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_mr *mr; - unsigned int count; - count = 0; + cancel_work_sync(&buf->rb_refresh_worker); + spin_lock(&buf->rb_lock); while ((mr = list_first_entry_or_null(&buf->rb_all_mrs, struct rpcrdma_mr, mr_all)) != NULL) { + list_del(&mr->mr_list); list_del(&mr->mr_all); spin_unlock(&buf->rb_lock); frwr_release_mr(mr); - count++; + spin_lock(&buf->rb_lock); } spin_unlock(&buf->rb_lock); - r_xprt->rx_stats.mrs_allocated = 0; } /** @@ -1209,8 +1245,6 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf) void rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { - cancel_work_sync(&buf->rb_refresh_worker); - rpcrdma_sendctxs_destroy(buf); rpcrdma_reps_destroy(buf); @@ -1222,8 +1256,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) list_del(&req->rl_list); rpcrdma_req_destroy(req); } - - rpcrdma_mrs_destroy(buf); } /** @@ -1264,17 +1296,6 @@ void rpcrdma_mr_put(struct rpcrdma_mr *mr) rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs); } -static void rpcrdma_mr_free(struct rpcrdma_mr *mr) -{ - struct rpcrdma_xprt *r_xprt = mr->mr_xprt; - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - - mr->mr_req = NULL; - spin_lock(&buf->rb_lock); - rpcrdma_mr_push(mr, &buf->rb_mrs); - spin_unlock(&buf->rb_lock); -} - /** * rpcrdma_buffer_get - Get a request buffer * @buffers: Buffer pool from which to obtain a buffer @@ -1437,7 +1458,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_req *req) { - struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; + struct ib_send_wr *send_wr = &req->rl_wr; int rc; if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) { @@ -1455,8 +1476,13 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, return 0; } -static void -rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) +/** + * rpcrdma_post_recvs - Refill the Receive Queue + * @r_xprt: controlling transport instance + * @temp: mark Receive buffers to be deleted after use + * + */ +void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ep *ep = &r_xprt->rx_ep; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 65e6b0eb862e..5d15140a0266 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -218,12 +218,8 @@ enum { /* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes */ struct rpcrdma_req; -struct rpcrdma_xprt; struct rpcrdma_sendctx { - struct ib_send_wr sc_wr; struct ib_cqe sc_cqe; - struct ib_device *sc_device; - struct rpcrdma_xprt *sc_xprt; struct rpcrdma_req *sc_req; unsigned int sc_unmap_count; struct ib_sge sc_sges[]; @@ -257,7 +253,6 @@ struct rpcrdma_mr { u32 mr_handle; u32 mr_length; u64 mr_offset; - struct work_struct mr_recycle; struct list_head mr_all; }; @@ -318,6 +313,7 @@ struct rpcrdma_req { struct rpcrdma_rep *rl_reply; struct xdr_stream rl_stream; struct xdr_buf rl_hdrbuf; + struct ib_send_wr rl_wr; struct rpcrdma_sendctx *rl_sendctx; struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */ struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */ @@ -474,6 +470,7 @@ void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *); int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *, struct rpcrdma_req *); +void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp); /* * Buffer calls - xprtrdma/verbs.c @@ -487,12 +484,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt); struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt); void rpcrdma_mr_put(struct rpcrdma_mr *mr); - -static inline void -rpcrdma_mr_recycle(struct rpcrdma_mr *mr) -{ - schedule_work(&mr->mr_recycle); -} +void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt); struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, @@ -542,7 +534,6 @@ rpcrdma_data_dir(bool writing) /* Memory registration calls xprtrdma/frwr_ops.c */ bool frwr_is_supported(struct ib_device *device); -void frwr_recycle(struct rpcrdma_req *req); void frwr_reset(struct rpcrdma_req *req); int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep); int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr); @@ -563,6 +554,8 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); enum rpcrdma_chunktype { rpcrdma_noch = 0, + rpcrdma_noch_pullup, + rpcrdma_noch_mapped, rpcrdma_readch, rpcrdma_areadch, rpcrdma_writech, @@ -576,6 +569,7 @@ int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc); int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); +void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt); void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); void rpcrdma_reply_handler(struct rpcrdma_rep *rep); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 70e52f567b2a..98e2d40b2d6a 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1752,7 +1752,7 @@ static void xs_set_port(struct rpc_xprt *xprt, unsigned short port) static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock) { - if (transport->srcport == 0) + if (transport->srcport == 0 && transport->xprt.reuseport) transport->srcport = xs_sock_getport(sock); } |