django/base/functions.py

696 lines
22 KiB
Python
Executable File

import string, hashlib, binascii, random, socket, base58, sys, requests, json
from requests.auth import HTTPDigestAuth
from typing import Union
from urllib.parse import urlparse
from ipaddress import ip_address, ip_network, IPv4Address
from base58 import b58decode_check, b58encode_check
from enum import Enum
class RPCHost(object):
    """Minimal JSON-RPC client that POSTs requests through one shared HTTP session."""

    def __init__(self, url):
        """Store the endpoint ``url`` and create the reusable ``requests`` session."""
        self._session = requests.Session()
        self._url = url
        self._headers = {'content-type': 'application/json'}

    def call(self, rpcMethod, *params):
        """Call ``rpcMethod`` with positional ``params`` and return its result.

        The request is attempted up to three times when the connection itself
        fails; each attempt uses a 15 second timeout.

        Returns:
            The ``result`` member of the decoded JSON reply.

        Raises:
            Exception: when every connection attempt fails, when the HTTP
                status is neither 200 nor 500, or when the reply carries a
                non-null ``error`` member.
        """
        payload = json.dumps({"method": rpcMethod, "params": list(params), "jsonrpc": "2.0"})
        tries = 3
        hadConnectionFailures = False
        while True:
            try:
                response = self._session.post(self._url, headers=self._headers, data=payload, timeout=15)
            except requests.exceptions.ConnectionError:
                tries -= 1
                if tries == 0:
                    raise Exception('Failed to connect for remote procedure call.')
                # Record the failure so a later success can be reported.
                hadConnectionFailures = True
                # NOTE: the delay mentioned in the message is currently
                # disabled; the retry happens immediately.
                print("Couldn't connect for remote procedure call, will sleep for two seconds and then try again ({} more tries)".format(tries))
            else:
                if hadConnectionFailures:
                    print('Connected for remote procedure call after retry.')
                break
        # Status 500 is let through so the JSON error body below can be reported.
        if response.status_code not in (200, 500):
            raise Exception('RPC connection failure: ' + str(response.status_code) + ' ' + response.reason)
        responseJSON = response.json()
        if 'error' in responseJSON and responseJSON['error'] is not None:
            raise Exception('Error in RPC call: ' + str(responseJSON['error']))
        return responseJSON['result']
class RPCXMR(object):
    """JSON-RPC client that authenticates every request with HTTP digest auth."""

    def __init__(self, url, user, password):
        """Store the endpoint ``url`` and credentials and create the HTTP session."""
        self._session = requests.Session()
        self._url = url
        self._user = user
        self._pass = password
        self._headers = {}

    def call(self, rpcMethod, params):
        """Call ``rpcMethod`` with the ``params`` object and return its result.

        The request is attempted up to three times when the connection itself
        fails; each attempt uses a 15 second timeout.

        Returns:
            The ``result`` member of the decoded JSON reply.

        Raises:
            Exception: when every connection attempt fails, when the HTTP
                status is neither 200 nor 500, or when the reply carries a
                non-null ``error`` member.
        """
        payload = json.dumps({"method": rpcMethod, "params": params, "jsonrpc": "2.0"})
        tries = 3
        hadConnectionFailures = False
        while True:
            try:
                response = self._session.post(
                    self._url,
                    headers=self._headers,
                    data=payload,
                    auth=HTTPDigestAuth(self._user, self._pass),
                    timeout=15,
                )
            except requests.exceptions.ConnectionError:
                tries -= 1
                if tries == 0:
                    raise Exception('Failed to connect for remote procedure call.')
                # Record the failure so a later success can be reported.
                hadConnectionFailures = True
                # NOTE: the delay mentioned in the message is currently
                # disabled; the retry happens immediately.
                print("Couldn't connect for remote procedure call, will sleep for two seconds and then try again ({} more tries)".format(tries))
            else:
                if hadConnectionFailures:
                    print('Connected for remote procedure call after retry.')
                break
        # Status 500 is let through so the JSON error body below can be reported.
        if response.status_code not in (200, 500):
            raise Exception('RPC connection failure: ' + str(response.status_code) + ' ' + response.reason)
        responseJSON = response.json()
        if 'error' in responseJSON and responseJSON['error'] is not None:
            raise Exception('Error in RPC call: ' + str(responseJSON['error']))
        return responseJSON['result']
