Skip to content

Commit dedbd65

Browse files
authored
Fix flaky Postgres cursor test (#21281)
* Fix flaky cursor test * Remove debug lines * isolate db interaction by test name * run format
1 parent 8eb2e3a commit dedbd65

File tree

3 files changed

+45
-25
lines changed

3 files changed

+45
-25
lines changed

postgres/tests/test_cursor.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77

88

99
@pytest.mark.integration
10-
@pytest.mark.flaky
1110
@pytest.mark.usefixtures('dd_environment')
1211
@pytest.mark.parametrize('ignore', [True, False])
1312
def test_integration_connection_with_commenter_cursor(integration_check, pg_instance, ignore):
13+
pg_instance['application_name'] = 'test_integration_connection_with_commenter_cursor_{}'.format(ignore)
1414
check = integration_check(pg_instance)
1515

1616
with check.db() as conn:
@@ -44,9 +44,10 @@ def __check_prepand_sql_comment(pg_instance, ignore):
4444
with super_conn.cursor() as cursor:
4545
cursor.execute(
4646
(
47-
"SELECT query FROM pg_stat_activity where query like '%generate_series%' "
47+
"SELECT query FROM pg_stat_activity where application_name = %s and query like '%%generate_series%%' "
4848
"and query not like '%%pg_stat_activity%%'"
49-
)
49+
),
50+
(pg_instance['application_name'],),
5051
)
5152
result = cursor.fetchall()
5253
assert len(result) > 0

postgres/tests/test_progress_stats.py

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')]
3131

3232

33-
def _check_analyze_progress(check, pg_instance, table):
34-
thread = run_vacuum_thread(pg_instance, f'ANALYZE {table}')
33+
def _check_analyze_progress(check, pg_instance, table, application_name):
34+
thread = run_vacuum_thread(pg_instance, f'ANALYZE {table}', application_name)
3535

