# I was following this tutorial: http://gettechtalent.com/blog/tutorial-real-time-frontend-updates-with-react-serverless-and-websockets-on-aws-iot.html
# but I didn't like the idea of a Lambda function just for the presigned URL when we already have an API that could handle this.
# However, there is no Python package that does ONLY this presigned URL generation,
# so I decided to "translate" the code of the npm package 'aws-signature-v4' (https://git.department.se/department/aws-signature-v4.git) to Python.
# There are probably bugs, as I have only tested it with my specific params.
import urllib.parse
import hmac
import hashlib
from datetime import datetime
def to_time(datetime_object):
    """Format ``datetime_object`` as a compact timestamp such as ``20200102T030405Z``.

    The trailing ``Z`` is a literal in the format string; no timezone
    conversion is performed here.
    """
    timestamp_format = '%Y%m%dT%H%M%SZ'
    return datetime_object.strftime(timestamp_format)
def to_date(datetime_object):
    """Format ``datetime_object`` as an eight-digit date such as ``20200102``."""
    date_format = '%Y%m%d'
    return datetime_object.strftime(date_format)
def _hmac(key, string, as_hex=False):
    """Compute the HMAC-SHA256 of ``string`` keyed with ``key``.

    Both arguments may be ``bytes`` or ``str``; ``str`` values are encoded
    as UTF-8 first.

    Returns the hex digest (``str``) when ``as_hex`` is true, otherwise the
    raw digest (``bytes``).
    """
    def _as_bytes(value):
        # Leave bytes untouched, encode everything else as UTF-8.
        return value if isinstance(value, bytes) else value.encode('utf-8')

    mac = hmac.new(_as_bytes(key), msg=_as_bytes(string), digestmod=hashlib.sha256)
    return mac.hexdigest() if as_hex else mac.digest()
def hash(string):
    """Return the hexadecimal SHA-256 digest of ``string`` encoded as UTF-8.

    Note: this intentionally keeps its original name, which shadows the
    built-in ``hash`` within this module.
    """
    return hashlib.sha256(string.encode('utf-8')).hexdigest()
def create_credentials_scope(time, region, service):
    """Build the credential scope ``<date>/<region>/<service>/aws4_request``.

    ``time`` is formatted with ``to_date``; ``region`` and ``service`` are
    used verbatim.
    """
    scope_parts = (to_date(time), region, service, 'aws4_request')
    return '/'.join(scope_parts)
def create_canonical_headers(headers):
    """Build the canonical headers block of a Signature V4 canonical request.

    Signature V4 requires header names to be lowercased, values to have
    surrounding whitespace removed and inner runs of whitespace collapsed
    to one space, and entries to be sorted by (lowercased) name. The
    previous implementation emitted names and values untouched, which only
    produced a valid block when the caller had already normalized them.

    Args:
        headers: mapping of header name to header value.

    Returns:
        One ``name:value\\n`` line per header, concatenated; ``''`` when
        ``headers`` is empty.
    """
    normalized = sorted(
        (str(name).strip().lower(), ' '.join(str(value).split()))
        for name, value in headers.items()
    )
    return ''.join(
        '{}:{}\n'.format(name, value)
        for name, value in normalized
    )
def create_canonical_query_string(params):
    """Build the canonical query string of a Signature V4 canonical request.

    Names and values are converted with ``str`` and percent-encoded so that
    only the unreserved characters ``A-Z a-z 0-9 - _ . ~`` stay literal.
    In particular a space becomes ``%20``; the previous ``quote_plus`` call
    produced ``+``, which Signature V4 does not accept as a space. The
    encoded pairs are then sorted and joined with ``&``.

    Args:
        params: mapping of query parameter name to value.

    Returns:
        The encoded ``name=value`` pairs joined by ``&``; ``''`` when
        ``params`` is empty.
    """
    def encode(value):
        return urllib.parse.quote(str(value), safe='-_.~')

    # Sort after encoding so the order matches the encoded representation.
    encoded_pairs = sorted(
        (encode(name), encode(value))
        for name, value in params.items()
    )
    return '&'.join(
        '{}={}'.format(name, value)
        for name, value in encoded_pairs
    )
