Alex-Just
1/6/2017 - 10:23 PM

Download the target URL, append a ™ symbol to every six-letter word, and display the result in the user's browser

Download the target URL, append a ™ symbol to every six-letter word, and display the result in the user's browser

#!/usr/bin/env python3
# Author: Alex Just <AlexJustEmail@gmail.com>

"""Download the target URL, append a ™ symbol to every six-letter word, and display the result in the user's browser.

Project specification:
    https://docs.google.com/document/d/1bua6MXG9rHyreSPVPEZBkk14WYHy31ia4Vb2rnrgvS0/edit

Installation:
    $ pip3 install bs4 vcrpy

Example usage:
    $ python3 -m unittest -v habr_proxy.py
    or
    $ python3 habr_proxy.py -h
    or
    $ python3 habr_proxy.py
    or
    $ python3 habr_proxy.py -target https://habrahabr.ru -host 127.0.0.1 -port 8000
"""

import argparse
import re
import threading
import unittest
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib import parse
from urllib import request
from urllib.error import HTTPError

from bs4 import BeautifulSoup

# Defaults for the command-line options. process_input_args() rebinds ARGS to
# the parsed values when the module is run as a script.
ARGS = dict(
    target='https://habrahabr.ru',
    host='127.0.0.1',
    port=8000,
)


def download(url):
    """Fetch *url* and return its body, content type and status code.

    HTTP error responses are not raised to the caller: ``urlopen`` reports
    them as an ``HTTPError``, which is itself a response object, so its body
    and headers are returned just like those of a successful response.  Any
    other failure raised by ``urlopen`` propagates unchanged.

    Args:
        url: URL to open with ``urllib.request.urlopen``.

    Returns:
        A ``(body, content_type, status_code)`` tuple: the raw response
        bytes, the media type taken from the response headers, and the
        response's ``code`` attribute.
    """
    try:
        resp = request.urlopen(url)
    except HTTPError as ex:
        # The exception doubles as the response of the failed request.
        resp = ex

    try:
        return (
            resp.read(),  # body
            resp.info().get_content_type(),  # content type
            resp.code,  # status code
        )
    finally:
        # Always release the underlying connection, even if reading fails.
        resp.close()


def get_valid_filename(s):
    """Normalize *s* into a restricted character set for use as a file name.

    The value is converted to ``str``, surrounding whitespace is stripped,
    spaces become underscores, and every remaining character that is not a
    word character, ``-`` or ``.`` is dropped.
    """
    candidate = str(s).strip()
    candidate = candidate.replace(' ', '_')
    return re.sub(r'[^-\w.]', '', candidate)


def get_text_from_html_via_beautifulsoup(html):
    """Return the text of *html* with script and style elements left out.

    The markup is parsed with BeautifulSoup's ``html.parser`` backend; every
    ``<script>`` and ``<style>`` element is detached from the tree before the
    remaining text is collected.
    """
    soup = BeautifulSoup(html, 'html.parser')

    # Script and style bodies are code rather than page text: remove them.
    for element in soup.find_all(['script', 'style']):
        element.extract()

    return soup.get_text()


# Alternatives are tried in order at each position: a whole <script>/<style>
# element, an HTML comment, any other tag, or a six-character word that is not
# directly preceded by `&`.  Only the last alternative fills group 2.
_MARKUP_OR_WORD_RE = re.compile(
    r'<(script|style)\b.*?</\1\s*>'
    r'|<!--.*?-->'
    r'|<[^>]*>'
    r'|(?<!&)\b(\w{6})\b',
    re.DOTALL | re.IGNORECASE,
)


def find_and_replace_special_words(html_bytes):
    """Append ``™`` to the six-character words of an HTML page.

    The candidate words are the six-character words found in the page text
    (as extracted by ``get_text_from_html_via_beautifulsoup``).  The HTML is
    then rewritten in a single pass: each occurrence of a candidate word gets
    a trailing ``™``, except

    * inside tags (tag names and attribute values), HTML comments and the
      bodies of ``<script>``/``<style>`` elements, so the markup, links and
      inline code stay intact;
    * when the word directly follows ``&`` (i.e. it is an entity name).

    Markup is recognised with a regular expression, not a full HTML parser,
    so a stray ``<`` in the text can shield the words up to the next ``>``
    from being marked.

    Args:
        html_bytes: UTF-8 encoded HTML document.

    Returns:
        The rewritten document as UTF-8 encoded bytes.
    """
    html = html_bytes.decode('utf8')
    text = get_text_from_html_via_beautifulsoup(html)

    words = set(re.findall(r'\b[\w]{6}\b', text))

    def mark(match):
        word = match.group(2)
        if word is not None and word in words:
            return word + '™'
        # Markup, or a word that never occurs in the page text: keep as is.
        return match.group(0)

    return _MARKUP_OR_WORD_RE.sub(mark, html).encode('utf8')


