riccardoscalco
11/8/2014 - 1:58 PM

Twitter Influencers

Twitter Influencers

# fork of https://github.com/adamdrake/twitterstreamtemplate

from threading import Thread
from queue import Queue

from twython import TwythonStreamer
from requests.exceptions import ChunkedEncodingError

import dataset, datetime


class TwitterStream(TwythonStreamer):
    """TwythonStreamer subclass that pushes tweet payloads onto a queue."""

    def __init__(self, consumer_key, consumer_secret, token, token_secret, tqueue):
        """Remember the target queue, then initialise the parent streamer.

        tqueue is the queue object that on_success() feeds via put().
        """
        self.tweet_queue = tqueue
        super().__init__(consumer_key, consumer_secret, token, token_secret)

    def on_success(self, data):
        """Enqueue payloads that contain a 'text' entry; drop all others."""
        if 'text' not in data:
            return
        self.tweet_queue.put(data)

    def on_error(self, status_code, data):
        """Print the status code of the failed request."""
        print(status_code)
        # Nothing else happens on an error. To stop fetching data after an
        # error, call self.disconnect() here.


def stream_tweets(tweets_queue):
    """Stream tweets matching the tracked keywords into tweets_queue.

    A TwitterStream is opened with the credentials below and filtered by
    keyword and language. When the connection fails with a
    ChunkedEncodingError the stream is reopened; the function returns only
    if the filter call itself returns.

    tweets_queue: queue handed to TwitterStream, which put()s tweets on it.
    """
    # Input your credentials below
    consumer_key = ''
    consumer_secret = ''
    token = ''
    token_secret = ''
    track = 'ebola, mers, antivirals, bigpharma antivirals, bigpharma pandemic flu, bird flu, H1N1, H5N1, H7N9, infectious outbreak, oseltamivir, pandemic, pandemic flu, pandemic vaccine, relenza, tamiflu, zanamivir, vaccine, vaccines, vaccinated, vaccinates, vaccinate, vaccination, vaccinations'
    languages = 'it,en,fr,de,es'
    # Retry in a loop rather than by recursion, so that a long-running
    # collector cannot exhaust the interpreter stack, and always reuse the
    # queue that was passed in (not a module-level global).
    while True:
        try:
            stream = TwitterStream(consumer_key, consumer_secret, token, token_secret, tweets_queue)
            stream.statuses.filter(track=track, language=languages)
        except ChunkedEncodingError:
            # Sometimes the API sends back one byte less than expected which results in an exception in the
            # current version of the requests library
            continue
        else:
            return


