Skip to content

Commit c4aa729

Browse files
akhi3030qkniep
andauthored
refactor: handle repair requests and replies on separate networks and tokio tasks (#160)
Closes: #146 --------- Co-authored-by: Quentin Kniep <[email protected]>
1 parent 002c3b8 commit c4aa729

File tree

15 files changed

+255
-139
lines changed

15 files changed

+255
-139
lines changed

src/all2all/robust.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ mod tests {
9494
voting_pubkey: voting_sk.to_pk(),
9595
all2all_address: localhost_ip_sockaddr(i.try_into().unwrap()),
9696
disseminator_address: dontcare_sockaddr(),
97-
repair_address: dontcare_sockaddr(),
97+
repair_request_address: dontcare_sockaddr(),
98+
repair_response_address: dontcare_sockaddr(),
9899
});
99100
}
100101

src/all2all/trivial.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ mod tests {
8080
voting_pubkey: voting_sk.to_pk(),
8181
all2all_address: localhost_ip_sockaddr(i.try_into().unwrap()),
8282
disseminator_address: dontcare_sockaddr(),
83-
repair_address: dontcare_sockaddr(),
83+
repair_request_address: dontcare_sockaddr(),
84+
repair_response_address: dontcare_sockaddr(),
8485
});
8586
}
8687

src/bin/node.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,15 @@ fn create_node(config: ConfigFile) -> color_eyre::Result<Node> {
120120
let network = UdpNetwork::new(start_port + 1);
121121
let disseminator = Rotor::new(network, epoch_info.clone());
122122
let repair_network = UdpNetwork::new(start_port + 2);
123-
let txs_receiver = UdpNetwork::new(start_port + 3);
123+
let repair_request_network = UdpNetwork::new(start_port + 3);
124+
let txs_receiver = UdpNetwork::new(start_port + 4);
124125
Ok(Alpenglow::new(
125126
config.identity_key,
126127
config.voting_key,
127128
all2all,
128129
disseminator,
129130
repair_network,
131+
repair_request_network,
130132
epoch_info,
131133
txs_receiver,
132134
))
@@ -161,7 +163,8 @@ async fn create_node_configs(
161163
voting_pubkey: voting_sks[id as usize].to_pk(),
162164
all2all_address: sockaddr,
163165
disseminator_address: SocketAddr::new(sockaddr.ip(), sockaddr.port() + 1),
164-
repair_address: SocketAddr::new(sockaddr.ip(), sockaddr.port() + 2),
166+
repair_request_address: SocketAddr::new(sockaddr.ip(), sockaddr.port() + 2),
167+
repair_response_address: SocketAddr::new(sockaddr.ip(), sockaddr.port() + 3),
165168
});
166169
}
167170

