Module aiogithubapi.namespaces.events

Methods for the events namespace

https://docs.github.com/en/rest/reference/activity#events

Expand source code
"""
Methods for the events namespace

https://docs.github.com/en/rest/reference/activity#events
"""
from __future__ import annotations

import asyncio
from datetime import datetime
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Literal
from uuid import uuid4

from ..const import LOGGER, GitHubRequestKwarg, RepositoryType
from ..exceptions import (
    GitHubAuthenticationException,
    GitHubException,
    GitHubNotFoundException,
    GitHubNotModifiedException,
    GitHubPermissionException,
)
from ..helpers import repository_full_name
from ..models.events import GitHubEventModel
from .base import BaseNamespace

if TYPE_CHECKING:
    from ..client import GitHubClient

_DEFAULT_BACKOFF = 300
_DEFAULT_POLL = 60


class _GitHubEventsBaseNamespace(BaseNamespace):
    """Methods for the events namespace"""

    def __init__(
        self,
        client: GitHubClient,
        space: Literal["users"] | Literal["repos"] | Literal["orgs"],
    ) -> None:
        super().__init__(client)
        self._space = space
        self._subscriptions: Dict[str, asyncio.TimerHandle[None]] = {}

    @staticmethod
    async def _wait(wait_time: float) -> None:
        """Wait for x seconds"""
        await asyncio.sleep(wait_time)

    async def subscribe(
        self,
        name: str,
        event_callback: Callable[[GitHubEventModel], Awaitable[None]],
        *,
        error_callback: Callable[[GitHubException], Awaitable[None]] | None = None,
        **kwargs: Dict[GitHubRequestKwarg, Any],
    ) -> str:
        """
         Subscribe to an event stream.
         This returns an ID you can use with the unsubscribe method to stop listening for events.

         **Arguments**:

         `name`

         The name to return evets from, example "octocat/hello-world"

         `event_callback`

         An async funtion that will be called when new events come in,
         the event will be passed as the first argument.

         `error_callback` (Optional)

         An async funtion that will be called when errors occour,
         the exception that where raised will be passed.

        https://docs.github.com/en/rest/reference/activity#list-public-events
        """
        subscription_id = str(uuid4())

        async def _subscriber():
            _last_etag: str | None = None
            _poll_time: int = 60
            _target_time = datetime.utcnow().isoformat()
            LOGGER.debug("Starting event subscription for github.com/%s", name)
            while subscription_id in self._subscriptions:
                try:
                    response = await self._client.async_call_api(
                        endpoint=f"/{self._space}/{name}/events",
                        etag=_last_etag,
                        **kwargs,
                    )
                except GitHubNotModifiedException:
                    await self._wait(_poll_time)
                    continue
                except (
                    GitHubAuthenticationException,
                    GitHubNotFoundException,
                    GitHubPermissionException,
                ) as err:
                    await error_callback(err)
                    break
                except GitHubException as err:
                    if error_callback is not None:
                        await error_callback(err)
                    await self._wait(_DEFAULT_BACKOFF)
                    continue
                else:
                    _last_etag = response.headers.etag
                    _poll_time = (
                        int(response.headers.x_poll_interval)
                        if response.headers.x_poll_interval
                        else _DEFAULT_POLL
                    )

                    response.data = [
                        GitHubEventModel(event) for event in reversed(response.data or [])
                    ]

                    for event in response.data:
                        if event.created_at < _target_time:
                            continue
                        _target_time = event.created_at

                        LOGGER.debug("New %s for %s", event.type, name)
                        try:
                            await event_callback(event)
                        except Exception as err:
                            if error_callback is not None:
                                await error_callback(GitHubException(err))

                await self._wait(_poll_time)

            LOGGER.debug("Stopping event subscription for github.com/%s", name)
            self.unsubscribe(subscription_id=subscription_id)

        handler = self._client._loop.call_soon(self._client._loop.create_task, _subscriber())
        self._subscriptions[subscription_id] = handler

        return subscription_id

    def unsubscribe(self, *, subscription_id: str | None = None) -> None:
        """
        Unsubscribe to an event stream

        **Arguments**:

        `subscription_id` (Optional)

        The ID you got when you subscribed, if omitted all active subscriptions will be stopped.
        """
        if not subscription_id:
            for subscription_id in list(self._subscriptions):
                handler = self._subscriptions[subscription_id]
                handler.cancel()
                del self._subscriptions[subscription_id]
            return
        if handler := self._subscriptions.get(subscription_id):
            handler.cancel()
            del self._subscriptions[subscription_id]


