Skip to content

Commit 1dee60b

Browse files
author
Goshawk
committed
Wait until all spawned tasks complete
1 parent cfadf32 commit 1dee60b

File tree

2 files changed

+26
-14
lines changed

2 files changed

+26
-14
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ Data lake implementation integrated with AWS S3
99
- Find and lock a chunk - Once locked, chunk cannot be deleted
1010
- Scheduled deletion - Scheduled for deletion, a chunk will be removed once it is no longer in use.
1111

12+
- Maximum allocated on-disk storage limit
1213
- Backend-agnostic datamanager. The RocksDB backend can be substituted with any in-process NoSQL or SQL storage engine.g
13-
14+
- Simple Prompt UI
1415

1516
# Design
1617

src/bin/main.rs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use datalake::{
22
data_manager::DataManagerImpl,
33
rocksdb_storage_engine::StorageEngineImpl,
4-
StorageEngine,
5-
DataChunkRef
4+
DataChunkRef, StorageEngine,
65
};
76

87
enum Commands {
@@ -33,30 +32,33 @@ fn prompt() -> requestty::Result<Commands> {
3332

3433
#[tokio::main]
3534
async fn main() {
36-
// TODO: Read conf from TOML file
35+
// TODO: Read conf from TOML file
3736

3837
// Initialize the data manager
3938
let storage =
4039
StorageEngineImpl::from_conf(Default::default());
4140

42-
let data_manager: DataManagerImpl<StorageEngineImpl> = DataManagerImpl::new(storage);
41+
let data_manager: DataManagerImpl<StorageEngineImpl> =
42+
DataManagerImpl::new(storage);
4343

4444
// Assigned subset of data chunks in S3
4545
let vec_buckets_and_keys = [
4646
("bucket1".to_string(), "key1".to_string()),
4747
("bucket2".to_string(), "key2".to_string()),
4848
("bucket3".to_string(), "key3".to_string()),
49+
("bucket4".to_string(), "key4".to_string()),
4950
];
5051

5152
// Spawn tasks to concurrently download and persist chunks
52-
vec_buckets_and_keys.iter().for_each(
53-
|(bucket, key)| {
53+
let mut tasks = vec_buckets_and_keys
54+
.iter()
55+
.map(|(bucket, key)| {
5456
data_manager.spawn_download_chunk(
5557
bucket.clone(),
5658
key.clone(),
57-
);
58-
},
59-
);
59+
)
60+
})
61+
.collect::<Vec<_>>();
6062

6163
// Serve queries for the downloaded data chunks
6264
loop {
@@ -95,16 +97,25 @@ async fn main() {
9597
}
9698
Commands::ScheduledDelete => {
9799
// TODO: Read input from user
100+
98101
// schedule a chunk for deletion
99102
let chunk_id = [0u8; 32];
100-
data_manager
103+
104+
if let Some(handle) = data_manager
101105
.spawn_delete_chunk(chunk_id)
102-
.await;
106+
.await
107+
{
108+
tasks.push(handle);
109+
}
103110
}
104111
Commands::Exit => {
105-
// Implement a graceful shutdown
106-
todo!()
112+
break;
107113
}
108114
}
109115
}
116+
117+
// Wait until all spawned tasks complete
118+
for task in tasks {
119+
task.await.unwrap();
120+
}
110121
}

0 commit comments

Comments
 (0)