mirror of
https://github.com/c0de-archive/spotipy.git
synced 2024-11-14 19:17:27 +00:00
dc996b363f
Fixes error when scope is not set
266 lines
8.6 KiB
Python
266 lines
8.6 KiB
Python
|
|
from __future__ import print_function
|
|
import base64
|
|
import requests
|
|
import os
|
|
import json
|
|
import time
|
|
import sys
|
|
|
|
# Workaround to support both python 2 & 3
|
|
import six
|
|
import six.moves.urllib.parse as urllibparse
|
|
|
|
|
|
class SpotifyOauthError(Exception):
|
|
pass
|
|
|
|
|
|
def _make_authorization_headers(client_id, client_secret):
|
|
auth_header = base64.b64encode(six.text_type(client_id + ':' + client_secret).encode('ascii'))
|
|
return {'Authorization': 'Basic %s' % auth_header.decode('ascii')}
|
|
|
|
|
|
def is_token_expired(token_info):
|
|
now = int(time.time())
|
|
return token_info['expires_at'] - now < 60
|
|
|
|
|
|
class SpotifyClientCredentials(object):
|
|
OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token'
|
|
|
|
def __init__(self, client_id=None, client_secret=None, proxies=None):
|
|
"""
|
|
You can either provid a client_id and client_secret to the
|
|
constructor or set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET
|
|
environment variables
|
|
"""
|
|
if not client_id:
|
|
client_id = os.getenv('SPOTIPY_CLIENT_ID')
|
|
|
|
if not client_secret:
|
|
client_secret = os.getenv('SPOTIPY_CLIENT_SECRET')
|
|
|
|
if not client_id:
|
|
raise SpotifyOauthError('No client id')
|
|
|
|
if not client_secret:
|
|
raise SpotifyOauthError('No client secret')
|
|
|
|
self.client_id = client_id
|
|
self.client_secret = client_secret
|
|
self.token_info = None
|
|
self.proxies = proxies
|
|
|
|
def get_access_token(self):
|
|
"""
|
|
If a valid access token is in memory, returns it
|
|
Else feches a new token and returns it
|
|
"""
|
|
if self.token_info and not self.is_token_expired(self.token_info):
|
|
return self.token_info['access_token']
|
|
|
|
token_info = self._request_access_token()
|
|
token_info = self._add_custom_values_to_token_info(token_info)
|
|
self.token_info = token_info
|
|
return self.token_info['access_token']
|
|
|
|
def _request_access_token(self):
|
|
"""Gets client credentials access token """
|
|
payload = { 'grant_type': 'client_credentials'}
|
|
|
|
headers = _make_authorization_headers(self.client_id, self.client_secret)
|
|
|
|
response = requests.post(self.OAUTH_TOKEN_URL, data=payload,
|
|
headers=headers, verify=True, proxies=self.proxies)
|
|
if response.status_code is not 200:
|
|
raise SpotifyOauthError(response.reason)
|
|
token_info = response.json()
|
|
return token_info
|
|
|
|
def is_token_expired(self, token_info):
|
|
return is_token_expired(token_info)
|
|
|
|
def _add_custom_values_to_token_info(self, token_info):
|
|
"""
|
|
Store some values that aren't directly provided by a Web API
|
|
response.
|
|
"""
|
|
token_info['expires_at'] = int(time.time()) + token_info['expires_in']
|
|
return token_info
|
|
|
|
|
|
class SpotifyOAuth(object):
|
|
'''
|
|
Implements Authorization Code Flow for Spotify's OAuth implementation.
|
|
'''
|
|
|
|
OAUTH_AUTHORIZE_URL = 'https://accounts.spotify.com/authorize'
|
|
OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token'
|
|
|
|
def __init__(self, client_id, client_secret, redirect_uri,
|
|
state=None, scope=None, cache_path=None, proxies=None):
|
|
'''
|
|
Creates a SpotifyOAuth object
|
|
|
|
Parameters:
|
|
- client_id - the client id of your app
|
|
- client_secret - the client secret of your app
|
|
- redirect_uri - the redirect URI of your app
|
|
- state - security state
|
|
- scope - the desired scope of the request
|
|
- cache_path - path to location to save tokens
|
|
'''
|
|
|
|
self.client_id = client_id
|
|
self.client_secret = client_secret
|
|
self.redirect_uri = redirect_uri
|
|
self.state=state
|
|
self.cache_path = cache_path
|
|
self.scope=self._normalize_scope(scope)
|
|
self.proxies = proxies
|
|
|
|
def get_cached_token(self):
|
|
''' Gets a cached auth token
|
|
'''
|
|
token_info = None
|
|
if self.cache_path:
|
|
try:
|
|
f = open(self.cache_path)
|
|
token_info_string = f.read()
|
|
f.close()
|
|
token_info = json.loads(token_info_string)
|
|
|
|
# if scopes don't match, then bail
|
|
if 'scope' not in token_info or not self._is_scope_subset(self.scope, token_info['scope']):
|
|
return None
|
|
|
|
if self.is_token_expired(token_info):
|
|
token_info = self.refresh_access_token(token_info['refresh_token'])
|
|
|
|
except IOError:
|
|
pass
|
|
return token_info
|
|
|
|
def _save_token_info(self, token_info):
|
|
if self.cache_path:
|
|
try:
|
|
f = open(self.cache_path, 'w')
|
|
f.write(json.dumps(token_info))
|
|
f.close()
|
|
except IOError:
|
|
self._warn("couldn't write token cache to " + self.cache_path)
|
|
pass
|
|
|
|
def _is_scope_subset(self, needle_scope, haystack_scope):
|
|
if needle_scope:
|
|
needle_scope = set(needle_scope.split())
|
|
if haystack_scope:
|
|
haystack_scope = set(haystack_scope.split())
|
|
|
|
return needle_scope <= haystack_scope
|
|
|
|
def is_token_expired(self, token_info):
|
|
return is_token_expired(token_info)
|
|
|
|
def get_authorize_url(self, state=None):
|
|
""" Gets the URL to use to authorize this app
|
|
"""
|
|
payload = {'client_id': self.client_id,
|
|
'response_type': 'code',
|
|
'redirect_uri': self.redirect_uri}
|
|
if self.scope:
|
|
payload['scope'] = self.scope
|
|
if state is None:
|
|
state = self.state
|
|
if state is not None:
|
|
payload['state'] = state
|
|
|
|
urlparams = urllibparse.urlencode(payload)
|
|
|
|
return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams)
|
|
|
|
def parse_response_code(self, url):
|
|
""" Parse the response code in the given response url
|
|
|
|
Parameters:
|
|
- url - the response url
|
|
"""
|
|
|
|
try:
|
|
return url.split("?code=")[1].split("&")[0]
|
|
except IndexError:
|
|
return None
|
|
|
|
def _make_authorization_headers(self):
|
|
return _make_authorization_headers(self.client_id, self.client_secret)
|
|
|
|
def get_access_token(self, code):
|
|
""" Gets the access token for the app given the code
|
|
|
|
Parameters:
|
|
- code - the response code
|
|
"""
|
|
|
|
payload = {'redirect_uri': self.redirect_uri,
|
|
'code': code,
|
|
'grant_type': 'authorization_code'}
|
|
if self.scope:
|
|
payload['scope'] = self.scope
|
|
if self.state:
|
|
payload['state'] = self.state
|
|
|
|
headers = self._make_authorization_headers()
|
|
|
|
response = requests.post(self.OAUTH_TOKEN_URL, data=payload,
|
|
headers=headers, verify=True, proxies=self.proxies)
|
|
if response.status_code is not 200:
|
|
raise SpotifyOauthError(response.reason)
|
|
token_info = response.json()
|
|
token_info = self._add_custom_values_to_token_info(token_info)
|
|
self._save_token_info(token_info)
|
|
return token_info
|
|
|
|
def _normalize_scope(self, scope):
|
|
if scope:
|
|
scopes = scope.split()
|
|
scopes.sort()
|
|
return ' '.join(scopes)
|
|
else:
|
|
return None
|
|
|
|
def refresh_access_token(self, refresh_token):
|
|
payload = { 'refresh_token': refresh_token,
|
|
'grant_type': 'refresh_token'}
|
|
|
|
headers = self._make_authorization_headers()
|
|
|
|
response = requests.post(self.OAUTH_TOKEN_URL, data=payload,
|
|
headers=headers, proxies=self.proxies)
|
|
if response.status_code != 200:
|
|
if False: # debugging code
|
|
print('headers', headers)
|
|
print('request', response.url)
|
|
self._warn("couldn't refresh token: code:%d reason:%s" \
|
|
% (response.status_code, response.reason))
|
|
return None
|
|
token_info = response.json()
|
|
token_info = self._add_custom_values_to_token_info(token_info)
|
|
if not 'refresh_token' in token_info:
|
|
token_info['refresh_token'] = refresh_token
|
|
self._save_token_info(token_info)
|
|
return token_info
|
|
|
|
def _add_custom_values_to_token_info(self, token_info):
|
|
'''
|
|
Store some values that aren't directly provided by a Web API
|
|
response.
|
|
'''
|
|
token_info['expires_at'] = int(time.time()) + token_info['expires_in']
|
|
token_info['scope'] = self.scope
|
|
return token_info
|
|
|
|
def _warn(self, msg):
|
|
print('warning:' + msg, file=sys.stderr)
|
|
|