# henocdz
# 12/29/2017 - 11:15 PM
#
# aws_signature_v4.py

# I was following this tutorial: http://gettechtalent.com/blog/tutorial-real-time-frontend-updates-with-react-serverless-and-websockets-on-aws-iot.html
# but I didn't like the idea of a Lambda function just for the presigned URL when we already have an API that could handle this.
# However, there is no Python package that does ONLY this presigned URL generation,
# so I decided to "translate" the code of the npm package 'aws-signature-v4' (https://git.department.se/department/aws-signature-v4.git) to Python.
# There are probably bugs, as I have only tested it with my specific params.

import urllib.parse
import hmac
import hashlib
from datetime import datetime


def to_time(datetime_object):
    """Format a datetime as a compact ``YYYYMMDDTHHMMSSZ`` timestamp.

    The trailing ``Z`` is a literal character in the format string; no
    timezone conversion is performed on ``datetime_object``.
    """
    timestamp_format = '%Y%m%dT%H%M%SZ'
    return datetime_object.strftime(timestamp_format)


def to_date(datetime_object):
    """Format a datetime as a compact ``YYYYMMDD`` date string."""
    date_format = '%Y%m%d'
    return datetime_object.strftime(date_format)


def _hmac(key, string, as_hex=False):
    encoded_key = key.encode('utf-8') if not isinstance(key, bytes) else key
    encoded_string = string.encode('utf-8') if not isinstance(string, bytes) else string
    hasher = hmac.new(
        encoded_key,
        msg=encoded_string,
        digestmod=hashlib.sha256
    )
    if as_hex:
        return hasher.hexdigest()
    return hasher.digest()


def hash(string):
    """Return the hexadecimal SHA-256 digest of ``string`` encoded as UTF-8.

    Note: this name shadows the built-in ``hash`` within this module.
    """
    return hashlib.sha256(string.encode('utf-8')).hexdigest()


def create_credentials_scope(time, region, service):
    """Build the credential scope: ``<YYYYMMDD>/<region>/<service>/aws4_request``."""
    scope_parts = (to_date(time), region, service, 'aws4_request')
    return '/'.join(scope_parts)


def create_canonical_headers(headers):
    """Build the canonical headers section of a Signature V4 canonical request.

    Each header is emitted as ``name:value\\n``. Header names are lowercased
    and stripped; values are stripped and runs of whitespace are collapsed to
    a single space. Entries are sorted by the normalized (lowercase) name.

    Args:
        headers: Mapping of header name to header value.

    Returns:
        The concatenated canonical header lines (empty string if there are
        no headers).
    """
    normalized = sorted(
        # ' '.join(x.split()) both trims the value and collapses inner whitespace.
        (name.strip().lower(), ' '.join(str(value).split()))
        for name, value in headers.items()
    )
    return ''.join(
        '{}:{}\n'.format(name, value)
        for name, value in normalized
    )


def create_canonical_query_string(params):
    """Build the canonical query string of a Signature V4 canonical request.

    Names and values are converted with ``str`` and percent-encoded as UTF-8,
    leaving only the unreserved characters (letters, digits, ``-``, ``_``,
    ``.``, ``~``) unescaped. A space therefore becomes ``%20`` rather than
    ``+``. Pairs are sorted by their encoded name and joined with ``&``.

    Args:
        params: Mapping of query parameter name to value.

    Returns:
        The canonical query string (empty string if there are no params).
    """
    def encode(value):
        return urllib.parse.quote(str(value), safe='-_.~')

    encoded_pairs = sorted(
        (encode(name), encode(value))
        for name, value in params.items()
    )
    return '&'.join(
        '{}={}'.format(name, value)
        for name, value in encoded_pairs
    )


def create_signed_headers(headers):
    """Build the signed-headers list of a Signature V4 canonical request.

    Header names are stripped, lowercased, sorted by the lowercase form and
    joined with ``;``.

    Args:
        headers: Mapping whose keys are header names (values are ignored).

    Returns:
        The ``;``-separated list of header names (empty string if none).
    """
    return ';'.join(sorted(name.strip().lower() for name in headers))


