Module chatto.stream
Expand source code
# Copyright (c) 2021-2022, Ethan Henderson
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import annotations
import datetime as dt
import logging
import typing as t
from dataclasses import dataclass
from dateutil.parser import parse as parse_ts
import chatto
from chatto.errors import ChannelNotLive, HTTPError
if t.TYPE_CHECKING:
from aiohttp import ClientSession
log = logging.getLogger(__name__)
@dataclass(eq=True, frozen=True)
class Stream:
"""A dataclass representing a stream. All class variables are also
parameters that should be passed into the constructor."""
id: str
"""The stream ID."""
chat_id: str
"""The stream's active live chat ID."""
start_time: dt.datetime
"""The actual start time of the stream."""
@classmethod
def from_youtube(cls, resource: dict[str, t.Any]) -> Stream:
"""Create a `Stream` object from a video resource from the
YouTube Data API.
## Arguments
* `resource` -
The video resource.
## Returns
* `Stream` -
The newly created stream object.
"""
streaming_details = resource["liveStreamingDetails"]
chat_id = streaming_details.get("activeLiveChatId", None)
if not chat_id:
raise ChannelNotLive("the stream has no active chat ID")
start_time = parse_ts(streaming_details["actualStartTime"])
log.info(f"Retrieved stream info for stream {resource['id']}")
return cls(resource["id"], chat_id, start_time)
@staticmethod
async def fetch_stream_data(
stream_id: str, api_key: str, session: ClientSession
) -> dict[str, t.Any]:
"""A helper method for fetching a video resource for a given
stream.
## Arguments
* `stream_id` -
The ID of the stream to fetch info for.
* `api_key` -
The API key the bot is using.
* `session` -
The aiohttp session the bot is using.
## Returns
* `dict` -
A video resource for the given stream.
## Raises
* `HTTPError` -
The API request is invalid.
"""
url = chatto.YOUTUBE_API_BASE_URL + (
f"/videos?key={api_key}&part=liveStreamingDetails&id={stream_id}"
)
async with session.get(url) as r:
data = await r.json()
err = data.get("error", None)
if err:
raise HTTPError(err["code"], err["errors"][0]["message"])
return data["items"][0] # type: ignore
@staticmethod
async def fetch_active_stream_data(
channel_id: str, token: str, session: ClientSession
) -> dict[str, t.Any]:
"""A helper method for fetching a video resource for the given
channel's active stream.
## Arguments
* `channel_id` -
The ID of the channel to fetch stream info for.
* `api_key` -
The API key the bot is using.
* `session` -
The aiohttp session the bot is using.
## Returns
* `dict` -
A video resource for the given channel's stream.
## Raises
* `HTTPError` -
The API request is invalid.
"""
url = chatto.YOUTUBE_API_BASE_URL + (
"/search"
f"?key={token}"
f"&channelId={channel_id}"
"&eventType=live"
"&type=video"
)
async with session.get(url) as r:
data = await r.json()
err = data.get("error", None)
if err:
raise HTTPError(err["code"], err["errors"][0]["message"])
items = data["items"]
if not items:
raise ChannelNotLive("the provided channel is not live")
stream_id = items[0]["id"]["videoId"]
log.info(f"Retrieved ID of currently live stream ({stream_id})")
return await Stream.fetch_stream_data(stream_id, token, session)
Classes
class Stream (id: str, chat_id: str, start_time: dt.datetime)
-
A dataclass representing a stream. All class variables are also parameters that should be passed into the constructor.
Expand source code
@dataclass(eq=True, frozen=True) class Stream: """A dataclass representing a stream. All class variables are also parameters that should be passed into the constructor.""" id: str """The stream ID.""" chat_id: str """The stream's active live chat ID.""" start_time: dt.datetime """The actual start time of the stream.""" @classmethod def from_youtube(cls, resource: dict[str, t.Any]) -> Stream: """Create a `Stream` object from a video resource from the YouTube Data API. ## Arguments * `resource` - The video resource. ## Returns * `Stream` - The newly created stream object. """ streaming_details = resource["liveStreamingDetails"] chat_id = streaming_details.get("activeLiveChatId", None) if not chat_id: raise ChannelNotLive("the stream has no active chat ID") start_time = parse_ts(streaming_details["actualStartTime"]) log.info(f"Retrieved stream info for stream {resource['id']}") return cls(resource["id"], chat_id, start_time) @staticmethod async def fetch_stream_data( stream_id: str, api_key: str, session: ClientSession ) -> dict[str, t.Any]: """A helper method for fetching a video resource for a given stream. ## Arguments * `stream_id` - The ID of the stream to fetch info for. * `api_key` - The API key the bot is using. * `session` - The aiohttp session the bot is using. ## Returns * `dict` - A video resource for the given stream. ## Raises * `HTTPError` - The API request is invalid. """ url = chatto.YOUTUBE_API_BASE_URL + ( f"/videos?key={api_key}&part=liveStreamingDetails&id={stream_id}" ) async with session.get(url) as r: data = await r.json() err = data.get("error", None) if err: raise HTTPError(err["code"], err["errors"][0]["message"]) return data["items"][0] # type: ignore @staticmethod async def fetch_active_stream_data( channel_id: str, token: str, session: ClientSession ) -> dict[str, t.Any]: """A helper method for fetching a video resource for the given channel's active stream. ## Arguments * `channel_id` - The ID of the channel to fetch stream info for. * `api_key` - The API key the bot is using. * `session` - The aiohttp session the bot is using. ## Returns * `dict` - A video resource for the given channel's stream. ## Raises * `HTTPError` - The API request is invalid. """ url = chatto.YOUTUBE_API_BASE_URL + ( "/search" f"?key={token}" f"&channelId={channel_id}" "&eventType=live" "&type=video" ) async with session.get(url) as r: data = await r.json() err = data.get("error", None) if err: raise HTTPError(err["code"], err["errors"][0]["message"]) items = data["items"] if not items: raise ChannelNotLive("the provided channel is not live") stream_id = items[0]["id"]["videoId"] log.info(f"Retrieved ID of currently live stream ({stream_id})") return await Stream.fetch_stream_data(stream_id, token, session)
Class variables
var chat_id : str
-
The stream's active live chat ID.
var id : str
-
The stream ID.
var start_time : datetime.datetime
-
The actual start time of the stream.
Static methods
async def fetch_active_stream_data(channel_id: str, token: str, session: ClientSession) ‑> dict[str, t.Any]
-
A helper method for fetching a video resource for the given channel's active stream.
Arguments
channel_id
- The ID of the channel to fetch stream info for.api_key
- The API key the bot is using.session
- The aiohttp session the bot is using.
Returns
dict
- A video resource for the given channel's stream.
Raises
HTTPError
- The API request is invalid.
Expand source code
@staticmethod async def fetch_active_stream_data( channel_id: str, token: str, session: ClientSession ) -> dict[str, t.Any]: """A helper method for fetching a video resource for the given channel's active stream. ## Arguments * `channel_id` - The ID of the channel to fetch stream info for. * `api_key` - The API key the bot is using. * `session` - The aiohttp session the bot is using. ## Returns * `dict` - A video resource for the given channel's stream. ## Raises * `HTTPError` - The API request is invalid. """ url = chatto.YOUTUBE_API_BASE_URL + ( "/search" f"?key={token}" f"&channelId={channel_id}" "&eventType=live" "&type=video" ) async with session.get(url) as r: data = await r.json() err = data.get("error", None) if err: raise HTTPError(err["code"], err["errors"][0]["message"]) items = data["items"] if not items: raise ChannelNotLive("the provided channel is not live") stream_id = items[0]["id"]["videoId"] log.info(f"Retrieved ID of currently live stream ({stream_id})") return await Stream.fetch_stream_data(stream_id, token, session)
async def fetch_stream_data(stream_id: str, api_key: str, session: ClientSession) ‑> dict[str, t.Any]
-
A helper method for fetching a video resource for a given stream.
Arguments
stream_id
- The ID of the stream to fetch info for.api_key
- The API key the bot is using.session
- The aiohttp session the bot is using.
Returns
dict
- A video resource for the given stream.
Raises
HTTPError
- The API request is invalid.
Expand source code
@staticmethod async def fetch_stream_data( stream_id: str, api_key: str, session: ClientSession ) -> dict[str, t.Any]: """A helper method for fetching a video resource for a given stream. ## Arguments * `stream_id` - The ID of the stream to fetch info for. * `api_key` - The API key the bot is using. * `session` - The aiohttp session the bot is using. ## Returns * `dict` - A video resource for the given stream. ## Raises * `HTTPError` - The API request is invalid. """ url = chatto.YOUTUBE_API_BASE_URL + ( f"/videos?key={api_key}&part=liveStreamingDetails&id={stream_id}" ) async with session.get(url) as r: data = await r.json() err = data.get("error", None) if err: raise HTTPError(err["code"], err["errors"][0]["message"]) return data["items"][0] # type: ignore
def from_youtube(resource: dict[str, t.Any]) ‑> Stream
-
Create a
Stream
object from a video resource from the YouTube Data API.Arguments
resource
- The video resource.
Returns
Stream
- The newly created stream object.
Expand source code
@classmethod def from_youtube(cls, resource: dict[str, t.Any]) -> Stream: """Create a `Stream` object from a video resource from the YouTube Data API. ## Arguments * `resource` - The video resource. ## Returns * `Stream` - The newly created stream object. """ streaming_details = resource["liveStreamingDetails"] chat_id = streaming_details.get("activeLiveChatId", None) if not chat_id: raise ChannelNotLive("the stream has no active chat ID") start_time = parse_ts(streaming_details["actualStartTime"]) log.info(f"Retrieved stream info for stream {resource['id']}") return cls(resource["id"], chat_id, start_time)