CodyKochmann
10/16/2016 - 9:24 PM

Python scanner for YouTube videos that tracks which view counts skyrocket

Python scanner for YouTube videos that tracks which view counts skyrocket

# -*- coding: utf-8 -*-
# @Author: cody
# @Date:   2016-10-16 14:26:46
# @Last Modified 2016-10-16
# @Last Modified time: 2016-10-16 17:18:47

# When true, add_channel, add_video and add_view_check print a progress line
# before touching the database.
VERBOSE = True

def timestamp(human_readable=False):
  # Returns the current time.
  #
  # human_readable=False (default): integer unix timestamp, i.e. whole seconds
  #   since the epoch in UTC.
  # human_readable=True: a naive datetime holding the local wall-clock time.
  #
  # by: Cody Kochmann
  if human_readable:
    from datetime import datetime
    return datetime.now()
  # time() is already epoch-based. The previous implementation fed the naive
  # *local* datetime.now() through utctimetuple()/timegm(), which applies no
  # offset, so the result was shifted by the machine's UTC offset.
  from time import time
  return int(time())

def get_every_str_between(s, before, after):
  # Returns a list of the substrings of "s" found between "before" and "after".
  #
  # "s" is cut at every occurrence of "before"; each piece that contains
  # "after" contributes the text up to its first "after".
  #
  # Two historical quirks are kept on purpose, because callers in this file
  # take element [0] of the result:
  #   * matches are returned last-to-first (reverse of their order in "s");
  #   * the text preceding the first "before" is treated like any other piece,
  #     so it can contribute a trailing entry if it contains "after".
  #
  # An empty "before" or "after" cannot delimit anything and yields [].
  #
  # The earlier version swapped the delimiters for long sentinel strings
  # first, which gave wrong results whenever "after" (e.g. "~") or the input
  # overlapped the sentinel text. Splitting directly avoids that.
  # by: Cody Kochmann
  if not before or not after:
    return []
  out = []
  for piece in reversed(s.split(before)):
    if after in piece:
      out.append(piece.split(after, 1)[0])
  return out


import requests
import re
import sqlite3

def get(url, timeout=30):
    """Fetch ``url`` with requests and return the response body as text.

    url     -- must contain "://" and no spaces.
    timeout -- seconds passed to requests.get(); without one a stalled
               connection would block the scanner loop indefinitely.

    Raises AssertionError for a malformed url. It is raised explicitly rather
    than with an ``assert`` statement so the check also runs under ``python -O``.
    """
    if "://" not in url or " " in url:
        raise AssertionError("invalid link detected '{}'".format(url))
    return requests.get(url, timeout=timeout).text


"""
Database Stuff
"""

# Filename of the SQLite database; handed to sqlite3.connect() by the
# database helpers in this file. It is a relative name.
DB_NAME = "yt-views.db"

def execute(sql, params=()):
    """Run one SQL statement against DB_NAME and commit it.

    sql    -- the statement text; may contain ``?`` placeholders.
    params -- optional sequence of values bound to those placeholders.
              Defaults to no parameters, so existing ``execute(sql)`` calls
              behave as before.

    A new connection is opened for every call and is always closed, even
    when the statement raises (previously the connection leaked on error).
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute(sql, params)
        conn.commit()
    finally:
        conn.close()

# Create the tables at import time. Each statement uses IF NOT EXISTS, so
# re-running the script against an existing database leaves it untouched.
# NOTE(review): SQLite only enforces FOREIGN KEY clauses when
# "PRAGMA foreign_keys" is switched on for the connection; no such pragma is
# visible in this file -- confirm whether enforcement is expected.

# channels: one row per tracked channel, keyed by its channel code.
execute("""CREATE TABLE IF NOT EXISTS channels(
    name TEXT NOT NULL,
    code TEXT PRIMARY KEY NOT NULL,
    last_checked INTEGER NOT NULL
)""")
# videos: one row per discovered video, keyed by its url.
# NOTE(review): channel_code is declared INTEGER but references
# channels(code), which is TEXT, and add_video inserts it as a quoted
# string -- TEXT looks like the intended type; confirm before changing.
execute("""CREATE TABLE IF NOT EXISTS videos(
    name TEXT NOT NULL,
    url TEXT PRIMARY KEY NOT NULL,
    last_checked INTEGER NOT NULL,
    channel_code INTEGER NOT NULL,
    FOREIGN KEY(channel_code) REFERENCES channels(code)
)""")
# view_counts: one row per view-count sample (timestamp, count, video url).
execute("""CREATE TABLE IF NOT EXISTS view_counts(
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ts INTEGER NOT NULL,
    view_count REAL NOT NULL,
    video_url TEXT NOT NULL,
    FOREIGN KEY(video_url) REFERENCES videos(url)
)""")

def select_all_from(table):
    """Return every row of ``table`` as a list of tuples.

    table -- table name, interpolated directly into the statement because
             SQLite cannot bind identifiers as parameters. The callers in
             this file pass fixed names only; do not pass untrusted text.

    The connection is closed before returning, also when the query raises
    (previously only the cursor was closed and the connection leaked).
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        return conn.execute('SELECT * FROM {}'.format(table)).fetchall()
    finally:
        conn.close()

