Skip to content

Commit 11df843

Browse files
refactor(scheduler): remove scheduler options & adding graceful shutdown to producer (#840)
Co-authored-by: Arun Raj M <[email protected]>
1 parent b8bcba4 commit 11df843

File tree

10 files changed

+60
-91
lines changed

10 files changed

+60
-91
lines changed

config/config.example.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ cards = [
170170
# It defines the the streams/queues name and configuration as well as event selection variables
171171
[scheduler]
172172
stream = "SCHEDULER_STREAM"
173+
graceful_shutdown_interval = 60000 # Specifies how much time to wait while re-attempting shutdown for a service (in milliseconds)
174+
loop_interval = 5000 # Specifies how much time to wait before starting the defined behaviour of producer or consumer (in milliseconds)
173175

174176
[scheduler.consumer]
175177
consumer_group = "SCHEDULER_GROUP"

crates/router/src/bin/scheduler.rs

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -38,28 +38,6 @@ async fn start_scheduler(
3838
) -> CustomResult<(), errors::ProcessTrackerError> {
3939
use std::str::FromStr;
4040

41-
let options = scheduler::SchedulerOptions {
42-
looper_interval: scheduler::Milliseconds {
43-
milliseconds: 5_000,
44-
},
45-
db_name: "".to_string(),
46-
cache_name: "".to_string(),
47-
schema_name: "".to_string(),
48-
cache_expiry: scheduler::Milliseconds {
49-
milliseconds: 30_000_000,
50-
},
51-
runners: vec![],
52-
fetch_limit: 30,
53-
fetch_limit_product_factor: 1,
54-
query_order: "".to_string(),
55-
readiness: scheduler::options::ReadinessOptions {
56-
is_ready: true,
57-
graceful_termination_duration: scheduler::Milliseconds {
58-
milliseconds: 60_000,
59-
},
60-
},
61-
};
62-
6341
#[allow(clippy::expect_used)]
6442
let flow = std::env::var(SCHEDULER_FLOW).expect("SCHEDULER_FLOW environment variable not set");
6543
#[allow(clippy::expect_used)]
@@ -71,6 +49,5 @@ async fn start_scheduler(
7149
.scheduler
7250
.clone()
7351
.ok_or(errors::ProcessTrackerError::ConfigurationError)?;
74-
scheduler::start_process_tracker(state, Arc::new(options), flow, Arc::new(scheduler_settings))
75-
.await
52+
scheduler::start_process_tracker(state, flow, Arc::new(scheduler_settings)).await
7653
}

crates/router/src/configs/defaults.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ impl Default for super::settings::SchedulerSettings {
9090
stream: "SCHEDULER_STREAM".into(),
9191
producer: super::settings::ProducerSettings::default(),
9292
consumer: super::settings::ConsumerSettings::default(),
93+
graceful_shutdown_interval: 60000,
94+
loop_interval: 5000,
9395
}
9496
}
9597
}

crates/router/src/configs/settings.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,8 @@ pub struct SchedulerSettings {
294294
pub stream: String,
295295
pub producer: ProducerSettings,
296296
pub consumer: ConsumerSettings,
297+
pub loop_interval: u64,
298+
pub graceful_shutdown_interval: u64,
297299
}
298300

299301
#[derive(Debug, Clone, Deserialize)]

crates/router/src/scheduler.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,12 @@ use crate::{
1919

2020
pub async fn start_process_tracker(
2121
state: &AppState,
22-
options: Arc<SchedulerOptions>,
2322
scheduler_flow: SchedulerFlow,
2423
scheduler_settings: Arc<SchedulerSettings>,
2524
) -> CustomResult<(), errors::ProcessTrackerError> {
2625
match scheduler_flow {
27-
SchedulerFlow::Producer => {
28-
producer::start_producer(state, Arc::clone(&options), scheduler_settings).await?
29-
}
30-
SchedulerFlow::Consumer => {
31-
consumer::start_consumer(state, Arc::clone(&options), scheduler_settings).await?
32-
}
26+
SchedulerFlow::Producer => producer::start_producer(state, scheduler_settings).await?,
27+
SchedulerFlow::Consumer => consumer::start_consumer(state, scheduler_settings).await?,
3328
SchedulerFlow::Cleaner => {
3429
error!("This flow has not been implemented yet!");
3530
}

crates/router/src/scheduler/consumer.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,19 @@ pub fn valid_business_statuses() -> Vec<&'static str> {
3535
#[instrument(skip_all)]
3636
pub async fn start_consumer(
3737
state: &AppState,
38-
options: sync::Arc<super::SchedulerOptions>,
3938
settings: sync::Arc<settings::SchedulerSettings>,
4039
) -> CustomResult<(), errors::ProcessTrackerError> {
4140
use std::time::Duration;
4241

4342
use rand::Rng;
4443

45-
let timeout = rand::thread_rng().gen_range(0..=options.looper_interval.milliseconds);
44+
let timeout = rand::thread_rng().gen_range(0..=settings.loop_interval);
4645
tokio::time::sleep(Duration::from_millis(timeout)).await;
4746

48-
let mut interval =
49-
tokio::time::interval(Duration::from_millis(options.looper_interval.milliseconds));
47+
let mut interval = tokio::time::interval(Duration::from_millis(settings.loop_interval));
5048

51-
let mut shutdown_interval = tokio::time::interval(Duration::from_millis(
52-
options.readiness.graceful_termination_duration.milliseconds,
53-
));
49+
let mut shutdown_interval =
50+
tokio::time::interval(Duration::from_millis(settings.graceful_shutdown_interval));
5451

5552
let consumer_operation_counter = sync::Arc::new(atomic::AtomicU64::new(0));
5653
let signal = get_allowed_signals()
@@ -76,7 +73,6 @@ pub async fn start_consumer(
7673

7774
tokio::task::spawn(pt_utils::consumer_operation_handler(
7875
state.clone(),
79-
options.clone(),
8076
settings.clone(),
8177
|err| {
8278
logger::error!(%err);
@@ -111,7 +107,6 @@ pub async fn start_consumer(
111107
#[instrument(skip_all)]
112108
pub async fn consumer_operations(
113109
state: &AppState,
114-
_options: &super::SchedulerOptions,
115110
settings: &settings::SchedulerSettings,
116111
) -> CustomResult<(), errors::ProcessTrackerError> {
117112
let stream_name = settings.stream.clone();

crates/router/src/scheduler/producer.rs

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::sync::Arc;
22

3-
use error_stack::{report, ResultExt};
3+
use common_utils::signals::oneshot;
4+
use error_stack::{report, IntoReport, ResultExt};
45
use router_env::{instrument, tracing};
56
use time::Duration;
67

@@ -9,61 +10,81 @@ use crate::{
910
configs::settings::SchedulerSettings,
1011
core::errors::{self, CustomResult},
1112
db::StorageInterface,
12-
logger::{debug, error, info, warn},
13+
logger::{self, debug, error, warn},
1314
routes::AppState,
14-
scheduler::{utils::*, SchedulerFlow, SchedulerOptions},
15+
scheduler::{utils::*, SchedulerFlow},
1516
types::storage::{self, enums::ProcessTrackerStatus},
1617
};
1718

1819
#[instrument(skip_all)]
1920
pub async fn start_producer(
2021
state: &AppState,
21-
options: Arc<SchedulerOptions>,
2222
scheduler_settings: Arc<SchedulerSettings>,
2323
) -> CustomResult<(), errors::ProcessTrackerError> {
2424
use rand::Rng;
2525

26-
let timeout = rand::thread_rng().gen_range(0..=options.looper_interval.milliseconds);
26+
let timeout = rand::thread_rng().gen_range(0..=scheduler_settings.loop_interval);
2727
tokio::time::sleep(std::time::Duration::from_millis(timeout)).await;
2828

2929
let mut interval = tokio::time::interval(std::time::Duration::from_millis(
30-
options.looper_interval.milliseconds,
30+
scheduler_settings.loop_interval,
3131
));
3232

33+
let mut shutdown_interval = tokio::time::interval(std::time::Duration::from_millis(
34+
scheduler_settings.graceful_shutdown_interval,
35+
));
36+
37+
let signal = common_utils::signals::get_allowed_signals()
38+
.map_err(|error| {
39+
logger::error!("Signal Handler Error: {:?}", error);
40+
errors::ProcessTrackerError::ConfigurationError
41+
})
42+
.into_report()
43+
.attach_printable("Failed while creating a signals handler")?;
44+
let (sx, mut rx) = oneshot::channel();
45+
let handle = signal.handle();
46+
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, sx));
47+
3348
loop {
34-
interval.tick().await;
35-
36-
let is_ready = options.readiness.is_ready;
37-
if is_ready {
38-
match run_producer_flow(state, &options, &scheduler_settings).await {
39-
Ok(_) => (),
40-
Err(error) => {
41-
// Intentionally not propagating error to caller.
42-
// Any errors that occur in the producer flow must be handled here only, as
43-
// this is the topmost level function which is concerned with the producer flow.
44-
error!(%error);
49+
match rx.try_recv() {
50+
Err(oneshot::error::TryRecvError::Empty) => {
51+
interval.tick().await;
52+
53+
match run_producer_flow(state, &scheduler_settings).await {
54+
Ok(_) => (),
55+
Err(error) => {
56+
// Intentionally not propagating error to caller.
57+
// Any errors that occur in the producer flow must be handled here only, as
58+
// this is the topmost level function which is concerned with the producer flow.
59+
error!(%error);
60+
}
4561
}
4662
}
47-
} else {
48-
// Currently the producer workflow isn't parallel and a direct termination
49-
// will not cause any loss of data.
50-
// [#268]: resolving this issue will require a different logic for handling this termination.
51-
info!("Terminating producer");
52-
break;
63+
Ok(()) | Err(oneshot::error::TryRecvError::Closed) => {
64+
logger::debug!("Awaiting shutdown!");
65+
shutdown_interval.tick().await;
66+
67+
logger::info!("Terminating consumer");
68+
break;
69+
}
5370
}
5471
}
72+
handle.close();
73+
task_handle
74+
.await
75+
.into_report()
76+
.change_context(errors::ProcessTrackerError::UnexpectedFlow)?;
5577

5678
Ok(())
5779
}
5880

5981
#[instrument(skip_all)]
6082
pub async fn run_producer_flow(
6183
state: &AppState,
62-
op: &SchedulerOptions,
6384
settings: &SchedulerSettings,
6485
) -> CustomResult<(), errors::ProcessTrackerError> {
6586
lock_acquire_release::<_, _>(state, settings, move || async {
66-
let tasks = fetch_producer_tasks(&*state.store, op, settings).await?;
87+
let tasks = fetch_producer_tasks(&*state.store, settings).await?;
6788
debug!("Producer count of tasks {}", tasks.len());
6889

6990
// [#268]: Allow task based segregation of tasks
@@ -80,7 +101,6 @@ pub async fn run_producer_flow(
80101
#[instrument(skip_all)]
81102
pub async fn fetch_producer_tasks(
82103
db: &dyn StorageInterface,
83-
_options: &SchedulerOptions,
84104
conf: &SchedulerSettings,
85105
) -> CustomResult<Vec<storage::ProcessTracker>, errors::ProcessTrackerError> {
86106
let upper = conf.producer.upper_fetch_limit;

crates/router/src/scheduler/types.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
pub mod batch;
22
pub mod config;
33
pub mod flow;
4-
pub mod options;
54
pub mod process_data;
65
pub mod state;
76

87
pub use self::{
98
batch::ProcessTrackerBatch,
109
config::SchedulerConfig,
1110
flow::SchedulerFlow,
12-
options::{Milliseconds, SchedulerOptions},
1311
process_data::ProcessData,
1412
state::{DummyWorkflowState, WorkflowState},
1513
};

crates/router/src/scheduler/types/options.rs

Lines changed: 0 additions & 21 deletions
This file was deleted.

crates/router/src/scheduler/utils.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,6 @@ pub fn get_time_from_delta(delta: Option<i32>) -> Option<time::PrimitiveDateTime
243243

244244
pub async fn consumer_operation_handler<E>(
245245
state: AppState,
246-
options: sync::Arc<super::SchedulerOptions>,
247246
settings: sync::Arc<SchedulerSettings>,
248247
error_handler_fun: E,
249248
consumer_operation_counter: sync::Arc<atomic::AtomicU64>,
@@ -254,7 +253,7 @@ pub async fn consumer_operation_handler<E>(
254253
consumer_operation_counter.fetch_add(1, atomic::Ordering::Release);
255254
let start_time = std_time::Instant::now();
256255

257-
match consumer::consumer_operations(&state, &options, &settings).await {
256+
match consumer::consumer_operations(&state, &settings).await {
258257
Ok(_) => (),
259258
Err(err) => error_handler_fun(err),
260259
}

0 commit comments

Comments
 (0)