def vendor_generator(size=6, chars=string.ascii_uppercase + string.digits):
    """Return a string of ``size`` characters, each picked at random from ``chars``."""
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return ''.join(picked)
def decodeBase58(address):
    """Report whether ``address`` is a well-formed Base58Check string.

    The last four decoded bytes must equal the first four bytes of the double
    SHA-256 of everything before them; ``base58.b58decode_check`` performs
    exactly that comparison.

    Returns:
        True when the checksum matches, False when it does not or when the
        input contains characters outside the Base58 alphabet (which
        previously escaped as an unhandled ValueError).
    """
    try:
        base58.b58decode_check(address)
    except ValueError:
        # Raised both for a checksum mismatch and for invalid characters.
        return False
    return True
def decodeMonero(address):
    """Run a structural sanity check on a Monero address string.

    Only the length (95 or 106 characters) and the first decoded byte are
    checked. NOTE(review): "12", "13" and "2a" are presumably the mainnet
    standard, integrated and subaddress prefixes -- confirm against the
    Monero documentation.

    Returns:
        True when the address has an accepted length, decodes as Monero
        Base58 and starts with an accepted type byte; False otherwise,
        including when decoding fails on an invalid symbol.
    """
    if len(address) not in (95, 106):
        return False
    try:
        decoded = decode(address)
    except ValueError:
        # decode() signals bad symbols, overflow and non-ASCII input this way.
        return False
    # TODO: the trailing checksum is not verified yet.
    return decoded[0:2] in ("12", "2a", "13")
def checksumCheck(method, address):
match method.lower():
case 'btc':
return decodeBase58(address) if address[0] == '1' or address[0] == '3' else True if address[0:3] == 'bc1' and bdecode("bc", address)[0] != None else False
case 'btct':
return decodeBase58(address) if address[0] == '2' else True if address[0:3] == 'tb1' and bdecode("tb", address)[0] != None else False
case 'ltc':
return decodeBase58(address) if address[0] == '3' or address[0] == 'M' or address[0] == 'L' else True if address[0:4] == 'ltc1' and bdecode("ltc", address)[0] != None else False
case 'bch':
return is_valid(address) if address[0] == '1' else True if is_valid('bitcoincash:'+address) == True else False
case 'zec':
return decodeBase58(address) if address[0] == 't' or address[0] == 'z' else False
case 'doge':
return decodeBase58(address)
case 'xmr':
#needs new function to check if address is valid, a decoder maybe
return decodeMonero(address)
case _:
return False
#########################################################################
## UrlValidator
#########################################################################
class UrlValidator:
    """Allow/deny filter for user-supplied URLs based on scheme, port, host and resolved IP."""

    @staticmethod
    def is_internal_address(ip: Union[IPv4Address]) -> bool:
        """Return True when ``ip`` is private, unspecified, reserved, loopback, multicast or link-local."""
        return any([
            ip.is_private,
            ip.is_unspecified,
            ip.is_reserved,
            ip.is_loopback,
            ip.is_multicast,
            ip.is_link_local,
        ])

    @classmethod
    def validate(cls, url: str):
        """Decide whether ``url`` may be used.

        The URL is rejected when it is None, contains a character outside the
        whitelist, cannot be resolved to an IP address, uses a scheme or an
        explicit port outside the whitelists, names a blacklisted host, or
        resolves to a private / non-global / internal address.

        Returns:
            True when every check passes, otherwise False.
        """
        port_whitelist = {80, 81, 8080, 443, 8443, 8000}
        scheme_whitelist = {'http', 'https'}
        host_blacklist = {'192.0.0.192', '169.254.169.254', '100.100.100.200', 'metadata.packet.net', 'metadata.google.internal'}
        allowed_chars = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789:/-_.?&=')

        if url is None:
            return False
        if any(c not in allowed_chars for c in url):
            return False

        # Always bound, even when the input is a bare IP literal.
        host = None
        try:
            ip = ip_address(url)
        except ValueError:
            try:
                host = urlparse(url).hostname
                ip = ip_address(str(socket.gethostbyname(host)))
            except (OSError, ValueError, TypeError):
                # Resolution failure, malformed URL, or no hostname at all.
                return False

        try:
            parsed = urlparse(url)
            port, scheme = parsed.port, parsed.scheme
        except ValueError:
            # urlparse reports a non-numeric or out-of-range port this way.
            return False

        if scheme not in scheme_whitelist:
            return False
        if host in host_blacklist:
            return False
        if port is not None and port not in port_whitelist:
            return False

        if ip.version == 4:
            if ip.is_private:
                return False
            # CGNAT IPs do not set `is_private` so `not is_global` added
            if not ip_network(ip).is_global:
                return False
        if cls.is_internal_address(ip):
            return False
        return True
