Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class RemoteMethodNotRegisteredError(BaseRPCError):
class RPCServerError(BaseRPCError):
    """Error reported when an exposed RPC method raises an unhandled exception.

    The message is rendered from ``msg_template`` with the fields
    ``method_name``, ``exc_type``, ``error_code``, ``exc_message`` and
    ``traceback``.
    """

    # NOTE: a single template only. Keeping the previous (error-code-less)
    # literal next to this one would concatenate both and repeat the
    # exception text and traceback in every message.
    msg_template = (
        "While running method '{method_name}' raised "
        "'{exc_type}' [{error_code}]: '{exc_message}'\n{traceback}"
    )


Expand Down
26 changes: 22 additions & 4 deletions packages/service-library/src/servicelib/rabbitmq/_rpc_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@
from dataclasses import dataclass, field
from typing import Any, TypeVar

from common_library.error_codes import create_error_code
from models_library.rabbitmq_basic_types import RPCMethodName
from servicelib.logging_errors import create_troubleshootting_log_kwargs

from ..logging_utils import log_context
from ._errors import RPCServerError

DecoratedCallable = TypeVar("DecoratedCallable", bound=Callable[..., Any])

# NOTE: this logger is equivalent to http access logs
_logger = logging.getLogger("rpc.access")


def _create_func_msg(func, args: tuple[Any, ...], kwargs: dict[str, Any]) -> str:
Expand All @@ -40,7 +45,9 @@ def expose(
*,
reraise_if_error_type: tuple[type[Exception], ...] | None = None,
) -> Callable[[DecoratedCallable], DecoratedCallable]:

def _decorator(func: DecoratedCallable) -> DecoratedCallable:

@functools.wraps(func)
async def _wrapper(*args, **kwargs):
with log_context(
Expand All @@ -64,9 +71,19 @@ async def _wrapper(*args, **kwargs):
):
raise

error_code = create_error_code(exc)
_logger.exception(
"Unhandled exception on the rpc-server side. Re-raising as %s.",
RPCServerError.__name__,
# NOTE: equivalent to a 500 http status code error
**create_troubleshootting_log_kwargs(
f"Unhandled exception on the rpc-server side for '{func.__name__}'",
error=exc,
error_code=error_code,
error_context={
"rpc_method": func.__name__,
"args": args,
"kwargs": kwargs,
},
)
)
# NOTE: we do not return internal exceptions over RPC
formatted_traceback = "\n".join(
Expand All @@ -77,6 +94,7 @@ async def _wrapper(*args, **kwargs):
exc_type=f"{exc.__class__.__module__}.{exc.__class__.__name__}",
exc_message=f"{exc}",
traceback=f"{formatted_traceback}",
error_code=error_code,
) from None

