Source code for ManifoldMarketManager.application

"""Contains functions which are needed to run the runner script, but nowhere else."""

from __future__ import annotations

from argparse import ArgumentParser, Namespace
from asyncio import get_event_loop, new_event_loop, set_event_loop
from dataclasses import dataclass
from datetime import datetime, timedelta
from itertools import count
from logging import getLogger
from os import getenv
from pathlib import Path
from sqlite3 import PARSE_COLNAMES, PARSE_DECLTYPES, connect
from time import sleep
from traceback import format_exc
from typing import TYPE_CHECKING, Tuple, cast

from telegram import __version__ as TG_VER

# Older python-telegram-bot releases do not export ``__version_info__``; fall back to a
# tuple that compares lower than any supported version so the check below rejects them.
try:
    from telegram import __version_info__
except ImportError:
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

# Refuse to import this module on python-telegram-bot versions older than 20.0.0 alpha 1.
if __version_info__ < (20, 0, 0, "alpha", 1):
    raise RuntimeError(
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )

from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CallbackQueryHandler

from . import market, require_env
from .consts import AVAILABLE_SCANNERS, EnvironmentVariable, MarketStatus, Response

if TYPE_CHECKING:  # pragma: no cover
    from sqlite3 import Connection
    from typing import Any

    from telegram import Update
    from telegram.ext import ContextTypes

    from . import Market

logger = getLogger(__name__)


