Skip to content

Commit 4de3ece

Browse files
author
DanielePalaia
committed
consumer implementation
1 parent 84a224e commit 4de3ece

File tree

7 files changed

+199
-9
lines changed

7 files changed

+199
-9
lines changed

examples/getting_started/main.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,38 @@
11
from rabbitmq_amqp_python_client import (
22
BindingSpecification,
33
Connection,
4+
Event,
45
ExchangeSpecification,
56
Message,
7+
MessagingHandler,
68
QuorumQueueSpecification,
79
exchange_address,
10+
queue_address,
811
)
912

1013

14+
class MyMessageHandler(MessagingHandler):
15+
16+
def __init__(self):
17+
super().__init__()
18+
19+
def on_message(self, event: Event):
20+
print("received message: " + event.message.body)
21+
self.accept(event.delivery)
22+
23+
def on_connection_closed(self, event: Event):
24+
print("connection closed")
25+
26+
def on_connection_cloing(self, event: Event):
27+
print("connection closed")
28+
29+
def on_link_closed(self, event: Event) -> None:
30+
print("link closed")
31+
32+
def on_rejected(self, event: Event) -> None:
33+
print("rejected")
34+
35+
1136
def main() -> None:
1237
exchange_name = "test-exchange"
1338
queue_name = "example-queue"
@@ -35,21 +60,35 @@ def main() -> None:
3560

3661
addr = exchange_address(exchange_name, routing_key)
3762

63+
addr_queue = queue_address(queue_name)
64+
3865
print("create a publisher and publish a test message")
3966
publisher = connection.publisher(addr)
4067

4168
publisher.publish(Message(body="test"))
4269

70+
print("purging the queue")
71+
messages_purged = management.purge_queue(queue_name)
72+
73+
print("messages purged: " + str(messages_purged))
74+
75+
for i in range(10):
76+
publisher.publish(Message(body="test"))
77+
4378
publisher.close()
4479

80+
print("create a consumer and consume the test message")
81+
82+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
83+
4584
print("unbind")
4685
management.unbind(bind_name)
4786

48-
print("purging the queue")
49-
management.purge_queue(queue_name)
5087

88+
89+
consumer.close()
5190
print("delete queue")
52-
management.delete_queue(queue_name)
91+
#management.delete_queue(queue_name)
5392

5493
print("delete exchange")
5594
management.delete_exchange(exchange_name)

rabbitmq_amqp_python_client/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@
33
from .address_helper import exchange_address, queue_address
44
from .common import QueueType
55
from .connection import Connection
6+
from .consumer import Consumer
67
from .entities import (
78
BindingSpecification,
89
ExchangeSpecification,
910
)
1011
from .management import Management
1112
from .publisher import Publisher
13+
from .qpid.proton._events import Event
1214
from .qpid.proton._message import Message
15+
from .qpid.proton.handlers import MessagingHandler
1316
from .queues import (
1417
ClassicQueueSpecification,
1518
QuorumQueueSpecification,
@@ -38,4 +41,7 @@
3841
"exchange_address",
3942
"queue_address",
4043
"Message",
44+
"Consumer",
45+
"MessagingHandler",
46+
"Event",
4147
]

rabbitmq_amqp_python_client/connection.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import logging
2+
from typing import Optional
23

4+
from .consumer import Consumer
35
from .management import Management
46
from .publisher import Publisher
7+
from .qpid.proton._handlers import MessagingHandler
58
from .qpid.proton.utils import BlockingConnection
69

710
logger = logging.getLogger(__name__)
@@ -34,3 +37,9 @@ def close(self) -> None:
3437
def publisher(self, destination: str) -> Publisher:
3538
publisher = Publisher(self._conn, destination)
3639
return publisher
40+
41+
def consumer(
42+
self, destination: str, handler: Optional[MessagingHandler] = None
43+
) -> Consumer:
44+
consumer = Consumer(self._conn, destination, handler)
45+
return consumer
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import logging
2+
from typing import Optional
3+
4+
from .options import ReceiverOption
5+
from .qpid.proton._handlers import MessagingHandler
6+
from .qpid.proton._message import Message
7+
from .qpid.proton.utils import (
8+
BlockingConnection,
9+
BlockingReceiver,
10+
)
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class Consumer:
16+
def __init__(
17+
self,
18+
conn: BlockingConnection,
19+
addr: str,
20+
handler: Optional[MessagingHandler] = None,
21+
):
22+
self._receiver: Optional[BlockingReceiver] = None
23+
self._conn = conn
24+
self._addr = addr
25+
self._handler = handler
26+
self._open()
27+
28+
def _open(self) -> None:
29+
if self._receiver is None:
30+
logger.debug("Creating Sender")
31+
self._receiver = self._create_receiver(self._addr)
32+
33+
def consume(self) -> Message:
34+
if self._receiver is not None:
35+
return self._receiver.receive()
36+
37+
def close(self) -> None:
38+
logger.debug("Closing Sender and Receiver")
39+
if self._receiver is not None:
40+
self._receiver.close()
41+
if self._receiver is not None:
42+
self._receiver.close()
43+
44+
def _create_receiver(self, addr: str) -> BlockingReceiver:
45+
return self._conn.create_receiver(
46+
addr, options=ReceiverOption(addr), handler=self._handler
47+
)

