Web3py – How to Subscribe to New Block Headers Using Python

pythonweb3.pyweb3js

I am looking for a subscribe function in web3py like the one implemented in web3js:

web3.eth.subscribe('newBlockHeaders' [, callback]);

As there currently is no implementation of it in web3py, has anyone an idea on a possible implementation or some resources on it?

Best Answer

web3.py doesn't have a native implementation of watchers like web3.js. your only option is to connect to the exposed websocket like follows:

import asyncio
import json
import requests
from websockets import connect

async def get_event():
    async with connect("ws://localhost:8545") as ws:
        await ws.send({"id": 1, "method": "eth_subscribe", "params": ["newHeads"]})
        subscription_response = await ws.recv()
        print(subscription_response)
        # you are now subscribed to the event 
        # you keep trying to listen to new events (similar idea to longPolling)
        while True:
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=60)
                print(json.loads(message))
                pass
            except:
                pass
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    while True:
        loop.run_until_complete(get_event())

Note that the subscription_response will have a subscription id that you can use to cancel the subscription. you can find all jsonRPC methods form the docs

Related Topic