Blockchain – Getting the Liquidity Pool Address of a Token Using web3.py on Binance Smart Chain

binance-smart-chainblockchainbscbscscanweb3.py

The snippet below checks for the latest added Liquidity Pool of a token. In poocoin.app, the LP Pool Address can be checked based on the pair (TOKEN/BNB LP Holdings). I wanted to know if web3.py can check LP Pool Address directly of any given Token Address or Contact.

For example, if I wanted to know the data of the token
0xe56842ed550ff2794f010738554db45e60730371 the LP Pool Address 0xe432afB7283A08Be24E9038C30CA6336A7cC8218 will be returned.

import json
from web3 import Web3
import asyncio
import config
import time

bsc = 'https://bsc-dataseed.binance.org/'    
web3 = Web3(Web3.HTTPProvider(bsc))
print("Connected: ", web3.isConnected())

# uniswap factory address and abi = pancakeswap factory
uniswap_factory = '0xcA143Ce32Fe78f1f7019d7d551a6402fC5350c73'  #Testnet  #0x6725F303b657a9451d8BA641348b6761A6CC7a17
uniswap_factory_abi = json.loads('[{"inputs":[{"internalType":"address","name":"_feeToSetter","type":"address"}],"payable":false,"stateMutability":"nonpayable","type":"constructor"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"token0","type":"address"},{"indexed":true,"internalType":"address","name":"token1","type":"address"},{"indexed":false,"internalType":"address","name":"pair","type":"address"},{"indexed":false,"internalType":"uint256","name":"","type":"uint256"}],"name":"PairCreated","type":"event"},{"constant":true,"inputs":[],"name":"INIT_CODE_PAIR_HASH","outputs":[{"internalType":"bytes32","name":"","type":"bytes32"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"internalType":"uint256","name":"","type":"uint256"}],"name":"allPairs","outputs":[{"internalType":"address","name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"allPairsLength","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"internalType":"address","name":"tokenA","type":"address"},{"internalType":"address","name":"tokenB","type":"address"}],"name":"createPair","outputs":[{"internalType":"address","name":"pair","type":"address"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"feeTo","outputs":[{"internalType":"address","name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"feeToSetter","outputs":[{"internalType":"address","name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"internalType":"address","name":"","type":"address"},{"internalType":"address","name":"","type":"address"}],"name":"getPair","outputs":[{"internalType":"address","name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"internalType":"address","name":"_feeTo","type":"address"}],"name":"setFeeTo","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"internalType":"address","name":"_feeToSetter","type":"address"}],"name":"setFeeToSetter","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"}]')
contract = web3.eth.contract(address=uniswap_factory, abi=uniswap_factory_abi)

def handle_event(event):
    data = Web3.toJSON(event)
    print(data)
    token0 = str(Web3.toJSON(event['args']['token1']))
    token1 = str(Web3.toJSON(event['args']['token0']))
    print("Token0: " + token0)
    print("Token1: " + token1)
    
async def log_loop(event_filter, poll_interval):
    while True:
        for PairCreated in event_filter.get_new_entries():
            handle_event(PairCreated)
        await asyncio.sleep(poll_interval)

def main():
    event_filter = contract.events.PairCreated.createFilter(fromBlock='latest')
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(
            asyncio.gather(
                log_loop(event_filter,2 )))
    finally:
        loop.close()

main()

Result:

Connected:  True
{"args": {
    "token0": "0x999E6b6919F984cf93209900505C9Bd5ECDFc28C",
    "token1": "0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c",
    "pair": "0x46ebB97309a6FB6261d79F29917b238c04F44d51",
    "": 536039}, "event": "PairCreated", "logIndex": 260,
 "transactionIndex": 72,
 "transactionHash": "0x23590c20f63b6722f00ac4352b56055c7fd6ab9c683d748452639b3cd813c440",
 "address": "0xcA143Ce32Fe78f1f7019d7d551a6402fC5350c73",
 "blockHash": "0x21fbac052088a0f6c1cc6ca26e1d175fc1055b56d5227f3a9587b83a09f5eb78",
 "blockNumber": 12582019}

Token0: "0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c"
Token1: "0x999E6b6919F984cf93209900505C9Bd5ECDFc28C"

Best Answer

I think you can get Pool Addressess doing something like this. It's simple solution but you can optimize/improve.

from web3 import Web3

class getPairExample():
    def __init__(self, _token):
        
        #=== Setup Web3 ===#
        self.bsc = 'https://bsc-dataseed.binance.org/' #rpc 
        self.web3 = Web3(Web3.HTTPProvider(self.bsc)) #web3 connect
        self.target = self.web3.toChecksumAddress(_token) #checksum token


        #=== Setup getPair Contract ===#
        #= factory address
        self.pcs_factory_address = "0xcA143Ce32Fe78f1f7019d7d551a6402fC5350c73"

        #= getPair Abi
        self.getPairAbi = """[{"constant":true,"inputs":[{"internalType":"address","name":"","type":"address"},{"internalType":"address","name":"","type":"address"}],"name":"getPair","outputs":[{"internalType":"address","name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"}]"""

        #= creating contract
        self.getPair_Factory_Contract = self.web3.eth.contract(address=self.pcs_factory_address,abi=self.getPairAbi)

        #= function to find Pairs
        self.getPair(self.target)


    def getPair(self, _token):
        pairs = []
        baseTokens = ["0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c", #bnb
                      "0xe9e7CEA3DedcA5984780Bafc599bD69ADd087D56", #busd
                      "0x55d398326f99059fF775485246999027B3197955"] #usdt

        for i in baseTokens:
            try: 
                get = self.getPair_Factory_Contract.functions.getPair(_token, i).call()
                pairs.append(get)
            except:
                pairs.append("Not Find")

        print(pairs)
        self.showPair(pairs, _token)


    def showPair(self, _pairs, _token):
        print(f"\nPancakeswap Pairs for token:\n{_token}")
        print(f"\nBNB Pair: {_pairs[0]}")
        print(f"\nBUSD Pair: {_pairs[1]}")
        print(f"\nUSDT Pair: {_pairs[2]}")
            
                

if __name__ == "__main__":
    getPairExample("0xe56842ed550ff2794f010738554db45e60730371") #put any token address here

Output:

Output - getPairExample Code

Related Topic