3636
# Wait for vacuum to be reported
3737
_wait_for_value(
@@ -44,14 +44,15 @@ def _check_analyze_progress(check, pg_instance, table):
4444
check.check(pg_instance)
4545

4646
# Kill vacuum and cleanup thread
47-
kill_vacuum(pg_instance)
47+
kill_vacuum(pg_instance, application_name)
4848
thread.join()
4949

5050

5151
@requires_over_13
5252
def test_analyze_progress_inherited(aggregator, integration_check, pg_instance):
53+
pg_instance['application_name'] = 'test_analyze_progress_inherited'
5354
check = integration_check(pg_instance)
54-
_check_analyze_progress(check, pg_instance, 'test_part')
55+
_check_analyze_progress(check, pg_instance, 'test_part', pg_instance['application_name'])
5556
expected_tags = _get_expected_tags(check, pg_instance) + [
5657
'child_relation:test_part1',
5758
'phase:acquiring inherited sample rows',
@@ -64,8 +65,9 @@ def test_analyze_progress_inherited(aggregator, integration_check, pg_instance):
6465

6566
@requires_over_13
6667
def test_analyze_progress(aggregator, integration_check, pg_instance):
68+
pg_instance['application_name'] = 'test_analyze_progress'
6769
check = integration_check(pg_instance)
68-
_check_analyze_progress(check, pg_instance, 'test_part1')
70+
_check_analyze_progress(check, pg_instance, 'test_part1', pg_instance['application_name'])
6971
expected_tags = _get_expected_tags(check, pg_instance) + [
7072
'phase:acquiring sample rows',
7173
'table:test_part1',
@@ -77,10 +79,13 @@ def test_analyze_progress(aggregator, integration_check, pg_instance):
7779

7880
@requires_over_17
7981
def test_vacuum_progress(aggregator, integration_check, pg_instance):
82+
pg_instance['application_name'] = 'test_vacuum_progress'
8083
check = integration_check(pg_instance)
8184

8285
# Start vacuum
83-
thread = run_vacuum_thread(pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1')
86+
thread = run_vacuum_thread(
87+
pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1', pg_instance['application_name']
88+
)
8489

8590
# Wait for vacuum to be reported
8691
_wait_for_value(
@@ -93,7 +98,7 @@ def test_vacuum_progress(aggregator, integration_check, pg_instance):
9398
check.check(pg_instance)
9499

95100
# Kill vacuum and cleanup thread
96-
kill_vacuum(pg_instance)
101+
kill_vacuum(pg_instance, pg_instance['application_name'])
97102
thread.join()
98103

99104
expected_tags = _get_expected_tags(check, pg_instance) + [
@@ -108,10 +113,13 @@ def test_vacuum_progress(aggregator, integration_check, pg_instance):
108113
@requires_over_12
109114
@requires_under_17
110115
def test_vacuum_progress_lt_17(aggregator, integration_check, pg_instance):
116+
pg_instance['application_name'] = 'test_vacuum_progress_lt_17'
111117
check = integration_check(pg_instance)
112118

113119
# Start vacuum
114-
thread = run_vacuum_thread(pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1')
120+
thread = run_vacuum_thread(
121+
pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING) test_part1', pg_instance['application_name']
122+
)
115123

116124
# Wait for vacuum to be reported
117125
_wait_for_value(
@@ -124,7 +132,7 @@ def test_vacuum_progress_lt_17(aggregator, integration_check, pg_instance):
124132
check.check(pg_instance)
125133

126134
# Kill vacuum and cleanup thread
127-
kill_vacuum(pg_instance)
135+
kill_vacuum(pg_instance, pg_instance['application_name'])
128136
thread.join()
129137

130138
expected_tags = _get_expected_tags(check, pg_instance) + [
@@ -138,13 +146,18 @@ def test_vacuum_progress_lt_17(aggregator, integration_check, pg_instance):
138146

139147
@requires_over_12
140148
def test_index_progress(aggregator, integration_check, pg_instance):
149+
pg_instance['application_name'] = 'test_index_progress'
141150
check = integration_check(pg_instance)
142151

143152
# Keep test_part locked to prevent create index concurrently from finishing
144153
conn = lock_table(pg_instance, 'test_part1', 'ROW EXCLUSIVE')
145154

146155
# Start vacuum in a thread
147-
thread = run_query_thread(pg_instance, 'CREATE INDEX CONCURRENTLY test_progress_index ON test_part1 (id);')
156+
thread = run_query_thread(
157+
pg_instance,
158+
'CREATE INDEX CONCURRENTLY test_progress_index ON test_part1 (id);',
159+
pg_instance['application_name'],
160+
)
148161

149162
# Wait for blocked created index to appear
150163
_wait_for_value(
@@ -156,7 +169,7 @@ def test_index_progress(aggregator, integration_check, pg_instance):
156169
check.check(pg_instance)
157170

158171
# Kill the create index
159-
kill_session(pg_instance, 'CREATE INDEX')
172+
kill_session(pg_instance, 'CREATE INDEX', pg_instance['application_name'])
160173

161174
# Cleanup connection and thread
162175
conn.close()
@@ -177,15 +190,21 @@ def test_index_progress(aggregator, integration_check, pg_instance):
177190

178191
@requires_over_12
179192
def test_cluster_vacuum_progress(aggregator, integration_check, pg_instance):
193+
pg_instance['application_name'] = 'test_cluster_vacuum_progress'
180194
check = integration_check(pg_instance)
181195

182196
# Keep pg_class lock to block vacuum full during initilizing phase
183-
conn = lock_table(pg_instance, 'pg_catalog.pg_class', 'EXCLUSIVE')
197+
conn = lock_table(pg_instance, 'pg_catalog.pg_class', 'EXCLUSIVE', pg_instance['application_name'])
184198

185199
# Start vacuum in a thread
186-
thread = run_vacuum_thread(pg_instance, 'VACUUM FULL personsdup1')
200+
thread = run_vacuum_thread(pg_instance, 'VACUUM FULL personsdup1', pg_instance['application_name'])
187201

188-
_wait_for_value(pg_instance, lower_threshold=0, query="select count(*) FROM pg_stat_progress_cluster;")
202+
_wait_for_value(
203+
pg_instance,
204+
lower_threshold=0,
205+
query="select count(*) FROM pg_stat_progress_cluster;",
206+
application_name=pg_instance['application_name'],
207+
)
189208
check.check(pg_instance)
190209

191210
# Cleanup connection and thread

postgres/tests/utils.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,16 @@ def _get_superconn(db_instance, application_name='test', autocommit=True):
6767
)
6868

6969

70-
def lock_table(pg_instance, table, lock_mode):
71-
lock_conn = _get_superconn(pg_instance)
70+
def lock_table(pg_instance, table, lock_mode, application_name='test'):
71+
lock_conn = _get_superconn(pg_instance, application_name)
7272
cur = lock_conn.cursor()
7373
cur.execute('BEGIN')
7474
cur.execute(f'lock {table} IN {lock_mode} MODE')
7575
return lock_conn
7676

7777

78-
def kill_session(pg_instance, query_pattern):
79-
with _get_superconn(pg_instance) as conn:
78+
def kill_session(pg_instance, query_pattern, application_name='test'):
79+
with _get_superconn(pg_instance, application_name) as conn:
8080
with conn.cursor() as cur:
8181
cur.execute(
8282
f"""SELECT pg_cancel_backend(pid)
@@ -85,19 +85,19 @@ def kill_session(pg_instance, query_pattern):
8585
)
8686

8787

88-
def kill_vacuum(pg_instance):
89-
kill_session(pg_instance, '^vacuum')
88+
def kill_vacuum(pg_instance, application_name='test'):
89+
kill_session(pg_instance, '^vacuum', application_name)
9090

9191

9292
# Wait until the query yielding a single value cross the provided threshold
93-
def _wait_for_value(db_instance, lower_threshold, query, attempts=10):
93+
def _wait_for_value(db_instance, lower_threshold, query, attempts=10, application_name='test'):
9494
value = 0
9595
current_attempt = 0
9696
# Stats table behave slightly differently than normal tables
9797
# Repeating the same query within a transaction will yield the
9898
# same value, despite the fact that the transaction is in READ COMMITED
9999
# To avoid this, we avoid transaction block created by the with statement
100-
conn = _get_superconn(db_instance)
100+
conn = _get_superconn(db_instance, application_name)
101101
while value <= lower_threshold and current_attempt < attempts:
102102
with conn.cursor() as cur:
103103
cur.execute(query)

0 commit comments

Comments
 (0)