import string, hashlib, binascii, random, socket, base58, sys, requests, json from requests.auth import HTTPDigestAuth from typing import Union from urllib.parse import urlparse from ipaddress import ip_address, ip_network, IPv4Address from base58 import b58decode_check, b58encode_check from enum import Enum class RPCHost(object): def __init__(self, url): self._session = requests.Session() self._url = url self._headers = {'content-type': 'application/json'} def call(self, rpcMethod, *params): payload = json.dumps({"method": rpcMethod, "params": list(params), "jsonrpc": "2.0"}) tries = 3 hadConnectionFailures = False while True: try: response = self._session.post(self._url, headers=self._headers, data=payload, timeout=15) except requests.exceptions.ConnectionError: tries -= 1 if tries == 0: raise Exception('Failed to connect for remote procedure call.') hadFailedConnections = True print("Couldn't connect for remote procedure call, will sleep for two seconds and then try again ({} more tries)".format(tries)) #time.sleep(2) else: if hadConnectionFailures: print('Connected for remote procedure call after retry.') break if not response.status_code in (200, 500): raise Exception('RPC connection failure: ' + str(response.status_code) + ' ' + response.reason) responseJSON = response.json() if 'error' in responseJSON and responseJSON['error'] != None: raise Exception('Error in RPC call: ' + str(responseJSON['error'])) return responseJSON['result'] class RPCXMR(object): def __init__(self, url, user, password): self._session = requests.Session() self._url = url self._user = user self._pass = password self._headers = {} def call(self, rpcMethod, params): payload = json.dumps({"method": rpcMethod, "params": params, "jsonrpc": "2.0"}) tries = 3 hadConnectionFailures = False while True: try: response = self._session.post(self._url, headers=self._headers, data=payload, auth=HTTPDigestAuth(self._user, self._pass), timeout=15) except requests.exceptions.ConnectionError: tries -= 1 if tries == 0: raise Exception('Failed to connect for remote procedure call.') hadFailedConnections = True print("Couldn't connect for remote procedure call, will sleep for two seconds and then try again ({} more tries)".format(tries)) #time.sleep(2) else: if hadConnectionFailures: print('Connected for remote procedure call after retry.') break if not response.status_code in (200, 500): raise Exception('RPC connection failure: ' + str(response.status_code) + ' ' + response.reason) responseJSON = response.json() if 'error' in responseJSON and responseJSON['error'] != None: raise Exception('Error in RPC call: ' + str(responseJSON['error'])) return responseJSON['result'] def vendor_generator(size=6, chars=string.ascii_uppercase + string.digits): return ''.join(random.choice(chars) for _ in range(size)) def decodeBase58(address): decoded = base58.b58decode(address).hex() prefixAndHash = decoded[:len(decoded)-8] checksum = decoded[len(decoded)-8:] hash = prefixAndHash for _ in range(1,3): hash = hashlib.sha256(binascii.unhexlify(hash)).hexdigest() if(checksum == hash[:8]): return True return False def decodeMonero(address): # decode monero address if len(address) == 95 or len(address) == 106: # Decode the address from Base58 decoded = decode(address) # Verify address type byte if decoded[0:2] not in ["12", "2a", "13"]: return False # Verify checksum # later return True return False def checksumCheck(method, address): match method.lower(): case 'btc': return decodeBase58(address) if address[0] == '1' or address[0] == '3' else True if address[0:3] == 'bc1' and bdecode("bc", address)[0] != None else False case 'btct': return decodeBase58(address) if address[0] == '2' else True if address[0:3] == 'tb1' and bdecode("tb", address)[0] != None else False case 'ltc': return decodeBase58(address) if address[0] == '3' or address[0] == 'M' or address[0] == 'L' else True if address[0:4] == 'ltc1' and bdecode("ltc", address)[0] != None else False case 'bch': return is_valid(address) if address[0] == '1' else True if is_valid('bitcoincash:'+address) == True else False case 'zec': return decodeBase58(address) if address[0] == 't' or address[0] == 'z' else False case 'doge': return decodeBase58(address) case 'xmr': #needs new function to check if address is valid, a decoder maybe return decodeMonero(address) case _: return False ######################################################################### ## UrlValidator ######################################################################### class UrlValidator: @staticmethod def is_internal_address(ip: Union[IPv4Address]) -> bool: return any([ ip.is_private, ip.is_unspecified, ip.is_reserved, ip.is_loopback, ip.is_multicast, ip.is_link_local, ]) @classmethod def validate(cls, url: str): DEFAULT_PORT_WHITELIST = {80, 81, 8080, 443, 8443, 8000} DEFAULT_SCHEME_WHITELIST = {'http', 'https'} DEFAULT_HOST_BLACKLIST = {'192.0.0.192', '169.254.169.254', '100.100.100.200', 'metadata.packet.net', 'metadata.google.internal'} DEFAULT_CHARACTER_WHITELIST = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789:/-_.?&=' if url is None: return False whitelist_set = set(DEFAULT_CHARACTER_WHITELIST) if any(c not in whitelist_set for c in url): return False try: ip = ip_address(url) except ValueError: try: host = urlparse(url).hostname ip = ip_address(str(socket.gethostbyname(host))) except: return False port_whitelist = DEFAULT_PORT_WHITELIST.copy() scheme_whitelist = DEFAULT_SCHEME_WHITELIST.copy() host_blacklist = DEFAULT_HOST_BLACKLIST.copy() try: port, scheme = urlparse(url).port, urlparse(url).scheme except: return False if scheme_whitelist and scheme is not None and scheme not in scheme_whitelist: return False if host_blacklist and host is not None and host in host_blacklist: return False if port_whitelist and port is not None and port not in port_whitelist: return False if ip.version == 4: if not ip.is_private: # CGNAT IPs do not set `is_private` so `not is_global` added if not ip_network(ip).is_global: return False else: return False if cls.is_internal_address(ip): return False return True """Reference implementation for Bech32/Bech32m and segwit addresses.""" ######################################################################### ## SEGWIT ######################################################################### class Encoding(Enum): """Enumeration type to list the various supported encodings.""" BECH32 = 1 BECH32M = 2 CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" BECH32M_CONST = 0x2bc830a3 def bech32_polymod(values): """Internal function that computes the Bech32 checksum.""" generator = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3] chk = 1 for value in values: top = chk >> 25 chk = (chk & 0x1ffffff) << 5 ^ value for i in range(5): chk ^= generator[i] if ((top >> i) & 1) else 0 return chk def bech32_hrp_expand(hrp): """Expand the HRP into values for checksum computation.""" return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp] def bech32_verify_checksum(hrp, data): """Verify a checksum given HRP and converted data characters.""" const = bech32_polymod(bech32_hrp_expand(hrp) + data) if const == 1: return Encoding.BECH32 if const == BECH32M_CONST: return Encoding.BECH32M return None def bech32_create_checksum(hrp, data, spec): """Compute the checksum values given HRP and data.""" values = bech32_hrp_expand(hrp) + data const = BECH32M_CONST if spec == Encoding.BECH32M else 1 polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ const return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)] def bech32_encode(hrp, data, spec): """Compute a Bech32 string given HRP and data values.""" combined = data + bech32_create_checksum(hrp, data, spec) return hrp + '1' + ''.join([CHARSET[d] for d in combined]) def bech32_decode(bech): """Validate a Bech32/Bech32m string, and determine HRP and data.""" if ((any(ord(x) < 33 or ord(x) > 126 for x in bech)) or (bech.lower() != bech and bech.upper() != bech)): return (None, None, None) bech = bech.lower() pos = bech.rfind('1') if pos < 1 or pos + 7 > len(bech) or len(bech) > 90: return (None, None, None) if not all(x in CHARSET for x in bech[pos+1:]): return (None, None, None) hrp = bech[:pos] data = [CHARSET.find(x) for x in bech[pos+1:]] spec = bech32_verify_checksum(hrp, data) if spec is None: return (None, None, None) return (hrp, data[:-6], spec) def convertbits(data, frombits, tobits, pad=True): """General power-of-2 base conversion.""" acc = 0 bits = 0 ret = [] maxv = (1 << tobits) - 1 max_acc = (1 << (frombits + tobits - 1)) - 1 for value in data: if value < 0 or (value >> frombits): return None acc = ((acc << frombits) | value) & max_acc bits += frombits while bits >= tobits: bits -= tobits ret.append((acc >> bits) & maxv) if pad: if bits: ret.append((acc << (tobits - bits)) & maxv) elif bits >= frombits or ((acc << (tobits - bits)) & maxv): return None return ret def bdecode(hrp, addr): """Decode a segwit address.""" hrpgot, data, spec = bech32_decode(addr) if hrpgot != hrp: return (None, None) decoded = convertbits(data[1:], 5, 8, False) if decoded is None or len(decoded) < 2 or len(decoded) > 40: return (None, None) if data[0] > 16: return (None, None) if data[0] == 0 and len(decoded) != 20 and len(decoded) != 32: return (None, None) if data[0] == 0 and spec != Encoding.BECH32 or data[0] != 0 and spec != Encoding.BECH32M: return (None, None) return (data[0], decoded) def bencode(hrp, witver, witprog): """Encode a segwit address.""" spec = Encoding.BECH32 if witver == 0 else Encoding.BECH32M ret = bech32_encode(hrp, [witver] + convertbits(witprog, 8, 5), spec) if bdecode(hrp, ret) == (None, None): return None return ret ######################################################################### ## CRYPTO ######################################################################### CHARSET = 'qpzry9x8gf2tvdw0s3jn54khce6mua7l' def polymod(values): chk = 1 generator = [ (0x01, 0x98f2bc8e61), (0x02, 0x79b76d99e2), (0x04, 0xf33e5fb3c4), (0x08, 0xae2eabe2a8), (0x10, 0x1e4f43e470)] for value in values: top = chk >> 35 chk = ((chk & 0x07ffffffff) << 5) ^ value for i in generator: if top & i[0] != 0: chk ^= i[1] return chk ^ 1 def prefix_expand(prefix): return [ord(x) & 0x1f for x in prefix] + [0] def calculate_checksum(prefix, payload): poly = polymod(prefix_expand(prefix) + payload + [0, 0, 0, 0, 0, 0, 0, 0]) out = list() for i in range(8): out.append((poly >> 5 * (7 - i)) & 0x1f) return out def verify_checksum(prefix, payload): return polymod(prefix_expand(prefix) + payload) == 0 def b32decode(inputs): out = list() for letter in inputs: out.append(CHARSET.find(letter)) return out def b32encode(inputs): out = '' for char_code in inputs: out += CHARSET[char_code] return out def convertbits(data, frombits, tobits, pad=True): acc = 0 bits = 0 ret = [] maxv = (1 << tobits) - 1 max_acc = (1 << (frombits + tobits - 1)) - 1 for value in data: if value < 0 or (value >> frombits): return None acc = ((acc << frombits) | value) & max_acc bits += frombits while bits >= tobits: bits -= tobits ret.append((acc >> bits) & maxv) if pad: if bits: ret.append((acc << (tobits - bits)) & maxv) elif bits >= frombits or ((acc << (tobits - bits)) & maxv): return None return ret ######################################################################### ## BCH CONVERT ######################################################################### class InvalidAddress(Exception): pass class Address: VERSION_MAP = { 'legacy': [ ('P2SH', 5, False), ('P2PKH', 0, False), ('P2SH-TESTNET', 196, True), ('P2PKH-TESTNET', 111, True) ], 'cash': [ ('P2SH', 8, False), ('P2PKH', 0, False), ('P2SH-TESTNET', 8, True), ('P2PKH-TESTNET', 0, True) ] } MAINNET_PREFIX = 'bitcoincash' TESTNET_PREFIX = 'bchtest' def __init__(self, version, payload, prefix=None): self.version = version self.payload = payload if prefix: self.prefix = prefix else: if Address._address_type('cash', self.version)[2]: self.prefix = self.TESTNET_PREFIX else: self.prefix = self.MAINNET_PREFIX def __str__(self): return 'version: {}\npayload: {}\nprefix: {}'.format(self.version, self.payload, self.prefix) def legacy_address(self): version_int = Address._address_type('legacy', self.version)[1] return b58encode_check(Address.code_list_to_string([version_int] + self.payload)) def cash_address(self): version_int = Address._address_type('cash', self.version)[1] payload = [version_int] + self.payload payload = convertbits(payload, 8, 5) checksum = calculate_checksum(self.prefix, payload) return self.prefix + ':' + b32encode(payload + checksum) @staticmethod def code_list_to_string(code_list): if sys.version_info > (3, 0): output = bytes() for code in code_list: output += bytes([code]) else: output = '' for code in code_list: output += chr(code) return output @staticmethod def _address_type(address_type, version): for mapping in Address.VERSION_MAP[address_type]: if mapping[0] == version or mapping[1] == version: return mapping raise InvalidAddress('Could not determine address version') @staticmethod def from_string(address_string): try: address_string = str(address_string) except Exception: raise InvalidAddress('Expected string as input') if ':' not in address_string: return Address._legacy_string(address_string) else: return Address._cash_string(address_string) @staticmethod def _legacy_string(address_string): try: decoded = bytearray(b58decode_check(address_string)) except ValueError: raise InvalidAddress('Could not decode legacy address') version = Address._address_type('legacy', decoded[0])[0] payload = list() for letter in decoded[1:]: payload.append(letter) return Address(version, payload) @staticmethod def _cash_string(address_string): if address_string.upper() != address_string and address_string.lower() != address_string: raise InvalidAddress('Cash address contains uppercase and lowercase characters') address_string = address_string.lower() colon_count = address_string.count(':') if colon_count == 0: address_string = Address.MAINNET_PREFIX + ':' + address_string elif colon_count > 1: raise InvalidAddress('Cash address contains more than one colon character') prefix, base32string = address_string.split(':') decoded = b32decode(base32string) if not verify_checksum(prefix, decoded): raise InvalidAddress('Bad cash address checksum') converted = convertbits(decoded, 5, 8) version = Address._address_type('cash', converted[0])[0] if prefix == Address.TESTNET_PREFIX: version += '-TESTNET' payload = converted[1:-6] return Address(version, payload, prefix) def to_cash_address(address): return Address.from_string(address).cash_address() def to_legacy_address(address): return Address.from_string(address).legacy_address() def is_valid(address): try: Address.from_string(address) return True except InvalidAddress: return False ######################################################################### ## monero ######################################################################### __alphabet = [ ord(s) for s in "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" ] __b58base = 58 __UINT64MAX = 2 ** 64 __encodedBlockSizes = [0, 2, 3, 5, 6, 7, 9, 10, 11] __fullBlockSize = 8 __fullEncodedBlockSize = 11 def _hexToBin(hex_): if len(hex_) % 2 != 0: raise ValueError("Hex string has invalid length: %d" % len(hex_)) return [int(hex_[i : i + 2], 16) for i in range(0, len(hex_), 2)] def _binToHex(bin_): return "".join("%02x" % int(b) for b in bin_) def _uint8be_to_64(data): if not (1 <= len(data) <= 8): raise ValueError("Invalid input length: %d" % len(data)) res = 0 for b in data: res = res << 8 | b return res def _uint64_to_8be(num, size): if size < 1 or size > 8: raise ValueError("Invalid input length: %d" % size) res = [0] * size twopow8 = 2 ** 8 for i in range(size - 1, -1, -1): res[i] = num % twopow8 num = num // twopow8 return res def encode_block(data, buf, index): l_data = len(data) if l_data < 1 or l_data > __fullEncodedBlockSize: raise ValueError("Invalid block length: %d" % l_data) num = _uint8be_to_64(data) i = __encodedBlockSizes[l_data] - 1 while num > 0: remainder = num % __b58base num = num // __b58base buf[index + i] = __alphabet[remainder] i -= 1 return buf def encode(hex): """Encode hexadecimal string as base58 (ex: encoding a Monero address).""" data = _hexToBin(hex) l_data = len(data) if l_data == 0: return "" full_block_count = l_data // __fullBlockSize last_block_size = l_data % __fullBlockSize res_size = ( full_block_count * __fullEncodedBlockSize + __encodedBlockSizes[last_block_size] ) res = bytearray([__alphabet[0]] * res_size) for i in range(full_block_count): res = encode_block( data[(i * __fullBlockSize) : (i * __fullBlockSize + __fullBlockSize)], res, i * __fullEncodedBlockSize, ) if last_block_size > 0: res = encode_block( data[ (full_block_count * __fullBlockSize) : ( full_block_count * __fullBlockSize + last_block_size ) ], res, full_block_count * __fullEncodedBlockSize, ) return bytes(res).decode("ascii") def decode_block(data, buf, index): l_data = len(data) if l_data < 1 or l_data > __fullEncodedBlockSize: raise ValueError("Invalid block length: %d" % l_data) res_size = __encodedBlockSizes.index(l_data) if res_size <= 0: raise ValueError("Invalid block size: %d" % res_size) res_num = 0 order = 1 for i in range(l_data - 1, -1, -1): digit = __alphabet.index(data[i]) if digit < 0: raise ValueError("Invalid symbol: %s" % data[i]) product = order * digit + res_num if product > __UINT64MAX: raise ValueError( "Overflow: %d * %d + %d = %d" % (order, digit, res_num, product) ) res_num = product order = order * __b58base if res_size < __fullBlockSize and 2 ** (8 * res_size) <= res_num: raise ValueError("Overflow: %d doesn't fit in %d bit(s)" % (res_num, res_size)) tmp_buf = _uint64_to_8be(res_num, res_size) buf[index : index + len(tmp_buf)] = tmp_buf return buf def decode(enc): """Decode a base58 string (ex: a Monero address) into hexidecimal form.""" enc = bytearray(enc, encoding="ascii") l_enc = len(enc) if l_enc == 0: return "" full_block_count = l_enc // __fullEncodedBlockSize last_block_size = l_enc % __fullEncodedBlockSize try: last_block_decoded_size = __encodedBlockSizes.index(last_block_size) except ValueError: raise ValueError("Invalid encoded length: %d" % l_enc) data_size = full_block_count * __fullBlockSize + last_block_decoded_size data = bytearray(data_size) for i in range(full_block_count): data = decode_block( enc[ (i * __fullEncodedBlockSize) : ( i * __fullEncodedBlockSize + __fullEncodedBlockSize ) ], data, i * __fullBlockSize, ) if last_block_size > 0: data = decode_block( enc[ (full_block_count * __fullEncodedBlockSize) : ( full_block_count * __fullEncodedBlockSize + last_block_size ) ], data, full_block_count * __fullBlockSize, ) return _binToHex(data) ######################################################################### #########################################################################