def parse_args(*args: Any, **kwargs: Any) -> Namespace:
    """Parse arguments for the CLI.

    Builds the top-level parser plus one sub-parser per sub-command (``import``, ``quick-import``,
    ``create``, ``quick-create``, ``scan``, ``run``, ``loop``, ``edit``, ``remove``, ``list``). Each
    sub-parser stores the function implementing it in the ``func`` attribute of the result.

    Parameters
    ----------
    *args, **kwargs
        Forwarded unchanged to ``ArgumentParser.parse_args``.

    Returns
    -------
    Namespace
        The parsed arguments. When the chosen sub-command defines ``all_scanners`` and it is truthy,
        ``scanners`` is replaced by ``AVAILABLE_SCANNERS``.
    """
    rule_help = "Should be a qualified rule name, followed by a JSON string of its initializers"

    main_parser = ArgumentParser()
    main_parser.add_argument('--no-logging', action='store_false', dest='logging', default=True)
    main_parser.add_argument('-v', '--verbose', action='count', default=0)
    main_parser.add_argument('--just-parse', action='store_true', default=False)
    subparsers = main_parser.add_subparsers()

    import_parser = subparsers.add_parser('import')
    import_parser.add_argument('account', action='store', type=str)
    import_parser.add_argument('file', action='store', type=str, nargs='?')
    import_parser.add_argument('--interactive', action='store_true')
    group = import_parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--yaml', action='store_true')
    group.add_argument('--json', action='store_true')
    group.add_argument('--repl', action='store_true')
    import_parser.set_defaults(func=import_command)

    # TODO: add templates here
    quick_import_parser = subparsers.add_parser('quick-import')
    quick_import_parser.add_argument('account', action='store', type=str)
    quick_import_parser.add_argument('--resolve-when', nargs=2, action='append', help=rule_help)
    quick_import_parser.add_argument('--resolve-to', nargs=2, action='append', required=True, help=rule_help)
    quick_import_parser.add_argument('-n', '--notes', type=str, action='store', default='')
    group = quick_import_parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-u', '--url', action='store', type=str)
    group.add_argument('-s', '--slug', action='store', type=str)
    group.add_argument('-i', '--id', dest='id_', action='store', type=str)
    quick_import_parser.add_argument('-c', '--check-rate', action='store', dest='rate', help='Check rate in hours')
    quick_import_parser.add_argument('-rnd', '--round', dest='round_', action='store_true')
    quick_import_parser.add_argument('-cur', '--current', action='store_true')
    quick_import_parser.add_argument(
        '-rd', '--rel-date', action='store', dest='rel_date',
        help='Please give as "year/month/day" or "year-month-day". Used in: poll, git PR'
    )
    quick_import_parser.add_argument(
        '-pr', '--pull-request', action='store', dest='pr_slug', help='Please give as "owner/repo/num"'
    )
    quick_import_parser.add_argument('-pb', '--pull-binary', action='store_true', dest='pr_bin')
    quick_import_parser.add_argument('-rs', '--random-seed', action='store')
    quick_import_parser.add_argument('-rr', '--random-rounds', action='store', type=int, default=1)
    quick_import_parser.add_argument('-ri', '--random-index', action='store_true')
    quick_import_parser.add_argument('-is', '--index-size', action='store', type=int)
    # These options mirror quick_import_command's parameters, so dispatch there
    # (this previously pointed at the unimplemented quick_create_command stub).
    quick_import_parser.set_defaults(func=quick_import_command)

    # must finish import_parser first
    create_parser = subparsers.add_parser('create', parents=[import_parser], add_help=False)
    create_parser.add_argument('--queue-if-no-funds', action='store_true')
    create_parser.add_argument('--queue', action='store_true')
    create_parser.set_defaults(func=create_command)

    quick_create_parser = subparsers.add_parser('quick-create')
    quick_create_parser.add_argument(
        'type', type=str, choices=["BINARY", "PSEUDO_NUMERIC", "FREE_RESPONSE", "MULTIPLE_CHOICE"]
    )
    quick_create_parser.add_argument('account', action='store', type=str)
    quick_create_parser.add_argument('close-on', action='store', type=str)
    quick_create_parser.add_argument('--resolve-when', nargs=2, action='append', help=rule_help)
    quick_create_parser.add_argument('--resolve-to', nargs=2, action='append', required=True, help=rule_help)
    quick_create_parser.add_argument('-n', '--notes', type=str, action='store', default='')
    quick_create_parser.set_defaults(func=quick_create_command)

    scan_parser = subparsers.add_parser('scan')
    scan_parser.add_argument('--disable-all', action='store_false', dest='all_scanners', default=True)
    for scanner in AVAILABLE_SCANNERS:
        scan_parser.add_argument(
            f'--enable-{scanner.replace(".", "-")}', dest='scanners', action='append_const', const=scanner
        )
    scan_parser.set_defaults(func=scan_command)

    run_parser = subparsers.add_parser('run')
    run_parser.add_argument('--enable-all-scanners', action='store_true', dest='all_scanners', default=False)
    for scanner in AVAILABLE_SCANNERS:
        run_parser.add_argument(
            f'--enable-{scanner.replace(".", "-")}', dest='scanners', action='append_const', const=scanner
        )
    run_parser.add_argument(
        '-r', '--refresh', action='store_true', help="Ignore time last checked and look at all markets immediately"
    )
    run_parser.add_argument('-c', '--console-only', action='store_true')
    run_parser.set_defaults(func=run_command)

    # ``loop`` inherits every ``run`` option, then adds its own scheduling knobs
    loop_parser = subparsers.add_parser('loop', parents=[run_parser], add_help=False)
    loop_parser.add_argument(
        '-p', '--period', action='store', type=float, help='how long to wait between loops, in minutes'
    )
    loop_parser.add_argument(
        '-t', '--times', action='store', type=float, default=float('inf'),
        help='how many times to loop (default infinity)'
    )
    loop_parser.set_defaults(func=loop_command)

    edit_parser = subparsers.add_parser('edit')
    edit_parser.add_argument('ids', nargs='+', type=int)
    edit_parser.set_defaults(func=edit_command)

    remove_parser = subparsers.add_parser('remove')
    remove_parser.add_argument('ids', nargs='+', type=int)
    remove_parser.add_argument('--assume-yes', '-y', action='store_true')
    remove_parser.set_defaults(func=remove_command)

    list_parser = subparsers.add_parser('list')
    list_parser.add_argument('--stats', action='store_true')
    list_parser.add_argument('--sig-figs', action='store', type=int, default=4)
    list_parser.set_defaults(func=list_command)

    parsed: Namespace = main_parser.parse_args(*args, **kwargs)
    if hasattr(parsed, 'all_scanners') and parsed.all_scanners:
        parsed.scanners = AVAILABLE_SCANNERS
    return parsed
[docs]def _print_uncaught_args(kwargs: dict[str, Any]) -> None: if getenv("DEBUG") and kwargs: print("Unrecognized arguments:") print("\n".join(f'{key}: {value}' for key, value in kwargs.items()))
def import_command(**kwargs: Any) -> int:
    """Import markets from a file without creating any.

    Currently a placeholder: it reports the arguments it received (in debug mode) and
    returns ``-1``.
    """
    _print_uncaught_args(kwargs)
    exit_code = -1
    return exit_code
