forked from mike/django
629 lines
19 KiB
Python
629 lines
19 KiB
Python
import string, hashlib, binascii, random, socket, base58, sys
|
|
from typing import Union
|
|
from urllib.parse import urlparse
|
|
from ipaddress import ip_address, ip_network, IPv4Address
|
|
from base58 import b58decode_check, b58encode_check
|
|
from enum import Enum
|
|
|
|
def vendor_generator(size=6, chars=string.ascii_uppercase + string.digits):
    """Return a random string of ``size`` characters drawn from ``chars``.

    By default the alphabet is the ASCII uppercase letters plus the digits.
    Characters are picked with ``random.choice``, i.e. the non-cryptographic
    module-level generator.
    """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return ''.join(picked)
|
|
|
|
|
|
def decodeBase58(address):
    """Return True if ``address`` carries a valid Base58Check checksum.

    The address is Base58-decoded, its last four bytes are taken as the
    checksum, and that checksum is compared with the first four bytes of
    the double SHA-256 digest of everything preceding it.

    Returns False (instead of raising) when the string is not decodable
    Base58 at all, so callers can use this as a plain boolean validator.
    """
    try:
        raw = base58.b58decode(address)
    except ValueError:
        # Characters outside the Base58 alphabet (or non-ASCII text) cannot
        # belong to a valid address.
        return False

    payload, checksum = raw[:-4], raw[-4:]
    digest = hashlib.sha256(hashlib.sha256(payload).digest()).digest()
    # Inputs shorter than four bytes yield a short `checksum`, which can
    # never equal the four-byte digest prefix.
    return checksum == digest[:4]
|
|
|
|
def decodeMonero(address):
    """Return True if ``address`` looks like a Monero address.

    Checks performed:
      * the length is 95 or 106 characters;
      * the string decodes with this module's Monero Base58 ``decode``
        (the single-argument ``decode`` defined last in the file, which is
        what the name resolves to at call time);
      * the first decoded byte, as hex, is one of ``12``, ``2a`` or ``13``.

    The address checksum is NOT verified yet.

    Returns False instead of raising when the string cannot be decoded
    (invalid symbol, non-ASCII text, overflowing block).
    """
    if len(address) not in (95, 106):
        return False

    try:
        decoded = decode(address)
    except ValueError:
        # decode()/decode_block() signal every malformed input with
        # ValueError (UnicodeEncodeError is a subclass of it).
        return False

    # Verify the address type byte.
    return decoded[0:2] in ("12", "2a", "13")
|
|
|
|
def _is_valid_segwit(hrp, address):
    """Return True if ``address`` is a valid segwit address for ``hrp``.

    Applies the same rules as the segwit ``decode(hrp, addr)`` defined in
    this module, but is built directly on ``bech32_decode`` and
    ``convertbits``: the module-level name ``decode`` is rebound further
    down the file to the single-argument Monero decoder, so calling
    ``decode(hrp, address)`` raises ``TypeError`` at runtime.
    """
    hrp_got, data, spec = bech32_decode(address)
    if hrp_got != hrp or not data:
        return False
    witness_version = data[0]
    program = convertbits(data[1:], 5, 8, False)
    if program is None or not 2 <= len(program) <= 40:
        return False
    if witness_version > 16:
        return False
    if witness_version == 0:
        # Version 0 programs are 20 or 32 bytes and use the Bech32 checksum.
        return len(program) in (20, 32) and spec == Encoding.BECH32
    # Every later witness version must use the Bech32m checksum.
    return spec == Encoding.BECH32M


