Skip to content

Commit 18539b1

Browse files
authored
Merge pull request #3834 from dusk-network/blob_expire
node: delete expired blobs
2 parents 0ec1bb6 + 0bbd1fb commit 18539b1

File tree

13 files changed

+198
-6
lines changed

13 files changed

+198
-6
lines changed

core/src/transfer.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,15 @@ impl Transaction {
304304
}
305305
}
306306

307+
/// Returns the Blob used with the transaction, if any.
308+
#[must_use]
309+
pub fn blob_mut(&mut self) -> Option<&mut Vec<BlobData>> {
310+
match self {
311+
Self::Phoenix(tx) => tx.blob_mut(),
312+
Self::Moonlight(tx) => tx.blob_mut(),
313+
}
314+
}
315+
307316
/// Extracts and removes the blob sidecar from the transaction, if any, and
308317
/// returns it as a vector of tuples containing the blob hash and the
309318
/// corresponding blob sidecar.

node/src/chain.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
6262
genesis_timestamp: u64,
6363
dusk_key: BlsPublicKey,
6464
finality_activation: u64,
65+
blob_expire_after: u64,
6566
#[cfg(feature = "archive")]
6667
archive: Archive,
6768
}
@@ -96,6 +97,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
9697
self.event_sender.clone(),
9798
self.dusk_key,
9899
self.finality_activation,
100+
self.blob_expire_after,
99101
)
100102
.await?;
101103

@@ -249,13 +251,15 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
249251
}
250252

251253
impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
254+
#[allow(clippy::too_many_arguments)]
252255
pub fn new(
253256
keys_path: String,
254257
max_inbound_size: usize,
255258
event_sender: Sender<Event>,
256259
genesis_timestamp: u64,
257260
dusk_key: BlsPublicKey,
258261
finality_activation: u64,
262+
blob_expire_after: u64,
259263
#[cfg(feature = "archive")] archive: Archive,
260264
) -> Self {
261265
info!(
@@ -272,6 +276,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
272276
genesis_timestamp,
273277
dusk_key,
274278
finality_activation,
279+
blob_expire_after,
275280
#[cfg(feature = "archive")]
276281
archive,
277282
}

node/src/chain/acceptor.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ pub(crate) struct Acceptor<N: Network, DB: database::DB, VM: vm::VMExecution> {
103103
dusk_key: bls::PublicKey,
104104

105105
finality_activation: u64,
106+
blob_expire_after: u64,
106107
}
107108

108109
impl<DB: database::DB, VM: vm::VMExecution, N: Network> Drop
@@ -206,6 +207,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
206207
event_sender: Sender<Event>,
207208
dusk_key: bls::PublicKey,
208209
finality_activation: u64,
210+
blob_expire_after: u64,
209211
) -> anyhow::Result<Self> {
210212
let tip_height = tip.inner().header().height;
211213
let tip_state_hash = tip.inner().header().state_hash;
@@ -235,6 +237,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
235237
event_sender,
236238
dusk_key,
237239
finality_activation,
240+
blob_expire_after,
238241
};
239242

240243
// NB. After restart, state_root returned by VM is always the last
@@ -888,6 +891,9 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
888891
{
889892
let legacy = blk.header().height < self.finality_activation;
890893

894+
let new_final_heights: Vec<_> =
895+
new_finals.keys().cloned().collect();
896+
891897
let (_, new_final_state) =
892898
new_finals.pop_last().expect("new_finals to be not empty");
893899
let new_final_state_root = new_final_state.state_root;
@@ -904,6 +910,24 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
904910
};
905911

906912
vm.finalize_state(new_final_state_root, old_final_state_roots)?;
913+
914+
if self.blob_expire_after > 0 {
915+
let _ = self.db.read().await.update(|db| {
916+
for height in new_final_heights {
917+
if height < self.blob_expire_after {
918+
// Skip the check for first finalized blocks and prevent underflow error
919+
continue;
920+
}
921+
let expired_block = height - self.blob_expire_after;
922+
let _ = db.delete_blobs_by_height(expired_block).inspect_err(|e| {
923+
warn!("Error while deleting blobs for finalized block {expired_block}: {e}");
924+
});
925+
}
926+
Ok(())
927+
}).inspect_err(|e| {
928+
warn!("Error while deleting blobs while accepting block {}: {e}", blk.header().height);
929+
});
930+
}
907931
}
908932

