Skip to content

Commit 22410c2

Browse files
author
DanielePalaia
committed
improving AddressHelper utility functions
1 parent 71c532d commit 22410c2

File tree

11 files changed

+106
-109
lines changed

11 files changed

+106
-109
lines changed

examples/getting_started/main.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# type: ignore
2+
3+
24
from rabbitmq_amqp_python_client import (
5+
AddressHelper,
36
BindingSpecification,
47
Connection,
58
Event,
@@ -8,15 +11,13 @@
811
MessageAck,
912
MessagingHandler,
1013
QuorumQueueSpecification,
11-
exchange_address,
12-
queue_address,
1314
)
1415

1516

1617
class MyMessageHandler(MessagingHandler):
1718

1819
def __init__(self):
19-
super().__init__(auto_accept=False, auto_settle=False)
20+
super().__init__(auto_accept=False)
2021
self._count = 0
2122

2223
def on_message(self, event: Event):
@@ -45,13 +46,16 @@ def on_message(self, event: Event):
4546

4647
if self._count == 100:
4748
print("closing receiver")
48-
event.receiver.close()
49-
event.connection.close()
49+
# if you want you can add cleanup operations here
50+
# event.receiver.close()
51+
# event.connection.close()
5052

5153
def on_connection_closed(self, event: Event):
54+
# if you want you can add cleanup operations here
5255
print("connection closed")
5356

5457
def on_link_closed(self, event: Event) -> None:
58+
# if you want you can add cleanup operations here
5559
print("link closed")
5660

5761

@@ -91,9 +95,9 @@ def main() -> None:
9195
)
9296
)
9397

94-
addr = exchange_address(exchange_name, routing_key)
98+
addr = AddressHelper.exchange_address(exchange_name, routing_key)
9599

96-
addr_queue = queue_address(queue_name)
100+
addr_queue = AddressHelper.queue_address(queue_name)
97101

98102
print("create a publisher and publish a test message")
99103
publisher = connection.publisher(addr)
@@ -102,7 +106,7 @@ def main() -> None:
102106
messages_purged = management.purge_queue(queue_name)
103107

104108
print("messages purged: " + str(messages_purged))
105-
management.close()
109+
# management.close()
106110

107111
# publish 10 messages
108112
for i in range(messages_to_publish):
@@ -121,22 +125,22 @@ def main() -> None:
121125
pass
122126

123127
print("cleanup")
124-
# once we finish consuming we close the connection so we need to create a new one
125-
connection = create_connection()
128+
consumer.close()
129+
# once we finish consuming if we close the connection we need to create a new one
130+
# connection = create_connection()
131+
# management = connection.management()
126132

127-
management = connection.management()
128133
print("unbind")
129134
management.unbind(bind_name)
130135

131136
print("delete queue")
132-
# management.delete_queue(queue_name)
137+
management.delete_queue(queue_name)
133138

134139
print("delete exchange")
135140
management.delete_exchange(exchange_name)
136141

137142
print("closing connections")
138143
management.close()
139-
# consumer.close()
140144
print("after management closing")
141145
connection.close()
142146
print("after connection closing")

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from importlib import metadata
22

3-
from .address_helper import exchange_address, queue_address
3+
from .address_helper import AddressHelper
44
from .common import ExchangeType, QueueType
55
from .connection import Connection
66
from .consumer import Consumer
@@ -41,8 +41,6 @@
4141
"BindingSpecification",
4242
"QueueType",
4343
"Publisher",
44-
"exchange_address",
45-
"queue_address",
4644
"Message",
4745
"Consumer",
4846
"MessagingHandler",
@@ -51,4 +49,5 @@
5149
"MessageAck",
5250
"symbol",
5351
"ExchangeType",
52+
"AddressHelper",
5453
]
Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .entities import BindingSpecification
22

33

4-
def is_unreserved(char: str) -> bool:
4+
def _is_unreserved(char: str) -> bool:
55
# According to RFC 3986, unreserved characters are A-Z, a-z, 0-9, '-', '.', '_', and '~'
66
return char.isalnum() or char in "-._~"
77

