diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index 593f586545eba9477006405d288a731f67e4372e..39123c06a5661316a80dd677b43b1e581a17e2e7 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -119,6 +119,7 @@ enum rxrpc_recvmsg_trace {
 	rxrpc_recvmsg_full,
 	rxrpc_recvmsg_hole,
 	rxrpc_recvmsg_next,
+	rxrpc_recvmsg_requeue,
 	rxrpc_recvmsg_return,
 	rxrpc_recvmsg_terminal,
 	rxrpc_recvmsg_to_be_accepted,
@@ -277,6 +278,7 @@ enum rxrpc_congest_change {
 	EM(rxrpc_recvmsg_full,			"FULL") \
 	EM(rxrpc_recvmsg_hole,			"HOLE") \
 	EM(rxrpc_recvmsg_next,			"NEXT") \
+	EM(rxrpc_recvmsg_requeue,		"REQU") \
 	EM(rxrpc_recvmsg_return,		"RETN") \
 	EM(rxrpc_recvmsg_terminal,		"TERM") \
 	EM(rxrpc_recvmsg_to_be_accepted,	"TBAC") \
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index 199b46e93e64ee7786e8a8d441ba5eb5b02bf31f..7fb59c3f1542af319b882399b4a0f563dc0b8a0d 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -290,10 +291,12 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
 	cp.exclusive		= false;
 	cp.service_id		= srx->srx_service;
 	call = rxrpc_new_client_call(rx, &cp, srx, user_call_ID, gfp);
+	/* The socket has been unlocked. */
-	if (!IS_ERR(call))
+	if (!IS_ERR(call)) {
 		call->notify_rx = notify_rx;
+		mutex_unlock(&call->user_mutex);
+	}
 
-	release_sock(&rx->sk);
 	_leave(" = %p", call);
 	return call;
 }
@@ -310,7 +311,10 @@ EXPORT_SYMBOL(rxrpc_kernel_begin_call);
 void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
 {
 	_enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
+
+	mutex_lock(&call->user_mutex);
 	rxrpc_release_call(rxrpc_sk(sock->sk), call);
+	mutex_unlock(&call->user_mutex);
 	rxrpc_put_call(call, rxrpc_call_put_kernel);
 }
 EXPORT_SYMBOL(rxrpc_kernel_end_call);
@@ -450,14 +454,16 @@ static int rxrpc_sendmsg(struct socket *sock, struct msghdr *m, size_t len)
 	case RXRPC_SERVER_BOUND:
 	case RXRPC_SERVER_LISTENING:
 		ret = rxrpc_do_sendmsg(rx, m, len);
-		break;
+		/* The socket has been unlocked */
+		goto out;
 	default:
 		ret = -EINVAL;
-		break;
+		goto error_unlock;
 	}
 
 error_unlock:
 	release_sock(&rx->sk);
+out:
 	_leave(" = %d", ret);
 	return ret;
 }
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 12be432be9b2feb5dc9a85716c676888599b5624..26a7b1db1361e554733b0ff40a54d8e68e59af09 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -467,6 +467,7 @@ struct rxrpc_call {
 	struct rxrpc_connection	*conn;		/* connection carrying call */
 	struct rxrpc_peer	*peer;		/* Peer record for remote address */
 	struct rxrpc_sock __rcu	*socket;	/* socket responsible */
+	struct mutex		user_mutex;	/* User access mutex */
 	ktime_t			ack_at;		/* When deferred ACK needs to happen */
 	ktime_t			resend_at;	/* When next resend needs to happen */
 	ktime_t			ping_at;	/* When next to send a ping */
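
The new user_mutex is the heart of the change: each call gets its own serialisation point for user-triggered operations, so sendmsg()/recvmsg() on one call no longer hold the socket lock for their whole duration and no longer block work on other calls. The ordering the rest of the patch follows is: take the socket lock only long enough to look up or publish the call, take call->user_mutex, then drop the socket lock before any long-running copying or waiting. A minimal sketch of that ordering (illustrative only, not part of the patch; rxrpc_lookup_call_locked() is a made-up stand-in for the real lookup paths and is assumed to return the call with a reference held):

/* Illustrative sketch of the new lock ordering; not rxrpc code. */
static int rxrpc_user_op_sketch(struct rxrpc_sock *rx,
				unsigned long user_call_ID)
{
	struct rxrpc_call *call;
	int ret;

	lock_sock(&rx->sk);		/* 1: socket lock for the lookup only */
	call = rxrpc_lookup_call_locked(rx, user_call_ID);	/* hypothetical */
	if (!call) {
		release_sock(&rx->sk);
		return -EBADSLT;
	}

	ret = mutex_lock_interruptible(&call->user_mutex);
	release_sock(&rx->sk);		/* 2: socket lock dropped early */
	if (ret < 0) {
		rxrpc_put_call(call, rxrpc_call_put);
		return -ERESTARTSYS;
	}

	/* 3: long-running work (copying to/from userspace, waiting for Tx
	 * space) happens here, serialised only against users of this call.
	 */

	mutex_unlock(&call->user_mutex);
	rxrpc_put_call(call, rxrpc_call_put);
	return 0;
}
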
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 7c4c64ab8da2e241d63ee16a0ae3521c98dc2e5c..0ed181f53f32a0145c03b0006b92de5c7a0101aa 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -323,6 +323,8 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
  *
  * If we want to report an error, we mark the skb with the packet type and
  * abort code and return NULL.
+ *
+ * The call is returned with the user access mutex held.
  */
 struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 					   struct rxrpc_connection *conn,
@@ -371,6 +373,18 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 	trace_rxrpc_receive(call, rxrpc_receive_incoming,
 			    sp->hdr.serial, sp->hdr.seq);
 
+	/* Lock the call to prevent rxrpc_kernel_send/recv_data() and
+	 * sendmsg()/recvmsg() inconveniently stealing the mutex once the
+	 * notification is generated.
+	 *
+	 * The BUG should never happen because the kernel should be well
+	 * behaved enough not to access the call before the first notification
+	 * event and userspace is prevented from doing so until the state is
+	 * appropriate.
+	 */
+	if (!mutex_trylock(&call->user_mutex))
+		BUG();
+
 	/* Make the call live. */
 	rxrpc_incoming_call(rx, call, skb);
 	conn = call->conn;
@@ -429,10 +443,12 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 /*
  * handle acceptance of a call by userspace
  * - assign the user call ID to the call at the front of the queue
+ * - called with the socket locked; returns the call with its user_mutex held.
  */
 struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
 				     unsigned long user_call_ID,
 				     rxrpc_notify_rx_t notify_rx)
+	__releases(&rx->sk.sk_lock.slock)
 {
 	struct rxrpc_call *call;
 	struct rb_node *parent, **pp;
@@ -446,6 +462,7 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
 
 	if (list_empty(&rx->to_be_accepted)) {
 		write_unlock(&rx->call_lock);
+		release_sock(&rx->sk);
 		kleave(" = -ENODATA [empty]");
 		return ERR_PTR(-ENODATA);
 	}
@@ -470,10 +487,39 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
 	 */
 	call = list_entry(rx->to_be_accepted.next,
 			  struct rxrpc_call, accept_link);
+	write_unlock(&rx->call_lock);
+
+	/* We need to gain the mutex from the interrupt handler without
+	 * upsetting lockdep, so we have to release it there and take it here.
+	 * We are, however, still holding the socket lock, so other accepts
+	 * must wait for us and no one can add the user ID behind our backs.
+	 */
+	if (mutex_lock_interruptible(&call->user_mutex) < 0) {
+		release_sock(&rx->sk);
+		kleave(" = -ERESTARTSYS");
+		return ERR_PTR(-ERESTARTSYS);
+	}
+
+	write_lock(&rx->call_lock);
 	list_del_init(&call->accept_link);
 	sk_acceptq_removed(&rx->sk);
 	rxrpc_see_call(call);
 
+	/* Find the user ID insertion point. */
+	pp = &rx->calls.rb_node;
+	parent = NULL;
+	while (*pp) {
+		struct rxrpc_call *xcall = rb_entry(*pp, struct rxrpc_call,
+						    sock_node);
+		parent = *pp;
+		if (user_call_ID < xcall->user_call_ID)
+			pp = &(*pp)->rb_left;
+		else if (user_call_ID > xcall->user_call_ID)
+			pp = &(*pp)->rb_right;
+		else
+			BUG();
+	}
+
 	write_lock_bh(&call->state_lock);
 	switch (call->state) {
 	case RXRPC_CALL_SERVER_ACCEPTING:
@@ -499,6 +545,7 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
 	write_unlock(&rx->call_lock);
 	rxrpc_notify_socket(call);
 	rxrpc_service_prealloc(rx, GFP_KERNEL);
+	release_sock(&rx->sk);
 	_leave(" = %p{%d}", call, call->debug_id);
 	return call;
 
@@ -515,6 +562,7 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
 	write_unlock(&rx->call_lock);
 out:
 	rxrpc_service_prealloc(rx, GFP_KERNEL);
+	release_sock(&rx->sk);
 	_leave(" = %d", ret);
 	return ERR_PTR(ret);
 }
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 8b94db3c9b2ecb5f093798eeae0e8630ac0114ab..d79cd36987a95b86f2af9fac4688ab86e20f41d5 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -115,6 +115,7 @@ struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
 	if (!call->rxtx_annotations)
 		goto nomem_2;
 
+	mutex_init(&call->user_mutex);
 	setup_timer(&call->timer, rxrpc_call_timer_expired,
 		    (unsigned long)call);
 	INIT_WORK(&call->processor, &rxrpc_process_call);
@@ -194,14 +195,16 @@ static void rxrpc_start_call_timer(struct rxrpc_call *call)
 }
 
 /*
- * set up a call for the given data
- * - called in process context with IRQs enabled
+ * Set up a call for the given parameters.
+ * - Called with the socket lock held, which it must release.
+ * - If it returns a call, it does so with the call's user_mutex held.
  */
 struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 					 struct rxrpc_conn_parameters *cp,
 					 struct sockaddr_rxrpc *srx,
 					 unsigned long user_call_ID,
 					 gfp_t gfp)
+	__releases(&rx->sk.sk_lock.slock)
 {
 	struct rxrpc_call *call, *xcall;
 	struct rb_node *parent, **pp;
@@ -212,6 +215,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 
 	call = rxrpc_alloc_client_call(srx, gfp);
 	if (IS_ERR(call)) {
+		release_sock(&rx->sk);
 		_leave(" = %ld", PTR_ERR(call));
 		return call;
 	}
@@ -219,6 +223,11 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 	trace_rxrpc_call(call, rxrpc_call_new_client, atomic_read(&call->usage),
 			 here, (const void *)user_call_ID);
 
+	/* We need to protect a partially set up call against the user as we
+	 * will be acting outside the socket lock.
+	 */
+	mutex_lock(&call->user_mutex);
+
 	/* Publish the call, even though it is incompletely set up as yet */
 	write_lock(&rx->call_lock);
 
@@ -250,6 +259,9 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 	list_add_tail(&call->link, &rxrpc_calls);
 	write_unlock(&rxrpc_call_lock);
 
+	/* From this point on, the call is protected by its own lock. */
+	release_sock(&rx->sk);
+
 	/* Set up or get a connection record and set the protocol parameters,
 	 * including channel number and call ID.
 	 */
@@ -279,6 +291,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 	 */
 error_dup_user_ID:
 	write_unlock(&rx->call_lock);
+	release_sock(&rx->sk);
 	ret = -EEXIST;
 
 error:
@@ -287,6 +300,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 	trace_rxrpc_call(call, rxrpc_call_error, atomic_read(&call->usage),
 			 here, ERR_PTR(ret));
 	rxrpc_release_call(rx, call);
+	mutex_unlock(&call->user_mutex);
 	rxrpc_put_call(call, rxrpc_call_put);
 	_leave(" = %d", ret);
 	return ERR_PTR(ret);
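
rxrpc_new_client_call() now has a two-phase shape: the call is published in the socket's call tree while still only partially set up, so its user_mutex is taken first; once the call is discoverable and guarded by its own lock, the socket lock can be dropped and the slow connection binding done outside it. Condensed into a sketch (not the real function; rxrpc_publish_call_locked() and rxrpc_connect_call_sketch() are hypothetical stand-ins for the rbtree insertion and connection setup steps):

/* Condensed sketch of the new shape of rxrpc_new_client_call(). */
static struct rxrpc_call *new_client_call_sketch(struct rxrpc_sock *rx,
						 struct sockaddr_rxrpc *srx,
						 unsigned long user_call_ID,
						 gfp_t gfp)
{
	struct rxrpc_call *call;
	int ret;

	call = rxrpc_alloc_client_call(srx, gfp);
	if (IS_ERR(call)) {
		release_sock(&rx->sk);	/* we were handed the socket lock */
		return call;
	}

	/* Guard the half-built call before anyone else can find it. */
	mutex_lock(&call->user_mutex);

	write_lock(&rx->call_lock);
	ret = rxrpc_publish_call_locked(rx, call, user_call_ID);	/* hypothetical */
	write_unlock(&rx->call_lock);

	/* The call is now visible, but protected by its own lock. */
	release_sock(&rx->sk);

	if (ret == 0)
		ret = rxrpc_connect_call_sketch(call, srx, gfp);	/* hypothetical */
	if (ret < 0) {
		rxrpc_release_call(rx, call);
		mutex_unlock(&call->user_mutex);	/* unlock before the final put */
		rxrpc_put_call(call, rxrpc_call_put);
		return ERR_PTR(ret);
	}

	return call;	/* returned with call->user_mutex still held */
}
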
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 78ec33477adf6c516fc26fd3c4991280164a6666..9f4cfa25af7c92c406e81d8003b8aa07c7892a04 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1194,6 +1194,7 @@ void rxrpc_data_ready(struct sock *udp_sk)
 			goto reject_packet;
 		}
 		rxrpc_send_ping(call, skb, skew);
+		mutex_unlock(&call->user_mutex);
 	}
 
 	rxrpc_input_call_packet(call, skb, skew);
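
The other half of the handoff lives here: rxrpc_new_incoming_call() runs from the data_ready handler in softirq context, where it may not sleep on a mutex, so it takes the new call's user_mutex with mutex_trylock() (which cannot contend, as nothing else can reach the call yet), and rxrpc_data_ready() drops it again once the initial ping has been sent and the notification raised. The pattern in isolation, as a generic sketch (not rxrpc code):

#include <linux/bug.h>
#include <linux/mutex.h>

struct thing {
	struct mutex user_mutex;
	/* ... */
};

/* Generic sketch of the softirq-side handoff: take the freshly created
 * object's mutex with trylock (sleeping is not allowed here), do the minimal
 * setup and generate the notification while it is held, then release it so
 * that process context can take over cleanly.
 */
static void softirq_new_thing(struct thing *t)
{
	if (!mutex_trylock(&t->user_mutex))
		BUG();	/* nothing else can know about t yet */

	/* ... minimal setup; queue the notification ... */

	mutex_unlock(&t->user_mutex);
}
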
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index f3a688e108430a9e9d32e822e54df0700940aacf..22447dbcc380211f1bd1eb84ad9fc2b48137ff0c 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -487,6 +487,20 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 
 	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
 
+	/* We're going to drop the socket lock, so we need to lock the call
+	 * against interference by sendmsg.
+	 */
+	if (!mutex_trylock(&call->user_mutex)) {
+		ret = -EWOULDBLOCK;
+		if (flags & MSG_DONTWAIT)
+			goto error_requeue_call;
+		ret = -ERESTARTSYS;
+		if (mutex_lock_interruptible(&call->user_mutex) < 0)
+			goto error_requeue_call;
+	}
+
+	release_sock(&rx->sk);
+
 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 		BUG();
 
@@ -502,7 +516,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 				       &call->user_call_ID);
 		}
 		if (ret < 0)
-			goto error;
+			goto error_unlock_call;
 	}
 
 	if (msg->msg_name) {
@@ -533,12 +547,12 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 	}
 
 	if (ret < 0)
-		goto error;
+		goto error_unlock_call;
 
 	if (call->state == RXRPC_CALL_COMPLETE) {
 		ret = rxrpc_recvmsg_term(call, msg);
 		if (ret < 0)
-			goto error;
+			goto error_unlock_call;
 		if (!(flags & MSG_PEEK))
 			rxrpc_release_call(rx, call);
 		msg->msg_flags |= MSG_EOR;
@@ -551,8 +565,21 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 		msg->msg_flags &= ~MSG_MORE;
 	ret = copied;
 
-error:
+error_unlock_call:
+	mutex_unlock(&call->user_mutex);
 	rxrpc_put_call(call, rxrpc_call_put);
+	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
+	return ret;
+
+error_requeue_call:
+	if (!(flags & MSG_PEEK)) {
+		write_lock_bh(&rx->recvmsg_lock);
+		list_add(&call->recvmsg_link, &rx->recvmsg_q);
+		write_unlock_bh(&rx->recvmsg_lock);
+		trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
+	} else {
+		rxrpc_put_call(call, rxrpc_call_put);
+	}
 error_no_call:
 	release_sock(&rx->sk);
 	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
@@ -609,7 +636,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
 	iov.iov_len = size - *_offset;
 	iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
 
-	lock_sock(sock->sk);
+	mutex_lock(&call->user_mutex);
 
 	switch (call->state) {
 	case RXRPC_CALL_CLIENT_RECV_REPLY:
@@ -648,7 +675,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
 read_phase_complete:
 	ret = 1;
 out:
-	release_sock(sock->sk);
+	mutex_unlock(&call->user_mutex);
 	_leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
 	return ret;
 
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index 0a6ef217aa8ada693f570ae03e9bede1e261e687..31c1538c1a8de69b8afe4335b9e5b263700ce980 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -59,9 +59,12 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
 		}
 
 		trace_rxrpc_transmit(call, rxrpc_transmit_wait);
-		release_sock(&rx->sk);
+		mutex_unlock(&call->user_mutex);
 		*timeo = schedule_timeout(*timeo);
-		lock_sock(&rx->sk);
+		if (mutex_lock_interruptible(&call->user_mutex) < 0) {
+			ret = sock_intr_errno(*timeo);
+			break;
+		}
 	}
 
 	remove_wait_queue(&call->waitq, &myself);
@@ -171,7 +174,7 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
 /*
  * send data through a socket
  * - must be called in process context
- * - caller holds the socket locked
+ * - The caller holds the call user access mutex, but not the socket lock.
  */
 static int rxrpc_send_data(struct rxrpc_sock *rx,
 			   struct rxrpc_call *call,
@@ -437,10 +440,13 @@ static int rxrpc_sendmsg_cmsg(struct msghdr *msg,
 
 /*
  * Create a new client call for sendmsg().
+ * - Called with the socket lock held, which it must release.
+ * - If it returns a call, it does so with the call's user_mutex held.
  */
 static struct rxrpc_call *
 rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
 				  unsigned long user_call_ID, bool exclusive)
+	__releases(&rx->sk.sk_lock.slock)
 {
 	struct rxrpc_conn_parameters cp;
 	struct rxrpc_call *call;
@@ -450,8 +456,10 @@ rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
 
 	_enter("");
 
-	if (!msg->msg_name)
+	if (!msg->msg_name) {
+		release_sock(&rx->sk);
 		return ERR_PTR(-EDESTADDRREQ);
+	}
 
 	key = rx->key;
 	if (key && !rx->key->payload.data[0])
@@ -464,6 +472,7 @@ rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
 	cp.exclusive		= rx->exclusive | exclusive;
 	cp.service_id		= srx->srx_service;
 	call = rxrpc_new_client_call(rx, &cp, srx, user_call_ID, GFP_KERNEL);
+	/* The socket is now unlocked */
 
 	_leave(" = %p\n", call);
 	return call;
@@ -475,6 +484,7 @@ rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
  * - the socket may be either a client socket or a server socket
  */
 int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
+	__releases(&rx->sk.sk_lock.slock)
 {
 	enum rxrpc_command cmd;
 	struct rxrpc_call *call;
@@ -488,12 +498,15 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
 	ret = rxrpc_sendmsg_cmsg(msg, &user_call_ID, &cmd, &abort_code,
 				 &exclusive);
 	if (ret < 0)
-		return ret;
+		goto error_release_sock;
 
 	if (cmd == RXRPC_CMD_ACCEPT) {
+		ret = -EINVAL;
 		if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
-			return -EINVAL;
+			goto error_release_sock;
 		call = rxrpc_accept_call(rx, user_call_ID, NULL);
+		/* The socket is unlocked; we hold the call's user_mutex. */
 		if (IS_ERR(call))
 			return PTR_ERR(call);
+		mutex_unlock(&call->user_mutex);
 		rxrpc_put_call(call, rxrpc_call_put);
@@ -502,12 +514,32 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
 
 	call = rxrpc_find_call_by_user_ID(rx, user_call_ID);
 	if (!call) {
+		ret = -EBADSLT;
 		if (cmd != RXRPC_CMD_SEND_DATA)
-			return -EBADSLT;
+			goto error_release_sock;
 		call = rxrpc_new_client_call_for_sendmsg(rx, msg, user_call_ID,
 							 exclusive);
+		/* The socket is now unlocked... */
 		if (IS_ERR(call))
 			return PTR_ERR(call);
+		/* ... and we have the call lock. */
+	} else {
+		ret = -EBUSY;
+		if (call->state == RXRPC_CALL_UNINITIALISED ||
+		    call->state == RXRPC_CALL_CLIENT_AWAIT_CONN ||
+		    call->state == RXRPC_CALL_SERVER_PREALLOC ||
+		    call->state == RXRPC_CALL_SERVER_SECURING ||
+		    call->state == RXRPC_CALL_SERVER_ACCEPTING) {
+			rxrpc_put_call(call, rxrpc_call_put);
+			goto error_release_sock;
+		}
+
+		ret = mutex_lock_interruptible(&call->user_mutex);
+		release_sock(&rx->sk);
+		if (ret < 0) {
+			ret = -ERESTARTSYS;
+			goto error_put;
+		}
 	}
 
 	_debug("CALL %d USR %lx ST %d on CONN %p",
@@ -535,9 +564,15 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
 		ret = rxrpc_send_data(rx, call, msg, len);
 	}
 
+	mutex_unlock(&call->user_mutex);
+error_put:
 	rxrpc_put_call(call, rxrpc_call_put);
 	_leave(" = %d", ret);
 	return ret;
+
+error_release_sock:
+	release_sock(&rx->sk);
+	return ret;
 }
 
 /**
@@ -562,7 +597,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
 	ASSERTCMP(msg->msg_name, ==, NULL);
 	ASSERTCMP(msg->msg_control, ==, NULL);
 
-	lock_sock(sock->sk);
+	mutex_lock(&call->user_mutex);
 
 	_debug("CALL %d USR %lx ST %d on CONN %p",
 	       call->debug_id, call->user_call_ID, call->state, call->conn);
@@ -577,7 +612,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
 		ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len);
 	}
 
-	release_sock(sock->sk);
+	mutex_unlock(&call->user_mutex);
 	_leave(" = %d", ret);
 	return ret;
 }
@@ -598,12 +633,12 @@ void rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
 {
 	_enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
 
-	lock_sock(sock->sk);
+	mutex_lock(&call->user_mutex);
 
 	if (rxrpc_abort_call(why, call, 0, abort_code, error))
 		rxrpc_send_abort_packet(call);
 
-	release_sock(sock->sk);
+	mutex_unlock(&call->user_mutex);
 	_leave("");
 }