909933
anyhow::Ok((label, finalized))

node/src/database.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,16 @@ pub trait Ledger {
7878

7979
fn store_blob_data(&self, hash: &[u8; 32], data: Vec<u8>) -> Result<()>;
8080
fn blob_data_by_hash(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>>;
81+
fn store_blobs_height(
82+
&self,
83+
block_height: u64,
84+
blob_hashes: &[[u8; 32]],
85+
) -> Result<()>;
86+
fn blobs_by_height(
87+
&self,
88+
block_height: u64,
89+
) -> Result<Option<Vec<[u8; 32]>>>;
90+
fn delete_blobs_by_height(&self, block_height: u64) -> Result<()>;
8191

8292
fn block_exists(&self, hash: &[u8]) -> Result<bool>;
8393

node/src/database/rocksdb.rs

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::sync::Arc;
1212
use std::{io, vec};
1313

1414
use anyhow::Result;
15+
use dusk_core::transfer::data::BlobSidecar;
1516
use node_data::ledger::{
1617
Block, Fault, Header, Label, SpendingId, SpentTransaction, Transaction,
1718
};
@@ -34,6 +35,7 @@ use crate::database::Mempool;
3435
const CF_LEDGER_HEADER: &str = "cf_ledger_header";
3536
const CF_LEDGER_TXS: &str = "cf_ledger_txs";
3637
const CF_LEDGER_BLOBS: &str = "cf_ledger_blobs";
38+
const CF_LEDGER_BLOBS_HEIGHT: &str = "cf_ledger_blobs_height";
3739
const CF_LEDGER_FAULTS: &str = "cf_ledger_faults";
3840
const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";
3941
const CF_CANDIDATES: &str = "cf_candidates";
@@ -128,6 +130,11 @@ impl Backend {
128130
.cf_handle(CF_LEDGER_BLOBS)
129131
.expect("CF_LEDGER_BLOBS column family must exist");
130132

133+
let ledger_blobs_height_cf = self
134+
.rocksdb
135+
.cf_handle(CF_LEDGER_BLOBS_HEIGHT)
136+
.expect("CF_LEDGER_BLOBS_HEIGHT column family must exist");
137+
131138
DBTransaction::<'_, OptimisticTransactionDB> {
132139
inner,
133140
candidates_cf,
@@ -141,6 +148,7 @@ impl Backend {
141148
fees_cf,
142149
ledger_height_cf,
143150
ledger_blobs_cf,
151+
ledger_blobs_height_cf,
144152
metadata_cf,
145153
cumulative_inner_size: RefCell::new(0),
146154
}
@@ -211,6 +219,10 @@ impl DB for Backend {
211219
CF_LEDGER_BLOBS,
212220
blocks_cf_opts.clone(),
213221
),
222+
ColumnFamilyDescriptor::new(
223+
CF_LEDGER_BLOBS_HEIGHT,
224+
blocks_cf_opts.clone(),
225+
),
214226
ColumnFamilyDescriptor::new(CF_CANDIDATES, blocks_cf_opts.clone()),
215227
ColumnFamilyDescriptor::new(
216228
CF_CANDIDATES_HEIGHT,
@@ -305,6 +317,7 @@ pub struct DBTransaction<'db, DB: DBAccess> {
305317
ledger_txs_cf: &'db ColumnFamily,
306318
ledger_height_cf: &'db ColumnFamily,
307319
ledger_blobs_cf: &'db ColumnFamily,
320+
ledger_blobs_height_cf: &'db ColumnFamily,
308321

309322
// Mempool column families
310323
mempool_cf: &'db ColumnFamily,
@@ -347,15 +360,19 @@ impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
347360
{
348361
let cf = self.ledger_txs_cf;
349362

363+
let mut stored_blobs = Vec::with_capacity(6);
364+
350365
// store all block transactions
351366
for tx in txs {
352367
let mut d = vec![];
368+
353369
if tx.inner.inner.blob().is_some() {
354370
let mut strip_tx = tx.clone();
355371
if let Some(blobs) = strip_tx.inner.inner.strip_blobs() {
356372
for (hash, sidecar) in blobs.into_iter() {
357373
let sidecar_bytes = sidecar.to_var_bytes();
358374
self.store_blob_data(&hash, sidecar_bytes)?;
375+
stored_blobs.push(hash);
359376
}
360377
}
361378
strip_tx.write(&mut d)?;
@@ -364,6 +381,11 @@ impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
364381
}
365382
self.put_cf(cf, tx.inner.id(), d)?;
366383
}
384+
385+
if !stored_blobs.is_empty() {
386+
// Store all blobs hashes in the ledger
387+
self.store_blobs_height(header.height, &stored_blobs)?;
388+
}
367389
}
368390

369391
// COLUMN FAMILY: CF_LEDGER_FAULTS
@@ -438,6 +460,7 @@ impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
438460
self.inner.delete_cf(self.ledger_faults_cf, f.id())?;
439461
}
440462

463+
self.delete_blobs_by_height(b.header().height)?;
441464
self.inner.delete_cf(self.ledger_cf, b.header().hash)?;
442465

443466
Ok(())
@@ -477,6 +500,61 @@ impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
477500
self.inner.put_cf(self.ledger_blobs_cf, hash, data)?;
478501
Ok(())
479502
}
503+
fn store_blobs_height(
504+
&self,
505+
block_height: u64,
506+
blob_hashes: &[[u8; 32]],
507+
) -> Result<()> {
508+
if blob_hashes.is_empty() {
509+
return Ok(());
510+
}
511+
let blob_hashes_bytes: Vec<_> =
512+
blob_hashes.iter().flat_map(|hash| hash.to_vec()).collect();
513+
self.inner.put_cf(
514+
self.ledger_blobs_height_cf,
515+
block_height.to_be_bytes(),
516+
blob_hashes_bytes,
517+
)?;
518+
Ok(())
519+
}
520+
521+
fn delete_blobs_by_height(&self, block_height: u64) -> Result<()> {
522+
let blobs_to_delete = self.blobs_by_height(block_height)?;
523+
if let Some(blob_hashes) = blobs_to_delete {
524+
for hash in blob_hashes {
525+
// What happen if the blobs also exists linked to another
526+
// transaction?
527+
self.inner.delete_cf(self.ledger_blobs_cf, hash)?;
528+
}
529+
self.inner.delete_cf(
530+
self.ledger_blobs_height_cf,
531+
block_height.to_be_bytes(),
532+
)?;
533+
}
534+
535+
Ok(())
536+
}
537+
538+
fn blobs_by_height(
539+
&self,
540+
block_height: u64,
541+
) -> Result<Option<Vec<[u8; 32]>>> {
542+
let blob_hashes_bytes = self
543+
.inner
544+
.get_cf(self.ledger_blobs_height_cf, block_height.to_be_bytes())?;
545+
546+
if let Some(blob_hashes_bytes) = blob_hashes_bytes {
547+
let mut blob_hashes = vec![];
548+
for chunk in blob_hashes_bytes.chunks(32) {
549+
let mut hash = [0u8; 32];
550+
hash.copy_from_slice(chunk);
551+
blob_hashes.push(hash);
552+
}
553+
Ok(Some(blob_hashes))
554+
} else {
555+
Ok(None)
556+
}
557+
}
480558

481559
fn block(&self, hash: &[u8]) -> Result<Option<Block>> {
482560
match self.inner.get_cf(self.ledger_cf, hash)? {
@@ -495,7 +573,24 @@ impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
495573
let mut txs = vec![];
496574
for buf in txs_buffers {
497575
let buf = buf?.unwrap();
498-
let tx = SpentTransaction::read(&mut &buf[..])?;
576+
let mut tx = SpentTransaction::read(&mut &buf[..])?;
577+
if let Some(blobs) = tx.inner.inner.blob_mut() {
578+
for blob in blobs {
579+
// Retrieve blob data from the ledger
580+
let sidecar = self
581+
.blob_data_by_hash(&blob.hash)?
582+
.map(|bytes| {
583+
BlobSidecar::from_buf(&mut &bytes[..])
584+
})
585+
.transpose()
586+
.map_err(|e| {
587+
anyhow::anyhow!(
588+
"Failed to parse blob sidecar: {e:?}"
589+
)
590+
})?;
591+
blob.data = sidecar;
592+
}
593+
}
499594
txs.push(tx.inner);
500595
}
501596

rusk-wallet/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub mod currency;
2828
pub mod dat;
2929
pub mod gas;
3030

31+
pub use dusk_core::stake::EPOCH;
3132
pub use error::Error;
3233
pub use gql::{BlockData, BlockTransaction, GraphQL};
3334
pub use rues::HttpClient as RuesHttpClient;
@@ -43,8 +44,6 @@ pub const MAX_FUNCTION_NAME_SIZE: usize = 64;
4344
pub const MAX_CONVERTIBLE: Dusk = Dusk::MAX;
4445
/// The smallest amount of Dusk that is possible to convert
4546
pub const MIN_CONVERTIBLE: Dusk = Dusk::new(1);
46-
/// The length of an epoch in blocks
47-
pub const EPOCH: u64 = 2160;
4847
/// Max addresses the wallet can store
4948
pub const MAX_PROFILES: usize = get_max_profiles();
5049
/// Size in bytes of the IV used to encrypt wallet data

rusk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
- Add `mempool_nonce` field to `/on/account/status` response
1414
- Add `status` to GQL block fields
1515
- Add activaction height for host queries
16+
- Add blob config section
1617

1718
### Changed
1819

rusk/default.config.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@
2323
#consensus_keys_path = '/home/user/.dusk/rusk/consensus.keys'
2424
min_gas_limit = 150000
2525

26+
[blob]
27+
# If no setting is provided, it falls back to default value according to feature flag:
28+
# - `archive`: 0 (never expire)
29+
# - !`archive`: 86400 block (40 epochs -> 10 days)
30+
31+
# expire_after = <#blocks>
32+
2633
# Note: changing the vm settings is equivalent to forking the chain.
2734
[vm]
2835
generation_timeout = '3s'

rusk/src/bin/config.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
//
55
// Copyright (c) DUSK NETWORK. All rights reserved.
66

7+
#[cfg(feature = "chain")]
8+
pub mod blob;
79
#[cfg(feature = "chain")]
810
pub mod chain;
911
#[cfg(feature = "chain")]
@@ -22,8 +24,8 @@ use std::str::FromStr;
2224

2325
#[cfg(feature = "chain")]
2426
use self::{
25-
chain::ChainConfig, databroker::DataBrokerConfig, kadcast::KadcastConfig,
26-
mempool::MempoolConfig, telemetry::TelemetryConfig,
27+
blob::BlobConfig, chain::ChainConfig, databroker::DataBrokerConfig,
28+
kadcast::KadcastConfig, mempool::MempoolConfig, telemetry::TelemetryConfig,
2729
};
2830

2931
#[cfg(feature = "chain")]
@@ -67,6 +69,10 @@ pub(crate) struct Config {
6769
#[cfg(feature = "chain")]
6870
#[serde(default = "MempoolConfig::default")]
6971
pub(crate) mempool: MempoolConfig,
72+
73+
#[cfg(feature = "chain")]
74+
#[serde(default = "BlobConfig::default")]
75+
pub(crate) blob: BlobConfig,
7076
}
7177

7278
/// Default log_level.

rusk/src/bin/config/blob.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
//
5+
// Copyright (c) DUSK NETWORK. All rights reserved.
6+
7+
use serde::{Deserialize, Serialize};
8+
9+
#[derive(Serialize, Deserialize, Clone, Default)]
10+
pub(crate) struct BlobConfig {
11+
pub expire_after: Option<u64>,
12+
}

0 commit comments

Comments
 (0)