Skip to content

Commit 088eefd

Browse files
author
DanielePalaia
committed
adding validations for annotations
1 parent 0db39d3 commit 088eefd

File tree

9 files changed

+111
-28
lines changed

9 files changed

+111
-28
lines changed

examples/getting_started/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33

44
from rabbitmq_amqp_python_client import (
55
AddressHelper,
6+
AMQPMessagingHandler,
67
BindingSpecification,
78
Connection,
8-
AMQPMessagingHandler,
99
Event,
1010
ExchangeSpecification,
1111
Message,

rabbitmq_amqp_python_client/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
from importlib import metadata
22

33
from .address_helper import AddressHelper
4+
from .amqp_consumer_handler import AMQPMessagingHandler
45
from .common import ExchangeType, QueueType
56
from .connection import Connection
67
from .consumer import Consumer
7-
from .amqp_consumer_handler import (
8-
AMQPMessagingHandler,
9-
)
108
from .entities import (
119
BindingSpecification,
1210
ExchangeSpecification,
1311
)
12+
from .exceptions import ArgumentOutOfRangeException
1413
from .management import Management
1514
from .publisher import Publisher
1615
from .qpid.proton._data import symbol # noqa: E402
@@ -52,4 +51,5 @@
5251
"ExchangeType",
5352
"AddressHelper",
5453
"AMQPMessagingHandler",
54+
"ArgumentOutOfRangeException",
5555
]

rabbitmq_amqp_python_client/amqp_consumer_handler.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
from .delivery_context import DeliveryContext
22
from .qpid.proton.handlers import MessagingHandler
33

4-
5-
'''
6-
AMQPMessagingHandler extends the QPID MessagingHandler.
4+
"""
5+
AMQPMessagingHandler extends the QPID MessagingHandler.
76
It is an helper to set the default values needed for manually accepting and settling messages.
8-
9-
self.delivery_context is an instance of DeliveryContext, which is used to accept, reject,
7+
self.delivery_context is an instance of DeliveryContext, which is used to accept, reject,
108
requeue or requeue with annotations a message.
9+
It is not mandatory to use this class, but it is a good practice to use it.
10+
"""
11+
1112

12-
It is not mandatory to use this class, but it is a good practice to use it.
13-
'''
1413
class AMQPMessagingHandler(MessagingHandler): # type: ignore
1514

1615
def __init__(self, auto_accept: bool = False, auto_settle: bool = True):

rabbitmq_amqp_python_client/connection.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,7 @@ def management(self) -> Management:
3232
# closes the connection to the AMQP 1.0 server.
3333
def close(self) -> None:
3434
logger.debug("Closing connection")
35-
print("closing connection")
3635
self._conn.close()
37-
print("after closing connection")
3836

3937
def publisher(self, destination: str) -> Publisher:
4038
publisher = Publisher(self._conn, destination)

rabbitmq_amqp_python_client/delivery_context.py

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
from typing import Dict
22

3+
from .exceptions import ArgumentOutOfRangeException
34
from .qpid.proton._data import PythonAMQPData
45
from .qpid.proton._delivery import Delivery
56
from .qpid.proton._events import Event
7+
from .utils import validate_annotations
68

7-
'''
9+
"""
810
DeliveryContext is a class that is used to accept, reject, requeue or requeue with annotations a message.
911
It is an helper to set the default values needed for manually accepting and settling messages.
10-
'''
12+
"""
1113

1214

1315
class DeliveryContext:
@@ -39,32 +41,37 @@ def discard(self, event: Event) -> None:
3941
or dead-letter it if it is configured.
4042
Application-specific annotation keys must start with the <code>x-opt-</code> prefix.
4143
Annotation keys the broker understands starts with <code>x-</code>, but not with <code>x-opt-
42-
43-
This maps to the AMQP 1.0
44+
This maps to the AMQP 1.0
4445
modified{delivery-failed = true, undeliverable-here = true}</code> outcome.
4546
<param name="annotations"> annotations message annotations to combine with existing ones </param>
4647
<a
4748
href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-modified">AMQP
48-
1.0 <code>modified</code> outcome</a>
49-
49+
1.0 <code>modified</code> outcome</a>
5050
The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome
5151
"""
5252

5353
def discard_with_annotations(
54-
self, event: Event, annotations: Dict[str, "PythonAMQPData"]
54+
self, event: Event, annotations: Dict[str, "PythonAMQPData"]
5555
) -> None:
5656
dlv = event.delivery
5757
dlv.local.annotations = annotations
5858
dlv.local.failed = True
5959
dlv.local.undeliverable = True
6060

61+
validated = validate_annotations(annotations.keys())
62+
63+
if validated is False:
64+
raise ArgumentOutOfRangeException(
65+
"Message annotation key must start with 'x-'"
66+
)
67+
6168
dlv.update(Delivery.MODIFIED)
6269
dlv.settle()
6370

6471
"""
65-
Requeue the message (AMQP 1.0 <code>released</code> outcome).
72+
Requeue the message (AMQP 1.0 <code>released</code> outcome).
6673
This means the message has not been processed and the broker can requeue it and deliver it
67-
to the same or a different consumer.
74+
to the same or a different consumer.
6875
"""
6976