def create_canonical_request(method, pathname, query, headers, payload):
    """Assemble the newline-separated canonical request.

    The lines are, in order: the upper-cased method, the path, the canonical
    query string, the canonical headers, the signed-header list and the
    payload value as given.
    """
    verb = method.upper()
    canonical_query = create_canonical_query_string(query)
    canonical_headers = create_canonical_headers(headers)
    signed_headers = create_signed_headers(headers)

    request_lines = (
        verb,
        pathname,
        canonical_query,
        canonical_headers,
        signed_headers,
        payload,
    )
    return '\n'.join(request_lines)


def create_string_to_sign(time, region, service, request):
    """Assemble the newline-separated string to sign.

    The lines are: the algorithm name, the request timestamp, the credential
    scope and the SHA-256 hex digest of the canonical ``request``.
    """
    timestamp = to_time(time)
    scope = create_credentials_scope(time, region, service)
    request_digest = hash(request)
    return '\n'.join(('AWS4-HMAC-SHA256', timestamp, scope, request_digest))

def create_signature(secret, time, region, service, string_to_sign):
    """Derive the signing key and return the hex signature of ``string_to_sign``.

    The key is derived by chaining HMAC-SHA256 over the date, region, service
    and the literal ``aws4_request``, starting from ``'AWS4' + secret``.
    """
    signing_key = _hmac(('AWS4' + secret).encode('utf-8'), to_date(time))
    # Each step keys the next HMAC with the previous raw digest.
    for scope_part in (region, service, 'aws4_request'):
        signing_key = _hmac(signing_key, scope_part)
    return _hmac(signing_key, string_to_sign, as_hex=True)

def create_presigned_url(method, host, path, service, payload, options=None):
    # NOTE(review): as written, this definition only builds `default_options`
    # and then falls off the end, returning None; none of its parameters are
    # used. A function with the same name is defined again further down in
    # this file, and that later definition replaces this one at import time.
    # This copy looks like a truncated earlier paste -- confirm and remove.
    default_options = {
        'key': None,
        'secret': None,
        'protocol': 'https',
        'headers': {},
        # Evaluated on every call, since the dict is built inside the function.
        'timestamp': datetime.utcnow(),
        'region': 'us-east-1',
        'expires': 86400
    }
import urllib.parse
import hmac
import hashlib
from datetime import datetime
from collections import OrderedDict


def to_datetime_str(datetime_object):
    """Format a datetime as a compact ``YYYYMMDDTHHMMSSZ`` timestamp.

    The trailing ``Z`` is a literal character in the format string; no
    timezone conversion is performed on ``datetime_object``.
    """
    timestamp_format = '%Y%m%dT%H%M%SZ'
    return datetime_object.strftime(timestamp_format)


def to_date_str(datetime_object):
    """Format a datetime as a compact ``YYYYMMDD`` date string."""
    date_format = '%Y%m%d'
    return datetime_object.strftime(date_format)


def sort_dict(dictionary):
    """Return an ``OrderedDict`` holding ``dictionary``'s items in sorted order."""
    ordered_items = sorted(dictionary.items())
    return OrderedDict(ordered_items)


def hmac_hash(key, string, as_hex=False):
    """Compute the HMAC-SHA256 of ``string`` keyed with ``key``.

    Both ``key`` and ``string`` may be ``bytes`` or ``str``; anything that is
    not already ``bytes`` is encoded as UTF-8 first.

    Returns the hexadecimal digest (``str``) when ``as_hex`` is true,
    otherwise the raw digest (``bytes``).
    """
    if not isinstance(key, bytes):
        key = key.encode('utf-8')
    if not isinstance(string, bytes):
        string = string.encode('utf-8')

    mac = hmac.new(key, msg=string, digestmod=hashlib.sha256)
    if as_hex:
        return mac.hexdigest()
    return mac.digest()


