# Python scanner for YouTube videos that tracks which view counts skyrocket.
# -*- coding: utf-8 -*-
# @Author: cody
# @Date: 2016-10-16 14:26:46
# @Last Modified 2016-10-16
# @Last Modified time: 2016-10-16 17:18:47
# When True, add_channel(), add_video() and add_view_check() print a progress
# line for each row they insert.
VERBOSE = True
def timestamp(human_readable=False):
    """Return the current time.

    Args:
        human_readable: When true, return a ``datetime`` object instead of
            an integer.

    Returns:
        int: Whole seconds since the Unix epoch when ``human_readable`` is
        false.
        datetime.datetime: ``datetime.now()`` (naive, local time) when
        ``human_readable`` is true.
    """
    # by: Cody Kochmann
    if human_readable:
        from datetime import datetime
        return datetime.now()
    # time.time() is already epoch based.  The previous
    # timegm(datetime.now().utctimetuple()) interpreted naive *local*
    # wall-clock time as UTC, so the value was wrong by the machine's UTC
    # offset everywhere except on UTC hosts.
    from time import time
    return int(time())
def get_every_str_between(s, before, after):
    """Return every substring of ``s`` enclosed by ``before`` and ``after``.

    A match starts right after an occurrence of ``before`` and ends at the
    first ``after`` that follows it (and precedes the next ``before``).
    Occurrences of ``before`` with no closing ``after`` are ignored.

    Args:
        s: The text to search.
        before: Non-empty opening delimiter.
        after: Non-empty closing delimiter.

    Returns:
        list: The enclosed substrings, ordered from the LAST match in ``s``
        to the first.  This order is kept for backward compatibility.

    Raises:
        ValueError: If ``before`` or ``after`` is empty.
    """
    # by: Cody Kochmann
    if not before or not after:
        raise ValueError("before and after must be non-empty")
    # Text ahead of the first `before` has no opening delimiter, so it can
    # never be a match; the old sentinel-based version reported it anyway.
    segments = s.split(before)[1:]
    out = []
    for segment in reversed(segments):
        head, found, _tail = segment.partition(after)
        if found:
            out.append(head)
    return out
import requests
import re
import sqlite3
def get(url, timeout=30):
    """Download ``url`` and return the response body as text.

    Args:
        url: Absolute URL; must contain ``://`` and no spaces.
        timeout: Seconds to wait for the server before giving up.  Without
            a timeout ``requests`` may block forever on a stalled
            connection, which would freeze the scan loop.

    Returns:
        The decoded response body (``Response.text``).

    Raises:
        ValueError: If ``url`` does not look like a link.  This used to be
            an ``assert``, which is stripped when Python runs with ``-O``.
        requests.exceptions.RequestException: On network errors/timeouts.
    """
    if "://" not in url or " " in url:
        raise ValueError("invalid link detected '{}'".format(url))
    return requests.get(url, timeout=timeout).text
"""
Database Stuff
"""
# File name of the SQLite database opened by execute() and select_all_from().
DB_NAME = "yt-views.db"
def execute(sql, params=()):
    """Run one SQL statement against the database and commit it.

    A fresh connection to ``DB_NAME`` is opened for every call.

    Args:
        sql: The statement text; may contain ``?`` placeholders.
        params: Optional sequence of values bound to the placeholders.
            Defaults to no parameters, so existing ``execute(sql)`` calls
            behave as before.

    Raises:
        sqlite3.Error: If the statement fails; the connection is still
            closed in that case.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.cursor().execute(sql, params)
        conn.commit()
    finally:
        # Close even when the statement raises so a bad query cannot leak
        # the connection.
        conn.close()
# Create the tables; IF NOT EXISTS makes these statements no-ops once the
# database file already contains them.

# channels: one row per tracked channel, keyed by the channel code taken
# from its URL.
execute("""CREATE TABLE IF NOT EXISTS channels(
    name TEXT NOT NULL,
    code TEXT PRIMARY KEY NOT NULL,
    last_checked INTEGER NOT NULL
)""")

# videos: one row per discovered video, keyed by its URL.
# channel_code is TEXT to match channels.code.  It was declared INTEGER,
# whose column affinity makes SQLite convert an all-digit code to a number,
# so it would no longer equal the TEXT key it references.  Databases created
# earlier keep their old declaration.
execute("""CREATE TABLE IF NOT EXISTS videos(
    name TEXT NOT NULL,
    url TEXT PRIMARY KEY NOT NULL,
    last_checked INTEGER NOT NULL,
    channel_code TEXT NOT NULL,
    FOREIGN KEY(channel_code) REFERENCES channels(code)
)""")

# view_counts: one row per view-count sample of a video.
execute("""CREATE TABLE IF NOT EXISTS view_counts(
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ts INTEGER NOT NULL,
    view_count REAL NOT NULL,
    video_url TEXT NOT NULL,
    FOREIGN KEY(video_url) REFERENCES videos(url)
)""")
def select_all_from(table):
    """Return all rows of ``table`` as a list of tuples.

    Args:
        table: Table name.  It is interpolated into the SQL text because
            identifiers cannot be bound as parameters, so pass only
            trusted, hard-coded names.

    Returns:
        list: One tuple per row, columns in table-definition order.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        return conn.execute('SELECT * FROM {}'.format(table)).fetchall()
    finally:
        # The original closed only the cursor and left the connection open.
        conn.close()