class GitHubEventsReposNamespace(_GitHubEventsBaseNamespace):
    """Methods for the repository events namespace"""

    def __init__(self, client: GitHubClient) -> None:
        super().__init__(client, space="repos")

    async def subscribe(
        self,
        repository: RepositoryType,
        event_callback: Callable[[GitHubEventModel], Awaitable[None]],
        *,
        error_callback: Callable[[], Awaitable[None]] | None = None,
        **kwargs: Dict[GitHubRequestKwarg, Any],
    ) -> str:
        """
         Subscribe to an event stream.
         This returns an ID you can use with the unsubscribe method to stop listening for events.

         **Arguments**:

         `repository`

         The repository to return evets from, example "octocat/hello-world"

         `event_callback`

         An async funtion that will be called when new events come in,
         the event will be passed as the first argument.

         `error_callback` (Optional)

         An async funtion that will be called when errors occour,
         the exception that where raised will be passed.

        https://docs.github.com/en/rest/reference/activity#list-repository-events
        """
        return await super().subscribe(
            name=repository_full_name(repository),
            event_callback=event_callback,
            error_callback=error_callback,
            **kwargs,
        )

Classes

class GitHubEventsReposNamespace (client: GitHubClient)

Methods for the repository events namespace

Initialise the namespace.

Expand source code
class GitHubEventsReposNamespace(_GitHubEventsBaseNamespace):
    """Methods for the repository events namespace"""

    def __init__(self, client: GitHubClient) -> None:
        super().__init__(client, space="repos")

    async def subscribe(
        self,
        repository: RepositoryType,
        event_callback: Callable[[GitHubEventModel], Awaitable[None]],
        *,
        error_callback: Callable[[], Awaitable[None]] | None = None,
        **kwargs: Dict[GitHubRequestKwarg, Any],
    ) -> str:
        """
         Subscribe to an event stream.
         This returns an ID you can use with the unsubscribe method to stop listening for events.

         **Arguments**:

         `repository`

         The repository to return evets from, example "octocat/hello-world"

         `event_callback`

         An async funtion that will be called when new events come in,
         the event will be passed as the first argument.

         `error_callback` (Optional)

         An async funtion that will be called when errors occour,
         the exception that where raised will be passed.

        https://docs.github.com/en/rest/reference/activity#list-repository-events
        """
        return await super().subscribe(
            name=repository_full_name(repository),
            event_callback=event_callback,
            error_callback=error_callback,
            **kwargs,
        )

Ancestors

  • aiogithubapi.namespaces.events._GitHubEventsBaseNamespace
  • BaseNamespace

Methods

async def subscribe(self, repository: RepositoryType, event_callback: Callable[[GitHubEventModel], Awaitable[None]], *, error_callback: Callable[[], Awaitable[None]] | None = None, **kwargs: Dict[GitHubRequestKwarg, Any]) ‑> str

Subscribe to an event stream. This returns an ID you can use with the unsubscribe method to stop listening for events.

Arguments:

repository

The repository to return evets from, example "octocat/hello-world"

event_callback

An async funtion that will be called when new events come in, the event will be passed as the first argument.

error_callback (Optional)

An async funtion that will be called when errors occour, the exception that where raised will be passed.

https://docs.github.com/en/rest/reference/activity#list-repository-events

Expand source code
async def subscribe(
    self,
    repository: RepositoryType,
    event_callback: Callable[[GitHubEventModel], Awaitable[None]],
    *,
    error_callback: Callable[[], Awaitable[None]] | None = None,
    **kwargs: Dict[GitHubRequestKwarg, Any],
) -> str:
    """
     Subscribe to an event stream.
     This returns an ID you can use with the unsubscribe method to stop listening for events.

     **Arguments**:

     `repository`

     The repository to return evets from, example "octocat/hello-world"

     `event_callback`

     An async funtion that will be called when new events come in,
     the event will be passed as the first argument.

     `error_callback` (Optional)

     An async funtion that will be called when errors occour,
     the exception that where raised will be passed.

    https://docs.github.com/en/rest/reference/activity#list-repository-events
    """
    return await super().subscribe(
        name=repository_full_name(repository),
        event_callback=event_callback,
        error_callback=error_callback,
        **kwargs,
    )