def hash(string):
    """Return the hexadecimal SHA-256 digest of ``string`` encoded as UTF-8.

    Note: this name shadows the built-in ``hash`` within this module.
    """
    utf8_bytes = string.encode('utf-8')
    return hashlib.sha256(utf8_bytes).hexdigest()


def create_credentials_scope(time, region, service):
    """Build the credential scope: ``<YYYYMMDD>/<region>/<service>/aws4_request``."""
    scope_parts = (to_date_str(time), region, service, 'aws4_request')
    return '/'.join(scope_parts)


def create_canonical_headers(headers):
    """Build the canonical headers section of a Signature V4 canonical request.

    Each header is emitted as ``name:value\\n``. Header names are lowercased
    and stripped; values are stripped and runs of whitespace are collapsed to
    a single space. Entries are sorted by the normalized (lowercase) name.

    Args:
        headers: Mapping of header name to header value.

    Returns:
        The concatenated canonical header lines (empty string if there are
        no headers).
    """
    normalized = sorted(
        # ' '.join(x.split()) both trims the value and collapses inner whitespace.
        (name.strip().lower(), ' '.join(str(value).split()))
        for name, value in headers.items()
    )
    return ''.join(
        '{}:{}\n'.format(name, value)
        for name, value in normalized
    )


def create_canonical_query_string(params):
    """Build the canonical query string of a Signature V4 canonical request.

    Names and values are converted with ``str`` and percent-encoded as UTF-8,
    leaving only the unreserved characters (letters, digits, ``-``, ``_``,
    ``.``, ``~``) unescaped. A space therefore becomes ``%20`` rather than
    ``+``. Pairs are sorted by their encoded name and joined with ``&``.

    Args:
        params: Mapping of query parameter name to value.

    Returns:
        The canonical query string (empty string if there are no params).
    """
    def encode(value):
        return urllib.parse.quote(str(value), safe='-_.~')

    encoded_pairs = sorted(
        (encode(name), encode(value))
        for name, value in params.items()
    )
    return '&'.join(
        '{}={}'.format(name, value)
        for name, value in encoded_pairs
    )


def create_signed_headers(headers):
    """Build the signed-headers list of a Signature V4 canonical request.

    Header names are stripped, lowercased, sorted by the lowercase form and
    joined with ``;``.

    Args:
        headers: Mapping whose keys are header names (values are ignored).

    Returns:
        The ``;``-separated list of header names (empty string if none).
    """
    return ';'.join(sorted(name.strip().lower() for name in headers))


def create_canonical_request(method, pathname, query, headers, payload):
    """Assemble the newline-separated canonical request.

    The lines are, in order: the upper-cased method, the path, the canonical
    query string, the canonical headers, the signed-header list and the
    payload value as given.
    """
    verb = method.upper()
    canonical_query = create_canonical_query_string(query)
    canonical_headers = create_canonical_headers(headers)
    signed_headers = create_signed_headers(headers)

    request_lines = (
        verb,
        pathname,
        canonical_query,
        canonical_headers,
        signed_headers,
        payload,
    )
    return '\n'.join(request_lines)


def create_string_to_sign(time, region, service, request):
    """Assemble the newline-separated string to sign.

    The lines are: the algorithm name, the request timestamp, the credential
    scope and the SHA-256 hex digest of the canonical ``request``.
    """
    timestamp = to_datetime_str(time)
    scope = create_credentials_scope(time, region, service)
    request_digest = hash(request)
    return '\n'.join(('AWS4-HMAC-SHA256', timestamp, scope, request_digest))

def create_signature(secret, time, region, service, string_to_sign):
    """Derive the signing key and return the hex signature of ``string_to_sign``.

    The key is derived by chaining HMAC-SHA256 over the date, region, service
    and the literal ``aws4_request``, starting from ``'AWS4' + secret``.
    """
    signing_key = hmac_hash(('AWS4' + secret).encode('utf-8'), to_date_str(time))
    # Each step keys the next HMAC with the previous raw digest.
    for scope_part in (region, service, 'aws4_request'):
        signing_key = hmac_hash(signing_key, scope_part)
    return hmac_hash(signing_key, string_to_sign, as_hex=True)

