diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/svc.c | 6 | ||||
-rw-r--r-- | net/sunrpc/svcsock.c | 28 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_rw.c | 116 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 40 |
4 files changed, 73 insertions, 117 deletions
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index 85ce0db5b0a6..aa04666f929d 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -421,7 +421,7 @@ __svc_init_bc(struct svc_serv *serv) */ static struct svc_serv * __svc_create(struct svc_program *prog, unsigned int bufsize, int npools, - struct svc_serv_ops *ops) + const struct svc_serv_ops *ops) { struct svc_serv *serv; unsigned int vers; @@ -486,7 +486,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools, struct svc_serv * svc_create(struct svc_program *prog, unsigned int bufsize, - struct svc_serv_ops *ops) + const struct svc_serv_ops *ops) { return __svc_create(prog, bufsize, /*npools*/1, ops); } @@ -494,7 +494,7 @@ EXPORT_SYMBOL_GPL(svc_create); struct svc_serv * svc_create_pooled(struct svc_program *prog, unsigned int bufsize, - struct svc_serv_ops *ops) + const struct svc_serv_ops *ops) { struct svc_serv *serv; unsigned int npools = svc_pool_map_get(); diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 272063ca81e8..ff8e06cd067e 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -421,6 +421,9 @@ static void svc_data_ready(struct sock *sk) dprintk("svc: socket %p(inet %p), busy=%d\n", svsk, sk, test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags)); + + /* Refer to svc_setup_socket() for details. */ + rmb(); svsk->sk_odata(sk); if (!test_and_set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags)) svc_xprt_enqueue(&svsk->sk_xprt); @@ -437,6 +440,9 @@ static void svc_write_space(struct sock *sk) if (svsk) { dprintk("svc: socket %p(inet %p), write_space busy=%d\n", svsk, sk, test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags)); + + /* Refer to svc_setup_socket() for details. */ + rmb(); svsk->sk_owspace(sk); svc_xprt_enqueue(&svsk->sk_xprt); } @@ -687,7 +693,7 @@ static struct svc_xprt *svc_udp_create(struct svc_serv *serv, return svc_create_socket(serv, IPPROTO_UDP, net, sa, salen, flags); } -static struct svc_xprt_ops svc_udp_ops = { +static const struct svc_xprt_ops svc_udp_ops = { .xpo_create = svc_udp_create, .xpo_recvfrom = svc_udp_recvfrom, .xpo_sendto = svc_udp_sendto, @@ -760,8 +766,12 @@ static void svc_tcp_listen_data_ready(struct sock *sk) dprintk("svc: socket %p TCP (listen) state change %d\n", sk, sk->sk_state); - if (svsk) + if (svsk) { + /* Refer to svc_setup_socket() for details. */ + rmb(); svsk->sk_odata(sk); + } + /* * This callback may called twice when a new connection * is established as a child socket inherits everything @@ -794,6 +804,8 @@ static void svc_tcp_state_change(struct sock *sk) if (!svsk) printk("svc: socket %p: no user data\n", sk); else { + /* Refer to svc_setup_socket() for details. */ + rmb(); svsk->sk_ostate(sk); if (sk->sk_state != TCP_ESTABLISHED) { set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags); @@ -1229,7 +1241,7 @@ static void svc_bc_tcp_sock_detach(struct svc_xprt *xprt) { } -static struct svc_xprt_ops svc_tcp_bc_ops = { +static const struct svc_xprt_ops svc_tcp_bc_ops = { .xpo_create = svc_bc_tcp_create, .xpo_detach = svc_bc_tcp_sock_detach, .xpo_free = svc_bc_sock_free, @@ -1263,7 +1275,7 @@ static void svc_cleanup_bc_xprt_sock(void) } #endif /* CONFIG_SUNRPC_BACKCHANNEL */ -static struct svc_xprt_ops svc_tcp_ops = { +static const struct svc_xprt_ops svc_tcp_ops = { .xpo_create = svc_tcp_create, .xpo_recvfrom = svc_tcp_recvfrom, .xpo_sendto = svc_tcp_sendto, @@ -1381,12 +1393,18 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv, return ERR_PTR(err); } - inet->sk_user_data = svsk; svsk->sk_sock = sock; svsk->sk_sk = inet; svsk->sk_ostate = inet->sk_state_change; svsk->sk_odata = inet->sk_data_ready; svsk->sk_owspace = inet->sk_write_space; + /* + * This barrier is necessary in order to prevent race condition + * with svc_data_ready(), svc_listen_data_ready() and others + * when calling callbacks above. + */ + wmb(); + inet->sk_user_data = svsk; /* Initialize the socket */ if (sock->type == SOCK_DGRAM) diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c index 933f79bed270..7dcda4597057 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -660,19 +660,21 @@ out_initerr: return -EIO; } +/* Walk the segments in the Read chunk starting at @p and construct + * RDMA Read operations to pull the chunk to the server. + */ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp, struct svc_rdma_read_info *info, __be32 *p) { int ret; + ret = -EINVAL; info->ri_chunklen = 0; - while (*p++ != xdr_zero) { + while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) { u32 rs_handle, rs_length; u64 rs_offset; - if (be32_to_cpup(p++) != info->ri_position) - break; rs_handle = be32_to_cpup(p++); rs_length = be32_to_cpup(p++); p = xdr_decode_hyper(p, &rs_offset); @@ -689,78 +691,6 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp, return ret; } -/* If there is inline content following the Read chunk, append it to - * the page list immediately following the data payload. This has to - * be done after the reader function has determined how many pages - * were consumed for RDMA Read. - * - * On entry, ri_pageno and ri_pageoff point directly to the end of the - * page list. On exit, both have been updated to the new "next byte". - * - * Assumptions: - * - Inline content fits entirely in rq_pages[0] - * - Trailing content is only a handful of bytes - */ -static int svc_rdma_copy_tail(struct svc_rqst *rqstp, - struct svc_rdma_read_info *info) -{ - struct svc_rdma_op_ctxt *head = info->ri_readctxt; - unsigned int tail_length, remaining; - u8 *srcp, *destp; - - /* Assert that all inline content fits in page 0. This is an - * implementation limit, not a protocol limit. - */ - if (head->arg.head[0].iov_len > PAGE_SIZE) { - pr_warn_once("svcrdma: too much trailing inline content\n"); - return -EINVAL; - } - - srcp = head->arg.head[0].iov_base; - srcp += info->ri_position; - tail_length = head->arg.head[0].iov_len - info->ri_position; - remaining = tail_length; - - /* If there is room on the last page in the page list, try to - * fit the trailing content there. - */ - if (info->ri_pageoff > 0) { - unsigned int len; - - len = min_t(unsigned int, remaining, - PAGE_SIZE - info->ri_pageoff); - destp = page_address(rqstp->rq_pages[info->ri_pageno]); - destp += info->ri_pageoff; - - memcpy(destp, srcp, len); - srcp += len; - destp += len; - info->ri_pageoff += len; - remaining -= len; - - if (info->ri_pageoff == PAGE_SIZE) { - info->ri_pageno++; - info->ri_pageoff = 0; - } - } - - /* Otherwise, a fresh page is needed. */ - if (remaining) { - head->arg.pages[info->ri_pageno] = - rqstp->rq_pages[info->ri_pageno]; - head->count++; - - destp = page_address(rqstp->rq_pages[info->ri_pageno]); - memcpy(destp, srcp, remaining); - info->ri_pageoff += remaining; - } - - head->arg.page_len += tail_length; - head->arg.len += tail_length; - head->arg.buflen += tail_length; - return 0; -} - /* Construct RDMA Reads to pull over a normal Read chunk. The chunk * data lands in the page list of head->arg.pages. * @@ -785,34 +715,28 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, if (ret < 0) goto out; - /* Read chunk may need XDR round-up (see RFC 5666, s. 3.7). + /* Split the Receive buffer between the head and tail + * buffers at Read chunk's position. XDR roundup of the + * chunk is not included in either the pagelist or in + * the tail. */ - if (info->ri_chunklen & 3) { - u32 padlen = 4 - (info->ri_chunklen & 3); - - info->ri_chunklen += padlen; + head->arg.tail[0].iov_base = + head->arg.head[0].iov_base + info->ri_position; + head->arg.tail[0].iov_len = + head->arg.head[0].iov_len - info->ri_position; + head->arg.head[0].iov_len = info->ri_position; - /* NB: data payload always starts on XDR alignment, - * thus the pad can never contain a page boundary. - */ - info->ri_pageoff += padlen; - if (info->ri_pageoff == PAGE_SIZE) { - info->ri_pageno++; - info->ri_pageoff = 0; - } - } + /* Read chunk may need XDR roundup (see RFC 5666, s. 3.7). + * + * NFSv2/3 write decoders need the length of the tail to + * contain the size of the roundup padding. + */ + head->arg.tail[0].iov_len += 4 - (info->ri_chunklen & 3); head->arg.page_len = info->ri_chunklen; head->arg.len += info->ri_chunklen; head->arg.buflen += info->ri_chunklen; - if (info->ri_position < head->arg.head[0].iov_len) { - ret = svc_rdma_copy_tail(rqstp, info); - if (ret < 0) - goto out; - } - head->arg.head[0].iov_len = info->ri_position; - out: return ret; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index e660d4965b18..5caf8e722a11 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -51,6 +51,7 @@ #include <linux/workqueue.h> #include <rdma/ib_verbs.h> #include <rdma/rdma_cm.h> +#include <rdma/rw.h> #include <linux/sunrpc/svc_rdma.h> #include <linux/export.h> #include "xprt_rdma.h" @@ -70,7 +71,7 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt); static int svc_rdma_secure_port(struct svc_rqst *); static void svc_rdma_kill_temp_xprt(struct svc_xprt *); -static struct svc_xprt_ops svc_rdma_ops = { +static const struct svc_xprt_ops svc_rdma_ops = { .xpo_create = svc_rdma_create, .xpo_recvfrom = svc_rdma_recvfrom, .xpo_sendto = svc_rdma_sendto, @@ -98,7 +99,7 @@ static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *, static void svc_rdma_bc_detach(struct svc_xprt *); static void svc_rdma_bc_free(struct svc_xprt *); -static struct svc_xprt_ops svc_rdma_bc_ops = { +static const struct svc_xprt_ops svc_rdma_bc_ops = { .xpo_create = svc_rdma_bc_create, .xpo_detach = svc_rdma_bc_detach, .xpo_free = svc_rdma_bc_free, @@ -167,8 +168,8 @@ static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt) { unsigned int i; - /* Each RPC/RDMA credit can consume a number of send - * and receive WQEs. One ctxt is allocated for each. + /* Each RPC/RDMA credit can consume one Receive and + * one Send WQE at the same time. */ i = xprt->sc_sq_depth + xprt->sc_rq_depth; @@ -713,7 +714,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) struct ib_qp_init_attr qp_attr; struct ib_device *dev; struct sockaddr *sap; - unsigned int i; + unsigned int i, ctxts; int ret = 0; listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); @@ -742,14 +743,26 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge, (size_t)RPCSVC_MAXPAGES); newxprt->sc_max_req_size = svcrdma_max_req_size; - newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr, - svcrdma_max_requests); - newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests); - newxprt->sc_max_bc_requests = min_t(u32, dev->attrs.max_qp_wr, - svcrdma_max_bc_requests); + newxprt->sc_max_requests = svcrdma_max_requests; + newxprt->sc_max_bc_requests = svcrdma_max_bc_requests; newxprt->sc_rq_depth = newxprt->sc_max_requests + newxprt->sc_max_bc_requests; - newxprt->sc_sq_depth = newxprt->sc_rq_depth; + if (newxprt->sc_rq_depth > dev->attrs.max_qp_wr) { + pr_warn("svcrdma: reducing receive depth to %d\n", + dev->attrs.max_qp_wr); + newxprt->sc_rq_depth = dev->attrs.max_qp_wr; + newxprt->sc_max_requests = newxprt->sc_rq_depth - 2; + newxprt->sc_max_bc_requests = 2; + } + newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests); + ctxts = rdma_rw_mr_factor(dev, newxprt->sc_port_num, RPCSVC_MAXPAGES); + ctxts *= newxprt->sc_max_requests; + newxprt->sc_sq_depth = newxprt->sc_rq_depth + ctxts; + if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr) { + pr_warn("svcrdma: reducing send depth to %d\n", + dev->attrs.max_qp_wr); + newxprt->sc_sq_depth = dev->attrs.max_qp_wr; + } atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth); if (!svc_rdma_prealloc_ctxts(newxprt)) @@ -784,8 +797,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) qp_attr.event_handler = qp_event_handler; qp_attr.qp_context = &newxprt->sc_xprt; qp_attr.port_num = newxprt->sc_port_num; - qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests; - qp_attr.cap.max_send_wr = newxprt->sc_sq_depth; + qp_attr.cap.max_rdma_ctxs = ctxts; + qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts; qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth; qp_attr.cap.max_send_sge = newxprt->sc_max_sge; qp_attr.cap.max_recv_sge = newxprt->sc_max_sge; @@ -853,6 +866,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap)); dprintk(" max_sge : %d\n", newxprt->sc_max_sge); dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth); + dprintk(" rdma_rw_ctxs : %d\n", ctxts); dprintk(" max_requests : %d\n", newxprt->sc_max_requests); dprintk(" ord : %d\n", newxprt->sc_ord); |