Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/sqllogictest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ enum-ordinalize = { workspace = true }
indicatif = { workspace = true }
sqllogictest = { workspace = true }
toml = { workspace = true }
serde = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }

[package.metadata.cargo-machete]
# These dependencies are added to ensure minimal dependency version
Expand Down
24 changes: 12 additions & 12 deletions crates/sqllogictest/src/engine/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,15 @@ use indicatif::ProgressBar;
use sqllogictest::runner::AsyncDB;
use toml::Table as TomlTable;

use crate::engine::Engine;
use crate::engine::EngineRunner;
use crate::error::Result;

/// sqllogictest engine backed by DataFusion.
///
/// Holds the `DataFusion` value that `new` builds from a session context,
/// the `testdata` directory, and a progress bar.
// NOTE(review): `DataFusion` is imported outside the visible hunk — presumably
// the DataFusion sqllogictest runner; confirm against the import list.
pub struct DataFusionEngine {
/// The wrapped `DataFusion` instance.
datafusion: DataFusion,
}

#[async_trait::async_trait]
impl Engine for DataFusionEngine {
async fn new(config: TomlTable) -> Result<Self> {
let session_config = SessionConfig::new().with_target_partitions(4);
let ctx = SessionContext::new_with_config(session_config);
ctx.register_catalog("default", Self::create_catalog(&config).await?);

Ok(Self {
datafusion: DataFusion::new(ctx, PathBuf::from("testdata"), ProgressBar::new(100)),
})
}

impl EngineRunner for DataFusionEngine {
async fn run_slt_file(&mut self, path: &Path) -> Result<()> {
let content = std::fs::read_to_string(path)
.with_context(|| format!("Failed to read slt file {:?}", path))
Expand All @@ -61,6 +51,16 @@ impl Engine for DataFusionEngine {
}

impl DataFusionEngine {
pub async fn new(config: TomlTable) -> Result<Self> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we move this out of the trait?

let session_config = SessionConfig::new().with_target_partitions(4);
let ctx = SessionContext::new_with_config(session_config);
ctx.register_catalog("default", Self::create_catalog(&config).await?);

Ok(Self {
datafusion: DataFusion::new(ctx, PathBuf::from("testdata"), ProgressBar::new(100)),
})
}

/// Builds the catalog provider that `new` registers under the name `default`.
///
/// The configuration table argument is currently ignored.
///
/// # Panics
///
/// Not implemented yet: the body is a `todo!()` placeholder, so the returned
/// future panics as soon as it is polled.
async fn create_catalog(_: &TomlTable) -> anyhow::Result<Arc<dyn CatalogProvider>> {
todo!()
}
Expand Down
62 changes: 60 additions & 2 deletions crates/sqllogictest/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,68 @@ use std::path::Path;

use toml::Table as TomlTable;

use crate::engine::datafusion::DataFusionEngine;
use crate::error::Result;

/// Key in an engine's config table whose value selects the engine implementation.
const KEY_TYPE: &str = "type";
/// Value of [`KEY_TYPE`] that selects the DataFusion-backed engine.
const TYPE_DATAFUSION: &str = "datafusion";

#[async_trait::async_trait]
pub trait Engine: Sized {
async fn new(config: TomlTable) -> Result<Self>;
pub trait EngineRunner: Sized {
async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
}

/// A constructed test engine, with one variant per supported implementation.
///
/// Built from a config table by `Engine::new`, which dispatches on the
/// table's `type` entry.
pub enum Engine {
/// Engine backed by [`DataFusionEngine`].
DataFusion(DataFusionEngine),
}

impl Engine {
pub async fn new(config: TomlTable) -> Result<Self> {
let engine_type = config
.get(KEY_TYPE)
.ok_or_else(|| anyhow::anyhow!("Missing required key: {KEY_TYPE}"))?
.as_str()
.ok_or_else(|| anyhow::anyhow!("Config value for {KEY_TYPE} must be a string"))?;

match engine_type {
TYPE_DATAFUSION => {
let engine = DataFusionEngine::new(config).await?;
Ok(Engine::DataFusion(engine))
}
_ => Err(anyhow::anyhow!("Unsupported engine type: {engine_type}").into()),
}
}

pub async fn run_slt_file(&mut self, path: &Path) -> Result<()> {
match self {
Engine::DataFusion(engine) => engine.run_slt_file(path).await,
}
}
}

#[cfg(test)]
mod tests {
    use toml::Table as TomlTable;

    use crate::engine::Engine;

    /// An empty config has no `type` key, so construction must fail.
    #[tokio::test]
    async fn test_engine_new_missing_type_key() {
        let config = TomlTable::new();
        let result = Engine::new(config).await;

        assert!(result.is_err());
    }

    /// A config whose `type` names an unknown engine must be rejected.
    #[tokio::test]
    async fn test_engine_invalid_type() {
        // `Engine::new` receives a single engine's config table, so `type`
        // has to sit at the top level. Nesting it under `[engines]` would make
        // this test fail on the missing key instead of on the unsupported type.
        let input = r#"
            type = "random_engine"
            url = "http://localhost:8181"
        "#;
        let tbl: TomlTable = toml::from_str(input).unwrap();
        let result = Engine::new(tbl).await;

        assert!(result.is_err());
    }
}
2 changes: 2 additions & 0 deletions crates/sqllogictest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@
mod engine;
#[allow(dead_code)]
mod error;
#[allow(dead_code)]
mod schedule;
183 changes: 183 additions & 0 deletions crates/sqllogictest/src/schedule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::fs::read_to_string;
use std::path::{Path, PathBuf};

use anyhow::{Context, anyhow};
use serde::{Deserialize, Serialize};
use toml::{Table as TomlTable, Value};
use tracing::info;

use crate::engine::Engine;

/// A test schedule: a set of named engines plus the steps to run on them.
///
/// `run` walks `steps` in order and executes each step's slt file on the
/// engine that the step names.
pub struct Schedule {
/// Engine names to engine instances
engines: HashMap<String, Engine>,
/// List of test steps to run
steps: Vec<Step>,
}

/// One step of a schedule: run a single slt file on a named engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Step {
/// Engine name; looked up in the schedule's engine map when the step runs
engine: String,
/// Slt file path
slt: String,
}

impl Schedule {
/// Creates a schedule from already-constructed engines and a list of steps.
///
/// No validation is done here: a step that names an engine missing from
/// `engines` is only reported when the schedule is run.
pub fn new(engines: HashMap<String, Engine>, steps: Vec<Step>) -> Self {
Self { engines, steps }
}

pub async fn from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
let content = read_to_string(path)?;
let toml_value = content.parse::<Value>()?;
let toml_table = toml_value
.as_table()
.ok_or_else(|| anyhow!("Schedule file must be a TOML table"))?;

let engines = Schedule::parse_engines(toml_table).await?;
let steps = Schedule::parse_steps(toml_table)?;

Ok(Self::new(engines, steps))
}

pub async fn run(mut self) -> anyhow::Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a log line naming the current schedule file?

for (idx, step) in self.steps.iter().enumerate() {
info!(
"Running step {}/{}, using engine {}, slt file path: {}",
idx + 1,
self.steps.len(),
&step.engine,
&step.slt
);

let engine = self
.engines
.get_mut(&step.engine)
.ok_or_else(|| anyhow!("Engine {} not found", step.engine))?;

engine
.run_slt_file(&PathBuf::from(step.slt.clone()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be a potential issue: the file path is relative to the testdata dir, but we could fix it later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.await?;
info!(
"Step {}/{}, engine {}, slt file path: {} finished",
idx + 1,
self.steps.len(),
&step.engine,
&step.slt
);
}
Ok(())
}

/// Constructs every engine declared in the `engines` table of `table`.
///
/// Each entry of `engines` maps an engine name to that engine's config
/// table, which is cloned and passed to `Engine::new`.
///
/// # Errors
///
/// Fails when `engines` is missing or is not a table, when an entry's config
/// is not a table, when an engine cannot be constructed, or when a name
/// occurs twice.
async fn parse_engines(table: &TomlTable) -> anyhow::Result<HashMap<String, Engine>> {
    let engines_val = table
        .get("engines")
        .context("Schedule file must have an 'engines' table")?;
    let Some(engines_tbl) = engines_val.as_table() else {
        return Err(anyhow!("'engines' must be a table"));
    };

    let mut engines = HashMap::new();

    for (name, engine_val) in engines_tbl {
        let Some(cfg_tbl) = engine_val.as_table() else {
            return Err(anyhow!("Config of engine '{name}' is not a table"));
        };

        let engine = Engine::new(cfg_tbl.clone())
            .await
            .with_context(|| format!("Failed to construct engine '{name}'"))?;

        let previous = engines.insert(name.clone(), engine);
        if previous.is_some() {
            return Err(anyhow!("Duplicate engine '{name}'"));
        }
    }

    Ok(engines)
}

/// Deserializes the `steps` entry of `table` into a list of [`Step`]s.
///
/// # Errors
///
/// Fails when `steps` is missing or when its value cannot be deserialized
/// into `Vec<Step>` (for example, a step lacking `engine` or `slt`).
fn parse_steps(table: &TomlTable) -> anyhow::Result<Vec<Step>> {
    let raw_steps = table
        .get("steps")
        .context("Schedule file must have a 'steps' array")?;

    // The value is cloned because deserializing consumes it.
    let parsed: Result<Vec<Step>, _> = raw_steps.clone().try_into();
    parsed.context("Failed to deserialize steps")
}
}

#[cfg(test)]
mod tests {
    use toml::Table as TomlTable;

    use crate::schedule::Schedule;

    /// Parses a TOML document literal, panicking on malformed input.
    fn table_from(doc: &str) -> TomlTable {
        toml::from_str(doc).unwrap()
    }

    /// Two well-formed steps are parsed in document order.
    #[test]
    fn test_parse_steps() {
        let tbl = table_from(
            r#"
            [[steps]]
            engine = "datafusion"
            slt = "test.slt"
            [[steps]]
            engine = "spark"
            slt = "test2.slt"
            "#,
        );

        let steps = Schedule::parse_steps(&tbl).unwrap();
        let actual: Vec<(&str, &str)> = steps
            .iter()
            .map(|step| (step.engine.as_str(), step.slt.as_str()))
            .collect();

        assert_eq!(actual, [("datafusion", "test.slt"), ("spark", "test2.slt")]);
    }

    /// A step entry with no fields cannot be deserialized.
    #[test]
    fn test_parse_steps_empty() {
        let tbl = table_from(
            r#"
            [[steps]]
            "#,
        );

        assert!(Schedule::parse_steps(&tbl).is_err());
    }

    /// `engines` must be a table; a plain string is rejected.
    #[tokio::test]
    async fn test_parse_engines_invalid_table() {
        let table = table_from(
            r#"
            engines = "not_a_table"
            "#,
        );

        let outcome = Schedule::parse_engines(&table).await;

        assert!(outcome.is_err());
    }
}
Loading