1
1
import uuid
2
2
from typing import Any , Optional
3
- import json
4
- from proton import Message , Receiver , Sender
3
+
4
+ from proton import Message
5
+ from proton ._data import Data
5
6
from proton .utils import (
6
7
BlockingConnection ,
7
8
BlockingReceiver ,
8
9
BlockingSender ,
9
10
)
10
11
11
- from proton ._data import Data
12
-
13
12
from .address_helper import exchange_address , queue_address
14
13
from .common import CommonValues
15
- from .configuration_options import (
16
- ReceiverOption ,
17
- SenderOption ,
18
- )
19
14
from .entities import (
20
15
ExchangeSpecification ,
21
16
QueueSpecification ,
22
17
)
18
+ from .options import ReceiverOption , SenderOption
23
19
24
- import pickle
25
20
26
21
class Management :
27
22
def __init__ (self , conn : BlockingConnection ):
@@ -59,7 +54,6 @@ def request(
59
54
method : str ,
60
55
expected_response_codes : list [int ],
61
56
) -> None :
62
- print ("im in request" )
63
57
self ._request (str (uuid .uuid4 ()), body , path , method , expected_response_codes )
64
58
65
59
def _request (
@@ -70,63 +64,37 @@ def _request(
70
64
method : str ,
71
65
expected_response_codes : list [int ],
72
66
) -> None :
73
- print ("path is: " + path )
74
-
75
- ## test exchange message
76
67
amq_message = Message (
77
68
id = id ,
78
69
body = body ,
79
70
reply_to = "$me" ,
80
71
address = path ,
81
72
subject = method ,
82
- #properties={"id": id, "to": path, "subject": method, "reply_to": "$me"},
83
- )
84
-
85
- kvBody = {
86
- "auto_delete" : False ,
87
- "durable" : True ,
88
- "type" : "direct" ,
89
- "arguments" : {},
90
- }
91
-
92
- amq_message = Message (
93
- body = kvBody ,
94
- reply_to = "$me" ,
95
- address = path ,
96
- subject = method ,
97
- id = id ,
98
73
)
99
74
100
- message_bytes = amq_message .encode ()
101
- list_bytes = list (message_bytes )
102
-
103
75
if self ._sender is not None :
104
76
self ._sender .send (amq_message )
105
77
106
- msg = self ._receiver .receive ()
107
-
108
-
109
- print ("response received: " + str (msg .subject ))
110
-
111
- #self._validate_reponse_code(int(msg.properties["http:response"]), expected_response_codes)
78
+ if self ._receiver is not None :
79
+ msg = self ._receiver .receive ()
112
80
113
- # TO_COMPLETE HERE
81
+ self . _validate_reponse_code ( int ( msg . subject ), expected_response_codes )
114
82
115
83
# TODO
116
84
# def delete_queue(self, name:str):
117
85
118
- def declare_exchange (self , exchange_specification : ExchangeSpecification ):
86
+ def declare_exchange (
87
+ self , exchange_specification : ExchangeSpecification
88
+ ) -> ExchangeSpecification :
119
89
body = {}
120
90
body ["auto_delete" ] = exchange_specification .is_auto_delete
121
91
body ["durable" ] = exchange_specification .is_durable
122
- body ["type" ] = exchange_specification .exchange_type .value
123
- # body["internal"] = False
124
- body ["arguments" ] = {}
92
+ body ["type" ] = exchange_specification .exchange_type .value # type: ignore
93
+ body ["internal" ] = exchange_specification . is_internal
94
+ body ["arguments" ] = {} # type: ignore
125
95
126
96
path = exchange_address (exchange_specification .name )
127
97
128
- print (path )
129
-
130
98
self .request (
131
99
body ,
132
100
path ,
@@ -138,11 +106,15 @@ def declare_exchange(self, exchange_specification: ExchangeSpecification):
138
106
],
139
107
)
140
108
141
- def declare_queue (self , queue_specification : QueueSpecification ):
109
+ return exchange_specification
110
+
111
+ def declare_queue (
112
+ self , queue_specification : QueueSpecification
113
+ ) -> QueueSpecification :
142
114
body = {}
143
115
body ["auto_delete" ] = queue_specification .is_auto_delete
144
116
body ["durable" ] = queue_specification .is_durable
145
- body ["arguments" ] = {
117
+ body ["arguments" ] = { # type: ignore
146
118
"x-queue-type" : queue_specification .queue_type .value ,
147
119
"x-dead-letter-exchange" : queue_specification .dead_letter_exchange ,
148
120
"x-dead-letter-routing-key" : queue_specification .dead_letter_routing_key ,
@@ -164,8 +136,9 @@ def declare_queue(self, queue_specification: QueueSpecification):
164
136
],
165
137
)
166
138
167
- def delete_exchange ( self , exchange_name : str ):
139
+ return queue_specification
168
140
141
+ def delete_exchange (self , exchange_name : str ) -> None :
169
142
path = exchange_address (exchange_name )
170
143
171
144
print (path )
@@ -179,9 +152,7 @@ def delete_exchange(self, exchange_name:str):
179
152
],
180
153
)
181
154
182
-
183
- def delete_queue (self , queue_name :str ):
184
-
155
+ def delete_queue (self , queue_name : str ) -> None :
185
156
path = queue_address (queue_name )
186
157
187
158
print (path )
@@ -195,11 +166,10 @@ def delete_queue(self, queue_name:str):
195
166
],
196
167
)
197
168
198
- def _validate_reponse_code (self , response_code : int , expected_response_codes : list [int ]) -> None :
199
-
200
- print ("response code: " + str (response_code ))
201
-
202
- if response_code == CommonValues .response_code_409 :
169
+ def _validate_reponse_code (
170
+ self , response_code : int , expected_response_codes : list [int ]
171
+ ) -> None :
172
+ if response_code == CommonValues .response_code_409 .value :
203
173
# TODO replace with a new defined Exception
204
174
raise Exception ("ErrPreconditionFailed" )
205
175
@@ -209,7 +179,6 @@ def _validate_reponse_code(self, response_code: int, expected_response_codes: li
209
179
210
180
raise Exception ("wrong response code received" )
211
181
212
-
213
182
# TODO
214
183
# def bind(self, bind_specification:BindSpecification):
215
184
0 commit comments