Module chatto.events

Expand source code
# Copyright (c) 2021-2022, Ethan Henderson
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
#    contributors may be used to endorse or promote products derived from
#    this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from __future__ import annotations

import asyncio
import logging
import traceback
import typing as t
from collections import defaultdict
from collections.abc import Awaitable
from dataclasses import dataclass

from chatto.errors import NoEventQueue
from chatto.message import Message
from chatto.secrets import Secrets
from chatto.stream import Stream

log = logging.getLogger(__name__)


@dataclass(eq=True, frozen=True)
class Event:
    """The base dataclass for all events."""

    def __str__(self) -> str:
        return self.__class__.__name__


@dataclass(eq=True, frozen=True)
class ReadyEvent(Event):
    """Event dispatched once the bot is ready to start receiving
    messages."""


@dataclass(eq=True, frozen=True)
class MessageCreatedEvent(Event):
    """Event dispatched when a message has been sent to the live
    chat by a user."""

    message: Message
    """The received message."""


@dataclass(eq=True, frozen=True)
class StreamFetchedEvent(Event):
    """Event dispatched when stream information has been fetched."""

    stream: Stream
    """The stream for which information has been fetched for."""


@dataclass(eq=True, frozen=True)
class ChatPolledEvent(Event):
    """Event dispatched when the YouTube live chat is polled."""

    data: dict[str, t.Any]
    """The data received from the poll."""


@dataclass(eq=True, frozen=True)
class MessageSentEvent(Event):
    """Event dispatched when a message has been sent to the live
    chat by the bot."""

    message: Message
    """The sent message."""


@dataclass(eq=True, frozen=True)
class AuthorisedEvent(Event):
    """Event dispatched once the bot has been authorised with
    OAuth 2."""

    secrets: Secrets
    """The secrets data."""

    tokens: dict[str, t.Any]
    """The OAuth tokens."""


if t.TYPE_CHECKING:
    CallbacksT = dict[t.Type[Event], list[t.Callable[[Event], Awaitable[t.Any]]]]
    ListenerT = t.Callable[[t.Callable[[t.Any], t.Any]], None]


class EventHandler:
    """A class that can be attached to the bot to handle events."""

    __slots__ = ("_queue", "callbacks")

    def __init__(self) -> None:
        self.callbacks: CallbacksT = defaultdict(list)
        """A mapping of events to their respective callbacks."""

    @property
    def queue(self) -> asyncio.Queue[Event] | None:
        """The event queue the bot is using. If the event queue has not
        been created, this will be `None`."""
        return getattr(self, "_queue", None)

    @property
    def queue_size(self) -> int:
        """The size of the event queue. If the event queue has not been
        created, this will be 0."""
        if not self.queue:
            return 0

        return self._queue.qsize()

    async def create_queue(self) -> None:
        """Create the event queue. This is handled for you."""
        if self.queue:
            log.warning("The event handler already has an event queue")
        self._queue: asyncio.Queue[Event] = asyncio.Queue()

    async def process(self) -> None:
        """A forever-looping task that processes events once they are
        pushed onto the queue."""
        if not self.queue:
            raise NoEventQueue("there is no event queue")

        while True:
            try:
                event = await self._queue.get()
                log.debug(f"Retrieved {event} event")
                for cb in self.callbacks[event.__class__]:
                    log.debug(f"Running callback '{cb.__name__}' for event...")
                    await cb(event)

            except Exception:
                log.error(f"Ignoring error processing {event} event:")
                traceback.print_exc()

    async def dispatch(self, event_type: t.Type[Event], *args: t.Any) -> Event:
        """Dispatch an event. This puts the event on the event queue.

        ## Arguments
        * `event_type` -
            The event type to put on the queue. This **must** be a
            subclass of `Event`.
        * `*args` -
            A series of arguments to be passed to the event callback
            when called.

        ## Returns
        The event instance.

        ## Raises
        `NoEventQueue` -
            The event queue has not been created.
        """
        if not self.queue:
            raise NoEventQueue("there is no event queue")

        event = event_type(*args)
        await self._queue.put(event)
        log.debug(f"Dispatched {event_type.__name__} event")
        return event

    def subscribe(
        self, event_type: t.Type[Event], *callbacks: t.Callable[[t.Any], t.Any]
    ) -> None:
        """Subscribe callbacks to an event.

        ## Arguments
        * `event_type` -
            The event type to subscribe the callback to. This **must**
            be a subclass of `Event`.
        * `*callbacks` -
            A series of callbacks to subscribe to the event.

        ## Raises
        `NoEventQueue` -
            The event queue has not been created.
        """
        for cb in callbacks:
            self.callbacks[event_type].append(cb)
            log.info(
                f"Subscribed to {event_type.__name__} events "
                f"with callback '{cb.__name__}'"
            )

    def listen(self, event_type: type[Event]) -> ListenerT:
        """A decorator used to subscribe the wrapped callback to an
        event.

        ## Arguments
        * `event_type` -
            The event type to subscribe to. This **must** be a subclass
            of `events.Event`.

        ## Example
        ```py
        @bot.events.listen(events.StreamFetchedEvent)
        async def on_stream_fetched(event):
            print(f"Fetched stream with ID: {event.stream.id}")
        ```
        """
        return lambda callback: self.subscribe(event_type, callback)

