django/base/functions.py

696 lines
22 KiB
Python
Raw Normal View History

2023-12-05 14:46:34 +00:00
import string, hashlib, binascii, random, socket, base58, sys, requests, json
from requests.auth import HTTPDigestAuth
2023-05-30 09:30:00 +00:00
from typing import Union
from urllib.parse import urlparse
from ipaddress import ip_address, ip_network, IPv4Address
from base58 import b58decode_check, b58encode_check
from enum import Enum
2023-05-05 20:16:08 +00:00
2023-12-05 14:46:34 +00:00
class RPCHost(object):
    """Small JSON-RPC 2.0 client that POSTs requests through a reusable HTTP session."""

    def __init__(self, url):
        """Remember the endpoint *url* and open a ``requests`` session for it."""
        self._session = requests.Session()
        self._url = url
        self._headers = {'content-type': 'application/json'}

    def call(self, rpcMethod, *params):
        """Invoke *rpcMethod* with positional *params* and return the ``result`` field.

        The POST is attempted up to three times when the connection fails.

        Raises:
            Exception: when all three connection attempts fail, when the HTTP
                status is neither 200 nor 500, or when the response carries a
                non-null ``error`` member.
        """
        payload = json.dumps({"method": rpcMethod, "params": list(params), "jsonrpc": "2.0"})
        tries = 3
        hadConnectionFailures = False
        while True:
            try:
                response = self._session.post(self._url, headers=self._headers, data=payload, timeout=15)
            except requests.exceptions.ConnectionError as err:
                tries -= 1
                if tries == 0:
                    raise Exception('Failed to connect for remote procedure call.') from err
                # The original assigned a differently spelled name here, so the
                # "connected after retry" message below could never be printed.
                hadConnectionFailures = True
                # NOTE: the retry is immediate; the two-second back-off named in
                # the message is currently disabled.
                print("Couldn't connect for remote procedure call, will sleep for two seconds and then try again ({} more tries)".format(tries))
            else:
                if hadConnectionFailures:
                    print('Connected for remote procedure call after retry.')
                break
        # 500 is let through because its body is still parsed for an "error" member below.
        if response.status_code not in (200, 500):
            raise Exception('RPC connection failure: ' + str(response.status_code) + ' ' + response.reason)
        responseJSON = response.json()
        if responseJSON.get('error') is not None:
            raise Exception('Error in RPC call: ' + str(responseJSON['error']))
        return responseJSON['result']
class RPCXMR(object):
    """JSON-RPC 2.0 client that authenticates every request with HTTP digest auth."""

    def __init__(self, url, user, password):
        """Remember the endpoint *url* and the digest-auth credentials."""
        self._session = requests.Session()
        self._url = url
        self._user = user
        self._pass = password
        self._headers = {}

    def call(self, rpcMethod, params):
        """Invoke *rpcMethod* with *params* (sent as given) and return the ``result`` field.

        The POST is attempted up to three times when the connection fails.

        Raises:
            Exception: when all three connection attempts fail, when the HTTP
                status is neither 200 nor 500, or when the response carries a
                non-null ``error`` member.
        """
        payload = json.dumps({"method": rpcMethod, "params": params, "jsonrpc": "2.0"})
        tries = 3
        hadConnectionFailures = False
        while True:
            try:
                response = self._session.post(
                    self._url,
                    headers=self._headers,
                    data=payload,
                    auth=HTTPDigestAuth(self._user, self._pass),
                    timeout=15,
                )
            except requests.exceptions.ConnectionError as err:
                tries -= 1
                if tries == 0:
                    raise Exception('Failed to connect for remote procedure call.') from err
                # The original assigned a differently spelled name here, so the
                # "connected after retry" message below could never be printed.
                hadConnectionFailures = True
                # NOTE: the retry is immediate; the two-second back-off named in
                # the message is currently disabled.
                print("Couldn't connect for remote procedure call, will sleep for two seconds and then try again ({} more tries)".format(tries))
            else:
                if hadConnectionFailures:
                    print('Connected for remote procedure call after retry.')
                break
        # 500 is let through because its body is still parsed for an "error" member below.
        if response.status_code not in (200, 500):
            raise Exception('RPC connection failure: ' + str(response.status_code) + ' ' + response.reason)
        responseJSON = response.json()
        if responseJSON.get('error') is not None:
            raise Exception('Error in RPC call: ' + str(responseJSON['error']))
        return responseJSON['result']