def get_channels():
    """Return every row of the ``channels`` table."""
    rows = select_all_from("channels")
    return rows
def get_videos():
    """Return every row of the ``videos`` table."""
    rows = select_all_from("videos")
    return rows
def get_view_counts():
    """Return every row of the ``view_counts`` table."""
    rows = select_all_from("view_counts")
    return rows
def update_channel_ts(code):
    """Set ``last_checked`` of the channel ``code`` to the current timestamp.

    Args:
        code: Channel code (string) identifying the row in ``channels``.
    """
    # Use a single-quoted SQL string literal with embedded quotes doubled.
    # The old double-quoted form is parsed by SQLite as an identifier first,
    # and a quote character inside the value could alter the statement.
    safe_code = code.replace("'", "''")
    execute("UPDATE channels SET last_checked={} WHERE code='{}'".format(
        int(timestamp()), safe_code))
def update_video_ts(url):
    """Set ``last_checked`` of the video ``url`` to the current timestamp.

    Args:
        url: Video URL (string) identifying the row in ``videos``.
    """
    # Use a single-quoted SQL string literal with embedded quotes doubled.
    # The old double-quoted form is parsed by SQLite as an identifier first,
    # and a quote character inside the value could alter the statement.
    safe_url = url.replace("'", "''")
    execute("UPDATE videos SET last_checked={} WHERE url='{}'".format(
        int(timestamp()), safe_url))
"""
===============================================================================
"""
def gen_channel_link(code):
    """Build the YouTube channel page URL for the channel ``code``."""
    template = "https://www.youtube.com/channel/{}"
    return template.format(code)
def gen_videos_link(code):
    """Build the URL of the ``/videos`` listing of the channel ``code``."""
    channel_url = gen_channel_link(code)
    return "{}/videos".format(channel_url)
def get_channel_name(code):
    """Scrape the display name of the channel ``code`` from its channel page.

    Each ``<a ...>...</a>`` fragment of the page is examined; the first one
    whose text starts with ``dir="ltr" href="/channel/<code>`` and carries a
    ``title="`` attribute supplies the name.

    Returns:
        The value of that ``title`` attribute, or ``None`` when no fragment
        qualifies.
    """
    page = get(gen_channel_link(code))
    for anchor in get_every_str_between(page, '<a ', '</a>'):
        try:
            anchor = anchor.encode('utf-8')
            wanted_prefix = 'dir="ltr" href="/channel/{}'.format(code)
            if not anchor.startswith(wanted_prefix):
                continue
            if 'title="' not in anchor:
                continue
            return get_every_str_between(anchor, 'title="', '"')[0]
        except UnicodeDecodeError:
            # Fragments that trip a codec error are skipped, not fatal.
            continue
    return None
def add_channel(url):
    """Register the YouTube channel at ``url`` in the ``channels`` table.

    The channel code is taken from the ``/channel/<code>`` part of the URL
    and the display name is scraped with ``get_channel_name``.  A channel
    that is already stored is left untouched (``INSERT OR IGNORE``).

    Args:
        url: Absolute channel URL containing ``/channel/<code>``.

    Raises:
        ValueError: If ``url`` is not a channel URL.  This used to be an
            ``assert`` (stripped under ``-O``) followed by an ``IndexError``
            when no code could be extracted.
    """
    if "://" not in url or "/channel/" not in url:
        raise ValueError('not a valid youtube channel url')
    # Accept '-' and '_' as well: the old [A-Za-z0-9] class silently cut the
    # code off at the first such character and stored a truncated code.
    found = re.search(r'/channel/([A-Za-z0-9_-]+)', url)
    if found is None:
        raise ValueError('not a valid youtube channel url')
    code = found.group(1)
    name = get_channel_name(code)
    if name is None:
        # The scrape found no title; fall back to the code instead of
        # storing the literal text "None" as the channel name.
        name = code
    if VERBOSE:
        print("adding channel: {}".format(name))
    # Single-quoted literal with quotes doubled so a scraped name cannot
    # break out of the statement; `code` is limited to [A-Za-z0-9_-] above.
    execute("INSERT OR IGNORE INTO channels(name,code,last_checked) VALUES ('{}','{}',0)".format(
        name.replace("'", "''"),
        code))
def add_video(name, href, channel_code):
    """Insert a video into the ``videos`` table unless its URL is already known.

    Args:
        name: Video title (string).
        href: Video URL as scraped; stored in the ``url`` primary-key column.
        channel_code: Code (string) of the channel the video belongs to.
    """
    if VERBOSE:
        print("adding video: {}".format(name))
    # Titles and links come from scraped HTML, so embed them as single-quoted
    # SQL literals with quotes doubled instead of raw double-quoted text.
    execute("INSERT OR IGNORE INTO videos(name,url,channel_code,last_checked) VALUES ('{}','{}','{}',0)".format(
        name.replace("'", "''"),
        href.replace("'", "''"),
        channel_code.replace("'", "''")))