def create_signed_headers(headers):
    """Build the ``SignedHeaders`` value: sorted header names joined by ``;``.

    Signature V4 expects the names lowercased and sorted in that form; the
    previous implementation joined the names exactly as the caller spelled
    them, so a header such as ``Host`` yielded an invalid list.

    Args:
        headers: mapping of header name to header value (values are unused).

    Returns:
        The lowercased, whitespace-stripped names in ascending order joined
        by ``;``; ``''`` when ``headers`` is empty.
    """
    return ';'.join(sorted(str(name).strip().lower() for name in headers))
def create_canonical_request(method, pathname, query, headers, payload):
    """Assemble the canonical request text that gets hashed and signed.

    The result is six newline-joined pieces: the upper-cased method, the
    path, the canonical query string, the canonical headers block, the
    signed-headers list and ``payload`` (used verbatim).
    """
    http_method = method.upper()
    canonical_query = create_canonical_query_string(query)
    canonical_headers = create_canonical_headers(headers)
    signed_headers = create_signed_headers(headers)
    pieces = (
        http_method,
        pathname,
        canonical_query,
        canonical_headers,
        signed_headers,
        payload,
    )
    return '\n'.join(pieces)
def create_string_to_sign(time, region, service, request):
    """Assemble the string-to-sign for a canonical ``request``.

    The result is four newline-joined pieces: the algorithm name, the
    formatted request time, the credential scope and the SHA-256 hex digest
    of ``request``.
    """
    algorithm = 'AWS4-HMAC-SHA256'
    request_time = to_time(time)
    scope = create_credentials_scope(time, region, service)
    request_digest = hash(request)
    return '\n'.join((algorithm, request_time, scope, request_digest))
def create_signature(secret, time, region, service, string_to_sign):
    """Derive the signing key from ``secret`` and sign ``string_to_sign``.

    The key is derived by chaining HMAC-SHA256 over the date, the region,
    the service and the literal ``aws4_request``, starting from
    ``'AWS4' + secret``. Returns the final HMAC as a hex string.
    """
    signing_key = _hmac(('AWS4' + secret).encode('utf-8'), to_date(time))
    # Each step uses the previous digest as the key for the next value.
    for scope_part in (region, service, 'aws4_request'):
        signing_key = _hmac(signing_key, scope_part)
    return _hmac(signing_key, string_to_sign, as_hex=True)
def create_presigned_url(method, host, path, service, payload, options=None):
    # NOTE(review): this definition appears to be cut off after the defaults
    # dict -- the visible body only builds `default_options` and never uses
    # the parameters or returns a URL. A second, complete definition with
    # the same name appears further down the file and presumably supersedes
    # this one. TODO confirm and remove this leftover.
    default_options = {
        'key': None,
        'secret': None,
        'protocol': 'https',
        'headers': {},
        'timestamp': datetime.utcnow(),
        'region': 'us-east-1',
        'expires': 86400
    }
import urllib.parse
import hmac
import hashlib
from datetime import datetime
from collections import OrderedDict
def to_datetime_str(datetime_object):
    """Return ``datetime_object`` rendered as ``YYYYMMDDTHHMMSSZ``.

    The ``Z`` suffix is a literal; the value is not converted to UTC here.
    """
    fmt = '%Y%m%dT%H%M%SZ'
    formatted = datetime_object.strftime(fmt)
    return formatted
def to_date_str(datetime_object):
    """Return ``datetime_object`` rendered as ``YYYYMMDD``."""
    fmt = '%Y%m%d'
    formatted = datetime_object.strftime(fmt)
    return formatted
def sort_dict(dictionary):
    """Return an ``OrderedDict`` holding ``dictionary``'s items in ascending key order."""
    ordered = OrderedDict()
    for key in sorted(dictionary):
        ordered[key] = dictionary[key]
    return ordered
def hmac_hash(key, string, as_hex=False):
    """Return the HMAC-SHA256 of ``string`` under ``key``.

    ``str`` arguments are UTF-8 encoded before use; ``bytes`` arguments are
    used as they are. The result is the hex digest when ``as_hex`` is true
    and the raw digest bytes otherwise.
    """
    if isinstance(key, bytes):
        key_as_bytes = key
    else:
        key_as_bytes = key.encode('utf-8')

    if isinstance(string, bytes):
        message = string
    else:
        message = string.encode('utf-8')

    mac = hmac.new(key_as_bytes, msg=message, digestmod=hashlib.sha256)
    if not as_hex:
        return mac.digest()
    return mac.hexdigest()