2023-05-05 20:16:08 +00:00
def vendor_generator(size=6, chars=string.ascii_uppercase + string.digits):
    """Return a random string of *size* characters, each drawn from *chars*.

    By default the alphabet is the ASCII uppercase letters plus the digits.
    """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return ''.join(picked)
def decodeBase58(address):
    """Return True when *address* is Base58 text with a valid 4-byte checksum.

    The last four decoded bytes must equal the first four bytes of the double
    SHA-256 digest of everything before them. Text that is not valid Base58
    yields False instead of propagating the decoder's ValueError.
    """
    try:
        raw = base58.b58decode(address)
    except ValueError:
        # Characters outside the Base58 alphabet (or non-ASCII text).
        return False
    body, checksum = raw[:-4], raw[-4:]
    digest = hashlib.sha256(hashlib.sha256(body).digest()).digest()
    return checksum == digest[:4]
def decodeMonero(address):
    """Return True when *address* looks like a Monero address.

    Only the length (95 or 106 characters) and the leading type byte of the
    decoded address are checked. Input that cannot be decoded yields False.
    """
    if len(address) not in (95, 106):
        return False
    try:
        # Hex string of the decoded address bytes.
        decoded = decode(address)
    except ValueError:
        # Invalid symbol, overflowing block or non-ASCII text.
        return False
    # TODO: the address checksum is not verified yet.
    return decoded[0:2] in ("12", "2a", "13")
