from __future__ import print_function

import base64
import json
import os
import sys
import time

import requests

# Workaround to support both python 2 & 3
import six
import six.moves.urllib.parse as urllibparse

# NOTE(review): resolves to a sibling ``exceptions`` module only via Python 2
# implicit relative imports -- confirm the package layout before porting.
from exceptions import SpotifyException


class SpotifyOauthError(Exception):
    """Raised when credentials are missing or a token request is rejected."""
    pass


def _make_authorization_headers(client_id, client_secret):
    """Build the HTTP Basic ``Authorization`` header for the token endpoint.

    Parameters:
        - client_id - the client id of your app
        - client_secret - the client secret of your app

    Returns a dict holding a single ``Authorization`` entry whose value is
    ``Basic <base64(client_id:client_secret)>``.
    """
    credentials = six.text_type(client_id + ':' + client_secret)
    auth_header = base64.b64encode(credentials.encode('ascii'))
    return {'Authorization': 'Basic %s' % auth_header.decode('ascii')}


def is_token_expired(token_info):
    """Return True when the token expires in less than 60 seconds.

    ``token_info`` must carry an ``expires_at`` entry (epoch seconds), as
    written by ``_add_custom_values_to_token_info``.
    """
    now = int(time.time())
    return token_info['expires_at'] - now < 60


class SpotifyClientCredentials(object):
    """Obtains and holds in memory a Client Credentials access token."""

    OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token'

    def __init__(self, client_id=None, client_secret=None, proxies=None):
        """
        You can either provide a client_id and client_secret to the
        constructor or set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET
        environment variables

        Parameters:
            - client_id - the client id of your app
            - client_secret - the client secret of your app
            - proxies - passed through to ``requests.post`` as ``proxies``

        Raises SpotifyOauthError when either credential is still missing
        after consulting the environment.
        """
        if not client_id:
            client_id = os.getenv('SPOTIPY_CLIENT_ID')

        if not client_secret:
            client_secret = os.getenv('SPOTIPY_CLIENT_SECRET')

        if not client_id:
            raise SpotifyOauthError('No client id')

        if not client_secret:
            raise SpotifyOauthError('No client secret')

        self.client_id = client_id
        self.client_secret = client_secret
        self.token_info = None
        self.proxies = proxies

    def get_access_token(self):
        """
        If a valid access token is in memory, returns it
        Else fetches a new token and returns it
        """
        if self.token_info and not self.is_token_expired(self.token_info):
            return self.token_info['access_token']

        token_info = self._request_access_token()
        token_info = self._add_custom_values_to_token_info(token_info)
        self.token_info = token_info
        return self.token_info['access_token']

    def _request_access_token(self):
        """Gets client credentials access token

        Returns the decoded JSON body of the token response.
        Raises SpotifyOauthError (carrying the HTTP reason) on any status
        other than 200.
        """
        payload = {'grant_type': 'client_credentials'}

        headers = _make_authorization_headers(self.client_id,
                                              self.client_secret)

        response = requests.post(self.OAUTH_TOKEN_URL, data=payload,
                                 headers=headers, verify=True,
                                 proxies=self.proxies)
        if response.status_code != 200:
            raise SpotifyOauthError(response.reason)
        token_info = response.json()
        return token_info

    def is_token_expired(self, token_info):
        """Delegate to the module-level ``is_token_expired``."""
        return is_token_expired(token_info)

    def _add_custom_values_to_token_info(self, token_info):
        """
        Store some values that aren't directly provided by a Web API
        response.

        Adds ``expires_at`` (epoch seconds) computed from ``expires_in`` and
        returns the same, mutated dict.
        """
        token_info['expires_at'] = int(time.time()) + token_info['expires_in']
        return token_info