def get_channels():
    """Return all rows of the ``channels`` table (see select_all_from)."""
    return select_all_from("channels")

def get_videos():
    """Return all rows of the ``videos`` table (see select_all_from)."""
    return select_all_from("videos")

def get_view_counts():
    """Return all rows of the ``view_counts`` table (see select_all_from)."""
    return select_all_from("view_counts")

def update_channel_ts(code):
    """Stamp the channel row identified by ``code`` with the current time.

    Writes int(timestamp()) into channels.last_checked through execute().
    """
    checked_at = int(timestamp())
    statement = 'UPDATE channels SET last_checked={} WHERE code="{}"'.format(checked_at, code)
    execute(statement)

def update_video_ts(url):
    """Stamp the video row identified by ``url`` with the current time.

    Writes int(timestamp()) into videos.last_checked through execute().
    """
    checked_at = int(timestamp())
    statement = 'UPDATE videos SET last_checked={} WHERE url="{}"'.format(checked_at, url)
    execute(statement)

"""
===============================================================================
"""

def gen_channel_link(code):
    """Build the channel page url for the channel code ``code``."""
    template = "https://www.youtube.com/channel/{}"
    return template.format(code)


def gen_videos_link(code):
    """Build the url of the channel's videos tab for the channel code ``code``."""
    channel_link = gen_channel_link(code)
    return "{}/videos".format(channel_link)
def get_channel_name(code):
    """Look up the display name of the channel with code ``code``.

    Downloads the channel page and walks the text of its ``<a ...</a>``
    elements. The first one (in the order get_every_str_between returns
    them) that starts with ``dir="ltr" href="/channel/<code>`` and carries a
    ``title="`` attribute supplies the name. Returns None when nothing
    matches. Elements that raise UnicodeDecodeError are skipped.
    """
    page = get(gen_channel_link(code))
    for anchor in get_every_str_between(page, '<a ', '</a>'):
        try:
            anchor = anchor.encode('utf-8')
            wanted_prefix = 'dir="ltr" href="/channel/{}'.format(code)
            if not anchor.startswith(wanted_prefix):
                continue
            if 'title="' not in anchor:
                continue
            return get_every_str_between(anchor, 'title="', '"')[0]
        except UnicodeDecodeError:
            continue
    return None
def add_channel(url):
    """Register the channel that ``url`` points at in the channels table.

    url -- a channel url containing "://" and "/channel/<code>".

    The channel code is taken from the url, the display name is fetched with
    get_channel_name(), and the row is inserted with last_checked=0. An
    already known code is left untouched (INSERT OR IGNORE).

    Raises AssertionError when ``url`` is not a channel url.
    """
    # Channel codes may contain '-' and '_' besides letters and digits; the
    # former pattern [A-Za-z0-9]{1,} cut such codes off at the first of them.
    found = re.search(r'/channel/([A-Za-z0-9_-]+)', url)
    if "://" not in url or found is None:
        raise AssertionError('not a valid youtube channel url')
    code = found.group(1)
    name = get_channel_name(code)
    if VERBOSE: print("adding channel: {}".format(name))
    if name is None:
        # The name lookup found nothing; use the code as the name instead of
        # storing the literal text "None".
        name = code
    elif isinstance(name, bytes):
        # get_channel_name hands back UTF-8 encoded bytes; bind as text.
        name = name.decode('utf-8')
    # The name is scraped from a web page, so it is bound as a parameter
    # instead of being pasted into the SQL text. execute() is documented
    # here only as taking a finished statement, hence the own connection.
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute(
            'INSERT OR IGNORE INTO channels(name,code,last_checked) VALUES (?,?,0)',
            (name, code),
        )
        conn.commit()
    finally:
        conn.close()

def add_video(name,href,channel_code):
    """Insert a video row with last_checked=0.

    name         -- video title.
    href         -- video url; primary key of the videos table.
    channel_code -- code of the channel the video was found on.

    A url that is already stored is left untouched (INSERT OR IGNORE).
    """
    if VERBOSE: print("adding video: {}".format(name))
    # Title and href are scraped from a web page, so they are bound as
    # parameters instead of being pasted into the SQL text. Byte strings are
    # decoded first so they are bound as text.
    values = tuple(
        value.decode('utf-8') if isinstance(value, bytes) else value
        for value in (name, href, channel_code)
    )
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute(
            'INSERT OR IGNORE INTO videos(name,url,channel_code,last_checked) VALUES (?,?,?,0)',
            values,
        )
        conn.commit()
    finally:
        conn.close()