"""Reference implementation for Bech32/Bech32m and segwit addresses."""
#########################################################################
## SEGWIT
#########################################################################
class Encoding(Enum):
    """Enumeration type to list the various supported encodings.

    Members are returned by ``bech32_verify_checksum`` to say which checksum
    constant a string matched.
    """
    # Checksum residue equals 1.
    BECH32 = 1
    # Checksum residue equals BECH32M_CONST.
    BECH32M = 2
# The 32 data symbols; a character's position in this string is its 5-bit value.
CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
# Checksum residue that identifies the Bech32m variant (plain Bech32 uses 1).
BECH32M_CONST = 0x2bc830a3
def bech32_polymod(values):
    """Fold ``values`` into the 30-bit Bech32 checksum state and return it."""
    gen_consts = (0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3)
    state = 1
    for symbol in values:
        high = state >> 25
        state = ((state & 0x1ffffff) << 5) ^ symbol
        # Each set bit among the five shifted-out bits selects one constant.
        for bit, const in enumerate(gen_consts):
            if (high >> bit) & 1:
                state ^= const
    return state
def bech32_hrp_expand(hrp):
    """Return the high bits of each HRP character, a zero, then the low five bits of each."""
    codes = [ord(ch) for ch in hrp]
    high_bits = [code >> 5 for code in codes]
    low_bits = [code & 31 for code in codes]
    return high_bits + [0] + low_bits
def bech32_verify_checksum(hrp, data):
    """Return the Encoding whose constant the checksum of ``hrp`` + ``data`` matches, or None."""
    residue = bech32_polymod(bech32_hrp_expand(hrp) + data)
    by_residue = {1: Encoding.BECH32, BECH32M_CONST: Encoding.BECH32M}
    return by_residue.get(residue)
def bech32_create_checksum(hrp, data, spec):
    """Return the six 5-bit checksum symbols for ``hrp`` and ``data`` under ``spec``."""
    target = 1
    if spec == Encoding.BECH32M:
        target = BECH32M_CONST
    padded = bech32_hrp_expand(hrp) + data + [0] * 6
    residue = bech32_polymod(padded) ^ target
    symbols = []
    # Slice the 30-bit residue into six groups, most significant first.
    for shift in range(25, -1, -5):
        symbols.append((residue >> shift) & 31)
    return symbols
def bech32_encode(hrp, data, spec):
    """Build the string ``hrp`` + '1' + encoded data symbols + encoded checksum."""
    symbols = data + bech32_create_checksum(hrp, data, spec)
    encoded = ''.join(CHARSET[symbol] for symbol in symbols)
    return hrp + '1' + encoded
def bech32_decode(bech):
    """Split a Bech32/Bech32m string into ``(hrp, data, spec)``.

    Returns ``(None, None, None)`` when the string has characters outside
    33..126, mixes upper and lower case, has a misplaced separator, exceeds
    90 characters, uses symbols outside CHARSET, or fails the checksum.
    """
    failure = (None, None, None)
    out_of_range = any(not 33 <= ord(ch) <= 126 for ch in bech)
    mixed_case = bech.lower() != bech and bech.upper() != bech
    if out_of_range or mixed_case:
        return failure
    bech = bech.lower()
    # The last '1' separates the human-readable part from the data part.
    sep = bech.rfind('1')
    if sep < 1 or sep + 7 > len(bech) or len(bech) > 90:
        return failure
    tail = bech[sep + 1:]
    if any(ch not in CHARSET for ch in tail):
        return failure
    hrp = bech[:sep]
    data = [CHARSET.find(ch) for ch in tail]
    spec = bech32_verify_checksum(hrp, data)
    if spec is None:
        return failure
    # Drop the six checksum symbols from the returned data.
    return (hrp, data[:-6], spec)
