klafytg/klafytg/bot.py
2020-04-19 17:59:12 +02:00

210 lines
7.3 KiB
Python

import re
import random
import importlib
import datetime
import logging
import os
import toml
import telegram
from telegram.ext import MessageHandler, Filters
from telegram.ext import Updater, Handler
from telegram.ext import CommandHandler
from telegram import InlineQueryResultArticle, InputTextMessageContent
from telegram.ext import InlineQueryHandler
class Bot:
    """Telegram bot driven by a TOML configuration file.

    The configuration is read as follows:

    * ``[bot]``: ``name`` (bot name, available as ``{name}`` in templates),
      ``token`` (name of the *environment variable* holding the API token),
      ``min_time`` (minimum number of seconds between two regex reactions in
      the same chat, default 0), ``channels`` (chat ids the bot may talk in;
      empty or missing means every chat), ``inline`` (name of the command
      whose quotes feed inline queries, default ``"ping"``) and
      ``inline_python`` (dotted path of a callable replacing ``on_inline``).
    * ``[match]``: maps a regular expression to the reaction sent when a
      text message matches it.
    * ``[cmd.<name>]``: ``quotes`` (template strings) and ``python`` (dotted
      paths of callables) to pick from when ``/<name>`` is received.

    Templates may use ``{channel}``, ``{name}``, ``{user}`` and ``{message}``.
    """

    # The Telegram Bot API accepts at most 50 results per inline query answer.
    MAX_INLINE_RESULTS = 50

    def __init__(self, filename):
        """Initialize a bot object

        Args:
            filename: config file for the bot
        """
        logging.info("Loading configuration from %s.", filename)
        self.config = toml.load(filename)
        bot_config = self.config["bot"]
        self.name = bot_config['name']
        self.min_time = bot_config.get("min_time", 0)
        # The config stores the name of the environment variable, not the
        # token itself; resolve it once and reuse it below.
        token = os.environ[bot_config['token']]
        self.bot = telegram.Bot(token)
        # Maps chat id -> time of the last regex reaction sent in that chat.
        # Its keys also act as the whitelist checked by is_allowed().
        self.channels = {}
        self.reactions = {}
        for match, reaction in self.config.get("match", {}).items():
            self.add_reaction(match, reaction)
        allowed_channels = bot_config.get("channels", [])
        for channel in allowed_channels:
            # Far-in-the-past timestamp so the first message is never throttled.
            self.channels[int(channel)] = datetime.datetime(1, 1, 1)
        self.all_channel_allowed = not allowed_channels
        self.inline = bot_config.get("inline", "ping")
        self.commands = {}
        self.updater = Updater(token=token, use_context=True)
        self.dispatcher = self.updater.dispatcher
        if "inline_python" in bot_config:
            # A user supplied callable takes over inline query handling.
            self.on_inline = self.fetch_callback(bot_config["inline_python"])
        for command in self.config.get("cmd", {}):
            self.create_cmd(command)
            handler = CommandHandler(command, getattr(self, "on_" + command))
            self.dispatcher.add_handler(handler)
        self.match_handler = MessageHandler(Filters.text, self.on_message)
        self.inline_quote_handler = InlineQueryHandler(self.on_inline)
        self.dispatcher.add_handler(self.inline_quote_handler)
        self.dispatcher.add_handler(self.match_handler)

    def _format_context(self, user, chat, message):
        """Build the values available to quote and reaction templates.

        Any of ``user``, ``chat`` and ``message`` may be ``None`` (inline
        queries carry no chat or message); the missing values become ``""``.
        """
        return {
            "channel": getattr(chat, "title", ""),
            "name": self.name,
            "user": getattr(user, "name", ""),
            "message": getattr(message, "text", ""),
        }

    def create_cmd(self, name):
        """Register the answers of command ``name`` and create its handler.

        The quotes and Python callbacks configured under ``[cmd.<name>]`` are
        stored in ``self.commands[name]`` and a handler function is attached
        to the instance as ``on_<name>``.

        Args:
            name: The command name, as found in the ``cmd`` config table.
        """
        cmd_config = self.config["cmd"][name]
        answers = list(cmd_config.get("quotes", []))
        answers.extend(
            self.fetch_callback(path) for path in cmd_config.get("python", [])
        )
        self.commands[name] = answers

        def fun(update, context):
            chat_id = update.effective_chat.id
            message_id = update.effective_message.message_id
            if not self.is_allowed(update, context):
                context.bot.send_message(
                    chat_id=chat_id,
                    text="Sorry, mamma told me not to talk with random strangers. You need to allow chat id {}.".format(chat_id),
                    reply_to_message_id=message_id
                )
                return
            choices = self.commands[name]
            if not choices:
                # random.choice() raises IndexError on an empty sequence.
                return
            quote = random.choice(choices)
            if callable(quote):
                quote = quote(self, update, context)
            else:
                quote = quote.format(**self._format_context(
                    update.effective_user,
                    update.effective_chat,
                    update.effective_message,
                ))
            # Callbacks may return nothing to stay silent.
            if quote:
                context.bot.send_message(
                    chat_id=chat_id,
                    text=quote,
                    reply_to_message_id=message_id,
                )

        setattr(self, 'on_' + name, fun)

    def is_allowed(self, update, context):
        """Tell whether the bot may answer this update.

        Inline queries are always allowed; other updates are allowed when no
        channel whitelist is configured or when the chat id is whitelisted.
        """
        if self.all_channel_allowed:
            return True
        if update.inline_query:
            return True
        return update.effective_chat.id in self.channels

    def start(self):
        """Start polling Telegram for updates."""
        self.updater.start_polling()

    def stop(self):
        """Stop the updater."""
        self.updater.stop()

    def on_join(self):
        """Placeholder hook; currently does nothing."""
        pass

    def on_inline(self, update, context):
        """Answer an inline query with the matching quotes.

        The quotes of the command named by ``self.inline`` are rendered and
        those matching the query (used as a regular expression) are returned
        to the user. Empty queries are left unanswered.
        """
        if not self.is_allowed(update, context):
            return
        query = update.inline_query.query
        if not query:
            return
        try:
            pattern = re.compile(query)
        except re.error:
            # The query is typed by the user; an invalid regular expression
            # falls back to a literal search instead of crashing the handler.
            pattern = re.compile(re.escape(query))
        template_values = self._format_context(
            update.effective_user,
            update.effective_chat,
            update.effective_message,
        )
        results = []
        for i, quote in enumerate(self.commands.get(self.inline, [])):
            if callable(quote):
                # Same signature as the one used by the command handlers,
                # since both draw from the same list of callables.
                actual_quote = quote(self, update, context)
            else:
                actual_quote = quote.format(**template_values)
            if not actual_quote or not pattern.search(actual_quote):
                continue
            results.append(
                InlineQueryResultArticle(
                    # The Bot API documents the result id as a string.
                    id=str(i),
                    title=actual_quote,
                    input_message_content=InputTextMessageContent(actual_quote)
                )
            )
            if len(results) >= self.MAX_INLINE_RESULTS:
                break
        context.bot.answer_inline_query(update.inline_query.id, results)

    def on_message(self, update, context):
        """React to a text message; only the first matched reaction is sent."""
        if not self.is_allowed(update, context):
            return
        user = update.effective_user
        channel = update.effective_chat
        message = update.effective_message
        answer = self.get_reaction(user, channel, message)
        if answer:
            context.bot.send_message(
                chat_id=channel.id,
                text=answer[0],
                reply_to_message_id=message.message_id,
            )

    def add_reaction(self, match, reaction):
        """Add a reaction to the bot.

        Args:
            match: The string which, if matched will trigger the answer.
            reaction: The string which will be sent, or a callable taking
                ``(bot, user, channel, message)`` and returning that string.
        """
        self.reactions[match] = reaction

    def add_python_reaction(self, match, reaction):
        """Add a Python callback to the reactions.

        Args:
            match: The string which, if matched will trigger the answer.
            reaction: The dotted path to the callback.
        """
        self.add_reaction(match, self.fetch_callback(reaction))

    def fetch_callback(self, path):
        """Fetch a Python callable from its dotted path.

        Args:
            path: A path such as ``"package.module.function"``.

        Returns:
            The attribute named by the last component of ``path``.

        Raises:
            ValueError: If ``path`` has no module part.
            ImportError: If the module cannot be imported.
            AttributeError: If the module has no such attribute.
        """
        module_name, _, attribute = path.rpartition(".")
        if not module_name:
            raise ValueError(
                "Expected a dotted path like 'module.callable', got {!r}".format(path)
            )
        module = importlib.import_module(module_name)
        return getattr(module, attribute)

    def get_reaction(self, user, channel, message):
        """Get a reaction to a message.

        Args:
            user: The user who sent the message.
            channel: The channel on which the bot speak.
            message: The message to which the bot has to react.

        Returns:
            Every matched reactions, or an empty list when nothing matched or
            when the channel reacted less than ``min_time`` seconds ago.
        """
        last_reaction = self.channels.get(channel.id)
        if last_reaction is not None:
            elapsed = datetime.datetime.now() - last_reaction
            if elapsed.total_seconds() < self.min_time:
                return []
        text = message.text
        if text is None:
            # Nothing to match against (e.g. a message without text).
            return []
        context = self._format_context(user, channel, message)
        # The values are substituted into regular expressions; they come from
        # users, so they must be matched literally rather than interpreted.
        pattern_context = {
            key: re.escape(str(value)) for key, value in context.items()
        }
        result = []
        logging.debug("Looking for reactions.")
        for match, reaction in self.reactions.items():
            if not re.search(match.format(**pattern_context), text):
                continue
            if callable(reaction):
                # NOTE(review): callbacks receive the user object, like the
                # channel and message objects - confirm against the callbacks.
                answer = reaction(self, user, channel, message)
            else:
                answer = reaction.format(**context)
            # A callback may return nothing to stay silent.
            if answer:
                result.append(answer)
        if result:
            self.channels[channel.id] = datetime.datetime.now()
        return result