Commit 962be4d8 authored by Janez K's avatar Janez K
Browse files

twitter feed and twitter sentiment analysis

parent 30709c05
......@@ -11,4 +11,3 @@ pyparsing==1.5.6
pydot==1.0.28
wsgiref==0.1.2
feedparser==5.1.2
tweepy==2.0
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
"""
Tweepy Twitter API library
"""
__version__ = '2.0'
__author__ = 'Joshua Roesslein'
__license__ = 'MIT'
from tweepy.models import Status, User, DirectMessage, Friendship, SavedSearch, SearchResult, ModelFactory, Category
from tweepy.error import TweepError
from tweepy.api import API
from tweepy.cache import Cache, MemoryCache, FileCache
from tweepy.auth import BasicAuthHandler, OAuthHandler
from tweepy.streaming import Stream, StreamListener
from tweepy.cursor import Cursor
# Global, unauthenticated instance of API
api = API()
def debug(enable=True, level=1):
import httplib
httplib.HTTPConnection.debuglevel = level
This diff is collapsed.
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
from urllib2 import Request, urlopen
import base64
from tweepy import oauth
from tweepy.error import TweepError
from tweepy.api import API
class AuthHandler(object):
def apply_auth(self, url, method, headers, parameters):
"""Apply authentication headers to request"""
raise NotImplementedError
def get_username(self):
"""Return the username of the authenticated user"""
raise NotImplementedError
class BasicAuthHandler(AuthHandler):
def __init__(self, username, password):
self.username = username
self._b64up = base64.b64encode('%s:%s' % (username, password))
def apply_auth(self, url, method, headers, parameters):
headers['Authorization'] = 'Basic %s' % self._b64up
def get_username(self):
return self.username
class OAuthHandler(AuthHandler):
"""OAuth authentication handler"""
OAUTH_HOST = 'api.twitter.com'
OAUTH_ROOT = '/oauth/'
def __init__(self, consumer_key, consumer_secret, callback=None, secure=False):
self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)
self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1()
self.request_token = None
self.access_token = None
self.callback = callback
self.username = None
self.secure = secure
def _get_oauth_url(self, endpoint, secure=False):
if self.secure or secure:
prefix = 'https://'
else:
prefix = 'http://'
return prefix + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint
def apply_auth(self, url, method, headers, parameters):
request = oauth.OAuthRequest.from_consumer_and_token(
self._consumer, http_url=url, http_method=method,
token=self.access_token, parameters=parameters
)
request.sign_request(self._sigmethod, self._consumer, self.access_token)
headers.update(request.to_header())
def _get_request_token(self):
try:
url = self._get_oauth_url('request_token')
request = oauth.OAuthRequest.from_consumer_and_token(
self._consumer, http_url=url, callback=self.callback
)
request.sign_request(self._sigmethod, self._consumer, None)
resp = urlopen(Request(url, headers=request.to_header()))
return oauth.OAuthToken.from_string(resp.read())
except Exception, e:
raise TweepError(e)
def set_request_token(self, key, secret):
self.request_token = oauth.OAuthToken(key, secret)
def set_access_token(self, key, secret):
self.access_token = oauth.OAuthToken(key, secret)
def get_authorization_url(self, signin_with_twitter=False):
"""Get the authorization URL to redirect the user"""
try:
# get the request token
self.request_token = self._get_request_token()
# build auth request and return as url
if signin_with_twitter:
url = self._get_oauth_url('authenticate')
else:
url = self._get_oauth_url('authorize')
request = oauth.OAuthRequest.from_token_and_callback(
token=self.request_token, http_url=url
)
return request.to_url()
except Exception, e:
raise TweepError(e)
def get_access_token(self, verifier=None):
"""
After user has authorized the request token, get access token
with user supplied verifier.
"""
try:
url = self._get_oauth_url('access_token')
# build request
request = oauth.OAuthRequest.from_consumer_and_token(
self._consumer,
token=self.request_token, http_url=url,
verifier=str(verifier)
)
request.sign_request(self._sigmethod, self._consumer, self.request_token)
# send request
resp = urlopen(Request(url, headers=request.to_header()))
self.access_token = oauth.OAuthToken.from_string(resp.read())
return self.access_token
except Exception, e:
raise TweepError(e)
def get_xauth_access_token(self, username, password):
"""
Get an access token from an username and password combination.
In order to get this working you need to create an app at
http://twitter.com/apps, after that send a mail to api@twitter.com
and request activation of xAuth for it.
"""
try:
url = self._get_oauth_url('access_token', secure=True) # must use HTTPS
request = oauth.OAuthRequest.from_consumer_and_token(
oauth_consumer=self._consumer,
http_method='POST', http_url=url,
parameters = {
'x_auth_mode': 'client_auth',
'x_auth_username': username,
'x_auth_password': password
}
)
request.sign_request(self._sigmethod, self._consumer, None)
resp = urlopen(Request(url, data=request.to_postdata()))
self.access_token = oauth.OAuthToken.from_string(resp.read())
return self.access_token
except Exception, e:
raise TweepError(e)
def get_username(self):
if self.username is None:
api = API(self)
user = api.verify_credentials()
if user:
self.username = user.screen_name
else:
raise TweepError("Unable to get username, invalid oauth token!")
return self.username
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
import httplib
import urllib
import time
import re
from tweepy.error import TweepError
from tweepy.utils import convert_to_utf8_str
from tweepy.models import Model
re_path_template = re.compile('{\w+}')
def bind_api(**config):
class APIMethod(object):
path = config['path']
payload_type = config.get('payload_type', None)
payload_list = config.get('payload_list', False)
allowed_param = config.get('allowed_param', [])
method = config.get('method', 'GET')
require_auth = config.get('require_auth', False)
search_api = config.get('search_api', False)
use_cache = config.get('use_cache', True)
def __init__(self, api, args, kargs):
# If authentication is required and no credentials
# are provided, throw an error.
if self.require_auth and not api.auth:
raise TweepError('Authentication required!')
self.api = api
self.post_data = kargs.pop('post_data', None)
self.retry_count = kargs.pop('retry_count', api.retry_count)
self.retry_delay = kargs.pop('retry_delay', api.retry_delay)
self.retry_errors = kargs.pop('retry_errors', api.retry_errors)
self.headers = kargs.pop('headers', {})
self.build_parameters(args, kargs)
# Pick correct URL root to use
if self.search_api:
self.api_root = api.search_root
else:
self.api_root = api.api_root
# Perform any path variable substitution
self.build_path()
if api.secure:
self.scheme = 'https://'
else:
self.scheme = 'http://'
if self.search_api:
self.host = api.search_host
else:
self.host = api.host
# Manually set Host header to fix an issue in python 2.5
# or older where Host is set including the 443 port.
# This causes Twitter to issue 301 redirect.
# See Issue https://github.com/tweepy/tweepy/issues/12
self.headers['Host'] = self.host
def build_parameters(self, args, kargs):
self.parameters = {}
for idx, arg in enumerate(args):
if arg is None:
continue
try:
self.parameters[self.allowed_param[idx]] = convert_to_utf8_str(arg)
except IndexError:
raise TweepError('Too many parameters supplied!')
for k, arg in kargs.items():
if arg is None:
continue
if k in self.parameters:
raise TweepError('Multiple values for parameter %s supplied!' % k)
self.parameters[k] = convert_to_utf8_str(arg)
def build_path(self):
for variable in re_path_template.findall(self.path):
name = variable.strip('{}')
if name == 'user' and 'user' not in self.parameters and self.api.auth:
# No 'user' parameter provided, fetch it from Auth instead.
value = self.api.auth.get_username()
else:
try:
value = urllib.quote(self.parameters[name])
except KeyError:
raise TweepError('No parameter value found for path variable: %s' % name)
del self.parameters[name]
self.path = self.path.replace(variable, value)
def execute(self):
# Build the request URL
url = self.api_root + self.path
if len(self.parameters):
url = '%s?%s' % (url, urllib.urlencode(self.parameters))
# Query the cache if one is available
# and this request uses a GET method.
if self.use_cache and self.api.cache and self.method == 'GET':
cache_result = self.api.cache.get(url)
# if cache result found and not expired, return it
if cache_result:
# must restore api reference
if isinstance(cache_result, list):
for result in cache_result:
if isinstance(result, Model):
result._api = self.api
else:
if isinstance(cache_result, Model):
cache_result._api = self.api
return cache_result
# Continue attempting request until successful
# or maximum number of retries is reached.
retries_performed = 0
while retries_performed < self.retry_count + 1:
# Open connection
# FIXME: add timeout
if self.api.secure:
conn = httplib.HTTPSConnection(self.host)
else:
conn = httplib.HTTPConnection(self.host)
# Apply authentication
if self.api.auth:
self.api.auth.apply_auth(
self.scheme + self.host + url,
self.method, self.headers, self.parameters
)
# Execute request
try:
conn.request(self.method, url, headers=self.headers, body=self.post_data)
resp = conn.getresponse()
except Exception, e:
raise TweepError('Failed to send request: %s' % e)
# Exit request loop if non-retry error code
if self.retry_errors:
if resp.status not in self.retry_errors: break
else:
if resp.status == 200: break
# Sleep before retrying request again
time.sleep(self.retry_delay)
retries_performed += 1
# If an error was returned, throw an exception
self.api.last_response = resp
if resp.status != 200:
try:
error_msg = self.api.parser.parse_error(resp.read())
except Exception:
error_msg = "Twitter error response: status code = %s" % resp.status
raise TweepError(error_msg, resp)
# Parse the response payload
result = self.api.parser.parse(self, resp.read())
conn.close()
# Store result into cache if one is available.
if self.use_cache and self.api.cache and self.method == 'GET' and result:
self.api.cache.store(url, result)
return result
def _call(api, *args, **kargs):
method = APIMethod(api, args, kargs)
return method.execute()
# Set pagination mode
if 'cursor' in APIMethod.allowed_param:
_call.pagination_mode = 'cursor'
elif 'page' in APIMethod.allowed_param:
_call.pagination_mode = 'page'
return _call
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
import time
import datetime
import threading
import os
try:
import cPickle as pickle
except ImportError:
import pickle
try:
import hashlib
except ImportError:
# python 2.4
import md5 as hashlib
try:
import fcntl
except ImportError:
# Probably on a windows system
# TODO: use win32file
pass
class Cache(object):
"""Cache interface"""
def __init__(self, timeout=60):
"""Initialize the cache
timeout: number of seconds to keep a cached entry
"""
self.timeout = timeout
def store(self, key, value):
"""Add new record to cache
key: entry key
value: data of entry
"""
raise NotImplementedError
def get(self, key, timeout=None):
"""Get cached entry if exists and not expired
key: which entry to get
timeout: override timeout with this value [optional]
"""
raise NotImplementedError
def count(self):
"""Get count of entries currently stored in cache"""
raise NotImplementedError
def cleanup(self):
"""Delete any expired entries in cache."""
raise NotImplementedError
def flush(self):
"""Delete all cached entries"""
raise NotImplementedError
class MemoryCache(Cache):
"""In-memory cache"""
def __init__(self, timeout=60):
Cache.__init__(self, timeout)
self._entries = {}
self.lock = threading.Lock()
def __getstate__(self):
# pickle
return {'entries': self._entries, 'timeout': self.timeout}
def __setstate__(self, state):
# unpickle
self.lock = threading.Lock()
self._entries = state['entries']
self.timeout = state['timeout']
def _is_expired(self, entry, timeout):
return timeout > 0 and (time.time() - entry[0]) >= timeout
def store(self, key, value):
self.lock.acquire()
self._entries[key] = (time.time(), value)
self.lock.release()
def get(self, key, timeout=None):
self.lock.acquire()
try:
# check to see if we have this key
entry = self._entries.get(key)
if not entry:
# no hit, return nothing
return None
# use provided timeout in arguments if provided
# otherwise use the one provided during init.
if timeout is None:
timeout = self.timeout
# make sure entry is not expired
if self._is_expired(entry, timeout):
# entry expired, delete and return nothing
del self._entries[key]
return None
# entry found and not expired, return it
return entry[1]
finally:
self.lock.release()
def count(self):
return len(self._entries)
def cleanup(self):
self.lock.acquire()
try:
for k, v in self._entries.items():
if self._is_expired(v, self.timeout):
del self._entries[k]
finally:
self.lock.release()
def flush(self):
self.lock.acquire()
self._entries.clear()
self.lock.release()
class FileCache(Cache):
"""File-based cache"""
# locks used to make cache thread-safe
cache_locks = {}
def __init__(self, cache_dir, timeout=60):
Cache.__init__(self, timeout)
if os.path.exists(cache_dir) is False:
os.mkdir(cache_dir)
self.cache_dir = cache_dir
if cache_dir in FileCache.cache_locks:
self.lock = FileCache.cache_locks[cache_dir]
else:
self.lock = threading.Lock()
FileCache.cache_locks[cache_dir] = self.lock
if os.name == 'posix':
self._lock_file = self._lock_file_posix
self._unlock_file = self._unlock_file_posix
elif os.name == 'nt':
self._lock_file = self._lock_file_win32
self._unlock_file = self._unlock_file_win32
else:
print 'Warning! FileCache locking not supported on this system!'
self._lock_file = self._lock_file_dummy
self._unlock_file = self._unlock_file_dummy
def _get_path(self, key):
md5 = hashlib.md5()
md5.update(key)
return os.path.join(self.cache_dir, md5.hexdigest())
def _lock_file_dummy(self, path, exclusive=True):
return None
def _unlock_file_dummy(self, lock):
return
def _lock_file_posix(self, path, exclusive=True):
lock_path = path + '.lock'
if exclusive is True:
f_lock = open(lock_path, 'w')
fcntl.lockf(f_lock, fcntl.LOCK_EX)
else:
f_lock = open(lock_path, 'r')
fcntl.lockf(f_lock, fcntl.LOCK_SH)
if os.path.exists(lock_path) is False:
f_lock.close()
return None
return f_lock
def _unlock_file_posix(self, lock):
lock.close()
def _lock_file_win32(self, path, exclusive=True):
# TODO: implement
return None
def _unlock_file_win32(self, lock):
# TODO: implement
return
def _delete_file(self, path):
os.remove(path)
if os.path.exists(path + '.lock'):
os.remove(path + '.lock')
def store(self, key, value):
path = self._get_path(key)
self.lock.acquire()
try:
# acquire lock and open file
f_lock = self._lock_file(path)
datafile = open(path, 'wb')
# write data
pickle.dump((time.time(), value), datafile)
# close and unlock file
datafile.close()
self._unlock_file(f_lock)
finally:
self.lock.release()
def get(self, key, timeout=None):