137 lines
5.3 KiB
Python
137 lines
5.3 KiB
Python
|
#!/usr/bin/env python3
|
||
|
# Copyright (c) 2014-2018 The Bitcoin Core developers
|
||
|
# Distributed under the MIT software license, see the accompanying
|
||
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||
|
|
||
|
"""
|
||
|
ZMQ example using python3's asyncio
|
||
|
|
||
|
Litecoind should be started with the command line arguments:
|
||
|
litecoind -testnet -daemon \
|
||
|
-zmqpubrawtx=tcp://127.0.0.1:28332 \
|
||
|
-zmqpubrawblock=tcp://127.0.0.1:28332 \
|
||
|
-zmqpubhashtx=tcp://127.0.0.1:28332 \
|
||
|
-zmqpubhashblock=tcp://127.0.0.1:28332 \
|
||
|
-zmqpubsequence=tcp://127.0.0.1:28332
|
||
|
|
||
|
We use the asyncio library here. `self.handle()` installs itself as a
|
||
|
future at the end of the function. Since it never returns with the event
|
||
|
loop having an empty stack of futures, this creates an infinite loop. An
|
||
|
alternative is to wrap the contents of `handle` inside `while True`.
|
||
|
|
||
|
A blocking example using python 2.7 can be obtained from the git history:
|
||
|
https://github.com/bitcoin/bitcoin/blob/37a7fe9e440b83e2364d5498931253937abe9294/contrib/zmq/zmq_sub.py
|
||
|
"""
|
||
|
|
||
|
import binascii
|
||
|
import asyncio
|
||
|
import zmq
|
||
|
import zmq.asyncio
|
||
|
import signal
|
||
|
import struct
|
||
|
import sys
|
||
|
import time, requests, json
|
||
|
|
||
|
if (sys.version_info.major, sys.version_info.minor) < (3, 5):
|
||
|
print("This example only works with Python 3.5 and greater")
|
||
|
sys.exit(1)
|
||
|
|
||
|
class RPCHost(object):
|
||
|
def __init__(self, url):
|
||
|
self._session = requests.Session()
|
||
|
self._url = url
|
||
|
self._headers = {'content-type': 'application/json'}
|
||
|
def call(self, rpcMethod, *params):
|
||
|
payload = json.dumps({"method": rpcMethod, "params": list(params), "jsonrpc": "2.0"})
|
||
|
tries = 5
|
||
|
hadConnectionFailures = False
|
||
|
while True:
|
||
|
try:
|
||
|
response = self._session.post(self._url, headers=self._headers, data=payload)
|
||
|
except requests.exceptions.ConnectionError:
|
||
|
tries -= 1
|
||
|
if tries == 0:
|
||
|
raise Exception('Failed to connect for remote procedure call.')
|
||
|
hadFailedConnections = True
|
||
|
print("Couldn't connect for remote procedure call, will sleep for five seconds and then try again ({} more tries)".format(tries))
|
||
|
time.sleep(10)
|
||
|
else:
|
||
|
if hadConnectionFailures:
|
||
|
print('Connected for remote procedure call after retry.')
|
||
|
break
|
||
|
if not response.status_code in (200, 500):
|
||
|
raise Exception('RPC connection failure: ' + str(response.status_code) + ' ' + response.reason)
|
||
|
responseJSON = response.json()
|
||
|
if 'error' in responseJSON and responseJSON['error'] != None:
|
||
|
raise Exception('Error in RPC call: ' + str(responseJSON['error']))
|
||
|
return responseJSON['result']
|
||
|
|
||
|
ZMQ_PORT = 28442
|
||
|
SERVER_IP = '172.16.0.10'
|
||
|
SERVER_PORT = 8442
|
||
|
SERVER_USER = 'litecoinrpc'
|
||
|
SERVER_PASS = 'uhalalala'
|
||
|
|
||
|
serverURL = "http://%s:%s@%s:%s" % (SERVER_USER, SERVER_PASS, SERVER_IP, SERVER_PORT)
|
||
|
rpc = RPCHost(serverURL)
|
||
|
|
||
|
class ZMQHandler():
|
||
|
def __init__(self):
|
||
|
self.loop = asyncio.get_event_loop()
|
||
|
self.zmqContext = zmq.asyncio.Context()
|
||
|
|
||
|
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
|
||
|
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
|
||
|
#self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "")
|
||
|
#self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
|
||
|
#self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
|
||
|
#self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
|
||
|
#self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
|
||
|
self.zmqSubSocket.connect("tcp://%s:%i" % (SERVER_IP, ZMQ_PORT))
|
||
|
|
||
|
async def handle(self) :
|
||
|
topic, body, seq = await self.zmqSubSocket.recv_multipart()
|
||
|
sequence = "Unknown"
|
||
|
if len(seq) == 4:
|
||
|
sequence = str(struct.unpack('<I', seq)[-1])
|
||
|
if topic == b"hashblock":
|
||
|
print('- HASH BLOCK ('+sequence+') -')
|
||
|
print(binascii.hexlify(body))
|
||
|
elif topic == b"hashtx":
|
||
|
print('- HASH TX ('+sequence+') -')
|
||
|
print(binascii.hexlify(body))
|
||
|
elif topic == b"rawblock":
|
||
|
print('- RAW BLOCK HEADER ('+sequence+') -')
|
||
|
print(binascii.hexlify(body[:80]))
|
||
|
elif topic == b"rawtx":
|
||
|
print('- RAW TX ('+sequence+') -')
|
||
|
#decode = rpc.call('decoderawtransaction', binascii.hexlify(body).decode('utf-8'))
|
||
|
#data = json.dumps(decode)
|
||
|
#print(data)
|
||
|
|
||
|
#print(binascii.hexlify(body))
|
||
|
elif topic == b"sequence":
|
||
|
hash = binascii.hexlify(body[:32])
|
||
|
label = chr(body[32])
|
||
|
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
|
||
|
print('- SEQUENCE ('+sequence+') -')
|
||
|
print(hash, label, mempool_sequence)
|
||
|
else:
|
||
|
print(binascii.hexlify(body))
|
||
|
# schedule ourselves to receive the next message
|
||
|
asyncio.ensure_future(self.handle())
|
||
|
|
||
|
|
||
|
|
||
|
def start(self):
|
||
|
self.loop.add_signal_handler(signal.SIGINT, self.stop)
|
||
|
self.loop.create_task(self.handle())
|
||
|
self.loop.run_forever()
|
||
|
|
||
|
def stop(self):
|
||
|
self.loop.stop()
|
||
|
self.zmqContext.destroy()
|
||
|
|
||
|
daemon = ZMQHandler()
|
||
|
daemon.start()
|