self.routes[RPCMethodName(func.__name__)] = _wrapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from models_library.users import UserID
from pydantic import TypeAdapter, ValidationError, validate_call
from pyinstrument import Profiler
from servicelib.logging_utils import log_decorator
from servicelib.rabbitmq import RPCRouter
from servicelib.rabbitmq.rpc_interfaces.catalog.errors import (
CatalogForbiddenError,
Expand Down Expand Up @@ -106,7 +105,6 @@ async def list_services_paginated(
ValidationError,
)
)
@log_decorator(_logger, level=logging.DEBUG)
@_profile_rpc_call
@validate_call(config={"arbitrary_types_allowed": True})
async def get_service(
Expand Down Expand Up @@ -141,7 +139,6 @@ async def get_service(
ValidationError,
)
)
@log_decorator(_logger, level=logging.DEBUG)
@validate_call(config={"arbitrary_types_allowed": True})
async def update_service(
app: FastAPI,
Expand Down Expand Up @@ -179,7 +176,6 @@ async def update_service(
ValidationError,
)
)
@log_decorator(_logger, level=logging.DEBUG)
@validate_call(config={"arbitrary_types_allowed": True})
async def check_for_service(
app: FastAPI,
Expand All @@ -203,7 +199,6 @@ async def check_for_service(


@router.expose(reraise_if_error_type=(CatalogForbiddenError, ValidationError))
@log_decorator(_logger, level=logging.DEBUG)
@validate_call(config={"arbitrary_types_allowed": True})
async def batch_get_my_services(
app: FastAPI,
Expand Down Expand Up @@ -233,7 +228,6 @@ async def batch_get_my_services(


@router.expose(reraise_if_error_type=(ValidationError,))
@log_decorator(_logger, level=logging.DEBUG)
@validate_call(config={"arbitrary_types_allowed": True})
async def list_my_service_history_latest_first(
app: FastAPI,
Expand Down Expand Up @@ -281,7 +275,6 @@ async def list_my_service_history_latest_first(
ValidationError,
)
)
@log_decorator(_logger, level=logging.DEBUG)
@validate_call(config={"arbitrary_types_allowed": True})
async def get_service_ports(
app: FastAPI,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ async def _get_task_state(job_id: str) -> RunningState:
parsed_event = TaskLifeCycleState.model_validate(dask_events[-1][1])

if parsed_event.state == RunningState.FAILED:
log_error_context = {
"job_id": job_id,
"dask-scheduler": self.backend.scheduler_id,
}
try:
# find out if this was a cancellation
task_future: distributed.Future = (
Expand All @@ -461,10 +465,7 @@ async def _get_task_state(job_id: str) -> RunningState:
timeout=_DASK_DEFAULT_TIMEOUT_S
)
assert isinstance(exception, Exception) # nosec
log_error_context = {
"job_id": job_id,
"dask-scheduler": self.backend.scheduler_id,
}

if isinstance(exception, TaskCancelledError):
_logger.info(
**create_troubleshootting_log_kwargs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import logging

from aiohttp import web
from common_library.error_codes import create_error_code
from common_library.json_serialization import json_dumps
from common_library.user_messages import user_message
from models_library.api_schemas_catalog.service_access_rights import (
ServiceAccessRightsGet,
)
Expand All @@ -23,9 +25,11 @@
NodeServiceGet,
ProjectNodeServicesGet,
)
from models_library.basic_types import IDStr
from models_library.groups import EVERYONE_GROUP_ID, Group, GroupID, GroupType
from models_library.projects import Project, ProjectID
from models_library.projects_nodes_io import NodeID, NodeIDStr
from models_library.rest_error import ErrorGet
from models_library.services import ServiceKeyVersion
from models_library.services_resources import ServiceResourcesDict
from models_library.services_types import ServiceKey, ServiceVersion
Expand All @@ -43,6 +47,7 @@
UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE,
X_SIMCORE_USER_AGENT,
)
from servicelib.logging_errors import create_troubleshootting_log_kwargs
from servicelib.long_running_tasks.models import TaskProgress
from servicelib.long_running_tasks.task import TaskRegistry
from servicelib.mimetype_constants import MIMETYPE_APPLICATION_JSON
Expand All @@ -57,6 +62,7 @@
from ..._meta import API_VTAG as VTAG
from ...catalog import catalog_service
from ...dynamic_scheduler import api as dynamic_scheduler_service
from ...exception_handling import create_error_response
from ...groups import api as groups_service
from ...groups.exceptions import GroupNotFoundError
from ...login.decorators import login_required
Expand Down Expand Up @@ -304,7 +310,7 @@ async def _stop_dynamic_service_task(
*,
app: web.Application,
dynamic_service_stop: DynamicServiceStop,
):
) -> web.Response:
_ = progress
# NOTE: _handle_project_nodes_exceptions only decorate handlers
try:
Expand All @@ -323,18 +329,41 @@ async def _stop_dynamic_service_task(
return web.json_response(status=status.HTTP_204_NO_CONTENT)

except (RPCServerError, ServiceWaitingForManualInterventionError) as exc:
# in case there is an error reply as not found
raise web.HTTPNotFound(text=f"{exc}") from exc
error_code = getattr(exc, "error_code", None) or create_error_code(exc)
user_error_msg = user_message(
f"Could not stop dynamic service {dynamic_service_stop.project_id}.{dynamic_service_stop.node_id}"
)
_logger.debug(
**create_troubleshootting_log_kwargs(
user_error_msg,
error=exc,
error_code=error_code,
error_context={
"project_id": dynamic_service_stop.project_id,
"node_id": dynamic_service_stop.node_id,
"user_id": dynamic_service_stop.user_id,
"save_state": dynamic_service_stop.save_state,
"simcore_user_agent": dynamic_service_stop.simcore_user_agent,
},
)
)
# ANE: in case there is an error reply as not found
return create_error_response(
error=ErrorGet(
message=user_error_msg,
support_id=IDStr(error_code),
status=status.HTTP_404_NOT_FOUND,
),
status_code=status.HTTP_404_NOT_FOUND,
)

except ServiceWasNotFoundError:
# in case the service is not found reply as all OK
return web.json_response(status=status.HTTP_204_NO_CONTENT)


def register_stop_dynamic_service_task(app: web.Application) -> None:
    """Register ``_stop_dynamic_service_task`` in the long-running ``TaskRegistry``.

    Args:
        app: the aiohttp application forwarded to the task as ``app``.
    """
    # NOTE: registered exactly once and without ``allowed_errors``.
    # NOTE(review): this assumes the task now answers failures with a 404
    # error response built via ``create_error_response`` instead of raising
    # ``web.HTTPNotFound`` -- confirm against the task implementation.
    TaskRegistry.register(_stop_dynamic_service_task, app=app)


@routes.post(
Expand Down
Loading