
Commit 91c7898

Author: Goshawk
Message: Update comments
1 parent ebcbc59 commit 91c7898

File tree

5 files changed: +20 -40 lines changed


Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -6,7 +6,6 @@ edition = "2021"
 [dependencies]
 tokio = { version = "1", features = ["rt", "net", "sync", "time", "io-std", "rt-multi-thread", "macros"] }
 thiserror = "1.0.63"
-async-trait = "0.1.81"
 requestty = "0.5.0"
 hex = "0.4.3"
 aws-config = "1.5.4"

README.md

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ Data lake implementation integrated with AWS S3
 - Async-Download chunks from AWS S3
 - Persist on-disk in a lock-less manner
 - List all persisted chunks by ID from a cache
-- Find and lock a chunk - Once locked, chunk cannot be deleted
+- Find and lock a chunk - Once locked, chunk cannot be deleted until the DataChunkRef is released/dropped.
 - Scheduled deletion - Scheduled for deletion, a chunk will be removed once it is no longer in use.
 
 - Maximum allocated on-disk storage limit
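The lock-until-dropped behavior described in the new README line can be pictured with tokio's semaphores. This is a minimal sketch, not the repository's actual DataChunkRef type (its definition is not part of this commit): it is modeled on the per-chunk `Arc<Semaphore::new(1)>` cache visible in src/data_manager.rs below, with the chunk reference holding an `OwnedSemaphorePermit` that is released on drop.

```rust
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// Hypothetical guard standing in for DataChunkRef: while it is alive,
// the single permit of the chunk's semaphore is held, so deletion
// (which would need the same permit) cannot proceed.
struct DataChunkRef {
    _permit: OwnedSemaphorePermit, // released when the ref is dropped
}

#[tokio::main]
async fn main() {
    // One single-permit semaphore per chunk, as in the data_manager cache.
    let chunk_lock = Arc::new(Semaphore::new(1));

    let chunk_ref = DataChunkRef {
        _permit: chunk_lock.clone().acquire_owned().await.unwrap(),
    };

    // While the ref is held, the chunk cannot be locked for deletion.
    assert!(chunk_lock.clone().try_acquire_owned().is_err());

    // Dropping the ref releases the permit; deletion may proceed.
    drop(chunk_ref);
    assert!(chunk_lock.try_acquire_owned().is_ok());
}
```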

src/data_manager.rs

Lines changed: 10 additions & 10 deletions
@@ -3,7 +3,7 @@ use crate::{
 };
 use aws_config::BehaviorVersion;
 use serde_binary::binary_stream::Endian;
-use std::collections::HashMap;
+use std::collections::{hash_map, HashMap};
 use std::path::PathBuf;
 use std::sync::Arc;
 use tokio::sync::{
@@ -199,7 +199,6 @@ impl<T: StorageEngine> DataManagerImpl<T> {
             .await
             .expect("valid data blob");
 
-
         let chunk: DataChunk =
             serde_binary::from_vec(vec, Endian::Big)
                 .expect("valid chunk");
@@ -208,27 +207,28 @@ impl<T: StorageEngine> DataManagerImpl<T> {
         // Check if the chunk already exists in the cache
         let exists = {
             let mut cache = cache.write().await;
-            if !cache.contains_key(&chunk.id) {
-                cache.insert(
-                    chunk.id,
-                    Arc::new(Semaphore::new(1)),
-                );
+            if let hash_map::Entry::Vacant(e) =
+                cache.entry(chunk.id)
+            {
+                e.insert(Arc::new(Semaphore::new(1)));
                 false
             } else {
                 true
-           }
+            }
         };
 
         if !exists {
             // Due to the usage RocksDB::OptimisticTransaction it is safe here to use read lock
-            // Any write conflict will be resolved by the RocksDB::commit itself
+            // Any write conflict will be detected by the RocksDB::commit itself.
+            // However, we don't expect high lock contention here.
             if db
                 .read()
                 .await
                 .persist_chunk(chunk, chunk_size)
                 .is_err()
             {
-                // chunk could not be persisted. Rollback the cache
+                // chunk could not be persisted.
+                // Rollback it from the cache to keep the cache consistent
                 cache.write().await.remove(&id);
             }
         }
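The rewrite above replaces a `contains_key` check followed by `insert` with a single entry-API lookup: the vacancy test and the insertion happen in one pass over the map, and `exists` falls out of which arm runs. A self-contained sketch of the pattern; the `persisted` flag stands in for the `db.persist_chunk` call, which is not reproduced here:

```rust
use std::collections::{hash_map, HashMap};
use std::sync::Arc;
use tokio::sync::{RwLock, Semaphore};

type ChunkId = [u8; 32];

#[tokio::main]
async fn main() {
    let cache: Arc<RwLock<HashMap<ChunkId, Arc<Semaphore>>>> =
        Arc::new(RwLock::new(HashMap::new()));
    let id: ChunkId = [0u8; 32];

    // Single lookup: insert the per-chunk lock only if the slot is vacant,
    // and learn in the same pass whether the chunk was already cached.
    let exists = {
        let mut cache = cache.write().await;
        if let hash_map::Entry::Vacant(e) = cache.entry(id) {
            e.insert(Arc::new(Semaphore::new(1)));
            false
        } else {
            true
        }
    };

    if !exists {
        // Stand-in for db.persist_chunk(...); on failure, roll the entry
        // back out of the cache so it stays consistent with the disk state.
        let persisted = false;
        if !persisted {
            cache.write().await.remove(&id);
        }
    }
}
```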

src/lib.rs

Lines changed: 5 additions & 3 deletions
@@ -1,10 +1,10 @@
 pub mod data_manager;
 pub mod rocksdb_storage_engine;
 
+use serde_binary::Encode;
 use std::collections::HashMap;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
-use serde_binary::Encode;
 use thiserror::Error;
 
 pub type DatasetId = [u8; 32];
@@ -27,7 +27,10 @@ pub struct DataChunk {
 }
 
 impl Encode for DataChunk {
-    fn encode(&self, _serializer: &mut serde_binary::Serializer<'_>) -> Result<(), serde_binary::Error> {
+    fn encode(
+        &self,
+        _serializer: &mut serde_binary::Serializer<'_>,
+    ) -> Result<(), serde_binary::Error> {
         // TODO: Implement
         Ok(())
     }
@@ -47,7 +50,6 @@ pub enum Error {
     MaxSizeAllocated(u32),
 }
 
-// Implement From To rocksdb_lib::Error for Error
 impl From<rocksdb_lib::Error> for Error {
     fn from(e: rocksdb_lib::Error) -> Self {
         Self::InternalErr(e)
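The deleted comment merely restated the `impl From` below it. What the impl buys is `?`-based error propagation: any `Result<_, rocksdb_lib::Error>` bubbles up as `Error::InternalErr` without an explicit `map_err`. A sketch of that mechanism, with a hypothetical `DbError` standing in for `rocksdb_lib::Error` (the real enum derives `thiserror::Error`; a plain `Debug` enum keeps the sketch dependency-free):

```rust
// DbError is a hypothetical stand-in for rocksdb_lib::Error.
#[derive(Debug)]
struct DbError(String);

#[derive(Debug)]
enum Error {
    InternalErr(DbError),
}

impl From<DbError> for Error {
    fn from(e: DbError) -> Self {
        Self::InternalErr(e)
    }
}

fn load() -> Result<(), Error> {
    let res: Result<(), DbError> = Err(DbError("read failed".into()));
    // `?` converts DbError into Error::InternalErr via the From impl.
    res?;
    Ok(())
}

fn main() {
    assert!(matches!(load(), Err(Error::InternalErr(_))));
}
```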

src/rocksdb_storage_engine.rs

Lines changed: 4 additions & 25 deletions
@@ -12,34 +12,12 @@ use serde_binary::binary_stream::Endian;
 
 const SIZE_KEY: [u8; 1] = [1u8; 1];
 
+/// Represents a storage engine implementation using RocksDB.
 pub struct StorageEngineImpl {
     rocksdb: OptimisticTransactionDB,
     conf: StorageConf,
 }
 
-/// Implementation of storage engine for a data lake.
-///
-/// This storage engine provides methods for managing data chunks in a data lake.
-/// It uses an underlying RocksDB database for storing and retrieving data.
-///
-/// # Examples
-///
-/// ```
-/// use datalake::StorageEngine;
-///
-/// // Create a new storage engine
-/// let conf = StorageEngine::new("/path/to/database");
-/// let storage_engine = StorageEngine::from_conf(conf);
-///
-/// // Persist a data chunk
-/// let chunk = DataChunk::new(...);
-/// storage_engine.persist_chunk(&chunk, 100).unwrap();
-///
-/// // Find a chunk ID
-/// let dataset_id = DatasetId::new(...);
-/// let block_number = 42;
-/// let chunk_id = storage_engine.find_chunk_id(dataset_id, block_number).unwrap();
-/// ```
 impl StorageEngine for StorageEngineImpl {
     fn from_conf(conf: StorageConf) -> Self {
         let rocksdb =
@@ -137,8 +115,9 @@ impl StorageEngine for StorageEngineImpl {
     // Persist KEY - VALUE
     // Chunk_ID -> SizeU32 + Chunk Bytes
     let id = chunk.id;
-    let chunk_bytes = serde_binary::encode(&chunk, Endian::Big)
-        .expect("should encode");
+    let chunk_bytes =
+        serde_binary::encode(&chunk, Endian::Big)
+            .expect("should encode");
 
     // Chunk is now in form of chunk_bytes, drop the chunk struct to free memory
     drop(chunk);
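The comment added in src/data_manager.rs leans on the semantics of the `OptimisticTransactionDB` field shown here: writers take no locks up front, and conflicts surface only when `commit` runs. A minimal sketch, assuming `rocksdb_lib` is the rust-rocksdb crate (imported as `rocksdb` below); the path and keys are placeholders:

```rust
use rocksdb::{OptimisticTransactionDB, Options};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);

    // Placeholder path; single-threaded mode is the crate default.
    let db: OptimisticTransactionDB =
        OptimisticTransactionDB::open(&opts, "/tmp/datalake-demo")?;

    // No locks are taken here: writes are buffered in the transaction.
    let txn = db.transaction();
    txn.put(b"chunk-id", b"size+bytes")?;

    // Conflict detection happens at commit time. If another writer touched
    // the same keys since the transaction began, commit() returns Err and
    // the caller may retry, which is why a read lock suffices around
    // persist_chunk in data_manager.rs.
    txn.commit()?;
    Ok(())
}
```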