def checksumCheck(method, address):
match method.lower():
case 'btc':
2023-12-05 14:46:34 +00:00
return decodeBase58(address) if address[0] == '1' or address[0] == '3' else True if address[0:3] == 'bc1' and bdecode("bc", address)[0] != None else False
2023-05-30 09:30:00 +00:00
case 'btct':
2023-12-05 14:46:34 +00:00
return decodeBase58(address) if address[0] == '2' else True if address[0:3] == 'tb1' and bdecode("tb", address)[0] != None else False
2023-05-30 09:30:00 +00:00
case 'ltc':
2023-12-05 14:46:34 +00:00
return decodeBase58(address) if address[0] == '3' or address[0] == 'M' or address[0] == 'L' else True if address[0:4] == 'ltc1' and bdecode("ltc", address)[0] != None else False
2023-05-30 09:30:00 +00:00
case 'bch':
return is_valid(address) if address[0] == '1' else True if is_valid('bitcoincash:'+address) == True else False
case 'zec':
return decodeBase58(address) if address[0] == 't' or address[0] == 'z' else False
2023-12-05 14:46:34 +00:00
case 'doge':
return decodeBase58(address)
2023-05-30 09:30:00 +00:00
case 'xmr':
#needs new function to check if address is valid, a decoder maybe
return decodeMonero(address)
case _:
return False
#########################################################################
## UrlValidator
#########################################################################
class UrlValidator:
    """Checks that a URL is safe to fetch server-side (SSRF guard)."""

    DEFAULT_PORT_WHITELIST = frozenset({80, 81, 8080, 443, 8443, 8000})
    DEFAULT_SCHEME_WHITELIST = frozenset({'http', 'https'})
    DEFAULT_HOST_BLACKLIST = frozenset({
        '192.0.0.192',
        '169.254.169.254',
        '100.100.100.200',
        'metadata.packet.net',
        'metadata.google.internal',
    })
    DEFAULT_CHARACTER_WHITELIST = frozenset(
        'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789:/-_.?&='
    )

    @staticmethod
    def is_internal_address(ip: Union[IPv4Address]) -> bool:
        """Return True when *ip* falls in any non-public address class."""
        return any([
            ip.is_private,
            ip.is_unspecified,
            ip.is_reserved,
            ip.is_loopback,
            ip.is_multicast,
            ip.is_link_local,
        ])

    @classmethod
    def validate(cls, url: str):
        """Return True when *url* may be fetched, False otherwise.

        A URL is accepted only when all of the following hold:
          * it contains only whitelisted characters,
          * its scheme is http or https,
          * it has a hostname that is not on the host blacklist,
          * its explicit port, if any, is on the port whitelist,
          * the hostname resolves to a global, non-internal address.

        NOTE(review): the resolved address is checked once here; whatever
        performs the real request resolves the name again.
        """
        if url is None:
            return False
        if any(c not in cls.DEFAULT_CHARACTER_WHITELIST for c in url):
            return False

        # Parse once; reading .port raises ValueError for a malformed port.
        try:
            parsed = urlparse(url)
            scheme, host, port = parsed.scheme, parsed.hostname, parsed.port
        except ValueError:
            return False

        # Cheap checks run before the DNS lookup. A bare IP literal has no
        # http/https scheme and is rejected here, as it was originally.
        if scheme not in cls.DEFAULT_SCHEME_WHITELIST:
            return False
        if host is None or host in cls.DEFAULT_HOST_BLACKLIST:
            return False
        if port is not None and port not in cls.DEFAULT_PORT_WHITELIST:
            return False

        try:
            ip = ip_address(str(socket.gethostbyname(host)))
        except (OSError, ValueError):
            # Resolution failed or the name could not be encoded.
            return False

        if ip.version == 4:
            if ip.is_private:
                return False
            # CGNAT IPs do not set `is_private` so `not is_global` added
            if not ip_network(ip).is_global:
                return False
        if cls.is_internal_address(ip):
            return False
        return True
"""Reference implementation for Bech32/Bech32m and segwit addresses."""
#########################################################################
## SEGWIT
#########################################################################
class Encoding(Enum):
    """Enumeration type to list the various supported encodings."""
    # Reported by bech32_verify_checksum when the checksum residue equals 1.
    BECH32 = 1
    # Reported by bech32_verify_checksum when the residue equals BECH32M_CONST.
    BECH32M = 2
# Bech32 data alphabet: a character's index in this string is its 5-bit value.
CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
# Checksum residue that marks a string as Bech32m (plain Bech32 uses 1).
BECH32M_CONST = 0x2bc830a3
def bech32_polymod(values):
    """Fold a sequence of 5-bit values into the 30-bit Bech32 checksum residue."""
    gen_table = (0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3)
    residue = 1
    for v in values:
        # The five bits about to be shifted out select which generators to mix in.
        high = residue >> 25
        residue = ((residue & 0x1ffffff) << 5) ^ v
        for bit, gen in enumerate(gen_table):
            if (high >> bit) & 1:
                residue ^= gen
    return residue
def bech32_hrp_expand(hrp):
    """Return the checksum input for *hrp*: high bits, a zero separator, then low bits."""
    high_bits = [ord(ch) >> 5 for ch in hrp]
    low_bits = [ord(ch) & 31 for ch in hrp]
    return high_bits + [0] + low_bits
def bech32_verify_checksum(hrp, data):
    """Return the Encoding whose checksum *data* satisfies for *hrp*, or None."""
    residue = bech32_polymod(bech32_hrp_expand(hrp) + data)
    encoding_by_residue = {
        1: Encoding.BECH32,
        BECH32M_CONST: Encoding.BECH32M,
    }
    return encoding_by_residue.get(residue)