Classes

class AuthorisedEvent (secrets: Secrets, tokens: dict[str, t.Any])

Event dispatched once the bot has been authorised with OAuth 2.

Expand source code
@dataclass(eq=True, frozen=True)
class AuthorisedEvent(Event):
    """Event dispatched once the bot has been authorised with
    OAuth 2."""

    secrets: Secrets
    """The secrets data."""

    tokens: dict[str, t.Any]
    """The OAuth tokens."""

Ancestors

Class variables

var secretsSecrets

The secrets data.

var tokens : dict[str, typing.Any]

The OAuth tokens.

class ChatPolledEvent (data: dict[str, t.Any])

Event dispatched when the YouTube live chat is polled.

Expand source code
@dataclass(eq=True, frozen=True)
class ChatPolledEvent(Event):
    """Event dispatched when the YouTube live chat is polled."""

    data: dict[str, t.Any]
    """The data received from the poll."""

Ancestors

Class variables

var data : dict[str, typing.Any]

The data received from the poll.

class Event

The base dataclass for all events.

Expand source code
@dataclass(eq=True, frozen=True)
class Event:
    """The base dataclass for all events."""

    def __str__(self) -> str:
        return self.__class__.__name__

Subclasses

class EventHandler

A class that can be attached to the bot to handle events.

