Source code for twilio.request_validator
import base64
import hmac
from hashlib import sha1, sha256
from urllib.parse import urlparse, parse_qs
[docs]def compare(string1, string2):
"""Compare two strings while protecting against timing attacks
:param str string1: the first string
:param str string2: the second string
:returns: True if the strings are equal, False if not
:rtype: :obj:`bool`
"""
if len(string1) != len(string2):
return False
result = True
for c1, c2 in zip(string1, string2):
result &= c1 == c2
return result
[docs]def remove_port(uri):
"""Remove the port number from a URI
:param uri: parsed URI that Twilio requested on your server
:returns: full URI without a port number
:rtype: str
"""
if not uri.port:
return uri.geturl()
new_netloc = uri.netloc.split(':')[0]
new_uri = uri._replace(netloc=new_netloc)
return new_uri.geturl()
[docs]def add_port(uri):
"""Add the port number to a URI
:param uri: parsed URI that Twilio requested on your server
:returns: full URI with a port number
:rtype: str
"""
if uri.port:
return uri.geturl()
port = 443 if uri.scheme == "https" else 80
new_netloc = uri.netloc + ":" + str(port)
new_uri = uri._replace(netloc=new_netloc)
return new_uri.geturl()
[docs]class RequestValidator(object):
def __init__(self, token):
self.token = token.encode("utf-8")
[docs] def compute_signature(self, uri, params):
"""Compute the signature for a given request
:param uri: full URI that Twilio requested on your server
:param params: post vars that Twilio sent with the request
:returns: The computed signature
"""
s = uri
if params:
for param_name in sorted(set(params)):
values = self.get_values(params, param_name)
for value in sorted(values):
s += param_name + value
# compute signature and compare signatures
mac = hmac.new(self.token, s.encode("utf-8"), sha1)
computed = base64.b64encode(mac.digest())
computed = computed.decode('utf-8')
return computed.strip()
[docs] def get_values(self, param_dict, param_name):
try:
# Support MultiDict used by Flask.
return param_dict.getall(param_name)
except AttributeError:
try:
# Support QueryDict used by Django.
return param_dict.getlist(param_name)
except AttributeError:
# Fallback to a standard dict.
return [param_dict[param_name]]
[docs] def compute_hash(self, body):
computed = sha256(body.encode("utf-8")).hexdigest()
return computed.strip()
[docs] def validate(self, uri, params, signature):
"""Validate a request from Twilio
:param uri: full URI that Twilio requested on your server
:param params: dictionary of POST variables or string of POST body for JSON requests
:param signature: expected signature in HTTP X-Twilio-Signature header
:returns: True if the request passes validation, False if not
"""
if params is None:
params = {}
parsed_uri = urlparse(uri)
uri_with_port = add_port(parsed_uri)
uri_without_port = remove_port(parsed_uri)
valid_body_hash = True # May not receive body hash, so default succeed
query = parse_qs(parsed_uri.query)
if "bodySHA256" in query and isinstance(params, str):
valid_body_hash = compare(self.compute_hash(params), query["bodySHA256"][0])
params = {}
# check signature of uri with and without port,
# since sig generation on back end is inconsistent
valid_signature = compare(self.compute_signature(uri_without_port, params), signature)
valid_signature_with_port = compare(self.compute_signature(uri_with_port, params), signature)
return valid_body_hash and (valid_signature or valid_signature_with_port)