diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index 0bb4398..46603bb 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -70,7 +70,7 @@ def main() -> None: exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" - messages_to_publish = 100 + messages_to_publish = 100000 print("connection to amqp server") connection = create_connection() @@ -109,7 +109,13 @@ def main() -> None: # publish 10 messages for i in range(messages_to_publish): - publisher.publish(Message(body="test")) + status = publisher.publish(Message(body="test")) + if status.ACCEPTED: + print("message accepted") + elif status.RELEASED: + print("message not routed") + elif status.REJECTED: + print("message not rejected") publisher.close() diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 4d4c179..1f28688 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -52,4 +52,5 @@ "AddressHelper", "AMQPMessagingHandler", "ArgumentOutOfRangeException", + "Delivery", ] diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index 4a2a7df..a2cd9a2 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -21,6 +21,21 @@ def test(self, link: Link) -> bool: return bool(link.is_sender) +class SenderOptionUnseattle(LinkOption): # type: ignore + def __init__(self, addr: str): + self._addr = addr + + def apply(self, link: Link) -> None: + link.source.address = self._addr + link.snd_settle_mode = Link.SND_UNSETTLED + link.rcv_settle_mode = Link.RCV_FIRST + link.properties = PropertyDict({symbol("paired"): True}) + link.source.dynamic = False + + def test(self, link: Link) -> bool: + return bool(link.is_sender) + + class ReceiverOption(LinkOption): # type: ignore def __init__(self, addr: str): self._addr = addr diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py index 25b6a69..81e3cad 100644 --- a/rabbitmq_amqp_python_client/publisher.py +++ b/rabbitmq_amqp_python_client/publisher.py @@ -1,7 +1,8 @@ import logging from typing import Optional -from .options import SenderOption +from .options import SenderOptionUnseattle +from .qpid.proton._delivery import Delivery from .qpid.proton._message import Message from .qpid.proton.utils import ( BlockingConnection, @@ -23,9 +24,9 @@ def _open(self) -> None: logger.debug("Creating Sender") self._sender = self._create_sender(self._addr) - def publish(self, message: Message) -> None: + def publish(self, message: Message) -> Delivery: if self._sender is not None: - self._sender.send(message) + return self._sender.send(message) def close(self) -> None: logger.debug("Closing Sender") @@ -33,4 +34,4 @@ def close(self) -> None: self._sender.close() def _create_sender(self, addr: str) -> BlockingSender: - return self._conn.create_sender(addr, options=SenderOption(addr)) + return self._conn.create_sender(addr, options=SenderOptionUnseattle(addr)) diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 57d7196..01bb0e5 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -18,17 +18,24 @@ def test_publish_queue(connection: Connection) -> None: raised = False + publisher = None + accepted = False + try: publisher = connection.publisher("/queues/" + queue_name) - publisher.publish(Message(body="test")) + status = publisher.publish(Message(body="test")) + if status.ACCEPTED: + accepted = True except Exception: raised = True - publisher.close() + if publisher is not None: + publisher.close() management.delete_queue(queue_name) management.close() + assert accepted is True assert raised is False @@ -75,10 +82,13 @@ def test_publish_exchange(connection: Connection) -> None: addr = AddressHelper.exchange_address(exchange_name, routing_key) raised = False + accepted = False try: publisher = connection.publisher(addr) - publisher.publish(Message(body="test")) + status = publisher.publish(Message(body="test")) + if status.ACCEPTED: + accepted = True except Exception: raised = True @@ -88,6 +98,7 @@ def test_publish_exchange(connection: Connection) -> None: management.delete_queue(queue_name) management.close() + assert accepted is True assert raised is False