Expand source code
class EventHandler:
    """A class that can be attached to the bot to handle events."""

    __slots__ = ("_queue", "callbacks")

    def __init__(self) -> None:
        self.callbacks: CallbacksT = defaultdict(list)
        """A mapping of events to their respective callbacks."""

    @property
    def queue(self) -> asyncio.Queue[Event] | None:
        """The event queue the bot is using. If the event queue has not
        been created, this will be `None`."""
        return getattr(self, "_queue", None)

    @property
    def queue_size(self) -> int:
        """The size of the event queue. If the event queue has not been
        created, this will be 0."""
        if not self.queue:
            return 0

        return self._queue.qsize()

    async def create_queue(self) -> None:
        """Create the event queue. This is handled for you."""
        if self.queue:
            log.warning("The event handler already has an event queue")
        self._queue: asyncio.Queue[Event] = asyncio.Queue()

    async def process(self) -> None:
        """A forever-looping task that processes events once they are
        pushed onto the queue."""
        if not self.queue:
            raise NoEventQueue("there is no event queue")

        while True:
            try:
                event = await self._queue.get()
                log.debug(f"Retrieved {event} event")
                for cb in self.callbacks[event.__class__]:
                    log.debug(f"Running callback '{cb.__name__}' for event...")
                    await cb(event)

            except Exception:
                log.error(f"Ignoring error processing {event} event:")
                traceback.print_exc()

    async def dispatch(self, event_type: t.Type[Event], *args: t.Any) -> Event:
        """Dispatch an event. This puts the event on the event queue.

        ## Arguments
        * `event_type` -
            The event type to put on the queue. This **must** be a
            subclass of `Event`.
        * `*args` -
            A series of arguments to be passed to the event callback
            when called.

        ## Returns
        The event instance.

        ## Raises
        `NoEventQueue` -
            The event queue has not been created.
        """
        if not self.queue:
            raise NoEventQueue("there is no event queue")

        event = event_type(*args)
        await self._queue.put(event)
        log.debug(f"Dispatched {event_type.__name__} event")
        return event

    def subscribe(
        self, event_type: t.Type[Event], *callbacks: t.Callable[[t.Any], t.Any]
    ) -> None:
        """Subscribe callbacks to an event.

        ## Arguments
        * `event_type` -
            The event type to subscribe the callback to. This **must**
            be a subclass of `Event`.
        * `*callbacks` -
            A series of callbacks to subscribe to the event.

        ## Raises
        `NoEventQueue` -
            The event queue has not been created.
        """
        for cb in callbacks:
            self.callbacks[event_type].append(cb)
            log.info(
                f"Subscribed to {event_type.__name__} events "
                f"with callback '{cb.__name__}'"
            )

    def listen(self, event_type: type[Event]) -> ListenerT:
        """A decorator used to subscribe the wrapped callback to an
        event.

        ## Arguments
        * `event_type` -
            The event type to subscribe to. This **must** be a subclass
            of `events.Event`.

        ## Example
        ```py
        @bot.events.listen(events.StreamFetchedEvent)
        async def on_stream_fetched(event):
            print(f"Fetched stream with ID: {event.stream.id}")
        ```
        """
        return lambda callback: self.subscribe(event_type, callback)

Instance variables

var callbacks

A mapping of events to their respective callbacks.

var queue : asyncio.queues.Queue | None

The event queue the bot is using. If the event queue has not been created, this will be None.

Expand source code
@property
def queue(self) -> asyncio.Queue[Event] | None:
    """The event queue the bot is using. If the event queue has not
    been created, this will be `None`."""
    return getattr(self, "_queue", None)
var queue_size : int

The size of the event queue. If the event queue has not been created, this will be 0.

Expand source code
@property
def queue_size(self) -> int:
    """The size of the event queue. If the event queue has not been
    created, this will be 0."""
    if not self.queue:
        return 0

    return self._queue.qsize()

Methods

async def create_queue(self) ‑> None

Create the event queue. This is handled for you.

Expand source code
async def create_queue(self) -> None:
    """Create the event queue. This is handled for you."""
    if self.queue:
        log.warning("The event handler already has an event queue")
    self._queue: asyncio.Queue[Event] = asyncio.Queue()
async def dispatch(self, event_type: t.Type[Event], *args: t.Any) ‑> Event

Dispatch an event. This puts the event on the event queue.

Arguments

  • event_type - The event type to put on the queue. This must be a subclass of Event.
  • *args - A series of arguments to be passed to the event callback when called.

Returns

The event instance.

Raises

NoEventQueue - The event queue has not been created.

Expand source code
async def dispatch(self, event_type: t.Type[Event], *args: t.Any) -> Event:
    """Dispatch an event. This puts the event on the event queue.

    ## Arguments
    * `event_type` -
        The event type to put on the queue. This **must** be a
        subclass of `Event`.
    * `*args` -
        A series of arguments to be passed to the event callback
        when called.

    ## Returns
    The event instance.

    ## Raises
    `NoEventQueue` -
        The event queue has not been created.
    """
    if not self.queue:
        raise NoEventQueue("there is no event queue")

    event = event_type(*args)
    await self._queue.put(event)
    log.debug(f"Dispatched {event_type.__name__} event")
    return event