def add_view_check(url,views):
    """Record one view-count sample for the video at ``url``.

    Inserts a view_counts row holding the current timestamp(), ``views`` and
    ``url``. Prints a progress line first when VERBOSE is set.
    """
    if VERBOSE:
        print("video: {} had: {} views".format(url, views))
    insert_template = 'INSERT INTO view_counts(ts,view_count,video_url) VALUES ("{}",{},"{}")'
    execute(insert_template.format(timestamp(), views, url))



# Example of the markup view_regex is written against:
# <div class="watch-view-count">3,604,903 views</div>
# view_regex matches from "watch-view-count"> through the run of digits and
# commas that follows; get_views() strips the prefix and the commas.
view_regex = r'\"watch-view-count"\>[0-9\,]{1,}'
# Pattern for the start of an <a class="yt-uix-sessionlink ...> tag.
# It is not referenced anywhere else in the visible part of this file.
videos_regex = r'\<a class=\"yt-uix-sessionlink[a-zA-Z\ \-0-9\"\=\&\;\|\_\/\?\>]{1,}'

def get_views(url):
    """Return the view count shown on a watch page, as a float.

    url -- full url, or a relative "/watch?..." path, which is prefixed with
           https://www.youtube.com.

    Takes the first view_regex match in the page, keeps what follows the
    '>' and drops the thousands separators. Raises IndexError when the page
    has no match.
    """
    if url.startswith("/watch?"):
        url = "https://www.youtube.com{}".format(url)
    page = get(url)
    first_hit = re.findall(view_regex, page)[0]
    digits = first_hit.split('>')[1]
    return float(digits.replace(',', ''))

#print get_views(link)

def scan_for_videos(url):
    """Yield (title, href) for each video tile link found on the page at ``url``.

    The page is cut into the text of its ``<a class="yt-uix-sessionlink``
    elements; only those continuing with `` yt-uix-tile-link `` are used.
    Title and href are the [0] entries get_every_str_between returns for the
    ``title="`` and ``href="`` attributes. Elements that raise
    UnicodeDecodeError are skipped.

    NOTE(review): title and href come from an already encoded string and are
    encoded a second time; on Python 2 that presumably raises
    UnicodeDecodeError for non-ASCII text, which would silently drop those
    videos -- confirm.
    """
    anchors = get_every_str_between(get(url), '<a class="yt-uix-sessionlink', '</a>')
    for anchor in anchors:
        try:
            anchor = anchor.encode('utf-8')
            if not anchor.startswith(" yt-uix-tile-link "):
                continue
            titles = get_every_str_between(anchor, 'title="', '"')
            title = titles[0].encode("utf-8")
            hrefs = get_every_str_between(anchor, 'href="', '"')
            href = hrefs[0].encode("utf-8")
            yield title, href
        except UnicodeDecodeError:
            continue

def get_new_videos():
    """Scan the videos page of every stored channel and store what it lists.

    For each channel row, every (title, href) pair from scan_for_videos is
    passed to add_video, then the channel's last_checked is refreshed.
    """
    for channel_row in get_channels():
        _name, code, _last_checked = channel_row
        listing_url = gen_videos_link(code)
        for title, href in scan_for_videos(listing_url):
            add_video(title, href, code)
        update_channel_ts(code)

def check_video_viewcounts():
    """Sample the current view count of every stored video.

    For each video row: print a progress line, fetch the count with
    get_views, store it with add_view_check and refresh the video's
    last_checked. A UnicodeEncodeError raised by any of these steps
    (including the progress line) skips that video; other exceptions
    propagate.
    """
    for video_row in get_videos():
        name, url, _updated, _channel_code = video_row
        try:
            print('scanning: {}'.format(name))
            add_view_check(url, get_views(url))
            update_video_ts(url)
        except UnicodeEncodeError:
            continue

def update():
    """Run one full pass: get_new_videos() followed by check_video_viewcounts()."""
    get_new_videos()
    check_video_viewcounts()

if __name__ == '__main__':
    from sys import argv
    from time import sleep

    # Command line flags:
    #   -a   add the channel whose url is the LAST argument
    #   -r   run update() every ten minutes until interrupted
    if "-a" in argv:
        try:
            add_channel(argv[-1])
        except Exception as error:
            # Was a bare "except:", which also swallowed KeyboardInterrupt and
            # SystemExit and hid the reason for the failure.
            print("something went wrong adding the channel: {} ({!r})".format(argv[-1], error))

    # loop the updater
    while '-r' in argv:
        update()
        sleep(60*10)