def checksumCheck(method, address):
    """Validate ``address`` for the coin named by ``method``.

    ``method`` is matched case-insensitively against:
      * ``btc``  - Base58Check when starting with ``1``/``3``, segwit with
        HRP ``bc`` when starting with ``bc1``;
      * ``btct`` - Base58Check when starting with ``2``, segwit with HRP
        ``tb`` when starting with ``tb1``;
      * ``ltc``  - Base58Check when starting with ``3``/``M``/``L``, segwit
        with HRP ``ltc`` when starting with ``ltc1``;
      * ``bch``  - ``is_valid`` on the address itself when it starts with
        ``1``, otherwise on ``'bitcoincash:' + address``;
      * ``zec``  - Base58Check when starting with ``t`` or ``z``;
      * ``xmr``  - ``decodeMonero``.

    Returns a bool. Unknown methods and empty / non-string addresses yield
    False.
    """
    coin = method.lower()
    if not isinstance(address, str) or not address:
        # Nothing to validate; also avoids IndexError on address[0].
        return False

    if coin == 'btc':
        if address.startswith(('1', '3')):
            return decodeBase58(address)
        return address.startswith('bc1') and _is_valid_segwit('bc', address)
    if coin == 'btct':
        if address.startswith('2'):
            return decodeBase58(address)
        return address.startswith('tb1') and _is_valid_segwit('tb', address)
    if coin == 'ltc':
        if address.startswith(('3', 'M', 'L')):
            return decodeBase58(address)
        return address.startswith('ltc1') and _is_valid_segwit('ltc', address)
    if coin == 'bch':
        if address.startswith('1'):
            return is_valid(address)
        return is_valid('bitcoincash:' + address)
    if coin == 'zec':
        return address.startswith(('t', 'z')) and decodeBase58(address)
    if coin == 'xmr':
        return decodeMonero(address)
    return False
|
|
|
|
#########################################################################
|
|
## UrlValidator
|
|
#########################################################################
|
|
|
|
class UrlValidator:
    """Checks whether a URL is acceptable as an outbound request target."""

    @staticmethod
    def is_internal_address(ip: Union[IPv4Address]) -> bool:
        """Return True if ``ip`` is private, unspecified, reserved,
        loopback, multicast or link-local."""
        return any([
            ip.is_private,
            ip.is_unspecified,
            ip.is_reserved,
            ip.is_loopback,
            ip.is_multicast,
            ip.is_link_local,
        ])

    @classmethod
    def validate(cls, url: str):
        """Return True only if ``url`` passes every check below.

        1. It is not None and contains only whitelisted characters.
        2. Its scheme is ``http`` or ``https``.
        3. It has a hostname that is not on the metadata-host blacklist.
        4. Its explicit port, if any, is on the port whitelist.
        5. The hostname resolves (``socket.gethostbyname``) to an address
           that is neither private, non-global, nor otherwise internal.

        Any parsing or resolution failure yields False (fail closed).
        A bare IP string has no scheme and is therefore rejected at step 2.

        NOTE(review): the host is resolved only here; a caller that
        resolves it again when connecting may get a different address.
        """
        DEFAULT_PORT_WHITELIST = {80, 81, 8080, 443, 8443, 8000}
        DEFAULT_SCHEME_WHITELIST = {'http', 'https'}
        DEFAULT_HOST_BLACKLIST = {'192.0.0.192', '169.254.169.254', '100.100.100.200', 'metadata.packet.net', 'metadata.google.internal'}
        DEFAULT_CHARACTER_WHITELIST = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789:/-_.?&='

        if url is None:
            return False

        allowed_chars = frozenset(DEFAULT_CHARACTER_WHITELIST)
        if any(c not in allowed_chars for c in url):
            return False

        # Parse once; `.port` raises ValueError on a malformed port.
        try:
            parsed = urlparse(url)
            scheme, host, port = parsed.scheme, parsed.hostname, parsed.port
        except Exception:
            return False

        # Cheap rejections first, so no DNS query is made for a URL that
        # is going to be refused anyway.
        if scheme not in DEFAULT_SCHEME_WHITELIST:
            return False
        if host is None or host in DEFAULT_HOST_BLACKLIST:
            return False
        if port is not None and port not in DEFAULT_PORT_WHITELIST:
            return False

        try:
            ip = ip_address(socket.gethostbyname(host))
        except Exception:
            # Resolution or parsing failure: fail closed. Unlike a bare
            # `except:`, this lets KeyboardInterrupt/SystemExit propagate.
            return False

        if ip.version == 4:
            if ip.is_private:
                return False
            # CGNAT IPs do not set `is_private`, so `is_global` is checked
            # as well.
            if not ip_network(ip).is_global:
                return False

        return not cls.is_internal_address(ip)
