Reference

APP_CONTEXT: ContextVar[dict | None] = <ContextVar name='APP_CONTEXT' default=None>

Application context variable.

class AppConfig[source]

Bases: TypedDict

name: str

application unique name

env: str

application environment name: prod, test, qa, etc.

loglevel: Literal['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NEVER'] | None

default log level for the app and app services

settings: dict

args for application __init__

optional_services: list[str]

list of optional services (names)

services: list[ServiceConfig]

list of service settings

class Application[source]

Bases: object

Application is a service class combining multiple other service.

name: str

Unique service name. Service class name is used by default by the app service manager. When you declare more than one service of the same type you MUST explicitly provide unique names for them.

logger: Logger

Logger instance. During the app init a logger instance is provided automatically by the app constructor.

state: State

Service work state.

env: str

App environment (scope). See Environment for a list of standard environments. It’s not mandatory but recommended.

context: ContextVar[dict | None]

Context var to store a server call context.

debug: bool = False

Run app in debug mode.

service_start_timeout_s: float = 30.0

A timeout (sec) for each service start. An error will be produced if taking more than this interval.

post_init_timeout_s: float = 300.0

A post-init task timeout (sec) for ALL the services combined.

show_inspection_on_start: bool = False

Show inspection data in logs after the app start.

metadata: dict

Application metadata not used by it directly.

scheduler: Scheduler

Internal task scheduler.

optional_services: list[str]

List of optional services not required for the app start.

__init__(name: str, logger: ~uvlog.uvlog.Logger, env: str, context: ~_contextvars.ContextVar[dict | None], debug: bool = False, service_start_timeout_s: float = 30.0, post_init_timeout_s: float = 300.0, show_inspection_on_start: bool = False, metadata: dict = <factory>, optional_services: list[str] = <factory>) None
services: mappingproxy[str, Service] = mappingproxy({})

Application services registry.

add_services(*services: Service) None[source]

Add new services to the application.

The loading order must be resolved.

set_context_var(key: str, value: Any) None[source]

Set variable in the current async call context.

get_context_var(key: str) Any | None[source]

Get key from the current async call context or None if no key.

json_repr() dict[str, Any][source]
async inspect(services: list[str] | None = None) dict[source]

Inspect the app and get all services data and health.

async start() None[source]

Initialize all services and tasks.

async stop(_idx: int | None = None, /) None[source]

Stop all services and tasks.

async wait(_for_status: ServiceState = ServiceState.READY, /) None[source]

Wait until the service is ready.

class ApplicationLoader[source]

Bases: object

Application loader class constructs an application and services from a config object.

This class does several things to prepare the app before its start:

  1. Import services from kaiju packages using import_packages(). Packages are

    imported according to the packages list in the config file. The services are imported into the service_classes class registry by their class names.

  2. Load all services using the imported packages.

  3. Resolve dependencies in service fields and load service instances in these attributes.

  4. Resolve service starting order according to which dependency each service has.

  5. Create an application from these services.

service_classes: dict[str, type[Service]]

Registry of service classes.

allow_service_name_overrides: bool = False

Allow services with the same name to override each other.

create_all(app_class: type[_Application], config: ProjectConfig, *, context: ContextVar[dict | None] = APP_CONTEXT) _Application[source]

Load services from packages and return a new application.

static configure_loggers(config: _DictConfig, context: ContextVar[dict | None], /) None[source]
load_extensions(package_names: list[str], /) None[source]

Load services from specified packages.

create_app(app_class: type[_Application], config: AppConfig, context: ContextVar[dict | None], debug: bool, /) _Application[source]
__init__(service_classes: dict[str, type[~kaiju_app.app.Service]] = <factory>, allow_service_name_overrides: bool = False) None
class Configurator[source]

Configuration loader.

This class helps to prepare configuration dict from a list of configuration files.

