Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/reconnection/reconnection_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def create_connection() -> Connection:
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)

connection = environment.connection(
url="amqp://guest:guest@localhost:5672/",
uri="amqp://guest:guest@localhost:5672/",
on_disconnection_handler=on_disconnection,
)
connection.dial()
Expand Down
6 changes: 5 additions & 1 deletion rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
StreamOptions,
)
from .environment import Environment
from .exceptions import ArgumentOutOfRangeException
from .exceptions import (
ArgumentOutOfRangeException,
ValidationCodeException,
)
from .management import Management
from .publisher import Publisher
from .qpid.proton._data import symbol # noqa: E402
Expand Down Expand Up @@ -62,6 +65,7 @@
"AddressHelper",
"AMQPMessagingHandler",
"ArgumentOutOfRangeException",
"ValidationCodeException",
"SslConfigurationContext",
"ClientCert",
"ConnectionClosed",
Expand Down
6 changes: 6 additions & 0 deletions rabbitmq_amqp_python_client/address_helper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .entities import BindingSpecification
from .qpid.proton._message import Message


def _is_unreserved(char: str) -> bool:
Expand Down Expand Up @@ -73,6 +74,11 @@ def binding_path_with_exchange_queue(
)
return binding_path_wth_exchange_queue_key

@staticmethod
def message_to_address_helper(message: Message, address: str) -> Message:
message.address = address
return message


def validate_address(address: str) -> bool:
if address.startswith("/queues") or address.startswith("/exchanges"):
Expand Down
11 changes: 6 additions & 5 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,12 @@ def close(self) -> None:
self._conn.close()
self._connections.remove(self)

def publisher(self, destination: str) -> Publisher:
if validate_address(destination) is False:
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
def publisher(self, destination: str = "") -> Publisher:
if destination != "":
if validate_address(destination) is False:
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
publisher = Publisher(self._conn, destination)
return publisher

Expand Down
5 changes: 3 additions & 2 deletions rabbitmq_amqp_python_client/consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Literal, Optional, Union
from typing import Literal, Optional, Union, cast

from .entities import StreamOptions
from .options import (
Expand Down Expand Up @@ -40,7 +40,8 @@ def _open(self) -> None:

def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message:
if self._receiver is not None:
return self._receiver.receive(timeout=timeout)
message = self._receiver.receive(timeout=timeout)
return cast(Message, message)

def close(self) -> None:
logger.debug("Closing the receiver")
Expand Down
26 changes: 23 additions & 3 deletions rabbitmq_amqp_python_client/publisher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import logging
from typing import Optional

from .address_helper import validate_address
from .exceptions import (
ArgumentOutOfRangeException,
ValidationCodeException,
)
from .options import SenderOptionUnseattle
from .qpid.proton._delivery import Delivery
from .qpid.proton._message import Message
Expand All @@ -13,7 +18,7 @@


class Publisher:
def __init__(self, conn: BlockingConnection, addr: str):
def __init__(self, conn: BlockingConnection, addr: str = ""):
self._sender: Optional[BlockingSender] = None
self._conn = conn
self._addr = addr
Expand All @@ -25,8 +30,23 @@ def _open(self) -> None:
self._sender = self._create_sender(self._addr)

def publish(self, message: Message) -> Delivery:
if self._sender is not None:
return self._sender.send(message)
if (self._addr != "") and (message.address is not None):
raise ValidationCodeException(
"address specified in both message and publisher"
)

if self._addr != "":
if self._sender is not None:
return self._sender.send(message)
else:
if message.address != "":
if validate_address(message.address) is False:
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
if self._sender is not None:
delivery = self._sender.send(message)
return delivery

def close(self) -> None:
logger.debug("Closing Sender")
Expand Down
1 change: 0 additions & 1 deletion rabbitmq_amqp_python_client/qpid/proton/_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def _check(self, err: int) -> int:
def _check_property_keys(self) -> None:
"""
AMQP allows only string keys for properties. This function checks that this requirement is met
and raises a MessageException if not. However, in certain cases, conversions to string are
automatically performed:

1. When a key is a user-defined (non-AMQP) subclass of str.
Expand Down
Loading