def hash(string):
    """Return the SHA-256 hex digest of ``string`` after UTF-8 encoding.

    The name shadows the built-in ``hash`` inside this module; it is kept
    because other functions in the file call it by this name.
    """
    encoded = string.encode('utf-8')
    digest = hashlib.sha256(encoded)
    return digest.hexdigest()
def create_credentials_scope(time, region, service):
    """Return the credential scope string ``<date>/<region>/<service>/aws4_request``.

    The date component comes from ``to_date_str(time)``.
    """
    date_part = to_date_str(time)
    terminator = 'aws4_request'
    return '/'.join((date_part, region, service, terminator))
def create_canonical_headers(headers):
    """Return the canonical headers block for a Signature V4 request.

    Each header becomes one ``name:value\\n`` line. As Signature V4
    requires, names are lowercased, values are stripped with inner
    whitespace runs collapsed to a single space, and lines are ordered by
    the lowercased name. Previously names and values were emitted exactly
    as given, so mixed-case names or padded values produced a block the
    service would not reproduce.

    Args:
        headers: mapping of header name to header value.

    Returns:
        The concatenated lines, or ``''`` for an empty mapping.
    """
    canonical = {}
    for name, value in headers.items():
        canonical_name = str(name).strip().lower()
        canonical[canonical_name] = ' '.join(str(value).split())

    lines = [
        '{}:{}\n'.format(name, canonical[name])
        for name in sorted(canonical)
    ]
    return ''.join(lines)
def create_canonical_query_string(params):
    """Return the canonical query string for a Signature V4 request.

    Every name and value is passed through ``str`` and percent-encoded,
    leaving only ``A-Z a-z 0-9 - _ . ~`` literal. A space is encoded as
    ``%20`` (the earlier ``quote_plus`` call emitted ``+``, which does not
    match what Signature V4 expects). Pairs are sorted in their encoded
    form and joined with ``&``.

    Args:
        params: mapping of query parameter name to value.

    Returns:
        ``name=value`` pairs joined by ``&``, or ``''`` for an empty mapping.
    """
    unreserved = '-_.~'
    encoded_pairs = [
        (
            urllib.parse.quote(str(name), safe=unreserved),
            urllib.parse.quote(str(value), safe=unreserved),
        )
        for name, value in params.items()
    ]
    # Order by the encoded text, which is the form that gets signed.
    encoded_pairs.sort()
    return '&'.join('{}={}'.format(*pair) for pair in encoded_pairs)
def create_signed_headers(headers):
    """Return the ``SignedHeaders`` value for a Signature V4 request.

    Header names are stripped, lowercased, sorted and joined with ``;``.
    Lowercasing is required by Signature V4; previously names were joined
    exactly as supplied, so ``Host`` would not match the lowercase ``host``
    the service expects.

    Args:
        headers: mapping of header name to header value (values are unused).

    Returns:
        The ``;``-joined names, or ``''`` for an empty mapping.
    """
    lowered_names = [str(name).strip().lower() for name in headers]
    lowered_names.sort()
    return ';'.join(lowered_names)
def create_canonical_request(method, pathname, query, headers, payload):
    """Return the canonical request text for the given request parts.

    Joins, with newlines: the upper-cased ``method``, ``pathname``, the
    canonical query string built from ``query``, the canonical headers
    block and signed-headers list built from ``headers``, and ``payload``
    exactly as passed in.
    """
    lines = [method.upper(), pathname]
    lines.append(create_canonical_query_string(query))
    lines.append(create_canonical_headers(headers))
    lines.append(create_signed_headers(headers))
    lines.append(payload)
    return '\n'.join(lines)
def create_string_to_sign(time, region, service, request):
    """Return the string-to-sign for the canonical ``request``.

    Joins, with newlines: the algorithm identifier, the formatted ``time``,
    the credential scope for ``time``/``region``/``service`` and the
    SHA-256 hex digest of ``request``.
    """
    lines = ['AWS4-HMAC-SHA256']
    lines.append(to_datetime_str(time))
    lines.append(create_credentials_scope(time, region, service))
    lines.append(hash(request))
    return '\n'.join(lines)
