From 14c04493cb77bc38404dbcb39d5ccbb667831ad7 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:17 +0200 Subject: tipc: add ability to order and receive topology events in driver As preparation for introducing communication groups, we add the ability to issue topology subscriptions and receive topology events from kernel space. This will make it possible for group member sockets to keep track of other group members. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/core.h | 5 +++ net/tipc/msg.h | 1 + net/tipc/server.c | 121 ++++++++++++++++++++++++++++++++++++++++++------------ net/tipc/server.h | 5 ++- net/tipc/socket.c | 32 +++++++++------ 5 files changed, 124 insertions(+), 40 deletions(-) diff --git a/net/tipc/core.h b/net/tipc/core.h index 5cc5398be722..964342689f2c 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -132,6 +132,11 @@ static inline struct list_head *tipc_nodes(struct net *net) return &tipc_net(net)->node_list; } +static inline struct tipc_server *tipc_topsrv(struct net *net) +{ + return tipc_net(net)->topsrv; +} + static inline unsigned int tipc_hashfn(u32 addr) { return addr & (NODE_HTABLE_SIZE - 1); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index c843fd2bc48d..d058b1c464e9 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -78,6 +78,7 @@ struct plist; #define MSG_FRAGMENTER 12 #define LINK_CONFIG 13 #define SOCK_WAKEUP 14 /* pseudo user */ +#define TOP_SRV 15 /* pseudo user */ /* * Message header sizes diff --git a/net/tipc/server.c b/net/tipc/server.c index 3cd6402e812c..713077536d0c 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -36,6 +36,8 @@ #include "server.h" #include "core.h" #include "socket.h" +#include "addr.h" +#include "msg.h" #include #include @@ -105,13 +107,11 @@ static void tipc_conn_kref_release(struct kref *kref) kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr)); sock_release(sock); con->sock = NULL; - - spin_lock_bh(&s->idr_lock); - idr_remove(&s->conn_idr, con->conid); - s->idr_in_use--; - spin_unlock_bh(&s->idr_lock); } - + spin_lock_bh(&s->idr_lock); + idr_remove(&s->conn_idr, con->conid); + s->idr_in_use--; + spin_unlock_bh(&s->idr_lock); tipc_clean_outqueues(con); kfree(con); } @@ -197,7 +197,8 @@ static void tipc_close_conn(struct tipc_conn *con) struct tipc_server *s = con->server; if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { - tipc_unregister_callbacks(con); + if (con->sock) + tipc_unregister_callbacks(con); if (con->conid) s->tipc_conn_release(con->conid, con->usr_data); @@ -207,8 +208,8 @@ static void tipc_close_conn(struct tipc_conn *con) * are harmless for us here as we have already deleted this * connection from server connection list. */ - kernel_sock_shutdown(con->sock, SHUT_RDWR); - + if (con->sock) + kernel_sock_shutdown(con->sock, SHUT_RDWR); conn_put(con); } } @@ -487,38 +488,104 @@ void tipc_conn_terminate(struct tipc_server *s, int conid) } } +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, + u32 lower, u32 upper, int *conid) +{ + struct tipc_subscriber *scbr; + struct tipc_subscr sub; + struct tipc_server *s; + struct tipc_conn *con; + + sub.seq.type = type; + sub.seq.lower = lower; + sub.seq.upper = upper; + sub.timeout = TIPC_WAIT_FOREVER; + sub.filter = TIPC_SUB_PORTS; + *(u32 *)&sub.usr_handle = port; + + con = tipc_alloc_conn(tipc_topsrv(net)); + if (!con) + return false; + + *conid = con->conid; + s = con->server; + scbr = s->tipc_conn_new(*conid); + if (!scbr) { + tipc_close_conn(con); + return false; + } + + con->usr_data = scbr; + con->sock = NULL; + s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub)); + return true; +} + +void tipc_topsrv_kern_unsubscr(struct net *net, int conid) +{ + struct tipc_conn *con; + + con = tipc_conn_lookup(tipc_topsrv(net), conid); + if (!con) + return; + tipc_close_conn(con); + conn_put(con); +} + +static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt) +{ + u32 port = *(u32 *)&evt->s.usr_handle; + u32 self = tipc_own_addr(net); + struct sk_buff_head evtq; + struct sk_buff *skb; + + skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt), + self, self, port, port, 0); + if (!skb) + return; + msg_set_dest_droppable(buf_msg(skb), true); + memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt)); + skb_queue_head_init(&evtq); + __skb_queue_tail(&evtq, skb); + tipc_sk_rcv(net, &evtq); +} + static void tipc_send_to_sock(struct tipc_conn *con) { - int count = 0; struct tipc_server *s = con->server; struct outqueue_entry *e; + struct tipc_event *evt; struct msghdr msg; + int count = 0; int ret; spin_lock_bh(&con->outqueue_lock); while (test_bit(CF_CONNECTED, &con->flags)) { - e = list_entry(con->outqueue.next, struct outqueue_entry, - list); + e = list_entry(con->outqueue.next, struct outqueue_entry, list); if ((struct list_head *) e == &con->outqueue) break; - spin_unlock_bh(&con->outqueue_lock); - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_DONTWAIT; + spin_unlock_bh(&con->outqueue_lock); - if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { - msg.msg_name = &e->dest; - msg.msg_namelen = sizeof(struct sockaddr_tipc); - } - ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, - e->iov.iov_len); - if (ret == -EWOULDBLOCK || ret == 0) { - cond_resched(); - goto out; - } else if (ret < 0) { - goto send_err; + if (con->sock) { + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT; + if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { + msg.msg_name = &e->dest; + msg.msg_namelen = sizeof(struct sockaddr_tipc); + } + ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, + e->iov.iov_len); + if (ret == -EWOULDBLOCK || ret == 0) { + cond_resched(); + goto out; + } else if (ret < 0) { + goto send_err; + } + } else { + evt = e->iov.iov_base; + tipc_send_kern_top_evt(s->net, evt); } - /* Don't starve users filling buffers */ if (++count >= MAX_SEND_MSG_COUNT) { cond_resched(); diff --git a/net/tipc/server.h b/net/tipc/server.h index 34f8055afa3b..2113c9192633 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -83,13 +83,16 @@ struct tipc_server { int tipc_conn_sendmsg(struct tipc_server *s, int conid, struct sockaddr_tipc *addr, void *data, size_t len); +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, + u32 lower, u32 upper, int *conid); +void tipc_topsrv_kern_unsubscr(struct net *net, int conid); + /** * tipc_conn_terminate - terminate connection with server * * Note: Must call it in process context since it might sleep */ void tipc_conn_terminate(struct tipc_server *s, int conid); - int tipc_server_start(struct tipc_server *s); void tipc_server_stop(struct tipc_server *s); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index d50edd6e0019..9a7e7b5cf23f 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -896,6 +896,10 @@ exit: kfree_skb(skb); } +static void tipc_sk_top_evt(struct tipc_sock *tsk, struct tipc_event *evt) +{ +} + /** * tipc_sendmsg - send message in connectionless manner * @sock: socket structure @@ -1671,20 +1675,24 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb, struct tipc_msg *hdr = buf_msg(skb); unsigned int limit = rcvbuf_limit(sk, skb); int err = TIPC_OK; - int usr = msg_user(hdr); - u32 onode; - - if (unlikely(msg_user(hdr) == CONN_MANAGER)) { - tipc_sk_proto_rcv(tsk, skb, xmitq); - return false; - } - if (unlikely(usr == SOCK_WAKEUP)) { - onode = msg_orignode(hdr); + if (unlikely(!msg_isdata(hdr))) { + switch (msg_user(hdr)) { + case CONN_MANAGER: + tipc_sk_proto_rcv(tsk, skb, xmitq); + return false; + case SOCK_WAKEUP: + u32_del(&tsk->cong_links, msg_orignode(hdr)); + tsk->cong_link_cnt--; + sk->sk_write_space(sk); + break; + case TOP_SRV: + tipc_sk_top_evt(tsk, (void *)msg_data(hdr)); + break; + default: + break; + } kfree_skb(skb); - u32_del(&tsk->cong_links, onode); - tsk->cong_link_cnt--; - sk->sk_write_space(sk); return false; } -- cgit v1.2.3 From 23998835be98a6842e5698fa1824f404c7de850d Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:18 +0200 Subject: tipc: improve address sanity check in tipc_connect() The address given to tipc_connect() is not completely sanity checked, under the assumption that this will be done later in the function __tipc_sendmsg() when the address is used there. However, the latter functon will in the next commits serve as caller to several other send functions, so we want to move the corresponding sanity check there to the beginning of that function, before we possibly need to grab the address stored by tipc_connect(). We must therefore be able to trust that this address already has been thoroughly checked. We do this in this commit. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/socket.c | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 9a7e7b5cf23f..7659e792ecdb 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1911,28 +1911,27 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest, int previous; int res = 0; + if (destlen != sizeof(struct sockaddr_tipc)) + return -EINVAL; + lock_sock(sk); - /* DGRAM/RDM connect(), just save the destaddr */ - if (tipc_sk_type_connectionless(sk)) { - if (dst->family == AF_UNSPEC) { - memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc)); - } else if (destlen != sizeof(struct sockaddr_tipc)) { + if (dst->family == AF_UNSPEC) { + memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc)); + if (!tipc_sk_type_connectionless(sk)) res = -EINVAL; - } else { - memcpy(&tsk->peer, dest, destlen); - } goto exit; + } else if (dst->family != AF_TIPC) { + res = -EINVAL; } - - /* - * Reject connection attempt using multicast address - * - * Note: send_msg() validates the rest of the address fields, - * so there's no need to do it here - */ - if (dst->addrtype == TIPC_ADDR_MCAST) { + if (dst->addrtype != TIPC_ADDR_ID && dst->addrtype != TIPC_ADDR_NAME) res = -EINVAL; + if (res) + goto exit; + + /* DGRAM/RDM connect(), just save the destaddr */ + if (tipc_sk_type_connectionless(sk)) { + memcpy(&tsk->peer, dest, destlen); goto exit; } -- cgit v1.2.3 From 38077b8ef831daba55913f7e24732b062d0bdebb Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:19 +0200 Subject: tipc: add ability to obtain node availability status from other files In the coming commits, functions at the socket level will need the ability to read the availability status of a given node. We therefore introduce a new function for this purpose, while renaming the existing static function currently having the wanted name. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/node.c | 26 +++++++++++++++++++++----- net/tipc/node.h | 1 + 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/net/tipc/node.c b/net/tipc/node.c index 198dbc7adbe1..6cc1ae600820 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -157,7 +157,7 @@ static void tipc_node_timeout(unsigned long data); static void tipc_node_fsm_evt(struct tipc_node *n, int evt); static struct tipc_node *tipc_node_find(struct net *net, u32 addr); static void tipc_node_put(struct tipc_node *node); -static bool tipc_node_is_up(struct tipc_node *n); +static bool node_is_up(struct tipc_node *n); struct tipc_sock_conn { u32 port; @@ -657,7 +657,7 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, *slot1 = i; } - if (!tipc_node_is_up(n)) { + if (!node_is_up(n)) { if (tipc_link_peer_is_down(l)) tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT); @@ -717,11 +717,27 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) tipc_sk_rcv(n->net, &le->inputq); } -static bool tipc_node_is_up(struct tipc_node *n) +static bool node_is_up(struct tipc_node *n) { return n->active_links[0] != INVALID_BEARER_ID; } +bool tipc_node_is_up(struct net *net, u32 addr) +{ + struct tipc_node *n; + bool retval = false; + + if (in_own_node(net, addr)) + return true; + + n = tipc_node_find(net, addr); + if (!n) + return false; + retval = node_is_up(n); + tipc_node_put(n); + return retval; +} + void tipc_node_check_dest(struct net *net, u32 onode, struct tipc_bearer *b, u16 capabilities, u32 signature, @@ -1149,7 +1165,7 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node) if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr)) goto attr_msg_full; - if (tipc_node_is_up(node)) + if (node_is_up(node)) if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP)) goto attr_msg_full; @@ -1249,7 +1265,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb) dst = n->addr; if (in_own_node(net, dst)) continue; - if (!tipc_node_is_up(n)) + if (!node_is_up(n)) continue; txskb = pskb_copy(skb, GFP_ATOMIC); if (!txskb) diff --git a/net/tipc/node.h b/net/tipc/node.h index 898c22916984..8db59feb122f 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -76,6 +76,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb); int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port); void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel); +bool tipc_node_is_up(struct net *net, u32 addr); u16 tipc_node_get_capabilities(struct net *net, u32 addr); int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb); int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb); -- cgit v1.2.3 From 64ac5f5977df5b276374fb2f051082129f5cdb22 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:20 +0200 Subject: tipc: refactor function filter_rcv() In the following commits we will need to handle multiple incoming and rejected/returned buffers in the function socket.c::filter_rcv(). As a preparation for this, we generalize the function by handling buffer queues instead of individual buffers. We also introduce a help function tipc_skb_reject(), and rename filter_rcv() to tipc_sk_filter_rcv() in line with other functions in socket.c. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/msg.c | 7 +++ net/tipc/msg.h | 2 + net/tipc/socket.c | 161 +++++++++++++++++++++++++++--------------------------- 3 files changed, 89 insertions(+), 81 deletions(-) diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 17146c16ee2d..1649d456e22d 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -666,3 +666,10 @@ void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, } kfree_skb(skb); } + +void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) + __skb_queue_tail(xmitq, skb); +} diff --git a/net/tipc/msg.h b/net/tipc/msg.h index d058b1c464e9..be3e38aa9dd2 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -819,6 +819,8 @@ static inline bool msg_is_reset(struct tipc_msg *hdr) struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp); bool tipc_msg_validate(struct sk_buff *skb); bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err); +void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb, + struct sk_buff_head *xmitq); void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type, u32 hsize, u32 destnode); struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 7659e792ecdb..bc226f5a1be3 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -111,7 +111,7 @@ struct tipc_sock { struct rcu_head rcu; }; -static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); +static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb); static void tipc_data_ready(struct sock *sk); static void tipc_write_space(struct sock *sk); static void tipc_sock_destruct(struct sock *sk); @@ -453,7 +453,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, msg_set_origport(msg, tsk->portid); setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk); sk->sk_shutdown = 0; - sk->sk_backlog_rcv = tipc_backlog_rcv; + sk->sk_backlog_rcv = tipc_sk_backlog_rcv; sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; @@ -850,16 +850,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, } /** - * tipc_sk_proto_rcv - receive a connection mng protocol message + * tipc_sk_conn_proto_rcv - receive a connection mng protocol message * @tsk: receiving socket * @skb: pointer to message buffer. */ -static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, - struct sk_buff_head *xmitq) +static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, + struct sk_buff_head *xmitq) { - struct sock *sk = &tsk->sk; - u32 onode = tsk_own_node(tsk); struct tipc_msg *hdr = buf_msg(skb); + u32 onode = tsk_own_node(tsk); + struct sock *sk = &tsk->sk; int mtyp = msg_type(hdr); bool conn_cong; @@ -1536,14 +1536,41 @@ static void tipc_sock_destruct(struct sock *sk) __skb_queue_purge(&sk->sk_receive_queue); } +static void tipc_sk_proto_rcv(struct sock *sk, + struct sk_buff_head *inputq, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb = __skb_dequeue(inputq); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *hdr = buf_msg(skb); + + switch (msg_user(hdr)) { + case CONN_MANAGER: + tipc_sk_conn_proto_rcv(tsk, skb, xmitq); + return; + case SOCK_WAKEUP: + u32_del(&tsk->cong_links, msg_orignode(hdr)); + tsk->cong_link_cnt--; + sk->sk_write_space(sk); + break; + case TOP_SRV: + tipc_sk_top_evt(tsk, (void *)msg_data(hdr)); + break; + default: + break; + } + + kfree_skb(skb); +} + /** - * filter_connect - Handle all incoming messages for a connection-based socket + * tipc_filter_connect - Handle incoming message for a connection-based socket * @tsk: TIPC socket * @skb: pointer to message buffer. Set to NULL if buffer is consumed * * Returns true if everything ok, false otherwise */ -static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) +static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) { struct sock *sk = &tsk->sk; struct net *net = sock_net(sk); @@ -1657,7 +1684,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) } /** - * filter_rcv - validate incoming message + * tipc_sk_filter_rcv - validate incoming message * @sk: socket * @skb: pointer to message. * @@ -1666,75 +1693,49 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) * * Called with socket lock already taken * - * Returns true if message was added to socket receive queue, otherwise false */ -static bool filter_rcv(struct sock *sk, struct sk_buff *skb, - struct sk_buff_head *xmitq) +static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, + struct sk_buff_head *xmitq) { + bool sk_conn = !tipc_sk_type_connectionless(sk); struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = buf_msg(skb); - unsigned int limit = rcvbuf_limit(sk, skb); - int err = TIPC_OK; + struct net *net = sock_net(sk); + struct sk_buff_head inputq; + int limit, err = TIPC_OK; - if (unlikely(!msg_isdata(hdr))) { - switch (msg_user(hdr)) { - case CONN_MANAGER: - tipc_sk_proto_rcv(tsk, skb, xmitq); - return false; - case SOCK_WAKEUP: - u32_del(&tsk->cong_links, msg_orignode(hdr)); - tsk->cong_link_cnt--; - sk->sk_write_space(sk); - break; - case TOP_SRV: - tipc_sk_top_evt(tsk, (void *)msg_data(hdr)); - break; - default: - break; - } - kfree_skb(skb); - return false; - } + TIPC_SKB_CB(skb)->bytes_read = 0; + __skb_queue_head_init(&inputq); + __skb_queue_tail(&inputq, skb); - /* Drop if illegal message type */ - if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) { - kfree_skb(skb); - return false; - } + if (unlikely(!msg_isdata(hdr))) + tipc_sk_proto_rcv(sk, &inputq, xmitq); + else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) + return kfree_skb(skb); - /* Reject if wrong message type for current socket state */ - if (tipc_sk_type_connectionless(sk)) { - if (msg_connected(hdr)) { + /* Validate and add to receive buffer if there is space */ + while ((skb = __skb_dequeue(&inputq))) { + hdr = buf_msg(skb); + limit = rcvbuf_limit(sk, skb); + if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) || + (!sk_conn && msg_connected(hdr))) err = TIPC_ERR_NO_PORT; - goto reject; - } - } else if (unlikely(!filter_connect(tsk, skb))) { - err = TIPC_ERR_NO_PORT; - goto reject; - } + else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) + err = TIPC_ERR_OVERLOAD; - /* Reject message if there isn't room to queue it */ - if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) { - err = TIPC_ERR_OVERLOAD; - goto reject; + if (unlikely(err)) { + tipc_skb_reject(net, err, skb, xmitq); + err = TIPC_OK; + continue; + } + __skb_queue_tail(&sk->sk_receive_queue, skb); + skb_set_owner_r(skb, sk); + sk->sk_data_ready(sk); } - - /* Enqueue message */ - TIPC_SKB_CB(skb)->bytes_read = 0; - __skb_queue_tail(&sk->sk_receive_queue, skb); - skb_set_owner_r(skb, sk); - - sk->sk_data_ready(sk); - return true; - -reject: - if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err)) - __skb_queue_tail(xmitq, skb); - return false; } /** - * tipc_backlog_rcv - handle incoming message from backlog queue + * tipc_sk_backlog_rcv - handle incoming message from backlog queue * @sk: socket * @skb: message * @@ -1742,27 +1743,25 @@ reject: * * Returns 0 */ -static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) +static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb) { - unsigned int truesize = skb->truesize; + unsigned int before = sk_rmem_alloc_get(sk); struct sk_buff_head xmitq; u32 dnode, selector; + unsigned int added; __skb_queue_head_init(&xmitq); - if (likely(filter_rcv(sk, skb, &xmitq))) { - atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt); - return 0; - } - - if (skb_queue_empty(&xmitq)) - return 0; + tipc_sk_filter_rcv(sk, skb, &xmitq); + added = sk_rmem_alloc_get(sk) - before; + atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt); - /* Send response/rejected message */ - skb = __skb_dequeue(&xmitq); - dnode = msg_destnode(buf_msg(skb)); - selector = msg_origport(buf_msg(skb)); - tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector); + /* Send pending response/rejected messages, if any */ + while ((skb = __skb_dequeue(&xmitq))) { + selector = msg_origport(buf_msg(skb)); + dnode = msg_destnode(buf_msg(skb)); + tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector); + } return 0; } @@ -1794,7 +1793,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, /* Add message directly to receive queue if possible */ if (!sock_owned_by_user(sk)) { - filter_rcv(sk, skb, xmitq); + tipc_sk_filter_rcv(sk, skb, xmitq); continue; } -- cgit v1.2.3 From f70d37b796241f617107d5585ee96a7e1b660b63 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:21 +0200 Subject: tipc: add new function for sending multiple small messages We see an increasing need to send multiple single-buffer messages of TIPC_SYSTEM_IMPORTANCE to different individual destination nodes. Instead of looping over the send queue and sending each buffer individually, as we do now, we add a new help function tipc_node_distr_xmit() to do this. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/node.c | 16 ++++++++++++++++ net/tipc/node.h | 1 + net/tipc/socket.c | 14 ++------------ 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/net/tipc/node.c b/net/tipc/node.c index 6cc1ae600820..89f8ac73bf65 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1254,6 +1254,22 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, return 0; } +/* tipc_node_distr_xmit(): send single buffer msgs to individual destinations + * Note: this is only for SYSTEM_IMPORTANCE messages, which cannot be rejected + */ +int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *xmitq) +{ + struct sk_buff *skb; + u32 selector, dnode; + + while ((skb = __skb_dequeue(xmitq))) { + selector = msg_origport(buf_msg(skb)); + dnode = msg_destnode(buf_msg(skb)); + tipc_node_xmit_skb(net, skb, dnode, selector); + } + return 0; +} + void tipc_node_broadcast(struct net *net, struct sk_buff *skb) { struct sk_buff *txskb; diff --git a/net/tipc/node.h b/net/tipc/node.h index 8db59feb122f..df2f2197c4ad 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -68,6 +68,7 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node, char *linkname, size_t len); int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode, int selector); +int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *list); int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest, u32 selector); void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index bc226f5a1be3..c7c674934474 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1740,14 +1740,11 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, * @skb: message * * Caller must hold socket lock - * - * Returns 0 */ static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb) { unsigned int before = sk_rmem_alloc_get(sk); struct sk_buff_head xmitq; - u32 dnode, selector; unsigned int added; __skb_queue_head_init(&xmitq); @@ -1757,11 +1754,7 @@ static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb) atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt); /* Send pending response/rejected messages, if any */ - while ((skb = __skb_dequeue(&xmitq))) { - selector = msg_origport(buf_msg(skb)); - dnode = msg_destnode(buf_msg(skb)); - tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector); - } + tipc_node_distr_xmit(sock_net(sk), &xmitq); return 0; } @@ -1840,10 +1833,7 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) spin_unlock_bh(&sk->sk_lock.slock); } /* Send pending response/rejected messages, if any */ - while ((skb = __skb_dequeue(&xmitq))) { - dnode = msg_destnode(buf_msg(skb)); - tipc_node_xmit_skb(net, skb, dnode, dport); - } + tipc_node_distr_xmit(sock_net(sk), &xmitq); sock_put(sk); continue; } -- cgit v1.2.3 From a80ae5306a7346d4e52f59462878beb8362f4bbd Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:22 +0200 Subject: tipc: improve destination linked list We often see a need for a linked list of destination identities, sometimes containing a port number, sometimes a node identity, and sometimes both. The currently defined struct u32_list is not generic enough to cover all cases, so we extend it to contain two u32 integers and rename it to struct tipc_dest_list. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/bcast.c | 18 +++++------ net/tipc/name_table.c | 89 ++++++++++++++++++++++++++------------------------- net/tipc/name_table.h | 22 ++++++++----- net/tipc/socket.c | 12 +++---- 4 files changed, 74 insertions(+), 67 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index a140dd4a84af..329325bd553e 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -258,20 +258,20 @@ static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts, static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts, struct tipc_nlist *dests, u16 *cong_link_cnt) { + struct tipc_dest *dst, *tmp; struct sk_buff_head _pkts; - struct u32_item *n, *tmp; - u32 dst, selector; + u32 dnode, selector; selector = msg_link_selector(buf_msg(skb_peek(pkts))); skb_queue_head_init(&_pkts); - list_for_each_entry_safe(n, tmp, &dests->list, list) { - dst = n->value; - if (!tipc_msg_pskb_copy(dst, pkts, &_pkts)) + list_for_each_entry_safe(dst, tmp, &dests->list, list) { + dnode = dst->node; + if (!tipc_msg_pskb_copy(dnode, pkts, &_pkts)) return -ENOMEM; /* Any other return value than -ELINKCONG is ignored */ - if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG) + if (tipc_node_xmit(net, &_pkts, dnode, selector) == -ELINKCONG) (*cong_link_cnt)++; } return 0; @@ -554,7 +554,7 @@ void tipc_nlist_add(struct tipc_nlist *nl, u32 node) { if (node == nl->self) nl->local = true; - else if (u32_push(&nl->list, node)) + else if (tipc_dest_push(&nl->list, node, 0)) nl->remote++; } @@ -562,13 +562,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node) { if (node == nl->self) nl->local = false; - else if (u32_del(&nl->list, node)) + else if (tipc_dest_del(&nl->list, node, 0)) nl->remote--; } void tipc_nlist_purge(struct tipc_nlist *nl) { - u32_list_purge(&nl->list); + tipc_dest_list_purge(&nl->list); nl->remote = 0; nl->local = 0; } diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index bd0aac87b41a..76bd2777baaf 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -634,7 +634,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, info = sseq->info; list_for_each_entry(publ, &info->node_list, node_list) { if (publ->scope <= limit) - u32_push(dports, publ->ref); + tipc_dest_push(dports, 0, publ->ref); } if (info->cluster_list_size != info->node_list_size) @@ -1057,78 +1057,79 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) return skb->len; } -bool u32_find(struct list_head *l, u32 value) +struct tipc_dest *tipc_dest_find(struct list_head *l, u32 node, u32 port) { - struct u32_item *item; + u64 value = (u64)node << 32 | port; + struct tipc_dest *dst; - list_for_each_entry(item, l, list) { - if (item->value == value) - return true; + list_for_each_entry(dst, l, list) { + if (dst->value != value) + continue; + return dst; } - return false; + return NULL; } -bool u32_push(struct list_head *l, u32 value) +bool tipc_dest_push(struct list_head *l, u32 node, u32 port) { - struct u32_item *item; + u64 value = (u64)node << 32 | port; + struct tipc_dest *dst; - list_for_each_entry(item, l, list) { - if (item->value == value) - return false; - } - item = kmalloc(sizeof(*item), GFP_ATOMIC); - if (unlikely(!item)) + if (tipc_dest_find(l, node, port)) return false; - item->value = value; - list_add(&item->list, l); + dst = kmalloc(sizeof(*dst), GFP_ATOMIC); + if (unlikely(!dst)) + return false; + dst->value = value; + list_add(&dst->list, l); return true; } -u32 u32_pop(struct list_head *l) +bool tipc_dest_pop(struct list_head *l, u32 *node, u32 *port) { - struct u32_item *item; - u32 value = 0; + struct tipc_dest *dst; if (list_empty(l)) - return 0; - item = list_first_entry(l, typeof(*item), list); - value = item->value; - list_del(&item->list); - kfree(item); - return value; + return false; + dst = list_first_entry(l, typeof(*dst), list); + if (port) + *port = dst->port; + if (node) + *node = dst->node; + list_del(&dst->list); + kfree(dst); + return true; } -bool u32_del(struct list_head *l, u32 value) +bool tipc_dest_del(struct list_head *l, u32 node, u32 port) { - struct u32_item *item, *tmp; + struct tipc_dest *dst; - list_for_each_entry_safe(item, tmp, l, list) { - if (item->value != value) - continue; - list_del(&item->list); - kfree(item); - return true; - } - return false; + dst = tipc_dest_find(l, node, port); + if (!dst) + return false; + list_del(&dst->list); + kfree(dst); + return true; } -void u32_list_purge(struct list_head *l) +void tipc_dest_list_purge(struct list_head *l) { - struct u32_item *item, *tmp; + struct tipc_dest *dst, *tmp; - list_for_each_entry_safe(item, tmp, l, list) { - list_del(&item->list); - kfree(item); + list_for_each_entry_safe(dst, tmp, l, list) { + list_del(&dst->list); + kfree(dst); } } -int u32_list_len(struct list_head *l) +int tipc_dest_list_len(struct list_head *l) { - struct u32_item *item; + struct tipc_dest *dst; int i = 0; - list_for_each_entry(item, l, list) { + list_for_each_entry(dst, l, list) { i++; } return i; diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index 6ebdeb1d84a5..d121175a92b5 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -120,16 +120,22 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s); int tipc_nametbl_init(struct net *net); void tipc_nametbl_stop(struct net *net); -struct u32_item { +struct tipc_dest { struct list_head list; - u32 value; + union { + struct { + u32 port; + u32 node; + }; + u64 value; + }; }; -bool u32_push(struct list_head *l, u32 value); -u32 u32_pop(struct list_head *l); -bool u32_find(struct list_head *l, u32 value); -bool u32_del(struct list_head *l, u32 value); -void u32_list_purge(struct list_head *l); -int u32_list_len(struct list_head *l); +struct tipc_dest *tipc_dest_find(struct list_head *l, u32 node, u32 port); +bool tipc_dest_push(struct list_head *l, u32 node, u32 port); +bool tipc_dest_pop(struct list_head *l, u32 *node, u32 *port); +bool tipc_dest_del(struct list_head *l, u32 node, u32 port); +void tipc_dest_list_purge(struct list_head *l); +int tipc_dest_list_len(struct list_head *l); #endif diff --git a/net/tipc/socket.c b/net/tipc/socket.c index c7c674934474..daf7c4df4531 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -565,7 +565,7 @@ static int tipc_release(struct socket *sock) /* Reject any messages that accumulated in backlog queue */ release_sock(sk); - u32_list_purge(&tsk->cong_links); + tipc_dest_list_purge(&tsk->cong_links); tsk->cong_link_cnt = 0; call_rcu(&tsk->rcu, tipc_sk_callback); sock->sk = NULL; @@ -826,8 +826,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), msg_nameupper(msg), scope, &dports); - portid = u32_pop(&dports); - for (; portid; portid = u32_pop(&dports)) { + while (tipc_dest_pop(&dports, NULL, &portid)) { _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); if (_skb) { msg_set_destport(buf_msg(_skb), portid); @@ -1000,7 +999,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) } /* Block or return if destination link is congested */ - rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode)); + rc = tipc_wait_for_cond(sock, &timeout, + !tipc_dest_find(clinks, dnode, 0)); if (unlikely(rc)) return rc; @@ -1012,7 +1012,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); if (unlikely(rc == -ELINKCONG)) { - u32_push(clinks, dnode); + tipc_dest_push(clinks, dnode, 0); tsk->cong_link_cnt++; rc = 0; } @@ -1549,7 +1549,7 @@ static void tipc_sk_proto_rcv(struct sock *sk, tipc_sk_conn_proto_rcv(tsk, skb, xmitq); return; case SOCK_WAKEUP: - u32_del(&tsk->cong_links, msg_orignode(hdr)); + tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0); tsk->cong_link_cnt--; sk->sk_write_space(sk); break; -- cgit v1.2.3 From 75da2163dbb6af9f2dce1d80056d11d290dd19a5 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:23 +0200 Subject: tipc: introduce communication groups As a preparation for introducing flow control for multicast and datagram messaging we need a more strictly defined framework than we have now. A socket must be able keep track of exactly how many and which other sockets it is allowed to communicate with at any moment, and keep the necessary state for those. We therefore introduce a new concept we have named Communication Group. Sockets can join a group via a new setsockopt() call TIPC_GROUP_JOIN. The call takes four parameters: 'type' serves as group identifier, 'instance' serves as an logical member identifier, and 'scope' indicates the visibility of the group (node/cluster/zone). Finally, 'flags' makes it possible to set certain properties for the member. For now, there is only one flag, indicating if the creator of the socket wants to receive a copy of broadcast or multicast messages it is sending via the socket, and if wants to be eligible as destination for its own anycasts. A group is closed, i.e., sockets which have not joined a group will not be able to send messages to or receive messages from members of the group, and vice versa. Any member of a group can send multicast ('group broadcast') messages to all group members, optionally including itself, using the primitive send(). The messages are received via the recvmsg() primitive. A socket can only be member of one group at a time. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- include/uapi/linux/tipc.h | 14 ++ net/tipc/Makefile | 2 +- net/tipc/group.c | 404 ++++++++++++++++++++++++++++++++++++++++++++++ net/tipc/group.h | 64 ++++++++ net/tipc/link.c | 3 +- net/tipc/msg.h | 50 +++++- net/tipc/name_table.c | 44 +++-- net/tipc/name_table.h | 3 + net/tipc/node.h | 3 +- net/tipc/socket.c | 209 ++++++++++++++++++++---- 10 files changed, 748 insertions(+), 48 deletions(-) create mode 100644 net/tipc/group.c create mode 100644 net/tipc/group.h diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h index 5351b08c897a..5f7b2c4a09ab 100644 --- a/include/uapi/linux/tipc.h +++ b/include/uapi/linux/tipc.h @@ -231,6 +231,20 @@ struct sockaddr_tipc { #define TIPC_SOCK_RECVQ_DEPTH 132 /* Default: none (read only) */ #define TIPC_MCAST_BROADCAST 133 /* Default: TIPC selects. No arg */ #define TIPC_MCAST_REPLICAST 134 /* Default: TIPC selects. No arg */ +#define TIPC_GROUP_JOIN 135 /* Takes struct tipc_group_req* */ +#define TIPC_GROUP_LEAVE 136 /* No argument */ + +/* + * Flag values + */ +#define TIPC_GROUP_LOOPBACK 0x1 /* Receive copy of sent msg when match */ + +struct tipc_group_req { + __u32 type; /* group id */ + __u32 instance; /* member id */ + __u32 scope; /* zone/cluster/node */ + __u32 flags; +}; /* * Maximum sizes of TIPC bearer-related names (including terminating NULL) diff --git a/net/tipc/Makefile b/net/tipc/Makefile index 31b9f9c52974..a3af73ec0b78 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -8,7 +8,7 @@ tipc-y += addr.o bcast.o bearer.o \ core.o link.o discover.o msg.o \ name_distr.o subscr.o monitor.o name_table.o net.o \ netlink.o netlink_compat.o node.o socket.o eth_media.o \ - server.o socket.o + server.o socket.o group.o tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o diff --git a/net/tipc/group.c b/net/tipc/group.c new file mode 100644 index 000000000000..3f0e1ce1e3b9 --- /dev/null +++ b/net/tipc/group.c @@ -0,0 +1,404 @@ +/* + * net/tipc/group.c: TIPC group messaging code + * + * Copyright (c) 2017, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "core.h" +#include "addr.h" +#include "group.h" +#include "bcast.h" +#include "server.h" +#include "msg.h" +#include "socket.h" +#include "node.h" +#include "name_table.h" +#include "subscr.h" + +#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) +#define ADV_IDLE ADV_UNIT + +enum mbr_state { + MBR_QUARANTINED, + MBR_DISCOVERED, + MBR_JOINING, + MBR_PUBLISHED, + MBR_JOINED, + MBR_LEAVING +}; + +struct tipc_member { + struct rb_node tree_node; + struct list_head list; + u32 node; + u32 port; + enum mbr_state state; + u16 bc_rcv_nxt; +}; + +struct tipc_group { + struct rb_root members; + struct tipc_nlist dests; + struct net *net; + int subid; + u32 type; + u32 instance; + u32 domain; + u32 scope; + u32 portid; + u16 member_cnt; + u16 bc_snd_nxt; + bool loopback; +}; + +static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, + int mtyp, struct sk_buff_head *xmitq); + +u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) +{ + return grp->bc_snd_nxt; +} + +static bool tipc_group_is_receiver(struct tipc_member *m) +{ + return m && m->state >= MBR_JOINED; +} + +int tipc_group_size(struct tipc_group *grp) +{ + return grp->member_cnt; +} + +struct tipc_group *tipc_group_create(struct net *net, u32 portid, + struct tipc_group_req *mreq) +{ + struct tipc_group *grp; + u32 type = mreq->type; + + grp = kzalloc(sizeof(*grp), GFP_ATOMIC); + if (!grp) + return NULL; + tipc_nlist_init(&grp->dests, tipc_own_addr(net)); + grp->members = RB_ROOT; + grp->net = net; + grp->portid = portid; + grp->domain = addr_domain(net, mreq->scope); + grp->type = type; + grp->instance = mreq->instance; + grp->scope = mreq->scope; + grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; + if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) + return grp; + kfree(grp); + return NULL; +} + +void tipc_group_delete(struct net *net, struct tipc_group *grp) +{ + struct rb_root *tree = &grp->members; + struct tipc_member *m, *tmp; + struct sk_buff_head xmitq; + + __skb_queue_head_init(&xmitq); + + rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { + tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); + list_del(&m->list); + kfree(m); + } + tipc_node_distr_xmit(net, &xmitq); + tipc_nlist_purge(&grp->dests); + tipc_topsrv_kern_unsubscr(net, grp->subid); + kfree(grp); +} + +struct tipc_member *tipc_group_find_member(struct tipc_group *grp, + u32 node, u32 port) +{ + struct rb_node *n = grp->members.rb_node; + u64 nkey, key = (u64)node << 32 | port; + struct tipc_member *m; + + while (n) { + m = container_of(n, struct tipc_member, tree_node); + nkey = (u64)m->node << 32 | m->port; + if (key < nkey) + n = n->rb_left; + else if (key > nkey) + n = n->rb_right; + else + return m; + } + return NULL; +} + +static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, + u32 node) +{ + struct tipc_member *m; + struct rb_node *n; + + for (n = rb_first(&grp->members); n; n = rb_next(n)) { + m = container_of(n, struct tipc_member, tree_node); + if (m->node == node) + return m; + } + return NULL; +} + +static void tipc_group_add_to_tree(struct tipc_group *grp, + struct tipc_member *m) +{ + u64 nkey, key = (u64)m->node << 32 | m->port; + struct rb_node **n, *parent = NULL; + struct tipc_member *tmp; + + n = &grp->members.rb_node; + while (*n) { + tmp = container_of(*n, struct tipc_member, tree_node); + parent = *n; + tmp = container_of(parent, struct tipc_member, tree_node); + nkey = (u64)tmp->node << 32 | tmp->port; + if (key < nkey) + n = &(*n)->rb_left; + else if (key > nkey) + n = &(*n)->rb_right; + else + return; + } + rb_link_node(&m->tree_node, parent, n); + rb_insert_color(&m->tree_node, &grp->members); +} + +static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, + u32 node, u32 port, + int state) +{ + struct tipc_member *m; + + m = kzalloc(sizeof(*m), GFP_ATOMIC); + if (!m) + return NULL; + INIT_LIST_HEAD(&m->list); + m->node = node; + m->port = port; + grp->member_cnt++; + tipc_group_add_to_tree(grp, m); + tipc_nlist_add(&grp->dests, m->node); + m->state = state; + return m; +} + +void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) +{ + tipc_group_create_member(grp, node, port, MBR_DISCOVERED); +} + +static void tipc_group_delete_member(struct tipc_group *grp, + struct tipc_member *m) +{ + rb_erase(&m->tree_node, &grp->members); + grp->member_cnt--; + list_del_init(&m->list); + + /* If last member on a node, remove node from dest list */ + if (!tipc_group_find_node(grp, m->node)) + tipc_nlist_del(&grp->dests, m->node); + + kfree(m); +} + +struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) +{ + return &grp->dests; +} + +void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, + int *scope) +{ + seq->type = grp->type; + seq->lower = grp->instance; + seq->upper = grp->instance; + *scope = grp->scope; +} + +void tipc_group_update_bc_members(struct tipc_group *grp) +{ + grp->bc_snd_nxt++; +} + +/* tipc_group_filter_msg() - determine if we should accept arriving message + */ +void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb = __skb_dequeue(inputq); + struct tipc_member *m; + struct tipc_msg *hdr; + u32 node, port; + int mtyp; + + if (!skb) + return; + + hdr = buf_msg(skb); + mtyp = msg_type(hdr); + node = msg_orignode(hdr); + port = msg_origport(hdr); + + if (!msg_in_group(hdr)) + goto drop; + + m = tipc_group_find_member(grp, node, port); + if (!tipc_group_is_receiver(m)) + goto drop; + + __skb_queue_tail(inputq, skb); + + m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; + return; +drop: + kfree_skb(skb); +} + +static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, + int mtyp, struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr; + struct sk_buff *skb; + + skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, + m->node, tipc_own_addr(grp->net), + m->port, grp->portid, 0); + if (!skb) + return; + + hdr = buf_msg(skb); + if (mtyp == GRP_JOIN_MSG) + msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); + __skb_queue_tail(xmitq, skb); +} + +void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, + struct sk_buff_head *xmitq) +{ + u32 node = msg_orignode(hdr); + u32 port = msg_origport(hdr); + struct tipc_member *m; + + if (!grp) + return; + + m = tipc_group_find_member(grp, node, port); + + switch (msg_type(hdr)) { + case GRP_JOIN_MSG: + if (!m) + m = tipc_group_create_member(grp, node, port, + MBR_QUARANTINED); + if (!m) + return; + m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); + + /* Wait until PUBLISH event is received */ + if (m->state == MBR_DISCOVERED) + m->state = MBR_JOINING; + else if (m->state == MBR_PUBLISHED) + m->state = MBR_JOINED; + return; + case GRP_LEAVE_MSG: + if (!m) + return; + + /* Wait until WITHDRAW event is received */ + if (m->state != MBR_LEAVING) { + m->state = MBR_LEAVING; + return; + } + /* Otherwise deliver already received WITHDRAW event */ + tipc_group_delete_member(grp, m); + return; + default: + pr_warn("Received unknown GROUP_PROTO message\n"); + } +} + +/* tipc_group_member_evt() - receive and handle a member up/down event + */ +void tipc_group_member_evt(struct tipc_group *grp, + struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr = buf_msg(skb); + struct tipc_event *evt = (void *)msg_data(hdr); + u32 node = evt->port.node; + u32 port = evt->port.ref; + struct tipc_member *m; + struct net *net; + u32 self; + + if (!grp) + goto drop; + + net = grp->net; + self = tipc_own_addr(net); + if (!grp->loopback && node == self && port == grp->portid) + goto drop; + + m = tipc_group_find_member(grp, node, port); + + if (evt->event == TIPC_PUBLISHED) { + if (!m) + m = tipc_group_create_member(grp, node, port, + MBR_DISCOVERED); + if (!m) + goto drop; + + /* Wait if JOIN message not yet received */ + if (m->state == MBR_DISCOVERED) + m->state = MBR_PUBLISHED; + else + m->state = MBR_JOINED; + tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); + } else if (evt->event == TIPC_WITHDRAWN) { + if (!m) + goto drop; + + /* Keep back event if more messages might be expected */ + if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) + m->state = MBR_LEAVING; + else + tipc_group_delete_member(grp, m); + } +drop: + kfree_skb(skb); +} diff --git a/net/tipc/group.h b/net/tipc/group.h new file mode 100644 index 000000000000..9bdf4479fc03 --- /dev/null +++ b/net/tipc/group.h @@ -0,0 +1,64 @@ +/* + * net/tipc/group.h: Include file for TIPC group unicast/multicast functions + * + * Copyright (c) 2017, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_GROUP_H +#define _TIPC_GROUP_H + +#include "core.h" + +struct tipc_group; +struct tipc_member; +struct tipc_msg; + +struct tipc_group *tipc_group_create(struct net *net, u32 portid, + struct tipc_group_req *mreq); +void tipc_group_delete(struct net *net, struct tipc_group *grp); +void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port); +struct tipc_nlist *tipc_group_dests(struct tipc_group *grp); +void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, + int *scope); +void tipc_group_filter_msg(struct tipc_group *grp, + struct sk_buff_head *inputq, + struct sk_buff_head *xmitq); +void tipc_group_member_evt(struct tipc_group *grp, + struct sk_buff *skb, + struct sk_buff_head *xmitq); +void tipc_group_proto_rcv(struct tipc_group *grp, + struct tipc_msg *hdr, + struct sk_buff_head *xmitq); +void tipc_group_update_bc_members(struct tipc_group *grp); +u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); +int tipc_group_size(struct tipc_group *grp); +#endif diff --git a/net/tipc/link.c b/net/tipc/link.c index ac0144f532aa..bd25bff63925 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1046,11 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, case TIPC_MEDIUM_IMPORTANCE: case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: - if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) { + if (unlikely(msg_mcast(hdr))) { skb_queue_tail(l->bc_rcvlink->inputq, skb); return true; } case CONN_MANAGER: + case GROUP_PROTOCOL: skb_queue_tail(inputq, skb); return true; case NAME_DISTRIBUTOR: diff --git a/net/tipc/msg.h b/net/tipc/msg.h index be3e38aa9dd2..dad400935405 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -1,7 +1,7 @@ /* * net/tipc/msg.h: Include file for TIPC message header routines * - * Copyright (c) 2000-2007, 2014-2015 Ericsson AB + * Copyright (c) 2000-2007, 2014-2017 Ericsson AB * Copyright (c) 2005-2008, 2010-2011, Wind River Systems * All rights reserved. * @@ -61,10 +61,11 @@ struct plist; /* * Payload message types */ -#define TIPC_CONN_MSG 0 -#define TIPC_MCAST_MSG 1 -#define TIPC_NAMED_MSG 2 -#define TIPC_DIRECT_MSG 3 +#define TIPC_CONN_MSG 0 +#define TIPC_MCAST_MSG 1 +#define TIPC_NAMED_MSG 2 +#define TIPC_DIRECT_MSG 3 +#define TIPC_GRP_BCAST_MSG 4 /* * Internal message users @@ -73,6 +74,7 @@ struct plist; #define MSG_BUNDLER 6 #define LINK_PROTOCOL 7 #define CONN_MANAGER 8 +#define GROUP_PROTOCOL 9 #define TUNNEL_PROTOCOL 10 #define NAME_DISTRIBUTOR 11 #define MSG_FRAGMENTER 12 @@ -87,6 +89,7 @@ struct plist; #define BASIC_H_SIZE 32 /* Basic payload message */ #define NAMED_H_SIZE 40 /* Named payload message */ #define MCAST_H_SIZE 44 /* Multicast payload message */ +#define GROUP_H_SIZE 44 /* Group payload message */ #define INT_H_SIZE 40 /* Internal messages */ #define MIN_H_SIZE 24 /* Smallest legal TIPC header size */ #define MAX_H_SIZE 60 /* Largest possible TIPC header size */ @@ -252,6 +255,11 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n) msg_set_bits(m, 1, 29, 0x7, n); } +static inline int msg_in_group(struct tipc_msg *m) +{ + return (msg_type(m) == TIPC_GRP_BCAST_MSG); +} + static inline u32 msg_named(struct tipc_msg *m) { return msg_type(m) == TIPC_NAMED_MSG; @@ -259,7 +267,9 @@ static inline u32 msg_named(struct tipc_msg *m) static inline u32 msg_mcast(struct tipc_msg *m) { - return msg_type(m) == TIPC_MCAST_MSG; + int mtyp = msg_type(m); + + return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG)); } static inline u32 msg_connected(struct tipc_msg *m) @@ -514,6 +524,12 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) #define DSC_REQ_MSG 0 #define DSC_RESP_MSG 1 +/* + * Group protocol message types + */ +#define GRP_JOIN_MSG 0 +#define GRP_LEAVE_MSG 1 + /* * Word 1 */ @@ -795,6 +811,28 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 0, 0xffff, n); } +static inline u16 msg_grp_bc_syncpt(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + +/* Word 10 + */ +static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) +{ + return msg_bits(m, 10, 16, 0xffff); +} + +static inline void msg_set_grp_bc_seqno(struct tipc_msg *m, u32 n) +{ + msg_set_bits(m, 10, 16, 0xffff, n); +} + static inline bool msg_peer_link_is_up(struct tipc_msg *m) { if (likely(msg_user(m) != LINK_PROTOCOL)) diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 76bd2777baaf..114d72bab827 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -43,6 +43,7 @@ #include "bcast.h" #include "addr.h" #include "node.h" +#include "group.h" #include #define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ @@ -596,18 +597,6 @@ not_found: return ref; } -/** - * tipc_nametbl_mc_translate - find multicast destinations - * - * Creates list of all local ports that overlap the given multicast address; - * also determines if any off-node ports overlap. - * - * Note: Publications with a scope narrower than 'limit' are ignored. - * (i.e. local node-scope publications mustn't receive messages arriving - * from another node, even if the multcast link brought it here) - * - * Returns non-zero if any off-node ports overlap - */ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, u32 limit, struct list_head *dports) { @@ -679,6 +668,37 @@ exit: rcu_read_unlock(); } +/* tipc_nametbl_build_group - build list of communication group members + */ +void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, + u32 type, u32 domain) +{ + struct sub_seq *sseq, *stop; + struct name_info *info; + struct publication *p; + struct name_seq *seq; + + rcu_read_lock(); + seq = nametbl_find_seq(net, type); + if (!seq) + goto exit; + + spin_lock_bh(&seq->lock); + sseq = seq->sseqs; + stop = seq->sseqs + seq->first_free; + for (; sseq != stop; sseq++) { + info = sseq->info; + list_for_each_entry(p, &info->zone_list, zone_list) { + if (!tipc_in_scope(domain, p->node)) + continue; + tipc_group_add_member(grp, p->node, p->ref); + } + } + spin_unlock_bh(&seq->lock); +exit: + rcu_read_unlock(); +} + /* * tipc_nametbl_publish - add name publication to network name tables */ diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index d121175a92b5..97646b17a4a2 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -40,6 +40,7 @@ struct tipc_subscription; struct tipc_plist; struct tipc_nlist; +struct tipc_group; /* * TIPC name types reserved for internal TIPC use (both current and planned) @@ -101,6 +102,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb); u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, u32 limit, struct list_head *dports); +void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, + u32 type, u32 domain); void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, u32 upper, u32 domain, struct tipc_nlist *nodes); diff --git a/net/tipc/node.h b/net/tipc/node.h index df2f2197c4ad..acd58d23a70e 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -48,7 +48,8 @@ enum { TIPC_BCAST_SYNCH = (1 << 1), TIPC_BCAST_STATE_NACK = (1 << 2), TIPC_BLOCK_FLOWCTL = (1 << 3), - TIPC_BCAST_RCAST = (1 << 4) + TIPC_BCAST_RCAST = (1 << 4), + TIPC_MCAST_GROUPS = (1 << 5) }; #define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ diff --git a/net/tipc/socket.c b/net/tipc/socket.c index daf7c4df4531..64bbf9d03629 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,7 +1,7 @@ /* * net/tipc/socket.c: TIPC socket API * - * Copyright (c) 2001-2007, 2012-2016, Ericsson AB + * Copyright (c) 2001-2007, 2012-2017, Ericsson AB * Copyright (c) 2004-2008, 2010-2013, Wind River Systems * All rights reserved. * @@ -45,6 +45,7 @@ #include "socket.h" #include "bcast.h" #include "netlink.h" +#include "group.h" #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ #define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */ @@ -78,7 +79,7 @@ enum { * @conn_timeout: the time we can wait for an unresponded setup request * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue * @cong_link_cnt: number of congested links - * @sent_unacked: # messages sent by socket, and not yet acked by peer + * @snt_unacked: # messages sent by socket, and not yet acked by peer * @rcv_unacked: # messages read by user, but not yet acked back to peer * @peer: 'connected' peer for dgram/rdm * @node: hash table node @@ -109,6 +110,7 @@ struct tipc_sock { struct rhash_head node; struct tipc_mc_method mc_method; struct rcu_head rcu; + struct tipc_group *group; }; static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb); @@ -123,6 +125,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); +static int tipc_sk_leave(struct tipc_sock *tsk); static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); static int tipc_sk_insert(struct tipc_sock *tsk); static void tipc_sk_remove(struct tipc_sock *tsk); @@ -559,6 +562,7 @@ static int tipc_release(struct socket *sock) __tipc_shutdown(sock, TIPC_ERR_NO_PORT); sk->sk_shutdown = SHUTDOWN_MASK; + tipc_sk_leave(tsk); tipc_sk_withdraw(tsk, 0, NULL); sk_stop_timer(sk, &sk->sk_timer); tipc_sk_remove(tsk); @@ -601,7 +605,10 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr, res = tipc_sk_withdraw(tsk, 0, NULL); goto exit; } - + if (tsk->group) { + res = -EACCES; + goto exit; + } if (uaddr_len < sizeof(struct sockaddr_tipc)) { res = -EINVAL; goto exit; @@ -698,6 +705,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; u32 mask = 0; sock_poll_wait(file, sk_sleep(sk), wait); @@ -718,8 +726,9 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, mask |= (POLLIN | POLLRDNORM); break; case TIPC_OPEN: - if (!tsk->cong_link_cnt) - mask |= POLLOUT; + if (!grp || tipc_group_size(grp)) + if (!tsk->cong_link_cnt) + mask |= POLLOUT; if (tipc_sk_type_connectionless(sk) && (!skb_queue_empty(&sk->sk_receive_queue))) mask |= (POLLIN | POLLRDNORM); @@ -757,6 +766,9 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, struct tipc_nlist dsts; int rc; + if (tsk->group) + return -EACCES; + /* Block or return if any destination link is congested */ rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); if (unlikely(rc)) @@ -793,6 +805,64 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, return rc ? rc : dlen; } +/** + * tipc_send_group_bcast - send message to all members in communication group + * @sk: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + struct sock *sk = sock->sk; + struct net *net = sock_net(sk); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; + struct tipc_nlist *dsts = tipc_group_dests(grp); + struct tipc_mc_method *method = &tsk->mc_method; + struct tipc_msg *hdr = &tsk->phdr; + int mtu = tipc_bcast_get_mtu(net); + struct sk_buff_head pkts; + int rc = -EHOSTUNREACH; + + if (!dsts->local && !dsts->remote) + return -EHOSTUNREACH; + + /* Block or return if any destination link is congested */ + rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); + if (unlikely(rc)) + return rc; + + /* Complete message header */ + msg_set_type(hdr, TIPC_GRP_BCAST_MSG); + msg_set_hdr_sz(hdr, MCAST_H_SIZE); + msg_set_destport(hdr, 0); + msg_set_destnode(hdr, 0); + msg_set_nameinst(hdr, 0); + msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp)); + + /* Build message as chain of buffers */ + skb_queue_head_init(&pkts); + rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + if (unlikely(rc != dlen)) + return rc; + + /* Send message */ + rc = tipc_mcast_xmit(net, &pkts, method, dsts, + &tsk->cong_link_cnt); + if (unlikely(rc)) + return rc; + + /* Update broadcast sequence number */ + tipc_group_update_bc_members(tsk->group); + + return dlen; +} + /** * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets * @arrvq: queue with arriving messages, to be cloned after destination lookup @@ -803,13 +873,15 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, struct sk_buff_head *inputq) { - struct tipc_msg *msg; - struct list_head dports; - u32 portid; u32 scope = TIPC_CLUSTER_SCOPE; - struct sk_buff_head tmpq; - uint hsz; + u32 self = tipc_own_addr(net); struct sk_buff *skb, *_skb; + u32 lower = 0, upper = ~0; + struct sk_buff_head tmpq; + u32 portid, oport, onode; + struct list_head dports; + struct tipc_msg *msg; + int hsz; __skb_queue_head_init(&tmpq); INIT_LIST_HEAD(&dports); @@ -818,14 +890,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { msg = buf_msg(skb); hsz = skb_headroom(skb) + msg_hdr_sz(msg); - - if (in_own_node(net, msg_orignode(msg))) + oport = msg_origport(msg); + onode = msg_orignode(msg); + if (onode == self) scope = TIPC_NODE_SCOPE; /* Create destination port list and message clones: */ - tipc_nametbl_mc_translate(net, - msg_nametype(msg), msg_namelower(msg), - msg_nameupper(msg), scope, &dports); + if (!msg_in_group(msg)) { + lower = msg_namelower(msg); + upper = msg_nameupper(msg); + } + tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper, + scope, &dports); while (tipc_dest_pop(&dports, NULL, &portid)) { _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); if (_skb) { @@ -895,10 +971,6 @@ exit: kfree_skb(skb); } -static void tipc_sk_top_evt(struct tipc_sock *tsk, struct tipc_event *evt) -{ -} - /** * tipc_sendmsg - send message in connectionless manner * @sock: socket structure @@ -934,6 +1006,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); struct list_head *clinks = &tsk->cong_links; bool syn = !tipc_sk_type_connectionless(sk); + struct tipc_group *grp = tsk->group; struct tipc_msg *hdr = &tsk->phdr; struct tipc_name_seq *seq; struct sk_buff_head pkts; @@ -944,6 +1017,9 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) return -EMSGSIZE; + if (unlikely(grp)) + return tipc_send_group_bcast(sock, m, dlen, timeout); + if (unlikely(!dest)) { dest = &tsk->peer; if (!syn || dest->family != AF_TIPC) @@ -1543,6 +1619,7 @@ static void tipc_sk_proto_rcv(struct sock *sk, struct sk_buff *skb = __skb_dequeue(inputq); struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = buf_msg(skb); + struct tipc_group *grp = tsk->group; switch (msg_user(hdr)) { case CONN_MANAGER: @@ -1553,8 +1630,12 @@ static void tipc_sk_proto_rcv(struct sock *sk, tsk->cong_link_cnt--; sk->sk_write_space(sk); break; + case GROUP_PROTOCOL: + tipc_group_proto_rcv(grp, hdr, xmitq); + break; case TOP_SRV: - tipc_sk_top_evt(tsk, (void *)msg_data(hdr)); + tipc_group_member_evt(tsk->group, skb, xmitq); + skb = NULL; break; default: break; @@ -1699,6 +1780,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, { bool sk_conn = !tipc_sk_type_connectionless(sk); struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; struct tipc_msg *hdr = buf_msg(skb); struct net *net = sock_net(sk); struct sk_buff_head inputq; @@ -1710,15 +1792,19 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, if (unlikely(!msg_isdata(hdr))) tipc_sk_proto_rcv(sk, &inputq, xmitq); - else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) + else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG)) return kfree_skb(skb); + if (unlikely(grp)) + tipc_group_filter_msg(grp, &inputq, xmitq); + /* Validate and add to receive buffer if there is space */ while ((skb = __skb_dequeue(&inputq))) { hdr = buf_msg(skb); limit = rcvbuf_limit(sk, skb); if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) || - (!sk_conn && msg_connected(hdr))) + (!sk_conn && msg_connected(hdr)) || + (!grp && msg_in_group(hdr))) err = TIPC_ERR_NO_PORT; else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) err = TIPC_ERR_OVERLOAD; @@ -1837,7 +1923,6 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) sock_put(sk); continue; } - /* No destination socket => dequeue skb if still there */ skb = tipc_skb_dequeue(inputq, dport); if (!skb) @@ -1905,6 +1990,11 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest, lock_sock(sk); + if (tsk->group) { + res = -EINVAL; + goto exit; + } + if (dst->family == AF_UNSPEC) { memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc)); if (!tipc_sk_type_connectionless(sk)) @@ -2341,6 +2431,52 @@ void tipc_sk_rht_destroy(struct net *net) rhashtable_destroy(&tn->sk_rht); } +static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq) +{ + struct net *net = sock_net(&tsk->sk); + u32 domain = addr_domain(net, mreq->scope); + struct tipc_group *grp = tsk->group; + struct tipc_msg *hdr = &tsk->phdr; + struct tipc_name_seq seq; + int rc; + + if (mreq->type < TIPC_RESERVED_TYPES) + return -EACCES; + if (grp) + return -EACCES; + grp = tipc_group_create(net, tsk->portid, mreq); + if (!grp) + return -ENOMEM; + tsk->group = grp; + msg_set_lookup_scope(hdr, mreq->scope); + msg_set_nametype(hdr, mreq->type); + msg_set_dest_droppable(hdr, true); + seq.type = mreq->type; + seq.lower = mreq->instance; + seq.upper = seq.lower; + tipc_nametbl_build_group(net, grp, mreq->type, domain); + rc = tipc_sk_publish(tsk, mreq->scope, &seq); + if (rc) + tipc_group_delete(net, grp); + return rc; +} + +static int tipc_sk_leave(struct tipc_sock *tsk) +{ + struct net *net = sock_net(&tsk->sk); + struct tipc_group *grp = tsk->group; + struct tipc_name_seq seq; + int scope; + + if (!grp) + return -EINVAL; + tipc_group_self(grp, &seq, &scope); + tipc_group_delete(net, grp); + tsk->group = NULL; + tipc_sk_withdraw(tsk, scope, &seq); + return 0; +} + /** * tipc_setsockopt - set socket option * @sock: socket structure @@ -2359,6 +2495,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group_req mreq; u32 value = 0; int res = 0; @@ -2374,9 +2511,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, case TIPC_CONN_TIMEOUT: if (ol < sizeof(value)) return -EINVAL; - res = get_user(value, (u32 __user *)ov); - if (res) - return res; + if (get_user(value, (u32 __user *)ov)) + return -EFAULT; + break; + case TIPC_GROUP_JOIN: + if (ol < sizeof(mreq)) + return -EINVAL; + if (copy_from_user(&mreq, ov, sizeof(mreq))) + return -EFAULT; break; default: if (ov || ol) @@ -2409,6 +2551,12 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, tsk->mc_method.rcast = true; tsk->mc_method.mandatory = true; break; + case TIPC_GROUP_JOIN: + res = tipc_sk_join(tsk, &mreq); + break; + case TIPC_GROUP_LEAVE: + res = tipc_sk_leave(tsk); + break; default: res = -EINVAL; } @@ -2436,7 +2584,8 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - int len; + struct tipc_name_seq seq; + int len, scope; u32 value; int res; @@ -2470,6 +2619,12 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, case TIPC_SOCK_RECVQ_DEPTH: value = skb_queue_len(&sk->sk_receive_queue); break; + case TIPC_GROUP_JOIN: + seq.type = 0; + if (tsk->group) + tipc_group_self(tsk->group, &seq, &scope); + value = seq.type; + break; default: res = -EINVAL; } -- cgit v1.2.3 From 31c82a2d9d51fccbb85cbd2be983eb115225301c Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:24 +0200 Subject: tipc: add second source address to recvmsg()/recvfrom() With group communication, it becomes important for a message receiver to identify not only from which socket (identfied by a node:port tuple) the message was sent, but also the logical identity (type:instance) of the sending member. We fix this by adding a second instance of struct sockaddr_tipc to the source address area when a message is read. The extra address struct is filled in with data found in the received message header (type,) and in the local member representation struct (instance.) Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 3 +++ net/tipc/msg.h | 1 + net/tipc/socket.c | 53 +++++++++++++++++++++++++++++++++++------------------ 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/net/tipc/group.c b/net/tipc/group.c index 3f0e1ce1e3b9..beb214a3420c 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -61,6 +61,7 @@ struct tipc_member { struct list_head list; u32 node; u32 port; + u32 instance; enum mbr_state state; u16 bc_rcv_nxt; }; @@ -282,6 +283,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, if (!tipc_group_is_receiver(m)) goto drop; + TIPC_SKB_CB(skb)->orig_member = m->instance; __skb_queue_tail(inputq, skb); m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; @@ -388,6 +390,7 @@ void tipc_group_member_evt(struct tipc_group *grp, m->state = MBR_PUBLISHED; else m->state = MBR_JOINED; + m->instance = evt->found_lower; tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); } else if (evt->event == TIPC_WITHDRAWN) { if (!m) diff --git a/net/tipc/msg.h b/net/tipc/msg.h index dad400935405..e438716d2372 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -100,6 +100,7 @@ struct plist; struct tipc_skb_cb { u32 bytes_read; + u32 orig_member; struct sk_buff *tail; bool validated; u16 chain_imp; diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 64bbf9d03629..25ecf1201527 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -62,6 +62,11 @@ enum { TIPC_CONNECTING = TCP_SYN_SENT, }; +struct sockaddr_pair { + struct sockaddr_tipc sock; + struct sockaddr_tipc member; +}; + /** * struct tipc_sock - TIPC socket structure * @sk: socket - interacts with 'port' and with user via the socket API @@ -1222,26 +1227,38 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port, } /** - * set_orig_addr - capture sender's address for received message + * tipc_sk_set_orig_addr - capture sender's address for received message * @m: descriptor for message info - * @msg: received message header + * @hdr: received message header * * Note: Address is not captured if not requested by receiver. */ -static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg) -{ - DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name); - - if (addr) { - addr->family = AF_TIPC; - addr->addrtype = TIPC_ADDR_ID; - memset(&addr->addr, 0, sizeof(addr->addr)); - addr->addr.id.ref = msg_origport(msg); - addr->addr.id.node = msg_orignode(msg); - addr->addr.name.domain = 0; /* could leave uninitialized */ - addr->scope = 0; /* could leave uninitialized */ - m->msg_namelen = sizeof(struct sockaddr_tipc); - } +static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb) +{ + DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name); + struct tipc_msg *hdr = buf_msg(skb); + + if (!srcaddr) + return; + + srcaddr->sock.family = AF_TIPC; + srcaddr->sock.addrtype = TIPC_ADDR_ID; + srcaddr->sock.addr.id.ref = msg_origport(hdr); + srcaddr->sock.addr.id.node = msg_orignode(hdr); + srcaddr->sock.addr.name.domain = 0; + srcaddr->sock.scope = 0; + m->msg_namelen = sizeof(struct sockaddr_tipc); + + if (!msg_in_group(hdr)) + return; + + /* Group message users may also want to know sending member's id */ + srcaddr->member.family = AF_TIPC; + srcaddr->member.addrtype = TIPC_ADDR_NAME; + srcaddr->member.addr.name.name.type = msg_nametype(hdr); + srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member; + srcaddr->member.addr.name.domain = 0; + m->msg_namelen = sizeof(*srcaddr); } /** @@ -1432,7 +1449,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, } while (1); /* Collect msg meta data, including error code and rejected data */ - set_orig_addr(m, hdr); + tipc_sk_set_orig_addr(m, skb); rc = tipc_sk_anc_data_recv(m, hdr, tsk); if (unlikely(rc)) goto exit; @@ -1526,7 +1543,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m, /* Collect msg meta data, incl. error code and rejected data */ if (!copied) { - set_orig_addr(m, hdr); + tipc_sk_set_orig_addr(m, skb); rc = tipc_sk_anc_data_recv(m, hdr, tsk); if (rc) break; -- cgit v1.2.3 From ae236fb208a6fbbd2e7a6913385e8fb78ac807f8 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:25 +0200 Subject: tipc: receive group membership events via member socket Like with any other service, group members' availability can be subscribed for by connecting to be topology server. However, because the events arrive via a different socket than the member socket, there is a real risk that membership events my arrive out of synch with the actual JOIN/LEAVE action. I.e., it is possible to receive the first messages from a new member before the corresponding JOIN event arrives, just as it is possible to receive the last messages from a leaving member after the LEAVE event has already been received. Since each member socket is internally also subscribing for membership events, we now fix this problem by passing those events on to the user via the member socket. We leverage the already present member synch- ronization protocol to guarantee correct message/event order. An event is delivered to the user as an empty message where the two source addresses identify the new/lost member. Furthermore, we set the MSG_OOB bit in the message flags to mark it as an event. If the event is an indication about a member loss we also set the MSG_EOR bit, so it can be distinguished from a member addition event. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- include/uapi/linux/tipc.h | 1 + net/tipc/group.c | 60 +++++++++++++++++++++++++++++++++++++---------- net/tipc/group.h | 2 ++ net/tipc/msg.h | 22 +++++++++++++++-- net/tipc/socket.c | 49 ++++++++++++++++++++++++-------------- 5 files changed, 101 insertions(+), 33 deletions(-) diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h index 5f7b2c4a09ab..ef41c11a7f38 100644 --- a/include/uapi/linux/tipc.h +++ b/include/uapi/linux/tipc.h @@ -238,6 +238,7 @@ struct sockaddr_tipc { * Flag values */ #define TIPC_GROUP_LOOPBACK 0x1 /* Receive copy of sent msg when match */ +#define TIPC_GROUP_MEMBER_EVTS 0x2 /* Receive membership events in socket */ struct tipc_group_req { __u32 type; /* group id */ diff --git a/net/tipc/group.c b/net/tipc/group.c index beb214a3420c..1bfa9348b26d 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -59,6 +59,7 @@ enum mbr_state { struct tipc_member { struct rb_node tree_node; struct list_head list; + struct sk_buff *event_msg; u32 node; u32 port; u32 instance; @@ -79,6 +80,7 @@ struct tipc_group { u16 member_cnt; u16 bc_snd_nxt; bool loopback; + bool events; }; static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, @@ -117,6 +119,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid, grp->instance = mreq->instance; grp->scope = mreq->scope; grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; + grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) return grp; kfree(grp); @@ -279,6 +282,13 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, if (!msg_in_group(hdr)) goto drop; + if (mtyp == TIPC_GRP_MEMBER_EVT) { + if (!grp->events) + goto drop; + __skb_queue_tail(inputq, skb); + return; + } + m = tipc_group_find_member(grp, node, port); if (!tipc_group_is_receiver(m)) goto drop; @@ -311,6 +321,7 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, } void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, + struct sk_buff_head *inputq, struct sk_buff_head *xmitq) { u32 node = msg_orignode(hdr); @@ -332,10 +343,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); /* Wait until PUBLISH event is received */ - if (m->state == MBR_DISCOVERED) + if (m->state == MBR_DISCOVERED) { m->state = MBR_JOINING; - else if (m->state == MBR_PUBLISHED) + } else if (m->state == MBR_PUBLISHED) { m->state = MBR_JOINED; + __skb_queue_tail(inputq, m->event_msg); + } return; case GRP_LEAVE_MSG: if (!m) @@ -347,6 +360,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, return; } /* Otherwise deliver already received WITHDRAW event */ + __skb_queue_tail(inputq, m->event_msg); tipc_group_delete_member(grp, m); return; default: @@ -354,16 +368,17 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, } } -/* tipc_group_member_evt() - receive and handle a member up/down event - */ void tipc_group_member_evt(struct tipc_group *grp, struct sk_buff *skb, + struct sk_buff_head *inputq, struct sk_buff_head *xmitq) { struct tipc_msg *hdr = buf_msg(skb); struct tipc_event *evt = (void *)msg_data(hdr); + u32 instance = evt->found_lower; u32 node = evt->port.node; u32 port = evt->port.ref; + int event = evt->event; struct tipc_member *m; struct net *net; u32 self; @@ -376,32 +391,51 @@ void tipc_group_member_evt(struct tipc_group *grp, if (!grp->loopback && node == self && port == grp->portid) goto drop; + /* Convert message before delivery to user */ + msg_set_hdr_sz(hdr, GROUP_H_SIZE); + msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE); + msg_set_type(hdr, TIPC_GRP_MEMBER_EVT); + msg_set_origport(hdr, port); + msg_set_orignode(hdr, node); + msg_set_nametype(hdr, grp->type); + msg_set_grp_evt(hdr, event); + m = tipc_group_find_member(grp, node, port); - if (evt->event == TIPC_PUBLISHED) { + if (event == TIPC_PUBLISHED) { if (!m) m = tipc_group_create_member(grp, node, port, MBR_DISCOVERED); if (!m) goto drop; - /* Wait if JOIN message not yet received */ - if (m->state == MBR_DISCOVERED) + /* Hold back event if JOIN message not yet received */ + if (m->state == MBR_DISCOVERED) { + m->event_msg = skb; m->state = MBR_PUBLISHED; - else + } else { + __skb_queue_tail(inputq, skb); m->state = MBR_JOINED; - m->instance = evt->found_lower; + } + m->instance = instance; + TIPC_SKB_CB(skb)->orig_member = m->instance; tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); - } else if (evt->event == TIPC_WITHDRAWN) { + } else if (event == TIPC_WITHDRAWN) { if (!m) goto drop; - /* Keep back event if more messages might be expected */ - if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) + TIPC_SKB_CB(skb)->orig_member = m->instance; + + /* Hold back event if more messages might be expected */ + if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) { + m->event_msg = skb; m->state = MBR_LEAVING; - else + } else { + __skb_queue_tail(inputq, skb); tipc_group_delete_member(grp, m); + } } + return; drop: kfree_skb(skb); } diff --git a/net/tipc/group.h b/net/tipc/group.h index 9bdf4479fc03..5d3f10d28967 100644 --- a/net/tipc/group.h +++ b/net/tipc/group.h @@ -54,9 +54,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *xmitq); void tipc_group_member_evt(struct tipc_group *grp, struct sk_buff *skb, + struct sk_buff_head *inputq, struct sk_buff_head *xmitq); void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, + struct sk_buff_head *inputq, struct sk_buff_head *xmitq); void tipc_group_update_bc_members(struct tipc_group *grp); u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index e438716d2372..1b527b154e46 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -65,7 +65,8 @@ struct plist; #define TIPC_MCAST_MSG 1 #define TIPC_NAMED_MSG 2 #define TIPC_DIRECT_MSG 3 -#define TIPC_GRP_BCAST_MSG 4 +#define TIPC_GRP_MEMBER_EVT 4 +#define TIPC_GRP_BCAST_MSG 5 /* * Internal message users @@ -258,7 +259,14 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n) static inline int msg_in_group(struct tipc_msg *m) { - return (msg_type(m) == TIPC_GRP_BCAST_MSG); + int mtyp = msg_type(m); + + return (mtyp == TIPC_GRP_BCAST_MSG) || (mtyp == TIPC_GRP_MEMBER_EVT); +} + +static inline bool msg_is_grp_evt(struct tipc_msg *m) +{ + return msg_type(m) == TIPC_GRP_MEMBER_EVT; } static inline u32 msg_named(struct tipc_msg *m) @@ -824,6 +832,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n) /* Word 10 */ +static inline u16 msg_grp_evt(struct tipc_msg *m) +{ + return msg_bits(m, 10, 0, 0x3); +} + +static inline void msg_set_grp_evt(struct tipc_msg *m, int n) +{ + msg_set_bits(m, 10, 0, 0x3, n); +} + static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) { return msg_bits(m, 10, 16, 0xffff); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 25ecf1201527..0a2eac309177 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -709,41 +709,43 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, poll_table *wait) { struct sock *sk = sock->sk; + struct sk_buff *skb = skb_peek(&sk->sk_receive_queue); struct tipc_sock *tsk = tipc_sk(sk); struct tipc_group *grp = tsk->group; - u32 mask = 0; + u32 revents = 0; sock_poll_wait(file, sk_sleep(sk), wait); if (sk->sk_shutdown & RCV_SHUTDOWN) - mask |= POLLRDHUP | POLLIN | POLLRDNORM; + revents |= POLLRDHUP | POLLIN | POLLRDNORM; if (sk->sk_shutdown == SHUTDOWN_MASK) - mask |= POLLHUP; + revents |= POLLHUP; switch (sk->sk_state) { case TIPC_ESTABLISHED: if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk)) - mask |= POLLOUT; + revents |= POLLOUT; /* fall thru' */ case TIPC_LISTEN: case TIPC_CONNECTING: - if (!skb_queue_empty(&sk->sk_receive_queue)) - mask |= (POLLIN | POLLRDNORM); + if (skb) + revents |= POLLIN | POLLRDNORM; break; case TIPC_OPEN: if (!grp || tipc_group_size(grp)) if (!tsk->cong_link_cnt) - mask |= POLLOUT; - if (tipc_sk_type_connectionless(sk) && - (!skb_queue_empty(&sk->sk_receive_queue))) - mask |= (POLLIN | POLLRDNORM); + revents |= POLLOUT; + if (!tipc_sk_type_connectionless(sk)) + break; + if (!skb) + break; + revents |= POLLIN | POLLRDNORM; break; case TIPC_DISCONNECTING: - mask = (POLLIN | POLLRDNORM | POLLHUP); + revents = POLLIN | POLLRDNORM | POLLHUP; break; } - - return mask; + return revents; } /** @@ -1415,11 +1417,12 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buflen, int flags) { struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); - struct sk_buff *skb; - struct tipc_msg *hdr; bool connected = !tipc_sk_type_connectionless(sk); + struct tipc_sock *tsk = tipc_sk(sk); int rc, err, hlen, dlen, copy; + struct tipc_msg *hdr; + struct sk_buff *skb; + bool grp_evt; long timeout; /* Catch invalid receive requests */ @@ -1443,6 +1446,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, dlen = msg_data_sz(hdr); hlen = msg_hdr_sz(hdr); err = msg_errcode(hdr); + grp_evt = msg_is_grp_evt(hdr); if (likely(dlen || err)) break; tsk_advance_rx_queue(sk); @@ -1469,11 +1473,20 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, if (unlikely(rc)) goto exit; + /* Mark message as group event if applicable */ + if (unlikely(grp_evt)) { + if (msg_grp_evt(hdr) == TIPC_WITHDRAWN) + m->msg_flags |= MSG_EOR; + m->msg_flags |= MSG_OOB; + copy = 0; + } + /* Caption of data or error code/rejected data was successful */ if (unlikely(flags & MSG_PEEK)) goto exit; tsk_advance_rx_queue(sk); + if (likely(!connected)) goto exit; @@ -1648,10 +1661,10 @@ static void tipc_sk_proto_rcv(struct sock *sk, sk->sk_write_space(sk); break; case GROUP_PROTOCOL: - tipc_group_proto_rcv(grp, hdr, xmitq); + tipc_group_proto_rcv(grp, hdr, inputq, xmitq); break; case TOP_SRV: - tipc_group_member_evt(tsk->group, skb, xmitq); + tipc_group_member_evt(tsk->group, skb, inputq, xmitq); skb = NULL; break; default: -- cgit v1.2.3 From b7d42635517fde2b095deddd0fba37be2302a285 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:26 +0200 Subject: tipc: introduce flow control for group broadcast messages We introduce an end-to-end flow control mechanism for group broadcast messages. This ensures that no messages are ever lost because of destination receive buffer overflow, with minimal impact on performance. For now, the algorithm is based on the assumption that there is only one active transmitter at any moment in time. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 148 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- net/tipc/group.h | 11 ++-- net/tipc/msg.h | 5 +- net/tipc/socket.c | 48 +++++++++++++----- 4 files changed, 190 insertions(+), 22 deletions(-) diff --git a/net/tipc/group.c b/net/tipc/group.c index 1bfa9348b26d..b8ed70abba01 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -46,6 +46,7 @@ #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) #define ADV_IDLE ADV_UNIT +#define ADV_ACTIVE (ADV_UNIT * 12) enum mbr_state { MBR_QUARANTINED, @@ -59,16 +60,22 @@ enum mbr_state { struct tipc_member { struct rb_node tree_node; struct list_head list; + struct list_head congested; struct sk_buff *event_msg; + struct tipc_group *group; u32 node; u32 port; u32 instance; enum mbr_state state; + u16 advertised; + u16 window; u16 bc_rcv_nxt; + bool usr_pending; }; struct tipc_group { struct rb_root members; + struct list_head congested; struct tipc_nlist dests; struct net *net; int subid; @@ -86,11 +93,24 @@ struct tipc_group { static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, int mtyp, struct sk_buff_head *xmitq); +static int tipc_group_rcvbuf_limit(struct tipc_group *grp) +{ + int mcnt = grp->member_cnt + 1; + + /* Scale to bytes, considering worst-case truesize/msgsize ratio */ + return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4; +} + u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) { return grp->bc_snd_nxt; } +static bool tipc_group_is_enabled(struct tipc_member *m) +{ + return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING; +} + static bool tipc_group_is_receiver(struct tipc_member *m) { return m && m->state >= MBR_JOINED; @@ -111,6 +131,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid, if (!grp) return NULL; tipc_nlist_init(&grp->dests, tipc_own_addr(net)); + INIT_LIST_HEAD(&grp->congested); grp->members = RB_ROOT; grp->net = net; grp->portid = portid; @@ -213,6 +234,8 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, if (!m) return NULL; INIT_LIST_HEAD(&m->list); + INIT_LIST_HEAD(&m->congested); + m->group = grp; m->node = node; m->port = port; grp->member_cnt++; @@ -233,6 +256,7 @@ static void tipc_group_delete_member(struct tipc_group *grp, rb_erase(&m->tree_node, &grp->members); grp->member_cnt--; list_del_init(&m->list); + list_del_init(&m->congested); /* If last member on a node, remove node from dest list */ if (!tipc_group_find_node(grp, m->node)) @@ -255,11 +279,59 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, *scope = grp->scope; } -void tipc_group_update_bc_members(struct tipc_group *grp) +void tipc_group_update_member(struct tipc_member *m, int len) +{ + struct tipc_group *grp = m->group; + struct tipc_member *_m, *tmp; + + if (!tipc_group_is_enabled(m)) + return; + + m->window -= len; + + if (m->window >= ADV_IDLE) + return; + + if (!list_empty(&m->congested)) + return; + + /* Sort member into congested members' list */ + list_for_each_entry_safe(_m, tmp, &grp->congested, congested) { + if (m->window > _m->window) + continue; + list_add_tail(&m->congested, &_m->congested); + return; + } + list_add_tail(&m->congested, &grp->congested); +} + +void tipc_group_update_bc_members(struct tipc_group *grp, int len) { + struct tipc_member *m; + struct rb_node *n; + + for (n = rb_first(&grp->members); n; n = rb_next(n)) { + m = container_of(n, struct tipc_member, tree_node); + if (tipc_group_is_enabled(m)) + tipc_group_update_member(m, len); + } grp->bc_snd_nxt++; } +bool tipc_group_bc_cong(struct tipc_group *grp, int len) +{ + struct tipc_member *m; + + if (list_empty(&grp->congested)) + return false; + + m = list_first_entry(&grp->congested, struct tipc_member, congested); + if (m->window >= len) + return false; + + return true; +} + /* tipc_group_filter_msg() - determine if we should accept arriving message */ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, @@ -302,11 +374,36 @@ drop: kfree_skb(skb); } +void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, + u32 port, struct sk_buff_head *xmitq) +{ + struct tipc_member *m; + + m = tipc_group_find_member(grp, node, port); + if (!m) + return; + + m->advertised -= blks; + + switch (m->state) { + case MBR_JOINED: + if (m->advertised <= (ADV_ACTIVE - ADV_UNIT)) + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + break; + case MBR_DISCOVERED: + case MBR_JOINING: + case MBR_LEAVING: + default: + break; + } +} + static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, int mtyp, struct sk_buff_head *xmitq) { struct tipc_msg *hdr; struct sk_buff *skb; + int adv = 0; skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, m->node, tipc_own_addr(grp->net), @@ -314,14 +411,24 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, if (!skb) return; + if (m->state == MBR_JOINED) + adv = ADV_ACTIVE - m->advertised; + hdr = buf_msg(skb); - if (mtyp == GRP_JOIN_MSG) + + if (mtyp == GRP_JOIN_MSG) { msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); + msg_set_adv_win(hdr, adv); + m->advertised += adv; + } else if (mtyp == GRP_ADV_MSG) { + msg_set_adv_win(hdr, adv); + m->advertised += adv; + } __skb_queue_tail(xmitq, skb); } -void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, - struct sk_buff_head *inputq, +void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, + struct tipc_msg *hdr, struct sk_buff_head *inputq, struct sk_buff_head *xmitq) { u32 node = msg_orignode(hdr); @@ -341,14 +448,22 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, if (!m) return; m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); + m->window += msg_adv_win(hdr); /* Wait until PUBLISH event is received */ if (m->state == MBR_DISCOVERED) { m->state = MBR_JOINING; } else if (m->state == MBR_PUBLISHED) { m->state = MBR_JOINED; + *usr_wakeup = true; + m->usr_pending = false; + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); __skb_queue_tail(inputq, m->event_msg); } + if (m->window < ADV_IDLE) + tipc_group_update_member(m, 0); + else + list_del_init(&m->congested); return; case GRP_LEAVE_MSG: if (!m) @@ -361,14 +476,28 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, } /* Otherwise deliver already received WITHDRAW event */ __skb_queue_tail(inputq, m->event_msg); + *usr_wakeup = m->usr_pending; tipc_group_delete_member(grp, m); + list_del_init(&m->congested); + return; + case GRP_ADV_MSG: + if (!m) + return; + m->window += msg_adv_win(hdr); + *usr_wakeup = m->usr_pending; + m->usr_pending = false; + list_del_init(&m->congested); return; default: pr_warn("Received unknown GROUP_PROTO message\n"); } } +/* tipc_group_member_evt() - receive and handle a member up/down event + */ void tipc_group_member_evt(struct tipc_group *grp, + bool *usr_wakeup, + int *sk_rcvbuf, struct sk_buff *skb, struct sk_buff_head *inputq, struct sk_buff_head *xmitq) @@ -416,16 +545,25 @@ void tipc_group_member_evt(struct tipc_group *grp, } else { __skb_queue_tail(inputq, skb); m->state = MBR_JOINED; + *usr_wakeup = true; + m->usr_pending = false; } m->instance = instance; TIPC_SKB_CB(skb)->orig_member = m->instance; tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); + if (m->window < ADV_IDLE) + tipc_group_update_member(m, 0); + else + list_del_init(&m->congested); } else if (event == TIPC_WITHDRAWN) { if (!m) goto drop; TIPC_SKB_CB(skb)->orig_member = m->instance; + *usr_wakeup = m->usr_pending; + m->usr_pending = false; + /* Hold back event if more messages might be expected */ if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) { m->event_msg = skb; @@ -434,7 +572,9 @@ void tipc_group_member_evt(struct tipc_group *grp, __skb_queue_tail(inputq, skb); tipc_group_delete_member(grp, m); } + list_del_init(&m->congested); } + *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); return; drop: kfree_skb(skb); diff --git a/net/tipc/group.h b/net/tipc/group.h index 5d3f10d28967..0e2740e1da90 100644 --- a/net/tipc/group.h +++ b/net/tipc/group.h @@ -52,15 +52,18 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, struct sk_buff_head *xmitq); -void tipc_group_member_evt(struct tipc_group *grp, - struct sk_buff *skb, +void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup, + int *sk_rcvbuf, struct sk_buff *skb, struct sk_buff_head *inputq, struct sk_buff_head *xmitq); -void tipc_group_proto_rcv(struct tipc_group *grp, +void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup, struct tipc_msg *hdr, struct sk_buff_head *inputq, struct sk_buff_head *xmitq); -void tipc_group_update_bc_members(struct tipc_group *grp); +void tipc_group_update_bc_members(struct tipc_group *grp, int len); +bool tipc_group_bc_cong(struct tipc_group *grp, int len); +void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, + u32 port, struct sk_buff_head *xmitq); u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); int tipc_group_size(struct tipc_group *grp); #endif diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 1b527b154e46..237d007499f9 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -538,6 +538,7 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) */ #define GRP_JOIN_MSG 0 #define GRP_LEAVE_MSG 1 +#define GRP_ADV_MSG 2 /* * Word 1 @@ -790,12 +791,12 @@ static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 16, 0xffff, n); } -static inline u32 msg_adv_win(struct tipc_msg *m) +static inline u16 msg_adv_win(struct tipc_msg *m) { return msg_bits(m, 9, 0, 0xffff); } -static inline void msg_set_adv_win(struct tipc_msg *m, u32 n) +static inline void msg_set_adv_win(struct tipc_msg *m, u16 n) { msg_set_bits(m, 9, 0, 0xffff, n); } diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 0a2eac309177..50145c95ac96 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -201,6 +201,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk) return tsk->snt_unacked > tsk->snd_win; } +static u16 tsk_blocks(int len) +{ + return ((len / FLOWCTL_BLK_SZ) + 1); +} + /* tsk_blocks(): translate a buffer size in bytes to number of * advertisable blocks, taking into account the ratio truesize(len)/len * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ @@ -831,6 +836,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, struct tipc_group *grp = tsk->group; struct tipc_nlist *dsts = tipc_group_dests(grp); struct tipc_mc_method *method = &tsk->mc_method; + int blks = tsk_blocks(MCAST_H_SIZE + dlen); struct tipc_msg *hdr = &tsk->phdr; int mtu = tipc_bcast_get_mtu(net); struct sk_buff_head pkts; @@ -839,14 +845,15 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, if (!dsts->local && !dsts->remote) return -EHOSTUNREACH; - /* Block or return if any destination link is congested */ - rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); + /* Block or return if any destination link or member is congested */ + rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt && + !tipc_group_bc_cong(grp, blks)); if (unlikely(rc)) return rc; /* Complete message header */ msg_set_type(hdr, TIPC_GRP_BCAST_MSG); - msg_set_hdr_sz(hdr, MCAST_H_SIZE); + msg_set_hdr_sz(hdr, GROUP_H_SIZE); msg_set_destport(hdr, 0); msg_set_destnode(hdr, 0); msg_set_nameinst(hdr, 0); @@ -864,9 +871,8 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, if (unlikely(rc)) return rc; - /* Update broadcast sequence number */ - tipc_group_update_bc_members(tsk->group); - + /* Update broadcast sequence number and send windows */ + tipc_group_update_bc_members(tsk->group, blks); return dlen; } @@ -1024,7 +1030,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) return -EMSGSIZE; - if (unlikely(grp)) + if (unlikely(grp && !dest)) return tipc_send_group_bcast(sock, m, dlen, timeout); if (unlikely(!dest)) { @@ -1420,6 +1426,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, bool connected = !tipc_sk_type_connectionless(sk); struct tipc_sock *tsk = tipc_sk(sk); int rc, err, hlen, dlen, copy; + struct sk_buff_head xmitq; struct tipc_msg *hdr; struct sk_buff *skb; bool grp_evt; @@ -1436,8 +1443,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, } timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); + /* Step rcv queue to first msg with data or error; wait if necessary */ do { - /* Look at first msg in receive queue; wait if necessary */ rc = tipc_wait_for_rcvmsg(sock, &timeout); if (unlikely(rc)) goto exit; @@ -1485,12 +1492,21 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, if (unlikely(flags & MSG_PEEK)) goto exit; + /* Send group flow control advertisement when applicable */ + if (tsk->group && msg_in_group(hdr) && !grp_evt) { + skb_queue_head_init(&xmitq); + tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen), + msg_orignode(hdr), msg_origport(hdr), + &xmitq); + tipc_node_distr_xmit(sock_net(sk), &xmitq); + } + tsk_advance_rx_queue(sk); if (likely(!connected)) goto exit; - /* Send connection flow control ack when applicable */ + /* Send connection flow control advertisement when applicable */ tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE) tipc_sk_send_ack(tsk); @@ -1650,6 +1666,7 @@ static void tipc_sk_proto_rcv(struct sock *sk, struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = buf_msg(skb); struct tipc_group *grp = tsk->group; + bool wakeup = false; switch (msg_user(hdr)) { case CONN_MANAGER: @@ -1658,19 +1675,23 @@ static void tipc_sk_proto_rcv(struct sock *sk, case SOCK_WAKEUP: tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0); tsk->cong_link_cnt--; - sk->sk_write_space(sk); + wakeup = true; break; case GROUP_PROTOCOL: - tipc_group_proto_rcv(grp, hdr, inputq, xmitq); + tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq); break; case TOP_SRV: - tipc_group_member_evt(tsk->group, skb, inputq, xmitq); + tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf, + skb, inputq, xmitq); skb = NULL; break; default: break; } + if (wakeup) + sk->sk_write_space(sk); + kfree_skb(skb); } @@ -1785,6 +1806,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = buf_msg(skb); + if (unlikely(msg_in_group(hdr))) + return sk->sk_rcvbuf; + if (unlikely(!msg_connected(hdr))) return sk->sk_rcvbuf << msg_importance(hdr); -- cgit v1.2.3 From 27bd9ec027f396457d1a147043c92ff22fc4c71e Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:27 +0200 Subject: tipc: introduce group unicast messaging We now make it possible to send connectionless unicast messages within a communication group. To send a message, the sender can use either a direct port address, aka port identity, or an indirect port name to be looked up. This type of messages are subject to the same start synchronization and flow control mechanism as group broadcast messages. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 45 +++++++++++++++++++++- net/tipc/group.h | 3 ++ net/tipc/msg.h | 3 +- net/tipc/socket.c | 112 +++++++++++++++++++++++++++++++++++++++++++++++++----- 4 files changed, 150 insertions(+), 13 deletions(-) diff --git a/net/tipc/group.c b/net/tipc/group.c index b8ed70abba01..18440be5b5fc 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -186,6 +186,17 @@ struct tipc_member *tipc_group_find_member(struct tipc_group *grp, return NULL; } +static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, + u32 node, u32 port) +{ + struct tipc_member *m; + + m = tipc_group_find_member(grp, node, port); + if (m && tipc_group_is_enabled(m)) + return m; + return NULL; +} + static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, u32 node) { @@ -318,9 +329,39 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len) grp->bc_snd_nxt++; } -bool tipc_group_bc_cong(struct tipc_group *grp, int len) +bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, + int len, struct tipc_member **mbr) { + struct sk_buff_head xmitq; struct tipc_member *m; + int adv, state; + + m = tipc_group_find_dest(grp, dnode, dport); + *mbr = m; + if (!m) + return false; + if (m->usr_pending) + return true; + if (m->window >= len) + return false; + m->usr_pending = true; + + /* If not fully advertised, do it now to prevent mutual blocking */ + adv = m->advertised; + state = m->state; + if (state < MBR_JOINED) + return true; + if (state == MBR_JOINED && adv == ADV_IDLE) + return true; + skb_queue_head_init(&xmitq); + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); + tipc_node_distr_xmit(grp->net, &xmitq); + return true; +} + +bool tipc_group_bc_cong(struct tipc_group *grp, int len) +{ + struct tipc_member *m = NULL; if (list_empty(&grp->congested)) return false; @@ -329,7 +370,7 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len) if (m->window >= len) return false; - return true; + return tipc_group_cong(grp, m->node, m->port, len, &m); } /* tipc_group_filter_msg() - determine if we should accept arriving message diff --git a/net/tipc/group.h b/net/tipc/group.h index 0e2740e1da90..8f77290bb415 100644 --- a/net/tipc/group.h +++ b/net/tipc/group.h @@ -61,9 +61,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup, struct sk_buff_head *inputq, struct sk_buff_head *xmitq); void tipc_group_update_bc_members(struct tipc_group *grp, int len); +bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, + int len, struct tipc_member **m); bool tipc_group_bc_cong(struct tipc_group *grp, int len); void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, u32 port, struct sk_buff_head *xmitq); u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); +void tipc_group_update_member(struct tipc_member *m, int len); int tipc_group_size(struct tipc_group *grp); #endif diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 237d007499f9..f5033f4a7951 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -67,6 +67,7 @@ struct plist; #define TIPC_DIRECT_MSG 3 #define TIPC_GRP_MEMBER_EVT 4 #define TIPC_GRP_BCAST_MSG 5 +#define TIPC_GRP_UCAST_MSG 6 /* * Internal message users @@ -261,7 +262,7 @@ static inline int msg_in_group(struct tipc_msg *m) { int mtyp = msg_type(m); - return (mtyp == TIPC_GRP_BCAST_MSG) || (mtyp == TIPC_GRP_MEMBER_EVT); + return mtyp >= TIPC_GRP_MEMBER_EVT && mtyp <= TIPC_GRP_UCAST_MSG; } static inline bool msg_is_grp_evt(struct tipc_msg *m) diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 50145c95ac96..e71c8d23acb9 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -817,6 +817,93 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, return rc ? rc : dlen; } +/** + * tipc_send_group_msg - send a message to a member in the group + * @net: network namespace + * @m: message to send + * @mb: group member + * @dnode: destination node + * @dport: destination port + * @dlen: total length of message data + */ +static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk, + struct msghdr *m, struct tipc_member *mb, + u32 dnode, u32 dport, int dlen) +{ + int blks = tsk_blocks(GROUP_H_SIZE + dlen); + struct tipc_msg *hdr = &tsk->phdr; + struct sk_buff_head pkts; + int mtu, rc; + + /* Complete message header */ + msg_set_type(hdr, TIPC_GRP_UCAST_MSG); + msg_set_hdr_sz(hdr, GROUP_H_SIZE); + msg_set_destport(hdr, dport); + msg_set_destnode(hdr, dnode); + + /* Build message as chain of buffers */ + skb_queue_head_init(&pkts); + mtu = tipc_node_get_mtu(net, dnode, tsk->portid); + rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + if (unlikely(rc != dlen)) + return rc; + + /* Send message */ + rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); + if (unlikely(rc == -ELINKCONG)) { + tipc_dest_push(&tsk->cong_links, dnode, 0); + tsk->cong_link_cnt++; + } + + /* Update send window and sequence number */ + tipc_group_update_member(mb, blks); + + return dlen; +} + +/** + * tipc_send_group_unicast - send message to a member in the group + * @sock: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + struct sock *sk = sock->sk; + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + int blks = tsk_blocks(GROUP_H_SIZE + dlen); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; + struct net *net = sock_net(sk); + struct tipc_member *mb = NULL; + u32 node, port; + int rc; + + node = dest->addr.id.node; + port = dest->addr.id.ref; + if (!port && !node) + return -EHOSTUNREACH; + + /* Block or return if destination link or member is congested */ + rc = tipc_wait_for_cond(sock, &timeout, + !tipc_dest_find(&tsk->cong_links, node, 0) && + !tipc_group_cong(grp, node, port, blks, &mb)); + if (unlikely(rc)) + return rc; + + if (unlikely(!mb)) + return -EHOSTUNREACH; + + rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen); + + return rc ? rc : dlen; +} + /** * tipc_send_group_bcast - send message to all members in communication group * @sk: socket structure @@ -1030,8 +1117,20 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) return -EMSGSIZE; - if (unlikely(grp && !dest)) - return tipc_send_group_bcast(sock, m, dlen, timeout); + if (likely(dest)) { + if (unlikely(m->msg_namelen < sizeof(*dest))) + return -EINVAL; + if (unlikely(dest->family != AF_TIPC)) + return -EINVAL; + } + + if (grp) { + if (!dest) + return tipc_send_group_bcast(sock, m, dlen, timeout); + if (dest->addrtype == TIPC_ADDR_ID) + return tipc_send_group_unicast(sock, m, dlen, timeout); + return -EINVAL; + } if (unlikely(!dest)) { dest = &tsk->peer; @@ -1039,12 +1138,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) return -EDESTADDRREQ; } - if (unlikely(m->msg_namelen < sizeof(*dest))) - return -EINVAL; - - if (unlikely(dest->family != AF_TIPC)) - return -EINVAL; - if (unlikely(syn)) { if (sk->sk_state == TIPC_LISTEN) return -EPIPE; @@ -1077,7 +1170,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) msg_set_destport(hdr, dport); if (unlikely(!dport && !dnode)) return -EHOSTUNREACH; - } else if (dest->addrtype == TIPC_ADDR_ID) { dnode = dest->addr.id.node; msg_set_type(hdr, TIPC_DIRECT_MSG); @@ -1846,7 +1938,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, if (unlikely(!msg_isdata(hdr))) tipc_sk_proto_rcv(sk, &inputq, xmitq); - else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG)) + else if (unlikely(msg_type(hdr) > TIPC_GRP_UCAST_MSG)) return kfree_skb(skb); if (unlikely(grp)) -- cgit v1.2.3 From ee106d7f942dabce1352e01c6fe9ca4a720c2331 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:28 +0200 Subject: tipc: introduce group anycast messaging In this commit, we make it possible to send connectionless unicast messages to any member corresponding to the given member identity, when there is more than one such member. The sender must use a TIPC_ADDR_NAME address to achieve this effect. We also perform load balancing between the destinations, i.e., we primarily select one which has advertised sufficient send window to not cause a block/EAGAIN delay, if any. This mechanism is overlayed on the always present round-robin selection. Anycast messages are subject to the same start synchronization and flow control mechanism as group broadcast messages. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 7 +++++ net/tipc/group.h | 3 ++ net/tipc/name_table.c | 41 +++++++++++++++++++++++++ net/tipc/name_table.h | 3 ++ net/tipc/socket.c | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 138 insertions(+) diff --git a/net/tipc/group.c b/net/tipc/group.c index 18440be5b5fc..16aaaa97a005 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -116,6 +116,13 @@ static bool tipc_group_is_receiver(struct tipc_member *m) return m && m->state >= MBR_JOINED; } +u32 tipc_group_exclude(struct tipc_group *grp) +{ + if (!grp->loopback) + return grp->portid; + return 0; +} + int tipc_group_size(struct tipc_group *grp) { return grp->member_cnt; diff --git a/net/tipc/group.h b/net/tipc/group.h index 8f77290bb415..e432066a211e 100644 --- a/net/tipc/group.h +++ b/net/tipc/group.h @@ -49,6 +49,7 @@ void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port); struct tipc_nlist *tipc_group_dests(struct tipc_group *grp); void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, int *scope); +u32 tipc_group_exclude(struct tipc_group *grp); void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, struct sk_buff_head *xmitq); @@ -68,5 +69,7 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, u32 port, struct sk_buff_head *xmitq); u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); void tipc_group_update_member(struct tipc_member *m, int len); +struct tipc_member *tipc_group_find_sender(struct tipc_group *grp, + u32 node, u32 port); int tipc_group_size(struct tipc_group *grp); #endif diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 114d72bab827..2856e19e036e 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -597,6 +597,47 @@ not_found: return ref; } +bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain, + struct list_head *dsts, int *dstcnt, u32 exclude, + bool all) +{ + u32 self = tipc_own_addr(net); + struct publication *publ; + struct name_info *info; + struct name_seq *seq; + struct sub_seq *sseq; + + if (!tipc_in_scope(domain, self)) + return false; + + *dstcnt = 0; + rcu_read_lock(); + seq = nametbl_find_seq(net, type); + if (unlikely(!seq)) + goto exit; + spin_lock_bh(&seq->lock); + sseq = nameseq_find_subseq(seq, instance); + if (likely(sseq)) { + info = sseq->info; + list_for_each_entry(publ, &info->zone_list, zone_list) { + if (!tipc_in_scope(domain, publ->node)) + continue; + if (publ->ref == exclude && publ->node == self) + continue; + tipc_dest_push(dsts, publ->node, publ->ref); + (*dstcnt)++; + if (all) + continue; + list_move_tail(&publ->zone_list, &info->zone_list); + break; + } + } + spin_unlock_bh(&seq->lock); +exit: + rcu_read_unlock(); + return !list_empty(dsts); +} + int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, u32 limit, struct list_head *dports) { diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index 97646b17a4a2..71926e429446 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -107,6 +107,9 @@ void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, u32 upper, u32 domain, struct tipc_nlist *nodes); +bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain, + struct list_head *dsts, int *dstcnt, u32 exclude, + bool all); struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, u32 upper, u32 scope, u32 port_ref, u32 key); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index e71c8d23acb9..66165e12d2f4 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -904,6 +904,88 @@ static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m, return rc ? rc : dlen; } +/** + * tipc_send_group_anycast - send message to any member with given identity + * @sock: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + struct sock *sk = sock->sk; + struct tipc_sock *tsk = tipc_sk(sk); + struct list_head *cong_links = &tsk->cong_links; + int blks = tsk_blocks(GROUP_H_SIZE + dlen); + struct tipc_group *grp = tsk->group; + struct tipc_member *first = NULL; + struct tipc_member *mbr = NULL; + struct net *net = sock_net(sk); + u32 node, port, exclude; + u32 type, inst, domain; + struct list_head dsts; + int lookups = 0; + int dstcnt, rc; + bool cong; + + INIT_LIST_HEAD(&dsts); + + type = dest->addr.name.name.type; + inst = dest->addr.name.name.instance; + domain = addr_domain(net, dest->scope); + exclude = tipc_group_exclude(grp); + + while (++lookups < 4) { + first = NULL; + + /* Look for a non-congested destination member, if any */ + while (1) { + if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts, + &dstcnt, exclude, false)) + return -EHOSTUNREACH; + tipc_dest_pop(&dsts, &node, &port); + cong = tipc_group_cong(grp, node, port, blks, &mbr); + if (!cong) + break; + if (mbr == first) + break; + if (!first) + first = mbr; + } + + /* Start over if destination was not in member list */ + if (unlikely(!mbr)) + continue; + + if (likely(!cong && !tipc_dest_find(cong_links, node, 0))) + break; + + /* Block or return if destination link or member is congested */ + rc = tipc_wait_for_cond(sock, &timeout, + !tipc_dest_find(cong_links, node, 0) && + !tipc_group_cong(grp, node, port, + blks, &mbr)); + if (unlikely(rc)) + return rc; + + /* Send, unless destination disappeared while waiting */ + if (likely(mbr)) + break; + } + + if (unlikely(lookups >= 4)) + return -EHOSTUNREACH; + + rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen); + + return rc ? rc : dlen; +} + /** * tipc_send_group_bcast - send message to all members in communication group * @sk: socket structure @@ -1127,6 +1209,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) if (grp) { if (!dest) return tipc_send_group_bcast(sock, m, dlen, timeout); + if (dest->addrtype == TIPC_ADDR_NAME) + return tipc_send_group_anycast(sock, m, dlen, timeout); if (dest->addrtype == TIPC_ADDR_ID) return tipc_send_group_unicast(sock, m, dlen, timeout); return -EINVAL; -- cgit v1.2.3 From 5b8dddb63769587badc50725ec9857caaeba4de0 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:29 +0200 Subject: tipc: introduce group multicast messaging The previously introduced message transport to all group members is based on the tipc multicast service, but is logically a broadcast service within the group, and that is what we call it. We now add functionality for sending messages to all group members having a certain identity. Correspondingly, we call this feature 'group multicast'. The service is using unicast when only one destination is found, otherwise it will use the bearer broadcast service to transfer the messages. In the latter case, the receiving members filter arriving messages by looking at the intended destination instance. If there is no match, the message will be dropped, while still being considered received and read as seen by the flow control mechanism. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 14 +++++++++++++- net/tipc/msg.h | 11 +++++++++-- net/tipc/socket.c | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 74 insertions(+), 7 deletions(-) diff --git a/net/tipc/group.c b/net/tipc/group.c index 16aaaa97a005..ffac2f33fce2 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -413,10 +413,22 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, if (!tipc_group_is_receiver(m)) goto drop; + m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; + + /* Drop multicast here if not for this member */ + if (mtyp == TIPC_GRP_MCAST_MSG) { + if (msg_nameinst(hdr) != grp->instance) { + m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; + tipc_group_update_rcv_win(grp, msg_blocks(hdr), + node, port, xmitq); + kfree_skb(skb); + return; + } + } + TIPC_SKB_CB(skb)->orig_member = m->instance; __skb_queue_tail(inputq, skb); - m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; return; drop: kfree_skb(skb); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index f5033f4a7951..d6f98215267e 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -67,7 +67,8 @@ struct plist; #define TIPC_DIRECT_MSG 3 #define TIPC_GRP_MEMBER_EVT 4 #define TIPC_GRP_BCAST_MSG 5 -#define TIPC_GRP_UCAST_MSG 6 +#define TIPC_GRP_MCAST_MSG 6 +#define TIPC_GRP_UCAST_MSG 7 /* * Internal message users @@ -195,6 +196,11 @@ static inline u32 msg_size(struct tipc_msg *m) return msg_bits(m, 0, 0, 0x1ffff); } +static inline u32 msg_blocks(struct tipc_msg *m) +{ + return (msg_size(m) / 1024) + 1; +} + static inline u32 msg_data_sz(struct tipc_msg *m) { return msg_size(m) - msg_hdr_sz(m); @@ -279,7 +285,8 @@ static inline u32 msg_mcast(struct tipc_msg *m) { int mtyp = msg_type(m); - return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG)); + return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG) || + (mtyp == TIPC_GRP_MCAST_MSG)); } static inline u32 msg_connected(struct tipc_msg *m) diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 66165e12d2f4..8fdd969e12bd 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -999,6 +999,7 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m, static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, int dlen, long timeout) { + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); struct sock *sk = sock->sk; struct net *net = sock_net(sk); struct tipc_sock *tsk = tipc_sk(sk); @@ -1021,11 +1022,16 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, return rc; /* Complete message header */ - msg_set_type(hdr, TIPC_GRP_BCAST_MSG); + if (dest) { + msg_set_type(hdr, TIPC_GRP_MCAST_MSG); + msg_set_nameinst(hdr, dest->addr.name.name.instance); + } else { + msg_set_type(hdr, TIPC_GRP_BCAST_MSG); + msg_set_nameinst(hdr, 0); + } msg_set_hdr_sz(hdr, GROUP_H_SIZE); msg_set_destport(hdr, 0); msg_set_destnode(hdr, 0); - msg_set_nameinst(hdr, 0); msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp)); /* Build message as chain of buffers */ @@ -1045,6 +1051,48 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, return dlen; } +/** + * tipc_send_group_mcast - send message to all members with given identity + * @sock: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + struct sock *sk = sock->sk; + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + struct tipc_name_seq *seq = &dest->addr.nameseq; + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; + struct net *net = sock_net(sk); + u32 domain, exclude, dstcnt; + struct list_head dsts; + + INIT_LIST_HEAD(&dsts); + + if (seq->lower != seq->upper) + return -ENOTSUPP; + + domain = addr_domain(net, dest->scope); + exclude = tipc_group_exclude(grp); + if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain, + &dsts, &dstcnt, exclude, true)) + return -EHOSTUNREACH; + + if (dstcnt == 1) { + tipc_dest_pop(&dsts, &dest->addr.id.node, &dest->addr.id.ref); + return tipc_send_group_unicast(sock, m, dlen, timeout); + } + + tipc_dest_list_purge(&dsts); + return tipc_send_group_bcast(sock, m, dlen, timeout); +} + /** * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets * @arrvq: queue with arriving messages, to be cloned after destination lookup @@ -1213,6 +1261,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) return tipc_send_group_anycast(sock, m, dlen, timeout); if (dest->addrtype == TIPC_ADDR_ID) return tipc_send_group_unicast(sock, m, dlen, timeout); + if (dest->addrtype == TIPC_ADDR_MCAST) + return tipc_send_group_mcast(sock, m, dlen, timeout); return -EINVAL; } @@ -2022,8 +2072,6 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, if (unlikely(!msg_isdata(hdr))) tipc_sk_proto_rcv(sk, &inputq, xmitq); - else if (unlikely(msg_type(hdr) > TIPC_GRP_UCAST_MSG)) - return kfree_skb(skb); if (unlikely(grp)) tipc_group_filter_msg(grp, &inputq, xmitq); -- cgit v1.2.3 From b87a5ea31c935a7f7e11ca85df2ec7917921e96d Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:30 +0200 Subject: tipc: guarantee group unicast doesn't bypass group broadcast Group unicast messages don't follow the same path as broadcast messages, and there is a high risk that unicasts sent from a socket might bypass previously sent broadcasts from the same socket. We fix this by letting all unicast messages carry the sequence number of the next sent broadcast from the same node, but without updating this number at the receiver. This way, a receiver can check and if necessary re-order such messages before they are added to the socket receive buffer. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 87 +++++++++++++++++++++++++++++++++++++++++++++---------- net/tipc/socket.c | 2 ++ 2 files changed, 74 insertions(+), 15 deletions(-) diff --git a/net/tipc/group.c b/net/tipc/group.c index ffac2f33fce2..985e0ce32e8e 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -62,6 +62,7 @@ struct tipc_member { struct list_head list; struct list_head congested; struct sk_buff *event_msg; + struct sk_buff_head deferredq; struct tipc_group *group; u32 node; u32 port; @@ -253,6 +254,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, return NULL; INIT_LIST_HEAD(&m->list); INIT_LIST_HEAD(&m->congested); + __skb_queue_head_init(&m->deferredq); m->group = grp; m->node = node; m->port = port; @@ -380,29 +382,54 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len) return tipc_group_cong(grp, m->node, m->port, len, &m); } +/* tipc_group_sort_msg() - sort msg into queue by bcast sequence number + */ +static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) +{ + struct tipc_msg *_hdr, *hdr = buf_msg(skb); + u16 bc_seqno = msg_grp_bc_seqno(hdr); + struct sk_buff *_skb, *tmp; + int mtyp = msg_type(hdr); + + /* Bcast may be bypassed by unicast, - sort it in */ + if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { + skb_queue_walk_safe(defq, _skb, tmp) { + _hdr = buf_msg(_skb); + if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) + continue; + __skb_queue_before(defq, _skb, skb); + return; + } + /* Bcast was not bypassed, - add to tail */ + } + /* Unicasts are never bypassed, - always add to tail */ + __skb_queue_tail(defq, skb); +} + /* tipc_group_filter_msg() - determine if we should accept arriving message */ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, struct sk_buff_head *xmitq) { struct sk_buff *skb = __skb_dequeue(inputq); + struct sk_buff_head *defq; struct tipc_member *m; struct tipc_msg *hdr; + bool deliver, update; u32 node, port; - int mtyp; + int mtyp, blks; if (!skb) return; hdr = buf_msg(skb); - mtyp = msg_type(hdr); node = msg_orignode(hdr); port = msg_origport(hdr); if (!msg_in_group(hdr)) goto drop; - if (mtyp == TIPC_GRP_MEMBER_EVT) { + if (msg_is_grp_evt(hdr)) { if (!grp->events) goto drop; __skb_queue_tail(inputq, skb); @@ -413,22 +440,52 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, if (!tipc_group_is_receiver(m)) goto drop; - m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; + if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) + goto drop; - /* Drop multicast here if not for this member */ - if (mtyp == TIPC_GRP_MCAST_MSG) { - if (msg_nameinst(hdr) != grp->instance) { - m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; - tipc_group_update_rcv_win(grp, msg_blocks(hdr), - node, port, xmitq); - kfree_skb(skb); - return; + TIPC_SKB_CB(skb)->orig_member = m->instance; + defq = &m->deferredq; + tipc_group_sort_msg(skb, defq); + + while ((skb = skb_peek(defq))) { + hdr = buf_msg(skb); + mtyp = msg_type(hdr); + deliver = true; + update = false; + + if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) + break; + + /* Decide what to do with message */ + switch (mtyp) { + case TIPC_GRP_MCAST_MSG: + if (msg_nameinst(hdr) != grp->instance) { + update = true; + deliver = false; + } + /* Fall thru */ + case TIPC_GRP_BCAST_MSG: + m->bc_rcv_nxt++; + break; + case TIPC_GRP_UCAST_MSG: + break; + default: + break; } - } - TIPC_SKB_CB(skb)->orig_member = m->instance; - __skb_queue_tail(inputq, skb); + /* Execute decisions */ + __skb_dequeue(defq); + if (deliver) + __skb_queue_tail(inputq, skb); + else + kfree_skb(skb); + + if (!update) + continue; + blks = msg_blocks(hdr); + tipc_group_update_rcv_win(grp, blks, node, port, xmitq); + } return; drop: kfree_skb(skb); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 8fdd969e12bd..3276b7a0d445 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -830,6 +830,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk, struct msghdr *m, struct tipc_member *mb, u32 dnode, u32 dport, int dlen) { + u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group); int blks = tsk_blocks(GROUP_H_SIZE + dlen); struct tipc_msg *hdr = &tsk->phdr; struct sk_buff_head pkts; @@ -840,6 +841,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk, msg_set_hdr_sz(hdr, GROUP_H_SIZE); msg_set_destport(hdr, dport); msg_set_destnode(hdr, dnode); + msg_set_grp_bc_seqno(hdr, bc_snd_nxt); /* Build message as chain of buffers */ skb_queue_head_init(&pkts); -- cgit v1.2.3 From 2f487712b89376fce267223bbb0db93d393d4b09 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:31 +0200 Subject: tipc: guarantee that group broadcast doesn't bypass group unicast We need a mechanism guaranteeing that group unicasts sent out from a socket are not bypassed by later sent broadcasts from the same socket. We do this as follows: - Each time a unicast is sent, we set a the broadcast method for the socket to "replicast" and "mandatory". This forces the first subsequent broadcast message to follow the same network and data path as the preceding unicast to a destination, hence preventing it from overtaking the latter. - In order to make the 'same data path' statement above true, we let group unicasts pass through the multicast link input queue, instead of as previously through the unicast link input queue. - In the first broadcast following a unicast, we set a new header flag, requiring all recipients to immediately acknowledge its reception. - During the period before all the expected acknowledges are received, the socket refuses to accept any more broadcast attempts, i.e., by blocking or returning EAGAIN. This period should typically not be longer than a few microseconds. - When all acknowledges have been received, the sending socket will open up for subsequent broadcasts, this time giving the link layer freedom to itself select the best transmission method. - The forced and/or abrupt transmission method changes described above may lead to broadcasts arriving out of order to the recipients. We remedy this by introducing code that checks and if necessary re-orders such messages at the receiving end. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 47 +++++++++++++++++++++++++++++++++++++++++------ net/tipc/group.h | 4 +--- net/tipc/link.c | 5 ++--- net/tipc/msg.h | 21 +++++++++++++++++++++ net/tipc/socket.c | 34 +++++++++++++++++++++++++++++----- 5 files changed, 94 insertions(+), 17 deletions(-) diff --git a/net/tipc/group.c b/net/tipc/group.c index 985e0ce32e8e..eab862e047dc 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -71,6 +71,7 @@ struct tipc_member { u16 advertised; u16 window; u16 bc_rcv_nxt; + u16 bc_acked; bool usr_pending; }; @@ -87,6 +88,7 @@ struct tipc_group { u32 portid; u16 member_cnt; u16 bc_snd_nxt; + u16 bc_ackers; bool loopback; bool events; }; @@ -258,6 +260,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, m->group = grp; m->node = node; m->port = port; + m->bc_acked = grp->bc_snd_nxt - 1; grp->member_cnt++; tipc_group_add_to_tree(grp, m); tipc_nlist_add(&grp->dests, m->node); @@ -275,6 +278,11 @@ static void tipc_group_delete_member(struct tipc_group *grp, { rb_erase(&m->tree_node, &grp->members); grp->member_cnt--; + + /* Check if we were waiting for replicast ack from this member */ + if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) + grp->bc_ackers--; + list_del_init(&m->list); list_del_init(&m->congested); @@ -325,16 +333,23 @@ void tipc_group_update_member(struct tipc_member *m, int len) list_add_tail(&m->congested, &grp->congested); } -void tipc_group_update_bc_members(struct tipc_group *grp, int len) +void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) { + u16 prev = grp->bc_snd_nxt - 1; struct tipc_member *m; struct rb_node *n; for (n = rb_first(&grp->members); n; n = rb_next(n)) { m = container_of(n, struct tipc_member, tree_node); - if (tipc_group_is_enabled(m)) + if (tipc_group_is_enabled(m)) { tipc_group_update_member(m, len); + m->bc_acked = prev; + } } + + /* Mark number of acknowledges to expect, if any */ + if (ack) + grp->bc_ackers = grp->member_cnt; grp->bc_snd_nxt++; } @@ -372,6 +387,10 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len) { struct tipc_member *m = NULL; + /* If prev bcast was replicast, reject until all receivers have acked */ + if (grp->bc_ackers) + return true; + if (list_empty(&grp->congested)) return false; @@ -391,7 +410,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) struct sk_buff *_skb, *tmp; int mtyp = msg_type(hdr); - /* Bcast may be bypassed by unicast, - sort it in */ + /* Bcast may be bypassed by unicast or other bcast, - sort it in */ if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { skb_queue_walk_safe(defq, _skb, tmp) { _hdr = buf_msg(_skb); @@ -412,10 +431,10 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, struct sk_buff_head *xmitq) { struct sk_buff *skb = __skb_dequeue(inputq); + bool ack, deliver, update; struct sk_buff_head *defq; struct tipc_member *m; struct tipc_msg *hdr; - bool deliver, update; u32 node, port; int mtyp, blks; @@ -451,6 +470,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, hdr = buf_msg(skb); mtyp = msg_type(hdr); deliver = true; + ack = false; update = false; if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) @@ -466,6 +486,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, /* Fall thru */ case TIPC_GRP_BCAST_MSG: m->bc_rcv_nxt++; + ack = msg_grp_bc_ack_req(hdr); break; case TIPC_GRP_UCAST_MSG: break; @@ -480,6 +501,9 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, else kfree_skb(skb); + if (ack) + tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); + if (!update) continue; @@ -540,6 +564,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, } else if (mtyp == GRP_ADV_MSG) { msg_set_adv_win(hdr, adv); m->advertised += adv; + } else if (mtyp == GRP_ACK_MSG) { + msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); } __skb_queue_tail(xmitq, skb); } @@ -593,7 +619,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, } /* Otherwise deliver already received WITHDRAW event */ __skb_queue_tail(inputq, m->event_msg); - *usr_wakeup = m->usr_pending; + *usr_wakeup = true; tipc_group_delete_member(grp, m); list_del_init(&m->congested); return; @@ -605,6 +631,15 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, m->usr_pending = false; list_del_init(&m->congested); return; + case GRP_ACK_MSG: + if (!m) + return; + m->bc_acked = msg_grp_bc_acked(hdr); + if (--grp->bc_ackers) + break; + *usr_wakeup = true; + m->usr_pending = false; + return; default: pr_warn("Received unknown GROUP_PROTO message\n"); } @@ -678,7 +713,7 @@ void tipc_group_member_evt(struct tipc_group *grp, TIPC_SKB_CB(skb)->orig_member = m->instance; - *usr_wakeup = m->usr_pending; + *usr_wakeup = true; m->usr_pending = false; /* Hold back event if more messages might be expected */ diff --git a/net/tipc/group.h b/net/tipc/group.h index e432066a211e..d525e1cd7de5 100644 --- a/net/tipc/group.h +++ b/net/tipc/group.h @@ -61,7 +61,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup, struct tipc_msg *hdr, struct sk_buff_head *inputq, struct sk_buff_head *xmitq); -void tipc_group_update_bc_members(struct tipc_group *grp, int len); +void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack); bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, int len, struct tipc_member **m); bool tipc_group_bc_cong(struct tipc_group *grp, int len); @@ -69,7 +69,5 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, u32 port, struct sk_buff_head *xmitq); u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); void tipc_group_update_member(struct tipc_member *m, int len); -struct tipc_member *tipc_group_find_sender(struct tipc_group *grp, - u32 node, u32 port); int tipc_group_size(struct tipc_group *grp); #endif diff --git a/net/tipc/link.c b/net/tipc/link.c index bd25bff63925..70a21499804d 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1046,13 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, case TIPC_MEDIUM_IMPORTANCE: case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: - if (unlikely(msg_mcast(hdr))) { + if (unlikely(msg_in_group(hdr) || msg_mcast(hdr))) { skb_queue_tail(l->bc_rcvlink->inputq, skb); return true; } - case CONN_MANAGER: case GROUP_PROTOCOL: - skb_queue_tail(inputq, skb); + case CONN_MANAGER: return true; case NAME_DISTRIBUTOR: l->bc_rcvlink->state = LINK_ESTABLISHED; diff --git a/net/tipc/msg.h b/net/tipc/msg.h index d6f98215267e..52c6a2e01995 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -547,6 +547,7 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) #define GRP_JOIN_MSG 0 #define GRP_LEAVE_MSG 1 #define GRP_ADV_MSG 2 +#define GRP_ACK_MSG 3 /* * Word 1 @@ -839,6 +840,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n) msg_set_bits(m, 9, 16, 0xffff, n); } +static inline u16 msg_grp_bc_acked(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + /* Word 10 */ static inline u16 msg_grp_evt(struct tipc_msg *m) @@ -851,6 +862,16 @@ static inline void msg_set_grp_evt(struct tipc_msg *m, int n) msg_set_bits(m, 10, 0, 0x3, n); } +static inline u16 msg_grp_bc_ack_req(struct tipc_msg *m) +{ + return msg_bits(m, 10, 0, 0x1); +} + +static inline void msg_set_grp_bc_ack_req(struct tipc_msg *m, bool n) +{ + msg_set_bits(m, 10, 0, 0x1, n); +} + static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) { return msg_bits(m, 10, 16, 0xffff); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 3276b7a0d445..b1f1c3c2b1e2 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -831,6 +831,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk, u32 dnode, u32 dport, int dlen) { u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group); + struct tipc_mc_method *method = &tsk->mc_method; int blks = tsk_blocks(GROUP_H_SIZE + dlen); struct tipc_msg *hdr = &tsk->phdr; struct sk_buff_head pkts; @@ -857,9 +858,12 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk, tsk->cong_link_cnt++; } - /* Update send window and sequence number */ + /* Update send window */ tipc_group_update_member(mb, blks); + /* A broadcast sent within next EXPIRE period must follow same path */ + method->rcast = true; + method->mandatory = true; return dlen; } @@ -1008,6 +1012,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, struct tipc_group *grp = tsk->group; struct tipc_nlist *dsts = tipc_group_dests(grp); struct tipc_mc_method *method = &tsk->mc_method; + bool ack = method->mandatory && method->rcast; int blks = tsk_blocks(MCAST_H_SIZE + dlen); struct tipc_msg *hdr = &tsk->phdr; int mtu = tipc_bcast_get_mtu(net); @@ -1036,6 +1041,9 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, msg_set_destnode(hdr, 0); msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp)); + /* Avoid getting stuck with repeated forced replicasts */ + msg_set_grp_bc_ack_req(hdr, ack); + /* Build message as chain of buffers */ skb_queue_head_init(&pkts); rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); @@ -1043,13 +1051,17 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, return rc; /* Send message */ - rc = tipc_mcast_xmit(net, &pkts, method, dsts, - &tsk->cong_link_cnt); + rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt); if (unlikely(rc)) return rc; /* Update broadcast sequence number and send windows */ - tipc_group_update_bc_members(tsk->group, blks); + tipc_group_update_bc_members(tsk->group, blks, ack); + + /* Broadcast link is now free to choose method for next broadcast */ + method->mandatory = false; + method->expires = jiffies; + return dlen; } @@ -1113,7 +1125,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, u32 portid, oport, onode; struct list_head dports; struct tipc_msg *msg; - int hsz; + int user, mtyp, hsz; __skb_queue_head_init(&tmpq); INIT_LIST_HEAD(&dports); @@ -1121,6 +1133,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, skb = tipc_skb_peek(arrvq, &inputq->lock); for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { msg = buf_msg(skb); + user = msg_user(msg); + mtyp = msg_type(msg); + if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) { + spin_lock_bh(&inputq->lock); + if (skb_peek(arrvq) == skb) { + __skb_dequeue(arrvq); + __skb_queue_tail(inputq, skb); + } + refcount_dec(&skb->users); + spin_unlock_bh(&inputq->lock); + continue; + } hsz = skb_headroom(skb) + msg_hdr_sz(msg); oport = msg_origport(msg); onode = msg_orignode(msg); -- cgit v1.2.3 From 399574d41963285e72ba28dd46783c96316a81d1 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:32 +0200 Subject: tipc: guarantee delivery of UP event before first broadcast The following scenario is possible: - A user joins a group, and immediately sends out a broadcast message to its members. - The broadcast message, following a different data path than the initial JOIN message sent out during the joining procedure, arrives to a receiver before the latter.. - The receiver drops the message, since it is not ready to accept any messages until the JOIN has arrived. We avoid this by treating group protocol JOIN messages like unicast messages. - We let them pass through the recipient's multicast input queue, just like ordinary unicasts. - We force the first following broadacst to be sent as replicated unicast and being acknowledged by the recipient before accepting any more broadcast transmissions. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 7 +++++-- net/tipc/socket.c | 4 ++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 70a21499804d..723dd6998684 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1039,6 +1039,7 @@ int tipc_link_retrans(struct tipc_link *l, struct tipc_link *nacker, static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { + struct sk_buff_head *mc_inputq = l->bc_rcvlink->inputq; struct tipc_msg *hdr = buf_msg(skb); switch (msg_user(hdr)) { @@ -1047,12 +1048,14 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: if (unlikely(msg_in_group(hdr) || msg_mcast(hdr))) { - skb_queue_tail(l->bc_rcvlink->inputq, skb); + skb_queue_tail(mc_inputq, skb); return true; } - case GROUP_PROTOCOL: case CONN_MANAGER: return true; + case GROUP_PROTOCOL: + skb_queue_tail(mc_inputq, skb); + return true; case NAME_DISTRIBUTOR: l->bc_rcvlink->state = LINK_ESTABLISHED; skb_queue_tail(l->namedq, skb); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index b1f1c3c2b1e2..2bbab4fe2f53 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -2762,6 +2762,10 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq) rc = tipc_sk_publish(tsk, mreq->scope, &seq); if (rc) tipc_group_delete(net, grp); + + /* Eliminate any risk that a broadcast overtakes the sent JOIN */ + tsk->mc_method.rcast = true; + tsk->mc_method.mandatory = true; return rc; } -- cgit v1.2.3 From a3bada70660fb020430135ec8a774ae1ea6bc9a9 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:33 +0200 Subject: tipc: guarantee delivery of last broadcast before DOWN event The following scenario is possible: - A user sends a broadcast message, and thereafter immediately leaves the group. - The LEAVE message, following a different path than the broadcast, arrives ahead of the broadcast, and the sending member is removed from the receiver's list. - The broadcast message arrives, but is dropped because the sender now is unknown to the receipient. We fix this by sequence numbering membership events, just like ordinary unicast messages. Currently, when a JOIN is sent to a peer, it contains a synchronization point, - the sequence number of the next sent broadcast, in order to give the receiver a start synchronization point. We now let even LEAVE messages contain such an "end synchronization" point, so that the recipient can delay the removal of the sending member until it knows that all messages have been received. The received synchronization points are added as sequence numbers to the generated membership events, making it possible to handle them almost the same way as regular unicasts in the receiving filter function. In particular, a DOWN event with a too high sequence number will be kept in the reordering queue until the missing broadcast(s) arrive and have been delivered. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 45 ++++++++++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/net/tipc/group.c b/net/tipc/group.c index eab862e047dc..8f0eb5d22e8f 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -71,6 +71,7 @@ struct tipc_member { u16 advertised; u16 window; u16 bc_rcv_nxt; + u16 bc_syncpt; u16 bc_acked; bool usr_pending; }; @@ -410,7 +411,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) struct sk_buff *_skb, *tmp; int mtyp = msg_type(hdr); - /* Bcast may be bypassed by unicast or other bcast, - sort it in */ + /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { skb_queue_walk_safe(defq, _skb, tmp) { _hdr = buf_msg(_skb); @@ -431,7 +432,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, struct sk_buff_head *xmitq) { struct sk_buff *skb = __skb_dequeue(inputq); - bool ack, deliver, update; + bool ack, deliver, update, leave = false; struct sk_buff_head *defq; struct tipc_member *m; struct tipc_msg *hdr; @@ -448,13 +449,6 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, if (!msg_in_group(hdr)) goto drop; - if (msg_is_grp_evt(hdr)) { - if (!grp->events) - goto drop; - __skb_queue_tail(inputq, skb); - return; - } - m = tipc_group_find_member(grp, node, port); if (!tipc_group_is_receiver(m)) goto drop; @@ -490,6 +484,12 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, break; case TIPC_GRP_UCAST_MSG: break; + case TIPC_GRP_MEMBER_EVT: + if (m->state == MBR_LEAVING) + leave = true; + if (!grp->events) + deliver = false; + break; default: break; } @@ -504,6 +504,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, if (ack) tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); + if (leave) { + tipc_group_delete_member(grp, m); + __skb_queue_purge(defq); + break; + } if (!update) continue; @@ -561,6 +566,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); msg_set_adv_win(hdr, adv); m->advertised += adv; + } else if (mtyp == GRP_LEAVE_MSG) { + msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); } else if (mtyp == GRP_ADV_MSG) { msg_set_adv_win(hdr, adv); m->advertised += adv; @@ -577,6 +584,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, u32 node = msg_orignode(hdr); u32 port = msg_origport(hdr); struct tipc_member *m; + struct tipc_msg *ehdr; if (!grp) return; @@ -590,7 +598,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, MBR_QUARANTINED); if (!m) return; - m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); + m->bc_syncpt = msg_grp_bc_syncpt(hdr); + m->bc_rcv_nxt = m->bc_syncpt; m->window += msg_adv_win(hdr); /* Wait until PUBLISH event is received */ @@ -601,6 +610,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, *usr_wakeup = true; m->usr_pending = false; tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + ehdr = buf_msg(m->event_msg); + msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); __skb_queue_tail(inputq, m->event_msg); } if (m->window < ADV_IDLE) @@ -611,6 +622,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, case GRP_LEAVE_MSG: if (!m) return; + m->bc_syncpt = msg_grp_bc_syncpt(hdr); /* Wait until WITHDRAW event is received */ if (m->state != MBR_LEAVING) { @@ -618,9 +630,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, return; } /* Otherwise deliver already received WITHDRAW event */ + ehdr = buf_msg(m->event_msg); + msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); __skb_queue_tail(inputq, m->event_msg); *usr_wakeup = true; - tipc_group_delete_member(grp, m); list_del_init(&m->congested); return; case GRP_ADV_MSG: @@ -662,6 +675,7 @@ void tipc_group_member_evt(struct tipc_group *grp, int event = evt->event; struct tipc_member *m; struct net *net; + bool node_up; u32 self; if (!grp) @@ -695,6 +709,7 @@ void tipc_group_member_evt(struct tipc_group *grp, m->event_msg = skb; m->state = MBR_PUBLISHED; } else { + msg_set_grp_bc_seqno(hdr, m->bc_syncpt); __skb_queue_tail(inputq, skb); m->state = MBR_JOINED; *usr_wakeup = true; @@ -715,14 +730,18 @@ void tipc_group_member_evt(struct tipc_group *grp, *usr_wakeup = true; m->usr_pending = false; + node_up = tipc_node_is_up(net, node); /* Hold back event if more messages might be expected */ - if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) { + if (m->state != MBR_LEAVING && node_up) { m->event_msg = skb; m->state = MBR_LEAVING; } else { + if (node_up) + msg_set_grp_bc_seqno(hdr, m->bc_syncpt); + else + msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt); __skb_queue_tail(inputq, skb); - tipc_group_delete_member(grp, m); } list_del_init(&m->congested); } -- cgit v1.2.3 From 04d7b574b245c66001a33cb9da2c0311063af73f Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:34 +0200 Subject: tipc: add multipoint-to-point flow control We already have point-to-multipoint flow control within a group. But we even need the opposite; -a scheme which can handle that potentially hundreds of sources may try to send messages to the same destination simultaneously without causing buffer overflow at the recipient. This commit adds such a mechanism. The algorithm works as follows: - When a member detects a new, joining member, it initially set its state to JOINED and advertises a minimum window to the new member. This window is chosen so that the new member can send exactly one maximum sized message, or several smaller ones, to the recipient before it must stop and wait for an additional advertisement. This minimum window ADV_IDLE is set to 65 1kB blocks. - When a member receives the first data message from a JOINED member, it changes the state of the latter to ACTIVE, and advertises a larger window ADV_ACTIVE = 12 x ADV_IDLE blocks to the sender, so it can continue sending with minimal disturbances to the data flow. - The active members are kept in a dedicated linked list. Each time a message is received from an active member, it will be moved to the tail of that list. This way, we keep a record of which members have been most (tail) and least (head) recently active. - There is a maximum number (16) of permitted simultaneous active senders per receiver. When this limit is reached, the receiver will not advertise anything immediately to a new sender, but instead put it in a PENDING state, and add it to a corresponding queue. At the same time, it will pick the least recently active member, send it an advertisement RECLAIM message, and set this member to state RECLAIMING. - The reclaimee member has to respond with a REMIT message, meaning that it goes back to a send window of ADV_IDLE, and returns its unused advertised blocks beyond that value to the reclaiming member. - When the reclaiming member receives the REMIT message, it unlinks the reclaimee from its active list, resets its state to JOINED, and notes that it is now back at ADV_IDLE advertised blocks to that member. If there are still unread data messages sent out by reclaimee before the REMIT, the member goes into an intermediate state REMITTED, where it stays until the said messages have been consumed. - The returned advertised blocks can now be re-advertised to the pending member, which is now set to state ACTIVE and added to the active member list. - To be proactive, i.e., to minimize the risk that any member will end up in the pending queue, we start reclaiming resources already when the number of active members exceeds 3/4 of the permitted maximum. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 129 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- net/tipc/msg.h | 12 ++++++ 2 files changed, 136 insertions(+), 5 deletions(-) diff --git a/net/tipc/group.c b/net/tipc/group.c index 8f0eb5d22e8f..7821085a7dd8 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -54,6 +54,10 @@ enum mbr_state { MBR_JOINING, MBR_PUBLISHED, MBR_JOINED, + MBR_PENDING, + MBR_ACTIVE, + MBR_RECLAIMING, + MBR_REMITTED, MBR_LEAVING }; @@ -79,6 +83,9 @@ struct tipc_member { struct tipc_group { struct rb_root members; struct list_head congested; + struct list_head pending; + struct list_head active; + struct list_head reclaiming; struct tipc_nlist dests; struct net *net; int subid; @@ -88,6 +95,8 @@ struct tipc_group { u32 scope; u32 portid; u16 member_cnt; + u16 active_cnt; + u16 max_active; u16 bc_snd_nxt; u16 bc_ackers; bool loopback; @@ -97,12 +106,29 @@ struct tipc_group { static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, int mtyp, struct sk_buff_head *xmitq); +static void tipc_group_decr_active(struct tipc_group *grp, + struct tipc_member *m) +{ + if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING) + grp->active_cnt--; +} + static int tipc_group_rcvbuf_limit(struct tipc_group *grp) { + int max_active, active_pool, idle_pool; int mcnt = grp->member_cnt + 1; + /* Limit simultaneous reception from other members */ + max_active = min(mcnt / 8, 64); + max_active = max(max_active, 16); + grp->max_active = max_active; + + /* Reserve blocks for active and idle members */ + active_pool = max_active * ADV_ACTIVE; + idle_pool = (mcnt - max_active) * ADV_IDLE; + /* Scale to bytes, considering worst-case truesize/msgsize ratio */ - return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4; + return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; } u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) @@ -143,6 +169,9 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid, return NULL; tipc_nlist_init(&grp->dests, tipc_own_addr(net)); INIT_LIST_HEAD(&grp->congested); + INIT_LIST_HEAD(&grp->active); + INIT_LIST_HEAD(&grp->pending); + INIT_LIST_HEAD(&grp->reclaiming); grp->members = RB_ROOT; grp->net = net; grp->portid = portid; @@ -286,6 +315,7 @@ static void tipc_group_delete_member(struct tipc_group *grp, list_del_init(&m->list); list_del_init(&m->congested); + tipc_group_decr_active(grp, m); /* If last member on a node, remove node from dest list */ if (!tipc_group_find_node(grp, m->node)) @@ -378,6 +408,10 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, return true; if (state == MBR_JOINED && adv == ADV_IDLE) return true; + if (state == MBR_ACTIVE && adv == ADV_ACTIVE) + return true; + if (state == MBR_PENDING && adv == ADV_IDLE) + return true; skb_queue_head_init(&xmitq); tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); tipc_node_distr_xmit(grp->net, &xmitq); @@ -523,7 +557,11 @@ drop: void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, u32 port, struct sk_buff_head *xmitq) { - struct tipc_member *m; + struct list_head *active = &grp->active; + int max_active = grp->max_active; + int reclaim_limit = max_active * 3 / 4; + int active_cnt = grp->active_cnt; + struct tipc_member *m, *rm; m = tipc_group_find_member(grp, node, port); if (!m) @@ -533,9 +571,41 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, switch (m->state) { case MBR_JOINED: - if (m->advertised <= (ADV_ACTIVE - ADV_UNIT)) + /* Reclaim advertised space from least active member */ + if (!list_empty(active) && active_cnt >= reclaim_limit) { + rm = list_first_entry(active, struct tipc_member, list); + rm->state = MBR_RECLAIMING; + list_move_tail(&rm->list, &grp->reclaiming); + tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); + } + /* If max active, become pending and wait for reclaimed space */ + if (active_cnt >= max_active) { + m->state = MBR_PENDING; + list_add_tail(&m->list, &grp->pending); + break; + } + /* Otherwise become active */ + m->state = MBR_ACTIVE; + list_add_tail(&m->list, &grp->active); + grp->active_cnt++; + /* Fall through */ + case MBR_ACTIVE: + if (!list_is_last(&m->list, &grp->active)) + list_move_tail(&m->list, &grp->active); + if (m->advertised > (ADV_ACTIVE * 3 / 4)) + break; + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + break; + case MBR_REMITTED: + if (m->advertised > ADV_IDLE) + break; + m->state = MBR_JOINED; + if (m->advertised < ADV_IDLE) { + pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + } break; + case MBR_RECLAIMING: case MBR_DISCOVERED: case MBR_JOINING: case MBR_LEAVING: @@ -557,8 +627,10 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, if (!skb) return; - if (m->state == MBR_JOINED) + if (m->state == MBR_ACTIVE) adv = ADV_ACTIVE - m->advertised; + else if (m->state == MBR_JOINED || m->state == MBR_PENDING) + adv = ADV_IDLE - m->advertised; hdr = buf_msg(skb); @@ -573,6 +645,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, m->advertised += adv; } else if (mtyp == GRP_ACK_MSG) { msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); + } else if (mtyp == GRP_REMIT_MSG) { + msg_set_grp_remitted(hdr, m->window); } __skb_queue_tail(xmitq, skb); } @@ -583,8 +657,9 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, { u32 node = msg_orignode(hdr); u32 port = msg_origport(hdr); - struct tipc_member *m; + struct tipc_member *m, *pm; struct tipc_msg *ehdr; + u16 remitted, in_flight; if (!grp) return; @@ -626,6 +701,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, /* Wait until WITHDRAW event is received */ if (m->state != MBR_LEAVING) { + tipc_group_decr_active(grp, m); m->state = MBR_LEAVING; return; } @@ -653,6 +729,48 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, *usr_wakeup = true; m->usr_pending = false; return; + case GRP_RECLAIM_MSG: + if (!m) + return; + *usr_wakeup = m->usr_pending; + m->usr_pending = false; + tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); + m->window = ADV_IDLE; + return; + case GRP_REMIT_MSG: + if (!m || m->state != MBR_RECLAIMING) + return; + + list_del_init(&m->list); + grp->active_cnt--; + remitted = msg_grp_remitted(hdr); + + /* Messages preceding the REMIT still in receive queue */ + if (m->advertised > remitted) { + m->state = MBR_REMITTED; + in_flight = m->advertised - remitted; + } + /* All messages preceding the REMIT have been read */ + if (m->advertised <= remitted) { + m->state = MBR_JOINED; + in_flight = 0; + } + /* ..and the REMIT overtaken by more messages => re-advertise */ + if (m->advertised < remitted) + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + + m->advertised = ADV_IDLE + in_flight; + + /* Set oldest pending member to active and advertise */ + if (list_empty(&grp->pending)) + return; + pm = list_first_entry(&grp->pending, struct tipc_member, list); + pm->state = MBR_ACTIVE; + list_move_tail(&pm->list, &grp->active); + grp->active_cnt++; + if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) + tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); + return; default: pr_warn("Received unknown GROUP_PROTO message\n"); } @@ -735,6 +853,7 @@ void tipc_group_member_evt(struct tipc_group *grp, /* Hold back event if more messages might be expected */ if (m->state != MBR_LEAVING && node_up) { m->event_msg = skb; + tipc_group_decr_active(grp, m); m->state = MBR_LEAVING; } else { if (node_up) diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 52c6a2e01995..cedf811317fb 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -548,6 +548,8 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) #define GRP_LEAVE_MSG 1 #define GRP_ADV_MSG 2 #define GRP_ACK_MSG 3 +#define GRP_RECLAIM_MSG 4 +#define GRP_REMIT_MSG 5 /* * Word 1 @@ -850,6 +852,16 @@ static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n) msg_set_bits(m, 9, 16, 0xffff, n); } +static inline u16 msg_grp_remitted(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_grp_remitted(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + /* Word 10 */ static inline u16 msg_grp_evt(struct tipc_msg *m) -- cgit v1.2.3