rabbitmq_amqp_python_client/qpid/proton/_utils.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,9 @@ def __init__(
272272
)
273273
if credit:
274274
receiver.flow(credit)
275+
276+
if fetcher is None:
277+
print("fetcher is none")
275278
self.fetcher = fetcher
276279
self.container = connection.container
277280

@@ -526,7 +529,7 @@ def create_receiver(
526529
handler=handler or fetcher,
527530
options=options,
528531
),
529-
fetcher,
532+
handler or fetcher,
530533
credit=prefetch,
531534
)
532535

tests/test_consumer.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import time
2+
3+
from rabbitmq_amqp_python_client import (
4+
Connection,
5+
Event,
6+
Message,
7+
MessagingHandler,
8+
QuorumQueueSpecification,
9+
queue_address,
10+
)
11+
12+
13+
class MyMessageHandler(MessagingHandler):
14+
15+
def __init__(self):
16+
super().__init__()
17+
self._received = 0
18+
19+
def on_message(self, event: Event):
20+
self.accept(event.delivery)
21+
self._received = self._received + 1
22+
23+
def on_link_closed(self, event: Event) -> None:
24+
assert self._received > 10
25+
26+
27+
def test_consumer_sync_queue(connection: Connection) -> None:
28+
29+
queue_name = "test-queue"
30+
messages_to_send = 100
31+
management = connection.management()
32+
33+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
34+
35+
addr_queue = queue_address(queue_name)
36+
37+
publisher = connection.publisher("/queues/" + queue_name)
38+
consumer = connection.consumer(addr_queue)
39+
40+
consumed = 0
41+
42+
# publish 10 messages
43+
for i in range(messages_to_send):
44+
publisher.publish(Message(body="test" + str(i)))
45+
46+
time.sleep(5)
47+
48+
# consumer synchronously without handler
49+
for i in range(messages_to_send):
50+
message = consumer.consume()
51+
if message.body == "test" + str(i):
52+
consumed = consumed + 1
53+
54+
assert consumed >= 10
55+
56+
publisher.close()
57+
consumer.close()
58+
59+
management.delete_queue(queue_name)
60+
management.close()
61+
62+
63+
def test_consumer_async_queue(connection: Connection) -> None:
64+
65+
messages_to_send = 1000
66+
67+
queue_name = "test-queue"
68+
69+
management = connection.management()
70+
71+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
72+
73+
addr_queue = queue_address(queue_name)
74+
75+
publisher = connection.publisher("/queues/" + queue_name)
76+
77+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
78+
79+
# publish 10 messages
80+
for i in range(messages_to_send):
81+
publisher.publish(Message(body="test" + str(i)))
82+
83+
time.sleep(5)
84+
publisher.close()
85+
consumer.close()
86+
87+
management.delete_queue(queue_name)
88+
management.close()

tests/test_publisher.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,6 @@ def test_publish_exchange(connection: Connection) -> None:
7272

7373

7474
def test_publish_purge(connection: Connection) -> None:
75-
connection = Connection("amqp://guest:guest@localhost:5672/")
76-
connection.dial()
7775

7876
queue_name = "test-queue"
7977
management = connection.management()
@@ -84,17 +82,17 @@ def test_publish_purge(connection: Connection) -> None:
8482

8583
try:
8684
publisher = connection.publisher("/queues/" + queue_name)
87-
for i in range(20):
85+
for i in range(100):
8886
publisher.publish(Message(body="test"))
8987
except Exception:
9088
raised = True
9189

92-
time.sleep(4)
90+
time.sleep(5)
9391

9492
message_purged = management.purge_queue(queue_name)
9593

9694
assert raised is False
97-
assert message_purged == 20
95+
assert message_purged == 100
9896

9997
publisher.close()
10098

0 commit comments

Comments
 (0)