summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/rxrpc/ar-internal.h1
-rw-r--r--net/rxrpc/call_accept.c1
-rw-r--r--net/rxrpc/call_event.c3
-rw-r--r--net/rxrpc/call_object.c8
-rw-r--r--net/rxrpc/input.c12
-rw-r--r--net/rxrpc/recvmsg.c25
-rw-r--r--net/rxrpc/skbuff.c41
7 files changed, 45 insertions, 46 deletions
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 1bb9e7ac9e14..ff83fb1ddd47 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -425,6 +425,7 @@ struct rxrpc_call {
spinlock_t lock;
rwlock_t state_lock; /* lock for state transition */
atomic_t usage;
+ atomic_t skb_count; /* Outstanding packets on this call */
atomic_t sequence; /* Tx data packet sequence counter */
u32 local_abort; /* local abort code */
u32 remote_abort; /* remote abort code */
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 0b2832141bd0..9bae21e66d65 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -130,6 +130,7 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
call->state = RXRPC_CALL_SERVER_ACCEPTING;
list_add_tail(&call->accept_link, &rx->acceptq);
rxrpc_get_call(call);
+ atomic_inc(&call->skb_count);
nsp = rxrpc_skb(notification);
nsp->call = call;
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index fc32aa5764a2..f5e99163a09e 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -460,6 +460,7 @@ static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
ASSERTCMP(sp->call, ==, NULL);
sp->call = call;
rxrpc_get_call(call);
+ atomic_inc(&call->skb_count);
/* insert into the buffer in sequence order */
spin_lock_bh(&call->lock);
@@ -734,6 +735,7 @@ all_acked:
skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
sp->call = call;
rxrpc_get_call(call);
+ atomic_inc(&call->skb_count);
spin_lock_bh(&call->lock);
if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
BUG();
@@ -793,6 +795,7 @@ static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
sp->error = error;
sp->call = call;
rxrpc_get_call(call);
+ atomic_inc(&call->skb_count);
spin_lock_bh(&call->lock);
ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 91287c9d01bb..c47f14fc5e88 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -491,13 +491,6 @@ void rxrpc_release_call(struct rxrpc_call *call)
spin_lock_bh(&call->lock);
while ((skb = skb_dequeue(&call->rx_queue)) ||
(skb = skb_dequeue(&call->rx_oos_queue))) {
- sp = rxrpc_skb(skb);
- if (sp->call) {
- ASSERTCMP(sp->call, ==, call);
- rxrpc_put_call(call);
- sp->call = NULL;
- }
- skb->destructor = NULL;
spin_unlock_bh(&call->lock);
_debug("- zap %s %%%u #%u",
@@ -605,6 +598,7 @@ void __rxrpc_put_call(struct rxrpc_call *call)
if (atomic_dec_and_test(&call->usage)) {
_debug("call %d dead", call->debug_id);
+ WARN_ON(atomic_read(&call->skb_count) != 0);
ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
rxrpc_queue_work(&call->destroyer);
}
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 991a20d25093..9e0f58edcd01 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -55,9 +55,6 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
if (test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
_debug("already terminated");
ASSERTCMP(call->state, >=, RXRPC_CALL_COMPLETE);
- skb->destructor = NULL;
- sp->call = NULL;
- rxrpc_put_call(call);
rxrpc_free_skb(skb);
return 0;
}
@@ -111,13 +108,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
ret = 0;
out:
- /* release the socket buffer */
- if (skb) {
- skb->destructor = NULL;
- sp->call = NULL;
- rxrpc_put_call(call);
- rxrpc_free_skb(skb);
- }
+ rxrpc_free_skb(skb);
_leave(" = %d", ret);
return ret;
@@ -200,6 +191,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
sp->call = call;
rxrpc_get_call(call);
+ atomic_inc(&call->skb_count);
terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
!(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
ret = rxrpc_queue_rcv_skb(call, skb, false, terminal);
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index a3fa2ed85d63..9ed66d533002 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -203,6 +203,9 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
}
/* we transferred the whole data packet */
+ if (!(flags & MSG_PEEK))
+ rxrpc_kernel_data_consumed(call, skb);
+
if (sp->hdr.flags & RXRPC_LAST_PACKET) {
_debug("last");
if (rxrpc_conn_is_client(call->conn)) {
@@ -360,28 +363,6 @@ wait_error:
}
/**
- * rxrpc_kernel_data_delivered - Record delivery of data message
- * @skb: Message holding data
- *
- * Record the delivery of a data message. This permits RxRPC to keep its
- * tracking correct. The socket buffer will be deleted.
- */
-void rxrpc_kernel_data_delivered(struct sk_buff *skb)
-{
- struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
- struct rxrpc_call *call = sp->call;
-
- ASSERTCMP(sp->hdr.seq, >=, call->rx_data_recv);
- ASSERTCMP(sp->hdr.seq, <=, call->rx_data_recv + 1);
- call->rx_data_recv = sp->hdr.seq;
-
- ASSERTCMP(sp->hdr.seq, >, call->rx_data_eaten);
- rxrpc_free_skb(skb);
-}
-
-EXPORT_SYMBOL(rxrpc_kernel_data_delivered);
-
-/**
* rxrpc_kernel_is_data_last - Determine if data message is last one
* @skb: Message holding data
*
diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c
index eee0cfd9ac8c..06c51d4b622d 100644
--- a/net/rxrpc/skbuff.c
+++ b/net/rxrpc/skbuff.c
@@ -98,11 +98,39 @@ static void rxrpc_hard_ACK_data(struct rxrpc_call *call,
spin_unlock_bh(&call->lock);
}
+/**
+ * rxrpc_kernel_data_consumed - Record consumption of data message
+ * @call: The call to which the message pertains.
+ * @skb: Message holding data
+ *
+ * Record the consumption of a data message and generate an ACK if appropriate.
+ * The call state is shifted if this was the final packet. The caller must be
+ * in process context with no spinlocks held.
+ *
+ * TODO: Actually generate the ACK here rather than punting this to the
+ * workqueue.
+ */
+void rxrpc_kernel_data_consumed(struct rxrpc_call *call, struct sk_buff *skb)
+{
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+
+ _enter("%d,%p{%u}", call->debug_id, skb, sp->hdr.seq);
+
+ ASSERTCMP(sp->call, ==, call);
+ ASSERTCMP(sp->hdr.type, ==, RXRPC_PACKET_TYPE_DATA);
+
+ /* TODO: Fix the sequence number tracking */
+ ASSERTCMP(sp->hdr.seq, >=, call->rx_data_recv);
+ ASSERTCMP(sp->hdr.seq, <=, call->rx_data_recv + 1);
+ ASSERTCMP(sp->hdr.seq, >, call->rx_data_eaten);
+
+ call->rx_data_recv = sp->hdr.seq;
+ rxrpc_hard_ACK_data(call, sp);
+}
+EXPORT_SYMBOL(rxrpc_kernel_data_consumed);
+
/*
- * destroy a packet that has an RxRPC control buffer
- * - advance the hard-ACK state of the parent call (done here in case something
- * in the kernel bypasses recvmsg() and steals the packet directly off of the
- * socket receive queue)
+ * Destroy a packet that has an RxRPC control buffer
*/
void rxrpc_packet_destructor(struct sk_buff *skb)
{
@@ -112,9 +140,8 @@ void rxrpc_packet_destructor(struct sk_buff *skb)
_enter("%p{%p}", skb, call);
if (call) {
- /* send the final ACK on a client call */
- if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
- rxrpc_hard_ACK_data(call, sp);
+ if (atomic_dec_return(&call->skb_count) < 0)
+ BUG();
rxrpc_put_call(call);
sp->call = NULL;
}