class RequestHandler(BaseHTTPRequestHandler):
    """Answer GET requests with the result of ``process_url`` for the path."""

    def do_GET(self):
        """Relay the processed upstream response for ``self.path``.

        The status code and content type returned by ``process_url`` are
        passed on to the client together with the body.  If the upstream
        request fails with a ``URLError``, the client receives a 502 error
        page instead of a dropped connection.
        """
        from urllib.error import URLError

        try:
            content_type, response, status_code = process_url(self.path)
        except URLError as ex:
            self.send_error(502, 'Bad Gateway', str(ex.reason))
            return

        self.send_response(status_code)
        self.send_header('Content-type', content_type)
        self.end_headers()
        self.wfile.write(response)


def replace_links_to_the_proxy(html_bytes):
    """Rewrite occurrences of the target URL so they point at the local proxy.

    Every occurrence of ``ARGS['target']`` in the document is replaced with
    ``http://<host>:<port>`` built from ``ARGS['host']`` and ``ARGS['port']``.

    Args:
        html_bytes: UTF-8 encoded HTML document.

    Returns:
        The rewritten document as UTF-8 encoded bytes.
    """
    html = html_bytes.decode('utf8')

    host, port = ARGS['host'], int(ARGS['port'])
    local_url = 'http://{0}:{1}'.format(host, port)

    # Drop a trailing slash so that replacing the target never swallows the
    # separator between host and path ("…ru/post" must not become "…8000post").
    target = ARGS['target'].rstrip('/')
    if target:
        # An empty search string would insert the proxy URL between every
        # character of the document.
        html = html.replace(target, local_url)

    return bytes(html, 'utf8')


def process_url(path):
    """Fetch *path* from the target site and post-process HTML responses.

    The request path is resolved against ``ARGS['target']``.  The scheme and
    host of the resulting URL are always those of the target, so a request
    such as ``//other.host/x`` or an absolute URL cannot make the proxy fetch
    from a different site.

    Args:
        path: Request path as received by the HTTP handler.

    Returns:
        A ``(content_type, response, status_code)`` tuple.  For ``text/html``
        responses the body has been passed through
        ``find_and_replace_special_words`` and ``replace_links_to_the_proxy``;
        any other content is returned untouched.
    """
    target = ARGS['target']
    if path == '/':
        url = target
    else:
        base = parse.urlsplit(target)
        joined = parse.urlsplit(parse.urljoin(target, path))
        # Keep only path and query from the request; pin scheme and host.
        url = parse.urlunsplit(
            (base.scheme, base.netloc, joined.path, joined.query, '')
        )

    # Download target content
    response, content_type, status_code = download(url)

    # Ignore other resources except for the target page request
    if content_type == 'text/html':
        response = find_and_replace_special_words(response)
        response = replace_links_to_the_proxy(response)

    return content_type, response, status_code


def start_server():
    """Serve the proxy on ``ARGS['host']``/``ARGS['port']`` and open a browser.

    The listening socket is bound first, so a failure to bind raises before
    any browser window is scheduled.  The browser is opened on the proxy URL
    1.25 seconds after the server starts.  The call blocks in
    ``serve_forever``; when it ends (e.g. on an exception such as
    ``KeyboardInterrupt``) a still-pending browser launch is cancelled and the
    listening socket is closed.
    """
    host, port = ARGS['host'], int(ARGS['port'])
    url = 'http://{0}:{1}'.format(host, port)

    server = HTTPServer((host, port), RequestHandler)

    browser_timer = threading.Timer(1.25, lambda: webbrowser.open(url))
    browser_timer.start()
    print('Serving on {}...'.format(url))
    try:
        server.serve_forever()
    finally:
        browser_timer.cancel()  # no effect if the browser was already opened
        server.server_close()


def process_input_args():
    """Parse the command-line options and store them in the global ``ARGS``.

    Each option defaults to the value currently held in ``ARGS``.  ``-port``
    is converted to ``int`` by argparse, so an invalid port is rejected with a
    usage error at start-up instead of failing later when the port is used.
    """
    global ARGS

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-target",
        help="set target host (default={})".format(ARGS['target']),
        default=ARGS['target'],
    )
    parser.add_argument(
        "-host",
        help="set local server's host (default={})".format(ARGS['host']),
        default=ARGS['host'],
    )
    parser.add_argument(
        "-port",
        help="set local server's port (default={})".format(ARGS['port']),
        default=ARGS['port'],
        type=int,
    )

    ARGS = vars(parser.parse_args())


class SmokeTest(unittest.TestCase):
    """End-to-end check of ``process_url`` with the default settings."""

    def test_default_setup(self):
        """An article page comes back as HTML with marked six-letter words.

        The page is fetched through ``process_url``; no recorded fixture is
        used in this file, so the exact number of marked words depends on the
        current page content.  The test therefore checks properties that do
        not change with the content instead of a fixed count.
        """
        content_type, response, status_code = process_url('/company/yandex/blog/258673/')
        self.assertEqual(content_type, 'text/html')

        special_words = re.findall(r'\b[\w]{6}\b™', response.decode('utf8'), re.UNICODE)
        self.assertGreater(len(special_words), 0)


# Script entry point: read the command-line options into ARGS, then start the
# proxy server (start_server() blocks in serve_forever()).
if __name__ == '__main__':
    process_input_args()
    start_server()