>>> template  = {'app': {'name': '[_doctest_app_name]', 'env': '[_doctest_app_env]'}}
>>> env = {'_doctest_app_name': 'app', '_doctest_app_env': 'prod'}
>>> configurator = Configurator()
>>> configurator.create_configuration([template], [env])
{'debug': False, 'packages': [], 'logging': {}, 'app': {'name': 'app', 'env': 'prod', 'loglevel': None, 'settings': {}, 'optional_services': [], 'services': []}}
create_configuration(templates: list[dict[str, Any]], envs: list[dict[str, Any]], *, load_os_env: bool = False, load_cli_env: bool = False) ProjectConfig[source]

Create a project configuration from template and environment data.

Usually you would store configs in config files. Load them using an appropriate method (json or yaml loader) and then pass to templates and envs arguments.

Initialization order:

  1. Merge templates from first to last

  2. Merge env dicts from first to last

  3. Load OS environment variables

  4. Load CLI environment variables from ‘–env’ flags

  5. Evaluate template using resulting env dict

  6. Normalize and return the project config dict

See merge_dicts() function on the rules of how dictionaries are merged.

See the template-dict documentation on template syntax.

class ConfigurationError[source]

Bases: RuntimeError

Invalid configuration.

__init__(*args, **kwargs)
class Contextable[source]

Bases: Protocol

async start() None[source]
async stop() None[source]
__init__(*args, **kwargs)
class DependencyCycleError[source]

Bases: RuntimeError

Dependency cycle has been detected.

__init__(*args, **kwargs)
class ServiceNameConflict[source]

Bases: RuntimeError

A dependency with the same name already exists.

__init__(*args, **kwargs)
class DependencyNotFound[source]

Bases: RuntimeError

Dependency service not found in the list of application services.

__init__(*args, **kwargs)
class Error[source]

Bases: BaseException, ABC

Base error class for application errors.

To get a JSONRPC compatible error user json_repr method:

>>> Error('some error', value=1).json_repr()
{'code': 0, 'message': 'some error', 'data': {'type': 'Error', 'type_base': 'BaseException', 'extra': {'value': 1}}}

To wrap a standard exception in error type:

>>> Error.wrap_exception(ValueError('something happened'))
Error('something happened')
code: ClassVar[int] = 0

Error JSONRPC code

__init__(msg: str, /, **extra)[source]
json_repr()[source]

JSONRPC compatible error data.

add_note()

Exception.add_note(note) – add a note to the exception

classmethod wrap_exception(exc: BaseException, /) Error[source]
class ErrorData[source]

Bases: TypedDict

code: int
message: str
data: _ErrorDataData
eval_string(value: str, /) Any[source]

Evaluate an environment text value into a python variable using save eval.

This method is useful when loading values from Unix environment or CLI arguments.

There are few predefined values: true, false, none, null will be evaluated to True, False and None with disregard of their case.

>>> eval_string('true')
True

Empty values are evaluated to None.

>>> eval_string('')

In all other cases a value will be evaluated with python eval() function.

>>> eval_string('[1, 2, 3]')
[1, 2, 3]
>>> eval_string('"42"')
'42'
class JSONSerializable[source]

Bases: Protocol

json_repr() dict[str, Any][source]
__init__(*args, **kwargs)
class Health[source]

Bases: TypedDict

Service health statistics.

healthy: bool

service is healthy

stats: dict[str, Any]

reserved for stats and metrics

errors: list[str]

list of error messages

class ProjectConfig[source]

Bases: TypedDict

debug: bool

run the project in debug mode

packages: list[str]

list of service packages to import

logging: _DictConfig

loggers and handlers settings

app: AppConfig

application settings

run_app(app: Application, /, loop: AbstractEventLoop | None = None) None[source]
merge_dicts(*dicts: Mapping) dict[source]

Merge multiple dicts into a new one recursively.

The function is optimized for simple JSON compatible data types. It may not work as expected for some custom collections. See the sources and decide for yourself.