@@ -12,7 +12,7 @@ def encode_path_segment(input_string: str) -> str:
1212
# Iterate over each character in the input string
1313
for char in input_string:
1414
# Check if the character is an unreserved character
15-
if is_unreserved(char):
15+
if _is_unreserved(char):
1616
encoded.append(char) # Append as is
1717
else:
1818
# Encode character to %HH format
@@ -21,49 +21,54 @@ def encode_path_segment(input_string: str) -> str:
2121
return "".join(encoded)
2222

2323

24-
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
25-
if routing_key == "":
26-
path = "/exchanges/" + encode_path_segment(exchange_name)
27-
else:
28-
path = (
29-
"/exchanges/"
30-
+ encode_path_segment(exchange_name)
31-
+ "/"
32-
+ encode_path_segment(routing_key)
33-
)
34-
35-
return path
36-
24+
class AddressHelper:
3725

38-
def queue_address(queue_name: str) -> str:
39-
path = "/queues/" + encode_path_segment(queue_name)
40-
41-
return path
26+
@staticmethod
27+
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
28+
if routing_key == "":
29+
path = "/exchanges/" + encode_path_segment(exchange_name)
30+
else:
31+
path = (
32+
"/exchanges/"
33+
+ encode_path_segment(exchange_name)
34+
+ "/"
35+
+ encode_path_segment(routing_key)
36+
)
4237

38+
return path
4339

44-
def purge_queue_address(queue_name: str) -> str:
45-
path = "/queues/" + encode_path_segment(queue_name) + "/messages"
40+
@staticmethod
41+
def queue_address(queue_name: str) -> str:
42+
path = "/queues/" + encode_path_segment(queue_name)
4643

47-
return path
44+
return path
4845

46+
@staticmethod
47+
def purge_queue_address(queue_name: str) -> str:
48+
path = "/queues/" + encode_path_segment(queue_name) + "/messages"
4949

50-
def path_address() -> str:
51-
path = "/bindings"
50+
return path
5251

53-
return path
52+
@staticmethod
53+
def path_address() -> str:
54+
path = "/bindings"
5455

56+
return path
5557

56-
def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str:
57-
binding_path_wth_exchange_queue_key = (
58-
"/bindings"
59-
+ "/"
60-
+ "src="
61-
+ encode_path_segment(bind_specification.source_exchange)
62-
+ ";"
63-
+ "dstq="
64-
+ encode_path_segment(bind_specification.destination_queue)
65-
+ ";key="
66-
+ encode_path_segment(bind_specification.binding_key)
67-
+ ";args="
68-
)
69-
return binding_path_wth_exchange_queue_key
58+
@staticmethod
59+
def binding_path_with_exchange_queue(
60+
bind_specification: BindingSpecification,
61+
) -> str:
62+
binding_path_wth_exchange_queue_key = (
63+
"/bindings"
64+
+ "/"
65+
+ "src="
66+
+ encode_path_segment(bind_specification.source_exchange)
67+
+ ";"
68+
+ "dstq="
69+
+ encode_path_segment(bind_specification.destination_queue)
70+
+ ";key="
71+
+ encode_path_segment(bind_specification.binding_key)
72+
+ ";args="
73+
)
74+
return binding_path_wth_exchange_queue_key

rabbitmq_amqp_python_client/connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ def management(self) -> Management:
3232
# closes the connection to the AMQP 1.0 server.
3333
def close(self) -> None:
3434
logger.debug("Closing connection")
35+
print("closing connection")
3536
self._conn.close()
37+
print("after closing connection")
3638

3739
def publisher(self, destination: str) -> Publisher:
3840
publisher = Publisher(self._conn, destination)

rabbitmq_amqp_python_client/management.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,7 @@
22
import uuid
33
from typing import Any, Optional, Union
44