def _as_dict(value):
    """Return value if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _tweet_to_row(data):
    """Flatten a raw tweet payload into a flat dict suitable for one table row.

    Only the pieces that are present are copied; missing, empty or malformed
    nested structures are skipped instead of raising. List-valued fields
    (coordinates, mentions, hashtags, urls) are stored as the str() of a
    Python list.
    """
    base_fields = ('retweet_count', 'favorite_count', 'lang', 'text', 'created_at', 'id_str')
    row = {key: data[key] for key in base_fields if key in data}

    # save coordinates
    coordinates = _as_dict(data.get('coordinates'))
    if 'coordinates' in coordinates:
        row['coordinates'] = str(coordinates['coordinates'])

    # save user id (and the name, when available)
    user = _as_dict(data.get('user'))
    if 'id_str' in user:
        row['user'] = user['id_str']
        if 'name' in user:
            row['username'] = user['name']

    # if RT, save tweet id and user id of the original tweet
    original = _as_dict(data.get('retweeted_status'))
    if 'id_str' in original:
        row['rtOf'] = original['id_str']
        original_author = _as_dict(original.get('user'))
        if 'id_str' in original_author:
            row['rtFrom'] = original_author['id_str']

    # save mentions, hashtags and urls: (row column / entities key, item field)
    entities = _as_dict(data.get('entities'))
    for column, field in (('user_mentions', 'id_str'),
                          ('hashtags', 'text'),
                          ('urls', 'expanded_url')):
        items = entities.get(column)
        if not items:
            continue
        try:
            row[column] = str([item[field] for item in items])
        except (KeyError, TypeError):
            # An item without the expected field: leave the column out.
            pass

    return row


def process_tweets(tweets_queue):
    """Consume tweets from tweets_queue forever and insert one row per tweet.

    Rows are written with insert() on the module-level ``table`` object,
    which must exist before this function is called. task_done() is called
    for every dequeued tweet, even when building or inserting the row fails,
    so the queue's unfinished-task count stays accurate.
    """
    while True:
        data = tweets_queue.get()
        try:
            table.insert(_tweet_to_row(data))
        finally:
            # Indicate that the above enqueued task is complete.
            tweets_queue.task_done()

if __name__ == '__main__':

    # Each run writes to its own sqlite file, named after the start time
    # (ctime() format with every space replaced by a dash).
    now = datetime.datetime.today().ctime().replace(' ', '-')
    db = dataset.connect(''.join(('sqlite:///tweets-', now, '.db')))
    # process_tweets() reads this module-level name to store its rows.
    table = db['tweet']

    # The streaming side runs in a daemon thread and fills the queue;
    # the main thread drains it.
    tweet_queue = Queue()
    producer = Thread(target=stream_tweets, args=(tweet_queue,), daemon=True)
    producer.start()

    process_tweets(tweet_queue)
# -*- coding: utf-8 -*-

import datetime
import sqlite3

import dataset
import pykov
import networkx as nx

def getGraph(path_dbfile):
  """
  Build the user interaction graph from a tweets database.

  Reads the 'tweet' table of the sqlite file at path_dbfile and returns
  (G, Q): G is a networkx DiGraph with an edge (user, other user) for every
  retweet and every mention, and Q maps each such edge to the number of
  times it was observed (as a float). Tweets are counted once per id_str.

  Example:
  path_dbfile = '/path/to/tweets-Fri-Sep--5-12:47:34-2014.db'
  """
  seen_ids = set()
  G = nx.DiGraph()
  Q = {}
  # read the database
  db = dataset.connect('sqlite:///' + path_dbfile)
  tweets = db[u'tweet']
  # create the objects
  for tweet in tweets:
    # avoid clones
    tweet_id = tweet['id_str']
    if tweet_id in seen_ids:
      continue
    seen_ids.add(tweet_id)
    # .get() is used for optional columns: a column that was never written
    # while collecting is absent from the rows instead of being None.
    author = tweet.get('user')
    if not author:
      # an edge needs a source node; nothing to record for this tweet
      continue
    retweeted_user = tweet.get('rtFrom')
    # if the tweet is a retweet add a link (user, original user)
    if retweeted_user:
      G.add_edge(author, retweeted_user)
      Q[(author, retweeted_user)] = Q.get((author, retweeted_user), 0.) + 1
    # if the tweet contains mentions add a link (user, mentioned user)
    # only in the case user and mentioned user are not equal
    mentions = tweet.get('user_mentions')
    if mentions:
      for user in decodeMentions(mentions):
        # if userA RT userB, twitter API return that userA mentions userB. Weird.
        if user != retweeted_user and user != author:
          G.add_edge(author, user)
          Q[(author, user)] = Q.get((author, user), 0.) + 1
  return G, Q

def decodeMentions(data):
  """
  Decode a stored mentions string back into a list of user ids.

  data is the str() of a list of id strings, e.g. "['12', '34']".
  Returns the quoted items in order; an empty list for "[]", "" or None.
  """
  if not data:
    return []
  # Splitting on the quote character leaves the quoted items at the odd
  # positions, so no element is required to exist and commas inside an
  # item cannot break the parsing.
  return data.split("'")[1::2]

def getTransitionMatrix(G,Q):
  """
  Build a pykov.Chain from the edges of G.

  Every edge of G becomes an entry of the chain whose value is Q[edge];
  the chain's stochastic() method is then called before it is returned.
  Raises KeyError if an edge of G is missing from Q.
  """
  chain = pykov.Chain()
  for source, target in G.edges():
    link = (source, target)
    chain[link] = Q[link]
  chain.stochastic()
  return chain

Twitter users visibility by means of regular Markov chains.

This is an attempt to define user visibility on a specific topic. Briefly, tweets are collected via the Twitter streaming API, stored in sqlite databases and then processed in order to create a regular Markov chain. The steady state distribution of the chain defines a metric on the set of Twitter users, which can be used to retrieve an ordered list of users.

Have a look at this paper and this other paper for further details about the mathematical methods.

Be careful: the procedure described here is experimental and is not meant to be used in production environments.

Fetching and storing from the Twitter streaming

Fill in your Twitter credentials in the file tweet_stream.py, and change the keywords and languages as you wish. Then start the script:

$ nohup python tweet_stream.py #Python3

The script saves a sqlite database containing information about the tweets: tweet id, language, text, date of creation, user id, username, the mentions/hashtags/urls contained and, in the case of retweets, the id and user id of the original tweet. The script keeps running; it is up to you to stop it.

Define an ordered list of Twitter users

Dependencies: before continuing, please install networkx, dataset and pykov.

Open a Python shell. The following commands create a directed graph G from the collected tweets:

>>> import analysis
>>> G, Q = analysis.getGraph('tweets-Thu-Oct--2-06:02:23-2014.db')

G is a networkx DiGraph object; nodes are Twitter user ids and edges are links between them (expressed as tuples).

>>> type(G)
<class 'networkx.classes.digraph.DiGraph'>
>>> G.edges()[:3]
[('231907053', '1117325342'), ('2567615524', '928342410'), ('2567615524', '18396319')]
>>> G.nodes()[:3]
['2456481724', '231907053', '2567615524']

Q is a Python dictionary whose keys are links between Twitter users and whose values are the number of times each link has been observed:

>>> Q[('525630064', '282695161')]
3.0

The directed graph turns out to be disconnected, as expected. The lack of connectivity is a trivial consequence of the fact that each link is created independently of the others, so there is no reason to expect a connecting path between any pair of nodes.

The largest strongly connected component can be found with the following code:

>>> import networkx as nx
>>> scc = list(nx.strongly_connected_component_subgraphs(G))
>>> scc = sorted(scc,key = lambda graph: len(graph.nodes()))
>>> F = scc[-1]

The regular Markov chain is created on top of the largest component:

>>> T = analysis.getTransitionMatrix(F,Q)
>>> type(T)
<class 'pykov.Chain'>

The above chain is ergodic and its steady state can be easily derived. Users with the highest stationary probability are the most visible:

>>> s = T.steady().sort(True)
>>> s[:5]
[(u'14499829', 0.17468580328789371), (u'5402612', 0.037378364822550497), (u'427900496', 0.033809038083563836), (u'2195671183', 0.026123344415636889), (u'17899109', 0.022593710080353133)]

Written with StackEdit.