def bech32_create_checksum(hrp, data, spec):
    """Return the six 5-bit checksum values for *hrp* and *data* under *spec*."""
    target = 1 if spec != Encoding.BECH32M else BECH32M_CONST
    # Six zero placeholders stand in for the checksum being computed.
    padded = bech32_hrp_expand(hrp) + data + [0] * 6
    residue = bech32_polymod(padded) ^ target
    # Split the 30-bit residue into 5-bit groups, most significant first.
    return [(residue >> shift) & 31 for shift in range(25, -1, -5)]
def bech32_encode(hrp, data, spec):
    """Build the Bech32/Bech32m string for *hrp* and 5-bit *data* values."""
    symbols = data + bech32_create_checksum(hrp, data, spec)
    encoded = ''.join(CHARSET[s] for s in symbols)
    return hrp + '1' + encoded
def bech32_decode(bech):
    """Validate a Bech32/Bech32m string.

    Returns ``(hrp, data, spec)`` with the checksum stripped from *data*, or
    ``(None, None, None)`` when the string is malformed or fails the checksum.
    """
    failure = (None, None, None)

    out_of_range = any(not (33 <= ord(ch) <= 126) for ch in bech)
    mixed_case = bech.lower() != bech and bech.upper() != bech
    if out_of_range or mixed_case:
        return failure

    bech = bech.lower()
    # The last '1' separates the human-readable part from the data part.
    sep = bech.rfind('1')
    if sep < 1 or sep + 7 > len(bech) or len(bech) > 90:
        return failure

    tail = bech[sep + 1:]
    if any(ch not in CHARSET for ch in tail):
        return failure

    hrp = bech[:sep]
    data = [CHARSET.find(ch) for ch in tail]
    spec = bech32_verify_checksum(hrp, data)
    if spec is None:
        return failure
    return (hrp, data[:-6], spec)
def convertbits(data, frombits, tobits, pad=True):
    """Regroup *data* from *frombits*-wide values into *tobits*-wide values.

    Returns the converted list, or None when an input value does not fit in
    *frombits* bits or, with ``pad=False``, when the leftover bits are not a
    short run of zeros.
    """
    out = []
    buffer = 0
    pending = 0
    out_mask = (1 << tobits) - 1
    buffer_mask = (1 << (frombits + tobits - 1)) - 1
    for item in data:
        if item < 0 or (item >> frombits):
            return None
        buffer = ((buffer << frombits) | item) & buffer_mask
        pending += frombits
        while pending >= tobits:
            pending -= tobits
            out.append((buffer >> pending) & out_mask)
    # Bits still waiting in the buffer, left-aligned into one output group.
    leftover = (buffer << (tobits - pending)) & out_mask
    if pad:
        if pending:
            out.append(leftover)
        return out
    if pending >= frombits or leftover:
        return None
    return out
2023-12-05 14:46:34 +00:00
def bdecode(hrp, addr):
    """Decode a segwit address.

    Returns ``(witness_version, witness_program)``, or ``(None, None)`` when
    *addr* is not a valid segwit address for the human-readable part *hrp*.
    """
    invalid = (None, None)
    hrpgot, data, spec = bech32_decode(addr)
    if hrpgot != hrp:
        return invalid
    program = convertbits(data[1:], 5, 8, False)
    if program is None or not (2 <= len(program) <= 40):
        return invalid
    witver = data[0]
    if witver > 16:
        return invalid
    if witver == 0 and len(program) not in (20, 32):
        return invalid
    # Version 0 must use Bech32; every later version must use Bech32m.
    expected = Encoding.BECH32 if witver == 0 else Encoding.BECH32M
    if spec != expected:
        return invalid
    return (witver, program)
