4
4
AddressHelper ,
5
5
AmqpMessage ,
6
6
ArgumentOutOfRangeException ,
7
- BindingSpecification ,
8
7
Connection ,
9
8
ConnectionClosed ,
10
9
Environment ,
11
- ExchangeSpecification ,
10
+ OutcomeState ,
12
11
QuorumQueueSpecification ,
13
12
StreamSpecification ,
13
+ BindingSpecification ,
14
+ ExchangeSpecification ,
14
15
)
15
16
16
17
from .http_requests import delete_all_connections
18
+ from .utils import create_binding , publish_per_message
17
19
18
20
19
21
def test_publish_queue (connection : Connection ) -> None :
@@ -31,7 +33,7 @@ def test_publish_queue(connection: Connection) -> None:
31
33
try :
32
34
publisher = connection .publisher ("/queues/" + queue_name )
33
35
status = publisher .publish (AmqpMessage (body = "test" ))
34
- if status .ACCEPTED :
36
+ if status .remote_state == OutcomeState . ACCEPTED :
35
37
accepted = True
36
38
except Exception :
37
39
raised = True
@@ -46,6 +48,107 @@ def test_publish_queue(connection: Connection) -> None:
46
48
assert raised is False
47
49
48
50
51
+ def test_publish_per_message (connection : Connection ) -> None :
52
+
53
+ queue_name = "test-queue-1"
54
+ queue_name_2 = "test-queue-2"
55
+ management = connection .management ()
56
+
57
+ management .declare_queue (QuorumQueueSpecification (name = queue_name ))
58
+ management .declare_queue (QuorumQueueSpecification (name = queue_name_2 ))
59
+
60
+ raised = False
61
+
62
+ publisher = None
63
+ accepted = False
64
+ accepted_2 = True
65
+
66
+ try :
67
+ publisher = connection .publisher ()
68
+ status = publish_per_message (
69
+ publisher , addr = AddressHelper .queue_address (queue_name )
70
+ )
71
+ if status .remote_state == OutcomeState .ACCEPTED :
72
+ accepted = True
73
+ status = publish_per_message (
74
+ publisher , addr = AddressHelper .queue_address (queue_name_2 )
75
+ )
76
+ if status .remote_state == OutcomeState .ACCEPTED :
77
+ accepted_2 = True
78
+ except Exception :
79
+ raised = True
80
+
81
+ if publisher is not None :
82
+ publisher .close ()
83
+
84
+ purged_messages_queue_1 = management .purge_queue (queue_name )
85
+ purged_messages_queue_2 = management .purge_queue (queue_name_2 )
86
+ management .delete_queue (queue_name )
87
+ management .delete_queue (queue_name_2 )
88
+ management .close ()
89
+
90
+ assert accepted is True
91
+ assert accepted_2 is True
92
+ assert purged_messages_queue_1 == 1
93
+ assert purged_messages_queue_2 == 1
94
+ assert raised is False
95
+
96
+
97
+ def test_publish_per_message_to_exchange (connection : Connection ) -> None :
98
+
99
+ exchange_name = "test-exchange"
100
+ queue_name = "test-queue"
101
+ management = connection .management ()
102
+ routing_key = "routing-key"
103
+
104
+ management .declare_exchange (ExchangeSpecification (name = exchange_name ))
105
+
106
+ management .declare_queue (QuorumQueueSpecification (name = queue_name ))
107
+
108
+ bind_path = management .bind (
109
+ BindingSpecification (
110
+ source_exchange = exchange_name ,
111
+ destination_queue = queue_name ,
112
+ binding_key = routing_key ,
113
+ )
114
+ )
115
+
116
+ raised = False
117
+
118
+ publisher = None
119
+ # accepted = False
120
+ accepted_2 = True
121
+
122
+ try :
123
+ publisher = connection .publisher ()
124
+ # status = publish_per_message(
125
+ # publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key)
126
+ # )
127
+ # if status.remote_state == OutcomeState.ACCEPTED:
128
+ # accepted = True
129
+ status = publish_per_message (
130
+ publisher , addr = AddressHelper .queue_address (queue_name )
131
+ )
132
+ if status .remote_state == OutcomeState .ACCEPTED :
133
+ accepted_2 = True
134
+ except Exception :
135
+ raised = True
136
+
137
+ # if publisher is not None:
138
+ publisher .close ()
139
+
140
+ # purged_messages_queue = management.purge_queue(queue_name)
141
+ management .unbind (bind_path )
142
+ management .delete_exchange (exchange_name )
143
+ management .delete_queue (queue_name )
144
+ management .close ()
145
+
146
+ # assert accepted is True
147
+ assert accepted_2 is True
148
+ # assert purged_messages_queue == 1
149
+ assert raised is False
150
+
151
+
49
152
def test_publish_ssl (connection_ssl : Connection ) -> None :
50
153
51
154
queue_name = "test-queue"
@@ -90,24 +193,36 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
90
193
assert raised is True
91
194
92
195
196
+ def test_publish_per_message_to_invalid_destination (connection : Connection ) -> None :
197
+
198
+ queue_name = "test-queue-1"
199
+ raised = False
200
+
201
+ message = AmqpMessage (body = "test" )
202
+ message .to_address ("/invalid_destination/" + queue_name )
203
+ publisher = connection .publisher ()
204
+
205
+ try :
206
+ publisher .publish (message )
207
+ except ArgumentOutOfRangeException :
208
+ raised = True
209
+ except Exception :
210
+ raised = False
211
+
212
+ if publisher is not None :
213
+ publisher .close ()
214
+
215
+ assert raised is True
216
+
217
+
93
218
def test_publish_exchange (connection : Connection ) -> None :
94
219
95
220
exchange_name = "test-exchange"
96
221
queue_name = "test-queue"
97
222
management = connection .management ()
98
223
routing_key = "routing-key"
99
224
100
- management .declare_exchange (ExchangeSpecification (name = exchange_name ))
101
-
102
- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
103
-
104
- management .bind (
105
- BindingSpecification (
106
- source_exchange = exchange_name ,
107
- destination_queue = queue_name ,
108
- binding_key = routing_key ,
109
- )
110
- )
225
+ create_binding (management , exchange_name , queue_name , routing_key )
111
226
112
227
addr = AddressHelper .exchange_address (exchange_name , routing_key )
113
228
0 commit comments