def quick_import_command(
    url: str | None = None,
    slug: str | None = None,
    id_: str | None = None,
    rel_date: str | None = None,
    random_index: bool = False,
    random_seed: bool = False,
    random_rounds: int = 1,
    round_: bool = False,
    current: bool = False,
    index_size: int | None = None,
    pr_slug: str | None = None,
    pr_bin: bool = False,
    **kwargs: Any
) -> int:
    """Import a single market using the old-style arguments.

    Parameters
    ----------
    url, slug, id_
        How to locate the market; the first truthy one of ``url``, ``slug`` is used, else ``id_``.
    rel_date
        A date given as ``year/month/day`` or ``year-month-day``.
    random_index, random_seed, random_rounds, index_size
        When ``random_index`` is set, a ``ResolveRandomIndex`` rule is added using the other three.
        NOTE(review): ``random_seed`` is annotated ``bool`` but the CLI stores a string -- confirm.
    round_, current
        Add a ``RoundValueRule`` / ``CurrentValueRule`` to the resolve-to rules.
    pr_slug, pr_bin
        ``pr_slug`` is ``owner/repo/num``; adds GitHub pull request rules.
    **kwargs
        Anything else; only reported in debug mode.

    Returns
    -------
    int
        ``0`` once the market has been stored in the database.

    Raises
    ------
    ValueError
        If ``rel_date`` or the pull request number is not numeric, or if ``pr_slug`` is given with
        neither ``rel_date`` nor ``pr_bin``.
    """
    _print_uncaught_args(kwargs)

    # ``Market`` is only imported under TYPE_CHECKING, so the bare name does not exist at
    # runtime; reach the class through the ``market`` module that is imported for real.
    market_cls = market.Market
    if url:
        mkt = market_cls.from_url(url)
    elif slug:
        mkt = market_cls.from_slug(slug)
    else:
        mkt = market_cls.from_id(cast(str, id_))

    date: None | tuple[int, int, int] = None
    if rel_date:
        sections = rel_date.split('/')
        if len(sections) == 1:
            sections = rel_date.split('-')
        # a non-numeric component raises ValueError, which is left to propagate
        date = tuple(int(x) for x in sections)  # type: ignore[assignment]

    if random_index:
        from .rule.generic import ResolveRandomIndex
        mkt.resolve_to_rules.append(
            ResolveRandomIndex(random_seed, size=index_size, rounds=random_rounds)
        )

    if round_:
        from .rule.manifold.this import RoundValueRule
        mkt.resolve_to_rules.append(RoundValueRule())  # type: ignore

    if current:
        from .rule.manifold.this import CurrentValueRule
        mkt.resolve_to_rules.append(CurrentValueRule())

    if pr_slug:
        from .rule.github import ResolveToPR, ResolveToPRDelta, ResolveWithPR
        pr_: list[str | int] = list(pr_slug.split('/'))
        pr_[-1] = int(pr_[-1])
        pr = cast(Tuple[str, str, int], tuple(pr_))
        mkt.do_resolve_rules.append(ResolveWithPR(*pr))
        if date:
            mkt.resolve_to_rules.append(ResolveToPRDelta(*pr, datetime(*date)))
        elif pr_bin:
            mkt.resolve_to_rules.append(ResolveToPR(*pr))
        else:
            raise ValueError("No resolve rule provided")

    # Fall back to a default "when to resolve" rule if none was added above
    if not mkt.do_resolve_rules:
        if not date:
            from .rule.manifold.this import ThisMarketClosed
            mkt.do_resolve_rules.append(ThisMarketClosed())
        else:
            from .rule.generic import ResolveAtTime
            mkt.do_resolve_rules.append(ResolveAtTime(datetime(*date)))

    conn = register_db()
    try:
        # ``with conn`` only manages the transaction; the connection is closed in ``finally``
        with conn:
            # next free id: one more than the largest stored id, or 1 for an empty table
            idx = max(((0, ), *conn.execute("SELECT id FROM markets;")))[0] + 1
            conn.execute("INSERT INTO markets values (?, ?, ?, ?);", (idx, mkt, 1, None))
            conn.commit()
    finally:
        conn.close()

    msg = f"Successfully added as ID {idx}!"
    print(msg)
    logger.info(msg)
    return 0
def create_command(**kwargs: Any) -> int:
    """Create markets from a file, then import them.

    Currently a placeholder: it reports the arguments it received (in debug mode) and
    returns ``-1``.
    """
    _print_uncaught_args(kwargs)
    exit_code = -1
    return exit_code
def quick_create_command(**kwargs: Any) -> int:
    """Quickly create a single market without need for a file, then import it.

    Currently a placeholder: it reports the arguments it received (in debug mode) and
    returns ``-1``.
    """
    _print_uncaught_args(kwargs)
    exit_code = -1
    return exit_code