2023-12-05 14:46:34 +00:00
def bencode(hrp, witver, witprog):
    """Encode a segwit address, or return None when the result does not decode back."""
    spec = Encoding.BECH32M if witver != 0 else Encoding.BECH32
    five_bit = convertbits(witprog, 8, 5)
    candidate = bech32_encode(hrp, [witver] + five_bit, spec)
    # Round-trip through the decoder to reject invalid version/program pairs.
    if bdecode(hrp, candidate) == (None, None):
        return None
    return candidate
#########################################################################
## CRYPTO
#########################################################################
# Base32 alphabet read by b32encode/b32decode. This rebinds the module-level
# CHARSET name to the same string the Bech32 section assigns to it.
CHARSET = 'qpzry9x8gf2tvdw0s3jn54khce6mua7l'
def polymod(values):
    """Fold 5-bit *values* into the 40-bit checksum residue; 0 means a valid checksum."""
    generators = (
        (0x01, 0x98f2bc8e61),
        (0x02, 0x79b76d99e2),
        (0x04, 0xf33e5fb3c4),
        (0x08, 0xae2eabe2a8),
        (0x10, 0x1e4f43e470),
    )
    residue = 1
    for v in values:
        # The five bits about to be shifted out select which generators to mix in.
        high = residue >> 35
        residue = ((residue & 0x07ffffffff) << 5) ^ v
        for mask, poly in generators:
            if high & mask:
                residue ^= poly
    return residue ^ 1
def prefix_expand(prefix):
    """Return the low five bits of each *prefix* character followed by a zero separator."""
    expanded = [ord(ch) & 0x1f for ch in prefix]
    expanded.append(0)
    return expanded
def calculate_checksum(prefix, payload):
    """Return the eight 5-bit checksum values for *payload* under *prefix*."""
    # Eight zero placeholders stand in for the checksum being computed.
    residue = polymod(prefix_expand(prefix) + payload + [0] * 8)
    # Split the 40-bit residue into 5-bit groups, most significant first.
    return [(residue >> shift) & 0x1f for shift in range(35, -1, -5)]
def verify_checksum(prefix, payload):
    """Return True when *payload* (data plus checksum values) is valid for *prefix*."""
    residue = polymod(prefix_expand(prefix) + payload)
    return residue == 0
def b32decode(inputs):
    """Map each character of *inputs* to its index in CHARSET (-1 when absent)."""
    return [CHARSET.find(letter) for letter in inputs]
def b32encode(inputs):
    """Map each 5-bit value in *inputs* to its CHARSET character and join them."""
    return ''.join(CHARSET[code] for code in inputs)
def convertbits(data, frombits, tobits, pad=True):
    """Regroup *data* from *frombits*-wide values into *tobits*-wide values.

    Returns the converted list, or None when an input value does not fit in
    *frombits* bits or, with ``pad=False``, when the leftover bits are not a
    short run of zeros.
    """
    result = []
    accumulator = 0
    bit_count = 0
    group_mask = (1 << tobits) - 1
    accumulator_mask = (1 << (frombits + tobits - 1)) - 1
    for value in data:
        fits = value >= 0 and not (value >> frombits)
        if not fits:
            return None
        accumulator = ((accumulator << frombits) | value) & accumulator_mask
        bit_count += frombits
        while bit_count >= tobits:
            bit_count -= tobits
            result.append((accumulator >> bit_count) & group_mask)
    if not pad:
        # Without padding the remainder must be shorter than one input value
        # and consist only of zero bits.
        if bit_count >= frombits or ((accumulator << (tobits - bit_count)) & group_mask):
            return None
    elif bit_count:
        result.append((accumulator << (tobits - bit_count)) & group_mask)
    return result
#########################################################################
## BCH CONVERT
#########################################################################
class InvalidAddress(Exception):
    """Raised by Address when an address string cannot be decoded or classified."""
    pass
