Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,19 @@ The client is distributed via [`PIP`](https://pypi.org/project/rabbitmq-amqp-pyt

## Getting Started

An example is provided [`here`](./examples/getting_started/basic_example.py) you can run it after starting a RabbitMQ 4.0 broker with:
An example is provided [`here`](./examples/getting_started/getting_started.py) you can run it after starting a RabbitMQ 4.0 broker with:

poetry run python ./examples/getting_started/main.py
poetry run python ./examples/getting_started/getting_started.py

### Creating a connection

A connection to the RabbitMQ AMQP 1.0 server can be established using the Connection object.
A connection to the RabbitMQ AMQP 1.0 server can be established using the Environment object.

For example:

```python
connection = Connection("amqp://guest:guest@localhost:5672/")
environment = Environment()
connection = environment.connection("amqp://guest:guest@localhost:5672/")
connection.dial()
```

Expand Down Expand Up @@ -131,21 +132,21 @@ You can consume from a given offset or specify a default starting point (FIRST,

Streams filtering is also supported: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering

You can check the [`stream example`](./examples/getting_started/example_with_streams.py) to see how to work with RabbitMQ streams.
You can check the [`stream example`](./examples/streams/example_with_streams.py) to see how to work with RabbitMQ streams.

### SSL connections

The client supports TLS/SSL connections.

You can check the [`ssl example`](./examples/getting_started/tls_example.py) to see how to establish a secured connection
You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to establish a secured connection


### Managing disconnections

At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected.
You can use this callback to implement your own logic and eventually attempt a reconnection.

You can check the [`reconnection example`](./examples/getting_started/reconnection_example.py) to see how to manage disconnections and
You can check the [`reconnection example`](./examples/reconnection/reconnection_example.py) to see how to manage disconnections and
eventually attempt a reconnection


Expand Down
6 changes: 6 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Client examples
===
- [Getting started](./getting_started/getting_started.py) - Producer and Consumer example without reconnection
- [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection
- [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection
- [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
AMQPMessagingHandler,
BindingSpecification,
Connection,
Disposition,
Environment,
Event,
ExchangeSpecification,
Message,
OutcomeState,
QuorumQueueSpecification,
)

Expand Down Expand Up @@ -61,8 +62,19 @@ def on_link_closed(self, event: Event) -> None:
print("link closed")


def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
def create_connection(environment: Environment) -> Connection:
connection = environment.connection("amqp://guest:guest@localhost:5672/")
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
# client_key = ".ci/certs/client_key.pem"
# connection = Connection(
# "amqps://guest:guest@localhost:5671/",
# ssl_context=SslConfigurationContext(
# ca_cert=ca_cert_file,
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
# ),
# )
connection.dial()

return connection
Expand All @@ -75,12 +87,13 @@ def main() -> None:
routing_key = "routing-key"

print("connection to amqp server")
connection = create_connection()
environment = Environment()
connection = create_connection(environment)

management = connection.management()

print("declaring exchange and queue")
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
management.declare_exchange(ExchangeSpecification(name=exchange_name))

management.declare_queue(
QuorumQueueSpecification(name=queue_name)
Expand Down Expand Up @@ -113,11 +126,11 @@ def main() -> None:
for i in range(MESSAGES_TO_PUBLISH):
print("publishing")
status = publisher.publish(Message(body="test"))
if status.remote_state == Disposition.ACCEPTED:
if status.remote_state == OutcomeState.ACCEPTED:
print("message accepted")
elif status.remote_state == Disposition.RELEASED:
elif status.remote_state == OutcomeState.RELEASED:
print("message not routed")
elif status.remote_state == Disposition.REJECTED:
elif status.remote_state == OutcomeState.REJECTED:
print("message not rejected")

publisher.close()
Expand Down Expand Up @@ -150,7 +163,7 @@ def main() -> None:
print("closing connections")
management.close()
print("after management closing")
connection.close()
environment.close()
print("after connection closing")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Connection,
ConnectionClosed,
Consumer,
Environment,
Event,
ExchangeSpecification,
Management,
Expand All @@ -20,6 +21,8 @@
QuorumQueueSpecification,
)

environment = Environment()


# here we keep track of the objects we need to reconnect
@dataclass
Expand Down Expand Up @@ -118,8 +121,9 @@ def create_connection() -> Connection:
# "amqp://ha_tls-rabbit_node2-1:5602/",
# ]
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)
connection = Connection(
uri="amqp://guest:guest@localhost:5672/",

connection = environment.connection(
url="amqp://guest:guest@localhost:5672/",
on_disconnection_handler=on_disconnection,
)
connection.dial()
Expand All @@ -146,7 +150,7 @@ def main() -> None:

print("declaring exchange and queue")
connection_configuration.management.declare_exchange(
ExchangeSpecification(name=exchange_name, arguments={})
ExchangeSpecification(name=exchange_name)
)

connection_configuration.management.declare_queue(
Expand Down Expand Up @@ -242,7 +246,7 @@ def main() -> None:
print("closing connections")
connection_configuration.management.close()
print("after management closing")
connection_configuration.connection.close()
environment.close()
print("after connection closing")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
AddressHelper,
AMQPMessagingHandler,
Connection,
Environment,
Event,
Message,
OffsetSpecification,
Expand Down Expand Up @@ -65,8 +66,19 @@ def on_link_closed(self, event: Event) -> None:
print("link closed")


def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
def create_connection(environment: Environment) -> Connection:
connection = environment.connection("amqp://guest:guest@localhost:5672/")
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
# client_key = ".ci/certs/client_key.pem"
# connection = Connection(
# "amqps://guest:guest@localhost:5671/",
# ssl_context=SslConfigurationContext(
# ca_cert=ca_cert_file,
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
# ),
# )
connection.dial()

return connection
Expand All @@ -76,15 +88,16 @@ def main() -> None:
queue_name = "example-queue"

print("connection to amqp server")
connection = create_connection()
environment = Environment()
connection = create_connection(environment)

management = connection.management()

management.declare_queue(StreamSpecification(name=queue_name))

addr_queue = AddressHelper.queue_address(queue_name)

consumer_connection = create_connection()
consumer_connection = create_connection(environment)

stream_filter_options = StreamOptions()
# can be first, last, next or an offset long
Expand Down Expand Up @@ -135,7 +148,7 @@ def main() -> None:
print("closing connections")
management.close()
print("after management closing")
connection.close()
environment.close()
print("after connection closing")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
BindingSpecification,
ClientCert,
Connection,
Environment,
Event,
ExchangeSpecification,
Message,
Expand Down Expand Up @@ -62,12 +63,12 @@ def on_link_closed(self, event: Event) -> None:
print("link closed")


def create_connection() -> Connection:
def create_connection(environment: Environment) -> Connection:
# in case of SSL enablement
ca_cert_file = ".ci/certs/ca_certificate.pem"
client_cert = ".ci/certs/client_certificate.pem"
client_key = ".ci/certs/client_key.pem"
connection = Connection(
connection = environment.connection(
"amqps://guest:guest@localhost:5671/",
ssl_context=SslConfigurationContext(
ca_cert=ca_cert_file,
Expand All @@ -84,14 +85,15 @@ def main() -> None:
exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
environment = Environment()

print("connection to amqp server")
connection = create_connection()
connection = create_connection(environment)

management = connection.management()

print("declaring exchange and queue")
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
management.declare_exchange(ExchangeSpecification(name=exchange_name))

management.declare_queue(
QuorumQueueSpecification(name=queue_name)
Expand Down Expand Up @@ -160,7 +162,7 @@ def main() -> None:
print("closing connections")
management.close()
print("after management closing")
connection.close()
environment.close()
print("after connection closing")


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "rabbitmq-amqp-python-client"
version = "0.1.0-alpha.2"
version = "0.1.0-alpha.3"
description = "Python RabbitMQ client for AMQP 1.0 protocol"
authors = ["RabbitMQ team"]
license = "Apache-2.0 license"
Expand Down
7 changes: 5 additions & 2 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
OffsetSpecification,
StreamOptions,
)
from .environment import Environment
from .exceptions import ArgumentOutOfRangeException
from .management import Management
from .publisher import Publisher
Expand Down Expand Up @@ -39,6 +40,8 @@

del metadata

OutcomeState = Disposition

__all__ = [
"Connection",
"Management",
Expand All @@ -61,9 +64,9 @@
"ArgumentOutOfRangeException",
"SslConfigurationContext",
"ClientCert",
"Delivery",
"ConnectionClosed",
"StreamOptions",
"OffsetSpecification",
"Disposition",
"OutcomeState",
"Environment",
]
6 changes: 6 additions & 0 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ def __init__(
self._on_disconnection_handler = on_disconnection_handler
self._conf_ssl_context: Optional[SslConfigurationContext] = ssl_context
self._ssl_domain = None
self._connections = [] # type: ignore
self._index: int = -1

def _set_environment_connection_list(self, connections: []): # type: ignore
self._connections = connections

def dial(self) -> None:
logger.debug("Establishing a connection to the amqp server")
Expand Down Expand Up @@ -75,6 +80,7 @@ def management(self) -> Management:
def close(self) -> None:
logger.debug("Closing connection")
self._conn.close()
self._connections.remove(self)

def publisher(self, destination: str) -> Publisher:
if validate_address(destination) is False:
Expand Down
6 changes: 3 additions & 3 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional, Union

Expand All @@ -13,7 +13,7 @@
@dataclass
class ExchangeSpecification:
name: str
arguments: dict[str, str]
arguments: dict[str, str] = field(default_factory=dict)
exchange_type: ExchangeType = ExchangeType.direct
is_auto_delete: bool = False
is_internal: bool = False
Expand All @@ -24,7 +24,7 @@ class ExchangeSpecification:
class QueueInfo:
name: str
arguments: dict[str, Any]
queue_type: QueueType = QueueType.quorum
queue_type: QueueType = QueueType.classic
is_exclusive: Optional[bool] = None
is_auto_delete: bool = False
is_durable: bool = True
Expand Down
Loading