- pip install web3.py
2、注册 Infura 获得节点服务
使用邮箱注册 Infura 账户后,创建一个项目,即可获得以太坊节点服务,进入设置即可看到链接的URL
6 _0 Z- k& V3 c6 K; x! n* Q
可以选择主网测试网等,会有两个链接,一个是使用HTTPS的一个是使用WebSocket,按你的需求选择一个就行了,注意:Infura 个人免费请求次数,是每天有十万个请求。
3、代码示例
- ( K2 G, T" x+ X- T" J$ C- V, i+ M* J
- from web3 import Web3) Z' y7 p# S" f3 j- ^" q, R& N6 v
- import json
- import time
- import os
- import logging* V3 L$ T# v% G
- from django.conf import settings
- from decimal import Decimal4 s0 O0 G$ i$ V( Y5 r, ]5 u
- 0 q. s, ~# P" O+ A! k* g
- class PayEthOrToken(object):2 }' i" F1 Z! }
-
- def __init__(self):
- # 设置web3
- self.web3 = Web3(Web3.HTTPProvider('your infura http url'))
- # token合约地址
- self.contract_address = 'your contract address'8 I5 k Q! _$ ?) H, v/ q! E6 H
- # 主钱包地址
- self.wallet = 'your wallet address'
- # 钱包的私钥
- self.wallet_key = 'your wallet key'$ E% h' H4 ?6 I$ n6 O
- # 合约的abi test.json 是eth的abi json文件,可以在eth区块链浏览器上获得3 f% h8 l) |0 X# Z {1 g) d! n8 p) } @
- with open('test.json', 'r') as f:' z: X+ C+ |+ _2 x( ^7 g( D3 c
- self.abi = json.loads(f.read())' [1 U' e/ b! g1 t" ^! N
- # 生成合约8 b4 ~9 {- F8 t6 l
- self.contract = self.web3.eth.contract(address=self.contract_address, abi=self.abi)
- # 代币简写
- self.token_name = 'USDT'
-
- def transfer_usdt(self, to, value):
- '''进行代币转账
- args:5 l- D, u1 R/ M) |1 F7 q
- to str:接收代币的地址2 L0 x# y. P O! B5 R2 j* {: j1 Z
- value str/int:代币数量,以ether为单位,可以是字符串和int类型
- returns:, H% g; C. K# k& C
- (str, str):返回交易哈希,以及异常信息
- '''0 g. @# V. P3 O: A" F4 D. K
- try:! ?" p( N2 ^5 ]; k2 x3 S1 t, o, Q
- token_balance = self.web3.fromWei(self.contract.functions.balanceOf(self.wallet).call(), 'ether')
- # 如果代币不足返回异常4 j9 g+ S% F# b2 Z) k
- if Decimal(token_balance) < Decimal(value):. r1 D G' E+ }* k4 d2 j
- return None, 'Platform USDT token is insufficient, please try again later'" N; q5 O* o2 y8 v" N7 A
- # 进行转账代币0 u8 ?& s, ]7 b0 c. b9 x1 M# I
- nonce = self.web3.eth.get_transaction_count(self.wallet)1 M, e" D' Q3 t2 b
- tx = {
- 'from': self.wallet,) }! a8 l* a2 b( p9 ]8 g
- 'nonce': nonce,8 y0 W4 p# q; Q, G; @
- 'gas': 100000,
- 'gasPrice': self.web3.toWei('50', 'gwei'),0 b, W( v7 c5 g* I( E H
- 'chainId': 1
- }
- to = Web3.toChecksumAddress(to)* I" J, P: q* z7 L% c
- txn = self.contract.functions.transfer(to, self.web3.toWei(value, 'ether')).buildTransaction(tx)+ {* L+ S# d8 \0 o" d) p' b
- signed_txn = self.web3.eth.account.sign_transaction(txn, private_key=self.wallet_key)
- tx_hash = self.web3.eth.send_raw_transaction(signed_txn.rawTransaction)% b( t3 c& t* }# j$ s3 h' q: X
- return self.web3.toHex(tx_hash), 'pay success'
- except Exception as e:
- logging.error(f'转账{self.token_name}代币时发生异常:{e}')1 G) b* F. Z l8 m6 U
- logging.exception(e)
- return None, str(e)
- 2 G9 U- ?; u8 V K% \* N) F1 y
- def transfer_eth(self, to, value):; k( z6 [3 h4 l$ H8 y1 \, r$ Q; G- G
- '''进行eth转账- u& l* s6 c1 y5 @% n; G
- args:% L8 e/ {4 n; b8 L. v) O
- to str:接收以太坊的地址: V" I$ L2 Z7 h$ j! L
- value str/int:数量,以ether为单位,可以是字符串和int类型7 L' }1 c( k0 i) i
- returns:5 c, q p$ j% y4 O. b) c; v
- str:返回交易哈希3 X( M* ]3 X* t
- '''( Q S* z& M5 [: R
- try:
- token_balance = self.web3.fromWei(self.web3.eth.get_balance(self.wallet), 'ether')
- # 如果代币不足返回异常0 v( U# X) [) [
- if Decimal(token_balance) < Decimal(value):
- return None, 'Platform ETH token is insufficient, please try again later'
- # 获取 nonce,这个是交易计数; L9 }7 y' X* t0 c1 A# ~: s
- to = Web3.toChecksumAddress(to): s+ e# v7 h6 q4 ~+ w. k. w
- nonce = self.web3.eth.get_transaction_count(self.wallet)+ g- o7 \& [& b' D0 H* A! ^: a
- tx = {
- 'nonce': nonce,5 O7 a( z, l4 U! K
- 'to': to,! k/ ]- D3 o5 G- q# J1 @4 V! [' ~
- 'gas': 100000,
- 'gasPrice': self.web3.toWei('50', 'gwei'),3 o! L6 W* U3 q7 j
- 'value': self.web3.toWei(value, 'ether'),
- 'chainId': 1
- }. U4 t4 h' @3 i k9 b2 z. g! I
- # 签名交易
- signed_tx = self.web3.eth.account.sign_transaction(tx, self.wallet_key)
- tx_hash = self.web3.eth.send_raw_transaction(signed_tx.rawTransaction)
- return self.web3.toHex(tx_hash), 'pay success'
- except Exception as e:$ J; v4 L1 \' i) R
- logging.error(f'转账eth时发生异常:{e}')" G$ k% }% A$ U5 M p
- logging.exception(e)% s" F% P; c4 q
- return None, str(e)* [# u4 C9 h8 D( K% w+ Y7 U k