-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
authenticator.py
162 lines (128 loc) · 6.12 KB
/
authenticator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import json
from math import log
import os
import re
import subprocess
import threading
import asyncio
import time
import logging
from fastapi import HTTPException
import pexpect
import functools
class AzureAuthenticator:
def __init__(self):
self.users_data = {}
self.lock = threading.Lock()
async def get_device_code(self, user_id: str):
# Set the environment variable
env = self.set_env(user_id)
# run az logout to clear any existing sessions
logging.info("Logging out of Azure CLI...")
result = subprocess.run(['az', 'logout'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
# make sure the user is logged out
time.sleep(5)
if result.returncode != 0:
logging.error(f"Failed to logout: {result.stderr.decode('utf-8')}")
else:
logging.info("Logged out of Azure CLI.")
# Execute the command
child = pexpect.spawn('az login --use-device-code', env=env, timeout=120)
logging.debug("Launching a device code process...")
index = child.expect(['To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code .* to authenticate.', pexpect.EOF])
logging.debug(str(child.before))
if index == 0:
# Extract the device code from the output
match = re.search('enter the code (.*?) to authenticate', child.after.decode())
if match:
device_code = match.group(1)
logging.debug(f"Device code: {device_code}")
else:
logging.error("Could not extract device code")
raise HTTPException(status_code=500, detail="Could not extract device code")
elif index == 1:
logging.error("Command exited before device code was provided")
raise HTTPException(status_code=500, detail="Command exited before device code was provided")
with self.lock:
self.users_data[user_id] = {'child': child}
logging.info("Device code process completed successfully.")
url = 'https://microsoft.com/devicelogin'
return url, device_code
def set_env(self, user_id):
# to utilize the multiple user login experience, we need to set the environment variable AZURE_CONFIG_DIR
# https://github.com/microsoft/azure-pipelines-tasks/issues/8314
temp_dir = self.get_temp_dir(user_id)
env = os.environ.copy()
env['AZURE_CONFIG_DIR'] = temp_dir
# Ensure the directory exists
os.makedirs(env['AZURE_CONFIG_DIR'], exist_ok=True)
# execute az config set core.login_experience_v2=off
command = ['az', 'config', 'set', 'core.login_experience_v2=off']
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
if result.returncode != 0:
print(f"Failed to set config: {result.stderr.decode('utf-8')}")
logging.error(f"Failed to set config: {result.stderr.decode('utf-8')}")
else:
logging.info("az config set successfully")
return env
def get_temp_dir(self, user_id: str):
if os.path.exists('/.dockerenv'):
# We are running inside a Docker container
# /root/.temp/user_id
temp_dir = os.path.join('/app/.temp', user_id)
logging.info(f"Running inside a Docker container. Temp directory: {temp_dir}")
else:
# We are not running inside a Docker container
temp_dir = os.path.join(os.path.expanduser('~'), '.temp', user_id)
logging.info(f"Running outside a Docker container. Temp directory: {temp_dir}")
# Ensure the directory exists
os.makedirs(temp_dir, exist_ok=True)
return temp_dir
async def get_token(self, user_id: str, resource: str):
try:
# Wait for the command to finish
child = self.users_data[user_id]['child']
child.close(True)
logging.debug(f'{child.exitstatus}-{child.signalstatus}')
output = child.before.decode() + child.after.decode()
logging.info(output)
except KeyError:
logging.warning(f"No child process found for user {user_id}. Continuing execution.")
env = self.set_env(user_id)
# Execute the command
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, functools.partial(subprocess.run, ['az', 'account', 'get-access-token', '--resource', resource], capture_output=True, text=True, env=env))
# Check if the command was successful
if result.returncode != 0:
raise Exception(f'Command failed with exit code {result.returncode}: {result.stderr}')
# Parse the output as JSON
token_info = json.loads(result.stdout)
return token_info
async def check_az_login(self):
# Execute the command
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, functools.partial(subprocess.run, ['az', 'account', 'get-access-token'], capture_output=True, text=True))
# Check if the output equals the specified string
if result.stdout.strip() == 'Please run "az login" to access your accounts.':
return False
else:
return True
async def authenticate(self, user_id: str, resource: str):
retry_count = 0
max_retries = 3
while retry_count < max_retries:
try:
token = await self.get_token(user_id, resource)
logging.info("Authentication successful.")
return token
except Exception as e:
error_message = str(e)
if 'az login' in error_message:
logging.error("'az login' found in the error message. Returning None.")
return None
logging.error("Waiting for user to authenticate..." + error_message)
await asyncio.sleep(10)
retry_count += 1
if retry_count == max_retries:
logging.error("Authentication failed after 3 attempts.")
return None