Source code for esy.devel

from http.server import HTTPServer, BaseHTTPRequestHandler
import os
import threading
import webbrowser
import urllib.parse
from requests_html import HTMLSession
from requests import Request
import random
import string
import getpass
from esy.auth import ESIAuthenticator
from esy.constants import ESI_AUTHORIZE_ENDPOINT

CLIENT_ID = os.getenv('ESY_CLIENT_ID')
SECRET_KEY = os.getenv('ESY_SECRET_KEY')
CALLBACK_URL = os.getenv('ESY_CALLBACK_URL', 'http://localhost:8000')
SERVER_ADDRESS = os.getenv('ESY_SERVER_ADDRESS', 'localhost')
SERVER_PORT = os.getenv('ESY_SERVER_PORT', '8000')
SCOPES = os.getenv('ESY_SCOPES')
SESSION = {}


[docs]class AuthenticationHandler(BaseHTTPRequestHandler): """ HTTP Request handler that pilfers the state and authorization code from an incoming request. """ DEVSERVER_HTML = bytes('<!DOCTYPE html>' '<html>' '<body>' '<p>Authentication complete, you can close this ' 'window now.</p>' '</body>' '</html>', encoding='utf-8') def do_GET(self): parsed = urllib.parse.urlparse(self.path) for k, v in urllib.parse.parse_qs(parsed.query).items(): SESSION[k] = ','.join(v) self.send_response(200) self.end_headers() self.request.send(self.DEVSERVER_HTML) self.request.close()
[docs]class DevServer(HTTPServer, threading.Thread): """ Tiny HTTP Server used to listen for incoming redirects from ESI, so we can snatch the authorization code. """ def __init__(self, server_address): HTTPServer.__init__(self, server_address, AuthenticationHandler, True) threading.Thread.__init__(self)
[docs] def run(self): self.handle_request() self.server_close()
[docs]def get_authorization_code(cli_login=False, server_address=SERVER_ADDRESS, server_port=SERVER_PORT, client_id=CLIENT_ID, callback_url=CALLBACK_URL, scopes=SCOPES, character_id=None, username=None, password=None): """ Starts an SSO session with ESI and retrieves the authorization code. Optionally prompts for username and password input, and character selection. :param cli_login: Start CLI-based authentication or just print the SSO URL. :type cli_login: bool :param server_address: The address :class:`DevServer` is binding to. :type server_address: str :param server_port: The port :class:`DevServer` is listening on. :type server_port: str or int :param client_id: The ESI ClientID :type client_id: str :param callback_url: The ESI CallbackURL :type callback_url: str :param scopes: The selected ESI scopes, as space-delimited string. :type scopes: str :param character_id: Pre-selected CharacterId to authorize :type character_id: str or int :param username: EVE Online SSO username :type username: str :param password: EVE Online SSO password :type password: str :return: authorization code :rtype: str """ state = ''.join(random.choices(string.ascii_lowercase + string.digits, k=16)) dev_server = DevServer((server_address, int(server_port))) dev_server.start() if cli_login: _do_cli_login(callback_url, client_id, scopes, state, username, password, character_id) else: request = Request('GET', ESI_AUTHORIZE_ENDPOINT, params={ 'response_type': 'code', 'redirect_uri': callback_url, 'client_id': client_id, 'scope': scopes, 'state': state }).prepare() print('Please complete the EVE SSO authentication: {}'.format( request.url)) webbrowser.open(request.url) dev_server.join() if SESSION['state'] != state: print('State "{}" does not match our original state "{}"'.format( SESSION['state'], state)) return None return SESSION['code']
def _do_cli_login(callback_url, client_id, scopes, state, username=None, password=None, character_id=None): """ My Little Browser. """ with HTMLSession() as session: response = session.get(ESI_AUTHORIZE_ENDPOINT, params={ 'response_type': 'code', 'redirect_uri': callback_url, 'client_id': client_id, 'scope': scopes, 'state': state }) response.raise_for_status() post_url = '/'.join(ESI_AUTHORIZE_ENDPOINT.split('/')[:3] + [response.html.find('form', first=True).attrs.get('action')]) if not all((username, password)): print('Logging in to EVE Online') if not username: username = input('Username: ') if not password: password = getpass.getpass('Password: ') req_token = response.html.find('input[name="__RequestVerificationToken"]', first=True).attrs.get('value') response = response.session.post(post_url, data={'UserName': username, 'Password': password, '__RequestVerificationToken': req_token}) response.raise_for_status() # Fetch the input values from the response and put them into the data we # will send back. Most of these are identical, but never know what funk # will ensue. post_data = {} for input_name in ('ClientIdentifier', 'RedirectUri', 'State', 'Scope', 'ResponseType', '__RequestVerificationToken'): element = response.html.find('input[name="{}"]'.format(input_name), first=True) post_data[input_name] = element.attrs['value'] # Get the list of characters from the selection box and get the user to # choose selection = response.html.find('select[name="CharacterId"] option') if character_id is not None: post_data['CharacterId'] = str(character_id) else: # pragma: nocover choices = [] for option in selection: value = option.attrs.get('value') choices.append(value) print('{}. {}'.format(len(choices), option.text)) while 'CharacterId' not in post_data: choice = input('Chose character: ') try: post_data['CharacterId'] = choices[int(choice) - 1] break except ValueError: print('Invalid choice') except IndexError: print('Invalid choice') # Send the request and wait for the redirect to our local http server response = response.session.post(ESI_AUTHORIZE_ENDPOINT, data=post_data) response.raise_for_status() def verify_authorization_code(authorization_code, client_id=CLIENT_ID, secret_key=SECRET_KEY): authenticator = ESIAuthenticator() return authenticator.verify_authorization_code(authorization_code, client_id, secret_key) def verify_access_token(access_token): authenticator = ESIAuthenticator() return authenticator.verify_access_token(access_token) def get_access_token(refresh_token, client_id=CLIENT_ID, secret_key=SECRET_KEY): authenticator = ESIAuthenticator() return authenticator.get_access_token(refresh_token, client_id, secret_key) def revoke_token(token, token_type, client_id=CLIENT_ID, secret_key=SECRET_KEY): authenticator = ESIAuthenticator() return authenticator.revoke_token(token, token_type=token_type, client_id=client_id, secret=secret_key)