|
|
|
|
|
|
"""Reference implementation for Bech32/Bech32m and segwit addresses."""
|
|
#########################################################################
|
|
## SEGWIT
|
|
#########################################################################
|
|
|
|
class Encoding(Enum):
    """The checksum variants a decoded Bech32-family string can carry."""

    # Reported when the checksum polymod equals 1.
    BECH32 = 1
    # Reported when the checksum polymod equals BECH32M_CONST.
    BECH32M = 2
|
|
|
|
# The 32 data symbols; a symbol's index in this string is its 5-bit value.
CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
# Polymod result that identifies a valid Bech32m checksum.
BECH32M_CONST = 0x2BC830A3
|
|
|
|
def bech32_polymod(values):
    """Compute the Bech32 checksum polynomial over ``values``.

    ``values`` is an iterable of small integers (5-bit symbols). The
    running 30-bit state starts at 1; for every symbol the top five bits
    are shifted out and select which generator constants get XOR-ed in.
    """
    gen = (0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3)
    state = 1
    for symbol in values:
        carry = state >> 25
        state = ((state & 0x1ffffff) << 5) ^ symbol
        for bit, constant in enumerate(gen):
            if (carry >> bit) & 1:
                state ^= constant
    return state
|
|
|
|
|
|
def bech32_hrp_expand(hrp):
    """Expand the human-readable part for checksum computation.

    The result is the high bits (``ord >> 5``) of every character, a
    single 0 separator, then the low five bits (``ord & 31``) of every
    character.
    """
    high_bits = [ord(ch) >> 5 for ch in hrp]
    low_bits = [ord(ch) & 31 for ch in hrp]
    return high_bits + [0] + low_bits
|
|
|
|
|
|
def bech32_verify_checksum(hrp, data):
    """Identify which checksum variant ``data`` carries for ``hrp``.

    Returns ``Encoding.BECH32`` when the polymod over the expanded HRP
    plus ``data`` is 1, ``Encoding.BECH32M`` when it is ``BECH32M_CONST``,
    and ``None`` when it is neither (invalid checksum).
    """
    residue = bech32_polymod(bech32_hrp_expand(hrp) + data)
    by_residue = {1: Encoding.BECH32, BECH32M_CONST: Encoding.BECH32M}
    return by_residue.get(residue)
|
|
|
|
def bech32_create_checksum(hrp, data, spec):
    """Return the six 5-bit checksum symbols for ``hrp`` and ``data``.

    ``spec`` selects the target constant: ``BECH32M_CONST`` for
    ``Encoding.BECH32M``, 1 for anything else.
    """
    target = 1
    if spec == Encoding.BECH32M:
        target = BECH32M_CONST
    # Six zero symbols stand in for the checksum that is being computed.
    padded = bech32_hrp_expand(hrp) + data + [0] * 6
    residue = bech32_polymod(padded) ^ target
    # Emit the 30-bit residue as six 5-bit groups, most significant first.
    return [(residue >> shift) & 31 for shift in range(25, -1, -5)]
|
|
|
|
|
|
def bech32_encode(hrp, data, spec):
    """Build the Bech32/Bech32m string for ``hrp`` and 5-bit ``data``.

    The output is ``hrp``, the separator ``'1'``, then the data symbols
    followed by the checksum symbols, each mapped through ``CHARSET``.
    """
    symbols = data + bech32_create_checksum(hrp, data, spec)
    encoded = ''.join(map(CHARSET.__getitem__, symbols))
    return hrp + '1' + encoded
|
|
|
|
def bech32_decode(bech):
    """Validate a Bech32/Bech32m string and split it into its parts.

    Returns ``(hrp, data, spec)`` where ``data`` is the list of 5-bit
    values without the six checksum symbols and ``spec`` is the detected
    ``Encoding``. Returns ``(None, None, None)`` when the string contains
    characters outside the printable range 33..126, mixes upper and lower
    case, has no usable ``'1'`` separator, is longer than 90 characters,
    uses a symbol outside ``CHARSET``, or fails the checksum.
    """
    failure = (None, None, None)

    if not all(33 <= ord(ch) <= 126 for ch in bech):
        return failure
    if bech != bech.lower() and bech != bech.upper():
        return failure

    bech = bech.lower()
    sep = bech.rfind('1')
    # Need a non-empty HRP and at least six checksum symbols after it.
    if sep < 1 or sep + 7 > len(bech) or len(bech) > 90:
        return failure

    hrp, payload = bech[:sep], bech[sep + 1:]
    data = [CHARSET.find(ch) for ch in payload]
    if -1 in data:
        return failure

    spec = bech32_verify_checksum(hrp, data)
    if spec is None:
        return failure
    return (hrp, data[:-6], spec)
