Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions postgres/tests/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@


@pytest.mark.integration
@pytest.mark.flaky
@pytest.mark.usefixtures('dd_environment')
@pytest.mark.parametrize('ignore', [True, False])
def test_integration_connection_with_commenter_cursor(integration_check, pg_instance, ignore):
pg_instance['application_name'] = 'test_integration_connection_with_commenter_cursor_{}'.format(ignore)
check = integration_check(pg_instance)

with check.db() as conn:
Expand Down Expand Up @@ -44,9 +44,10 @@ def __check_prepand_sql_comment(pg_instance, ignore):
with super_conn.cursor() as cursor:
cursor.execute(
(
"SELECT query FROM pg_stat_activity where query like '%generate_series%' "
"SELECT query FROM pg_stat_activity where application_name = %s and query like '%%generate_series%%' "
"and query not like '%%pg_stat_activity%%'"
)
),
(pg_instance['application_name'],),
)
result = cursor.fetchall()
assert len(result) > 0
Expand Down
47 changes: 33 additions & 14 deletions postgres/tests/test_progress_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')]


def _check_analyze_progress(check, pg_instance, table):
thread = run_vacuum_thread(pg_instance, f'ANALYZE {table}')
def _check_analyze_progress(check, pg_instance, table, application_name):
thread = run_vacuum_thread(pg_instance, f'ANALYZE {table}', application_name)