def scan_command(**kwargs: Any) -> int:
    """Scan services for markets to create.

    Currently a placeholder: it reports the arguments it received (in debug mode) and
    returns ``-1``.
    """
    _print_uncaught_args(kwargs)
    exit_code = -1
    return exit_code
def run_command(
    refresh: bool = False,
    console_only: bool = False,
    scanners: list[str] = None,  # type: ignore[assignment]
    **kwargs: Any
) -> int:
    """Go through our markets and take actions if needed.

    ``refresh`` and ``console_only`` are handed to ``main``; ``scanners`` is accepted but not
    used here. A falsy result from ``main`` is reported as ``0``.
    """
    _print_uncaught_args(kwargs)
    status = main(refresh, console_only)
    if not status:
        return 0
    return status
def loop_command(
    period: float = 5,
    times: float = 5,
    **kwargs: Any
) -> int:
    """Run this service multiple times.

    Parameters
    ----------
    period
        Minutes to wait between two consecutive runs.
    times
        How many runs to perform; may be ``float('inf')`` to loop forever.
    **kwargs
        Forwarded to ``run_command`` on every iteration.

    Returns
    -------
    int
        Always ``0``.
    """
    # TODO: turn this into an event queue instead
    for i in count():
        # ``>=`` so that exactly ``times`` runs happen (``>`` performed one run too many)
        if i >= times:
            break
        # wait *between* runs only, so the command returns right after the final run
        if i:
            sleep(period * 60)
        run_command(**kwargs)
    return 0
def edit_command(**kwargs: Any) -> int:
    """Edit a market from a temporary file or repl.

    Currently a placeholder: it reports the arguments it received (in debug mode) and
    returns ``-1``.
    """
    _print_uncaught_args(kwargs)
    exit_code = -1
    return exit_code
def remove_command(
    ids: list[int],
    assume_yes: bool = False,
    **kwargs: Any
) -> int:
    """Remove markets from the database.

    Parameters
    ----------
    ids
        Internal database ids of the markets to remove.
    assume_yes
        Skip the interactive confirmation prompt.
    **kwargs
        Anything else; only reported in debug mode.

    Returns
    -------
    int
        ``0`` when every id was processed, ``1`` as soon as an id does not match exactly one row
        (markets removed before that point stay removed).
    """
    _print_uncaught_args(kwargs)
    # One connection for the whole batch, always closed on the way out
    # (previously a new connection was opened per id and never closed).
    conn = register_db()
    try:
        for id_ in ids:
            try:
                # unpacking fails with ValueError unless exactly one row comes back
                ((mkt, ), ) = conn.execute(
                    "SELECT market FROM markets WHERE id = ?;",
                    (id_, )
                )
            except ValueError:
                print(f"No market with id {id_} exists.")
                return 1
            question = f'Are you sure you want to remove {id_}: "{mkt.market.question}" (y/N)?'
            if assume_yes or input(question).lower().startswith('y'):
                conn.execute(
                    "DELETE FROM markets WHERE id = ?;",
                    (id_, )
                )
                conn.commit()
                logger.info(f"{id_} removed from db")
    finally:
        conn.close()
    return 0
def list_command(
    stats: bool = False,
    verbose: int = 0,
    sig_figs: int = 4,
    **kwargs: Any
) -> int:
    """List markets from the database in varying verbosity.

    Parameters
    ----------
    stats
        Accepted for the CLI, currently unused.
    verbose
        When truthy, each entry also includes the market's abstract explanation.
    sig_figs
        Passed to ``explain_abstract`` as the number of significant figures.
    **kwargs
        Anything else; only reported in debug mode.

    Returns
    -------
    int
        Always ``0``.
    """
    _print_uncaught_args(kwargs)
    conn = register_db()
    try:
        id_: int
        mkt: Market
        check_rate: float
        last_check: datetime | None
        for id_, mkt, check_rate, last_check in conn.execute("SELECT * FROM markets"):
            info = f"Market ID: {id_} (internal), {mkt.id} (manifold)\n"
            # check_rate is a number of hours (it is used as ``timedelta(hours=...)`` elsewhere in
            # this module). Split it via whole seconds; the old code floor-divided the fractional
            # hour by 60, which always produced 0 minutes and 0 seconds.
            total_seconds = round(check_rate * 3600)
            hours, remainder = divmod(total_seconds, 3600)
            minutes, seconds = divmod(remainder, 60)
            info += f"Checks every {hours}:{minutes:02d}:{seconds:02d}\tLast checked: {last_check}\n"
            info += f"Question: {mkt.market.question}\n"
            if verbose:
                info += mkt.explain_abstract(sig_figs=sig_figs) + "\n"
            print(info)
    finally:
        # the connection used to be left open
        conn.close()
    return 0