|
|
|
|
def convertbits(data, frombits, tobits, pad=True):
    """Regroup a sequence of ``frombits``-wide integers into
    ``tobits``-wide integers.

    Returns the new list, or ``None`` when an input value is negative or
    does not fit in ``frombits`` bits. With ``pad=True`` leftover bits are
    emitted as one final zero-padded group; with ``pad=False`` the result
    is ``None`` if a whole input group or any non-zero bit is left over.
    """
    accumulator, pending, out = 0, 0, []
    out_mask = (1 << tobits) - 1
    acc_mask = (1 << (frombits + tobits - 1)) - 1

    for item in data:
        if item < 0 or item >> frombits:
            return None
        accumulator = ((accumulator << frombits) | item) & acc_mask
        pending += frombits
        while pending >= tobits:
            pending -= tobits
            out.append((accumulator >> pending) & out_mask)

    # Bits still held in the accumulator, left-aligned into one group.
    leftover = (accumulator << (tobits - pending)) & out_mask
    if pad:
        if pending:
            out.append(leftover)
        return out
    if pending >= frombits or leftover:
        return None
    return out
|
|
|
|
|
|
def decode(hrp, addr):
    """Decode a segwit address.

    Returns ``(witness_version, witness_program)`` on success and
    ``(None, None)`` when the HRP does not match ``hrp``, the program does
    not convert cleanly to 2..40 bytes, the version exceeds 16, a
    version-0 program is not 20 or 32 bytes, or the checksum variant does
    not match the version (Bech32 for 0, Bech32m otherwise).

    NOTE: a second, single-argument ``decode`` (the Monero Base58 decoder)
    is defined later in this file and rebinds the module-level name, so
    after import ``decode`` no longer refers to this function.
    """
    invalid = (None, None)

    got_hrp, data, spec = bech32_decode(addr)
    if got_hrp != hrp:
        return invalid

    program = convertbits(data[1:], 5, 8, False)
    if program is None or not (2 <= len(program) <= 40):
        return invalid

    version = data[0]
    if version > 16:
        return invalid
    if version == 0:
        if len(program) not in (20, 32) or spec != Encoding.BECH32:
            return invalid
    elif spec != Encoding.BECH32M:
        return invalid
    return (version, program)
|
|
|
|
|
|
def encode(hrp, witver, witprog):
    """Encode a segwit address.

    Uses the Bech32 checksum for witness version 0 and Bech32m otherwise.
    The result is decoded again as a sanity check; ``None`` is returned
    when that round trip fails.

    NOTE: a second, single-argument ``encode`` (the Monero Base58 encoder)
    is defined later in this file and rebinds the module-level name; the
    same is true of the ``decode`` called below.
    """
    spec = Encoding.BECH32M if witver != 0 else Encoding.BECH32
    payload = [witver] + convertbits(witprog, 8, 5)
    candidate = bech32_encode(hrp, payload, spec)
    return None if decode(hrp, candidate) == (None, None) else candidate
|
|
|
|
#########################################################################
|
|
## CRYPTO
|
|
#########################################################################
|
|
# Base32 alphabet used by the cash-address code below; a character's index
# is its 5-bit value. Written in four groups of eight for readability.
CHARSET = (
    'qpzry9x8'
    'gf2tvdw0'
    's3jn54kh'
    'ce6mua7l'
)
|
|
|
|
def polymod(values):
    """Compute the 40-bit cash-address checksum polynomial over ``values``.

    The state starts at 1; for each 5-bit value the top five bits are
    shifted out and each set bit XORs in the matching generator constant.
    The final state is XOR-ed with 1, so a valid payload yields 0.
    """
    generators = (
        0x98f2bc8e61,
        0x79b76d99e2,
        0xf33e5fb3c4,
        0xae2eabe2a8,
        0x1e4f43e470,
    )
    state = 1
    for symbol in values:
        overflow = state >> 35
        state = ((state & 0x07ffffffff) << 5) ^ symbol
        for bit, constant in enumerate(generators):
            if overflow & (1 << bit):
                state ^= constant
    return state ^ 1
