summaryrefslogtreecommitdiff
path: root/net/rxrpc/call_object.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rxrpc/call_object.c')
-rw-r--r--net/rxrpc/call_object.c224
1 files changed, 88 insertions, 136 deletions
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 83019e489555..be5733d55794 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -24,11 +24,6 @@
*/
unsigned int rxrpc_max_call_lifetime = 60 * HZ;
-/*
- * Time till dead call expires after last use (in jiffies).
- */
-unsigned int rxrpc_dead_call_expiry = 2 * HZ;
-
const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
[RXRPC_CALL_UNINITIALISED] = "Uninit ",
[RXRPC_CALL_CLIENT_AWAIT_CONN] = "ClWtConn",
@@ -43,7 +38,6 @@ const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
[RXRPC_CALL_SERVER_SEND_REPLY] = "SvSndRpl",
[RXRPC_CALL_SERVER_AWAIT_ACK] = "SvAwtACK",
[RXRPC_CALL_COMPLETE] = "Complete",
- [RXRPC_CALL_DEAD] = "Dead ",
};
const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = {
@@ -74,11 +68,10 @@ struct kmem_cache *rxrpc_call_jar;
LIST_HEAD(rxrpc_calls);
DEFINE_RWLOCK(rxrpc_call_lock);
-static void rxrpc_destroy_call(struct work_struct *work);
static void rxrpc_call_life_expired(unsigned long _call);
-static void rxrpc_dead_call_expired(unsigned long _call);
static void rxrpc_ack_time_expired(unsigned long _call);
static void rxrpc_resend_time_expired(unsigned long _call);
+static void rxrpc_cleanup_call(struct rxrpc_call *call);
/*
* find an extant server call
@@ -138,13 +131,10 @@ static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
(unsigned long) call);
- setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
- (unsigned long) call);
setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
(unsigned long) call);
setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
(unsigned long) call);
- INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
INIT_WORK(&call->processor, &rxrpc_process_call);
INIT_LIST_HEAD(&call->link);
INIT_LIST_HEAD(&call->chan_wait_link);
@@ -185,11 +175,9 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
if (!call)
return ERR_PTR(-ENOMEM);
call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
-
- sock_hold(&rx->sk);
- call->socket = rx;
call->rx_data_post = 1;
call->service_id = srx->srx_service;
+ rcu_assign_pointer(call->socket, rx);
_leave(" = %p", call);
return call;
@@ -244,8 +232,9 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
return call;
}
- trace_rxrpc_call(call, 0, atomic_read(&call->usage), 0, here,
- (const void *)user_call_ID);
+ trace_rxrpc_call(call, rxrpc_call_new_client,
+ atomic_read(&call->usage), 0,
+ here, (const void *)user_call_ID);
/* Publish the call, even though it is incompletely set up as yet */
call->user_call_ID = user_call_ID;
@@ -295,8 +284,10 @@ error:
list_del_init(&call->link);
write_unlock_bh(&rxrpc_call_lock);
+error_out:
+ __rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
+ RX_CALL_DEAD, ret);
set_bit(RXRPC_CALL_RELEASED, &call->flags);
- call->state = RXRPC_CALL_DEAD;
rxrpc_put_call(call, rxrpc_call_put);
_leave(" = %d", ret);
return ERR_PTR(ret);
@@ -308,11 +299,8 @@ error:
*/
found_user_ID_now_present:
write_unlock(&rx->call_lock);
- set_bit(RXRPC_CALL_RELEASED, &call->flags);
- call->state = RXRPC_CALL_DEAD;
- rxrpc_put_call(call, rxrpc_call_put);
- _leave(" = -EEXIST [%p]", call);
- return ERR_PTR(-EEXIST);
+ ret = -EEXIST;
+ goto error_out;
}
/*
@@ -340,7 +328,6 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
atomic_read(&candidate->usage), 0, here, NULL);
chan = sp->hdr.cid & RXRPC_CHANNELMASK;
- candidate->socket = rx;
candidate->conn = conn;
candidate->peer = conn->params.peer;
candidate->cid = sp->hdr.cid;
@@ -351,6 +338,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
candidate->flags |= (1 << RXRPC_CALL_IS_SERVICE);
if (conn->security_ix > 0)
candidate->state = RXRPC_CALL_SERVER_SECURING;
+ rcu_assign_pointer(candidate->socket, rx);
spin_lock(&conn->channel_lock);
@@ -411,7 +399,6 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
candidate = NULL;
conn->channels[chan].call_counter = call_id;
rcu_assign_pointer(conn->channels[chan].call, call);
- sock_hold(&rx->sk);
rxrpc_get_connection(conn);
rxrpc_get_peer(call->peer);
spin_unlock(&conn->channel_lock);
@@ -453,6 +440,39 @@ old_call:
}
/*
+ * Queue a call's work processor, getting a ref to pass to the work queue.
+ */
+bool rxrpc_queue_call(struct rxrpc_call *call)
+{
+ const void *here = __builtin_return_address(0);
+ int n = __atomic_add_unless(&call->usage, 1, 0);
+ int m = atomic_read(&call->skb_count);
+ if (n == 0)
+ return false;
+ if (rxrpc_queue_work(&call->processor))
+ trace_rxrpc_call(call, rxrpc_call_queued, n + 1, m, here, NULL);
+ else
+ rxrpc_put_call(call, rxrpc_call_put_noqueue);
+ return true;
+}
+
+/*
+ * Queue a call's work processor, passing the callers ref to the work queue.
+ */
+bool __rxrpc_queue_call(struct rxrpc_call *call)
+{
+ const void *here = __builtin_return_address(0);
+ int n = atomic_read(&call->usage);
+ int m = atomic_read(&call->skb_count);
+ ASSERTCMP(n, >=, 1);
+ if (rxrpc_queue_work(&call->processor))
+ trace_rxrpc_call(call, rxrpc_call_queued_ref, n, m, here, NULL);
+ else
+ rxrpc_put_call(call, rxrpc_call_put_noqueue);
+ return true;
+}
+
+/*
* Note the re-emergence of a call.
*/
void rxrpc_see_call(struct rxrpc_call *call)
@@ -493,11 +513,8 @@ void rxrpc_get_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
/*
* detach a call from a socket and set up for release
*/
-void rxrpc_release_call(struct rxrpc_call *call)
+void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
{
- struct rxrpc_connection *conn = call->conn;
- struct rxrpc_sock *rx = call->socket;
-
_enter("{%d,%d,%d,%d}",
call->debug_id, atomic_read(&call->usage),
atomic_read(&call->ackr_not_idle),
@@ -513,7 +530,7 @@ void rxrpc_release_call(struct rxrpc_call *call)
/* dissociate from the socket
* - the socket's ref on the call is passed to the death timer
*/
- _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
+ _debug("RELEASE CALL %p (%d)", call, call->debug_id);
if (call->peer) {
spin_lock(&call->peer->lock);
@@ -532,20 +549,30 @@ void rxrpc_release_call(struct rxrpc_call *call)
rb_erase(&call->sock_node, &rx->calls);
memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
+ rxrpc_put_call(call, rxrpc_call_put_userid);
}
write_unlock_bh(&rx->call_lock);
/* free up the channel for reuse */
- write_lock_bh(&call->state_lock);
+ if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK) {
+ clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
+ rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ACK);
+ rxrpc_call_completed(call);
+ } else {
+ write_lock_bh(&call->state_lock);
+
+ if (call->state < RXRPC_CALL_COMPLETE) {
+ _debug("+++ ABORTING STATE %d +++\n", call->state);
+ __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
+ clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
+ rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ABORT);
+ }
- if (call->state < RXRPC_CALL_COMPLETE &&
- call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
- _debug("+++ ABORTING STATE %d +++\n", call->state);
- __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
+ write_unlock_bh(&call->state_lock);
}
- write_unlock_bh(&call->state_lock);
- rxrpc_disconnect_call(call);
+ if (call->conn)
+ rxrpc_disconnect_call(call);
/* clean up the Rx queue */
if (!skb_queue_empty(&call->rx_queue) ||
@@ -569,53 +596,16 @@ void rxrpc_release_call(struct rxrpc_call *call)
}
spin_unlock_bh(&call->lock);
}
+ rxrpc_purge_queue(&call->knlrecv_queue);
del_timer_sync(&call->resend_timer);
del_timer_sync(&call->ack_timer);
del_timer_sync(&call->lifetimer);
- call->deadspan.expires = jiffies + rxrpc_dead_call_expiry;
- add_timer(&call->deadspan);
_leave("");
}
/*
- * handle a dead call being ready for reaping
- */
-static void rxrpc_dead_call_expired(unsigned long _call)
-{
- struct rxrpc_call *call = (struct rxrpc_call *) _call;
-
- _enter("{%d}", call->debug_id);
-
- rxrpc_see_call(call);
- write_lock_bh(&call->state_lock);
- call->state = RXRPC_CALL_DEAD;
- write_unlock_bh(&call->state_lock);
- rxrpc_put_call(call, rxrpc_call_put);
-}
-
-/*
- * mark a call as to be released, aborting it if it's still in progress
- * - called with softirqs disabled
- */
-static void rxrpc_mark_call_released(struct rxrpc_call *call)
-{
- bool sched = false;
-
- rxrpc_see_call(call);
- write_lock(&call->state_lock);
- if (call->state < RXRPC_CALL_DEAD) {
- sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
- if (!test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
- sched = true;
- }
- write_unlock(&call->state_lock);
- if (sched)
- rxrpc_queue_call(call);
-}
-
-/*
* release all the calls associated with a socket
*/
void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
@@ -629,17 +619,17 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
/* kill the not-yet-accepted incoming calls */
list_for_each_entry(call, &rx->secureq, accept_link) {
- rxrpc_mark_call_released(call);
+ rxrpc_release_call(rx, call);
}
list_for_each_entry(call, &rx->acceptq, accept_link) {
- rxrpc_mark_call_released(call);
+ rxrpc_release_call(rx, call);
}
/* mark all the calls as no longer wanting incoming packets */
for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
call = rb_entry(p, struct rxrpc_call, sock_node);
- rxrpc_mark_call_released(call);
+ rxrpc_release_call(rx, call);
}
read_unlock_bh(&rx->call_lock);
@@ -663,8 +653,7 @@ void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
if (n == 0) {
_debug("call %d dead", call->debug_id);
WARN_ON(m != 0);
- ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
- rxrpc_queue_work(&call->destroyer);
+ rxrpc_cleanup_call(call);
}
}
@@ -683,8 +672,7 @@ void rxrpc_put_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
if (n == 0) {
_debug("call %d dead", call->debug_id);
WARN_ON(m != 0);
- ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
- rxrpc_queue_work(&call->destroyer);
+ rxrpc_cleanup_call(call);
}
}
@@ -708,23 +696,19 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call)
{
_net("DESTROY CALL %d", call->debug_id);
- ASSERT(call->socket);
+ write_lock_bh(&rxrpc_call_lock);
+ list_del_init(&call->link);
+ write_unlock_bh(&rxrpc_call_lock);
memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
del_timer_sync(&call->lifetimer);
- del_timer_sync(&call->deadspan);
del_timer_sync(&call->ack_timer);
del_timer_sync(&call->resend_timer);
+ ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
- ASSERTCMP(call->events, ==, 0);
- if (work_pending(&call->processor)) {
- _debug("defer destroy");
- rxrpc_queue_work(&call->destroyer);
- return;
- }
-
+ ASSERT(!work_pending(&call->processor));
ASSERTCMP(call->conn, ==, NULL);
if (call->acks_window) {
@@ -753,40 +737,21 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call)
rxrpc_purge_queue(&call->rx_queue);
ASSERT(skb_queue_empty(&call->rx_oos_queue));
rxrpc_purge_queue(&call->knlrecv_queue);
- sock_put(&call->socket->sk);
call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
}
/*
- * destroy a call
- */
-static void rxrpc_destroy_call(struct work_struct *work)
-{
- struct rxrpc_call *call =
- container_of(work, struct rxrpc_call, destroyer);
-
- _enter("%p{%d,%x,%p}",
- call, atomic_read(&call->usage), call->cid, call->conn);
-
- ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
-
- write_lock_bh(&rxrpc_call_lock);
- list_del_init(&call->link);
- write_unlock_bh(&rxrpc_call_lock);
-
- rxrpc_cleanup_call(call);
- _leave("");
-}
-
-/*
- * preemptively destroy all the call records from a transport endpoint rather
- * than waiting for them to time out
+ * Make sure that all calls are gone.
*/
void __exit rxrpc_destroy_all_calls(void)
{
struct rxrpc_call *call;
_enter("");
+
+ if (list_empty(&rxrpc_calls))
+ return;
+
write_lock_bh(&rxrpc_call_lock);
while (!list_empty(&rxrpc_calls)) {
@@ -796,28 +761,15 @@ void __exit rxrpc_destroy_all_calls(void)
rxrpc_see_call(call);
list_del_init(&call->link);
- switch (atomic_read(&call->usage)) {
- case 0:
- ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
- break;
- case 1:
- if (del_timer_sync(&call->deadspan) != 0 &&
- call->state != RXRPC_CALL_DEAD)
- rxrpc_dead_call_expired((unsigned long) call);
- if (call->state != RXRPC_CALL_DEAD)
- break;
- default:
- pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
- call, atomic_read(&call->usage),
- atomic_read(&call->ackr_not_idle),
- rxrpc_call_states[call->state],
- call->flags, call->events);
- if (!skb_queue_empty(&call->rx_queue))
- pr_err("Rx queue occupied\n");
- if (!skb_queue_empty(&call->rx_oos_queue))
- pr_err("OOS queue occupied\n");
- break;
- }
+ pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
+ call, atomic_read(&call->usage),
+ atomic_read(&call->ackr_not_idle),
+ rxrpc_call_states[call->state],
+ call->flags, call->events);
+ if (!skb_queue_empty(&call->rx_queue))
+ pr_err("Rx queue occupied\n");
+ if (!skb_queue_empty(&call->rx_oos_queue))
+ pr_err("OOS queue occupied\n");
write_unlock_bh(&rxrpc_call_lock);
cond_resched();