def listen(self, event_type: type[Event]) ‑> ListenerT

A decorator used to subscribe the wrapped callback to an event.

Arguments

  • event_type - The event type to subscribe to. This must be a subclass of events.Event.

Example

@bot.events.listen(events.StreamFetchedEvent)
async def on_stream_fetched(event):
    print(f"Fetched stream with ID: {event.stream.id}")
Expand source code
def listen(self, event_type: type[Event]) -> ListenerT:
    """A decorator used to subscribe the wrapped callback to an
    event.

    ## Arguments
    * `event_type` -
        The event type to subscribe to. This **must** be a subclass
        of `events.Event`.

    ## Example
    ```py
    @bot.events.listen(events.StreamFetchedEvent)
    async def on_stream_fetched(event):
        print(f"Fetched stream with ID: {event.stream.id}")
    ```
    """
    return lambda callback: self.subscribe(event_type, callback)
async def process(self) ‑> None

A forever-looping task that processes events once they are pushed onto the queue.

Expand source code
async def process(self) -> None:
    """A forever-looping task that processes events once they are
    pushed onto the queue."""
    if not self.queue:
        raise NoEventQueue("there is no event queue")

    while True:
        try:
            event = await self._queue.get()
            log.debug(f"Retrieved {event} event")
            for cb in self.callbacks[event.__class__]:
                log.debug(f"Running callback '{cb.__name__}' for event...")
                await cb(event)

        except Exception:
            log.error(f"Ignoring error processing {event} event:")
            traceback.print_exc()
def subscribe(self, event_type: t.Type[Event], *callbacks: t.Callable[[t.Any], t.Any]) ‑> None

Subscribe callbacks to an event.

Arguments

  • event_type - The event type to subscribe the callback to. This must be a subclass of Event.
  • *callbacks - A series of callbacks to subscribe to the event.

Raises

NoEventQueue - The event queue has not been created.

Expand source code
def subscribe(
    self, event_type: t.Type[Event], *callbacks: t.Callable[[t.Any], t.Any]
) -> None:
    """Subscribe callbacks to an event.

    ## Arguments
    * `event_type` -
        The event type to subscribe the callback to. This **must**
        be a subclass of `Event`.
    * `*callbacks` -
        A series of callbacks to subscribe to the event.

    ## Raises
    `NoEventQueue` -
        The event queue has not been created.
    """
    for cb in callbacks:
        self.callbacks[event_type].append(cb)
        log.info(
            f"Subscribed to {event_type.__name__} events "
            f"with callback '{cb.__name__}'"
        )
class MessageCreatedEvent (message: Message)

Event dispatched when a message has been sent to the live chat by a user.

Expand source code
@dataclass(eq=True, frozen=True)
class MessageCreatedEvent(Event):
    """Event dispatched when a message has been sent to the live
    chat by a user."""

    message: Message
    """The received message."""

Ancestors

Class variables

var messageMessage

The received message.

class MessageSentEvent (message: Message)

Event dispatched when a message has been sent to the live chat by the bot.

Expand source code
@dataclass(eq=True, frozen=True)
class MessageSentEvent(Event):
    """Event dispatched when a message has been sent to the live
    chat by the bot."""

    message: Message
    """The sent message."""

Ancestors

Class variables

var messageMessage

The sent message.

class ReadyEvent

Event dispatched once the bot is ready to start receiving messages.

Expand source code
@dataclass(eq=True, frozen=True)
class ReadyEvent(Event):
    """Event dispatched once the bot is ready to start receiving
    messages."""

Ancestors

class StreamFetchedEvent (stream: Stream)

Event dispatched when stream information has been fetched.

Expand source code
@dataclass(eq=True, frozen=True)
class StreamFetchedEvent(Event):
    """Event dispatched when stream information has been fetched."""

    stream: Stream
    """The stream for which information has been fetched for."""

Ancestors

Class variables

var streamStream

The stream for which information has been fetched for.