|
|
|
|
|
|
def prefix_expand(prefix):
    """Return the low five bits of each prefix character, then a 0."""
    expanded = [ord(ch) & 0x1f for ch in prefix]
    expanded.append(0)
    return expanded
|
|
|
|
|
|
def calculate_checksum(prefix, payload):
    """Return the eight 5-bit checksum values for ``prefix`` and
    ``payload`` (a list of 5-bit values)."""
    # Eight zero symbols stand in for the checksum that is being computed.
    residue = polymod(prefix_expand(prefix) + payload + [0] * 8)
    # Split the 40-bit residue into 5-bit groups, most significant first.
    return [(residue >> shift) & 0x1f for shift in range(35, -1, -5)]
|
|
|
|
|
|
def verify_checksum(prefix, payload):
    """Return True when ``payload`` (5-bit values, checksum included)
    checks out against ``prefix``, i.e. the polymod is zero."""
    residue = polymod(prefix_expand(prefix) + payload)
    return residue == 0
|
|
|
|
|
|
def b32decode(inputs):
    """Map each character of ``inputs`` to its index in ``CHARSET``.

    Characters that are not in ``CHARSET`` map to -1 (``str.find``).
    """
    return [CHARSET.find(symbol) for symbol in inputs]
|
|
|
|
|
|
def b32encode(inputs):
    """Map each 5-bit value in ``inputs`` to its ``CHARSET`` character and
    return the concatenated string."""
    symbols = [CHARSET[value] for value in inputs]
    return ''.join(symbols)
|
|
|
|
|
|
def convertbits(data, frombits, tobits, pad=True):
|
|
acc = 0
|
|
bits = 0
|
|
ret = []
|
|
maxv = (1 << tobits) - 1
|
|
max_acc = (1 << (frombits + tobits - 1)) - 1
|
|
for value in data:
|
|
if value < 0 or (value >> frombits):
|
|
return None
|
|
acc = ((acc << frombits) | value) & max_acc
|
|
bits += frombits
|
|
while bits >= tobits:
|
|
bits -= tobits
|
|
ret.append((acc >> bits) & maxv)
|
|
if pad:
|
|
if bits:
|
|
ret.append((acc << (tobits - bits)) & maxv)
|
|
elif bits >= frombits or ((acc << (tobits - bits)) & maxv):
|
|
return None
|
|
return ret
|
|
|
|
|
|
#########################################################################
|
|
## BCH CONVERT
|
|
#########################################################################
|
|
class InvalidAddress(Exception):
    """Raised when an address string cannot be parsed or validated."""