# Wait for vacuum to be reported
_wait_for_value(
Expand All @@ -44,14 +44,15 @@ def _check_analyze_progress(check, pg_instance, table):
check.check(pg_instance)

# Kill vacuum and cleanup thread
kill_vacuum(pg_instance)
kill_vacuum(pg_instance, application_name)
thread.join()


@requires_over_13
def test_analyze_progress_inherited(aggregator, integration_check, pg_instance):
pg_instance['application_name'] = 'test_analyze_progress_inherited'
check = integration_check(pg_instance)
_check_analyze_progress(check, pg_instance, 'test_part')
_check_analyze_progress(check, pg_instance, 'test_part', pg_instance['application_name'])
expected_tags = _get_expected_tags(check, pg_instance) + [
'child_relation:test_part1',
'phase:acquiring inherited sample rows',
Expand All @@ -64,8 +65,9 @@ def test_analyze_progress_inherited(aggregator, integration_check, pg_instance):

@requires_over_13
def test_analyze_progress(aggregator, integration_check, pg_instance):
pg_instance['application_name'] = 'test_analyze_progress'
check = integration_check(pg_instance)
_check_analyze_progress(check, pg_instance, 'test_part1')
_check_analyze_progress(check, pg_instance, 'test_part1', pg_instance['application_name'])
expected_tags = _get_expected_tags(check, pg_instance) + [
'phase:acquiring sample rows',
'table:test_part1',
Expand All @@ -77,10 +79,13 @@ def test_analyze_progress(aggregator, integration_check, pg_instance):

@requires_over_17
def test_vacuum_progress(aggregator, integration_check, pg_instance):
pg_instance['application_name'] = 'test_vacuum_progress'
check = integration_check(pg_instance)

# Start vacuum
thread = run_vacuum_thread(pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1')
thread = run_vacuum_thread(
pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1', pg_instance['application_name']
)

# Wait for vacuum to be reported
_wait_for_value(
Expand All @@ -93,7 +98,7 @@ def test_vacuum_progress(aggregator, integration_check, pg_instance):
check.check(pg_instance)

# Kill vacuum and cleanup thread
kill_vacuum(pg_instance)
kill_vacuum(pg_instance, pg_instance['application_name'])
thread.join()

expected_tags = _get_expected_tags(check, pg_instance) + [
Expand All @@ -108,10 +113,13 @@ def test_vacuum_progress(aggregator, integration_check, pg_instance):
@requires_over_12
@requires_under_17
def test_vacuum_progress_lt_17(aggregator, integration_check, pg_instance):
pg_instance['application_name'] = 'test_vacuum_progress_lt_17'
check = integration_check(pg_instance)

# Start vacuum
thread = run_vacuum_thread(pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1')
thread = run_vacuum_thread(
pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1', pg_instance['application_name']
)

# Wait for vacuum to be reported
_wait_for_value(
Expand All @@ -124,7 +132,7 @@ def test_vacuum_progress_lt_17(aggregator, integration_check, pg_instance):
check.check(pg_instance)

# Kill vacuum and cleanup thread
kill_vacuum(pg_instance)
kill_vacuum(pg_instance, pg_instance['application_name'])
thread.join()

expected_tags = _get_expected_tags(check, pg_instance) + [
Expand All @@ -138,13 +146,18 @@ def test_vacuum_progress_lt_17(aggregator, integration_check, pg_instance):

@requires_over_12
def test_index_progress(aggregator, integration_check, pg_instance):
pg_instance['application_name'] = 'test_index_progress'
check = integration_check(pg_instance)

# Keep test_part locked to prevent create index concurrently from finishing
conn = lock_table(pg_instance, 'test_part1', 'ROW EXCLUSIVE')

# Start vacuum in a thread
thread = run_query_thread(pg_instance, 'CREATE INDEX CONCURRENTLY test_progress_index ON test_part1 (id);')
thread = run_query_thread(
pg_instance,
'CREATE INDEX CONCURRENTLY test_progress_index ON test_part1 (id);',
pg_instance['application_name'],
)

# Wait for blocked created index to appear
_wait_for_value(
Expand All @@ -156,7 +169,7 @@ def test_index_progress(aggregator, integration_check, pg_instance):
check.check(pg_instance)

# Kill the create index
kill_session(pg_instance, 'CREATE INDEX')
kill_session(pg_instance, 'CREATE INDEX', pg_instance['application_name'])

# Cleanup connection and thread
conn.close()
Expand All @@ -177,15 +190,21 @@ def test_index_progress(aggregator, integration_check, pg_instance):

@requires_over_12
def test_cluster_vacuum_progress(aggregator, integration_check, pg_instance):
pg_instance['application_name'] = 'test_cluster_vacuum_progress'
check = integration_check(pg_instance)

    # Keep pg_class lock to block vacuum full during initializing phase
conn = lock_table(pg_instance, 'pg_catalog.pg_class', 'EXCLUSIVE')
conn = lock_table(pg_instance, 'pg_catalog.pg_class', 'EXCLUSIVE', pg_instance['application_name'])

# Start vacuum in a thread
thread = run_vacuum_thread(pg_instance, 'VACUUM FULL personsdup1')
thread = run_vacuum_thread(pg_instance, 'VACUUM FULL personsdup1', pg_instance['application_name'])

_wait_for_value(pg_instance, lower_threshold=0, query="select count(*) FROM pg_stat_progress_cluster;")
_wait_for_value(
pg_instance,
lower_threshold=0,
query="select count(*) FROM pg_stat_progress_cluster;",
application_name=pg_instance['application_name'],
)
check.check(pg_instance)

# Cleanup connection and thread
Expand Down
16 changes: 8 additions & 8 deletions postgres/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@ def _get_superconn(db_instance, application_name='test', autocommit=True):
)


def lock_table(pg_instance, table, lock_mode):
lock_conn = _get_superconn(pg_instance)
def lock_table(pg_instance, table, lock_mode, application_name='test'):
    """Open a connection via _get_superconn and take *lock_mode* on *table*.

    The transaction is deliberately left open (BEGIN with no COMMIT) so the
    lock is held until the caller closes the returned connection.

    :param pg_instance: instance config passed through to _get_superconn
    :param table: table name to lock (interpolated into the LOCK statement)
    :param lock_mode: PostgreSQL lock mode, e.g. 'ROW EXCLUSIVE', 'EXCLUSIVE'
    :param application_name: session application_name so tests can target
        this session in pg_stat_activity without hitting concurrent tests
    :return: the open connection holding the lock; caller must close it
    """
    lock_conn = _get_superconn(pg_instance, application_name)
    cur = lock_conn.cursor()
    cur.execute('BEGIN')
    cur.execute(f'lock {table} IN {lock_mode} MODE')
    return lock_conn


def kill_session(pg_instance, query_pattern):
with _get_superconn(pg_instance) as conn:
def kill_session(pg_instance, query_pattern, application_name='test'):
with _get_superconn(pg_instance, application_name) as conn:
with conn.cursor() as cur:
cur.execute(
f"""SELECT pg_cancel_backend(pid)
Expand All @@ -85,19 +85,19 @@ def kill_session(pg_instance, query_pattern):
)


def kill_vacuum(pg_instance):
kill_session(pg_instance, '^vacuum')
def kill_vacuum(pg_instance, application_name='test'):
    # Cancel backends whose query matches '^vacuum' (case handling is
    # delegated to kill_session), scoped to the given application_name so
    # only this test's sessions are affected.
    kill_session(pg_instance, '^vacuum', application_name)


# Wait until the query yielding a single value cross the provided threshold
def _wait_for_value(db_instance, lower_threshold, query, attempts=10):
def _wait_for_value(db_instance, lower_threshold, query, attempts=10, application_name='test'):
value = 0
current_attempt = 0
# Stats table behave slightly differently than normal tables
# Repeating the same query within a transaction will yield the
    # same value, despite the fact that the transaction is in READ COMMITTED
# To avoid this, we avoid transaction block created by the with statement
conn = _get_superconn(db_instance)
conn = _get_superconn(db_instance, application_name)
while value <= lower_threshold and current_attempt < attempts:
with conn.cursor() as cur:
cur.execute(query)
Expand Down
Loading