[Ethereum] How to listen contract event in Python

eventspythonsolidity

The test contract is

pragma solidity ^0.5.0;

contract Test{
    event Here(uint256 indexed a);
    uint256 public a;

    function add() public returns (uint256){
        a += 1;
        emit Here(a);
        return a;
    }
}

So I use this below method to listen event according to https://web3py.readthedocs.io/en/stable/filters.html#asynchronous-filter-polling

from web3 import Web3
import asyncio


provider = Web3.HTTPProvider([A INFURA URL])
web3 = Web3(provider)

# this contract has been deployed on the rinkeby net
contract_address = '0x6c0827Aa560Bff3D316C9BfC700E7806e11e0a7F'
contract_abi = '[{"constant":true,"inputs":[],"name":"a","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[],"name":"add","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"anonymous":false,"inputs":[{"indexed":true,"name":"a","type":"uint256"}],"name":"Here","type":"event"}]'

contract = web3.eth.contract(address=contract_address, abi=contract_abi)

def handle_event(event):
    print(event)
    # and whatever

async def log_loop(event_filter, poll_interval):
    while True:
        for event in event_filter.get_new_entries():
            handle_event(event)
        await asyncio.sleep(poll_interval)

def main():
    block_filter = web3.eth.filter('latest')
    tx_filter = web3.eth.filter('pending')
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(
            asyncio.gather(
                log_loop(block_filter, 2),
                log_loop(tx_filter, 2)))
    finally:
        loop.close()

if __name__ == "__main__":
    main()

That is all, the question is that it does not work. please give any advice, thanks.

Best Answer

Update the event to output JSON and utilize your contract in a filter.

...

contract = web3.eth.contract(address=contract_address, abi=contract_abi)

def handle_event(event):
    print(Web3.toJSON(event))
    # and whatever

async def log_loop(event_filter, poll_interval):
    while True:
        for event in event_filter.get_new_entries():
            handle_event(event)
        await asyncio.sleep(poll_interval)

def main():
    event_filter = contract.events.Here.createFilter(fromBlock='latest')
    # block_filter = web3.eth.filter('latest')
    # tx_filter = web3.eth.filter('pending')
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(
            asyncio.gather(
                log_loop(event_filter, 2)))
                # log_loop(block_filter, 2),
                # log_loop(tx_filter, 2)))
    finally:
        loop.close()

if __name__ =
    main()