import logging
from asyncio.queues import Queue
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from uuid import UUID, uuid4
from aiohttp import ClientResponse, ClientSession
from pfmsoft.aiohttp_queue.utilities import optional_object
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
[docs]@dataclass
class AiohttpRequest:
method: str
url: str
params: Optional[Dict] = None
data: Any = None
json: Optional[Union[List, Dict]] = None
headers: Optional[Dict] = None
kwargs: Dict = field(default_factory=dict)
def as_dict(self):
kwarg_dict = {
"method": self.method,
"url": self.url,
"params": self.params,
"data": self.data,
"json": self.json,
"headers": self.headers,
}
kwarg_dict.update(self.kwargs)
return kwarg_dict
class AiohttpQueueWorker:
def __init__(self) -> None:
self.uid = uuid4()
self.task_count = 0
async def consumer(self, queue: Queue, session: ClientSession):
while True:
action: AiohttpAction = await queue.get()
try:
self.task_count += 1
await action.do_action(session, queue)
except Exception as ex:
logger.exception(
"Queue worker %s caught an exception from %r", self.uid, action
)
queue.task_done()
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}("
f"uid={self.uid!r}, task_count={self.task_count!r}"
")"
)
[docs]class CallbackState(Enum):
NOT_SET = "not_set"
SUCCESS = "success"
FAIL = "fail"
class AiohttpActionCallback:
def __init__(self, *args, **kwargs) -> None:
_, _ = args, kwargs
self.state: CallbackState = CallbackState.NOT_SET
self.state_message: str = ""
def success(self, caller: "AiohttpAction", msg: str = "", **kwargs):
_, _ = caller, kwargs
self.state = CallbackState.SUCCESS
self.state_message = msg
def fail(self, caller: "AiohttpAction", msg: str, **kwargs):
_ = kwargs
self.state = CallbackState.FAIL
self.state_message = msg
caller.update_state(ActionState.CALLBACK_FAIL, self.__class__.__name__, msg)
async def do_callback(self, caller: "AiohttpAction"):
raise NotImplementedError()
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}("
f"state={self.state!r}, state_message={self.state_message!r}"
")"
)
class ActionObserver:
def __init__(self) -> None:
pass
def update(
self,
action: "AiohttpAction",
source: str,
msg: str,
**kwargs,
):
_ = kwargs
print(action, source, msg)
def __repr__(self) -> str:
return f"{self.__class__.__name__}(" ")"
[docs]@dataclass
class ActionCallbacks:
success: List[AiohttpActionCallback] = field(default_factory=list)
retry: List[AiohttpActionCallback] = field(default_factory=list)
fail: List[AiohttpActionCallback] = field(default_factory=list)
[docs]class ActionState(Enum):
NOT_SET = "not_set"
SUCCESS = "success"
RETRY = "retry"
FAIL = "fail"
CALLBACK_FAIL = "callback_fail"
class AiohttpAction:
def __init__(
self,
aiohttp_args: AiohttpRequest,
name: str = "",
id_: str = "",
max_attempts: int = 1,
context: Optional[Dict] = None,
callbacks: Optional[ActionCallbacks] = None,
observers: Optional[List[ActionObserver]] = None,
retry_codes: Optional[List[int]] = None,
) -> None:
self.aiohttp_args = aiohttp_args
self.id_ = id_
self.name = name
self.uid: UUID = uuid4()
self.max_attempts = max_attempts
self.context = optional_object(context, dict)
self.observers = optional_object(observers, list)
self.callbacks: ActionCallbacks = optional_object(callbacks, ActionCallbacks)
self.retry_codes = optional_object(retry_codes, list, [500, 502, 503, 504])
self.attempts: int = 0
self.response: Optional[ClientResponse] = None
self.response_data: Any = None
self.state: ActionState = ActionState.NOT_SET
def __repr__(self):
return (
f"{self.__class__.__name__}("
f"name={self.name!r}, id_={self.id_!r}, uid={self.uid!r}, "
f"aiohttp_args={self.aiohttp_args!r}, max_attempts={self.max_attempts!r}, "
f"context={self.context!r}, observers={self.observers!r}, "
f"callbacks={self.callbacks!r}, retry_codes={self.retry_codes!r}, "
f"attempts={self.attempts!r}, response={self.response!r}, "
f"response_data={self.response_data!r}, state={self.state}"
")"
)
def __str__(self) -> str:
if self.response is not None:
code: Optional[int] = self.response.status
reason = self.response.reason
else:
code = None
reason = None
return (
f"{self.__class__.__name__}("
f"state={self.state}, name={self.name}, id_={self.id_}, uid={self.uid}, "
f"method={self.aiohttp_args.method!r}, url={self.aiohttp_args.url!r}, "
f"status_code={code!r}, reason={reason!r}"
")"
)
async def do_action(self, session: ClientSession, queue: Optional[Queue] = None):
self.attempts += 1
try:
if self.attempts <= self.max_attempts or self.max_attempts == -1:
async with session.request(**self.aiohttp_args.as_dict()) as response:
self.response = response
await self.check_response(queue)
else:
logger.warning("Retry fail: %r retry_count:%s", self, self.attempts)
await self.fail()
except Exception as ex:
logger.exception(
"Exception: %s raised while doing action: %s",
ex.__class__.__name__,
self,
)
raise ex
async def check_response(self, queue: Optional[Queue]):
if self.response is not None:
if 200 <= self.response.status <= 299:
await self.success()
elif self.response.status in self.retry_codes:
await self.retry(queue)
else:
await self.fail()
else:
logger.error(
"Checked response before response recieved. This should not be possible."
)
async def success(self):
self.update_state(ActionState.SUCCESS, "action", self.response.status)
logger.debug("Successful response for %s", self)
for callback in self.callbacks.success:
try:
await callback.do_callback(caller=self)
except Exception as ex:
logger.exception(
"Exception: %s during success callback: %s for action: %s",
ex.__class__.__name__,
callback,
self,
)
raise ex
async def fail(self):
self.update_state(ActionState.FAIL, "action", str(self))
for callback in self.callbacks.fail:
try:
await callback.do_callback(caller=self)
except Exception as ex:
logger.exception(
"Exception: %s during fail callback: %s for action: %r",
ex.__class__.__name__,
callback,
self,
)
raise ex
logger.warning(
"Fail response for %r meta: %r", self, self.response_meta_to_dict()
)
async def retry(self, queue: Optional[Queue]):
self.update_state(
ActionState.RETRY,
"action",
f"Made {self.attempts} of {self.max_attempts} attempts.",
)
logger.info(
"Retrying %s Made %s of %s} attempts.",
self,
self.attempts,
self.max_attempts,
)
if queue is not None:
await queue.put(self)
else:
logger.info(
"Could have retried this action if used with a queue. Action: %s",
self,
)
await self.fail()
return
for callback in self.callbacks.retry:
try:
await callback.do_callback(caller=self)
except Exception as ex:
logger.exception(
"Exception: %s during retry callback: %s for action: %s",
ex.__class__.__name__,
callback,
self,
)
raise ex
def update_state(
self,
state: ActionState,
source: str,
msg: str,
**kwargs,
):
self.state = state
for observer in self.observers:
observer.update(self, source, msg, **kwargs)
def response_meta_to_dict(self) -> Dict[str, Any]:
data: Dict[str, Any] = {}
if self.response is None:
return {}
request_headers = [
{key: value} for key, value in self.response.request_info.headers.items()
]
response_headers = [
{key: value} for key, value in self.response.headers.items()
]
data["version"] = self.response.version
data["status"] = self.response.status
data["reason"] = self.response.reason
data["cookies"] = self.response.cookies
data["response_headers"] = response_headers
data["request_info"] = {
"method": self.response.request_info.method,
"url": str(self.response.request_info.url),
"real_url": str(self.response.request_info.real_url),
"headers": request_headers,
}
return data