まず、ユーザーがチャンネルに登録しているかどうかをチェックする Telegram Bot を開発する必要があります。pyTelegramBotAPI==3.6.6
ボットを作成し、Telethon==1.9.0
ユーザーが購読していることを確認するために使用します。
@bot.message_handler
関数を使用してクラスのグローバル インスタンスを呼び出す場所がありますtelethon.sync
。次のようになります。
from telebot import TeleBot
from telethon.sync import TelegramClient
import config # my module with constants
class TeleHelper:
def __init__(self, api_id, api_hash, phone, channel, session_name='session'):
self._client = TelegramClient(session_name, api_id, api_hash)
self._client.connect()
self._setup(phone)
self._channel = channel
def _setup(self, phone): # just setup
if not self._client.is_user_authorized():
self._client.send_code_request(phone)
self._client.sign_in(phone, input('Enter the code: '))
@staticmethod
def get_target(user): # get username or full name
if user.username:
return user.username
else:
return user.first_name + (f' {user.last_name}' if user.last_name else '')
def check_subscription(self, user): # search user in channel members, there is a problem
target = self.get_target(user)
participants = self._client.iter_participants(self._channel, search=target)
ids = [member.id for member in participants]
return user.id in ids
bot = TeleBot(config.bot_token) # bot instance
tg = TeleHelper(config.api_id, config.api_hash, config.phone, config.channel) # instance of the class above
@bot.message_handler(commands=['command'])
def handle_join(message):
if tg.check_subscription(message.from_user): # here problems start
text = 'All is good!'
bot.send_message(message.chat.id, text)
else:
text = 'You have to subscribe @python_lounge'
bot.send_message(message.chat.id, text)
if __name__ == '__main__':
bot.polling()
TelegramClient
すべてが正常に見えるように、テレソンからではなくからインポートしtelethon.sync
ましたが、誤ってエラーが発生しました。
2019-08-24 10:31:07,342 (main.py:65 WorkerThread1) ERROR - TeleBot: "RuntimeError occurred, args=('You must use "async for" if the event loop is running (i.e. you are inside an "async def")',)
Traceback (most recent call last):
File "/root/ContestBot/.venv/lib/python3.7/site-packages/telebot/util.py", line 59, in run
task(*args, **kwargs)
File "main.py", line 99, in handle_join
if tg.check_subscription(message.from_user):
File "/root/ContestBot/main.py", line 25, in check_subscription
ids = [member.id for member in participants]
File "/root/ContestBot/.venv/lib/python3.7/site-packages/telethon/requestiter.py", line 102, in __iter__
'You must use "async for" if the event loop '
RuntimeError: You must use "async for" if the event loop is running (i.e. you are inside an "async def")
"
「async for」を作ろうとしましたが、非同期プログラミングの初心者です。
async def check_subscription(self, user):
ids = []
async for member in self._client.iter_participants(self._channel, search=self.get_target(user)):
await ids.append(member.id)
return user.id in ids
明らかに私は欲しくなりましたが、プログラムはまだ動作しません:
<coroutine object TeleHelper.check_subscription at 0x7ff9bc57f3c8>
/root/ContestBot/.venv/lib/python3.7/site-packages/telebot/util.py:59: RuntimeWarning: coroutine 'TeleHelper.check_subscription' was never awaited
task(*args, **kwargs)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
私はPython 3.7.3を使用しています