net/rds: Use the first lane until RDS_EXTHDR_NPATHS arrives

Instead of just blocking the sender until "c_npaths" is known
(it gets updated upon the receipt of a MPRDS PONG message),
simply use the first lane (cp_index#0).

But just using the first lane isn't enough.

As soon as we enqueue messages on a different lane, we'd run the risk
of out-of-order delivery of RDS messages.

Earlier messages enqueued on "cp_index == 0" could be delivered later
than more recent messages enqueued on "cp_index > 0", mostly because of
possible head of line blocking issues causing the first lane to be
slower.

To avoid that, we simply take a snapshot of "cp_next_tx_seq" at the
time we're about to fan-out to more lanes.

Then we delay the transmission of messages enqueued on other lanes
with "cp_index > 0" until cp_index#0 has caught up with the delivery of
new messages (from "cp_send_queue") as well as in-flight
messages (from "cp_retrans") that haven't been acknowledged yet
by the receiver.

We also add a new counter "mprds_catchup_tx0_retries" to keep track
of how many times "rds_send_xmit" had to suspend activity because
it was waiting for the first lane to catch up.

Signed-off-by: Gerd Rausch <gerd.rausch@oracle.com>
Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
Link: https://patch.msgid.link/20260203055723.1085751-8-achender@kernel.org
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
This commit is contained in:
Gerd Rausch 2026-02-02 22:57:22 -07:00 committed by Jakub Kicinski
parent 9d30ad8a8b
commit a1f53d5fb6
4 changed files with 90 additions and 39 deletions

View File

@ -170,6 +170,8 @@ struct rds_connection {
u32 c_my_gen_num;
u32 c_peer_gen_num;
u64 c_cp0_mprds_catchup_tx_seq;
};
static inline
@ -749,6 +751,7 @@ struct rds_statistics {
u64 s_recv_bytes_added_to_socket;
u64 s_recv_bytes_removed_from_socket;
u64 s_send_stuck_rm;
u64 s_mprds_catchup_tx0_retries;
};
/* af_rds.c */

View File

@ -208,6 +208,9 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
} buffer;
bool new_with_sport_idx = false;
u32 new_peer_gen_num = 0;
int new_npaths;
new_npaths = conn->c_npaths;
while (1) {
len = sizeof(buffer);
@ -217,8 +220,8 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
/* Process extension header here */
switch (type) {
case RDS_EXTHDR_NPATHS:
conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
be16_to_cpu(buffer.rds_npaths));
new_npaths = min_t(int, RDS_MPATH_WORKERS,
be16_to_cpu(buffer.rds_npaths));
break;
case RDS_EXTHDR_GEN_NUM:
new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num);
@ -233,8 +236,22 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
}
conn->c_with_sport_idx = new_with_sport_idx;
if (new_npaths > 1 && new_npaths != conn->c_npaths) {
/* We're about to fan-out.
* Make sure that messages from cp_index#0
* are sent prior to handling other lanes.
*/
struct rds_conn_path *cp0 = conn->c_path;
unsigned long flags;
spin_lock_irqsave(&cp0->cp_lock, flags);
conn->c_cp0_mprds_catchup_tx_seq = cp0->cp_next_tx_seq;
spin_unlock_irqrestore(&cp0->cp_lock, flags);
}
/* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
conn->c_npaths = max_t(int, conn->c_npaths, 1);
conn->c_npaths = max_t(int, new_npaths, 1);
conn->c_ping_triggered = 0;
rds_conn_peer_gen_update(conn, new_peer_gen_num);

View File

@ -119,6 +119,57 @@ static void release_in_xmit(struct rds_conn_path *cp)
wake_up_all(&cp->cp_waitq);
}
/*
 * Multipath fan-out helper: before any lane other than cp_index#0 is
 * allowed to transmit, lane 0 must have drained everything that was
 * queued on it up to the snapshot taken in c_cp0_mprds_catchup_tx_seq
 * (see rds_recv_hs_exthdrs), otherwise messages could be delivered
 * out of order.
 *
 * Returns true while lane 0 is still lagging behind the snapshot,
 * false once it has caught up.
 */
