fastapi/scratch_folder/scratch_file

217 lines
6.2 KiB
Plaintext
Raw Normal View History

2023-03-10 10:46:21 +00:00
'''
def checksumCheck(method, address):
if method == 'btc':
if address[0] == '1' or address[0] == '3':
if decodeBase58(address):
return True
return False
elif address[0:3] == 'bc1':
witver = segwit_addr.decode("bc", address)
if witver[0] == None:
return False
return True
else:
return False
elif method == 'ltc':
if address[0] == '3' or address[0] == 'M' or address[0] == 'L':
if decodeBase58(address):
return True
return False
elif address[0:4] == 'ltc1':
witver = segwit_addr.decode("ltc", address)
if witver[0] != None:
return True
return False
else:
return False
elif method == 'bch':
if address[0] == '1':
if bchconvert.is_valid(address) == True:
return True
return False
elif bchconvert.is_valid('bitcoincash:'+address) == True:
return True
return False
elif method == 'zec':
if address[0] == 't' or address[0] == 'z':
if decodeBase58(address):
return True
return False
else:
return True
elif method == 'xmr':
length = len(address)
if length == 95:
checksum = moneropy.decode(address)
start = checksum[0:2].lower()
if start == '12' or start == '2a':
return True
return False
elif length == 106:
checksum = moneropy.decode(address)
start = checksum[0:2].lower()
if start == '13':
return True
return False
else:
return False
else:
return False
k = sha3.keccak_256()
print("First 65: " + str(address_type[:-65]))
print(hash64)
k.update(hash64)
a = k.hexdigest()
print("sha3 digest: " + a)
print("base58 decoded: " + address_type)
first4 = a[:-4]
last = address_type[-4:]
'''
return start
2023-04-06 11:42:08 +00:00
2023-03-10 10:46:21 +00:00
valid = re.compile(r"^(bc1|[13])[a-zA-HJ-NP-Z0-9]{25,39}$")
if valid.match(address) is None:
return False
else:
return True
else:
return "END"
'''
def validDns(d):
b = False
try:
a = socket.gethostbyname(d)
b = True
except:
return False
# ip validation
if b == True:
if a.split('.')[0] in ['127', '0']:
return False
if '.'.join([a.split('.')[0], a.split('.')[1]]) == '192.168':
return False
if a in ['1.1.1.1','2.2.2.2', '3.3.3.3']:
return False
return True
import base58
import hashlib
# Modified Base58 alphabet used in Monero
MONERO_BASE58_ALPHABET = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
def encode_monero_address(address, version):
# Compute checksum
hash1 = hashlib.sha256(bytes.fromhex(version + address)).digest()
hash2 = hashlib.sha256(hash1).digest()
checksum = hash2[:4]
# Concatenate version, address, and checksum
data = bytes.fromhex(version + address) + checksum
# Encode using modified Base58 alphabet
encoded = base58.b58encode(data, alphabet=MONERO_BASE58_ALPHABET)
return encoded.decode()
def decode_monero_address(encoded):
# Decode using modified Base58 alphabet
data = base58.b58decode(encoded, alphabet=MONERO_BASE58_ALPHABET)
# Extract version, address, and checksum
version = data[:2].hex()
address = data[2:-4].hex()
checksum = data[-4:]
# Verify checksum
hash1 = hashlib.sha256(bytes.fromhex(version + address)).digest()
hash2 = hashlib.sha256(hash1).digest()
if checksum != hash2[:4]:
raise ValueError('Invalid checksum')
return version, address
def decode_monero_address(encoded):
from Crypto.Hash import keccak
import binascii
# Decode using modified Base58 alphabet
print(encoded)
data = moneropy.decode(encoded)
print(data)
# Extract version, address, and checksum
version = data[:2].encode()
address1 = data[2:66].encode()
address2 = data[66:-8].encode()
checksum = data[-8:]
print(str(version + address1 + address2))
print(hex(version))
# Verify checksum
#keccak256 = keccak.new(digest_bits=256)
#keccak256.update(version + address1 + address2)
#print(keccak256.hexdigest())
#print("Keccak256:", binascii.hexlify(keccak256))
#print(binascii.hexlify(keccak256), checksum)
2023-04-06 11:42:08 +00:00
return False
# class
class RPCXMR(object):
def __init__(self, url, user, password):
self._session = requests.Session()
self._url = url
self._user = user
self._pass = password
self._headers = {}
def call(self, rpcMethod, params):
payload = json.dumps({"method": rpcMethod, "params": params, "jsonrpc": "2.0"})
tries = 3
hadConnectionFailures = False
while True:
try:
response = self._session.post(self._url, headers=self._headers, data=payload, auth=HTTPDigestAuth(self._user, self._pass), timeout=15)
except requests.exceptions.ConnectionError:
tries -= 1
if tries == 0:
raise Exception('Failed to connect for remote procedure call.')
hadFailedConnections = True
print("Couldn't connect for remote procedure call, will sleep for two seconds and then try again ({} more tries)".format(tries))
#time.sleep(2)
else:
if hadConnectionFailures:
print('Connected for remote procedure call after retry.')
break
if not response.status_code in (200, 500):
raise Exception('RPC connection failure: ' + str(response.status_code) + ' ' + response.reason)
responseJSON = response.json()
if 'error' in responseJSON and responseJSON['error'] != None:
raise Exception('Error in RPC call: ' + str(responseJSON['error']))
return responseJSON['result']