Skip to content

Commit 458cdc9

Browse files
Flash0615InChh
andauthored
feat: Implement Xline native watch mechanism for rks (#137)
* refactor: implement xline-based watch mechanism Replace broadcast simulation with xline watch for pods and nodes. Signed-off-by: Haojun Xie <[email protected]> * refactor: implement xline-based watch mechanism >> Replace broadcast simulation with xline watch for pods and nodes. Signed-off-by: Haojun Xie <[email protected]> * fix: fix check errors Signed-off-by: InChh <[email protected]> * chore: fix test errors * chore: apply copilot suggestion --------- Signed-off-by: Haojun Xie <[email protected]> Signed-off-by: InChh <[email protected]> Co-authored-by: InChh <[email protected]>
1 parent 0419b63 commit 458cdc9

File tree

21 files changed

+606
-241
lines changed

21 files changed

+606
-241
lines changed

project/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

project/rks/BUCK

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ cargo.rust_binary(
2828
named_deps = {
2929
"async_trait": "//third-party/rust/crates/async-trait/0.1.89:async-trait-0.1.89",
3030
"etcd_client": "//third-party/rust/crates/etcd-client/0.16.1:etcd-client-0.16.1",
31+
"futures_util": "//third-party/rust/crates/futures-util/0.3.31:futures-util-0.3.31",
3132
"native_tls": "//third-party/rust/crates/native-tls/0.2.14:native-tls-0.2.14",
3233
"netlink_packet_route": "//third-party/rust/crates/netlink-packet-route/0.22.0:netlink-packet-route-0.22.0",
3334
"tokio_util": "//third-party/rust/crates/tokio-util/0.7.16:tokio-util-0.7.16",
@@ -61,6 +62,7 @@ cargo.rust_binary(
6162
"//third-party/rust/crates/serde_yaml/0.9.34+deprecated:serde_yaml-0.9.34+deprecated",
6263
"//third-party/rust/crates/tempfile/3.22.0:tempfile-3.22.0",
6364
"//third-party/rust/crates/thiserror/2.0.16:thiserror-2.0.16",
65+
"//third-party/rust/crates/time/0.3.43:time-0.3.43",
6466
"//third-party/rust/crates/tokio/1.47.1:tokio-1.47.1",
6567
"//third-party/rust/crates/tonic/0.13.1:tonic-0.13.1",
6668
],
@@ -91,6 +93,7 @@ cargo.rust_library(
9193
named_deps = {
9294
"async_trait": "//third-party/rust/crates/async-trait/0.1.89:async-trait-0.1.89",
9395
"etcd_client": "//third-party/rust/crates/etcd-client/0.16.1:etcd-client-0.16.1",
96+
"futures_util": "//third-party/rust/crates/futures-util/0.3.31:futures-util-0.3.31",
9497
"native_tls": "//third-party/rust/crates/native-tls/0.2.14:native-tls-0.2.14",
9598
"netlink_packet_route": "//third-party/rust/crates/netlink-packet-route/0.22.0:netlink-packet-route-0.22.0",
9699
"tokio_util": "//third-party/rust/crates/tokio-util/0.7.16:tokio-util-0.7.16",
@@ -124,6 +127,7 @@ cargo.rust_library(
124127
"//third-party/rust/crates/serde_yaml/0.9.34+deprecated:serde_yaml-0.9.34+deprecated",
125128
"//third-party/rust/crates/tempfile/3.22.0:tempfile-3.22.0",
126129
"//third-party/rust/crates/thiserror/2.0.16:thiserror-2.0.16",
130+
"//third-party/rust/crates/time/0.3.43:time-0.3.43",
127131
"//third-party/rust/crates/tokio/1.47.1:tokio-1.47.1",
128132
"//third-party/rust/crates/tonic/0.13.1:tonic-0.13.1",
129133
],

project/rks/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ tempfile = "3.20.0"
4141
tokio-util = "0.7.0"
4242
env_logger = "0.11.0"
4343
once_cell = "1.21.0"
44+
futures-util = "0.3"
45+
time = "0.3"
4446

4547
[dev-dependencies]
4648
serial_test = "3.2.0"

project/rks/src/api/xlinestore.rs

Lines changed: 102 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,77 @@
1-
#![allow(dead_code)]
21
use anyhow::Result;
3-
use common::Node;
4-
use etcd_client::{Client, GetOptions, PutOptions};
5-
use serde_yaml;
2+
use etcd_client::{Client, GetOptions, PutOptions, WatchOptions, WatchStream, Watcher};
63
use std::sync::Arc;
74
use tokio::sync::RwLock;
85

96
use crate::protocol::config::NetworkConfig;
10-
/// like etcd, k:/registry/pods/pod_name v:yaml file of pod
11-
/// k:/registry/nodes/node_name v:yaml file of node
7+
8+
/// XlineStore provides an etcd-like API for managing pods and nodes.
9+
/// Keys are stored under `/registry/pods/` and `/registry/nodes/`.
10+
/// Values are YAML serialized definitions.
1211
#[derive(Clone)]
1312
pub struct XlineStore {
1413
client: Arc<RwLock<Client>>,
1514
}
1615

16+
#[allow(unused)]
1717
impl XlineStore {
18+
/// Create a new XlineStore instance by connecting to the given endpoints.
1819
pub async fn new(endpoints: &[&str]) -> Result<Self> {
1920
let client = Client::connect(endpoints, None).await?;
2021
Ok(Self {
2122
client: Arc::new(RwLock::new(client)),
2223
})
2324
}
2425

25-
pub async fn insert_pod_yaml(&self, pod_name: &str, pod_yaml: &str) -> Result<()> {
26-
let key = format!("/registry/pods/{pod_name}");
27-
let mut client = self.client.write().await;
28-
client.put(key, pod_yaml, None).await?;
29-
Ok(())
26+
/// Get a read-only reference to the internal etcd client.
27+
/// This is typically used for watch operations.
28+
pub async fn client(&self) -> tokio::sync::RwLockReadGuard<'_, Client> {
29+
self.client.read().await
3030
}
3131

32-
pub async fn get_pod_yaml(&self, pod_name: &str) -> Result<Option<String>> {
33-
let key = format!("/registry/pods/{pod_name}");
32+
/// List all pod names (keys only, values are ignored).
33+
pub async fn list_pods(&self) -> Result<Vec<String>> {
34+
let key = "/registry/pods/".to_string();
3435
let mut client = self.client.write().await;
35-
let resp = client.get(key, None).await?;
36+
let resp = client
37+
.get(
38+
key.clone(),
39+
Some(GetOptions::new().with_prefix().with_keys_only()),
40+
)
41+
.await?;
3642
Ok(resp
3743
.kvs()
38-
.first()
39-
.map(|kv| String::from_utf8_lossy(kv.value()).to_string()))
44+
.iter()
45+
.map(|kv| String::from_utf8_lossy(kv.key()).replace("/registry/pods/", ""))
46+
.collect())
4047
}
4148

42-
pub async fn list_pods(&self) -> Result<Vec<String>> {
49+
/// List all node names (keys only, values are ignored).
50+
pub async fn list_nodes(&self) -> Result<Vec<String>> {
51+
let key = "/registry/nodes/".to_string();
4352
let mut client = self.client.write().await;
4453
let resp = client
45-
.get("/registry/pods/", Some(GetOptions::new().with_prefix()))
54+
.get(
55+
key.clone(),
56+
Some(GetOptions::new().with_prefix().with_keys_only()),
57+
)
4658
.await?;
4759
Ok(resp
4860
.kvs()
4961
.iter()
50-
.map(|kv| String::from_utf8_lossy(kv.key()).replace("/registry/pods/", ""))
62+
.map(|kv| String::from_utf8_lossy(kv.key()).replace("/registry/nodes/", ""))
5163
.collect())
5264
}
5365

66+
/// Insert a node YAML definition into xline.
5467
pub async fn insert_node_yaml(&self, node_name: &str, node_yaml: &str) -> Result<()> {
5568
let key = format!("/registry/nodes/{node_name}");
5669
let mut client = self.client.write().await;
5770
client.put(key, node_yaml, Some(PutOptions::new())).await?;
5871
Ok(())
5972
}
6073

74+
// Example (currently unused):
6175
// pub async fn get_node_yaml(&self, node_name: &str) -> Result<Option<String>> {
6276
// let key = format!("/registry/nodes/{node_name}");
6377
// let mut client = self.client.write().await;
@@ -74,20 +88,27 @@ impl XlineStore {
7488
// }
7589
// }
7690

77-
pub async fn list_nodes(&self) -> Result<Vec<(String, Node)>> {
91+
/// Insert a pod YAML definition into xline.
92+
pub async fn insert_pod_yaml(&self, pod_name: &str, pod_yaml: &str) -> Result<()> {
93+
let key = format!("/registry/pods/{pod_name}");
7894
let mut client = self.client.write().await;
79-
let resp = client
80-
.get("/registry/nodes/", Some(GetOptions::new().with_prefix()))
81-
.await?;
82-
let mut result = Vec::new();
83-
for kv in resp.kvs() {
84-
let name = String::from_utf8_lossy(kv.key()).replace("/registry/nodes/", "");
85-
let node: Node = serde_yaml::from_slice(kv.value())?;
86-
result.push((name, node));
95+
client.put(key, pod_yaml, Some(PutOptions::new())).await?;
96+
Ok(())
97+
}
98+
99+
/// Get a pod YAML definition from xline.
100+
pub async fn get_pod_yaml(&self, pod_name: &str) -> Result<Option<String>> {
101+
let key = format!("/registry/pods/{pod_name}");
102+
let mut client = self.client.write().await;
103+
let resp = client.get(key, None).await?;
104+
if let Some(kv) = resp.kvs().first() {
105+
Ok(Some(String::from_utf8_lossy(kv.value()).to_string()))
106+
} else {
107+
Ok(None)
87108
}
88-
Ok(result)
89109
}
90110

111+
/// Delete a pod from xline.
91112
pub async fn delete_pod(&self, pod_name: &str) -> Result<()> {
92113
let key = format!("/registry/pods/{pod_name}");
93114
let mut client = self.client.write().await;
@@ -123,4 +144,56 @@ impl XlineStore {
123144
Ok(None)
124145
}
125146
}
147+
148+
/// Take a snapshot of all pods and return them with the current revision.
149+
pub async fn pods_snapshot_with_rev(&self) -> Result<(Vec<(String, String)>, i64)> {
150+
let key_prefix = "/registry/pods/".to_string();
151+
let mut client = self.client.write().await;
152+
let resp = client
153+
.get(key_prefix.clone(), Some(GetOptions::new().with_prefix()))
154+
.await?;
155+
let rev = resp.header().map(|h| h.revision()).unwrap_or(0);
156+
let items: Vec<(String, String)> = resp
157+
.kvs()
158+
.iter()
159+
.map(|kv| {
160+
(
161+
String::from_utf8_lossy(kv.key()).replace("/registry/pods/", ""),
162+
String::from_utf8_lossy(kv.value()).to_string(),
163+
)
164+
})
165+
.collect();
166+
Ok((items, rev))
167+
}
168+
169+
/// Create a watch on all pods with prefix `/registry/pods/`, starting from a given revision.
170+
pub async fn watch_pods(&self, start_rev: i64) -> Result<(Watcher, WatchStream)> {
171+
let key_prefix = "/registry/pods/".to_string();
172+
let opts = WatchOptions::new()
173+
.with_prefix()
174+
.with_prev_key()
175+
.with_start_revision(start_rev);
176+
let mut client = self.client.write().await;
177+
let (watcher, stream) = client.watch(key_prefix, Some(opts)).await?;
178+
Ok((watcher, stream))
179+
}
180+
181+
/// Initialize Flannel CNI network configuration.
182+
pub async fn init_flannel_config(&self) -> Result<()> {
183+
let config_json = r#"{
184+
"Network": "10.244.0.0/16",
185+
"SubnetLen": 24,
186+
"Backend": {
187+
"Type": "vxlan",
188+
"VNI": 1
189+
}
190+
}"#;
191+
192+
let key = "/coreos.com/network/config";
193+
let mut client = self.client.write().await;
194+
client
195+
.put(key, config_json, Some(PutOptions::new()))
196+
.await?;
197+
Ok(())
198+
}
126199
}

project/rks/src/commands/create.rs

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ use anyhow::Result;
33
use common::{PodTask, RksMessage};
44
use quinn::Connection;
55
use std::sync::Arc;
6-
use tokio::sync::broadcast;
76

7+
/// Send a pod creation message to a specific worker node
8+
#[allow(unused)]
89
pub async fn watch_create(pod_task: &PodTask, conn: &Connection, node_id: &str) -> Result<()> {
910
if pod_task.spec.node_name.as_deref() == Some(node_id) {
1011
let msg = RksMessage::CreatePod(Box::new(pod_task.clone()));
@@ -13,55 +14,52 @@ pub async fn watch_create(pod_task: &PodTask, conn: &Connection, node_id: &str)
1314
stream.write_all(&data).await?;
1415
stream.finish()?;
1516
println!(
16-
"[watch_pods] send CreatePod for pod: {} to node: {}",
17+
"[watch_create] sent CreatePod for pod: {} to node: {}",
1718
pod_task.metadata.name, node_id
1819
);
1920
}
2021
}
2122
Ok(())
2223
}
2324

25+
/// Handle user-requested pod creation, store pod in Xline
2426
pub async fn user_create(
25-
mut pod_task: Box<PodTask>,
27+
pod_task: Box<PodTask>,
2628
xline_store: &Arc<XlineStore>,
2729
conn: &Connection,
28-
tx: &broadcast::Sender<RksMessage>,
2930
) -> Result<()> {
30-
if let Ok(nodes) = xline_store.list_nodes().await
31-
&& let Some((node_name, _)) = nodes.first()
32-
{
33-
pod_task.spec.node_name = Some(node_name.clone());
34-
let pod_yaml = match serde_yaml::to_string(&pod_task) {
35-
Ok(yaml) => yaml,
36-
Err(e) => {
37-
eprintln!("[user dispatch] Failed to serialize pod task: {e}");
38-
let response = RksMessage::Error(format!("Serialization error: {e}"));
39-
let data = bincode::serialize(&response).unwrap_or_else(|_| vec![]);
40-
if let Ok(mut stream) = conn.open_uni().await {
41-
stream.write_all(&data).await?;
42-
stream.finish()?;
43-
}
44-
return Ok(());
31+
// Serialize pod to YAML
32+
let pod_yaml = match serde_yaml::to_string(&pod_task) {
33+
Ok(yaml) => yaml,
34+
Err(e) => {
35+
eprintln!("[user_create] Failed to serialize pod task: {e}");
36+
let response = RksMessage::Error(format!("Serialization error: {e}"));
37+
let data = bincode::serialize(&response).unwrap_or_else(|_| vec![]);
38+
if let Ok(mut stream) = conn.open_uni().await {
39+
stream.write_all(&data).await?;
40+
stream.finish()?;
4541
}
46-
};
47-
48-
xline_store
49-
.insert_pod_yaml(&pod_task.metadata.name, &pod_yaml)
50-
.await?;
42+
return Ok(());
43+
}
44+
};
5145

52-
println!(
53-
"[user dispatch] created pod: {}, assigned to node: {}",
54-
pod_task.metadata.name, node_name
55-
);
46+
// Insert into Xline
47+
xline_store
48+
.insert_pod_yaml(&pod_task.metadata.name, &pod_yaml)
49+
.await?;
5650

57-
let _ = tx.send(RksMessage::CreatePod(pod_task.clone()));
51+
println!(
52+
"[user_create] created pod {} (written to Xline)",
53+
pod_task.metadata.name
54+
);
5855

59-
let response = RksMessage::Ack;
60-
let data = bincode::serialize(&response)?;
61-
if let Ok(mut stream) = conn.open_uni().await {
62-
stream.write_all(&data).await?;
63-
stream.finish()?;
64-
}
56+
// Send ACK to user
57+
let response = RksMessage::Ack;
58+
let data = bincode::serialize(&response)?;
59+
if let Ok(mut stream) = conn.open_uni().await {
60+
stream.write_all(&data).await?;
61+
stream.finish()?;
6562
}
63+
6664
Ok(())
6765
}

project/rks/src/commands/delete.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@ use anyhow::Result;
33
use common::{PodTask, RksMessage};
44
use quinn::Connection;
55
use std::sync::Arc;
6-
use tokio::sync::broadcast;
76

7+
#[allow(unused)]
88
pub async fn watch_delete(
99
pod_name: String,
1010
conn: &Connection,
1111
xline_store: &Arc<XlineStore>,
1212
node_id: &str,
1313
) -> Result<()> {
1414
let msg = RksMessage::DeletePod(pod_name.clone());
15+
1516
if let Ok(pods) = xline_store.list_pods().await {
1617
for p in pods {
1718
if let Ok(Some(pod_yaml)) = xline_store.get_pod_yaml(&p).await {
@@ -37,11 +38,11 @@ pub async fn watch_delete(
3738

3839
pub async fn user_delete(
3940
pod_name: String,
40-
_xline_store: &Arc<XlineStore>,
41+
xline_store: &Arc<XlineStore>,
4142
conn: &Connection,
42-
tx: &broadcast::Sender<RksMessage>,
4343
) -> Result<()> {
44-
let _ = tx.send(RksMessage::DeletePod(pod_name.clone()));
44+
xline_store.delete_pod(&pod_name).await?;
45+
println!("[user_delete] deleted pod {} (written to xline)", pod_name);
4546

4647
let response = RksMessage::Ack;
4748
let data = bincode::serialize(&response)?;

project/rks/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use std::sync::Arc;
1919

2020
#[tokio::main]
2121
async fn main() -> anyhow::Result<()> {
22+
env_logger::init();
23+
2224
let cli = Cli::parse();
2325
use log::info;
2426

project/rks/src/network/ip.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ mod tests {
365365
}
366366

367367
#[tokio::test]
368+
#[ignore]
368369
async fn test_ifcanreach_gateway_should_succeed() {
369370
use std::net::IpAddr;
370371

0 commit comments

Comments
 (0)