diff options
author | David S. Miller <davem@davemloft.net> | 2017-10-13 08:46:01 -0700 |
---|---|---|
committer | David S. Miller <davem@davemloft.net> | 2017-10-13 08:46:01 -0700 |
commit | a00344bd1bbea2ba40719ae0eb3b6da7fae08cf2 (patch) | |
tree | 3224d16251675b877b9a7d7b94420de7bbd36a9e /net/tipc/server.c | |
parent | 2d0d21c12dfa3851620f1fa9fe2d444538f1fad4 (diff) | |
parent | 04d7b574b245c66001a33cb9da2c0311063af73f (diff) |
Merge branch 'tipc-comm-groups'
Jon Maloy says:
====================
tipc: Introduce Communcation Group feature
With this commit series we introduce a 'Group Communication' feature in
order to resolve the datagram and multicast flow control problem. This
new feature makes it possible for a user to instantiate multiple private
virtual brokerless message buses by just creating and joining member
sockets.
The main features are as follows:
---------------------------------
- Sockets can join a group via a new setsockopt() call TIPC_GROUP_JOIN.
If it is the first socket of the group this implies creation of the
group. This call takes four parameters: 'type' serves as group
identifier, 'instance' serves as member identifier, and 'scope'
indicates the visibility of the group (node/cluster/zone). Finally,
'flags' indicates different options for the socket joining the group.
For the time being, there are only two such flags: 1) 'LOOPBACK'
indicates if the creator of the socket wants to receive a copy of
broadcast or multicast messages it sends to the group, 2) EVENTS
indicates if it wants to receive membership (JOINED/LEFT) events for
the other members of the group.
- Groups are closed, i.e., sockets which have not joined a group will
not be able to send messages to or receive messages from members of
the group, and vice versa. A socket can only be member of one group
at a time.
- There are four transmission modes.
1: Unicast. The sender transmits a message using the port identity
(node:port tuple) of the receiving socket.
2: Anycast. The sender transmits a message using a port name (type:
instance:scope) of one of the receiving sockets. If more than
one member socket matches the given address a destination is
selected according to a round-robin algorithm, but also considering
the destination load (advertised window size) as an additional
criteria.
3: Multicast. The sender transmits a message using a port name
(type:instance:scope) of one or more of the receiving sockets.
All sockets in the group matching the given address will receive
a copy of the message.
4: Broadcast. The sender transmits a message using the primtive
send(). All members of the group, irrespective of their member
identity (instance) number receive a copy of the message.
- TIPC broadcast is used for carrying messages in mode 3 or 4 when
this is deemed more efficient, i.e., depending on number of actual
destinations.
- All transmission modes are flow controlled, so that messages never
are dropped or rejected, just like we are used to from connection
oriented communication. A special algorithm guarantees that this is
true even for multipoint-to-point communication, i.e., at occasions
where many source sockets may decide to send simultaneously towards
the same destination socket.
- Sequence order is always guaranteed, even between the different
transmission modes.
- Member join/leave events are received in all other member sockets
in guaranteed order. I.e., a 'JOINED' (an empty message with the OOB
bit set) will always be received before the first data message from
a new member, and a 'LEAVE' (like 'JOINED', but with EOR bit set) will
always arrive after the last data message from a leaving member.
-----
v2: Reordered variable declarations in descending length order, as per
feedback from David Miller. This was done as far as permitted by the
the initialization order.
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/server.c')
-rw-r--r-- | net/tipc/server.c | 121 |
1 files changed, 94 insertions, 27 deletions
diff --git a/net/tipc/server.c b/net/tipc/server.c index 3cd6402e812c..713077536d0c 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -36,6 +36,8 @@ #include "server.h" #include "core.h" #include "socket.h" +#include "addr.h" +#include "msg.h" #include <net/sock.h> #include <linux/module.h> @@ -105,13 +107,11 @@ static void tipc_conn_kref_release(struct kref *kref) kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr)); sock_release(sock); con->sock = NULL; - - spin_lock_bh(&s->idr_lock); - idr_remove(&s->conn_idr, con->conid); - s->idr_in_use--; - spin_unlock_bh(&s->idr_lock); } - + spin_lock_bh(&s->idr_lock); + idr_remove(&s->conn_idr, con->conid); + s->idr_in_use--; + spin_unlock_bh(&s->idr_lock); tipc_clean_outqueues(con); kfree(con); } @@ -197,7 +197,8 @@ static void tipc_close_conn(struct tipc_conn *con) struct tipc_server *s = con->server; if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { - tipc_unregister_callbacks(con); + if (con->sock) + tipc_unregister_callbacks(con); if (con->conid) s->tipc_conn_release(con->conid, con->usr_data); @@ -207,8 +208,8 @@ static void tipc_close_conn(struct tipc_conn *con) * are harmless for us here as we have already deleted this * connection from server connection list. */ - kernel_sock_shutdown(con->sock, SHUT_RDWR); - + if (con->sock) + kernel_sock_shutdown(con->sock, SHUT_RDWR); conn_put(con); } } @@ -487,38 +488,104 @@ void tipc_conn_terminate(struct tipc_server *s, int conid) } } +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, + u32 lower, u32 upper, int *conid) +{ + struct tipc_subscriber *scbr; + struct tipc_subscr sub; + struct tipc_server *s; + struct tipc_conn *con; + + sub.seq.type = type; + sub.seq.lower = lower; + sub.seq.upper = upper; + sub.timeout = TIPC_WAIT_FOREVER; + sub.filter = TIPC_SUB_PORTS; + *(u32 *)&sub.usr_handle = port; + + con = tipc_alloc_conn(tipc_topsrv(net)); + if (!con) + return false; + + *conid = con->conid; + s = con->server; + scbr = s->tipc_conn_new(*conid); + if (!scbr) { + tipc_close_conn(con); + return false; + } + + con->usr_data = scbr; + con->sock = NULL; + s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub)); + return true; +} + +void tipc_topsrv_kern_unsubscr(struct net *net, int conid) +{ + struct tipc_conn *con; + + con = tipc_conn_lookup(tipc_topsrv(net), conid); + if (!con) + return; + tipc_close_conn(con); + conn_put(con); +} + +static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt) +{ + u32 port = *(u32 *)&evt->s.usr_handle; + u32 self = tipc_own_addr(net); + struct sk_buff_head evtq; + struct sk_buff *skb; + + skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt), + self, self, port, port, 0); + if (!skb) + return; + msg_set_dest_droppable(buf_msg(skb), true); + memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt)); + skb_queue_head_init(&evtq); + __skb_queue_tail(&evtq, skb); + tipc_sk_rcv(net, &evtq); +} + static void tipc_send_to_sock(struct tipc_conn *con) { - int count = 0; struct tipc_server *s = con->server; struct outqueue_entry *e; + struct tipc_event *evt; struct msghdr msg; + int count = 0; int ret; spin_lock_bh(&con->outqueue_lock); while (test_bit(CF_CONNECTED, &con->flags)) { - e = list_entry(con->outqueue.next, struct outqueue_entry, - list); + e = list_entry(con->outqueue.next, struct outqueue_entry, list); if ((struct list_head *) e == &con->outqueue) break; - spin_unlock_bh(&con->outqueue_lock); - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_DONTWAIT; + spin_unlock_bh(&con->outqueue_lock); - if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { - msg.msg_name = &e->dest; - msg.msg_namelen = sizeof(struct sockaddr_tipc); - } - ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, - e->iov.iov_len); - if (ret == -EWOULDBLOCK || ret == 0) { - cond_resched(); - goto out; - } else if (ret < 0) { - goto send_err; + if (con->sock) { + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT; + if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { + msg.msg_name = &e->dest; + msg.msg_namelen = sizeof(struct sockaddr_tipc); + } + ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, + e->iov.iov_len); + if (ret == -EWOULDBLOCK || ret == 0) { + cond_resched(); + goto out; + } else if (ret < 0) { + goto send_err; + } + } else { + evt = e->iov.iov_base; + tipc_send_kern_top_evt(s->net, evt); } - /* Don't starve users filling buffers */ if (++count >= MAX_SEND_MSG_COUNT) { cond_resched(); |