14
14
15
15
from .address_helper import validate_address
16
16
from .consumer import Consumer
17
- from .entities import RecoveryConfiguration , StreamOptions
17
+ from .entities import (
18
+ OAuth2Options ,
19
+ RecoveryConfiguration ,
20
+ StreamOptions ,
21
+ )
18
22
from .exceptions import (
19
23
ArgumentOutOfRangeException ,
20
24
ValidationCodeException ,
@@ -60,6 +64,7 @@ def __init__(
60
64
ssl_context : Union [
61
65
PosixSslConfigurationContext , WinSslConfigurationContext , None
62
66
] = None ,
67
+ oauth2_options : Optional [OAuth2Options ] = None ,
63
68
recovery_configuration : RecoveryConfiguration = RecoveryConfiguration (),
64
69
):
65
70
"""
@@ -93,6 +98,7 @@ def __init__(
93
98
self ._index : int = - 1
94
99
self ._publishers : list [Publisher ] = []
95
100
self ._consumers : list [Consumer ] = []
101
+ self ._oauth2_options = oauth2_options
96
102
97
103
# Some recovery_configuration validation
98
104
if recovery_configuration .back_off_reconnect_interval < timedelta (seconds = 1 ):
@@ -109,19 +115,8 @@ def _set_environment_connection_list(self, connections: []): # type: ignore
109
115
def _open_connections (self , reconnect_handlers : bool = False ) -> None :
110
116
111
117
logger .debug ("inside connection._open_connections creating connection" )
112
- if self ._recovery_configuration .active_recovery is False :
113
- self ._conn = BlockingConnection (
114
- url = self ._addr ,
115
- urls = self ._addrs ,
116
- ssl_domain = self ._ssl_domain ,
117
- )
118
- else :
119
- self ._conn = BlockingConnection (
120
- url = self ._addr ,
121
- urls = self ._addrs ,
122
- ssl_domain = self ._ssl_domain ,
123
- on_disconnection_handler = self ._on_disconnection ,
124
- )
118
+
119
+ self ._create_connection ()
125
120
126
121
if reconnect_handlers is True :
127
122
logger .debug ("reconnecting managements, publishers and consumers handlers" )
@@ -137,6 +132,40 @@ def _open_connections(self, reconnect_handlers: bool = False) -> None:
137
132
# Update the broken connection and sender in the consumer
138
133
self ._consumers [i ]._update_connection (self ._conn )
139
134
135
+ def _create_connection (self ) -> None :
136
+
137
+ user = None
138
+ password = None
139
+ mechs = None
140
+
141
+ if self ._oauth2_options is not None :
142
+ user = ""
143
+ password = self ._oauth2_options .token
144
+ mechs = "PLAIN"
145
+ print ("password, mechs: " + user + " " + password )
146
+
147
+ if self ._recovery_configuration .active_recovery is False :
148
+ self ._conn = BlockingConnection (
149
+ url = self ._addr ,
150
+ urls = self ._addrs ,
151
+ oauth2_options = self ._oauth2_options ,
152
+ ssl_domain = self ._ssl_domain ,
153
+ allowed_mechs = mechs ,
154
+ user = user ,
155
+ password = password ,
156
+ )
157
+ else :
158
+ self ._conn = BlockingConnection (
159
+ url = self ._addr ,
160
+ urls = self ._addrs ,
161
+ oauth2_options = self ._oauth2_options ,
162
+ ssl_domain = self ._ssl_domain ,
163
+ on_disconnection_handler = self ._on_disconnection ,
164
+ allowed_mechs = mechs ,
165
+ user = user ,
166
+ password = password ,
167
+ )
168
+
140
169
def dial (self ) -> None :
141
170
"""
142
171
Establish a connection to the AMQP server.
0 commit comments