1
1
import uuid
2
2
from typing import Any , Optional
3
- import json
4
- from proton import Message , Receiver , Sender
3
+
4
+ from proton import Message
5
+ from proton ._data import Data
5
6
from proton .utils import (
6
7
BlockingConnection ,
7
8
BlockingReceiver ,
8
9
BlockingSender ,
9
10
)
10
11
11
- from proton ._data import Data
12
-
13
12
from .address_helper import exchange_address , queue_address
14
13
from .common import CommonValues
15
14
from .configuration_options import (
21
20
QueueSpecification ,
22
21
)
23
22
24
- import pickle
25
23
26
24
class Management :
27
25
def __init__ (self , conn : BlockingConnection ):
@@ -59,7 +57,6 @@ def request(
59
57
method : str ,
60
58
expected_response_codes : list [int ],
61
59
) -> None :
62
- print ("im in request" )
63
60
self ._request (str (uuid .uuid4 ()), body , path , method , expected_response_codes )
64
61
65
62
def _request (
@@ -70,63 +67,37 @@ def _request(
70
67
method : str ,
71
68
expected_response_codes : list [int ],
72
69
) -> None :
73
- print ("path is: " + path )
74
-
75
- ## test exchange message
76
70
amq_message = Message (
77
71
id = id ,
78
72
body = body ,
79
73
reply_to = "$me" ,
80
74
address = path ,
81
75
subject = method ,
82
- #properties={"id": id, "to": path, "subject": method, "reply_to": "$me"},
83
- )
84
-
85
- kvBody = {
86
- "auto_delete" : False ,
87
- "durable" : True ,
88
- "type" : "direct" ,
89
- "arguments" : {},
90
- }
91
-
92
- amq_message = Message (
93
- body = kvBody ,
94
- reply_to = "$me" ,
95
- address = path ,
96
- subject = method ,
97
- id = id ,
98
76
)
99
77
100
- message_bytes = amq_message .encode ()
101
- list_bytes = list (message_bytes )
102
-
103
78
if self ._sender is not None :
104
79
self ._sender .send (amq_message )
105
80
106
- msg = self ._receiver .receive ()
107
-
108
-
109
- print ("response received: " + str (msg .subject ))
110
-
111
- #self._validate_reponse_code(int(msg.properties["http:response"]), expected_response_codes)
81
+ if self ._receiver is not None :
82
+ msg = self ._receiver .receive ()
112
83
113
- # TO_COMPLETE HERE
84
+ self . _validate_reponse_code ( int ( msg . subject ), expected_response_codes )
114
85
115
86
# TODO
116
87
# def delete_queue(self, name:str):
117
88
118
- def declare_exchange (self , exchange_specification : ExchangeSpecification ):
89
+ def declare_exchange (
90
+ self , exchange_specification : ExchangeSpecification
91
+ ) -> ExchangeSpecification :
119
92
body = {}
120
93
body ["auto_delete" ] = exchange_specification .is_auto_delete
121
94
body ["durable" ] = exchange_specification .is_durable
122
- body ["type" ] = exchange_specification .exchange_type .value
123
- # body["internal"] = False
124
- body ["arguments" ] = {}
95
+ body ["type" ] = exchange_specification .exchange_type .value # type: ignore
96
+ body ["internal" ] = exchange_specification . is_internal
97
+ body ["arguments" ] = {} # type: ignore
125
98
126
99
path = exchange_address (exchange_specification .name )
127
100
128
- print (path )
129
-
130
101
self .request (
131
102
body ,
132
103
path ,
@@ -138,11 +109,15 @@ def declare_exchange(self, exchange_specification: ExchangeSpecification):
138
109
],
139
110
)
140
111
141
- def declare_queue (self , queue_specification : QueueSpecification ):
112
+ return exchange_specification
113
+
114
+ def declare_queue (
115
+ self , queue_specification : QueueSpecification
116
+ ) -> QueueSpecification :
142
117
body = {}
143
118
body ["auto_delete" ] = queue_specification .is_auto_delete
144
119
body ["durable" ] = queue_specification .is_durable
145
- body ["arguments" ] = {
120
+ body ["arguments" ] = { # type: ignore
146
121
"x-queue-type" : queue_specification .queue_type .value ,
147
122
"x-dead-letter-exchange" : queue_specification .dead_letter_exchange ,
148
123
"x-dead-letter-routing-key" : queue_specification .dead_letter_routing_key ,
@@ -164,8 +139,9 @@ def declare_queue(self, queue_specification: QueueSpecification):
164
139
],
165
140
)
166
141
167
- def delete_exchange ( self , exchange_name : str ):
142
+ return queue_specification
168
143
144
+ def delete_exchange (self , exchange_name : str ) -> None :
169
145
path = exchange_address (exchange_name )
170
146
171
147
print (path )
@@ -179,9 +155,7 @@ def delete_exchange(self, exchange_name:str):
179
155
],
180
156
)
181
157
182
-
183
- def delete_queue (self , queue_name :str ):
184
-
158
+ def delete_queue (self , queue_name : str ) -> None :
185
159
path = queue_address (queue_name )
186
160
187
161
print (path )
@@ -195,11 +169,10 @@ def delete_queue(self, queue_name:str):
195
169
],
196
170
)
197
171
198
- def _validate_reponse_code (self , response_code : int , expected_response_codes : list [int ]) -> None :
199
-
200
- print ("response code: " + str (response_code ))
201
-
202
- if response_code == CommonValues .response_code_409 :
172
+ def _validate_reponse_code (
173
+ self , response_code : int , expected_response_codes : list [int ]
174
+ ) -> None :
175
+ if response_code == CommonValues .response_code_409 .value :
203
176
# TODO replace with a new defined Exception
204
177
raise Exception ("ErrPreconditionFailed" )
205
178
@@ -209,7 +182,6 @@ def _validate_reponse_code(self, response_code: int, expected_response_codes: li
209
182
210
183
raise Exception ("wrong response code received" )
211
184
212
-
213
185
# TODO
214
186
# def bind(self, bind_specification:BindSpecification):
215
187
0 commit comments