class Address:
    """A Bitcoin Cash address convertible between legacy and CashAddr form.

    Attributes:
        version: address type name from VERSION_MAP, e.g. 'P2PKH'.
        payload: list of integer byte values carried by the address.
        prefix: CashAddr prefix such as 'bitcoincash' or 'bchtest'.
    """

    # Each entry is (type name, version byte, is_testnet).
    VERSION_MAP = {
        'legacy': [
            ('P2SH', 5, False),
            ('P2PKH', 0, False),
            ('P2SH-TESTNET', 196, True),
            ('P2PKH-TESTNET', 111, True)
        ],
        'cash': [
            ('P2SH', 8, False),
            ('P2PKH', 0, False),
            ('P2SH-TESTNET', 8, True),
            ('P2PKH-TESTNET', 0, True)
        ]
    }
    MAINNET_PREFIX = 'bitcoincash'
    TESTNET_PREFIX = 'bchtest'

    def __init__(self, version, payload, prefix=None):
        """Store the fields; with no *prefix*, derive it from the version's testnet flag."""
        self.version = version
        self.payload = payload
        if prefix:
            self.prefix = prefix
        elif Address._address_type('cash', self.version)[2]:
            self.prefix = self.TESTNET_PREFIX
        else:
            self.prefix = self.MAINNET_PREFIX

    def __str__(self):
        return 'version: {}\npayload: {}\nprefix: {}'.format(self.version, self.payload, self.prefix)

    def legacy_address(self):
        """Return the Base58Check encoding of the legacy version byte plus payload."""
        version_int = Address._address_type('legacy', self.version)[1]
        return b58encode_check(Address.code_list_to_string([version_int] + self.payload))

    def cash_address(self):
        """Return the CashAddr string ``prefix:base32(payload + checksum)``."""
        version_int = Address._address_type('cash', self.version)[1]
        payload = convertbits([version_int] + self.payload, 8, 5)
        checksum = calculate_checksum(self.prefix, payload)
        return self.prefix + ':' + b32encode(payload + checksum)

    @staticmethod
    def code_list_to_string(code_list):
        """Pack a list of byte values (0-255) into a ``bytes`` object."""
        # The Python 2 branch was dropped: this module uses `match`, so it only
        # runs on Python 3.10+.
        return b''.join(bytes([code]) for code in code_list)

    @staticmethod
    def _address_type(address_type, version):
        """Return the first VERSION_MAP entry whose name or version byte equals *version*.

        Raises:
            InvalidAddress: when nothing in the *address_type* table matches.
        """
        for mapping in Address.VERSION_MAP[address_type]:
            if mapping[0] == version or mapping[1] == version:
                return mapping
        raise InvalidAddress('Could not determine address version')

    @staticmethod
    def from_string(address_string):
        """Parse a legacy (no colon) or CashAddr (with colon) address string.

        Raises:
            InvalidAddress: when the string cannot be parsed.
        """
        try:
            address_string = str(address_string)
        except Exception:
            raise InvalidAddress('Expected string as input')
        if ':' not in address_string:
            return Address._legacy_string(address_string)
        return Address._cash_string(address_string)

    @staticmethod
    def _legacy_string(address_string):
        """Parse a Base58Check legacy address into an Address."""
        try:
            decoded = bytearray(b58decode_check(address_string))
        except ValueError:
            raise InvalidAddress('Could not decode legacy address')
        version = Address._address_type('legacy', decoded[0])[0]
        payload = list(decoded[1:])
        return Address(version, payload)

    @staticmethod
    def _cash_string(address_string):
        """Parse a CashAddr string (single case, at most one colon) into an Address."""
        if address_string.upper() != address_string and address_string.lower() != address_string:
            raise InvalidAddress('Cash address contains uppercase and lowercase characters')
        address_string = address_string.lower()
        colon_count = address_string.count(':')
        if colon_count == 0:
            address_string = Address.MAINNET_PREFIX + ':' + address_string
        elif colon_count > 1:
            raise InvalidAddress('Cash address contains more than one colon character')
        prefix, base32string = address_string.split(':')
        # b32decode maps unknown characters to -1; reject them here so they
        # never reach the checksum or the bit conversion.
        if any(letter not in CHARSET for letter in base32string):
            raise InvalidAddress('Cash address contains invalid characters')
        decoded = b32decode(base32string)
        if not verify_checksum(prefix, decoded):
            raise InvalidAddress('Bad cash address checksum')
        converted = convertbits(decoded, 5, 8)
        if not converted:
            raise InvalidAddress('Cash address has no payload')
        version = Address._address_type('cash', converted[0])[0]
        if prefix == Address.TESTNET_PREFIX:
            version += '-TESTNET'
        # Drop the version byte and the trailing checksum/padding bytes.
        payload = converted[1:-6]
        return Address(version, payload, prefix)
