Telegram 机器人的 TON Connect - Python
在本教程中,我们将创建一个示例 Telegram 机器人,该机器人支持使用 Python TON Connect SDK pytonconnect 的 TON Connect 2.0 认证。 我们将分析连接钱包、发送交易、获取有关已连接钱包的数据以及断开钱包的连接。
打开演示机器人
查看 GitHub
准备工作
安装库
要制作机器人,我们将使用 aiogram 3.0 Python 库。
要开始将 TON Connect 集成到您的 Telegram 机器人中,您需要安装 pytonconnect 包。
并且,为了使用 TON 原语并解析用户地址,我们需要 pytoniq-core。
您可以使用 pip 来完成此操作:
pip install aiogram pytoniq-core python-dotenv
pip install pytonconnect
设置配置
在 .env 文件中指定 机器人令牌 和 TON Connect 清单文件 的链接。之后在 config.py 中加载它们:
# .env
TOKEN='1111111111:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'  # your bot token here
MANIFEST_URL='https://raw.githubusercontent.com/XaBbl4/pytonconnect/main/pytonconnect-manifest.json'
# config.py
from os import environ as env
from dotenv import load_dotenv
load_dotenv()
TOKEN = env['TOKEN']
MANIFEST_URL = env['MANIFEST_URL']
创建简单机器人
创建 main.py 文件,其中将包含主要机器人代码:
# main.py
import sys
import logging
import asyncio
import config
from aiogram import Bot, Dispatcher, F
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command
from aiogram.types import Message, CallbackQuery
logger = logging.getLogger(__file__)
dp = Dispatcher()
bot = Bot(config.TOKEN, parse_mode=ParseMode.HTML)
    
@dp.message(CommandStart())
async def command_start_handler(message: Message):
    await message.answer(text='Hi!')
async def main() -> None:
    await bot.delete_webhook(drop_pending_updates=True)  # skip_updates = True
    await dp.start_polling(bot)
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    asyncio.run(main())
钱包连接
TON Connect 存储
让我们为 TON Connect 创建简单的存储
# tc_storage.py
from pytonconnect.storage import IStorage, DefaultStorage
storage = {}
class TcStorage(IStorage):
    def __init__(self, chat_id: int):
        self.chat_id = chat_id
    def _get_key(self, key: str):
        return str(self.chat_id) + key
    async def set_item(self, key: str, value: str):
        storage[self._get_key(key)] = value
    async def get_item(self, key: str, default_value: str = None):
        return storage.get(self._get_key(key), default_value)
    async def remove_item(self, key: str):
        storage.pop(self._get_key(key))
连接处理器
首先,我们需要一个为每个用户返回不同实例 的函数:
# connector.py
from pytonconnect import TonConnect
import config
from tc_storage import TcStorage
def get_connector(chat_id: int):
    return TonConnect(config.MANIFEST_URL, storage=TcStorage(chat_id))
其次,让我们在 command_start_handler() 中添加连接处理器:
# main.py
@dp.message(CommandStart())
async def command_start_handler(message: Message):
    chat_id = message.chat.id
    connector = get_connector(chat_id)
    connected = await connector.restore_connection()
    mk_b = InlineKeyboardBuilder()
    if connected:
        mk_b.button(text='Send Transaction', callback_data='send_tr')
        mk_b.button(text='Disconnect', callback_data='disconnect')
        await message.answer(text='You are already connected!', reply_markup=mk_b.as_markup())
    else:
        wallets_list = TonConnect.get_wallets()
        for wallet in wallets_list:
            mk_b.button(text=wallet['name'], callback_data=f'connect:{wallet["name"]}')
        mk_b.adjust(1, )
        await message.answer(text='Choose wallet to connect', reply_markup=mk_b.as_markup())
现在,对于尚未连接钱包的用户,机器人会发送带有所有可用钱包按钮的消息。
因此,我们需要编写函数来处理 connect:{wallet["name"]} 回调:
# main.py
async def connect_wallet(message: Message, wallet_name: str):
    connector = get_connector(message.chat.id)
    wallets_list = connector.get_wallets()
    wallet = None
    for w in wallets_list:
        if w['name'] == wallet_name:
            wallet = w
    if wallet is None:
        raise Exception(f'Unknown wallet: {wallet_name}')
    generated_url = await connector.connect(wallet)
    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Connect', url=generated_url)
    await message.answer(text='Connect wallet within 3 minutes', reply_markup=mk_b.as_markup())
    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Start', callback_data='start')
    for i in range(1, 180):
        await asyncio.sleep(1)
        if connector.connected:
            if connector.account.address:
                wallet_address = connector.account.address
                wallet_address = Address(wallet_address).to_str(is_bounceable=False)
                await message.answer(f'You are connected with address <code>{wallet_address}</code>', reply_markup=mk_b.as_markup())
                logger.info(f'Connected with address: {wallet_address}')
            return
    await message.answer(f'Timeout error!', reply_markup=mk_b.as_markup())