|
|
|
|
|
|
class Address:
    """A Bitcoin Cash address convertible between legacy (Base58Check)
    and cash-address (prefix + base32) form.

    Attributes:
        version: version name from ``VERSION_MAP`` (e.g. ``'P2PKH'``).
        payload: list of integer byte values (the hash).
        prefix: cash-address prefix (``'bitcoincash'``, ``'bchtest'`` or a
            caller-supplied value).
    """

    # Each entry is (version name, version number, is_testnet).
    VERSION_MAP = {
        'legacy': [
            ('P2SH', 5, False),
            ('P2PKH', 0, False),
            ('P2SH-TESTNET', 196, True),
            ('P2PKH-TESTNET', 111, True)
        ],
        'cash': [
            ('P2SH', 8, False),
            ('P2PKH', 0, False),
            ('P2SH-TESTNET', 8, True),
            ('P2PKH-TESTNET', 0, True)
        ]
    }
    MAINNET_PREFIX = 'bitcoincash'
    TESTNET_PREFIX = 'bchtest'

    def __init__(self, version, payload, prefix=None):
        """Store the parts; derive the prefix from ``version`` if not given.

        Raises:
            InvalidAddress: no prefix was given and ``version`` is unknown.
        """
        self.version = version
        self.payload = payload
        if prefix:
            self.prefix = prefix
        elif Address._address_type('cash', self.version)[2]:
            self.prefix = self.TESTNET_PREFIX
        else:
            self.prefix = self.MAINNET_PREFIX

    def __str__(self):
        return 'version: {}\npayload: {}\nprefix: {}'.format(self.version, self.payload, self.prefix)

    def legacy_address(self):
        """Return the Base58Check encoding of version byte + payload.

        The value is whatever ``b58encode_check`` returns.
        """
        version_int = Address._address_type('legacy', self.version)[1]
        return b58encode_check(Address.code_list_to_string([version_int] + self.payload))

    def cash_address(self):
        """Return the address as ``'<prefix>:<base32 payload+checksum>'``."""
        version_int = Address._address_type('cash', self.version)[1]
        payload = [version_int] + self.payload
        payload = convertbits(payload, 8, 5)
        checksum = calculate_checksum(self.prefix, payload)
        return self.prefix + ':' + b32encode(payload + checksum)

    @staticmethod
    def code_list_to_string(code_list):
        """Pack an iterable of byte values (0..255) into ``bytes``.

        The former Python 2 branch was dropped: this file already relies
        on Python 3-only syntax, so it could never run.
        """
        # A generator keeps the original "iterate the input" semantics
        # (an int argument raises TypeError instead of yielding zeros).
        return bytes(code for code in code_list)

    @staticmethod
    def _address_type(address_type, version):
        """Look up the ``VERSION_MAP`` entry for ``version``.

        ``version`` may be the version name or the version number; the
        first matching entry of ``VERSION_MAP[address_type]`` wins.

        Raises:
            InvalidAddress: no entry matches.
        """
        for mapping in Address.VERSION_MAP[address_type]:
            if mapping[0] == version or mapping[1] == version:
                return mapping
        raise InvalidAddress('Could not determine address version')

    @staticmethod
    def from_string(address_string):
        """Parse a legacy or cash address string into an ``Address``.

        Strings containing ``':'`` are parsed as cash addresses, all
        others as legacy addresses.

        Raises:
            InvalidAddress: the string cannot be parsed or validated.
        """
        try:
            address_string = str(address_string)
        except Exception as err:
            raise InvalidAddress('Expected string as input') from err
        if ':' not in address_string:
            return Address._legacy_string(address_string)
        return Address._cash_string(address_string)

    @staticmethod
    def _legacy_string(address_string):
        """Parse a Base58Check legacy address.

        Raises:
            InvalidAddress: bad Base58Check data, empty payload or unknown
                version byte.
        """
        try:
            decoded = bytearray(b58decode_check(address_string))
        except ValueError:
            raise InvalidAddress('Could not decode legacy address')
        if not decoded:
            # A valid checksum over zero bytes leaves no version byte.
            raise InvalidAddress('Could not decode legacy address')
        version = Address._address_type('legacy', decoded[0])[0]
        payload = list(decoded[1:])
        return Address(version, payload)

    @staticmethod
    def _cash_string(address_string):
        """Parse a cash address (``prefix:base32``).

        Raises:
            InvalidAddress: mixed case, more than one colon, characters
                outside the base32 alphabet, bad checksum, empty payload
                or unknown version byte.
        """
        if address_string.upper() != address_string and address_string.lower() != address_string:
            raise InvalidAddress('Cash address contains uppercase and lowercase characters')
        address_string = address_string.lower()
        colon_count = address_string.count(':')
        if colon_count == 0:
            address_string = Address.MAINNET_PREFIX + ':' + address_string
        elif colon_count > 1:
            raise InvalidAddress('Cash address contains more than one colon character')
        prefix, base32string = address_string.split(':')
        decoded = b32decode(base32string)
        if -1 in decoded:
            # b32decode marks characters outside the alphabet with -1;
            # they must not reach the checksum or the bit conversion.
            raise InvalidAddress('Cash address contains invalid characters')
        if not verify_checksum(prefix, decoded):
            raise InvalidAddress('Bad cash address checksum')
        converted = convertbits(decoded, 5, 8)
        if not converted:
            raise InvalidAddress('Cash address payload is empty')
        version = Address._address_type('cash', converted[0])[0]
        if prefix == Address.TESTNET_PREFIX:
            version += '-TESTNET'
        # Drop the version byte in front and the trailing bytes produced
        # by the checksum symbols and padding.
        payload = converted[1:-6]
        return Address(version, payload, prefix)
|
|
|
|
|
|
def to_cash_address(address):
    """Parse ``address`` (legacy or cash form) and return its cash-address
    string."""
    parsed = Address.from_string(address)
    return parsed.cash_address()