def to_cash_address(address):
    """Parse *address* in either format and return its CashAddr form."""
    parsed = Address.from_string(address)
    return parsed.cash_address()
def to_legacy_address(address):
    """Parse *address* in either format and return its legacy Base58Check form."""
    parsed = Address.from_string(address)
    return parsed.legacy_address()
def is_valid(address):
    """Return True when *address* parses as a legacy or CashAddr address."""
    try:
        Address.from_string(address)
    except InvalidAddress:
        return False
    return True
#########################################################################
## monero
#########################################################################
# Base58 alphabet stored as character codes, so it can be matched against
# bytearray elements in the block codecs below.
__alphabet = [
    ord(s) for s in "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
]
# Radix of the encoding.
__b58base = 58
# NOTE: this is 2**64, i.e. one more than the largest unsigned 64-bit value.
__UINT64MAX = 2 ** 64
# Index = number of raw bytes in a block, value = number of encoded characters.
__encodedBlockSizes = [0, 2, 3, 5, 6, 7, 9, 10, 11]
# A full block is 8 raw bytes, encoded as 11 characters.
__fullBlockSize = 8
__fullEncodedBlockSize = 11
def _hexToBin(hex_):
    """Convert a hex string into a list of byte values.

    Raises:
        ValueError: when the string has an odd number of characters.
    """
    length = len(hex_)
    if length % 2:
        raise ValueError("Hex string has invalid length: %d" % length)
    pairs = (hex_[pos:pos + 2] for pos in range(0, length, 2))
    return [int(pair, 16) for pair in pairs]
def _binToHex(bin_):
    """Render an iterable of byte values as a lowercase hex string."""
    pieces = []
    for byte in bin_:
        pieces.append(format(int(byte), "02x"))
    return "".join(pieces)
def _uint8be_to_64(data):
    """Combine 1 to 8 big-endian byte values into a single integer.

    Raises:
        ValueError: when *data* holds fewer than 1 or more than 8 items.
    """
    count = len(data)
    if count < 1 or count > 8:
        raise ValueError("Invalid input length: %d" % count)
    value = 0
    for byte in data:
        value = (value << 8) | byte
    return value
def _uint64_to_8be(num, size):
    """Split *num* into *size* big-endian byte values (1 <= size <= 8).

    Bits of *num* above the lowest ``8 * size`` are discarded.

    Raises:
        ValueError: when *size* is outside 1..8.
    """
    if not 1 <= size <= 8:
        raise ValueError("Invalid input length: %d" % size)
    out = [0] * size
    # Fill from the least significant byte (last slot) backwards.
    for pos in reversed(range(size)):
        num, out[pos] = divmod(num, 256)
    return out
def encode_block(data, buf, index):
    """Base58-encode one block of raw bytes into *buf* starting at *index*.

    Digits are written right to left inside the block's fixed-width slot.
    *buf* is modified in place and also returned.

    Raises:
        ValueError: when the block length is not acceptable.
    """
    size = len(data)
    if not 1 <= size <= __fullEncodedBlockSize:
        raise ValueError("Invalid block length: %d" % size)
    value = _uint8be_to_64(data)
    # Start at the last character of this block's slot.
    pos = index + __encodedBlockSizes[size] - 1
    while value > 0:
        value, digit = divmod(value, __b58base)
        buf[pos] = __alphabet[digit]
        pos -= 1
    return buf
