14
14
15
15
from .address_helper import validate_address
16
16
from .consumer import Consumer
17
- from .entities import RecoveryConfiguration , StreamOptions
17
+ from .entities import (
18
+ OAuth2Options ,
19
+ RecoveryConfiguration ,
20
+ StreamOptions ,
21
+ )
18
22
from .exceptions import (
19
23
ArgumentOutOfRangeException ,
20
24
ValidationCodeException ,
@@ -60,6 +64,7 @@ def __init__(
60
64
ssl_context : Union [
61
65
PosixSslConfigurationContext , WinSslConfigurationContext , None
62
66
] = None ,
67
+ oauth2_options : Optional [OAuth2Options ] = None ,
63
68
recovery_configuration : RecoveryConfiguration = RecoveryConfiguration (),
64
69
):
65
70
"""
@@ -93,6 +98,7 @@ def __init__(
93
98
self ._index : int = - 1
94
99
self ._publishers : list [Publisher ] = []
95
100
self ._consumers : list [Consumer ] = []
101
+ self ._oauth2_options = oauth2_options
96
102
97
103
# Some recovery_configuration validation
98
104
if recovery_configuration .back_off_reconnect_interval < timedelta (seconds = 1 ):
@@ -109,19 +115,8 @@ def _set_environment_connection_list(self, connections: []): # type: ignore
109
115
def _open_connections (self , reconnect_handlers : bool = False ) -> None :
110
116
111
117
logger .debug ("inside connection._open_connections creating connection" )
112
- if self ._recovery_configuration .active_recovery is False :
113
- self ._conn = BlockingConnection (
114
- url = self ._addr ,
115
- urls = self ._addrs ,
116
- ssl_domain = self ._ssl_domain ,
117
- )
118
- else :
119
- self ._conn = BlockingConnection (
120
- url = self ._addr ,
121
- urls = self ._addrs ,
122
- ssl_domain = self ._ssl_domain ,
123
- on_disconnection_handler = self ._on_disconnection ,
124
- )
118
+
119
+ self ._create_connection ()
125
120
126
121
if reconnect_handlers is True :
127
122
logger .debug ("reconnecting managements, publishers and consumers handlers" )
@@ -137,6 +132,35 @@ def _open_connections(self, reconnect_handlers: bool = False) -> None:
137
132
# Update the broken connection and sender in the consumer
138
133
self ._consumers [i ]._update_connection (self ._conn )
139
134
135
+ def _create_connection (self ) -> None :
136
+
137
+ user = None
138
+ password = None
139
+
140
+ if self ._oauth2_options is not None :
141
+ user = ""
142
+ password = self ._oauth2_options .token
143
+
144
+ if self ._recovery_configuration .active_recovery is False :
145
+ self ._conn = BlockingConnection (
146
+ url = self ._addr ,
147
+ urls = self ._addrs ,
148
+ oauth2_options = self ._oauth2_options ,
149
+ ssl_domain = self ._ssl_domain ,
150
+ user = user ,
151
+ password = password ,
152
+ )
153
+ else :
154
+ self ._conn = BlockingConnection (
155
+ url = self ._addr ,
156
+ urls = self ._addrs ,
157
+ oauth2_options = self ._oauth2_options ,
158
+ ssl_domain = self ._ssl_domain ,
159
+ on_disconnection_handler = self ._on_disconnection ,
160
+ user = user ,
161
+ password = password ,
162
+ )
163
+
140
164
def dial (self ) -> None :
141
165
"""
142
166
Establish a connection to the AMQP server.
0 commit comments