summaryrefslogtreecommitdiff
path: root/net/tipc/server.c
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2017-10-13 08:46:01 -0700
committerDavid S. Miller <davem@davemloft.net>2017-10-13 08:46:01 -0700
commita00344bd1bbea2ba40719ae0eb3b6da7fae08cf2 (patch)
tree3224d16251675b877b9a7d7b94420de7bbd36a9e /net/tipc/server.c
parent2d0d21c12dfa3851620f1fa9fe2d444538f1fad4 (diff)
parent04d7b574b245c66001a33cb9da2c0311063af73f (diff)
Merge branch 'tipc-comm-groups'
Jon Maloy says: ==================== tipc: Introduce Communcation Group feature With this commit series we introduce a 'Group Communication' feature in order to resolve the datagram and multicast flow control problem. This new feature makes it possible for a user to instantiate multiple private virtual brokerless message buses by just creating and joining member sockets. The main features are as follows: --------------------------------- - Sockets can join a group via a new setsockopt() call TIPC_GROUP_JOIN. If it is the first socket of the group this implies creation of the group. This call takes four parameters: 'type' serves as group identifier, 'instance' serves as member identifier, and 'scope' indicates the visibility of the group (node/cluster/zone). Finally, 'flags' indicates different options for the socket joining the group. For the time being, there are only two such flags: 1) 'LOOPBACK' indicates if the creator of the socket wants to receive a copy of broadcast or multicast messages it sends to the group, 2) EVENTS indicates if it wants to receive membership (JOINED/LEFT) events for the other members of the group. - Groups are closed, i.e., sockets which have not joined a group will not be able to send messages to or receive messages from members of the group, and vice versa. A socket can only be member of one group at a time. - There are four transmission modes. 1: Unicast. The sender transmits a message using the port identity (node:port tuple) of the receiving socket. 2: Anycast. The sender transmits a message using a port name (type: instance:scope) of one of the receiving sockets. If more than one member socket matches the given address a destination is selected according to a round-robin algorithm, but also considering the destination load (advertised window size) as an additional criteria. 3: Multicast. The sender transmits a message using a port name (type:instance:scope) of one or more of the receiving sockets. All sockets in the group matching the given address will receive a copy of the message. 4: Broadcast. The sender transmits a message using the primtive send(). All members of the group, irrespective of their member identity (instance) number receive a copy of the message. - TIPC broadcast is used for carrying messages in mode 3 or 4 when this is deemed more efficient, i.e., depending on number of actual destinations. - All transmission modes are flow controlled, so that messages never are dropped or rejected, just like we are used to from connection oriented communication. A special algorithm guarantees that this is true even for multipoint-to-point communication, i.e., at occasions where many source sockets may decide to send simultaneously towards the same destination socket. - Sequence order is always guaranteed, even between the different transmission modes. - Member join/leave events are received in all other member sockets in guaranteed order. I.e., a 'JOINED' (an empty message with the OOB bit set) will always be received before the first data message from a new member, and a 'LEAVE' (like 'JOINED', but with EOR bit set) will always arrive after the last data message from a leaving member. ----- v2: Reordered variable declarations in descending length order, as per feedback from David Miller. This was done as far as permitted by the the initialization order. ==================== Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/server.c')
-rw-r--r--net/tipc/server.c121
1 files changed, 94 insertions, 27 deletions
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 3cd6402e812c..713077536d0c 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -36,6 +36,8 @@
#include "server.h"
#include "core.h"
#include "socket.h"
+#include "addr.h"
+#include "msg.h"
#include <net/sock.h>
#include <linux/module.h>
@@ -105,13 +107,11 @@ static void tipc_conn_kref_release(struct kref *kref)
kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr));
sock_release(sock);
con->sock = NULL;
-
- spin_lock_bh(&s->idr_lock);
- idr_remove(&s->conn_idr, con->conid);
- s->idr_in_use--;
- spin_unlock_bh(&s->idr_lock);
}
-
+ spin_lock_bh(&s->idr_lock);
+ idr_remove(&s->conn_idr, con->conid);
+ s->idr_in_use--;
+ spin_unlock_bh(&s->idr_lock);
tipc_clean_outqueues(con);
kfree(con);
}
@@ -197,7 +197,8 @@ static void tipc_close_conn(struct tipc_conn *con)
struct tipc_server *s = con->server;
if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
- tipc_unregister_callbacks(con);
+ if (con->sock)
+ tipc_unregister_callbacks(con);
if (con->conid)
s->tipc_conn_release(con->conid, con->usr_data);
@@ -207,8 +208,8 @@ static void tipc_close_conn(struct tipc_conn *con)
* are harmless for us here as we have already deleted this
* connection from server connection list.
*/
- kernel_sock_shutdown(con->sock, SHUT_RDWR);
-
+ if (con->sock)
+ kernel_sock_shutdown(con->sock, SHUT_RDWR);
conn_put(con);
}
}
@@ -487,38 +488,104 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
}
}
+bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
+ u32 lower, u32 upper, int *conid)
+{
+ struct tipc_subscriber *scbr;
+ struct tipc_subscr sub;
+ struct tipc_server *s;
+ struct tipc_conn *con;
+
+ sub.seq.type = type;
+ sub.seq.lower = lower;
+ sub.seq.upper = upper;
+ sub.timeout = TIPC_WAIT_FOREVER;
+ sub.filter = TIPC_SUB_PORTS;
+ *(u32 *)&sub.usr_handle = port;
+
+ con = tipc_alloc_conn(tipc_topsrv(net));
+ if (!con)
+ return false;
+
+ *conid = con->conid;
+ s = con->server;
+ scbr = s->tipc_conn_new(*conid);
+ if (!scbr) {
+ tipc_close_conn(con);
+ return false;
+ }
+
+ con->usr_data = scbr;
+ con->sock = NULL;
+ s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub));
+ return true;
+}
+
+void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
+{
+ struct tipc_conn *con;
+
+ con = tipc_conn_lookup(tipc_topsrv(net), conid);
+ if (!con)
+ return;
+ tipc_close_conn(con);
+ conn_put(con);
+}
+
+static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
+{
+ u32 port = *(u32 *)&evt->s.usr_handle;
+ u32 self = tipc_own_addr(net);
+ struct sk_buff_head evtq;
+ struct sk_buff *skb;
+
+ skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
+ self, self, port, port, 0);
+ if (!skb)
+ return;
+ msg_set_dest_droppable(buf_msg(skb), true);
+ memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
+ skb_queue_head_init(&evtq);
+ __skb_queue_tail(&evtq, skb);
+ tipc_sk_rcv(net, &evtq);
+}
+
static void tipc_send_to_sock(struct tipc_conn *con)
{
- int count = 0;
struct tipc_server *s = con->server;
struct outqueue_entry *e;
+ struct tipc_event *evt;
struct msghdr msg;
+ int count = 0;
int ret;
spin_lock_bh(&con->outqueue_lock);
while (test_bit(CF_CONNECTED, &con->flags)) {
- e = list_entry(con->outqueue.next, struct outqueue_entry,
- list);
+ e = list_entry(con->outqueue.next, struct outqueue_entry, list);
if ((struct list_head *) e == &con->outqueue)
break;
- spin_unlock_bh(&con->outqueue_lock);
- memset(&msg, 0, sizeof(msg));
- msg.msg_flags = MSG_DONTWAIT;
+ spin_unlock_bh(&con->outqueue_lock);
- if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
- msg.msg_name = &e->dest;
- msg.msg_namelen = sizeof(struct sockaddr_tipc);
- }
- ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
- e->iov.iov_len);
- if (ret == -EWOULDBLOCK || ret == 0) {
- cond_resched();
- goto out;
- } else if (ret < 0) {
- goto send_err;
+ if (con->sock) {
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_flags = MSG_DONTWAIT;
+ if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
+ msg.msg_name = &e->dest;
+ msg.msg_namelen = sizeof(struct sockaddr_tipc);
+ }
+ ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
+ e->iov.iov_len);
+ if (ret == -EWOULDBLOCK || ret == 0) {
+ cond_resched();
+ goto out;
+ } else if (ret < 0) {
+ goto send_err;
+ }
+ } else {
+ evt = e->iov.iov_base;
+ tipc_send_kern_top_evt(s->net, evt);
}
-
/* Don't starve users filling buffers */
if (++count >= MAX_SEND_MSG_COUNT) {
cond_resched();