@dataclass
class State:
    """Keeps track of global state for while the Telegram Bot is running."""

    # the running telegram Application; stays None until tg_main assigns it
    application: Application = None  # type: ignore
    # most recent choice made through the inline keyboard
    last_response: Response = Response.NO_ACTION
    # text of the message being shown, extended as the conversation progresses
    last_text: str = ""
# module-wide bot state shared between tg_main and the button handler
state = State()

# first prompt: what to do with the market
_action_row = [
    InlineKeyboardButton("Do Nothing", callback_data=Response.NO_ACTION),
    InlineKeyboardButton("Resolve to Default", callback_data=Response.USE_DEFAULT),
]
_cancel_row = [InlineKeyboardButton("Cancel Market", callback_data=Response.CANCEL)]
keyboard1 = [_action_row, _cancel_row]

# second prompt: confirm the choice
keyboard2 = [
    [
        InlineKeyboardButton("Yes", callback_data="YES"),
        InlineKeyboardButton("No", callback_data="NO"),
    ],
]


@require_env(EnvironmentVariable.DBName)
def register_db() -> Connection:
    """Get a connection to the appropriate database for this bot.

    The database path is read from the ``DBName`` environment variable. If no file exists at
    that path yet, the ``markets`` table is created after connecting.

    Raises
    ------
    EnvironmentError
        If ``DBName`` is not set.
    """
    db_path = getenv("DBName")
    if db_path is None:
        raise EnvironmentError()
    # must be checked before connecting, since connecting creates the file
    is_new_db = not Path(db_path).exists()
    conn = connect(db_path, detect_types=PARSE_COLNAMES | PARSE_DECLTYPES)
    if is_new_db:
        conn.execute("CREATE TABLE markets "
                     "(id INTEGER, market Market, check_rate REAL, last_checked TIMESTAMP);")
        conn.commit()
    logger.info("Database up and initialized.")
    return conn
