diff --git a/postgres/tests/test_cursor.py b/postgres/tests/test_cursor.py index 2482d4d5b0c68..589bb9cdcfea4 100644 --- a/postgres/tests/test_cursor.py +++ b/postgres/tests/test_cursor.py @@ -7,10 +7,10 @@ @pytest.mark.integration -@pytest.mark.flaky @pytest.mark.usefixtures('dd_environment') @pytest.mark.parametrize('ignore', [True, False]) def test_integration_connection_with_commenter_cursor(integration_check, pg_instance, ignore): + pg_instance['application_name'] = 'test_integration_connection_with_commenter_cursor_{}'.format(ignore) check = integration_check(pg_instance) with check.db() as conn: @@ -44,9 +44,10 @@ def __check_prepand_sql_comment(pg_instance, ignore): with super_conn.cursor() as cursor: cursor.execute( ( - "SELECT query FROM pg_stat_activity where query like '%generate_series%' " + "SELECT query FROM pg_stat_activity where application_name = %s and query like '%%generate_series%%' " "and query not like '%%pg_stat_activity%%'" - ) + ), + (pg_instance['application_name'],), ) result = cursor.fetchall() assert len(result) > 0 diff --git a/postgres/tests/test_progress_stats.py b/postgres/tests/test_progress_stats.py index 444de384b7fd3..11ccde6eae837 100644 --- a/postgres/tests/test_progress_stats.py +++ b/postgres/tests/test_progress_stats.py @@ -30,8 +30,8 @@ pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')] -def _check_analyze_progress(check, pg_instance, table): - thread = run_vacuum_thread(pg_instance, f'ANALYZE {table}') +def _check_analyze_progress(check, pg_instance, table, application_name): + thread = run_vacuum_thread(pg_instance, f'ANALYZE {table}', application_name) # Wait for vacuum to be reported _wait_for_value( @@ -44,14 +44,15 @@ def _check_analyze_progress(check, pg_instance, table): check.check(pg_instance) # Kill vacuum and cleanup thread - kill_vacuum(pg_instance) + kill_vacuum(pg_instance, application_name) thread.join() @requires_over_13 def test_analyze_progress_inherited(aggregator, integration_check, pg_instance): + pg_instance['application_name'] = 'test_analyze_progress_inherited' check = integration_check(pg_instance) - _check_analyze_progress(check, pg_instance, 'test_part') + _check_analyze_progress(check, pg_instance, 'test_part', pg_instance['application_name']) expected_tags = _get_expected_tags(check, pg_instance) + [ 'child_relation:test_part1', 'phase:acquiring inherited sample rows', @@ -64,8 +65,9 @@ def test_analyze_progress_inherited(aggregator, integration_check, pg_instance): @requires_over_13 def test_analyze_progress(aggregator, integration_check, pg_instance): + pg_instance['application_name'] = 'test_analyze_progress' check = integration_check(pg_instance) - _check_analyze_progress(check, pg_instance, 'test_part1') + _check_analyze_progress(check, pg_instance, 'test_part1', pg_instance['application_name']) expected_tags = _get_expected_tags(check, pg_instance) + [ 'phase:acquiring sample rows', 'table:test_part1', @@ -77,10 +79,13 @@ def test_analyze_progress(aggregator, integration_check, pg_instance): @requires_over_17 def test_vacuum_progress(aggregator, integration_check, pg_instance): + pg_instance['application_name'] = 'test_vacuum_progress' check = integration_check(pg_instance) # Start vacuum - thread = run_vacuum_thread(pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1') + thread = run_vacuum_thread( + pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1', pg_instance['application_name'] + ) # Wait for vacuum to be reported _wait_for_value( @@ -93,7 +98,7 @@ def test_vacuum_progress(aggregator, integration_check, pg_instance): check.check(pg_instance) # Kill vacuum and cleanup thread - kill_vacuum(pg_instance) + kill_vacuum(pg_instance, pg_instance['application_name']) thread.join() expected_tags = _get_expected_tags(check, pg_instance) + [ @@ -108,10 +113,13 @@ def test_vacuum_progress(aggregator, integration_check, pg_instance): @requires_over_12 @requires_under_17 def test_vacuum_progress_lt_17(aggregator, integration_check, pg_instance): + pg_instance['application_name'] = 'test_vacuum_progress_lt_17' check = integration_check(pg_instance) # Start vacuum - thread = run_vacuum_thread(pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1') + thread = run_vacuum_thread( + pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1', pg_instance['application_name'] + ) # Wait for vacuum to be reported _wait_for_value( @@ -124,7 +132,7 @@ def test_vacuum_progress_lt_17(aggregator, integration_check, pg_instance): check.check(pg_instance) # Kill vacuum and cleanup thread - kill_vacuum(pg_instance) + kill_vacuum(pg_instance, pg_instance['application_name']) thread.join() expected_tags = _get_expected_tags(check, pg_instance) + [ @@ -138,13 +146,18 @@ def test_vacuum_progress_lt_17(aggregator, integration_check, pg_instance): @requires_over_12 def test_index_progress(aggregator, integration_check, pg_instance): + pg_instance['application_name'] = 'test_index_progress' check = integration_check(pg_instance) # Keep test_part locked to prevent create index concurrently from finishing conn = lock_table(pg_instance, 'test_part1', 'ROW EXCLUSIVE') # Start vacuum in a thread - thread = run_query_thread(pg_instance, 'CREATE INDEX CONCURRENTLY test_progress_index ON test_part1 (id);') + thread = run_query_thread( + pg_instance, + 'CREATE INDEX CONCURRENTLY test_progress_index ON test_part1 (id);', + pg_instance['application_name'], + ) # Wait for blocked created index to appear _wait_for_value( @@ -156,7 +169,7 @@ def test_index_progress(aggregator, integration_check, pg_instance): check.check(pg_instance) # Kill the create index - kill_session(pg_instance, 'CREATE INDEX') + kill_session(pg_instance, 'CREATE INDEX', pg_instance['application_name']) # Cleanup connection and thread conn.close() @@ -177,15 +190,21 @@ def test_index_progress(aggregator, integration_check, pg_instance): @requires_over_12 def test_cluster_vacuum_progress(aggregator, integration_check, pg_instance): + pg_instance['application_name'] = 'test_cluster_vacuum_progress' check = integration_check(pg_instance) # Keep pg_class lock to block vacuum full during initilizing phase - conn = lock_table(pg_instance, 'pg_catalog.pg_class', 'EXCLUSIVE') + conn = lock_table(pg_instance, 'pg_catalog.pg_class', 'EXCLUSIVE', pg_instance['application_name']) # Start vacuum in a thread - thread = run_vacuum_thread(pg_instance, 'VACUUM FULL personsdup1') + thread = run_vacuum_thread(pg_instance, 'VACUUM FULL personsdup1', pg_instance['application_name']) - _wait_for_value(pg_instance, lower_threshold=0, query="select count(*) FROM pg_stat_progress_cluster;") + _wait_for_value( + pg_instance, + lower_threshold=0, + query="select count(*) FROM pg_stat_progress_cluster;", + application_name=pg_instance['application_name'], + ) check.check(pg_instance) # Cleanup connection and thread diff --git a/postgres/tests/utils.py b/postgres/tests/utils.py index dcf6ce13c41da..afc47ec52579e 100644 --- a/postgres/tests/utils.py +++ b/postgres/tests/utils.py @@ -67,16 +67,16 @@ def _get_superconn(db_instance, application_name='test', autocommit=True): ) -def lock_table(pg_instance, table, lock_mode): - lock_conn = _get_superconn(pg_instance) +def lock_table(pg_instance, table, lock_mode, application_name='test'): + lock_conn = _get_superconn(pg_instance, application_name) cur = lock_conn.cursor() cur.execute('BEGIN') cur.execute(f'lock {table} IN {lock_mode} MODE') return lock_conn -def kill_session(pg_instance, query_pattern): - with _get_superconn(pg_instance) as conn: +def kill_session(pg_instance, query_pattern, application_name='test'): + with _get_superconn(pg_instance, application_name) as conn: with conn.cursor() as cur: cur.execute( f"""SELECT pg_cancel_backend(pid) @@ -85,19 +85,19 @@ def kill_session(pg_instance, query_pattern): ) -def kill_vacuum(pg_instance): - kill_session(pg_instance, '^vacuum') +def kill_vacuum(pg_instance, application_name='test'): + kill_session(pg_instance, '^vacuum', application_name) # Wait until the query yielding a single value cross the provided threshold -def _wait_for_value(db_instance, lower_threshold, query, attempts=10): +def _wait_for_value(db_instance, lower_threshold, query, attempts=10, application_name='test'): value = 0 current_attempt = 0 # Stats table behave slightly differently than normal tables # Repeating the same query within a transaction will yield the # same value, despite the fact that the transaction is in READ COMMITED # To avoid this, we avoid transaction block created by the with statement - conn = _get_superconn(db_instance) + conn = _get_superconn(db_instance, application_name) while value <= lower_threshold and current_attempt < attempts: with conn.cursor() as cur: cur.execute(query)