static bool rds_mprds_cp0_catchup(struct rds_connection *conn)
{
	struct rds_conn_path *cp0 = conn->c_path;
	struct rds_message *rm;
	unsigned long flags;
	bool lagging = false;

	spin_lock_irqsave(&cp0->cp_lock, flags);

	/* The oldest entry on lane 0's retransmit queue must already be
	 * at or beyond the snapshot sequence number.
	 */
	if (!list_empty(&cp0->cp_retrans)) {
		rm = list_entry(cp0->cp_retrans.next, struct rds_message,
				m_conn_item);
		if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <
		    conn->c_cp0_mprds_catchup_tx_seq)
			lagging = true;
	}

	/* Likewise for the send queue: check the message currently being
	 * transmitted first, falling back to the head of cp_send_queue.
	 */
	if (!lagging) {
		rm = cp0->cp_xmit_rm;
		if (!rm && !list_empty(&cp0->cp_send_queue))
			rm = list_entry(cp0->cp_send_queue.next,
					struct rds_message, m_conn_item);
		if (rm && be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <
			  conn->c_cp0_mprds_catchup_tx_seq)
			lagging = true;
	}

	spin_unlock_irqrestore(&cp0->cp_lock, flags);
	return lagging;
}
/*
* We're making the conscious trade-off here to only send one message
* down the connection at a time.
@ -248,6 +299,14 @@ int rds_send_xmit(struct rds_conn_path *cp)
if (batch_count >= send_batch_count)
goto over_batch;
/* make sure cp_index#0 caught up during fan-out in
* order to avoid lane races
*/
if (cp->cp_index > 0 && rds_mprds_cp0_catchup(conn)) {
rds_stats_inc(s_mprds_catchup_tx0_retries);
goto over_batch;
}
spin_lock_irqsave(&cp->cp_lock, flags);
if (!list_empty(&cp->cp_send_queue)) {
@ -1042,39 +1101,6 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
return ret;
}
/* Pick the multipath lane (index into conn->c_path[]) to use for a
 * message sent on socket "rs".
 *
 * c_npaths == 0 means the MPRDS handshake has not completed yet, so the
 * number of lanes the peer supports is unknown.  In that case we hash
 * over the maximum (RDS_MPATH_WORKERS) and, if that picks a non-zero
 * lane, trigger a ping and possibly block until c_npaths is learned.
 *
 * "nonblock" reflects MSG_DONTWAIT: when set, never sleep waiting for
 * the handshake; fall back to lane 0 instead.
 *
 * Returns the chosen lane index (0 is always safe to use).
 */
static int rds_send_mprds_hash(struct rds_sock *rs,
struct rds_connection *conn, int nonblock)
{
int hash;
if (conn->c_npaths == 0)
hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
else
hash = RDS_MPATH_HASH(rs, conn->c_npaths);
if (conn->c_npaths == 0 && hash != 0) {
/* Kick off the MPRDS handshake so c_npaths gets populated
 * when the peer's PONG arrives.
 */
rds_send_ping(conn, 0);
/* The underlying connection is not up yet. Need to wait
 * until it is up to be sure that the non-zero c_path can be
 * used. But if we are interrupted, we have to use the zero
 * c_path in case the connection ends up being non-MP capable.
 */
/* Re-check: the handshake may have completed between the hash
 * computation above and this point.
 */
if (conn->c_npaths == 0) {
/* Cannot wait for the connection be made, so just use
 * the base c_path.
 */
if (nonblock)
return 0;
/* Sleep until c_npaths becomes known; a signal (nonzero
 * return) forces the always-valid lane 0.
 */
if (wait_event_interruptible(conn->c_hs_waitq,
conn->c_npaths != 0))
hash = 0;
}
/* Peer turned out to support only a single path: only lane 0
 * is usable regardless of what the hash picked.
 */
if (conn->c_npaths == 1)
hash = 0;
}
return hash;
}
static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
{
struct rds_rdma_args *args;
@ -1304,10 +1330,14 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
rs->rs_conn = conn;
}
if (conn->c_trans->t_mp_capable)
cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
else
if (conn->c_trans->t_mp_capable) {
/* Use c_path[0] until we learn that
* the peer supports more (c_npaths > 1)
*/
cpath = &conn->c_path[RDS_MPATH_HASH(rs, conn->c_npaths ? : 1)];
} else {
cpath = &conn->c_path[0];
}
rm->m_conn_path = cpath;

View File

@ -79,6 +79,7 @@ static const char *const rds_stat_names[] = {
"recv_bytes_added_to_sock",
"recv_bytes_freed_fromsock",
"send_stuck_rm",
"mprds_catchup_tx0_retries",
};
void rds_stats_info_copy(struct rds_info_iterator *iter,