1
+ # type: ignore
2
+
3
+
1
4
from rabbitmq_amqp_python_client import (
5
+ AddressHelper ,
6
+ AMQPMessagingHandler ,
2
7
BindingSpecification ,
3
8
Connection ,
9
+ Event ,
4
10
ExchangeSpecification ,
5
11
Message ,
6
12
QuorumQueueSpecification ,
7
- exchange_address ,
8
13
)
9
14
10
15
16
+ class MyMessageHandler (AMQPMessagingHandler ):
17
+
18
+ def __init__ (self ):
19
+ super ().__init__ ()
20
+ self ._count = 0
21
+
22
+ def on_message (self , event : Event ):
23
+ print ("received message: " + str (event .message .annotations ))
24
+
25
+ # accepting
26
+ self .delivery_context .accept (event )
27
+
28
+ # in case of rejection (+eventually deadlettering)
29
+ # self.delivery_context.discard(event)
30
+
31
+ # in case of requeuing
32
+ # self.delivery_context.requeue(event)
33
+
34
+ # annotations = {}
35
+ # annotations[symbol('x-opt-string')] = 'x-test1'
36
+ # in case of requeuing with annotations added
37
+ # self.delivery_context.requeue_with_annotations(event, annotations)
38
+
39
+ # in case of rejection with annotations added
40
+ # self.delivery_context.discard_with_annotations(event)
41
+
42
+ print ("count " + str (self ._count ))
43
+
44
+ self ._count = self ._count + 1
45
+
46
+ if self ._count == 100 :
47
+ print ("closing receiver" )
48
+ # if you want you can add cleanup operations here
49
+ # event.receiver.close()
50
+ # event.connection.close()
51
+
52
+ def on_connection_closed (self , event : Event ):
53
+ # if you want you can add cleanup operations here
54
+ print ("connection closed" )
55
+
56
+ def on_link_closed (self , event : Event ) -> None :
57
+ # if you want you can add cleanup operations here
58
+ print ("link closed" )
59
+
60
+
61
+ def create_connection () -> Connection :
62
+ connection = Connection ("amqp://guest:guest@localhost:5672/" )
63
+ connection .dial ()
64
+
65
+ return connection
66
+
67
+
11
68
def main () -> None :
69
+
12
70
exchange_name = "test-exchange"
13
71
queue_name = "example-queue"
14
72
routing_key = "routing-key"
15
- connection = Connection ( "amqp://guest:guest@localhost:5672/" )
73
+ messages_to_publish = 100
16
74
17
75
print ("connection to amqp server" )
18
- connection . dial ()
76
+ connection = create_connection ()
19
77
20
78
management = connection .management ()
21
79
22
80
print ("declaring exchange and queue" )
23
81
management .declare_exchange (ExchangeSpecification (name = exchange_name , arguments = {}))
24
82
25
- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
83
+ management .declare_queue (
84
+ QuorumQueueSpecification (name = queue_name )
85
+ # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
86
+ )
26
87
27
88
print ("binding queue to exchange" )
28
89
bind_name = management .bind (
@@ -33,21 +94,44 @@ def main() -> None:
33
94
)
34
95
)
35
96
36
- addr = exchange_address (exchange_name , routing_key )
97
+ addr = AddressHelper .exchange_address (exchange_name , routing_key )
98
+
99
+ addr_queue = AddressHelper .queue_address (queue_name )
37
100
38
101
print ("create a publisher and publish a test message" )
39
102
publisher = connection .publisher (addr )
40
103
41
- publisher .publish (Message (body = "test" ))
104
+ print ("purging the queue" )
105
+ messages_purged = management .purge_queue (queue_name )
106
+
107
+ print ("messages purged: " + str (messages_purged ))
108
+ # management.close()
109
+
110
+ # publish 10 messages
111
+ for i in range (messages_to_publish ):
112
+ publisher .publish (Message (body = "test" ))
42
113
43
114
publisher .close ()
44
115
116
+ print (
117
+ "create a consumer and consume the test message - press control + c to terminate to consume"
118
+ )
119
+ consumer = connection .consumer (addr_queue , handler = MyMessageHandler ())
120
+
121
+ try :
122
+ consumer .run ()
123
+ except KeyboardInterrupt :
124
+ pass
125
+
126
+ print ("cleanup" )
127
+ consumer .close ()
128
+ # once we finish consuming if we close the connection we need to create a new one
129
+ # connection = create_connection()
130
+ # management = connection.management()
131
+
45
132
print ("unbind" )
46
133
management .unbind (bind_name )
47
134
48
- print ("purging the queue" )
49
- management .purge_queue (queue_name )
50
-
51
135
print ("delete queue" )
52
136
management .delete_queue (queue_name )
53
137
@@ -56,7 +140,9 @@ def main() -> None:
56
140
57
141
print ("closing connections" )
58
142
management .close ()
143
+ print ("after management closing" )
59
144
connection .close ()
145
+ print ("after connection closing" )
60
146
61
147
62
148
if __name__ == "__main__" :
0 commit comments