5-
from .address_helper import (
6-
binding_path_with_exchange_queue,
7-
exchange_address,
8-
path_address,
9-
purge_queue_address,
10-
queue_address,
11-
)
5+
from .address_helper import AddressHelper
126
from .common import CommonValues, QueueType
137
from .entities import (
148
BindingSpecification,
@@ -113,7 +107,7 @@ def declare_exchange(
113107
body["internal"] = exchange_specification.is_internal
114108
body["arguments"] = exchange_specification.arguments # type: ignore
115109

116-
path = exchange_address(exchange_specification.name)
110+
path = AddressHelper.exchange_address(exchange_specification.name)
117111

118112
self.request(
119113
body,
@@ -146,7 +140,7 @@ def declare_queue(
146140
elif isinstance(queue_specification, StreamSpecification):
147141
body = self._declare_stream(queue_specification)
148142

149-
path = queue_address(queue_specification.name)
143+
path = AddressHelper.queue_address(queue_specification.name)
150144

151145
self.request(
152146
body,
@@ -255,7 +249,7 @@ def _declare_stream(
255249

256250
def delete_exchange(self, exchange_name: str) -> None:
257251
logger.debug("delete_exchange operation called")
258-
path = exchange_address(exchange_name)
252+
path = AddressHelper.exchange_address(exchange_name)
259253

260254
self.request(
261255
None,
@@ -268,7 +262,7 @@ def delete_exchange(self, exchange_name: str) -> None:
268262

269263
def delete_queue(self, queue_name: str) -> None:
270264
logger.debug("delete_queue operation called")
271-
path = queue_address(queue_name)
265+
path = AddressHelper.queue_address(queue_name)
272266

273267
self.request(
274268
None,
@@ -302,7 +296,7 @@ def bind(self, bind_specification: BindingSpecification) -> str:
302296
body["destination_queue"] = bind_specification.destination_queue
303297
body["arguments"] = {} # type: ignore
304298

305-
path = path_address()
299+
path = AddressHelper.path_address()
306300

307301
self.request(
308302
body,
@@ -313,7 +307,9 @@ def bind(self, bind_specification: BindingSpecification) -> str:
313307
],
314308
)
315309

316-
binding_path_with_queue = binding_path_with_exchange_queue(bind_specification)
310+
binding_path_with_queue = AddressHelper.binding_path_with_exchange_queue(
311+
bind_specification
312+
)
317313
return binding_path_with_queue
318314

319315
def unbind(self, binding_exchange_queue_path: str) -> None:
@@ -329,7 +325,7 @@ def unbind(self, binding_exchange_queue_path: str) -> None:
329325

330326
def purge_queue(self, queue_name: str) -> int:
331327
logger.debug("purge_queue operation called")
332-
path = purge_queue_address(queue_name)
328+
path = AddressHelper.purge_queue_address(queue_name)
333329

334330
response = self.request(
335331
None,
@@ -344,7 +340,7 @@ def purge_queue(self, queue_name: str) -> int:
344340

345341
def queue_info(self, queue_name: str) -> QueueInfo:
346342
logger.debug("queue_info operation called")
347-
path = queue_address(queue_name)
343+
path = AddressHelper.queue_address(queue_name)
348344

349345
message = self.request(
350346
None,

rabbitmq_amqp_python_client/qpid/proton/_handlers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ def __init__(
712712
self,
713713
prefetch: int = 10,
714714
auto_accept: bool = False,
715-
auto_settle: bool = False,
715+
auto_settle: bool = True,
716716
peer_close_is_error: bool = False,
717717
) -> None:
718718
self.handlers = []

rabbitmq_amqp_python_client/qpid/proton/_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ def close(self) -> None:
553553
finally:
554554
self.conn.free()
555555
# Nothing left to block on. Allow reactor to clean up.
556-
self.run()
556+
# self.run() # why is this necessary here?
557557
if self.conn:
558558
self.conn.handler = None # break cyclical reference
559559
self.conn = None

0 commit comments

Comments
 (0)