author     David S. Miller <davem@davemloft.net>    2018-01-05 13:39:19 -0500
committer  David S. Miller <davem@davemloft.net>    2018-01-05 13:39:19 -0500
commit     ad521763e6060b3cb11df572cab3339526cd82c0 (patch)
tree       67f61aa4784da3fba56aa43e3626d3f3b6562c78
parent     eb9aa1bfbad8c9fc280adf43cb480911295cfa3f (diff)
parent     3db6e0d172c94bd9953a1347c55ffb64b1d2e74f (diff)
Merge branch 'rds-use-RCU-between-work-enqueue-and-connection-teardown'
Sowmini Varadhan says:
====================
rds: use RCU between work-enqueue and connection teardown
This patchset follows up on the root cause mentioned in
https://www.spinics.net/lists/netdev/msg472849.html
Patch1 implements some code refactoring that was suggested
as an enhancement in http://patchwork.ozlabs.org/patch/843157/
It replaces the c_destroy_in_prog bit in rds_connection with
an atomically managed flag in rds_conn_path.
Patch2 builds on Patch1 and uses RCU to make sure that
work is only enqueued if the connection destroy is not already
in progress: the test-flag-and-enqueue is done under rcu_read_lock,
while destroy first sets the flag, uses synchronize_rcu to
wait for existing reader threads to complete, and then starts
all the work-cancellation.
Since I have not been able to reproduce the original stack traces
reported by syzbot, and these fixes for a race condition are based
on code inspection, I am not marking them with Reported-by tags at
this time.
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
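
For readers who want the scheme in one place before reading the diff, here is a
minimal sketch of the protocol the cover letter describes; it is not code taken
from the patches. struct demo_conn_path and the demo_* helpers are hypothetical
stand-ins for struct rds_conn_path and the RDS enqueue/teardown paths; only the
kernel primitives (rcu_read_lock(), test_bit()/set_bit(), queue_delayed_work(),
synchronize_rcu(), cancel_delayed_work_sync()) are the ones the patches use.

#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <linux/bitops.h>

#define RDS_DESTROY_PENDING     4       /* flag introduced by Patch1 */

struct demo_conn_path {                 /* hypothetical stand-in */
        unsigned long       cp_flags;   /* atomic flag word, as in rds_conn_path */
        struct delayed_work cp_send_w;
};

extern struct workqueue_struct *rds_wq;

/* Enqueue side: the flag test and the enqueue sit inside one RCU read-side
 * critical section, so teardown can wait out any enqueue attempt that raced
 * with setting the flag.
 */
static void demo_queue_send_work(struct demo_conn_path *cp)
{
        rcu_read_lock();
        if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
                queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
        rcu_read_unlock();
}

/* Teardown side: publish the flag, wait for every read-side section that may
 * have sampled the flag before it was set, then cancel whatever is queued.
 * After synchronize_rcu() returns, no new work can appear for this path.
 */
static void demo_destroy_path(struct demo_conn_path *cp)
{
        set_bit(RDS_DESTROY_PENDING, &cp->cp_flags);
        synchronize_rcu();
        cancel_delayed_work_sync(&cp->cp_send_w);
}

The sketch also suggests why the Patch1 refactor pairs naturally with the RCU
scheme in Patch2: cp_flags is a plain unsigned long, so set_bit()/test_bit()
are atomic and per-path, whereas the old c_destroy_in_prog bitfield in
struct rds_connection was neither per-path nor usable with the atomic bitops.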
-rw-r--r--  net/rds/cong.c        | 10
-rw-r--r--  net/rds/connection.c  | 24
-rw-r--r--  net/rds/rds.h         |  4
-rw-r--r--  net/rds/send.c        | 37
-rw-r--r--  net/rds/tcp_connect.c |  2
-rw-r--r--  net/rds/tcp_recv.c    |  8
-rw-r--r--  net/rds/tcp_send.c    |  5
-rw-r--r--  net/rds/threads.c     | 20
8 files changed, 86 insertions, 24 deletions
diff --git a/net/rds/cong.c b/net/rds/cong.c
index 8398fee7c866..8d19fd25dce3 100644
--- a/net/rds/cong.c
+++ b/net/rds/cong.c
@@ -219,7 +219,11 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
         spin_lock_irqsave(&rds_cong_lock, flags);

         list_for_each_entry(conn, &map->m_conn_list, c_map_item) {
-                if (!test_and_set_bit(0, &conn->c_map_queued)) {
+                struct rds_conn_path *cp = &conn->c_path[0];
+
+                rcu_read_lock();
+                if (!test_and_set_bit(0, &conn->c_map_queued) &&
+                    !test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
                         rds_stats_inc(s_cong_update_queued);
                         /* We cannot inline the call to rds_send_xmit() here
                          * for two reasons (both pertaining to a TCP transport):
@@ -235,9 +239,9 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
                          * therefore trigger warnings.
                          * Defer the xmit to rds_send_worker() instead.
                          */
-                        queue_delayed_work(rds_wq,
-                                           &conn->c_path[0].cp_send_w, 0);
+                        queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
                 }
+                rcu_read_unlock();
         }

         spin_unlock_irqrestore(&rds_cong_lock, flags);
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 6492c0b608a4..b10c0ef36d8d 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -366,8 +366,6 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
          * to the conn hash, so we never trigger a reconnect on this
          * conn - the reconnect is always triggered by the active peer. */
         cancel_delayed_work_sync(&cp->cp_conn_w);
-        if (conn->c_destroy_in_prog)
-                return;
         rcu_read_lock();
         if (!hlist_unhashed(&conn->c_hash_node)) {
                 rcu_read_unlock();
@@ -384,10 +382,13 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
 {
         struct rds_message *rm, *rtmp;

+        set_bit(RDS_DESTROY_PENDING, &cp->cp_flags);
+
         if (!cp->cp_transport_data)
                 return;

         /* make sure lingering queued work won't try to ref the conn */
+        synchronize_rcu();
         cancel_delayed_work_sync(&cp->cp_send_w);
         cancel_delayed_work_sync(&cp->cp_recv_w);
@@ -405,6 +406,11 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
         if (cp->cp_xmit_rm)
                 rds_message_put(cp->cp_xmit_rm);

+        WARN_ON(delayed_work_pending(&cp->cp_send_w));
+        WARN_ON(delayed_work_pending(&cp->cp_recv_w));
+        WARN_ON(delayed_work_pending(&cp->cp_conn_w));
+        WARN_ON(work_pending(&cp->cp_down_w));
+
         cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
 }
@@ -426,7 +432,6 @@ void rds_conn_destroy(struct rds_connection *conn)
                  "%pI4\n", conn, &conn->c_laddr,
                  &conn->c_faddr);

-        conn->c_destroy_in_prog = 1;
         /* Ensure conn will not be scheduled for reconnect */
         spin_lock_irq(&rds_conn_lock);
         hlist_del_init_rcu(&conn->c_hash_node);
@@ -685,10 +690,13 @@ void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
 {
         atomic_set(&cp->cp_state, RDS_CONN_ERROR);

-        if (!destroy && cp->cp_conn->c_destroy_in_prog)
+        rcu_read_lock();
+        if (!destroy && test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+                rcu_read_unlock();
                 return;
-
+        }
         queue_work(rds_wq, &cp->cp_down_w);
+        rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_drop);
@@ -705,9 +713,15 @@ EXPORT_SYMBOL_GPL(rds_conn_drop);
  */
 void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
 {
+        rcu_read_lock();
+        if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+                rcu_read_unlock();
+                return;
+        }
         if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
             !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
                 queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+        rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
diff --git a/net/rds/rds.h b/net/rds/rds.h
index d09f6c1facb4..374ae83b60d4 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -88,6 +88,7 @@ enum {
 #define RDS_RECONNECT_PENDING  1
 #define RDS_IN_XMIT            2
 #define RDS_RECV_REFILL        3
+#define RDS_DESTROY_PENDING    4

 /* Max number of multipaths per RDS connection. Must be a power of 2 */
 #define RDS_MPATH_WORKERS      8
@@ -139,8 +140,7 @@ struct rds_connection {
         __be32                  c_faddr;
         unsigned int            c_loopback:1,
                                 c_ping_triggered:1,
-                                c_destroy_in_prog:1,
-                                c_pad_to_32:29;
+                                c_pad_to_32:30;
         int                     c_npaths;
         struct rds_connection   *c_passive;
         struct rds_transport    *c_trans;
diff --git a/net/rds/send.c b/net/rds/send.c
index f72466c63f0c..d3e32d1f3c7d 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -162,6 +162,12 @@ restart:
                 goto out;
         }

+        if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+                release_in_xmit(cp);
+                ret = -ENETUNREACH; /* dont requeue send work */
+                goto out;
+        }
+
         /*
          * we record the send generation after doing the xmit acquire.
          * if someone else manages to jump in and do some work, we'll use
@@ -437,7 +443,12 @@ over_batch:
                      !list_empty(&cp->cp_send_queue)) && !raced) {
                         if (batch_count < send_batch_count)
                                 goto restart;
-                        queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+                        rcu_read_lock();
+                        if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+                                ret = -ENETUNREACH;
+                        else
+                                queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+                        rcu_read_unlock();
                 } else if (raced) {
                         rds_stats_inc(s_send_lock_queue_raced);
                 }
@@ -1151,6 +1162,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
         else
                 cpath = &conn->c_path[0];

+        if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags)) {
+                ret = -EAGAIN;
+                goto out;
+        }
+
         rds_conn_path_connect_if_down(cpath);

         ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
@@ -1190,9 +1206,17 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
         rds_stats_inc(s_send_queued);

         ret = rds_send_xmit(cpath);
-        if (ret == -ENOMEM || ret == -EAGAIN)
-                queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
-
+        if (ret == -ENOMEM || ret == -EAGAIN) {
+                ret = 0;
+                rcu_read_lock();
+                if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags))
+                        ret = -ENETUNREACH;
+                else
+                        queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
+                rcu_read_unlock();
+        }
+        if (ret)
+                goto out;
         rds_message_put(rm);
         return payload_len;
@@ -1270,7 +1294,10 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
         rds_stats_inc(s_send_pong);

         /* schedule the send work on rds_wq */
-        queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+        rcu_read_lock();
+        if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+                queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+        rcu_read_unlock();

         rds_message_put(rm);
         return 0;
diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c
index 46f74dad0e16..534c67aeb20f 100644
--- a/net/rds/tcp_connect.c
+++ b/net/rds/tcp_connect.c
@@ -170,7 +170,7 @@ void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp)
                  cp->cp_conn, tc, sock);

         if (sock) {
-                if (cp->cp_conn->c_destroy_in_prog)
+                if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
                         rds_tcp_set_linger(sock);
                 sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN);
                 lock_sock(sock->sk);
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index e006ef8e6d40..dd707b9e73e5 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -321,8 +321,12 @@ void rds_tcp_data_ready(struct sock *sk)
         ready = tc->t_orig_data_ready;
         rds_tcp_stats_inc(s_tcp_data_ready_calls);

-        if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM)
-                queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+        if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
+                rcu_read_lock();
+                if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+                        queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+                rcu_read_unlock();
+        }
 out:
         read_unlock_bh(&sk->sk_callback_lock);
         ready(sk);
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index dc860d1bb608..73c74763ca72 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -202,8 +202,11 @@ void rds_tcp_write_space(struct sock *sk)
         tc->t_last_seen_una = rds_tcp_snd_una(tc);
         rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked);

-        if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf)
+        rcu_read_lock();
+        if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
+            !test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
                 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+        rcu_read_unlock();

 out:
         read_unlock_bh(&sk->sk_callback_lock);
diff --git a/net/rds/threads.c b/net/rds/threads.c
index f121daa402c8..eb76db1360b0 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -87,8 +87,12 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
         cp->cp_reconnect_jiffies = 0;
         set_bit(0, &cp->cp_conn->c_map_queued);
-        queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
-        queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+        rcu_read_lock();
+        if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+                queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+                queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+        }
+        rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_connect_path_complete);
@@ -133,7 +137,10 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
         set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
         if (cp->cp_reconnect_jiffies == 0) {
                 cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
-                queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+                rcu_read_lock();
+                if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+                        queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+                rcu_read_unlock();
                 return;
         }
@@ -141,8 +148,11 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
         rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
                  rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
                  conn, &conn->c_laddr, &conn->c_faddr);
-        queue_delayed_work(rds_wq, &cp->cp_conn_w,
-                           rand % cp->cp_reconnect_jiffies);
+        rcu_read_lock();
+        if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+                queue_delayed_work(rds_wq, &cp->cp_conn_w,
+                                   rand % cp->cp_reconnect_jiffies);
+        rcu_read_unlock();

         cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
                                        rds_sysctl_reconnect_max_jiffies);