def add_view_check(url, views):
    """Store one view-count sample for the video ``url``.

    Args:
        url: Video URL (string); written to ``view_counts.video_url``.
        views: Number of views observed; must be convertible to ``float``.
    """
    if VERBOSE:
        print("video: {} had: {} views".format(url, views))
    # ts is written as a bare integer and views is coerced to float so only
    # numeric text reaches the statement; the URL becomes a single-quoted
    # literal with quotes doubled.
    execute("INSERT INTO view_counts(ts,view_count,video_url) VALUES ({},{},'{}')".format(
        int(timestamp()),
        float(views),
        url.replace("'", "''")))
# Sample of the markup view_regex targets:
# <div class="watch-view-count">3,604,903 views</div>
# The pattern matches from "watch-view-count"> through the comma-grouped
# digits; get_views() splits the match on '>' to isolate the number.
view_regex = r'\"watch-view-count"\>[0-9\,]{1,}'
# Matches the opening of an <a class="yt-uix-sessionlink ...> tag.
# NOTE(review): not referenced anywhere in the visible part of this file.
videos_regex = r'\<a class=\"yt-uix-sessionlink[a-zA-Z\ \-0-9\"\=\&\;\|\_\/\?\>]{1,}'
def get_views(url):
    """Fetch a watch page and return its view count as a float.

    Args:
        url: Absolute watch URL, or a relative one starting with
            ``/watch?`` which is prefixed with the YouTube host.

    Raises:
        IndexError: If the page contains no match for ``view_regex``.
    """
    is_relative = url.startswith("/watch?")
    target = "https://www.youtube.com{}".format(url) if is_relative else url
    html = get(target)
    first_hit = re.findall(view_regex, html)[0]
    # The match looks like '"watch-view-count">3,604,903'; keep the digits.
    digits = first_hit.split('>')[1]
    return float(digits.replace(',', ''))
#print get_views(link)
def scan_for_videos(url):
    """Yield ``(title, href)`` for each video link on the page at ``url``.

    Only ``<a class="yt-uix-sessionlink ...>`` fragments whose remaining
    class list starts with `` yt-uix-tile-link `` are reported.  Fragments
    that raise ``UnicodeDecodeError`` while being processed are skipped.
    """
    page = get(url)
    link_open = '<a class="yt-uix-sessionlink'
    link_close = '</a>'
    for fragment in get_every_str_between(page, link_open, link_close):
        try:
            fragment = fragment.encode('utf-8')
            if not fragment.startswith(" yt-uix-tile-link "):
                continue
            # NOTE(review): on Python 2 calling .encode() on the already
            # encoded byte string implicitly ASCII-decodes it first, so
            # links with non-ASCII titles appear to land in the except
            # branch below -- confirm this is intended.
            title = get_every_str_between(fragment, 'title="', '"')[0].encode("utf-8")
            href = get_every_str_between(fragment, 'href="', '"')[0].encode("utf-8")
            yield title, href
        except UnicodeDecodeError:
            continue
def get_new_videos():
    """Scan every stored channel's video listing and store the videos found.

    After a channel's listing has been processed its ``last_checked``
    timestamp is refreshed.
    """
    for _name, code, _last_checked in get_channels():
        listing_url = gen_videos_link(code)
        for title, href in scan_for_videos(listing_url):
            add_video(title, href, code)
        update_channel_ts(code)
def check_video_viewcounts():
    """Record a fresh view-count sample for every video in ``videos``.

    For each video the current view count is fetched, stored through
    ``add_view_check`` and the video's ``last_checked`` is refreshed.
    """
    for name, url, _last_checked, _channel_code in get_videos():
        try:
            print('scanning: {}'.format(name))
        except UnicodeEncodeError:
            # On Python 2 a non-ASCII unicode name cannot be formatted into
            # a byte-string template.  That is only a logging problem, so
            # show the repr and still scan the video; previously this error
            # skipped the view check altogether.
            print('scanning: {!r}'.format(name))
        try:
            views = get_views(url)
            add_view_check(url, views)
            update_video_ts(url)
        except UnicodeEncodeError:
            # Kept from the original: a video whose data cannot be formatted
            # is skipped rather than aborting the whole pass.
            continue
def update():
    """Run one full pass: discover new videos, then sample view counts."""
    for step in (get_new_videos, check_video_viewcounts):
        step()
if __name__ == '__main__':
    from sys import argv
    from time import sleep

    # -a : register the channel whose URL is the LAST command-line argument.
    if "-a" in argv:
        try:
            add_channel(argv[-1])
        except Exception as err:
            # Narrowed from a bare `except:` so Ctrl-C and SystemExit still
            # propagate, and the cause is reported instead of hidden.
            print("something went wrong adding the channel: {}".format(argv[-1]))
            print("reason: {!r}".format(err))

    # -r : run the updater forever, pausing ten minutes between passes.
    while '-r' in argv:
        update()
        sleep(60*10)