The priority is from first to last, the last dict overwrites the first.

>>> merge_dicts({"a": 1, "b": 2}, {"a": 3, "c": 4})
{'a': 3, 'b': 2, 'c': 4}

Note that mutable collections will be merged (lists, sets, dicts).

>>> merge_dicts({"a": [1, 2], "b": {1}, "c": {"d": 1}}, {"a": [3], "b": {2}, "c": {"e": 2}})
{'a': [1, 2, 3], 'b': {1, 2}, 'c': {'d': 1, 'e': 2}}

Immutable collections are treated as frozen values and can only be replaced.

>>> merge_dicts({"a": (1, 2), "b": frozenset({5})}, {"a": (3, 4), "b": frozenset({6})})
{'a': (3, 4), 'b': frozenset({6})}
service(*, name=_Sentinel, metadata=None, required: bool = True, nowait: bool = False)[source]

Service field describing another service dependency.

Parameters:
  • name – custom service name

  • required – this dependency is required for the service to work

  • nowait – do not wait for this service initialization

  • metadata – additional field metadata, stored in the dataclass field

This field provides auto-discovery for dependency services in your service. The service will try to automatically discover a dependency under this field and assign it to the field. This happens in service init() method on application start.

Your service must be a dataclass for this to work. Then just add this field type hinting the dependency class.

class Service[source]

Bases: ABC

Application service - a building block for an application.

Service is a modular part of an application. Each service must implement only specific application logic in a limited scope.

app: Application

Application this service is linked to.

name: str

Unique service name for referencing it in other services of the app.

logger: Logger

Logger instance.

state: State

Service state

async init() None[source]

Initialize application context.

This method shouldn’t be directly called outside the service.

Here you should write service initialization procedures. It will be called through start() by the app service manager on the app start.

async post_init()[source]

Run additional scripts and commands after the start().

The main difference of post_init() from init() is that the service is considered READY when the post init is called and the app should be in working condition with all services initialized.

Post init is called without any time limit unless you implement it explicitly inside the method. There’s a global time limit on all post init tasks set by post_init_timeout_s in application settings.

async close() None[source]

Close application context.

This method shouldn’t be directly called outside the service.

Here you should write service de-initialization procedures. It will be called through stop() by the app service manager on the app close.

json_repr() dict[source]

Get service information for inspection or logging.

async get_health() Health[source]

Check if the service is healthy.

Return all occurred errors in error field.

final async start() None[source]

Start the service.

This method is executed on application start and when a service context is called. Use init() to implement custom initialization procedures and check_health() to check the service health afterward.

final async stop() None[source]

Close application context.

This method is executed on application exit and when a service context is called and exited. Use close() to implement custom de-initialization procedures.

async wait(_for_status: ServiceState = ServiceState.READY, /) None[source]

Wait until the service is ready.

__init__(app: Application, name: str, logger: Logger) None
class ServiceConfig[source]

Bases: TypedDict

cls: str

service class name

name: str

service custom name

loglevel: Literal['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NEVER'] | None

service logger log level

enabled: bool

enable or disable service

settings: dict

args for service __init__

class ServiceFieldType[source]

Bases: Field

Used to store dependent service settings for other services.

__init__(required: bool, nowait: bool, *args)[source]

Initialize.

required
nowait
class ServiceInitFailed[source]

Bases: Error, RuntimeError

Initialization of a service has failed.

__init__(msg: str, /, **extra)
class ServiceInitTimeout[source]

Bases: Error, RuntimeError

Service is exceeded its timeout during the initialization.

__init__(msg: str, /, **extra)
final class ServiceState[source]

Service state types.

class SortedStack[source]

Bases: Sized, Iterable, Generic[_Item]

Sorted stack of elements.

>>> stack = SortedStack({'dogs': 12, 'sobaki': 5})
>>> stack = SortedStack(stack)
>>> stack.add(*SortedStack({'cats': 5}))