7077
def requeue(self, event: Event) -> None:
@@ -74,31 +81,34 @@ def requeue(self, event: Event) -> None:
7481

7582
"""
7683
Requeue the message with annotations to combine with the existing message annotations.
77-
7884
This means the message has not been processed and the broker can requeue it and deliver it
7985
to the same or a different consumer.
8086
Application-specific annotation keys must start with the <code>x-opt-</code> prefix.
8187
Annotation keys the broker understands starts with <code>x-</code>, but not with <code>x-opt-
8288
</code>.
83-
8489
This maps to the AMQP 1.0 <code>
8590
modified{delivery-failed = false, undeliverable-here = false}</code> outcome.
86-
8791
<param name="annotations"> annotations message annotations to combine with existing ones </param>
8892
<a
8993
href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-modified">AMQP
9094
1.0 <code>modified</code> outcome</a>
91-
9295
The annotations can be used only with Quorum queues, see https://www.rabbitmq.com/docs/amqp#modified-outcome
9396
"""
9497

9598
def requeue_with_annotations(
96-
self, event: Event, annotations: Dict[str, "PythonAMQPData"]
99+
self, event: Event, annotations: Dict[str, "PythonAMQPData"]
97100
) -> None:
98101
dlv = event.delivery
99102
dlv.local.annotations = annotations
100103
dlv.local.failed = False
101104
dlv.local.undeliverable = False
102105

106+
validated = validate_annotations(annotations.keys())
107+
108+
if validated is False:
109+
raise ArgumentOutOfRangeException(
110+
"Message annotation key must start with 'x-'"
111+
)
112+
103113
dlv.update(Delivery.MODIFIED)
104114
dlv.settle()

rabbitmq_amqp_python_client/exceptions.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,12 @@ def __init__(self, msg: str):
55

66
def __str__(self) -> str:
77
return repr(self.msg)
8+
9+
10+
class ArgumentOutOfRangeException(BaseException):
11+
# Constructor or Initializer
12+
def __init__(self, msg: str):
13+
self.msg = msg
14+
15+
def __str__(self) -> str:
16+
return repr(self.msg)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
def validate_annotations(annotations: []) -> bool: # type: ignore
2+
validated = True
3+
for annotation in annotations:
4+
if len(annotation) > 0 and annotation[:2] == "x-":
5+
pass
6+
else:
7+
validated = False
8+
return validated
9+
return validated

tests/conftest.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
from rabbitmq_amqp_python_client import (
44
AddressHelper,
5-
Connection,
65
AMQPMessagingHandler,
6+
Connection,
77
Event,
88
symbol,
99
)
@@ -144,3 +144,19 @@ def on_message(self, event: Event):
144144
if self._received == 1000:
145145
event.connection.close()
146146
raise ConsumerTestException("consumed")
147+
148+
149+
class MyMessageHandlerRequeueWithInvalidAnnotations(AMQPMessagingHandler):
150+
151+
def __init__(self):
152+
super().__init__()
153+
self._received = 0
154+
155+
def on_message(self, event: Event):
156+
annotations = {}
157+
annotations[symbol("invalid")] = "x-test1"
158+
self.delivery_context.requeue_with_annotations(event, annotations)
159+
self._received = self._received + 1
160+
if self._received == 1000:
161+
event.connection.close()
162+
raise ConsumerTestException("consumed")

tests/test_consumer.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from rabbitmq_amqp_python_client import (
22
AddressHelper,
3+
ArgumentOutOfRangeException,
34
Connection,
45
QuorumQueueSpecification,
56
)
@@ -12,6 +13,7 @@
1213
MyMessageHandlerNoack,
1314
MyMessageHandlerRequeue,
1415
MyMessageHandlerRequeueWithAnnotations,
16+
MyMessageHandlerRequeueWithInvalidAnnotations,
1517
)
1618
from .utils import (
1719
cleanup_dead_lettering,
@@ -320,3 +322,43 @@ def test_consumer_async_queue_with_requeue_with_annotations(
320322
assert "x-opt-string" in message.annotations
321323

322324
assert message_count > 0
325+
326+
327+
def test_consumer_async_queue_with_requeue_with_invalid_annotations(
328+
connection: Connection,
329+
) -> None:
330+
messages_to_send = 1000
331+
test_failure = True
332+
333+
queue_name = "test-queue-async-requeue"
334+
335+
management = connection.management()
336+
337+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
338+
339+
addr_queue = AddressHelper.queue_address(queue_name)
340+
341+
publish_messages(connection, messages_to_send, queue_name)
342+
343+
# we closed the connection so we need to open a new one
344+
connection_consumer = create_connection()
345+
346+
try:
347+
consumer = connection_consumer.consumer(
348+
addr_queue, handler=MyMessageHandlerRequeueWithInvalidAnnotations()
349+
)
350+
351+
consumer.run()
352+
# ack to terminate the consumer
353+
except ConsumerTestException:
354+
pass
355+
356+
except ArgumentOutOfRangeException:
357+
test_failure = False
358+
359+
consumer.close()
360+
361+
management.delete_queue(queue_name)
362+
management.close()
363+
364+
assert test_failure is False

0 commit comments

Comments
 (0)