def convertbits(data, frombits, tobits, pad=True):
    """Regroup a sequence of ``frombits``-wide integers into ``tobits``-wide integers.

    With ``pad`` true, leftover bits are zero-padded into one final group.
    With ``pad`` false, None is returned when the leftover is a whole input
    group or contains non-zero bits. None is also returned for any input
    value that is negative or wider than ``frombits``.
    """
    out_mask = (1 << tobits) - 1
    acc_mask = (1 << (frombits + tobits - 1)) - 1
    accumulator, pending, groups = 0, 0, []
    for item in data:
        if item < 0 or (item >> frombits):
            return None
        accumulator = ((accumulator << frombits) | item) & acc_mask
        pending += frombits
        while pending >= tobits:
            pending -= tobits
            groups.append((accumulator >> pending) & out_mask)
    # Bits still held in the accumulator, left-aligned into one output group.
    leftover = (accumulator << (tobits - pending)) & out_mask
    if pad:
        if pending:
            groups.append(leftover)
        return groups
    if pending >= frombits or leftover:
        return None
    return groups
def bdecode(hrp, addr):
    """Decode a segwit address into ``(witness_version, program_bytes)``.

    Returns ``(None, None)`` when the string does not decode, its
    human-readable part differs from ``hrp``, the program length or version
    is out of range, or the checksum variant does not suit the version.
    """
    rejected = (None, None)
    found_hrp, data, spec = bech32_decode(addr)
    if found_hrp != hrp:
        return rejected
    program = convertbits(data[1:], 5, 8, False)
    if program is None or not 2 <= len(program) <= 40:
        return rejected
    version = data[0]
    if version > 16:
        return rejected
    if version == 0 and len(program) not in (20, 32):
        return rejected
    # Version 0 must use Bech32; every other version must use Bech32m.
    required = Encoding.BECH32 if version == 0 else Encoding.BECH32M
    if spec != required:
        return rejected
    return (version, program)
def bencode(hrp, witver, witprog):
    """Encode a witness version and program as a segwit address, or None if it does not round-trip."""
    spec = Encoding.BECH32M if witver != 0 else Encoding.BECH32
    words = [witver] + convertbits(witprog, 8, 5)
    address = bech32_encode(hrp, words, spec)
    # Decoding the result validates the version and program length.
    if bdecode(hrp, address) == (None, None):
        return None
    return address
#########################################################################
## CRYPTO
#########################################################################
# Base32 symbol table used by b32encode/b32decode; a character's index is its
# 5-bit value. Re-binds the module-level name to the same 32-character string.
CHARSET = 'qpzry9x8gf2tvdw0s3jn54khce6mua7l'
def polymod(values):
    """Fold ``values`` into the 40-bit checksum state and return it XOR 1."""
    taps = (
        (0x01, 0x98f2bc8e61),
        (0x02, 0x79b76d99e2),
        (0x04, 0xf33e5fb3c4),
        (0x08, 0xae2eabe2a8),
        (0x10, 0x1e4f43e470),
    )
    state = 1
    for symbol in values:
        overflow = state >> 35
        state = ((state & 0x07ffffffff) << 5) ^ symbol
        # Each set bit among the shifted-out bits selects one constant.
        for mask, term in taps:
            if overflow & mask:
                state ^= term
    return state ^ 1
def prefix_expand(prefix):
    """Return the low five bits of every prefix character followed by a zero."""
    low_bits = [ord(ch) & 0x1f for ch in prefix]
    low_bits.append(0)
    return low_bits
def calculate_checksum(prefix, payload):
    """Return the eight 5-bit checksum symbols for ``prefix`` and ``payload``."""
    residue = polymod(prefix_expand(prefix) + payload + [0] * 8)
    # Slice the 40-bit residue into eight groups, most significant first.
    return [(residue >> shift) & 0x1f for shift in range(35, -1, -5)]
def verify_checksum(prefix, payload):
    """Return True when ``payload`` (checksum symbols included) is consistent with ``prefix``."""
    residue = polymod(prefix_expand(prefix) + payload)
    return residue == 0
def b32decode(inputs):
    """Map each character to its index in CHARSET (-1 for characters not in it)."""
    return [CHARSET.find(letter) for letter in inputs]
def b32encode(inputs):
    """Concatenate the CHARSET character selected by each value in ``inputs``."""
    return ''.join(CHARSET[char_code] for char_code in inputs)