@dp.callback_query(lambda call: True)
async def main_callback_handler(call: CallbackQuery):
    await call.answer()
    message = call.message
    data = call.data
    if data == "start":
        await command_start_handler(message)
    elif data == "send_tr":
        await send_transaction(message)
    elif data == 'disconnect':
        await disconnect_wallet(message)
    else:
        data = data.split(':')
        if data[0] == 'connect':
            await connect_wallet(message, data[1])
机器人给用户 3 分钟时间连接钱包,之后会报告超时错误。
实现交易请求
让我们以 消息构建器 文章之一为例:
# messages.py
from base64 import urlsafe_b64encode
from pytoniq_core import begin_cell
def get_comment_message(destination_address: str, amount: int, comment: str) -> dict:
    data = {
        'address': destination_address,
        'amount': str(amount),
        'payload': urlsafe_b64encode(
            begin_cell()
            .store_uint(0, 32)  # op code for comment message
            .store_string(comment)  # store comment
            .end_cell()  # end cell
            .to_boc()  # convert it to boc
        )
        .decode()  # encode it to urlsafe base64
    }
    return data
并在 main.py 文件中添加 send_transaction() 函数:
# main.py
@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    connector = get_connector(message.chat.id)
    connected = await connector.restore_connection()
    if not connected:
        await message.answer('Connect wallet first!')
        return
    
    transaction = {
        'valid_until': int(time.time() + 3600),
        'messages': [
            get_comment_message(
                destination_address='0:0000000000000000000000000000000000000000000000000000000000000000',
                amount=int(0.01 * 10 ** 9),
                comment='hello world!'
            )
        ]
    }
    await message.answer(text='Approve transaction in your wallet app!')
    await connector.send_transaction(
        transaction=transaction
    )
但我们也应该处理可能的错误,所以我们将 send_transaction 方法放入 try - except 语句中:
@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    ...
    await message.answer(text='Approve transaction in your wallet app!')
    try:
        await asyncio.wait_for(connector.send_transaction(
            transaction=transaction
        ), 300)
    except asyncio.TimeoutError:
        await message.answer(text='Timeout error!')
    except pytonconnect.exceptions.UserRejectsError:
        await message.answer(text='You rejected the transaction!')
    except Exception as e:
        await message.answer(text=f'Unknown error: {e}')
添加断开连接处理器
这个函数的实现非常简单:
async def disconnect_wallet(message: Message):
    connector = get_connector(message.chat.id)
    await connector.restore_connection()
    await connector.disconnect()
    await message.answer('You have been successfully disconnected!')
目前,项目结构如下:
.
.env
├── config.py
├── connector.py
├── main.py
├── messages.py
└── tc_storage.py
main.py 的内容如下:
展示 main.py
# main.py
import sys
import logging
import asyncio
import time
import pytonconnect.exceptions
from pytoniq_core import Address
from pytonconnect import TonConnect
import config
from messages import get_comment_message
from connector import get_connector
from aiogram import Bot, Dispatcher, F
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command
from aiogram.types import Message, CallbackQuery
from aiogram.utils.keyboard import InlineKeyboardBuilder
logger = logging.getLogger(__file__)
dp = Dispatcher()
bot = Bot(config.TOKEN, parse_mode=ParseMode.HTML)
@dp.message(CommandStart())
async def command_start_handler(message: Message):
    chat_id = message.chat.id
    connector = get_connector(chat_id)
    connected = await connector.restore_connection()
    mk_b = InlineKeyboardBuilder()
    if connected:
        mk_b.button(text='Send Transaction', callback_data='send_tr')
        mk_b.button(text='Disconnect', callback_data='disconnect')
        await message.answer(text='You are already connected!', reply_markup=mk_b.as_markup())
    else:
        wallets_list = TonConnect.get_wallets()
        for wallet in wallets_list:
            mk_b.button(text=wallet['name'], callback_data=f'connect:{wallet["name"]}')
        mk_b.adjust(1, )
        await message.answer(text='Choose wallet to connect', reply_markup=mk_b.as_markup())