def create_signature(secret, time, region, service, string_to_sign):
    """Return the hex signature of ``string_to_sign``.

    A signing key is derived from ``'AWS4' + secret`` by successive
    HMAC-SHA256 steps over the date, ``region``, ``service`` and the
    literal ``aws4_request``; that key then signs ``string_to_sign``.
    """
    initial_key = ('AWS4' + secret).encode('utf-8')
    date_key = hmac_hash(initial_key, to_date_str(time))
    region_key = hmac_hash(date_key, region)
    service_key = hmac_hash(region_key, service)
    signing_key = hmac_hash(service_key, 'aws4_request')
    return hmac_hash(signing_key, string_to_sign, as_hex=True)
def create_presigned_url(method, host, path, service, payload, options=None):
    """Build a Signature V4 pre-signed URL (signature carried in the query string).

    Args:
        method: HTTP method; upper-cased when the canonical request is built.
        host: host name; always signed as the ``host`` header.
        path: URL path, used verbatim.
        service: service name used in the credential scope.
        payload: value placed on the last line of the canonical request
            (the demo at the bottom of this file passes a SHA-256 hex digest).
        options: optional dict overriding any of these defaults:
            ``key`` (None), ``secret`` (None), ``protocol`` ('https'),
            ``headers`` ({}), ``timestamp`` (current UTC time),
            ``region`` ('us-east-1'), ``expires`` (86400). It may also
            carry ``query``, a query string whose parameters are included
            in the signed URL.

    Returns:
        ``<protocol>://<host><path>?<query>`` where the query contains the
        ``X-Amz-*`` parameters and the computed ``X-Amz-Signature``.

    Raises:
        TypeError: if ``secret`` is left as None (it is concatenated to a str).
    """
    # Function-scope import: the module only imports `datetime` itself.
    from datetime import timezone

    settings = {
        'key': None,
        'secret': None,
        'protocol': 'https',
        'headers': {},
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # it formats to the same text.
        'timestamp': datetime.now(timezone.utc),
        'region': 'us-east-1',
        'expires': 86400,
    }
    settings.update(options or {})

    # Work on a copy so the caller's headers dict is not modified.
    headers = dict(settings['headers'] or {})
    headers['host'] = host

    query = {}
    if settings.get('query'):
        # Fixed: this used to parse the (empty) local `query` instead of the
        # caller's option, so extra parameters were silently dropped.
        query = dict(
            urllib.parse.parse_qsl(settings['query'], keep_blank_values=True)
        )

    timestamp = settings['timestamp']
    region = settings['region']

    query['X-Amz-Algorithm'] = 'AWS4-HMAC-SHA256'
    query['X-Amz-Credential'] = '{}/{}'.format(
        settings['key'],
        create_credentials_scope(timestamp, region, service)
    )
    query['X-Amz-Date'] = to_datetime_str(timestamp)
    query['X-Amz-Expires'] = settings['expires']
    query['X-Amz-SignedHeaders'] = create_signed_headers(headers)

    canonical_request = create_canonical_request(method, path, query, headers, payload)
    string_to_sign = create_string_to_sign(timestamp, region, service, canonical_request)
    query['X-Amz-Signature'] = create_signature(
        settings['secret'],
        timestamp,
        region,
        service,
        string_to_sign
    )

    # Percent-encode like Signature V4 does (space -> %20, not '+').
    querystring = urllib.parse.urlencode(
        query,
        safe='-_.~',
        quote_via=urllib.parse.quote
    )
    return '{protocol}://{host}{path}?{querystring}'.format(
        protocol=settings['protocol'],
        host=host,
        path=path,
        querystring=querystring
    )
if __name__ == '__main__':
    # Demo: print a pre-signed MQTT-over-WebSocket URL built from placeholder
    # credentials. The payload argument is the SHA-256 hex digest of an
    # empty string.
    empty_payload_hash = hashlib.sha256(''.encode('utf-8')).hexdigest()
    demo_options = {
        'key': 'YOUR_ACCESS_KEY',
        'secret': 'YOUR_SECRET',
        'protocol': 'wss',
        'region': 'us-west-2'
    }
    presigned_url = create_presigned_url(
        'GET',
        'YOUR_ENDPOINT.iot.us-west-2.amazonaws.com',
        '/mqtt',
        'iotdevicegateway',
        empty_payload_hash,
        demo_options
    )
    print(presigned_url)