src/bin/performance_test.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ async fn create_test_nodes(count: u64) -> Vec<TestNode> {
4141
let core = Arc::new(SimulatedNetworkCore::default().with_packet_loss(0.0));
4242
let mut networks = VecDeque::new();
4343
let mut udp_networks = VecDeque::new();
44-
for i in 0..4 * count {
45-
if i % 4 == 2 || i % 4 == 3 {
44+
for i in 0..5 * count {
45+
if i % 5 == 2 || i % 5 == 3 || i % 5 == 4 {
4646
udp_networks.push_back(UdpNetwork::new_with_any_port());
4747
} else {
4848
networks.push_back(core.join_unlimited(i).await);
@@ -51,19 +51,19 @@ async fn create_test_nodes(count: u64) -> Vec<TestNode> {
5151
for a in 0..count {
5252
for b in 0..count {
5353
if a < 6 && b < 6 {
54-
core.set_latency(4 * a, 4 * b, Duration::from_millis(20))
54+
core.set_latency(5 * a, 5 * b, Duration::from_millis(20))
5555
.await;
56-
core.set_latency(4 * a + 1, 4 * b + 1, Duration::from_millis(20))
56+
core.set_latency(5 * a + 1, 5 * b + 1, Duration::from_millis(20))
5757
.await;
5858
} else if (6..10).contains(&a) && (6..10).contains(&b) {
59-
core.set_latency(4 * a, 4 * b, Duration::from_millis(60))
59+
core.set_latency(5 * a, 5 * b, Duration::from_millis(60))
6060
.await;
61-
core.set_latency(4 * a + 1, 4 * b + 1, Duration::from_millis(60))
61+
core.set_latency(5 * a + 1, 5 * b + 1, Duration::from_millis(60))
6262
.await;
6363
} else {
64-
core.set_latency(4 * a, 4 * b, Duration::from_millis(100))
64+
core.set_latency(5 * a, 5 * b, Duration::from_millis(100))
6565
.await;
66-
core.set_latency(4 * a + 1, 4 * b + 1, Duration::from_millis(100))
66+
core.set_latency(5 * a + 1, 5 * b + 1, Duration::from_millis(100))
6767
.await;
6868
}
6969
}
@@ -77,17 +77,19 @@ async fn create_test_nodes(count: u64) -> Vec<TestNode> {
7777
for id in 0..count {
7878
sks.push(SecretKey::new(&mut rng));
7979
voting_sks.push(aggsig::SecretKey::new(&mut rng));
80-
let all2all_address = localhost_ip_sockaddr((4 * id).try_into().unwrap());
81-
let disseminator_address = localhost_ip_sockaddr((4 * id + 1).try_into().unwrap());
82-
let repair_address = localhost_ip_sockaddr(udp_networks[id as usize].port());
80+
let all2all_address = localhost_ip_sockaddr((5 * id).try_into().unwrap());
81+
let disseminator_address = localhost_ip_sockaddr((5 * id + 1).try_into().unwrap());
82+
let repair_request_address = localhost_ip_sockaddr(udp_networks[id as usize].port());
83+
let repair_response_address = localhost_ip_sockaddr(udp_networks[id as usize].port());
8384
validators.push(ValidatorInfo {
8485
id,
8586
stake: 1,
8687
pubkey: sks[id as usize].to_pk(),
8788
voting_pubkey: voting_sks[id as usize].to_pk(),
8889
all2all_address,
8990
disseminator_address,
90-
repair_address,
91+
repair_request_address,
92+
repair_response_address,
9193
});
9294
}
9395

@@ -99,13 +101,15 @@ async fn create_test_nodes(count: u64) -> Vec<TestNode> {
99101
let all2all = TrivialAll2All::new(validators.clone(), networks.pop_front().unwrap());
100102
let disseminator = Rotor::new(networks.pop_front().unwrap(), epoch_info.clone());
101103
let repair_network = udp_networks.pop_front().unwrap();
104+
let repair_request_network = udp_networks.pop_front().unwrap();
102105
let txs_receiver = udp_networks.pop_front().unwrap();
103106
Alpenglow::new(
104107
sks[v.id as usize].clone(),
105108
voting_sks[v.id as usize].clone(),
106109
all2all,
107110
disseminator,
108111
repair_network,
112+
repair_request_network,
109113
epoch_info,
110114
txs_receiver,
111115
)

src/consensus.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use self::votor::Votor;
4646
use crate::consensus::block_producer::BlockProducer;
4747
use crate::crypto::{aggsig, signature};
4848
use crate::network::{Network, NetworkMessage, NetworkSendError};
49-
use crate::repair::Repair;
49+
use crate::repair::{Repair, RepairRequestHandler};
5050
use crate::shredder::Shred;
5151
use crate::{All2All, Disseminator, Slot, ValidatorInfo};
5252

@@ -93,13 +93,18 @@ where
9393
T: Network + Sync + Send + 'static,
9494
{
9595
/// Creates a new Alpenglow consensus node.
96+
///
97+
/// `repair_network` - Network from which the node sends [`RepairRequest`] messages and receives [`RepairResponse`] messages.
98+
/// `repair_request_network` - Network where the node receives [`RepairRequest`] messages and sends [`RepairResponse`] messages.
9699
#[must_use]
100+
#[allow(clippy::too_many_arguments)]
97101
pub fn new<R: Network + Sync + Send + 'static>(
98102
secret_key: signature::SecretKey,
99103
voting_secret_key: aggsig::SecretKey,
100104
all2all: A,
101105
disseminator: D,
102106
repair_network: R,
107+
repair_request_network: R,
103108
epoch_info: Arc<EpochInfo>,
104109
txs_receiver: T,
105110
) -> Self {
@@ -119,6 +124,14 @@ where
119124
));
120125
let pool = Arc::new(RwLock::new(pool));
121126

127+
let repair_request_handler = RepairRequestHandler::new(
128+
epoch_info.clone(),
129+
blockstore.clone(),
130+
repair_request_network,
131+
);
132+
let _repair_request_handler =
133+
tokio::spawn(async move { repair_request_handler.run().await });
134+
122135
let mut repair = Repair::new(
123136
Arc::clone(&blockstore),
124137
Arc::clone(&pool),

src/consensus/blockstore.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,8 @@ mod tests {
347347
voting_pubkey: voting_sk.to_pk(),
348348
all2all_address: dontcare_sockaddr(),
349349
disseminator_address: dontcare_sockaddr(),
350-
repair_address: dontcare_sockaddr(),
350+
repair_request_address: dontcare_sockaddr(),
351+
repair_response_address: dontcare_sockaddr(),
351352
};
352353
let validators = vec![info];
353354
let epoch_info = EpochInfo::new(0, validators);

src/consensus/cert.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,8 @@ mod tests {
577577
voting_pubkey: voting_sks.last().unwrap().to_pk(),
578578
all2all_address: dontcare_sockaddr(),
579579
disseminator_address: dontcare_sockaddr(),
580-
repair_address: dontcare_sockaddr(),
580+
repair_request_address: dontcare_sockaddr(),
581+
repair_response_address: dontcare_sockaddr(),
581582
});
582583
}
583584

src/disseminator/rotor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ mod tests {
164164
voting_pubkey: voting_sks[i as usize].to_pk(),
165165
all2all_address: dontcare_sockaddr(),
166166
disseminator_address: localhost_ip_sockaddr(base_port + i as u16),
167-
repair_address: dontcare_sockaddr(),
167+
repair_request_address: dontcare_sockaddr(),
168+
repair_response_address: dontcare_sockaddr(),
168169
});
169170
}
170171

src/disseminator/rotor/sampling_strategy.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,8 @@ mod tests {
672672
voting_pubkey: voting_sk.to_pk(),
673673
all2all_address: dontcare_sockaddr(),
674674
disseminator_address: dontcare_sockaddr(),
675-
repair_address: dontcare_sockaddr(),
675+
repair_request_address: dontcare_sockaddr(),
676+
repair_response_address: dontcare_sockaddr(),
676677
});
677678
}
678679
validators

src/disseminator/trivial.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ mod tests {
8282
voting_pubkey: voting_sks[i as usize].to_pk(),
8383
all2all_address: dontcare_sockaddr(),
8484
disseminator_address: localhost_ip_sockaddr(base_port + i as u16),
85-
repair_address: dontcare_sockaddr(),
85+
repair_request_address: dontcare_sockaddr(),
86+
repair_response_address: dontcare_sockaddr(),
8687
});
8788
}
8889

0 commit comments

Comments
 (0)