Selection:

>>> stack.select(8)
['sobaki', 'cats']
>>> stack.rselect(8)
['dogs']

Insertion and removal:

>>> stack.add(('koty', 1))
>>> stack.pop_many(3)
['koty']
>>> stack.pop()
'sobaki'
>>> len(stack)
2
>>> stack.clear()
>>> bool(stack)
False
__init__(items: Iterable[tuple[_Item, Any]] | dict[_Item, Any], /)[source]

Initialize.

property lowest_score: Any | None

Get the lowest score in the stack.

add(*items: tuple[_Item, Any])[source]

Extend the stack by adding more than one element.

select(score_threshold, /) list[_Item][source]

Select and return items without removing them from the lowest score to score_threshold.

The values are guaranteed to be in order.

rselect(score_threshold: Any, /) list[_Item][source]

Select and return items without removing them from the highest score to score_threshold.

The values are guaranteed to be in order.

pop() _Item[source]

Pop a single element which has the lowest score.

Raises:

StopIteration – if there are no values to return.

rpop() _Item[source]

Pop a single element which has the highest score.

Raises:

StopIteration – if there are no values to return.

pop_many(score_threshold: Any, /) list[_Item][source]

Pop and return values with scores less than score_threshold.

The returned values are guaranteed to be in order. Returns an empty list if no values.

rpop_many(score_threshold: Any, /) list[_Item][source]

Pop and return values with scores greater than score_threshold.

Returned values are guaranteed to be in order.

clear() None[source]

Clear all values.

class State[source]

Bases: Generic[_Status], ABC

Awaitable state machine.

To create you state machine you must create a status list and assign it to your subclass.

>>> from enum import Enum
...
>>> class UserStatus(Enum):
...     # it's recommended to use the same values for both name and a value
...     INACTIVE = 'INACTIVE'
...     ACTIVE = 'ACTIVE'
...     BANNED = 'BANNED'

Now you can create and maintain the state of your object. Note that by default it uses the first status from the enum unless state parameter is explicitly provided. You can get the current state using get() method.

>>> class User:
...     def __init__(self):
...         self.state = State(UserStatus, UserStatus.INACTIVE)
...
>>> user = User()
>>> user.state.get().name
'INACTIVE'
>>> str(user.state)
'INACTIVE'

You can set the state using set() method.

>>> user.state.set(UserStatus.ACTIVE)

It’s also possible to use the state change contextto change an object state inside a function.

>>> with user.state:
...     user.state.set(UserStatus.BANNED)
...     user.state.set(UserStatus.INACTIVE)
...     raise ValueError('Unhandled error')
Traceback (most recent call last):
...
ValueError: Unhandled error

In any error the last state before context is preserved.

>>> user.state.get().name
'ACTIVE'

Two object states can be compared.

>>> other_user = User()
>>> other_user.state.set(UserStatus.ACTIVE)
>>> other_user.state == user.state
True

You can also check if the state object has a particular inner state by calling this method.

>>> other_user.state.is_(UserStatus.ACTIVE)
True

Of course, a state machine would be useless in the async context if there would be no way to wait for a particular state. You can use wait() to wait for a particular state.

>>> async def wait_banned(_user: User):
...  await _user.state.wait(UserStatus.BANNED)
...  return 'Banned!'

Test example:

>>> async def ban_user(_user: User):
...     _user.state.set(UserStatus.BANNED)
>>> async def _main():
...     return await asyncio.gather(wait_banned(user), ban_user(user))
>>> asyncio.run(_main())
['Banned!', None]
__init__(status_list: Collection[_Status], status: _Status)[source]

Initialize.

Parameters:
  • status_list

  • status – initial status, the first value from self.Status is used by default

get() _Status[source]
is_(state: _Status, /) bool[source]
set(state: _Status, /) None[source]
async wait(state: _Status, /) None[source]