Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

if TYPE_CHECKING:
from openlineage.client.facet_v2 import RunFacet
pytestmark = pytest.mark.db_test


INPUTS = [Dataset(namespace="database://host:port", name="inputtable")]
OUTPUTS = [Dataset(namespace="database://host:port", name="inputtable")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from datetime import datetime
from unittest.mock import patch

import pytest
from openlineage.client.facet_v2 import source_code_job

from airflow import DAG
Expand All @@ -30,8 +29,6 @@

from tests_common.test_utils.compat import BashOperator

pytestmark = pytest.mark.db_test

with DAG(
dag_id="test_dummy_dag",
description="Test dummy DAG",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ def get_openlineage_facets_on_start(self):
assert metadata.outputs == []


@pytest.mark.db_test
@pytest.mark.skipif(
AIRFLOW_V_3_0_PLUS,
reason="Test for hook level lineage in Airflow < 3.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from datetime import datetime
from unittest.mock import patch

import pytest
from openlineage.client.facet_v2 import source_code_job

from airflow import DAG
Expand All @@ -32,8 +31,6 @@

from tests_common.test_utils.compat import BashOperator, PythonOperator

pytestmark = pytest.mark.db_test

dag = DAG(
dag_id="test_dummy_dag",
description="Test dummy DAG",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@
from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

pytestmark = pytest.mark.db_test


@pytest.mark.parametrize(
"env_vars, expected_logging",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def setup_job(self, task_name, run_id):

return job_runner.task_runner.return_code(timeout=60)

@pytest.mark.db_test
@conf_vars({("openlineage", "transport"): f'{{"type": "file", "log_file_path": "{listener_path}"}}'})
def test_not_stalled_task_emits_proper_lineage(self):
task_name = "execute_no_stall"
Expand All @@ -122,7 +121,6 @@ def test_not_stalled_task_emits_proper_lineage(self):
assert has_value_in_events(events, ["inputs", "name"], "on-start")
assert has_value_in_events(events, ["inputs", "name"], "on-complete")

@pytest.mark.db_test
@conf_vars({("openlineage", "transport"): f'{{"type": "file", "log_file_path": "{listener_path}"}}'})
def test_not_stalled_failing_task_emits_proper_lineage(self):
task_name = "execute_fail"
Expand All @@ -139,7 +137,6 @@ def test_not_stalled_failing_task_emits_proper_lineage(self):
("openlineage", "execution_timeout"): "15",
}
)
@pytest.mark.db_test
def test_short_stalled_task_emits_proper_lineage(self):
self.setup_job("execute_short_stall", "test_short_stalled_task_emits_proper_lineage")
events = get_sorted_events(tmp_dir)
Expand All @@ -152,7 +149,6 @@ def test_short_stalled_task_emits_proper_lineage(self):
("openlineage", "execution_timeout"): "3",
}
)
@pytest.mark.db_test
def test_short_stalled_task_extraction_with_low_execution_is_killed_by_ol_timeout(self):
self.setup_job(
"execute_short_stall",
Expand All @@ -163,7 +159,6 @@ def test_short_stalled_task_extraction_with_low_execution_is_killed_by_ol_timeou
assert not has_value_in_events(events, ["inputs", "name"], "on-complete")

@conf_vars({("openlineage", "transport"): f'{{"type": "file", "log_file_path": "{listener_path}"}}'})
@pytest.mark.db_test
def test_mid_stalled_task_is_killed_by_ol_timeout(self):
self.setup_job("execute_mid_stall", "test_mid_stalled_task_is_killed_by_openlineage")
events = get_sorted_events(tmp_dir)
Expand All @@ -177,7 +172,6 @@ def test_mid_stalled_task_is_killed_by_ol_timeout(self):
("core", "task_success_overtime"): "3",
}
)
@pytest.mark.db_test
def test_success_overtime_kills_tasks(self):
# This test checks whether LocalTaskJobRunner kills OL listener which take
# longer time than permitted by core.task_success_overtime setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
from tests_common.test_utils.db import clear_db_runs
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

pytestmark = pytest.mark.db_test

EXPECTED_TRY_NUMBER_1 = 1

TRY_NUMBER_BEFORE_EXECUTION = 0
Expand Down Expand Up @@ -757,6 +755,7 @@ def test_listener_on_dag_run_state_changes_configure_process_pool_size(
mock_executor.assert_called_once_with(max_workers=expected, initializer=mock.ANY)
mock_executor.return_value.submit.assert_called_once()

@pytest.mark.db_test
@pytest.mark.parametrize(
("method", "dag_run_state"),
[
Expand Down Expand Up @@ -1574,6 +1573,7 @@ def test_listener_on_dag_run_state_changes_configure_process_pool_size(
mock_executor.assert_called_once_with(max_workers=expected, initializer=mock.ANY)
mock_executor.return_value.submit.assert_called_once()

@pytest.mark.db_test
@pytest.mark.parametrize(
("method", "dag_run_state"),
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,6 @@ def test_dagrun_info_af3(mocked_dag_versions):


@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow 2 test")
@pytest.mark.db_test
def test_dagrun_info_af2():
date = datetime.datetime(2024, 6, 1, tzinfo=datetime.timezone.utc)
dag = DAG(
Expand Down
Loading