def encode(hex):
    """Encode hexadecimal string as base58 (ex: encoding a Monero address)."""
    raw = _hexToBin(hex)
    total = len(raw)
    if total == 0:
        return ""
    full_blocks, tail = divmod(total, __fullBlockSize)
    out_size = full_blocks * __fullEncodedBlockSize + __encodedBlockSizes[tail]
    # Pre-fill with the zero digit so short values stay left-padded.
    out = bytearray([__alphabet[0]] * out_size)
    for block in range(full_blocks):
        start = block * __fullBlockSize
        out = encode_block(
            raw[start:start + __fullBlockSize],
            out,
            block * __fullEncodedBlockSize,
        )
    if tail > 0:
        start = full_blocks * __fullBlockSize
        out = encode_block(
            raw[start:start + tail],
            out,
            full_blocks * __fullEncodedBlockSize,
        )
    return bytes(out).decode("ascii")
def decode_block(data, buf, index):
    """Decode one base58 block (*data*, character codes) into *buf* at *index*.

    *buf* is modified in place and also returned.

    Raises:
        ValueError: when the block length is not a valid encoded size, when a
            symbol is outside the alphabet, or when the decoded number does
            not fit in the block's byte width.
    """
    l_data = len(data)
    if l_data < 1 or l_data > __fullEncodedBlockSize:
        raise ValueError("Invalid block length: %d" % l_data)
    # list.index raises rather than returning a negative value, so the
    # failure is translated here instead of being tested for afterwards.
    try:
        res_size = __encodedBlockSizes.index(l_data)
    except ValueError:
        raise ValueError("Invalid block size: %d" % l_data) from None

    res_num = 0
    order = 1
    # Accumulate digits from least significant (rightmost) to most significant.
    for i in range(l_data - 1, -1, -1):
        try:
            digit = __alphabet.index(data[i])
        except ValueError:
            raise ValueError("Invalid symbol: %s" % data[i]) from None
        product = order * digit + res_num
        # __UINT64MAX is 2**64, which itself does not fit in 64 bits, so the
        # comparison must be inclusive (the original used `>`).
        if product >= __UINT64MAX:
            raise ValueError(
                "Overflow: %d * %d + %d = %d" % (order, digit, res_num, product)
            )
        res_num = product
        order = order * __b58base
    if res_size < __fullBlockSize and 2 ** (8 * res_size) <= res_num:
        raise ValueError("Overflow: %d doesn't fit in %d bit(s)" % (res_num, res_size))
    tmp_buf = _uint64_to_8be(res_num, res_size)
    buf[index : index + len(tmp_buf)] = tmp_buf
    return buf
def decode(enc):
    """Decode a base58 string (ex: a Monero address) into hexadecimal form."""
    symbols = bytearray(enc, encoding="ascii")
    total = len(symbols)
    if total == 0:
        return ""
    full_blocks, tail = divmod(total, __fullEncodedBlockSize)
    try:
        tail_bytes = __encodedBlockSizes.index(tail)
    except ValueError:
        raise ValueError("Invalid encoded length: %d" % total)
    out = bytearray(full_blocks * __fullBlockSize + tail_bytes)
    for block in range(full_blocks):
        start = block * __fullEncodedBlockSize
        out = decode_block(
            symbols[start:start + __fullEncodedBlockSize],
            out,
            block * __fullBlockSize,
        )
    if tail > 0:
        start = full_blocks * __fullEncodedBlockSize
        out = decode_block(
            symbols[start:start + tail],
            out,
            full_blocks * __fullBlockSize,
        )
    return _binToHex(out)
#########################################################################
2023-12-05 14:46:34 +00:00
#########################################################################