Source code for ManifoldMarketManager.rule.generic

"""Contains generic rules, which don't interact markets other than their assigned one, & don't cause any mutations."""

from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, Generic, Mapping, Optional, Tuple, Union, cast

from attrs import Factory, define

from .. import Rule
from ..caching import parallel
from ..consts import BinaryResolution, PseudoNumericResolution, T
from ..util import normalize_mapping
from . import DoResolveRule, ResolutionValueRule, get_rule
from .abstract import BinaryRule, ResolveRandomSeed, UnaryRule, VariadicRule

if TYPE_CHECKING:  # pragma: no cover
    from concurrent.futures import Future
    from typing import Any, ClassVar, DefaultDict, Literal, MutableSequence

    from ..consts import FreeResponseResolution, MultipleChoiceResolution
    from ..market import Market
    from ..util import ModJSONDict


[docs]@define(slots=False) class NegateRule(UnaryRule[Optional[BinaryResolution]]): """Negate another DoResolveRule.""" _explainer_stub: ClassVar[str] = "Resolve False if the below is True, and vice versa"
[docs] def _value(self, market: Market) -> bool: return not self.child._value(market)
[docs]@define(slots=False) class EitherRule(BinaryRule[Optional[BinaryResolution]]): """Return the OR of two other DoResolveRules.""" _explainer_stub: ClassVar[str] = "Resolve True if either of the below resolves True, otherwise resolve False"
[docs] def _value(self, market: Market) -> bool: return bool(self.rule1._value(market)) or bool(self.rule2._value(market))
[docs]@define(slots=False) class BothRule(BinaryRule[Optional[BinaryResolution]]): """Return the AND of two other DoResolveRules.""" _explainer_stub: ClassVar[str] = "Resolve True if both of the below resolve to True, otherwise resolve False"
[docs] def _value(self, market: Market) -> bool: return bool(self.rule1._value(market)) and bool(self.rule2._value(market))
[docs]@define(slots=False) class NANDRule(BinaryRule[Optional[BinaryResolution]]): """Return the NAND of two other DoResolveRules.""" _explainer_stub: ClassVar[str] = "Resolve True if one or more of the below resolves False, otherwise resolve False"
[docs] def _value(self, market: Market) -> bool: return not (self.rule1._value(market) and self.rule2._value(market))
[docs]@define(slots=False) class NeitherRule(BinaryRule[Optional[BinaryResolution]]): """Return the NOR of two other DoResolveRules.""" _explainer_stub: ClassVar[str] = "Resolve False if either of the below resolve to True, otherwise resolve True"
[docs] def _value(self, market: Market) -> bool: return not (self.rule1._value(market) or self.rule2._value(market))
[docs]@define(slots=False) class XORRule(BinaryRule[Optional[BinaryResolution]]): """Return the XOR of two other DoResolveRules.""" _explainer_stub: ClassVar[str] = "Resolve False if the below resolve to the same value, otherwise resolve True"
[docs] def _value(self, market: Market) -> bool: return bool(bool(self.rule1._value(market)) ^ bool(self.rule2._value(market)))
[docs]@define(slots=False) class XNORRule(BinaryRule[Optional[BinaryResolution]]): """Return the XNOR of two other DoResolveRules.""" _explainer_stub: ClassVar[str] = "Resolve True if the below resolve to the same value, otherwise resolve False"
[docs] def _value(self, market: Market) -> bool: return bool(self.rule1._value(market)) == bool(self.rule2._value(market))
[docs]@define(slots=False) class ImpliesRule(BinaryRule[Optional[BinaryResolution]]): """Return the implication of two other DoResolveRules.""" _explainer_stub: ClassVar[str] = ( "Resolve True if the next line resolves False, otherwise resolves to the value of the item after" )
[docs] def _value(self, market: Market) -> bool: return not self.rule1._value(market) or bool(self.rule2._value(market))
[docs]@define(slots=False) class ConditionalRule(BinaryRule[BinaryResolution]): """Cancels if the premise is false, and resolve to another value otherwise.""" _explainer_stub: ClassVar[str] = ( "Cancels if the next line resolves False, otherwise resolves to the value of the item after" )
[docs] def _value(self, market: Market) -> BinaryResolution: f_val1 = parallel(self.rule1._value, market) f_val2 = parallel(self.rule2._value, market) if not f_val1.result(): return "CANCEL" return f_val2.result()
[docs]@define(slots=False) class ResolveAtTime(DoResolveRule): """Return True if the specified time is in the past.""" resolve_at: datetime
[docs] def _value(self, market: Market) -> bool: """Return True iff the current time is after resolve_at.""" try: return datetime.now(timezone.utc) >= self.resolve_at except TypeError: return datetime.now() >= self.resolve_at
[docs] def _explain_abstract(self, indent: int = 0, **kwargs: Any) -> str: return f"{' ' * indent}- Resolve True if the current time is past {self.resolve_at}, otherwise resolve False\n"
[docs]@define(slots=False) class ResolveToValue(Generic[T], Rule[T]): """Resolve to a pre-specified value.""" resolve_value: T
[docs] def _value(self, market: Market) -> T: return self.resolve_value
[docs] def _explain_abstract(self, indent: int = 0, **kwargs: Any) -> str: return f"{' ' * indent}- Resolves to the specific value {self.resolve_value}\n"
[docs]@define(slots=False) class ModulusRule(BinaryRule[PseudoNumericResolution]): """Return the modulus of two other DoResolveRules.""" _explainer_stub: ClassVar[str] = "A mod B, where A is the next line and B the line after"
[docs] def _value(self, market: Market) -> Literal["CANCEL"] | float: val1, val2 = self.rule1._value(market), self.rule2._value(market) if val1 == "CANCEL" or val2 == "CANCEL": return "CANCEL" return val1 % val2
[docs]@define(slots=False) class AdditiveRule(VariadicRule[PseudoNumericResolution]): """Return the sum of many other Rules.""" _explainer_stub: ClassVar[str] = "The sum of the below"
[docs] def _value(self, market: Market) -> Literal["CANCEL"] | float: """Return the sum of the underlying rules.""" ret: float = 0 futures = [parallel(rule.value, market, format='PSEUDO_NUMERIC') for rule in self.rules] for f_rule in futures: val = cast( PseudoNumericResolution, f_rule.result() ) if val == "CANCEL": return "CANCEL" ret += val return ret
[docs]@define(slots=False) class MultiplicitiveRule(VariadicRule[PseudoNumericResolution]): """Return the product of many other Rules.""" _explainer_stub: ClassVar[str] = "The product of the below"
[docs] def _value(self, market: Market) -> Literal["CANCEL"] | float: """Return the product of the underlying rules.""" ret: float = 1 futures = [parallel(rule.value, market, format='PSEUDO_NUMERIC') for rule in self.rules] for f_rule in futures: val = cast( PseudoNumericResolution, f_rule.result() ) if val == "CANCEL": return "CANCEL" ret *= val return ret
[docs]@define(slots=False) class ResolveRandomIndex(ResolveRandomSeed): """Resolve to a random index in a market.""" size: Optional[int] = None start: int = 0 def __init__( self, seed: int | float | str | bytes | bytearray, *args: Any, size: Optional[int] = None, start: int = 0, **kwargs: Any ) -> None: """Ensure that we select a different method depending on the type of range that's requested.""" self.start = start self.size = size if size is None: method = 'choices' else: method = 'randrange' super().__init__(seed, method, *args, **kwargs)
[docs] def _value(self, market: Market) -> int: market.refresh() if self.method == 'randrange': self.args = (self.start, self.size) else: assert isinstance(market.market.pool, Mapping) items = [(int(idx), float(obj)) for idx, obj in market.market.pool.items() if int(idx) >= self.start] self.args = (range(self.start, self.start + len(items)), ) self.kwargs["weights"] = [prob for _, prob in items] return cast(int, super()._value(market))
[docs] def _explain_abstract(self, indent: int = 0, **kwargs: Any) -> str: ret = f"{' ' * indent}- Resolve to a random index, given some original seed. This one operates on a " if self.method == 'rand"range': ret += f"fixed range of integers in ({self.start} <= x < {self.size}).\n" else: ret += f"dynamic range based on the current pool and probabilities, but starting at {self.start}.\n" return ret
[docs]@define(slots=False) class ResolveMultipleValues(ResolutionValueRule): """Resolve to multiple values with different shares.""" shares: MutableSequence[tuple[ResolutionValueRule, float]] = Factory(list)
[docs] def _value(self, market: Market) -> FreeResponseResolution | MultipleChoiceResolution: ret: DefaultDict[int, float] = defaultdict(float) for rule, part in self.shares: val = cast(Dict[Union[str, int], Future[float]], parallel(rule.value, market, format='FREE_RESPONSE')) for idx, value in val.items(): ret[int(idx)] += value.result() * part return normalize_mapping(ret)
[docs] def _explain_abstract(self, indent: int = 0, **kwargs: Any) -> str: ret = f"{' ' * indent}Resolves to the weighted union of multiple other values.\n" indent += 1 for rule, weight in self.shares: ret += f"{' ' * indent} - At a weight of {weight}\n" ret += rule.explain_abstract(indent + 1, **kwargs) return ret
[docs] @classmethod def from_dict(cls, env: ModJSONDict) -> 'ResolveMultipleValues': """Take a dictionary and return an instance of the associated class.""" env_copy: ModJSONDict = {**env} shares: MutableSequence[tuple[ResolutionValueRule | tuple[str, ModJSONDict], float]] = ( env.get('shares', []) # type: ignore[assignment] ) new_shares = [] for rule, weight in shares: try: type_, kwargs = cast(Tuple[str, ModJSONDict], rule) new_rule = get_rule(type_).from_dict(kwargs) new_shares.append((new_rule, weight)) except Exception: pass env_copy['shares'] = new_shares # type: ignore return super().from_dict(env_copy)