
Commit 3904983

Goshawk committed: Update comments

1 parent ebcbc59 commit 3904983

File tree

4 files changed: +15 -33 lines changed


README.md
Lines changed: 1 addition & 1 deletion

@@ -6,7 +6,7 @@ Data lake implementation integrated with AWS S3
 - Async-Download chunks from AWS S3
 - Persist on-disk in a lock-less manner
 - List all persisted chunks by ID from a cache
-- Find and lock a chunk - Once locked, chunk cannot be deleted
+- Find and lock a chunk - Once locked, chunk cannot be deleted until the DataChunkRef is released/dropped.
 - Scheduled deletion - Scheduled for deletion, a chunk will be removed once it is no longer in use.
 
 - Maximum allocated on-disk storage limit
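The updated README line describes RAII-style pinning: a chunk stays alive as long as a DataChunkRef to it exists. A minimal sketch of how such a guard could interact with scheduled deletion, assuming reference-counted chunks (everything except the DataChunkRef name is illustrative, not this repo's actual types):

use std::sync::Arc;

// Stand-in for the crate's real DataChunk (defined in src/lib.rs).
struct DataChunk {
    id: [u8; 32],
}

// Holding a DataChunkRef pins the chunk: each ref clones the inner Arc,
// so the strong count stays above 1 while any reader is active.
struct DataChunkRef {
    chunk: Arc<DataChunk>,
}

impl DataChunkRef {
    fn id(&self) -> &[u8; 32] {
        &self.chunk.id
    }
}

// A scheduled deleter removes a chunk only when nobody references it:
// strong_count == 1 means only the registry itself still holds the Arc.
fn safe_to_delete(chunk: &Arc<DataChunk>) -> bool {
    Arc::strong_count(chunk) == 1
}

Dropping the DataChunkRef decrements the count, which is what "released/dropped" means in the README.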

src/data_manager.rs
Lines changed: 5 additions & 4 deletions

@@ -199,7 +199,6 @@ impl<T: StorageEngine> DataManagerImpl<T> {
             .await
             .expect("valid data blob");
 
-
         let chunk: DataChunk =
             serde_binary::from_vec(vec, Endian::Big)
                 .expect("valid chunk");
@@ -216,19 +215,21 @@ impl<T: StorageEngine> DataManagerImpl<T> {
                 false
             } else {
                 true
-                }
+            }
         };
 
         if !exists {
             // Due to the usage RocksDB::OptimisticTransaction it is safe here to use read lock
-            // Any write conflict will be resolved by the RocksDB::commit itself
+            // Any write conflict will be detected by the RocksDB::commit itself.
+            // However, we don't expect high lock contention here.
             if db
                 .read()
                 .await
                 .persist_chunk(chunk, chunk_size)
                 .is_err()
             {
-                // chunk could not be persisted. Rollback the cache
+                // chunk could not be persisted.
+                // Rollback it from the cache to keep the cache consistent
                 cache.write().await.remove(&id);
             }
         }
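The hunk above implements a cache-first insert that is rolled back if persistence fails, relying on RocksDB's optimistic transactions (rather than the lock) to detect write conflicts at commit time. A minimal standalone sketch of the same pattern, assuming a tokio RwLock-guarded HashMap cache; all names here are illustrative, not the crate's real API:

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

type ChunkId = [u8; 32];

async fn insert_with_rollback(
    cache: Arc<RwLock<HashMap<ChunkId, u32>>>,
    id: ChunkId,
    chunk_size: u32,
    persist: impl Fn() -> Result<(), ()>,
) {
    // Optimistically record the chunk in the cache first.
    let exists = {
        let mut guard = cache.write().await;
        if guard.contains_key(&id) {
            true
        } else {
            guard.insert(id, chunk_size);
            false
        }
    };

    if !exists {
        // Persist; conflicting writers are detected at commit time by the
        // optimistic transaction, so a shared (read) lock suffices here.
        if persist().is_err() {
            // Roll the entry back so the cache stays consistent with disk.
            cache.write().await.remove(&id);
        }
    }
}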

src/lib.rs
Lines changed: 5 additions & 3 deletions

@@ -1,10 +1,10 @@
 pub mod data_manager;
 pub mod rocksdb_storage_engine;
 
+use serde_binary::Encode;
 use std::collections::HashMap;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
-use serde_binary::Encode;
 use thiserror::Error;
 
 pub type DatasetId = [u8; 32];
@@ -27,7 +27,10 @@ pub struct DataChunk {
 }
 
 impl Encode for DataChunk {
-    fn encode(&self, _serializer: &mut serde_binary::Serializer<'_>) -> Result<(), serde_binary::Error> {
+    fn encode(
+        &self,
+        _serializer: &mut serde_binary::Serializer<'_>,
+    ) -> Result<(), serde_binary::Error> {
         // TODO: Implement
         Ok(())
     }
@@ -47,7 +50,6 @@ pub enum Error {
     MaxSizeAllocated(u32),
 }
 
-// Implement From To rocksdb_lib::Error for Error
 impl From<rocksdb_lib::Error> for Error {
     fn from(e: rocksdb_lib::Error) -> Self {
         Self::InternalErr(e)
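The From<rocksdb_lib::Error> impl exists so storage-layer errors can be propagated with the ? operator. A minimal sketch of that pattern with stand-in types (the crate's real Error enum has more variants, and here thiserror's #[from] derives the conversion instead of the hand-written impl):

use thiserror::Error;

// Stand-in for rocksdb_lib::Error.
#[derive(Debug, Error)]
#[error("rocksdb error: {0}")]
struct RocksDbError(String);

#[derive(Debug, Error)]
enum Error {
    #[error("internal error: {0}")]
    InternalErr(#[from] RocksDbError),
}

fn read_from_db() -> Result<Vec<u8>, RocksDbError> {
    Err(RocksDbError("not found".into()))
}

// Because a From impl exists, `?` converts RocksDbError
// into Error automatically at the call site.
fn load() -> Result<Vec<u8>, Error> {
    let bytes = read_from_db()?;
    Ok(bytes)
}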

src/rocksdb_storage_engine.rs
Lines changed: 4 additions & 25 deletions

@@ -12,34 +12,12 @@ use serde_binary::binary_stream::Endian;
 
 const SIZE_KEY: [u8; 1] = [1u8; 1];
 
+/// Represents a storage engine implementation using RocksDB.
 pub struct StorageEngineImpl {
     rocksdb: OptimisticTransactionDB,
     conf: StorageConf,
 }
 
-/// Implementation of storage engine for a data lake.
-///
-/// This storage engine provides methods for managing data chunks in a data lake.
-/// It uses an underlying RocksDB database for storing and retrieving data.
-///
-/// # Examples
-///
-/// ```
-/// use datalake::StorageEngine;
-///
-/// // Create a new storage engine
-/// let conf = StorageEngine::new("/path/to/database");
-/// let storage_engine = StorageEngine::from_conf(conf);
-///
-/// // Persist a data chunk
-/// let chunk = DataChunk::new(...);
-/// storage_engine.persist_chunk(&chunk, 100).unwrap();
-///
-/// // Find a chunk ID
-/// let dataset_id = DatasetId::new(...);
-/// let block_number = 42;
-/// let chunk_id = storage_engine.find_chunk_id(dataset_id, block_number).unwrap();
-/// ```
 impl StorageEngine for StorageEngineImpl {
     fn from_conf(conf: StorageConf) -> Self {
         let rocksdb =
@@ -137,8 +115,9 @@ impl StorageEngine for StorageEngineImpl {
         // Persist KEY - VALUE
         // Chunk_ID -> SizeU32 + Chunk Bytes
         let id = chunk.id;
-        let chunk_bytes = serde_binary::encode(&chunk, Endian::Big)
-            .expect("should encode");
+        let chunk_bytes =
+            serde_binary::encode(&chunk, Endian::Big)
+                .expect("should encode");
 
         // Chunk is now in form of chunk_bytes, drop the chunk struct to free memory
         drop(chunk);
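The comment in this hunk describes the value layout: Chunk_ID maps to a u32 size followed by the encoded chunk bytes. A minimal sketch of packing and unpacking that layout with a big-endian size prefix, matching the Endian::Big used above (helper names are illustrative):

// Value layout: 4-byte big-endian size, followed by the encoded chunk.
fn encode_value(size: u32, chunk_bytes: &[u8]) -> Vec<u8> {
    let mut value = Vec::with_capacity(4 + chunk_bytes.len());
    value.extend_from_slice(&size.to_be_bytes());
    value.extend_from_slice(chunk_bytes);
    value
}

fn decode_value(value: &[u8]) -> Option<(u32, &[u8])> {
    // Split off the 4-byte size prefix, then return the chunk bytes.
    let (prefix, rest) = (value.get(..4)?, value.get(4..)?);
    let size = u32::from_be_bytes(prefix.try_into().ok()?);
    Some((size, rest))
}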
