# fastapi/rpcs.py
#
# Repository viewer snapshot: 66 lines, 3.1 KiB, Python.
# Last change: 2023-03-10 10:46:21 +00:00
import time, json, requests
from requests.auth import HTTPDigestAuth
class RPCHost(object):
    """Small JSON-RPC 2.0 client that POSTs every call to one fixed URL.

    One ``requests.Session`` is created per instance and reused for all
    calls made through it.
    """

    # Total number of connection attempts made per call before giving up.
    _MAX_TRIES = 3
    # Per-request timeout, in seconds, handed to ``Session.post``.
    _TIMEOUT = 15

    def __init__(self, url):
        """Store the endpoint *url* and set up the session and JSON header."""
        self._session = requests.Session()
        self._url = url
        self._headers = {'content-type': 'application/json'}

    def call(self, rpcMethod, *params):
        """Invoke *rpcMethod* remotely and return the ``result`` member.

        Args:
            rpcMethod: Name of the remote method.
            *params: Positional arguments, sent as the JSON ``params`` list.

        Returns:
            The value stored under ``'result'`` in the decoded response.

        Raises:
            Exception: If every connection attempt fails, if the HTTP status
                is neither 200 nor 500, or if the response carries a
                non-null ``error`` member.
        """
        payload = json.dumps({"method": rpcMethod, "params": list(params), "jsonrpc": "2.0"})
        tries = self._MAX_TRIES
        hadConnectionFailures = False
        while True:
            try:
                response = self._session.post(
                    self._url,
                    headers=self._headers,
                    data=payload,
                    timeout=self._TIMEOUT,
                )
            except requests.exceptions.ConnectionError as err:
                tries -= 1
                if tries == 0:
                    # Chain the cause so the underlying network error is not lost.
                    raise Exception('Failed to connect for remote procedure call.') from err
                hadConnectionFailures = True
                # Retries are immediate; no delay is applied between attempts.
                print("Couldn't connect for remote procedure call, will try again ({} more tries)".format(tries))
            else:
                if hadConnectionFailures:
                    print('Connected for remote procedure call after retry.')
                break

        # Status 500 is let through so that the ``error`` member of the body
        # can be reported below (presumably the server answers failed RPC
        # calls with HTTP 500 -- confirm against the server in use).
        if response.status_code not in (200, 500):
            raise Exception('RPC connection failure: ' + str(response.status_code) + ' ' + response.reason)

        responseJSON = response.json()
        if responseJSON.get('error') is not None:
            raise Exception('Error in RPC call: ' + str(responseJSON['error']))
        return responseJSON['result']
# class
class RPCXMR(object):
    """Small JSON-RPC 2.0 client that authenticates with HTTP digest auth.

    Every call is POSTed to one fixed URL through a per-instance
    ``requests.Session``, using the user name and password given at
    construction time.
    """

    # Total number of connection attempts made per call before giving up.
    _MAX_TRIES = 3
    # Per-request timeout, in seconds, handed to ``Session.post``.
    _TIMEOUT = 15

    def __init__(self, url, user, password):
        """Store the endpoint *url* and the digest-auth credentials."""
        self._session = requests.Session()
        self._url = url
        self._user = user
        self._pass = password
        self._headers = {}

    def call(self, rpcMethod, params):
        """Invoke *rpcMethod* remotely and return the ``result`` member.

        Args:
            rpcMethod: Name of the remote method.
            params: Value sent unchanged as the JSON ``params`` member.

        Returns:
            The value stored under ``'result'`` in the decoded response.

        Raises:
            Exception: If every connection attempt fails, if the HTTP status
                is neither 200 nor 500, or if the response carries a
                non-null ``error`` member.
        """
        payload = json.dumps({"method": rpcMethod, "params": params, "jsonrpc": "2.0"})
        # Built once per call instead of once per retry attempt.
        auth = HTTPDigestAuth(self._user, self._pass)
        tries = self._MAX_TRIES
        hadConnectionFailures = False
        while True:
            try:
                response = self._session.post(
                    self._url,
                    headers=self._headers,
                    data=payload,
                    auth=auth,
                    timeout=self._TIMEOUT,
                )
            except requests.exceptions.ConnectionError as err:
                tries -= 1
                if tries == 0:
                    # Chain the cause so the underlying network error is not lost.
                    raise Exception('Failed to connect for remote procedure call.') from err
                hadConnectionFailures = True
                # Retries are immediate; no delay is applied between attempts.
                print("Couldn't connect for remote procedure call, will try again ({} more tries)".format(tries))
            else:
                if hadConnectionFailures:
                    print('Connected for remote procedure call after retry.')
                break

        # Status 500 is let through so that the ``error`` member of the body
        # can be reported below (presumably the server answers failed RPC
        # calls with HTTP 500 -- confirm against the server in use).
        if response.status_code not in (200, 500):
            raise Exception('RPC connection failure: ' + str(response.status_code) + ' ' + response.reason)

        responseJSON = response.json()
        if responseJSON.get('error') is not None:
            raise Exception('Error in RPC call: ' + str(responseJSON['error']))
        return responseJSON['result']