class SpotifyOAuth(object):
    '''
    Implements Authorization Code Flow for Spotify's OAuth implementation.
    '''

    OAUTH_AUTHORIZE_URL = 'https://accounts.spotify.com/authorize'
    OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token'

    def __init__(self, client_id, client_secret, redirect_uri,
                 state=None, scope=None, cache_path=None, proxies=None):
        '''
            Creates a SpotifyOAuth object

            Parameters:
                 - client_id - the client id of your app
                 - client_secret - the client secret of your app
                 - redirect_uri - the redirect URI of your app
                 - state - security state
                 - scope - the desired scope of the request
                 - cache_path - path to location to save tokens
                 - proxies - passed through to ``requests.post`` as
                   ``proxies``
        '''
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.state = state
        self.cache_path = cache_path
        self.scope = self._normalize_scope(scope)
        self.proxies = proxies

    def get_cached_token(self):
        ''' Gets a cached auth token

            Returns the token info dict read from ``cache_path`` (refreshed
            first when it is about to expire), or None when no cache path is
            configured, the cache cannot be read or decoded, or the cached
            scope does not cover the requested scope.

            Errors raised while refreshing an expired token propagate to
            the caller.
        '''
        if not self.cache_path:
            return None

        # Only the cache read is guarded.  requests' exceptions derive from
        # IOError, so guarding the refresh call as well would silently hand
        # back an already expired token whenever the network fails.
        try:
            with open(self.cache_path) as cache_file:
                token_info = json.load(cache_file)
        except IOError:
            # A missing or unreadable cache is an ordinary cache miss.
            return None
        except ValueError:
            # Truncated or corrupt cache: report it and treat it as a miss
            # so the caller can re-authorize instead of crashing.
            self._warn("couldn't decode token cache at %s" % self.cache_path)
            return None

        if not isinstance(token_info, dict):
            return None

        # if scopes don't match, then bail
        if ('scope' not in token_info or
                not self._is_scope_subset(self.scope, token_info['scope'])):
            return None

        if self.is_token_expired(token_info):
            token_info = self.refresh_access_token(
                token_info['refresh_token'])

        return token_info

    def _save_token_info(self, token_info):
        ''' Writes token_info as JSON to ``cache_path``, if one is set.

            A write failure is reported through ``_warn`` and otherwise
            ignored.
        '''
        if not self.cache_path:
            return

        try:
            # ``with`` closes the handle even when the write fails.
            with open(self.cache_path, 'w') as cache_file:
                cache_file.write(json.dumps(token_info))
        except IOError:
            self._warn("couldn't write token cache to %s" % self.cache_path)

    def _is_scope_subset(self, needle_scope, haystack_scope):
        ''' Returns True when every scope in the space separated
            needle_scope also appears in haystack_scope.  None or empty
            strings count as the empty set.
        '''
        needle_scope = set(needle_scope.split()) if needle_scope else set()
        haystack_scope = (set(haystack_scope.split())
                          if haystack_scope else set())
        return needle_scope <= haystack_scope

    def is_token_expired(self, token_info):
        ''' Delegates to the module-level ``is_token_expired``. '''
        return is_token_expired(token_info)

    def get_authorize_url(self, state=None, show_dialog=False):
        """ Gets the URL to use to authorize this app

            Parameters:
                - state - overrides the state given to the constructor
                - show_dialog - when truthy, adds ``show_dialog`` to the
                  query string
        """
        payload = {'client_id': self.client_id,
                   'response_type': 'code',
                   'redirect_uri': self.redirect_uri}
        if self.scope:
            payload['scope'] = self.scope
        if state is None:
            state = self.state
        if state is not None:
            payload['state'] = state
        if show_dialog:
            payload['show_dialog'] = True

        urlparams = urllibparse.urlencode(payload)

        return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams)

    def parse_response_code(self, url):
        """ Parse the response code in the given response url

            Parameters:
                - url - the response url

            Returns the value of the ``code`` query parameter, or None when
            the url carries no code.
        """
        # Parse the query string properly so the code is found wherever it
        # sits among the parameters (e.g. after ``state``) and is returned
        # percent-decoded and without any trailing fragment.
        query = urllibparse.urlparse(url).query
        codes = urllibparse.parse_qs(query).get('code')
        if codes:
            return codes[0]

        # Fall back to the historical plain-text split so inputs that were
        # accepted before (such as an empty ``?code=``) still are.
        try:
            return url.split("?code=")[1].split("&")[0]
        except IndexError:
            return None

    def _make_authorization_headers(self):
        """ Builds the Basic auth headers from this object's credentials """
        return _make_authorization_headers(self.client_id, self.client_secret)

    def get_access_token(self, code):
        """ Gets the access token for the app given the code

            Parameters:
                - code - the response code

            Returns the token info dict, which is also written to the cache.
            Raises SpotifyOauthError (carrying the HTTP reason) on any
            status other than 200.
        """
        payload = {'redirect_uri': self.redirect_uri,
                   'code': code,
                   'grant_type': 'authorization_code'}
        if self.scope:
            payload['scope'] = self.scope
        if self.state:
            payload['state'] = self.state

        headers = self._make_authorization_headers()

        response = requests.post(self.OAUTH_TOKEN_URL, data=payload,
                                 headers=headers, verify=True,
                                 proxies=self.proxies)
        if response.status_code != 200:
            raise SpotifyOauthError(response.reason)
        token_info = response.json()
        token_info = self._add_custom_values_to_token_info(token_info)
        self._save_token_info(token_info)
        return token_info

    def _normalize_scope(self, scope):
        ''' Returns the whitespace separated scopes sorted and re-joined
            with single spaces, or None when scope is empty or None.
        '''
        if scope:
            scopes = scope.split()
            scopes.sort()
            return ' '.join(scopes)
        else:
            return None

    def refresh_access_token(self, refresh_token):
        ''' Exchanges refresh_token for a new access token

            Parameters:
                - refresh_token - the refresh token of the expired token

            Returns the new token info dict, which is also written to the
            cache.  The supplied refresh_token is carried over when the
            response does not include one.
            Raises SpotifyException when the token endpoint answers with an
            HTTP error status.
        '''
        payload = {'refresh_token': refresh_token,
                   'grant_type': 'refresh_token'}

        headers = self._make_authorization_headers()

        # verify=True is requests' default; spelled out to match the other
        # token requests in this module.
        response = requests.post(self.OAUTH_TOKEN_URL, data=payload,
                                 headers=headers, verify=True,
                                 proxies=self.proxies)
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            # raise_for_status() only raises HTTPError; a bare except would
            # also intercept KeyboardInterrupt and SystemExit.
            message = "Couldn't refresh token: code:%d reason:%s" % (
                response.status_code,
                response.reason,
            )
            # NOTE(review): ``headers`` is the *request* header dict, which
            # holds the Basic credentials -- confirm whether
            # ``response.headers`` was intended here.
            raise SpotifyException(response.status_code, -1, message,
                                   headers)

        token_info = response.json()
        token_info = self._add_custom_values_to_token_info(token_info)
        if 'refresh_token' not in token_info:
            token_info['refresh_token'] = refresh_token
        self._save_token_info(token_info)
        return token_info

    def _add_custom_values_to_token_info(self, token_info):
        '''
        Store some values that aren't directly provided by a Web API
        response.

        Adds ``expires_at`` (epoch seconds) computed from ``expires_in`` and
        the normalized ``scope`` of this object; returns the same, mutated
        dict.
        '''
        token_info['expires_at'] = int(time.time()) + token_info['expires_in']
        token_info['scope'] = self.scope
        return token_info

    def _warn(self, msg):
        ''' Prints msg to stderr prefixed with ``warning:`` '''
        print('warning:' + msg, file=sys.stderr)