def convertbits(data, frombits, tobits, pad=True):
    """Repack integers of width ``frombits`` into integers of width ``tobits``.

    Returns the repacked list, or None when an input value is negative or
    too wide, or (with ``pad`` false) when the trailing bits are a whole
    input group or are not all zero.
    """
    group_mask = (1 << tobits) - 1
    buffer_mask = (1 << (frombits + tobits - 1)) - 1
    buffer = 0
    held = 0
    repacked = []
    for number in data:
        too_wide = number >> frombits if number >= 0 else True
        if too_wide:
            return None
        buffer = ((buffer << frombits) | number) & buffer_mask
        held += frombits
        while held >= tobits:
            held -= tobits
            repacked.append((buffer >> held) & group_mask)
    if not pad:
        # Strict mode: the remainder must be short and consist of zero bits.
        if held >= frombits or ((buffer << (tobits - held)) & group_mask):
            return None
    elif held:
        repacked.append((buffer << (tobits - held)) & group_mask)
    return repacked
#########################################################################
## BCH CONVERT
#########################################################################
class InvalidAddress(Exception):
    """Raised when an address string cannot be decoded or its version is unknown."""
    pass
class Address:
    """A Bitcoin Cash address that can be rendered in legacy or cash-address form.

    Attributes:
        version: label from VERSION_MAP such as ``'P2PKH'`` or ``'P2SH-TESTNET'``.
        payload: list of integer byte values.
        prefix: cash-address prefix used when rendering.
    """

    # Each entry is (label, version byte, is_testnet).
    VERSION_MAP = {
        'legacy': [
            ('P2SH', 5, False),
            ('P2PKH', 0, False),
            ('P2SH-TESTNET', 196, True),
            ('P2PKH-TESTNET', 111, True)
        ],
        'cash': [
            ('P2SH', 8, False),
            ('P2PKH', 0, False),
            ('P2SH-TESTNET', 8, True),
            ('P2PKH-TESTNET', 0, True)
        ]
    }
    MAINNET_PREFIX = 'bitcoincash'
    TESTNET_PREFIX = 'bchtest'

    def __init__(self, version, payload, prefix=None):
        """Store the fields; without ``prefix`` it is derived from the version's testnet flag."""
        self.version = version
        self.payload = payload
        if prefix:
            self.prefix = prefix
        elif Address._address_type('cash', self.version)[2]:
            self.prefix = self.TESTNET_PREFIX
        else:
            self.prefix = self.MAINNET_PREFIX

    def __str__(self):
        return 'version: {}\npayload: {}\nprefix: {}'.format(self.version, self.payload, self.prefix)

    def legacy_address(self):
        """Return the Base58Check encoding of the legacy version byte plus payload."""
        version_int = Address._address_type('legacy', self.version)[1]
        return b58encode_check(Address.code_list_to_string([version_int] + self.payload))

    def cash_address(self):
        """Return ``prefix:base32`` built from the cash version byte, payload and checksum."""
        version_int = Address._address_type('cash', self.version)[1]
        payload = [version_int] + self.payload
        payload = convertbits(payload, 8, 5)
        checksum = calculate_checksum(self.prefix, payload)
        return self.prefix + ':' + b32encode(payload + checksum)

    @staticmethod
    def code_list_to_string(code_list):
        """Pack a sequence of integers in 0..255 into a ``bytes`` object."""
        return bytes(code_list)

    @staticmethod
    def _address_type(address_type, version):
        """Return the first VERSION_MAP entry whose label or version byte equals ``version``.

        Raises:
            InvalidAddress: when no entry matches.
        """
        for mapping in Address.VERSION_MAP[address_type]:
            if mapping[0] == version or mapping[1] == version:
                return mapping
        raise InvalidAddress('Could not determine address version')

    @staticmethod
    def from_string(address_string):
        """Parse a legacy (no colon) or cash (with colon) address string.

        Raises:
            InvalidAddress: when the string cannot be parsed.
        """
        try:
            address_string = str(address_string)
        except Exception:
            raise InvalidAddress('Expected string as input')
        if ':' not in address_string:
            return Address._legacy_string(address_string)
        return Address._cash_string(address_string)

    @staticmethod
    def _legacy_string(address_string):
        """Parse a Base58Check legacy address."""
        try:
            decoded = bytearray(b58decode_check(address_string))
        except ValueError as err:
            raise InvalidAddress('Could not decode legacy address') from err
        if not decoded:
            # A valid checksum over an empty payload leaves no version byte.
            raise InvalidAddress('Could not decode legacy address')
        version = Address._address_type('legacy', decoded[0])[0]
        return Address(version, list(decoded[1:]))

    @staticmethod
    def _cash_string(address_string):
        """Parse a ``prefix:base32`` cash address."""
        if address_string.upper() != address_string and address_string.lower() != address_string:
            raise InvalidAddress('Cash address contains uppercase and lowercase characters')
        address_string = address_string.lower()
        colon_count = address_string.count(':')
        if colon_count == 0:
            address_string = Address.MAINNET_PREFIX + ':' + address_string
        elif colon_count > 1:
            raise InvalidAddress('Cash address contains more than one colon character')
        prefix, base32string = address_string.split(':')
        decoded = b32decode(base32string)
        if -1 in decoded:
            # b32decode marks characters outside the symbol table with -1;
            # letting them through could make convertbits return None below.
            raise InvalidAddress('Cash address contains invalid characters')
        if not verify_checksum(prefix, decoded):
            raise InvalidAddress('Bad cash address checksum')
        converted = convertbits(decoded, 5, 8)
        version = Address._address_type('cash', converted[0])[0]
        if prefix == Address.TESTNET_PREFIX:
            version += '-TESTNET'
        # Drop the leading version byte and the trailing checksum bytes.
        payload = converted[1:-6]
        return Address(version, payload, prefix)