|
|
|
|
|
|
def to_legacy_address(address):
    """Parse ``address`` (legacy or cash form) and return its legacy
    Base58Check encoding."""
    parsed = Address.from_string(address)
    return parsed.legacy_address()
|
|
|
|
|
|
def is_valid(address):
    """Return True if ``Address.from_string`` accepts ``address``.

    Only ``InvalidAddress`` is treated as "not valid"; any other
    exception propagates to the caller.
    """
    try:
        Address.from_string(address)
    except InvalidAddress:
        return False
    else:
        return True
|
|
|
|
#########################################################################
|
|
## monero
|
|
#########################################################################
|
|
|
|
|
|
# Base58 alphabet as a list of ASCII code points; a symbol's index is its
# digit value.
__alphabet = list(b"123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz")
__b58base = 58
# NOTE: this is 2**64, i.e. one more than the largest unsigned 64-bit value.
__UINT64MAX = 1 << 64
# Index = number of raw bytes in a block, value = number of encoded symbols.
__encodedBlockSizes = [0, 2, 3, 5, 6, 7, 9, 10, 11]
# A full block is 8 raw bytes, encoded as 11 symbols.
__fullBlockSize = 8
__fullEncodedBlockSize = 11
|
|
|
|
|
|
def _hexToBin(hex_):
    """Convert a hex string into a list of byte values.

    Raises:
        ValueError: the string has odd length or a pair is not valid hex.
    """
    if len(hex_) % 2:
        raise ValueError("Hex string has invalid length: %d" % len(hex_))
    octets = []
    for pos in range(0, len(hex_), 2):
        octets.append(int(hex_[pos:pos + 2], 16))
    return octets
|
|
|
|
|
|
def _binToHex(bin_):
    """Render an iterable of byte values as a lowercase hex string, two
    digits per value."""
    pairs = [format(int(octet), "02x") for octet in bin_]
    return "".join(pairs)
|
|
|
|
|
|
def _uint8be_to_64(data):
    """Combine 1..8 big-endian byte values into a single integer.

    Raises:
        ValueError: ``data`` is empty or longer than eight items.
    """
    count = len(data)
    if count < 1 or count > 8:
        raise ValueError("Invalid input length: %d" % count)

    total = 0
    for octet in data:
        total = (total << 8) | octet
    return total
|
|
|
|
|
|
def _uint64_to_8be(num, size):
    """Split ``num`` into ``size`` big-endian byte values (1 <= size <= 8).

    Only the lowest ``size`` bytes of ``num`` are kept; higher bits are
    discarded.

    Raises:
        ValueError: ``size`` is outside 1..8.
    """
    if not (1 <= size <= 8):
        raise ValueError("Invalid input length: %d" % size)

    octets = [0] * size
    # Fill from the least significant byte (last slot) backwards.
    for slot in reversed(range(size)):
        num, octets[slot] = divmod(num, 256)
    return octets
|
|
|
|
|
|
def encode_block(data, buf, index):
    """Base58-encode one block of raw bytes into ``buf``.

    Args:
        data: 1..8 raw byte values (one block).
        buf: mutable byte buffer pre-filled with the zero symbol; the
            encoded symbols are written in place starting at ``index``.
        index: offset in ``buf`` where this block's symbols begin.

    Returns:
        ``buf`` (the same object, modified).

    Raises:
        ValueError: ``data`` is empty or longer than a full raw block.
    """
    l_data = len(data)

    # A raw block holds at most __fullBlockSize bytes; the encoded size
    # (__fullEncodedBlockSize) is the wrong bound for un-encoded input.
    if l_data < 1 or l_data > __fullBlockSize:
        raise ValueError("Invalid block length: %d" % l_data)

    num = _uint8be_to_64(data)
    # Write digits right-to-left, starting at the block's last symbol slot;
    # untouched leading slots keep the pre-filled zero symbol.
    pos = index + __encodedBlockSizes[l_data] - 1
    while num > 0:
        num, remainder = divmod(num, __b58base)
        buf[pos] = __alphabet[remainder]
        pos -= 1

    return buf
