Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions project/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions project/rks/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ cargo.rust_binary(
named_deps = {
"async_trait": "//third-party/rust/crates/async-trait/0.1.89:async-trait-0.1.89",
"etcd_client": "//third-party/rust/crates/etcd-client/0.16.1:etcd-client-0.16.1",
"futures_util": "//third-party/rust/crates/futures-util/0.3.31:futures-util-0.3.31",
"native_tls": "//third-party/rust/crates/native-tls/0.2.14:native-tls-0.2.14",
"netlink_packet_route": "//third-party/rust/crates/netlink-packet-route/0.22.0:netlink-packet-route-0.22.0",
"tokio_util": "//third-party/rust/crates/tokio-util/0.7.16:tokio-util-0.7.16",
Expand Down Expand Up @@ -61,6 +62,7 @@ cargo.rust_binary(
"//third-party/rust/crates/serde_yaml/0.9.34+deprecated:serde_yaml-0.9.34+deprecated",
"//third-party/rust/crates/tempfile/3.22.0:tempfile-3.22.0",
"//third-party/rust/crates/thiserror/2.0.16:thiserror-2.0.16",
"//third-party/rust/crates/time/0.3.43:time-0.3.43",
"//third-party/rust/crates/tokio/1.47.1:tokio-1.47.1",
"//third-party/rust/crates/tonic/0.13.1:tonic-0.13.1",
],
Expand Down Expand Up @@ -91,6 +93,7 @@ cargo.rust_library(
named_deps = {
"async_trait": "//third-party/rust/crates/async-trait/0.1.89:async-trait-0.1.89",
"etcd_client": "//third-party/rust/crates/etcd-client/0.16.1:etcd-client-0.16.1",
"futures_util": "//third-party/rust/crates/futures-util/0.3.31:futures-util-0.3.31",
"native_tls": "//third-party/rust/crates/native-tls/0.2.14:native-tls-0.2.14",
"netlink_packet_route": "//third-party/rust/crates/netlink-packet-route/0.22.0:netlink-packet-route-0.22.0",
"tokio_util": "//third-party/rust/crates/tokio-util/0.7.16:tokio-util-0.7.16",
Expand Down Expand Up @@ -124,6 +127,7 @@ cargo.rust_library(
"//third-party/rust/crates/serde_yaml/0.9.34+deprecated:serde_yaml-0.9.34+deprecated",
"//third-party/rust/crates/tempfile/3.22.0:tempfile-3.22.0",
"//third-party/rust/crates/thiserror/2.0.16:thiserror-2.0.16",
"//third-party/rust/crates/time/0.3.43:time-0.3.43",
"//third-party/rust/crates/tokio/1.47.1:tokio-1.47.1",
"//third-party/rust/crates/tonic/0.13.1:tonic-0.13.1",
],
Expand Down
2 changes: 2 additions & 0 deletions project/rks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ tempfile = "3.20.0"
tokio-util = "0.7.0"
env_logger = "0.11.0"
once_cell = "1.21.0"
futures-util = "0.3"
time = "0.3"

[dev-dependencies]
serial_test = "3.2.0"
131 changes: 102 additions & 29 deletions project/rks/src/api/xlinestore.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,77 @@
#![allow(dead_code)]
use anyhow::Result;
use common::Node;
use etcd_client::{Client, GetOptions, PutOptions};
use serde_yaml;
use etcd_client::{Client, GetOptions, PutOptions, WatchOptions, WatchStream, Watcher};
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::protocol::config::NetworkConfig;
/// like etcd, k:/registry/pods/pod_name v:yaml file of pod
/// k:/registry/nodes/node_name v:yaml file of node

/// XlineStore provides an etcd-like API for managing pods and nodes.
/// Keys are stored under `/registry/pods/` and `/registry/nodes/`.
/// Values are YAML serialized definitions.
#[derive(Clone)]
pub struct XlineStore {
client: Arc<RwLock<Client>>,
}

#[allow(unused)]
impl XlineStore {
/// Create a new XlineStore instance by connecting to the given endpoints.
pub async fn new(endpoints: &[&str]) -> Result<Self> {
let client = Client::connect(endpoints, None).await?;
Ok(Self {
client: Arc::new(RwLock::new(client)),
})
}

pub async fn insert_pod_yaml(&self, pod_name: &str, pod_yaml: &str) -> Result<()> {
let key = format!("/registry/pods/{pod_name}");
let mut client = self.client.write().await;
client.put(key, pod_yaml, None).await?;
Ok(())
/// Get a read-only reference to the internal etcd client.
/// This is typically used for watch operations.
pub async fn client(&self) -> tokio::sync::RwLockReadGuard<'_, Client> {
self.client.read().await
}

pub async fn get_pod_yaml(&self, pod_name: &str) -> Result<Option<String>> {
let key = format!("/registry/pods/{pod_name}");
/// List all pod names (keys only, values are ignored).
pub async fn list_pods(&self) -> Result<Vec<String>> {
let key = "/registry/pods/".to_string();
let mut client = self.client.write().await;
let resp = client.get(key, None).await?;
let resp = client
.get(
key.clone(),
Some(GetOptions::new().with_prefix().with_keys_only()),
)
.await?;
Ok(resp
.kvs()
.first()
.map(|kv| String::from_utf8_lossy(kv.value()).to_string()))
.iter()
.map(|kv| String::from_utf8_lossy(kv.key()).replace("/registry/pods/", ""))
.collect())
}

pub async fn list_pods(&self) -> Result<Vec<String>> {
/// List all node names (keys only, values are ignored).
pub async fn list_nodes(&self) -> Result<Vec<String>> {
let key = "/registry/nodes/".to_string();
let mut client = self.client.write().await;
let resp = client
.get("/registry/pods/", Some(GetOptions::new().with_prefix()))
.get(
key.clone(),
Some(GetOptions::new().with_prefix().with_keys_only()),
)
.await?;
Ok(resp
.kvs()
.iter()
.map(|kv| String::from_utf8_lossy(kv.key()).replace("/registry/pods/", ""))
.map(|kv| String::from_utf8_lossy(kv.key()).replace("/registry/nodes/", ""))
.collect())
}

/// Insert a node YAML definition into xline.
pub async fn insert_node_yaml(&self, node_name: &str, node_yaml: &str) -> Result<()> {
let key = format!("/registry/nodes/{node_name}");
let mut client = self.client.write().await;
client.put(key, node_yaml, Some(PutOptions::new())).await?;
Ok(())
}

// Example (currently unused):
// pub async fn get_node_yaml(&self, node_name: &str) -> Result<Option<String>> {
// let key = format!("/registry/nodes/{node_name}");
// let mut client = self.client.write().await;
Expand All @@ -74,20 +88,27 @@ impl XlineStore {
// }
// }

pub async fn list_nodes(&self) -> Result<Vec<(String, Node)>> {
/// Insert a pod YAML definition into xline.
pub async fn insert_pod_yaml(&self, pod_name: &str, pod_yaml: &str) -> Result<()> {
let key = format!("/registry/pods/{pod_name}");
let mut client = self.client.write().await;
let resp = client
.get("/registry/nodes/", Some(GetOptions::new().with_prefix()))
.await?;
let mut result = Vec::new();
for kv in resp.kvs() {
let name = String::from_utf8_lossy(kv.key()).replace("/registry/nodes/", "");
let node: Node = serde_yaml::from_slice(kv.value())?;
result.push((name, node));
client.put(key, pod_yaml, Some(PutOptions::new())).await?;
Ok(())
}

/// Get a pod YAML definition from xline.
pub async fn get_pod_yaml(&self, pod_name: &str) -> Result<Option<String>> {
let key = format!("/registry/pods/{pod_name}");
let mut client = self.client.write().await;
let resp = client.get(key, None).await?;
if let Some(kv) = resp.kvs().first() {
Ok(Some(String::from_utf8_lossy(kv.value()).to_string()))
} else {
Ok(None)
}
Ok(result)
}

/// Delete a pod from xline.
pub async fn delete_pod(&self, pod_name: &str) -> Result<()> {
let key = format!("/registry/pods/{pod_name}");
let mut client = self.client.write().await;
Expand Down Expand Up @@ -123,4 +144,56 @@ impl XlineStore {
Ok(None)
}
}

/// Take a snapshot of all pods and return them with the current revision.
pub async fn pods_snapshot_with_rev(&self) -> Result<(Vec<(String, String)>, i64)> {
let key_prefix = "/registry/pods/".to_string();
let mut client = self.client.write().await;
let resp = client
.get(key_prefix.clone(), Some(GetOptions::new().with_prefix()))
.await?;
let rev = resp.header().map(|h| h.revision()).unwrap_or(0);
let items: Vec<(String, String)> = resp
.kvs()
.iter()
.map(|kv| {
(
String::from_utf8_lossy(kv.key()).replace("/registry/pods/", ""),
String::from_utf8_lossy(kv.value()).to_string(),
)
})
.collect();
Ok((items, rev))
}

/// Create a watch on all pods with prefix `/registry/pods/`, starting from a given revision.
pub async fn watch_pods(&self, start_rev: i64) -> Result<(Watcher, WatchStream)> {
let key_prefix = "/registry/pods/".to_string();
let opts = WatchOptions::new()
.with_prefix()
.with_prev_key()
.with_start_revision(start_rev);
let mut client = self.client.write().await;
let (watcher, stream) = client.watch(key_prefix, Some(opts)).await?;
Ok((watcher, stream))
}

/// Initialize Flannel CNI network configuration.
pub async fn init_flannel_config(&self) -> Result<()> {
let config_json = r#"{
"Network": "10.244.0.0/16",
"SubnetLen": 24,
"Backend": {
"Type": "vxlan",
"VNI": 1
}
}"#;

let key = "/coreos.com/network/config";
let mut client = self.client.write().await;
client
.put(key, config_json, Some(PutOptions::new()))
.await?;
Ok(())
}
}
68 changes: 33 additions & 35 deletions project/rks/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use anyhow::Result;
use common::{PodTask, RksMessage};
use quinn::Connection;
use std::sync::Arc;
use tokio::sync::broadcast;

/// Send a pod creation message to a specific worker node
#[allow(unused)]
pub async fn watch_create(pod_task: &PodTask, conn: &Connection, node_id: &str) -> Result<()> {
if pod_task.spec.node_name.as_deref() == Some(node_id) {
let msg = RksMessage::CreatePod(Box::new(pod_task.clone()));
Expand All @@ -13,55 +14,52 @@ pub async fn watch_create(pod_task: &PodTask, conn: &Connection, node_id: &str)
stream.write_all(&data).await?;
stream.finish()?;
println!(
"[watch_pods] send CreatePod for pod: {} to node: {}",
"[watch_create] sent CreatePod for pod: {} to node: {}",
pod_task.metadata.name, node_id
);
}
}
Ok(())
}

/// Handle user-requested pod creation, store pod in Xline
pub async fn user_create(
mut pod_task: Box<PodTask>,
pod_task: Box<PodTask>,
xline_store: &Arc<XlineStore>,
conn: &Connection,
tx: &broadcast::Sender<RksMessage>,
) -> Result<()> {
if let Ok(nodes) = xline_store.list_nodes().await
&& let Some((node_name, _)) = nodes.first()
{
pod_task.spec.node_name = Some(node_name.clone());
let pod_yaml = match serde_yaml::to_string(&pod_task) {
Ok(yaml) => yaml,
Err(e) => {
eprintln!("[user dispatch] Failed to serialize pod task: {e}");
let response = RksMessage::Error(format!("Serialization error: {e}"));
let data = bincode::serialize(&response).unwrap_or_else(|_| vec![]);
if let Ok(mut stream) = conn.open_uni().await {
stream.write_all(&data).await?;
stream.finish()?;
}
return Ok(());
// Serialize pod to YAML
let pod_yaml = match serde_yaml::to_string(&pod_task) {
Ok(yaml) => yaml,
Err(e) => {
eprintln!("[user_create] Failed to serialize pod task: {e}");
let response = RksMessage::Error(format!("Serialization error: {e}"));
let data = bincode::serialize(&response).unwrap_or_else(|_| vec![]);
if let Ok(mut stream) = conn.open_uni().await {
stream.write_all(&data).await?;
stream.finish()?;
}
};

xline_store
.insert_pod_yaml(&pod_task.metadata.name, &pod_yaml)
.await?;
return Ok(());
}
};

println!(
"[user dispatch] created pod: {}, assigned to node: {}",
pod_task.metadata.name, node_name
);
// Insert into Xline
xline_store
.insert_pod_yaml(&pod_task.metadata.name, &pod_yaml)
.await?;

let _ = tx.send(RksMessage::CreatePod(pod_task.clone()));
println!(
"[user_create] created pod {} (written to Xline)",
pod_task.metadata.name
);

let response = RksMessage::Ack;
let data = bincode::serialize(&response)?;
if let Ok(mut stream) = conn.open_uni().await {
stream.write_all(&data).await?;
stream.finish()?;
}
// Send ACK to user
let response = RksMessage::Ack;
let data = bincode::serialize(&response)?;
if let Ok(mut stream) = conn.open_uni().await {
stream.write_all(&data).await?;
stream.finish()?;
}

Ok(())
}
9 changes: 5 additions & 4 deletions project/rks/src/commands/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use anyhow::Result;
use common::{PodTask, RksMessage};
use quinn::Connection;
use std::sync::Arc;
use tokio::sync::broadcast;

#[allow(unused)]
pub async fn watch_delete(
pod_name: String,
conn: &Connection,
xline_store: &Arc<XlineStore>,
node_id: &str,
) -> Result<()> {
let msg = RksMessage::DeletePod(pod_name.clone());

if let Ok(pods) = xline_store.list_pods().await {
for p in pods {
if let Ok(Some(pod_yaml)) = xline_store.get_pod_yaml(&p).await {
Expand All @@ -37,11 +38,11 @@ pub async fn watch_delete(

pub async fn user_delete(
pod_name: String,
_xline_store: &Arc<XlineStore>,
xline_store: &Arc<XlineStore>,
conn: &Connection,
tx: &broadcast::Sender<RksMessage>,
) -> Result<()> {
let _ = tx.send(RksMessage::DeletePod(pod_name.clone()));
xline_store.delete_pod(&pod_name).await?;
println!("[user_delete] deleted pod {} (written to xline)", pod_name);

let response = RksMessage::Ack;
let data = bincode::serialize(&response)?;
Expand Down
2 changes: 2 additions & 0 deletions project/rks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();

let cli = Cli::parse();
use log::info;

Expand Down
1 change: 1 addition & 0 deletions project/rks/src/network/ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ mod tests {
}

#[tokio::test]
#[ignore]
async fn test_ifcanreach_gateway_should_succeed() {
use std::net::IpAddr;

Expand Down
Loading