def to_cash_address(address):
    """Parse ``address`` in either form and return its cash-address rendering."""
    parsed = Address.from_string(address)
    return parsed.cash_address()
def to_legacy_address(address):
    """Parse ``address`` in either form and return its legacy rendering."""
    parsed = Address.from_string(address)
    return parsed.legacy_address()
def is_valid(address):
    """Return True when ``address`` parses as an Address, False when InvalidAddress is raised."""
    try:
        Address.from_string(address)
    except InvalidAddress:
        return False
    else:
        return True
#########################################################################
## monero
#########################################################################
# Character codes of the 58 symbols; a symbol's position is its digit value.
__alphabet = [
    ord(s) for s in "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
]
__b58base = 58
# NOTE: this is 2**64, i.e. one more than the largest unsigned 64-bit value.
__UINT64MAX = 2 ** 64
# Entry i is the number of encoded symbols used for a block of i raw bytes.
__encodedBlockSizes = [0, 2, 3, 5, 6, 7, 9, 10, 11]
# A full block is 8 raw bytes, which encode to 11 symbols.
__fullBlockSize = 8
__fullEncodedBlockSize = 11
def _hexToBin(hex_):
if len(hex_) % 2 != 0:
raise ValueError("Hex string has invalid length: %d" % len(hex_))
return [int(hex_[i : i + 2], 16) for i in range(0, len(hex_), 2)]
def _binToHex(bin_):
return "".join("%02x" % int(b) for b in bin_)
def _uint8be_to_64(data):
if not (1 <= len(data) <= 8):
raise ValueError("Invalid input length: %d" % len(data))
res = 0
for b in data:
res = res << 8 | b
return res
def _uint64_to_8be(num, size):
if size < 1 or size > 8:
raise ValueError("Invalid input length: %d" % size)
res = [0] * size
twopow8 = 2 ** 8
for i in range(size - 1, -1, -1):
res[i] = num % twopow8
num = num // twopow8
return res
def encode_block(data, buf, index):
    """Encode one raw block of 1 to 8 bytes into ``buf`` starting at ``index``.

    Digits are written right-aligned within the block's encoded width;
    positions that are not written keep whatever ``buf`` already held.

    Args:
        data: sequence of byte values forming the raw block.
        buf: mutable byte buffer that receives the symbol codes.
        index: offset in ``buf`` where this block's symbols begin.

    Returns:
        ``buf``, modified in place.

    Raises:
        ValueError: when ``data`` is empty or longer than a full raw block.
    """
    l_data = len(data)
    # The bound is the raw block size; the encoded size (11) was used before,
    # which let 9..11 byte inputs through to fail later with another message.
    if l_data < 1 or l_data > __fullBlockSize:
        raise ValueError("Invalid block length: %d" % l_data)
    num = _uint8be_to_64(data)
    i = __encodedBlockSizes[l_data] - 1
    while num > 0:
        num, remainder = divmod(num, __b58base)
        buf[index + i] = __alphabet[remainder]
        i -= 1
    return buf