@dp.message(Command('transaction'))
async def send_transaction(message: Message):
    connector = get_connector(message.chat.id)
    connected = await connector.restore_connection()
    if not connected:
        await message.answer('Connect wallet first!')
        return
    transaction = {
        'valid_until': int(time.time() + 3600),
        'messages': [
            get_comment_message(
                destination_address='0:0000000000000000000000000000000000000000000000000000000000000000',
                amount=int(0.01 * 10 ** 9),
                comment='hello world!'
            )
        ]
    }
    await message.answer(text='Approve transaction in your wallet app!')
    try:
        await asyncio.wait_for(connector.send_transaction(
            transaction=transaction
        ), 300)
    except asyncio.TimeoutError:
        await message.answer(text='Timeout error!')
    except pytonconnect.exceptions.UserRejectsError:
        await message.answer(text='You rejected the transaction!')
    except Exception as e:
        await message.answer(text=f'Unknown error: {e}')
async def connect_wallet(message: Message, wallet_name: str):
    connector = get_connector(message.chat.id)
    wallets_list = connector.get_wallets()
    wallet = None
    for w in wallets_list:
        if w['name'] == wallet_name:
            wallet = w
    if wallet is None:
        raise Exception(f'Unknown wallet: {wallet_name}')
    generated_url = await connector.connect(wallet)
    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Connect', url=generated_url)
    await message.answer(text='Connect wallet within 3 minutes', reply_markup=mk_b.as_markup())
    mk_b = InlineKeyboardBuilder()
    mk_b.button(text='Start', callback_data='start')
    for i in range(1, 180):
        await asyncio.sleep(1)
        if connector.connected:
            if connector.account.address:
                wallet_address = connector.account.address
                wallet_address = Address(wallet_address).to_str(is_bounceable=False)
                await message.answer(f'You are connected with address <code>{wallet_address}</code>', reply_markup=mk_b.as_markup())
                logger.info(f'Connected with address: {wallet_address}')
            return
    await message.answer(f'Timeout error!', reply_markup=mk_b.as_markup())
async def disconnect_wallet(message: Message):
    connector = get_connector(message.chat.id)
    await connector.restore_connection()
    await connector.disconnect()
    await message.answer('You have been successfully disconnected!')
@dp.callback_query(lambda call: True)
async def main_callback_handler(call: CallbackQuery):
    await call.answer()
    message = call.message
    data = call.data
    if data == "start":
        await command_start_handler(message)
    elif data == "send_tr":
        await send_transaction(message)
    elif data == 'disconnect':
        await disconnect_wallet(message)
    else:
        data = data.split(':')
        if data[0] == 'connect':
            await connect_wallet(message, data[1])
async def main() -> None:
    await bot.delete_webhook(drop_pending_updates=True)  # skip_updates = True
    await dp.start_polling(bot)
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    asyncio.run(main())
改进
添加持久存储 - Redis
目前,我们的 TON Connect 存储使用字典,导致机器人重启后会丢失会话。 让我们使用 Redis 添加永久数据库存储:
在启动 Redis 数据库后安装用于与之交互的 python 库:
pip install redis
并在 tc_storage.py 中更新 TcStorage 类:
import redis.asyncio as redis
client = redis.Redis(host='localhost', port=6379)
class TcStorage(IStorage):
    def __init__(self, chat_id: int):
        self.chat_id = chat_id
    def _get_key(self, key: str):
        return str(self.chat_id) + key
    async def set_item(self, key: str, value: str):
        await client.set(name=self._get_key(key), value=value)
    async def get_item(self, key: str, default_value: str = None):
        value = await client.get(name=self._get_key(key))
        return value.decode() if value else default_value
    async def remove_item(self, key: str):
        await client.delete(self._get_key(key))
添加二维码
安装 python qrcode 包以生成它们:
pip install qrcode
更改 connect_wallet() 函数,使其生成二维码并以图片形式发送给用户:
from io import BytesIO
import qrcode
from aiogram.types import BufferedInputFile
async def connect_wallet(message: Message, wallet_name: str):
    ...
    
    img = qrcode.make(generated_url)
    stream = BytesIO()
    img.save(stream)
    file = BufferedInputFile(file=stream.getvalue(), filename='qrcode')
    await message.answer_photo(photo=file, caption='Connect wallet within 3 minutes', reply_markup=mk_b.as_markup())
    
    ...
总结
接下来可以做什么?
- 您可以在机器人中添加更好的错误处理。
- 您可以添加启动文本和类似 /connect_wallet的命令。