Compare commits
No commits in common. "71143ee2389036cd9365053e637e235ccdd9e9ae" and "435e7aae3825f82b13ae51d0039971fe406747d1" have entirely different histories.
71143ee238
...
435e7aae38
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -1,70 +0,0 @@
|
||||||
import socket
|
|
||||||
from typing import Union
|
|
||||||
from urllib.parse import urlparse
|
|
||||||
from ipaddress import ip_address, ip_network, IPv4Address
|
|
||||||
|
|
||||||
class UrlValidator:
|
|
||||||
@staticmethod
|
|
||||||
def is_internal_address(ip: Union[IPv4Address]) -> bool:
|
|
||||||
return any([
|
|
||||||
ip.is_private,
|
|
||||||
ip.is_unspecified,
|
|
||||||
ip.is_reserved,
|
|
||||||
ip.is_loopback,
|
|
||||||
ip.is_multicast,
|
|
||||||
ip.is_link_local,
|
|
||||||
])
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def validate(cls, url: str):
|
|
||||||
DEFAULT_PORT_WHITELIST = {80, 81, 8080, 443, 8443, 8000}
|
|
||||||
DEFAULT_SCHEME_WHITELIST = {'http', 'https'}
|
|
||||||
DEFAULT_HOST_BLACKLIST = {'192.0.0.192', '169.254.169.254', '100.100.100.200', 'metadata.packet.net', 'metadata.google.internal'}
|
|
||||||
DEFAULT_CHARACTER_WHITELIST = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789:/-_.?&='
|
|
||||||
|
|
||||||
if url is None:
|
|
||||||
return False
|
|
||||||
|
|
||||||
whitelist_set = set(DEFAULT_CHARACTER_WHITELIST)
|
|
||||||
if any(c not in whitelist_set for c in url):
|
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
|
||||||
ip = ip_address(url)
|
|
||||||
except ValueError:
|
|
||||||
try:
|
|
||||||
host = urlparse(url).hostname
|
|
||||||
ip = ip_address(str(socket.gethostbyname(host)))
|
|
||||||
except:
|
|
||||||
return False
|
|
||||||
|
|
||||||
port_whitelist = DEFAULT_PORT_WHITELIST.copy()
|
|
||||||
scheme_whitelist = DEFAULT_SCHEME_WHITELIST.copy()
|
|
||||||
host_blacklist = DEFAULT_HOST_BLACKLIST.copy()
|
|
||||||
|
|
||||||
try:
|
|
||||||
port, scheme = urlparse(url).port, urlparse(url).scheme
|
|
||||||
except:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if scheme_whitelist and scheme is not None and scheme not in scheme_whitelist:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if host_blacklist and host is not None and host in host_blacklist:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if port_whitelist and port is not None and port not in port_whitelist:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if ip.version == 4:
|
|
||||||
if not ip.is_private:
|
|
||||||
# CGNAT IPs do not set `is_private` so `not is_global` added
|
|
||||||
if not ip_network(ip).is_global:
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if cls.is_internal_address(ip):
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
|
|
@ -35,7 +35,7 @@ def checksumCheck(method, address):
|
||||||
case 'btc':
|
case 'btc':
|
||||||
return decodeBase58(address) if address[0] == '1' or address[0] == '3' else True if address[0:3] == 'bc1' and segwit_addr.decode("bc", address)[0] != None else False
|
return decodeBase58(address) if address[0] == '1' or address[0] == '3' else True if address[0:3] == 'bc1' and segwit_addr.decode("bc", address)[0] != None else False
|
||||||
case 'btct':
|
case 'btct':
|
||||||
return decodeBase58(address) if address[0] == '2' else True if address[0:3] == 'tb1' and segwit_addr.decode("tb", address)[0] != None else False
|
return decodeBase58(address) if address[0] == '1' or address[0] == '3' else True if address[0:3] == 'tb1' and segwit_addr.decode("tb", address)[0] != None else False
|
||||||
case 'ltc':
|
case 'ltc':
|
||||||
return decodeBase58(address) if address[0] == '3' or address[0] == 'M' or address[0] == 'L' else True if address[0:4] == 'ltc1' and segwit_addr.decode("ltc", address)[0] != None else False
|
return decodeBase58(address) if address[0] == '3' or address[0] == 'M' or address[0] == 'L' else True if address[0:4] == 'ltc1' and segwit_addr.decode("ltc", address)[0] != None else False
|
||||||
case 'bch':
|
case 'bch':
|
||||||
|
|
2
main.py
2
main.py
|
@ -113,7 +113,7 @@ def receive(method: str, address: str, callback: Union[str, None] = None):
|
||||||
session.close()
|
session.close()
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
## notify admin about the error
|
## notify admin about the error
|
||||||
raise ErrorException(code=422,status="error",status_message='Invalid response from dbServer')
|
raise ErrorException(code=422,status="error",status_message='Invalid response from dbServer:'+str(error))
|
||||||
else:
|
else:
|
||||||
## notify admin about the error
|
## notify admin about the error
|
||||||
raise ErrorException(code=422,status="error",status_message='Invalid response from rpcServer')
|
raise ErrorException(code=422,status="error",status_message='Invalid response from rpcServer')
|
||||||
|
|
Loading…
Reference in New Issue