def encode(hex):
    """Encode a hexadecimal string as block-wise base58 (ex: a Monero address).

    The input is cut into blocks of ``__fullBlockSize`` bytes, each encoded
    independently, with a shorter final block when the length is not a
    multiple of the block size. An empty input yields an empty string.
    """
    raw = _hexToBin(hex)
    total = len(raw)
    if total == 0:
        return ""
    whole_blocks, tail_len = divmod(total, __fullBlockSize)
    out_len = whole_blocks * __fullEncodedBlockSize + __encodedBlockSizes[tail_len]
    # Pre-fill with the zero digit; encode_block only writes non-zero positions.
    out = bytearray([__alphabet[0]] * out_len)
    for block_no in range(whole_blocks):
        src = block_no * __fullBlockSize
        out = encode_block(
            raw[src:src + __fullBlockSize],
            out,
            block_no * __fullEncodedBlockSize,
        )
    if tail_len > 0:
        src = whole_blocks * __fullBlockSize
        out = encode_block(
            raw[src:src + tail_len],
            out,
            whole_blocks * __fullEncodedBlockSize,
        )
    return bytes(out).decode("ascii")
def decode_block(data, buf, index):
    """Decode one encoded block into raw bytes written to ``buf`` at ``index``.

    Args:
        data: sequence of symbol codes (1 to 11 of them).
        buf: mutable byte buffer that receives the decoded bytes.
        index: offset in ``buf`` where the decoded bytes are stored.

    Returns:
        ``buf``, modified in place.

    Raises:
        ValueError: when the block length is not a valid encoded size, a
            symbol is outside the alphabet, or the value does not fit into
            the block's decoded width.
    """
    l_data = len(data)
    if l_data < 1 or l_data > __fullEncodedBlockSize:
        raise ValueError("Invalid block length: %d" % l_data)
    # list.index raises instead of returning a negative value, so lookup
    # failures are translated into descriptive errors here.
    try:
        res_size = __encodedBlockSizes.index(l_data)
    except ValueError:
        raise ValueError("Invalid block size: %d" % l_data) from None
    res_num = 0
    order = 1
    for symbol in reversed(data):
        try:
            digit = __alphabet.index(symbol)
        except ValueError:
            raise ValueError("Invalid symbol: %s" % symbol) from None
        product = order * digit + res_num
        # __UINT64MAX is 2**64, which itself no longer fits in 64 bits.
        if product >= __UINT64MAX:
            raise ValueError(
                "Overflow: %d * %d + %d = %d" % (order, digit, res_num, product)
            )
        res_num = product
        order = order * __b58base
    if res_size < __fullBlockSize and 2 ** (8 * res_size) <= res_num:
        raise ValueError("Overflow: %d doesn't fit in %d byte(s)" % (res_num, res_size))
    tmp_buf = _uint64_to_8be(res_num, res_size)
    buf[index : index + len(tmp_buf)] = tmp_buf
    return buf
def decode(enc):
    """Decode a block-wise base58 string (ex: a Monero address) into hexadecimal form.

    An empty input yields an empty string. ValueError is raised when the
    length of the final partial block is not a valid encoded size.
    """
    symbols = bytearray(enc, encoding="ascii")
    total = len(symbols)
    if total == 0:
        return ""
    whole_blocks, tail_len = divmod(total, __fullEncodedBlockSize)
    try:
        tail_bytes = __encodedBlockSizes.index(tail_len)
    except ValueError:
        raise ValueError("Invalid encoded length: %d" % total)
    out = bytearray(whole_blocks * __fullBlockSize + tail_bytes)
    for block_no in range(whole_blocks):
        src = block_no * __fullEncodedBlockSize
        out = decode_block(
            symbols[src:src + __fullEncodedBlockSize],
            out,
            block_no * __fullBlockSize,
        )
    if tail_len > 0:
        src = whole_blocks * __fullEncodedBlockSize
        out = decode_block(
            symbols[src:src + tail_len],
            out,
            whole_blocks * __fullBlockSize,
        )
    return _binToHex(out)
#########################################################################
#########################################################################