Skip to content

Commit d3d93ec

Browse files
DanielePalaiaDanielePalaia
andauthored
adding delivery status on publishing (#23)
* adding delivery status on publishing * when publishing messages we need to setup unseattle option --------- Co-authored-by: DanielePalaia <daniele985@@gmail.com>
1 parent 7d2910e commit d3d93ec

File tree

5 files changed

+43
-9
lines changed

5 files changed

+43
-9
lines changed

examples/getting_started/main.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def main() -> None:
7070
exchange_name = "test-exchange"
7171
queue_name = "example-queue"
7272
routing_key = "routing-key"
73-
messages_to_publish = 100
73+
messages_to_publish = 100000
7474

7575
print("connection to amqp server")
7676
connection = create_connection()
@@ -109,7 +109,13 @@ def main() -> None:
109109

110110
# publish 10 messages
111111
for i in range(messages_to_publish):
112-
publisher.publish(Message(body="test"))
112+
status = publisher.publish(Message(body="test"))
113+
if status.ACCEPTED:
114+
print("message accepted")
115+
elif status.RELEASED:
116+
print("message not routed")
117+
elif status.REJECTED:
118+
print("message not rejected")
113119

114120
publisher.close()
115121

rabbitmq_amqp_python_client/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,5 @@
5252
"AddressHelper",
5353
"AMQPMessagingHandler",
5454
"ArgumentOutOfRangeException",
55+
"Delivery",
5556
]

rabbitmq_amqp_python_client/options.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,21 @@ def test(self, link: Link) -> bool:
2121
return bool(link.is_sender)
2222

2323

24+
class SenderOptionUnseattle(LinkOption): # type: ignore
25+
def __init__(self, addr: str):
26+
self._addr = addr
27+
28+
def apply(self, link: Link) -> None:
29+
link.source.address = self._addr
30+
link.snd_settle_mode = Link.SND_UNSETTLED
31+
link.rcv_settle_mode = Link.RCV_FIRST
32+
link.properties = PropertyDict({symbol("paired"): True})
33+
link.source.dynamic = False
34+
35+
def test(self, link: Link) -> bool:
36+
return bool(link.is_sender)
37+
38+
2439
class ReceiverOption(LinkOption): # type: ignore
2540
def __init__(self, addr: str):
2641
self._addr = addr

rabbitmq_amqp_python_client/publisher.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import logging
22
from typing import Optional
33

4-
from .options import SenderOption
4+
from .options import SenderOptionUnseattle
5+
from .qpid.proton._delivery import Delivery
56
from .qpid.proton._message import Message
67
from .qpid.proton.utils import (
78
BlockingConnection,
@@ -23,14 +24,14 @@ def _open(self) -> None:
2324
logger.debug("Creating Sender")
2425
self._sender = self._create_sender(self._addr)
2526

26-
def publish(self, message: Message) -> None:
27+
def publish(self, message: Message) -> Delivery:
2728
if self._sender is not None:
28-
self._sender.send(message)
29+
return self._sender.send(message)
2930

3031
def close(self) -> None:
3132
logger.debug("Closing Sender")
3233
if self._sender is not None:
3334
self._sender.close()
3435

3536
def _create_sender(self, addr: str) -> BlockingSender:
36-
return self._conn.create_sender(addr, options=SenderOption(addr))
37+
return self._conn.create_sender(addr, options=SenderOptionUnseattle(addr))

tests/test_publisher.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,24 @@ def test_publish_queue(connection: Connection) -> None:
1818

1919
raised = False
2020

21+
publisher = None
22+
accepted = False
23+
2124
try:
2225
publisher = connection.publisher("/queues/" + queue_name)
23-
publisher.publish(Message(body="test"))
26+
status = publisher.publish(Message(body="test"))
27+
if status.ACCEPTED:
28+
accepted = True
2429
except Exception:
2530
raised = True
2631

27-
publisher.close()
32+
if publisher is not None:
33+
publisher.close()
2834

2935
management.delete_queue(queue_name)
3036
management.close()
3137

38+
assert accepted is True
3239
assert raised is False
3340

3441

@@ -75,10 +82,13 @@ def test_publish_exchange(connection: Connection) -> None:
7582
addr = AddressHelper.exchange_address(exchange_name, routing_key)
7683

7784
raised = False
85+
accepted = False
7886

7987
try:
8088
publisher = connection.publisher(addr)
81-
publisher.publish(Message(body="test"))
89+
status = publisher.publish(Message(body="test"))
90+
if status.ACCEPTED:
91+
accepted = True
8292
except Exception:
8393
raised = True
8494

@@ -88,6 +98,7 @@ def test_publish_exchange(connection: Connection) -> None:
8898
management.delete_queue(queue_name)
8999
management.close()
90100

101+
assert accepted is True
91102
assert raised is False
92103

93104

0 commit comments

Comments
 (0)