async def buttons(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Parse the CallbackQuery and update the message text.

    A numeric payload selects a ``Response`` and asks for confirmation; a ``"YES"``/``"NO"``
    payload answers that confirmation. ``"YES"`` stops the event loop, anything else shows the
    first keyboard again.

    Raises
    ------
    ValueError
        If the update carries no callback query or the query carries no data.
    """
    logger.info("Got into the buttons handler")
    query = update.callback_query
    if query is None or query.data is None:
        raise ValueError()

    # CallbackQueries need to be answered, even if no notification to the user is needed
    # Some clients may have trouble otherwise. See https://core.telegram.org/bots/api#callbackquery
    await query.answer()
    payload = query.data
    logger.info("Got a response from Telegram (%r)", payload)

    if payload not in ("YES", "NO"):
        # first stage: remember the selected action and ask for confirmation
        state.last_response = Response(int(payload))
        logger.info("This corresponds to %r", state.last_response)
        confirm_markup = InlineKeyboardMarkup(keyboard2)
        state.last_text += f"\nSelected option: {state.last_response.name}. Are you sure?"
        await query.edit_message_text(text=state.last_text)
        await query.edit_message_reply_markup(reply_markup=confirm_markup)
        return

    # second stage: the user answered the confirmation prompt
    state.last_text += "\n" + payload
    await query.edit_message_text(text=state.last_text)
    if payload == "YES":
        logger.info("Confirmation received, shutting dowm Telegram subsystem")
        get_event_loop().stop()  # lets telegram bot know it can stop
        return

    logger.info("Was not told yes. Backing out to ask again")
    retry_markup = InlineKeyboardMarkup(keyboard1)
    await query.edit_message_reply_markup(reply_markup=retry_markup)
@require_env(EnvironmentVariable.TelegramAPIKey, EnvironmentVariable.TelegramChatID)
def tg_main(text: str) -> Response:
    """Run the bot.

    Sends ``text`` with the first inline keyboard to the chat named by ``TelegramChatID``, polls
    until the polling call returns, and then returns the response stored in ``state``.
    """
    async def send_prompt(app):  # type: ignore
        # used as the application's post-init hook: post the question to the configured chat
        raw_chat_id = getenv("TelegramChatID")
        if raw_chat_id is None:
            raise EnvironmentError()
        markup = InlineKeyboardMarkup(keyboard1)
        await app.bot.send_message(text=text, reply_markup=markup, chat_id=int(raw_chat_id))

    token = cast(str, getenv("TelegramAPIKey"))
    builder = Application.builder().token(token).post_init(send_prompt)
    application = builder.build()
    application.add_handler(CallbackQueryHandler(buttons))

    state.application = application
    state.last_text = text

    # install a fresh event loop right before polling starts
    set_event_loop(new_event_loop())
    application.run_polling()
    return state.last_response
def watch_reply(conn: Connection, id_: int, mkt: Market, console_only: bool = False) -> None:
    """Watch for a reply from the bot manager in order to check the bot's work.

    Parameters
    ----------
    conn
        Open database connection; the market's row is deleted through it after a successful action.
    id_
        Internal database id of the market.
    mkt
        The market that is ready to resolve.
    console_only
        Ask on the console via ``input`` instead of going through Telegram.

    Raises
    ------
    RuntimeError
        If the market's status is not ``MarketStatus.RESOLVED`` after resolving or cancelling.
    """
    # The two sentences were previously glued together without a separating space.
    text = (f"Hey, we need to resolve {id_} to {mkt.resolve_to()}. It currently has a value of {mkt.current_answer()}. "
            f"This market is called: {mkt.market.question}\n\n")
    text += mkt.explain_abstract()
    try:
        text += "\n\n" + mkt.explain_specific()
    except Exception:
        # best effort: the specific explanation is optional, so report the failure and carry on
        print(format_exc())
        logger.exception("Unable to explain a market's resolution automatically")

    if not console_only:
        response = tg_main(text)
    else:
        if input(text + " Use this default value? (y/N) ").lower().startswith("y"):
            response = Response.USE_DEFAULT
        elif input("Cancel the market? (y/N) ").lower().startswith("y"):
            response = Response.CANCEL
        else:
            response = Response.NO_ACTION

    if response == Response.NO_ACTION:
        return
    elif response == Response.USE_DEFAULT:
        resp = mkt.resolve()
    elif response == Response.CANCEL:
        resp = mkt.cancel()

    if mkt.status != MarketStatus.RESOLVED:
        raise RuntimeError(resp)
    conn.execute(
        "DELETE FROM markets WHERE id = ?;",
        (id_, )
    )
    conn.commit()
@require_env(EnvironmentVariable.ManifoldAPIKey, EnvironmentVariable.DBName)
def main(refresh: bool = False, console_only: bool = False) -> int:
    """Go through watched markets and act on rules (resolve, trade, etc).

    Parameters
    ----------
    refresh
        Ignore the stored ``last_checked`` time and check every market.
    console_only
        Passed to ``watch_reply``; ask for confirmation on the console instead of Telegram.

    Returns
    -------
    int
        Always ``0``.
    """
    conn = register_db()
    try:
        mkt: market.Market
        # Read every row up front: rows are deleted/updated and committed inside the loop, and
        # doing that while still stepping through the SELECT cursor is not reliable.
        rows = conn.execute("SELECT * FROM markets").fetchall()
        for id_, mkt, check_rate, last_checked in rows:
            msg = f"Currently checking ID {id_}: {mkt.market.question}"
            print(msg)
            logger.info(msg)
            # print(mkt.explain_abstract())
            # print("\n\n" + mkt.explain_specific() + "\n\n")
            check = (refresh or not last_checked or (datetime.now() > last_checked + timedelta(hours=check_rate)))
            msg = f' - [{"x" if check else " "}] Should I check?'
            print(msg)
            logger.info(msg)
            if not check:
                continue

            check = mkt.should_resolve()
            msg = f' - [{"x" if check else " "}] Is elligible to resolve?'
            print(msg)
            logger.info(msg)
            if check:
                watch_reply(conn, id_, mkt, console_only)

            if mkt.market.isResolved:
                msg = " - [x] Market resolved, removing from db"
                print(msg)
                logger.info(msg)
                conn.execute(
                    "DELETE FROM markets WHERE id = ?;",
                    (id_, )
                )
                conn.commit()
                # the row is gone, so there is nothing left to update
                continue

            conn.execute(
                "UPDATE markets SET last_checked = ?, market = ? WHERE id = ?;",
                (datetime.now(), mkt, id_)
            )
            conn.commit()
    finally:
        # close the connection even when a market raises part-way through
        conn.close()
    return 0