-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathauth.py
More file actions
155 lines (130 loc) · 5.31 KB
/
auth.py
File metadata and controls
155 lines (130 loc) · 5.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import aiohttp
import os
from dateutil import parser
from time import time
class AuthModel:
    """Base holder for credential settings used by authentication backends.

    Every ``os_*`` property returns the value stored on the instance when it
    is truthy and otherwise falls back to the matching ``OS_*`` environment
    variable (the names follow the OpenStack client convention). A property
    returns ``None`` when neither source provides a value.

    Subclasses are expected to override :meth:`authenticate`.
    """

    def __init__(self):
        # Explicit overrides; ``None`` means "use the environment instead".
        self._auth_url = None
        self._username = None
        self._password = None
        self._user_domain_name = None
        self._project_domain_name = None
        self._project_id = None
        self._project_name = None
        self._region_name = None
        self._application_credential_id = None
        self._application_credential_secret = None

    async def authenticate(self):
        """Perform authentication; must be implemented by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    @property
    def os_auth_url(self):
        """Explicit auth URL, else ``OS_AUTH_URL``."""
        return self._auth_url or os.environ.get('OS_AUTH_URL')

    @property
    def os_username(self):
        """Explicit username, else ``OS_USERNAME``."""
        return self._username or os.environ.get('OS_USERNAME')

    @property
    def os_password(self):
        """Explicit password, else ``OS_PASSWORD``."""
        return self._password or os.environ.get('OS_PASSWORD')

    @property
    def os_user_domain_name(self):
        """Explicit user domain name, else ``OS_USER_DOMAIN_NAME``."""
        return self._user_domain_name or os.environ.get('OS_USER_DOMAIN_NAME')

    @property
    def os_project_domain_name(self):
        """Explicit project domain name, else ``OS_PROJECT_DOMAIN_NAME``.

        When neither is set the user domain name is used, which keeps the
        previous default for setups that only configure a user domain.
        """
        return (
            self._project_domain_name
            or os.environ.get('OS_PROJECT_DOMAIN_NAME')
            or self.os_user_domain_name
        )

    @property
    def os_project_id(self):
        """Explicit project id, else ``OS_PROJECT_ID``."""
        return self._project_id or os.environ.get('OS_PROJECT_ID')

    @property
    def os_project_name(self):
        """Explicit project name, else ``OS_PROJECT_NAME``."""
        return self._project_name or os.environ.get('OS_PROJECT_NAME')

    @property
    def os_region_name(self):
        """Explicit region name, else ``OS_REGION_NAME``."""
        return self._region_name or os.environ.get('OS_REGION_NAME')

    @property
    def os_application_credential_id(self):
        """Explicit application credential id, else ``OS_APPLICATION_CREDENTIAL_ID``."""
        return self._application_credential_id or os.environ.get('OS_APPLICATION_CREDENTIAL_ID')

    @property
    def os_application_credential_secret(self):
        """Explicit application credential secret, else ``OS_APPLICATION_CREDENTIAL_SECRET``."""
        return self._application_credential_secret or os.environ.get('OS_APPLICATION_CREDENTIAL_SECRET')
class AuthPassword(AuthModel):
    """Token authentication using a password or an application credential.

    The request payload is built once in ``__init__`` from the constructor
    arguments and/or the ``OS_*`` environment variables exposed by
    ``AuthModel``. ``authenticate()`` POSTs it to ``<auth_url>/auth/tokens``
    (this looks like the OpenStack Keystone v3 token API - confirm against
    the deployment) and stores the token, its expiry and the service catalog
    on the instance.

    Attributes set by this class:
        token: value of the ``X-Subject-Token`` response header, or ``None``
            before the first successful authentication.
        token_expires_at: expiry as a POSIX timestamp (``0`` initially).
        endpoints: the ``token.catalog`` list from the last token response
            (empty before the first authentication).
        headers: headers sent with the token request.
    """

    def __init__(self, auth_url=None, username=None, password=None, project_name=None,
                 user_domain_name=None, project_domain_name=None,
                 application_credential_id=None, application_credential_secret=None):
        """Store credentials and pre-build the authentication payload.

        Any argument left as ``None`` is resolved from the corresponding
        ``OS_*`` environment variable via the ``AuthModel`` properties.

        Raises:
            ValueError: if no auth URL is given and ``OS_AUTH_URL`` is unset.
        """
        super().__init__()
        self._auth_url = auth_url
        self._username = username
        self._password = password
        self._project_name = project_name
        self._user_domain_name = user_domain_name
        self._project_domain_name = project_domain_name
        self._application_credential_id = application_credential_id
        self._application_credential_secret = application_credential_secret

        base_url = self.os_auth_url
        if not base_url:
            # Previously this surfaced as an opaque "NoneType + str" TypeError.
            raise ValueError("auth_url is required: pass auth_url or set OS_AUTH_URL")
        # Drop trailing slashes so ".../v3/" does not become ".../v3//auth/tokens".
        self._auth_endpoint = base_url.rstrip('/') + '/auth/tokens'

        self.token = None
        self.token_expires_at = 0
        # Initialised here so get_endpoint_url() is safe before authenticate().
        self.endpoints = []
        self.headers = {
            'Content-Type': 'application/json'
        }

        # Merge the "identity" section and, when present, the "scope" section.
        self._auth_payload = {'auth': {**self._identity, **self._scope}}

    def is_token_valid(self):
        """Return ``True`` while the stored expiry timestamp is in the future."""
        return self.token_expires_at > time()

    @property
    def _uses_application_credential(self):
        """True when both the application credential id and secret are available."""
        return bool(self.os_application_credential_id and self.os_application_credential_secret)

    @property
    def _identity(self):
        """Build the ``identity`` part of the auth payload.

        An application credential takes precedence when both its id and
        secret are available; otherwise user/password authentication is used.
        """
        if self._uses_application_credential:
            return {"identity": {
                "methods": ["application_credential"],
                "application_credential": {
                    "id": self.os_application_credential_id,
                    "secret": self.os_application_credential_secret
                }
            }}
        return {"identity": {
            'methods': ['password'],
            'password': {
                'user': {
                    'domain': {
                        'name': self.os_user_domain_name
                    },
                    'name': self.os_username,
                    'password': self.os_password
                }
            }
        }}

    @property
    def _scope(self):
        """Build the ``scope`` part of the auth payload (empty for application credentials)."""
        if self._uses_application_credential:
            # The scope is automatically determined from the application_credential
            return {}
        return {"scope": {
            "project": {
                "domain": {
                    "name": self.os_project_domain_name
                },
                "name": self.os_project_name
            }
        }}

    async def get_token(self):
        """Request a new token from the auth endpoint.

        Returns:
            A ``(token, expires_at, catalog)`` tuple: the ``X-Subject-Token``
            response header, ``token.expires_at`` parsed into a POSIX
            timestamp, and the ``token.catalog`` value from the JSON body.

        NOTE(review): the HTTP status is not checked here; a response without
        the ``X-Subject-Token`` header or without a ``token`` object (for
        example a rejected login) surfaces as ``KeyError``.
        """
        async with aiohttp.ClientSession() as session:
            async with session.post(self._auth_endpoint, json=self._auth_payload,
                                    headers=self.headers) as response:
                result = await response.json()
                return (
                    response.headers['X-Subject-Token'],
                    parser.parse(result['token']['expires_at']).timestamp(),
                    result['token']['catalog']
                )

    def get_endpoint_url(self, endpoint_name, prefered_interface="public"):
        """Look up a service URL in the stored catalog.

        Only the first catalog entry whose ``name`` equals ``endpoint_name``
        is considered.

        Args:
            endpoint_name: value matched against each catalog entry's ``name``.
            prefered_interface: value matched against each endpoint's
                ``interface`` (default ``"public"``).

        Returns:
            The ``url`` of the first endpoint with the requested interface.

        Raises:
            ValueError: if no catalog entry has that name (including when
                ``authenticate()`` has not been called yet), or the entry has
                no endpoint with the requested interface.
        """
        for service in self.endpoints:
            if service['name'] != endpoint_name:
                continue
            for candidate in service['endpoints']:
                if candidate['interface'] == prefered_interface:
                    return candidate['url']
            raise ValueError("could not find desired interface")
        raise ValueError("endpoint %s not found" % endpoint_name)

    async def authenticate(self):
        """Fetch a token unless a non-expired one is already stored.

        On refresh, updates ``token``, ``token_expires_at`` and ``endpoints``.
        """
        if self.token is None or not self.is_token_valid():
            self.token, self.token_expires_at, self.endpoints = await self.get_token()