The test contract is
pragma solidity ^0.5.0;
contract Test{
event Here(uint256 indexed a);
uint256 public a;
function add() public returns (uint256){
a += 1;
emit Here(a);
return a;
}
}
So I use this below method to listen event according to https://web3py.readthedocs.io/en/stable/filters.html#asynchronous-filter-polling
from web3 import Web3
import asyncio
provider = Web3.HTTPProvider([A INFURA URL])
web3 = Web3(provider)
# this contract has been deployed on the rinkeby net
contract_address = '0x6c0827Aa560Bff3D316C9BfC700E7806e11e0a7F'
contract_abi = '[{"constant":true,"inputs":[],"name":"a","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[],"name":"add","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"anonymous":false,"inputs":[{"indexed":true,"name":"a","type":"uint256"}],"name":"Here","type":"event"}]'
contract = web3.eth.contract(address=contract_address, abi=contract_abi)
def handle_event(event):
print(event)
# and whatever
async def log_loop(event_filter, poll_interval):
while True:
for event in event_filter.get_new_entries():
handle_event(event)
await asyncio.sleep(poll_interval)
def main():
block_filter = web3.eth.filter('latest')
tx_filter = web3.eth.filter('pending')
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
asyncio.gather(
log_loop(block_filter, 2),
log_loop(tx_filter, 2)))
finally:
loop.close()
if __name__ == "__main__":
main()
That is all, the question is that it does not work. please give any advice, thanks.
Best Answer
Update the event to output JSON and utilize your contract in a filter.