def create_presigned_url(method, host, path, service, payload, options=None):
    """Return a presigned URL carrying an AWS Signature Version 4 signature.

    Args:
        method: HTTP method; upper-cased when building the canonical request.
        host: Host name. It is signed as the ``host`` header and used in the
            resulting URL.
        path: URL path, used as given in the canonical request and the URL.
        service: Service name used in the credential scope.
        payload: Value placed on the last line of the canonical request.
        options: Optional dict overriding any of these defaults:
            ``key`` (None), ``secret`` (None), ``protocol`` ('https'),
            ``headers`` ({}), ``timestamp`` (current UTC time),
            ``region`` ('us-east-1'), ``expires`` (86400). An optional
            ``query`` entry (query string or mapping) adds extra parameters
            that are signed and included in the URL.

    Returns:
        The presigned URL as a string.
    """
    # Local import: the module only imports `datetime` from the datetime package.
    from datetime import timezone

    settings = {
        'key': None,
        'secret': None,
        'protocol': 'https',
        'headers': {},
        'timestamp': None,
        'region': 'us-east-1',
        'expires': 86400
    }
    settings.update(options or {})

    timestamp = settings['timestamp']
    if timestamp is None:
        # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
        timestamp = datetime.now(timezone.utc)
    elif getattr(timestamp, 'tzinfo', None) is not None and timestamp.utcoffset() is not None:
        # The formatted timestamp is suffixed with a literal 'Z', so an aware
        # value must be expressed in UTC before formatting.
        timestamp = timestamp.astimezone(timezone.utc)

    # Copy so the caller's headers dict is not mutated by adding 'host'.
    headers = dict(settings['headers'])
    headers['host'] = host

    # Extra query parameters come from options['query'] (string or mapping).
    extra_query = settings.get('query')
    if not extra_query:
        query = {}
    elif isinstance(extra_query, str):
        query = dict(urllib.parse.parse_qsl(extra_query))
    else:
        query = dict(extra_query)

    region = settings['region']
    query['X-Amz-Algorithm'] = 'AWS4-HMAC-SHA256'
    query['X-Amz-Credential'] = '{}/{}'.format(
        settings['key'],
        create_credentials_scope(timestamp, region, service)
    )
    query['X-Amz-Date'] = to_datetime_str(timestamp)
    query['X-Amz-Expires'] = settings['expires']
    query['X-Amz-SignedHeaders'] = create_signed_headers(headers)

    canonical_request = create_canonical_request(method, path, query, headers, payload)
    string_to_sign = create_string_to_sign(timestamp, region, service, canonical_request)

    # The signature is added only after signing, so it is not itself signed.
    query['X-Amz-Signature'] = create_signature(
        settings['secret'],
        timestamp,
        region,
        service,
        string_to_sign
    )

    return '{protocol}://{host}{path}?{querystring}'.format(
        protocol=settings['protocol'],
        host=host,
        path=path,
        # Percent-encode (space -> %20) instead of the default '+' form encoding.
        querystring=urllib.parse.urlencode(
            query, quote_via=urllib.parse.quote, safe='-_.~'
        )
    )


if __name__ == '__main__':
    # Example run: presign a WebSocket (wss) MQTT URL using placeholder
    # credentials. The payload passed is the SHA-256 hex digest of an empty
    # string.
    empty_payload_hash = hashlib.sha256(''.encode('utf-8')).hexdigest()

    example_options = {
        'key': 'YOUR_ACCESS_KEY',
        'secret': 'YOUR_SECRET',
        'protocol': 'wss',
        'region': 'us-west-2'
    }

    example_url = create_presigned_url(
        'GET',
        'YOUR_ENDPOINT.iot.us-west-2.amazonaws.com',
        '/mqtt',
        'iotdevicegateway',
        empty_payload_hash,
        example_options
    )
    print(example_url)