Skip to content

Commit 42e7dbd

Browse files
author
DanielePalaia
committed
adding closing connections logic
1 parent f2807b3 commit 42e7dbd

File tree

3 files changed

+46
-2
lines changed

3 files changed

+46
-2
lines changed

rabbitmq_amqp_python_client/connection.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ def __init__(
3737
self._on_disconnection_handler = on_disconnection_handler
3838
self._conf_ssl_context: Optional[SslConfigurationContext] = ssl_context
3939
self._ssl_domain = None
40+
self._connections = [] # type: ignore
41+
self._index: int = -1
42+
43+
def _set_environment_connection_list(self, connections: []): # type: ignore
44+
self._connections = connections
4045

4146
def dial(self) -> None:
4247
logger.debug("Establishing a connection to the amqp server")
@@ -72,9 +77,11 @@ def management(self) -> Management:
7277
return self._management
7378

7479
# closes the connection to the AMQP 1.0 server.
80+
# This method should be called just from Environment and not from the user
7581
def _close(self) -> None:
7682
logger.debug("Closing connection")
7783
self._conn.close()
84+
self._connections.remove(self)
7885

7986
def publisher(self, destination: str) -> Publisher:
8087
if validate_address(destination) is False:

rabbitmq_amqp_python_client/environment.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
# For the moment this is just a Connection pooler to keep compatibility with other clients
2+
import logging
13
from typing import Annotated, Callable, Optional, TypeVar
24

35
from .connection import Connection
46
from .ssl_configuration import SslConfigurationContext
57

8+
logger = logging.getLogger(__name__)
9+
610
MT = TypeVar("MT")
711
CB = Annotated[Callable[[MT], None], "Message callback type"]
812

@@ -11,7 +15,7 @@ class Environment:
1115

1216
def __init__(self): # type: ignore
1317

14-
self._connections = []
18+
self._connections: list[Connection] = []
1519

1620
def connection(
1721
self,
@@ -28,10 +32,15 @@ def connection(
2832
ssl_context=ssl_context,
2933
on_disconnection_handler=on_disconnection_handler,
3034
)
31-
35+
logger.debug("Environment: Creating and returning a new connection")
3236
self._connections.append(connection)
37+
connection._set_environment_connection_list(self._connections)
3338
return connection
3439

3540
def close(self) -> None:
41+
logger.debug("Environment: Closing all pending connections")
3642
for connection in self._connections:
3743
connection._close()
44+
45+
def connections(self) -> list[Connection]:
46+
return self._connections

tests/test_connection.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,34 @@ def test_connection_ssl() -> None:
4343
environment.close()
4444

4545

46+
def test_environment_connections_management() -> None:
47+
48+
environment = Environment()
49+
connection = environment.connection("amqp://guest:guest@localhost:5672/")
50+
connection.dial()
51+
connection2 = environment.connection("amqp://guest:guest@localhost:5672/")
52+
connection2.dial()
53+
connection3 = environment.connection("amqp://guest:guest@localhost:5672/")
54+
connection3.dial()
55+
56+
assert len(environment.connections()) == 3
57+
58+
# this shouldn't happen but we test it anyway
59+
connection._close()
60+
61+
assert len(environment.connections()) == 2
62+
63+
connection2._close()
64+
65+
assert len(environment.connections()) == 1
66+
67+
connection3._close()
68+
69+
assert len(environment.connections()) == 0
70+
71+
environment.close()
72+
73+
4674
def test_connection_reconnection() -> None:
4775

4876
reconnected = False

0 commit comments

Comments
 (0)