Module aiogithubapi.client
This is the class that do the requests against the API.
Expand source code
"""This is the class that do the requests against the API."""
from __future__ import annotations
import asyncio
import json
from typing import Any, Dict
import aiohttp
from .const import (
GitHubClientKwarg,
GitHubRequestKwarg,
HttpContentType,
HttpMethod,
HttpStatusCode,
)
from .exceptions import (
GitHubAuthenticationException,
GitHubConnectionException,
GitHubException,
GitHubGraphQLException,
GitHubNotFoundException,
GitHubNotModifiedException,
GitHubPayloadException,
GitHubPermissionException,
GitHubRatelimitException,
)
from .legacy.client import AIOGitHubAPIClient as LegacyAIOGitHubAPIClient
from .models.base import GitHubBase
from .models.request_data import GitHubBaseRequestDataModel
from .models.response import GitHubResponseModel
STATUS_EXCEPTIONS: Dict[HttpStatusCode, GitHubException] = {
HttpStatusCode.FORBIDDEN: GitHubAuthenticationException,
HttpStatusCode.UNAUTHORIZED: GitHubAuthenticationException,
HttpStatusCode.NOT_MODIFIED: GitHubNotModifiedException,
HttpStatusCode.NOT_FOUND: GitHubNotFoundException,
HttpStatusCode.BAD_REQUEST: GitHubPayloadException,
HttpStatusCode.UNPROCESSABLE_ENTITY: GitHubException,
}
MESSAGE_EXCEPTIONS: Dict[str, GitHubException] = {
"Bad credentials": GitHubAuthenticationException,
"You have exceeded a secondary rate limit and have been temporarily blocked from content creation. Please retry your request again later.": GitHubRatelimitException,
"Must have push access to repository": GitHubPermissionException,
}
class AIOGitHubAPIClient(LegacyAIOGitHubAPIClient):
"""Dummy class to not break existing code."""
class GitHubClient(GitHubBase):
"""
Client to handle API calls.
Don't use this directly, use `aiogithubapi.github.GitHubApi` to get the client.
"""
def __init__(
self,
session: aiohttp.ClientSession,
token: str | None = None,
**kwargs: Dict[GitHubClientKwarg, Any],
) -> None:
"""Initialise the GitHub API client."""
self._base_request_data = GitHubBaseRequestDataModel(
token=token,
kwargs=kwargs,
)
self._session = session
self._loop = asyncio.get_running_loop()
async def async_call_api(
self,
endpoint: str,
*,
data: Dict[str, Any] | str | None = None,
**kwargs: Dict[GitHubRequestKwarg, Any],
) -> GitHubResponseModel:
"""Execute the API call."""
request_arguments: Dict[str, Any] = {
"url": self._base_request_data.request_url(endpoint),
"method": kwargs.get(GitHubRequestKwarg.METHOD, HttpMethod.GET).lower(),
"params": kwargs.get(GitHubRequestKwarg.PARAMS)
or kwargs.get(GitHubRequestKwarg.QUERY, {}),
"timeout": self._base_request_data.timeout,
"headers": {
**self._base_request_data.headers,
**kwargs.get("headers", {}),
},
}
if etag := kwargs.get(GitHubRequestKwarg.ETAG):
request_arguments["headers"][aiohttp.hdrs.IF_NONE_MATCH] = etag
if isinstance(data, dict):
request_arguments["json"] = data
else:
request_arguments["data"] = data
try:
result = await self._session.request(**request_arguments)
except (aiohttp.ClientError, asyncio.CancelledError) as exception:
raise GitHubConnectionException(
"Request exception for "
f"'{self._base_request_data.request_url(endpoint)}' with - {exception}"
) from exception
except asyncio.TimeoutError:
raise GitHubConnectionException(
f"Timeout of {self._base_request_data.timeout} reached while "
f"waiting for {self._base_request_data.request_url(endpoint)}"
) from None
except BaseException as exception:
raise GitHubException(
"Unexpected exception for "
f"'{self._base_request_data.request_url(endpoint)}' with - {exception}"
) from exception
response = GitHubResponseModel(result)
if response.status == HttpStatusCode.NO_CONTENT:
return response
try:
if HttpContentType.BASE_JSON in (response.headers.content_type or ""):
response.data = await result.json(encoding="utf-8")
elif (response.headers.content_type or "") in (
HttpContentType.BASE_ZIP,
HttpContentType.BASE_GZIP,
):
response.data = await result.read()
else:
response.data = await result.text(encoding="utf-8")
except BaseException as exception:
raise GitHubException(
f"Could not handle response data from '{self._base_request_data.request_url(endpoint)}' with - {exception}"
)
message = response.data.get("message") if isinstance(response.data, dict) else None
if message is not None and "rate limit" in message:
raise GitHubRatelimitException(message)
if exception := STATUS_EXCEPTIONS.get(response.status):
raise exception(message or response.data)
if isinstance(response.data, dict):
if message is not None:
if exception := MESSAGE_EXCEPTIONS.get(message):
raise exception(message)
raise GitHubException(message)
if endpoint == "/graphql" and response.data.get("errors", []):
raise GitHubGraphQLException(
", ".join(entry.get("message") for entry in response.data["errors"])
)
return response
Classes
class AIOGitHubAPIClient (session: aiohttp.ClientSession, token: str, headers: Optional[dict] = None, base_url: Optional[str] = None)
-
Dummy class to not break existing code.
Initialize the API client.
Expand source code
class AIOGitHubAPIClient(LegacyAIOGitHubAPIClient): """Dummy class to not break existing code."""
Ancestors
Inherited members
class GitHubClient (session: aiohttp.ClientSession, token: str | None = None, **kwargs: Dict[GitHubClientKwarg, Any])
-
Client to handle API calls.
Don't use this directly, use
aiogithubapi.github.GitHubApi
to get the client.Initialise the GitHub API client.
Expand source code
class GitHubClient(GitHubBase): """ Client to handle API calls. Don't use this directly, use `aiogithubapi.github.GitHubApi` to get the client. """ def __init__( self, session: aiohttp.ClientSession, token: str | None = None, **kwargs: Dict[GitHubClientKwarg, Any], ) -> None: """Initialise the GitHub API client.""" self._base_request_data = GitHubBaseRequestDataModel( token=token, kwargs=kwargs, ) self._session = session self._loop = asyncio.get_running_loop() async def async_call_api( self, endpoint: str, *, data: Dict[str, Any] | str | None = None, **kwargs: Dict[GitHubRequestKwarg, Any], ) -> GitHubResponseModel: """Execute the API call.""" request_arguments: Dict[str, Any] = { "url": self._base_request_data.request_url(endpoint), "method": kwargs.get(GitHubRequestKwarg.METHOD, HttpMethod.GET).lower(), "params": kwargs.get(GitHubRequestKwarg.PARAMS) or kwargs.get(GitHubRequestKwarg.QUERY, {}), "timeout": self._base_request_data.timeout, "headers": { **self._base_request_data.headers, **kwargs.get("headers", {}), }, } if etag := kwargs.get(GitHubRequestKwarg.ETAG): request_arguments["headers"][aiohttp.hdrs.IF_NONE_MATCH] = etag if isinstance(data, dict): request_arguments["json"] = data else: request_arguments["data"] = data try: result = await self._session.request(**request_arguments) except (aiohttp.ClientError, asyncio.CancelledError) as exception: raise GitHubConnectionException( "Request exception for " f"'{self._base_request_data.request_url(endpoint)}' with - {exception}" ) from exception except asyncio.TimeoutError: raise GitHubConnectionException( f"Timeout of {self._base_request_data.timeout} reached while " f"waiting for {self._base_request_data.request_url(endpoint)}" ) from None except BaseException as exception: raise GitHubException( "Unexpected exception for " f"'{self._base_request_data.request_url(endpoint)}' with - {exception}" ) from exception response = GitHubResponseModel(result) if response.status == HttpStatusCode.NO_CONTENT: return response try: if HttpContentType.BASE_JSON in (response.headers.content_type or ""): response.data = await result.json(encoding="utf-8") elif (response.headers.content_type or "") in ( HttpContentType.BASE_ZIP, HttpContentType.BASE_GZIP, ): response.data = await result.read() else: response.data = await result.text(encoding="utf-8") except BaseException as exception: raise GitHubException( f"Could not handle response data from '{self._base_request_data.request_url(endpoint)}' with - {exception}" ) message = response.data.get("message") if isinstance(response.data, dict) else None if message is not None and "rate limit" in message: raise GitHubRatelimitException(message) if exception := STATUS_EXCEPTIONS.get(response.status): raise exception(message or response.data) if isinstance(response.data, dict): if message is not None: if exception := MESSAGE_EXCEPTIONS.get(message): raise exception(message) raise GitHubException(message) if endpoint == "/graphql" and response.data.get("errors", []): raise GitHubGraphQLException( ", ".join(entry.get("message") for entry in response.data["errors"]) ) return response
Ancestors
Class variables
var logger : logging.Logger
Methods
async def async_call_api(self, endpoint: str, *, data: Dict[str, Any] | str | None = None, **kwargs: Dict[GitHubRequestKwarg, Any]) ‑> GitHubResponseModel
-
Execute the API call.
Expand source code
async def async_call_api( self, endpoint: str, *, data: Dict[str, Any] | str | None = None, **kwargs: Dict[GitHubRequestKwarg, Any], ) -> GitHubResponseModel: """Execute the API call.""" request_arguments: Dict[str, Any] = { "url": self._base_request_data.request_url(endpoint), "method": kwargs.get(GitHubRequestKwarg.METHOD, HttpMethod.GET).lower(), "params": kwargs.get(GitHubRequestKwarg.PARAMS) or kwargs.get(GitHubRequestKwarg.QUERY, {}), "timeout": self._base_request_data.timeout, "headers": { **self._base_request_data.headers, **kwargs.get("headers", {}), }, } if etag := kwargs.get(GitHubRequestKwarg.ETAG): request_arguments["headers"][aiohttp.hdrs.IF_NONE_MATCH] = etag if isinstance(data, dict): request_arguments["json"] = data else: request_arguments["data"] = data try: result = await self._session.request(**request_arguments) except (aiohttp.ClientError, asyncio.CancelledError) as exception: raise GitHubConnectionException( "Request exception for " f"'{self._base_request_data.request_url(endpoint)}' with - {exception}" ) from exception except asyncio.TimeoutError: raise GitHubConnectionException( f"Timeout of {self._base_request_data.timeout} reached while " f"waiting for {self._base_request_data.request_url(endpoint)}" ) from None except BaseException as exception: raise GitHubException( "Unexpected exception for " f"'{self._base_request_data.request_url(endpoint)}' with - {exception}" ) from exception response = GitHubResponseModel(result) if response.status == HttpStatusCode.NO_CONTENT: return response try: if HttpContentType.BASE_JSON in (response.headers.content_type or ""): response.data = await result.json(encoding="utf-8") elif (response.headers.content_type or "") in ( HttpContentType.BASE_ZIP, HttpContentType.BASE_GZIP, ): response.data = await result.read() else: response.data = await result.text(encoding="utf-8") except BaseException as exception: raise GitHubException( f"Could not handle response data from '{self._base_request_data.request_url(endpoint)}' with - {exception}" ) message = response.data.get("message") if isinstance(response.data, dict) else None if message is not None and "rate limit" in message: raise GitHubRatelimitException(message) if exception := STATUS_EXCEPTIONS.get(response.status): raise exception(message or response.data) if isinstance(response.data, dict): if message is not None: if exception := MESSAGE_EXCEPTIONS.get(message): raise exception(message) raise GitHubException(message) if endpoint == "/graphql" and response.data.get("errors", []): raise GitHubGraphQLException( ", ".join(entry.get("message") for entry in response.data["errors"]) ) return response
Inherited members