summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--net/rxrpc/af_rxrpc.c4
-rw-r--r--net/rxrpc/ar-internal.h5
-rw-r--r--net/rxrpc/input.c16
-rw-r--r--net/rxrpc/local_object.c86
4 files changed, 72 insertions, 39 deletions
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index d09eaf153544..8c9bd3ae9edf 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -193,7 +193,7 @@ static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
service_in_use:
write_unlock(&local->services_lock);
- rxrpc_put_local(local);
+ rxrpc_unuse_local(local);
ret = -EADDRINUSE;
error_unlock:
release_sock(&rx->sk);
@@ -901,7 +901,7 @@ static int rxrpc_release_sock(struct sock *sk)
rxrpc_queue_work(&rxnet->service_conn_reaper);
rxrpc_queue_work(&rxnet->client_conn_reaper);
- rxrpc_put_local(rx->local);
+ rxrpc_unuse_local(rx->local);
rx->local = NULL;
key_put(rx->key);
rx->key = NULL;
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 822f45386e31..9796c45d2f6a 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -254,7 +254,8 @@ struct rxrpc_security {
*/
struct rxrpc_local {
struct rcu_head rcu;
- atomic_t usage;
+ atomic_t active_users; /* Number of users of the local endpoint */
+ atomic_t usage; /* Number of references to the structure */
struct rxrpc_net *rxnet; /* The network ns in which this resides */
struct list_head link;
struct socket *socket; /* my UDP socket */
@@ -1002,6 +1003,8 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *, const struct sockaddr_rxrpc
struct rxrpc_local *rxrpc_get_local(struct rxrpc_local *);
struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *);
void rxrpc_put_local(struct rxrpc_local *);
+struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *);
+void rxrpc_unuse_local(struct rxrpc_local *);
void rxrpc_queue_local(struct rxrpc_local *);
void rxrpc_destroy_all_locals(struct rxrpc_net *);
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 5bd6f1546e5c..ee95d1cd1cdf 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1108,8 +1108,12 @@ static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
{
_enter("%p,%p", local, skb);
- skb_queue_tail(&local->event_queue, skb);
- rxrpc_queue_local(local);
+ if (rxrpc_get_local_maybe(local)) {
+ skb_queue_tail(&local->event_queue, skb);
+ rxrpc_queue_local(local);
+ } else {
+ rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ }
}
/*
@@ -1119,8 +1123,12 @@ static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
{
CHECK_SLAB_OKAY(&local->usage);
- skb_queue_tail(&local->reject_queue, skb);
- rxrpc_queue_local(local);
+ if (rxrpc_get_local_maybe(local)) {
+ skb_queue_tail(&local->reject_queue, skb);
+ rxrpc_queue_local(local);
+ } else {
+ rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ }
}
/*
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index b1c71bad510b..9798159ee65f 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -79,6 +79,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
if (local) {
atomic_set(&local->usage, 1);
+ atomic_set(&local->active_users, 1);
local->rxnet = rxnet;
INIT_LIST_HEAD(&local->link);
INIT_WORK(&local->processor, rxrpc_local_processor);
@@ -266,11 +267,8 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *net,
* bind the transport socket may still fail if we're attempting
* to use a local address that the dying object is still using.
*/
- if (!rxrpc_get_local_maybe(local)) {
- cursor = cursor->next;
- list_del_init(&local->link);
+ if (!rxrpc_use_local(local))
break;
- }
age = "old";
goto found;
@@ -284,7 +282,10 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *net,
if (ret < 0)
goto sock_error;
- list_add_tail(&local->link, cursor);
+ if (cursor != &rxnet->local_endpoints)
+ list_replace(cursor, &local->link);
+ else
+ list_add_tail(&local->link, cursor);
age = "new";
found:
@@ -342,7 +343,8 @@ struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *local)
}
/*
- * Queue a local endpoint.
+ * Queue a local endpoint unless it has become unreferenced and pass the
+ * caller's reference to the work item.
*/
void rxrpc_queue_local(struct rxrpc_local *local)
{
@@ -351,15 +353,8 @@ void rxrpc_queue_local(struct rxrpc_local *local)
if (rxrpc_queue_work(&local->processor))
trace_rxrpc_local(local, rxrpc_local_queued,
atomic_read(&local->usage), here);
-}
-
-/*
- * A local endpoint reached its end of life.
- */
-static void __rxrpc_put_local(struct rxrpc_local *local)
-{
- _enter("%d", local->debug_id);
- rxrpc_queue_work(&local->processor);
+ else
+ rxrpc_put_local(local);
}
/*
@@ -375,11 +370,46 @@ void rxrpc_put_local(struct rxrpc_local *local)
trace_rxrpc_local(local, rxrpc_local_put, n, here);
if (n == 0)
- __rxrpc_put_local(local);
+ call_rcu(&local->rcu, rxrpc_local_rcu);
}
}
/*
+ * Start using a local endpoint.
+ */
+struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *local)
+{
+ unsigned int au;
+
+ local = rxrpc_get_local_maybe(local);
+ if (!local)
+ return NULL;
+
+ au = atomic_fetch_add_unless(&local->active_users, 1, 0);
+ if (au == 0) {
+ rxrpc_put_local(local);
+ return NULL;
+ }
+
+ return local;
+}
+
+/*
+ * Cease using a local endpoint. Once the number of active users reaches 0, we
+ * start the closure of the transport in the work processor.
+ */
+void rxrpc_unuse_local(struct rxrpc_local *local)
+{
+ unsigned int au;
+
+ au = atomic_dec_return(&local->active_users);
+ if (au == 0)
+ rxrpc_queue_local(local);
+ else
+ rxrpc_put_local(local);
+}
+
+/*
* Destroy a local endpoint's socket and then hand the record to RCU to dispose
* of.
*
@@ -393,16 +423,6 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
_enter("%d", local->debug_id);
- /* We can get a race between an incoming call packet queueing the
- * processor again and the work processor starting the destruction
- * process which will shut down the UDP socket.
- */
- if (local->dead) {
- _leave(" [already dead]");
- return;
- }
- local->dead = true;
-
mutex_lock(&rxnet->local_mutex);
list_del_init(&local->link);
mutex_unlock(&rxnet->local_mutex);
@@ -422,13 +442,11 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
*/
rxrpc_purge_queue(&local->reject_queue);
rxrpc_purge_queue(&local->event_queue);
-
- _debug("rcu local %d", local->debug_id);
- call_rcu(&local->rcu, rxrpc_local_rcu);
}
/*
- * Process events on an endpoint
+ * Process events on an endpoint. The work item carries a ref which
+ * we must release.
*/
static void rxrpc_local_processor(struct work_struct *work)
{
@@ -441,8 +459,10 @@ static void rxrpc_local_processor(struct work_struct *work)
do {
again = false;
- if (atomic_read(&local->usage) == 0)
- return rxrpc_local_destroyer(local);
+ if (atomic_read(&local->active_users) == 0) {
+ rxrpc_local_destroyer(local);
+ break;
+ }
if (!skb_queue_empty(&local->reject_queue)) {
rxrpc_reject_packets(local);
@@ -454,6 +474,8 @@ static void rxrpc_local_processor(struct work_struct *work)
again = true;
}
} while (again);
+
+ rxrpc_put_local(local);
}
/*