|
|
|
|
|
|
def encode(hex):
    """Encode a hexadecimal string as Monero-style Base58 (e.g. to build a
    Monero address).

    The input is processed in blocks of ``__fullBlockSize`` bytes, each
    encoded to a fixed number of symbols; a shorter final block is encoded
    to the matching entry of ``__encodedBlockSizes``. An empty input
    yields ``""``.
    """
    raw = _hexToBin(hex)
    if len(raw) == 0:
        return ""

    whole_blocks, tail_size = divmod(len(raw), __fullBlockSize)
    out_size = whole_blocks * __fullEncodedBlockSize + __encodedBlockSizes[tail_size]

    # Pre-fill with the zero symbol; encode_block only overwrites digits.
    out = bytearray([__alphabet[0]] * out_size)

    block_total = whole_blocks + (1 if tail_size > 0 else 0)
    for block_no in range(block_total):
        start = block_no * __fullBlockSize
        # For the final partial block this slice is just the tail bytes.
        chunk = raw[start:start + __fullBlockSize]
        out = encode_block(chunk, out, block_no * __fullEncodedBlockSize)

    return bytes(out).decode("ascii")
|
|
|
|
|
|
def decode_block(data, buf, index):
    """Decode one block of Base58 symbols into ``buf``.

    Args:
        data: 1..11 symbol code points (one encoded block).
        buf: mutable byte buffer; the decoded bytes are written in place
            starting at ``index``.
        index: offset in ``buf`` where this block's bytes begin.

    Returns:
        ``buf`` (the same object, modified).

    Raises:
        ValueError: the block length is not a valid encoded size, a symbol
            is not in the alphabet, or the value does not fit the block.
    """
    l_data = len(data)

    if l_data < 1 or l_data > __fullEncodedBlockSize:
        raise ValueError("Invalid block length: %d" % l_data)

    # The position of the encoded length in the table is the raw length.
    try:
        res_size = __encodedBlockSizes.index(l_data)
    except ValueError:
        raise ValueError("Invalid block size: %d" % l_data) from None

    res_num = 0
    order = 1
    for symbol in reversed(data):
        # list.index raises rather than returning -1, so translate that
        # into the intended error message.
        try:
            digit = __alphabet.index(symbol)
        except ValueError:
            raise ValueError("Invalid symbol: %s" % symbol) from None

        product = order * digit + res_num
        # __UINT64MAX is 2**64, which itself no longer fits in 64 bits,
        # so the comparison has to be inclusive.
        if product >= __UINT64MAX:
            raise ValueError(
                "Overflow: %d * %d + %d = %d" % (order, digit, res_num, product)
            )

        res_num = product
        order = order * __b58base

    if res_size < __fullBlockSize and 2 ** (8 * res_size) <= res_num:
        raise ValueError("Overflow: %d doesn't fit in %d byte(s)" % (res_num, res_size))

    tmp_buf = _uint64_to_8be(res_num, res_size)
    buf[index : index + len(tmp_buf)] = tmp_buf

    return buf
|
|
|
|
|
|
def decode(enc):
    """Decode a Monero-style Base58 string (e.g. a Monero address) into
    hexadecimal form.

    The input is split into blocks of ``__fullEncodedBlockSize`` symbols;
    the length of a shorter final block must be one of the sizes in
    ``__encodedBlockSizes``. An empty input yields ``""``.

    Raises:
        ValueError: the string is not ASCII, has an invalid length, or a
            block fails to decode.
    """
    enc = bytearray(enc, encoding="ascii")
    l_enc = len(enc)
    if l_enc == 0:
        return ""

    whole_blocks, tail_size = divmod(l_enc, __fullEncodedBlockSize)
    # Validate the overall length before decoding anything.
    try:
        tail_decoded_size = __encodedBlockSizes.index(tail_size)
    except ValueError:
        raise ValueError("Invalid encoded length: %d" % l_enc)

    data = bytearray(whole_blocks * __fullBlockSize + tail_decoded_size)

    block_total = whole_blocks + (1 if tail_size > 0 else 0)
    for block_no in range(block_total):
        start = block_no * __fullEncodedBlockSize
        # For the final partial block this slice is just the tail symbols.
        chunk = enc[start:start + __fullEncodedBlockSize]
        data = decode_block(chunk, data, block_no * __fullBlockSize)

    return _binToHex(data)
|
|
|
|
#########################################################################
|
|
######################################################################### |