Reference¶
- APP_CONTEXT: ContextVar[dict | None] = <ContextVar name='APP_CONTEXT' default=None>¶
Application context variable.
- class AppConfig[source]¶
Bases:
TypedDict
- name: str¶
application unique name
- env: str¶
application environment name: prod, test, qa, etc.
- loglevel: Literal['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NEVER'] | None¶
default log level for the app and app services
- settings: dict¶
args for application __init__
- optional_services: list[str]¶
list of optional services (names)
- services: list[ServiceConfig]¶
list of service settings
- class Application[source]¶
Bases:
object
Application is a service class combining multiple other services.
- name: str¶
Unique service name. Service class name is used by default by the app service manager. When you declare more than one service of the same type you MUST explicitly provide unique names for them.
- logger: Logger¶
Logger instance. During the app init a logger instance is provided automatically by the app constructor.
- env: str¶
App environment (scope). See Environment for a list of standard environments. Using a standard environment is recommended but not mandatory.
- context: ContextVar[dict | None]¶
Context var to store a server call context.
- debug: bool = False¶
Run app in debug mode.
- service_start_timeout_s: float = 30.0¶
Timeout (in seconds) for each service start. An error is produced if starting a service takes longer than this interval.
- post_init_timeout_s: float = 300.0¶
A post-init task timeout (sec) for ALL the services combined.
- show_inspection_on_start: bool = False¶
Show inspection data in logs after the app start.
- metadata: dict¶
Application metadata not used by it directly.
- scheduler: Scheduler¶
Internal task scheduler.
- optional_services: list[str]¶
List of optional services not required for the app start.
- __init__(name: str, logger: Logger, env: str, context: ContextVar[dict | None], debug: bool = False, service_start_timeout_s: float = 30.0, post_init_timeout_s: float = 300.0, show_inspection_on_start: bool = False, metadata: dict = <factory>, optional_services: list[str] = <factory>) None ¶
- add_services(*services: Service) None [source]¶
Add new services to the application.
The loading order must be resolved.
- set_context_var(key: str, value: Any) None [source]¶
Set variable in the current async call context.
- get_context_var(key: str) Any | None [source]¶
Get the value stored under a key in the current async call context, or None if the key is not set.
- async inspect(services: list[str] | None = None) dict [source]¶
Inspect the app and get all services data and health.
- async wait(_for_status: ServiceState = ServiceState.READY, /) None [source]¶
Wait until the service is ready.
- class ApplicationLoader[source]¶
Bases:
object
Application loader class constructs an application and services from a config object.
This class does several things to prepare the app before its start:
- Import services from kaiju packages using import_packages(). Packages are imported according to the packages list in the config file. The services are imported into the service_classes class registry by their class names.
- Load all services using the imported packages.
- Resolve dependencies in service fields and load service instances in these attributes.
- Resolve service starting order according to which dependency each service has.
- Create an application from these services.
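Resolving a starting order from dependencies is a topological sort. The sketch below shows the idea with the standard library graphlib module. The dependency map and the resolve_start_order helper are hypothetical, and graphlib's CycleError only stands in for the DependencyCycleError documented in this reference.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical dependency map: service name -> names of services it depends on.
dependencies = {
    "cache": set(),
    "db": set(),
    "users": {"db", "cache"},
    "api": {"users"},
}


def resolve_start_order(deps: dict[str, set[str]]) -> list[str]:
    """Return service names ordered so that dependencies start first."""
    return list(TopologicalSorter(deps).static_order())


order = resolve_start_order(dependencies)
print(order)

# A cycle cannot be ordered; graphlib reports it with CycleError.
try:
    resolve_start_order({"a": {"b"}, "b": {"a"}})
    cycle = None
except CycleError as exc:
    cycle = exc.args[1]  # the list of nodes forming the cycle
    print("cycle detected:", cycle)
```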
- allow_service_name_overrides: bool = False¶
Allow services with the same name to override each other.
- create_all(app_class: type[_Application], config: ProjectConfig, *, context: ContextVar[dict | None] = APP_CONTEXT) _Application [source]¶
Load services from packages and return a new application.
- create_app(app_class: type[_Application], config: AppConfig, context: ContextVar[dict | None], debug: bool, /) _Application [source]¶
- __init__(service_classes: dict[str, type[Service]] = <factory>, allow_service_name_overrides: bool = False) None ¶
- class Configurator[source]¶
Configuration loader.
This class helps to prepare configuration dict from a list of configuration files.
>>> template = {'app': {'name': '[_doctest_app_name]', 'env': '[_doctest_app_env]'}}
>>> env = {'_doctest_app_name': 'app', '_doctest_app_env': 'prod'}
>>> configurator = Configurator()
>>> configurator.create_configuration([template], [env])
{'debug': False, 'packages': [], 'logging': {}, 'app': {'name': 'app', 'env': 'prod', 'loglevel': None, 'settings': {}, 'optional_services': [], 'services': []}}
- create_configuration(templates: list[dict[str, Any]], envs: list[dict[str, Any]], *, load_os_env: bool = False, load_cli_env: bool = False) ProjectConfig [source]¶
Create a project configuration from template and environment data.
Usually you would store configs in config files. Load them using an appropriate method (json or yaml loader) and then pass to templates and envs arguments.
Initialization order:
1. Merge templates from first to last
2. Merge env dicts from first to last
3. Load OS environment variables
4. Load CLI environment variables from '--env' flags
5. Evaluate template using resulting env dict
6. Normalize and return the project config dict
See the merge_dicts() function for the rules of how dictionaries are merged.
See the template-dict documentation for the template syntax.
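The core of the initialization order (merge templates, merge envs, evaluate the template) can be sketched as follows. This is a simplified illustration and not the Configurator code: the merge and evaluate helpers are hypothetical, only whole-string '[name]' placeholders are handled (the real template-dict syntax may support more), and the OS, CLI and normalization steps are left out.

```python
from typing import Any


def merge(*dicts: dict) -> dict:
    """Merge dicts from first to last; nested dicts are merged recursively."""
    result: dict = {}
    for d in dicts:
        for key, value in d.items():
            if isinstance(value, dict) and isinstance(result.get(key), dict):
                result[key] = merge(result[key], value)
            else:
                result[key] = value
    return result


def evaluate(template: Any, env: dict[str, Any]) -> Any:
    """Replace whole-string '[name]' placeholders with values from env."""
    if isinstance(template, dict):
        return {k: evaluate(v, env) for k, v in template.items()}
    if isinstance(template, list):
        return [evaluate(v, env) for v in template]
    if isinstance(template, str) and template.startswith("[") and template.endswith("]"):
        return env[template[1:-1]]
    return template


# Hypothetical inputs: two templates and two env dicts, merged first to last.
templates = [{"app": {"name": "[app_name]"}}, {"app": {"env": "[app_env]"}}]
envs = [{"app_name": "app", "app_env": "test"}, {"app_env": "prod"}]

config = evaluate(merge(*templates), merge(*envs))
print(config)  # {'app': {'name': 'app', 'env': 'prod'}}
```

The later env dict overrides app_env, which is why the result contains 'prod' and not 'test'.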
- class ConfigurationError[source]¶
Bases:
RuntimeError
Invalid configuration.
- __init__(*args, **kwargs)¶
- class DependencyCycleError[source]¶
Bases:
RuntimeError
Dependency cycle has been detected.
- __init__(*args, **kwargs)¶
- class ServiceNameConflict[source]¶
Bases:
RuntimeError
A dependency with the same name already exists.
- __init__(*args, **kwargs)¶
- class DependencyNotFound[source]¶
Bases:
RuntimeError
Dependency service not found in the list of application services.
- __init__(*args, **kwargs)¶
- class Error[source]¶
Bases:
BaseException, ABC
Base error class for application errors.
To get a JSONRPC compatible error use the json_repr method:
>>> Error('some error', value=1).json_repr()
{'code': 0, 'message': 'some error', 'data': {'type': 'Error', 'type_base': 'BaseException', 'extra': {'value': 1}}}
To wrap a standard exception in error type:
>>> Error.wrap_exception(ValueError('something happened'))
Error('something happened')
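For readers defining their own error types, here is a rough sketch of an exception class that produces a JSON-RPC style error object with the code, message and data members. AppError and NotFound are hypothetical names, the class derives from Exception for simplicity, and the data layout is reduced compared to the output shown above.

```python
class AppError(Exception):
    """Hypothetical error type with a JSON-RPC style representation."""

    code = 0  # JSON-RPC error code, overridden in subclasses

    def __init__(self, msg: str, /, **extra):
        super().__init__(msg)
        self.extra = extra

    def json_repr(self) -> dict:
        """Build a dict with the 'code', 'message' and 'data' members."""
        return {
            "code": self.code,
            "message": str(self),
            "data": {"type": type(self).__name__, "extra": self.extra},
        }

    @classmethod
    def wrap_exception(cls, exc: BaseException) -> "AppError":
        """Wrap an arbitrary exception, keeping the original as the cause."""
        err = cls(str(exc))
        err.__cause__ = exc
        return err


class NotFound(AppError):
    code = 404  # hypothetical application-specific code


print(NotFound("no such user", user_id=1).json_repr())
wrapped = AppError.wrap_exception(ValueError("something happened"))
print(repr(wrapped))  # AppError('something happened')
```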
- code: ClassVar[int] = 0¶
Error JSONRPC code
- add_note()¶
Exception.add_note(note) – add a note to the exception
- eval_string(value: str, /) Any [source]¶
Evaluate an environment text value into a python variable using safe eval.
This method is useful when loading values from Unix environment or CLI arguments.
A few predefined values are recognized regardless of case: true, false, none and null are evaluated to True, False, None and None respectively.
>>> eval_string('true')
True
Empty values are evaluated to None.
>>> eval_string('')
In all other cases a value will be evaluated with python eval() function.
>>> eval_string('[1, 2, 3]')
[1, 2, 3]
>>> eval_string('"42"')
'42'
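The rules above can be approximated with the standard library. The sketch below is not the eval_string source: eval_text is a hypothetical name, and it swaps in ast.literal_eval, which accepts only Python literals, in place of whatever restricted evaluation the library uses.

```python
import ast
from typing import Any

# Predefined values, matched case-insensitively.
_PREDEFINED = {"true": True, "false": False, "none": None, "null": None}


def eval_text(value: str, /) -> Any:
    """Turn an environment string into a Python value (illustrative only)."""
    text = value.strip()
    if not text:
        return None
    lowered = text.lower()
    if lowered in _PREDEFINED:
        return _PREDEFINED[lowered]
    # literal_eval accepts only Python literals, so arbitrary code is rejected.
    return ast.literal_eval(text)


print(eval_text("TRUE"))  # True
print(eval_text(""))  # None
print(eval_text("[1, 2, 3]"))  # [1, 2, 3]
print(repr(eval_text('"42"')))  # '42'
```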
- class Health[source]¶
Bases:
TypedDict
Service health statistics.
- healthy: bool¶
service is healthy
- stats: dict[str, Any]¶
reserved for stats and metrics
- errors: list[str]¶
list of error messages
- class ProjectConfig[source]¶
Bases:
TypedDict
- debug: bool¶
run the project in debug mode
- packages: list[str]¶
list of service packages to import
- logging: _DictConfig¶
loggers and handlers settings
- run_app(app: Application, /, loop: AbstractEventLoop | None = None) None [source]¶
- merge_dicts(*dicts: Mapping) dict [source]¶
Merge multiple dicts into a new one recursively.
The function is optimized for simple JSON compatible data types. It may not work as expected for some custom collections. See the sources and decide for yourself.
Priority increases from first to last: values in later dicts overwrite those in earlier ones.
>>> merge_dicts({"a": 1, "b": 2}, {"a": 3, "c": 4})
{'a': 3, 'b': 2, 'c': 4}
Note that mutable collections will be merged (lists, sets, dicts).
>>> merge_dicts({"a": [1, 2], "b": {1}, "c": {"d": 1}}, {"a": [3], "b": {2}, "c": {"e": 2}})
{'a': [1, 2, 3], 'b': {1, 2}, 'c': {'d': 1, 'e': 2}}
Immutable collections are treated as frozen values and can only be replaced.
>>> merge_dicts({"a": (1, 2), "b": frozenset({5})}, {"a": (3, 4), "b": frozenset({6})})
{'a': (3, 4), 'b': frozenset({6})}
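The merge rules described above can be sketched in a few lines. This is an illustrative reimplementation under the stated rules, not the library source, and merge_sketch is a hypothetical name: dicts merge recursively, lists are concatenated, sets are united, and everything else (including tuples and frozensets) is replaced.

```python
from collections.abc import Mapping


def merge_sketch(*dicts: Mapping) -> dict:
    """Recursively merge mappings; later values win, mutable collections combine."""
    result: dict = {}
    for d in dicts:
        for key, value in d.items():
            current = result.get(key)
            if isinstance(current, dict) and isinstance(value, Mapping):
                result[key] = merge_sketch(current, value)
            elif isinstance(current, list) and isinstance(value, list):
                result[key] = current + value
            elif isinstance(current, set) and isinstance(value, set):
                result[key] = current | value
            elif isinstance(value, Mapping):
                result[key] = merge_sketch(value)  # copy so inputs stay untouched
            elif isinstance(value, (list, set)):
                result[key] = value.copy()
            else:
                result[key] = value  # scalars and immutable collections are replaced
    return result


merged = merge_sketch({"a": [1, 2], "c": {"d": 1}}, {"a": [3], "c": {"e": 2}})
print(merged)  # {'a': [1, 2, 3], 'c': {'d': 1, 'e': 2}}
```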
- service(*, name=_Sentinel, metadata=None, required: bool = True, nowait: bool = False)[source]¶
Service field describing another service dependency.
- Parameters:
name – custom service name
required – this dependency is required for the service to work
nowait – do not wait for this service initialization
metadata – additional field metadata, stored in the dataclass field
This field provides auto-discovery for dependency services in your service. The service will try to automatically discover a dependency under this field and assign it to the field. This happens in the service init() method on application start.
Your service must be a dataclass for this to work. Then just add this field and type-hint it with the dependency class.
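To illustrate the general mechanism of type-hinted dependency fields on a dataclass, here is a sketch built only on the standard dataclasses module. The dependency and discover helpers, the Database and UserService classes, and the use of LookupError are all hypothetical; the library's own service() field and discovery logic may work differently.

```python
from dataclasses import dataclass, field, fields


def dependency(*, required: bool = True):
    """Hypothetical helper: a dataclass field flagged as a service dependency."""
    return field(default=None, metadata={"dependency": True, "required": required})


class Database:
    """Stand-in dependency class."""


@dataclass
class UserService:
    name: str
    db: Database = dependency()  # the type hint names the dependency class


def discover(service, registry: dict[type, object]) -> None:
    """Assign registered instances to dependency fields based on their type hints."""
    for f in fields(service):
        if not f.metadata.get("dependency"):
            continue
        instance = registry.get(f.type)
        if instance is None and f.metadata["required"]:
            raise LookupError(f"dependency not found: {f.name}")
        setattr(service, f.name, instance)


db = Database()
users = UserService(name="users")
discover(users, {Database: db})
print(users.db is db)  # True
```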
- class Service[source]¶
Bases:
ABC
Application service - a building block for an application.
Service is a modular part of an application. Each service must implement only specific application logic in a limited scope.
- app: Application¶
Application this service is linked to.
- name: str¶
Unique service name for referencing it in other services of the app.
- logger: Logger¶
Logger instance.
- async init() None [source]¶
Initialize application context.
This method shouldn’t be directly called outside the service.
Here you should write service initialization procedures. It will be called through start() by the app service manager on the app start.
- async post_init()[source]¶
Run additional scripts and commands after start().
The main difference between post_init() and init() is that the service is considered READY when post init is called, and the app should be in working condition with all services initialized.
Post init is called without a per-service time limit unless you implement one explicitly inside the method. There’s a global time limit on all post init tasks set by post_init_timeout_s in the application settings.
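A single time limit shared by several tasks can be expressed with asyncio.wait_for around asyncio.gather. The sketch below only illustrates that pattern. The post_init and run_post_init functions are hypothetical stand-ins, and running the tasks concurrently is an assumption about scheduling, not a statement about how the application runs them.

```python
import asyncio


async def post_init(name: str, delay: float) -> str:
    """Stand-in for a service post-init task."""
    await asyncio.sleep(delay)
    return name


async def run_post_init(timeout_s: float) -> list[str]:
    """Run all post-init tasks concurrently under one shared time limit."""
    tasks = asyncio.gather(post_init("cache", 0.05), post_init("db", 0.1))
    return await asyncio.wait_for(tasks, timeout=timeout_s)


finished = asyncio.run(run_post_init(5.0))
print(finished)  # ['cache', 'db']

# With a limit shorter than the tasks, the whole group is cancelled.
try:
    asyncio.run(run_post_init(0.001))
    timed_out = False
except asyncio.TimeoutError:
    timed_out = True
print(timed_out)  # True
```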
- async close() None [source]¶
Close application context.
This method shouldn’t be directly called outside the service.
Here you should write service de-initialization procedures. It will be called through stop() by the app service manager on the app close.
- async get_health() Health [source]¶
Check if the service is healthy.
Return all errors that occurred in the errors field.
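A short sketch of what a health report with the three Health keys could look like. The TypedDict below repeats the keys documented in this reference so the block is self-contained; build_health and the check names are hypothetical.

```python
from typing import Any, TypedDict


class Health(TypedDict):
    """Same three keys as the Health type in this reference."""

    healthy: bool
    stats: dict[str, Any]
    errors: list[str]


def build_health(checks: dict[str, bool]) -> Health:
    """Hypothetical helper: collect failed check names as error messages."""
    errors = [f"{name} check failed" for name, ok in checks.items() if not ok]
    return Health(healthy=not errors, stats={"checks": len(checks)}, errors=errors)


report = build_health({"connection": True, "disk": False})
print(report)
# {'healthy': False, 'stats': {'checks': 2}, 'errors': ['disk check failed']}
```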
- final async start() None [source]¶
Start the service.
This method is executed on application start and when a service context is entered. Use init() to implement custom initialization procedures and check_health() to check the service health afterward.
- final async stop() None [source]¶
Close application context.
This method is executed on application exit and when a service context is exited. Use close() to implement custom de-initialization procedures.
- async wait(_for_status: ServiceState = ServiceState.READY, /) None [source]¶
Wait until the service is ready.
- __init__(app: Application, name: str, logger: Logger) None ¶
- class ServiceConfig[source]¶
Bases:
TypedDict
- cls: str¶
service class name
- name: str¶
service custom name
- loglevel: Literal['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NEVER'] | None¶
service logger log level
- enabled: bool¶
enable or disable service
- settings: dict¶
args for service __init__
- class ServiceFieldType[source]¶
Bases:
Field
Used to store dependent service settings for other services.
- required¶
- nowait¶
- class ServiceInitFailed[source]¶
Bases:
Error, RuntimeError
Initialization of a service has failed.
- __init__(msg: str, /, **extra)¶
- class ServiceInitTimeout[source]¶
Bases:
Error, RuntimeError
Service has exceeded its timeout during initialization.
- __init__(msg: str, /, **extra)¶
- class SortedStack[source]¶
Bases:
Sized, Iterable, Generic[_Item]
Sorted stack of elements.
>>> stack = SortedStack({'dogs': 12, 'sobaki': 5})
>>> stack = SortedStack(stack)
>>> stack.add(*SortedStack({'cats': 5}))
Selection:
>>> stack.select(8)
['sobaki', 'cats']
>>> stack.rselect(8)
['dogs']
Insertion and removal:
>>> stack.add(('koty', 1))
>>> stack.pop_many(3)
['koty']
>>> stack.pop()
'sobaki'
>>> len(stack)
2
>>> stack.clear()
>>> bool(stack)
False
- property lowest_score: Any | None¶
Get the lowest score in the stack.
- select(score_threshold, /) list[_Item] [source]¶
Select and return items, without removing them, from the lowest score up to score_threshold.
The values are guaranteed to be in order.
- rselect(score_threshold: Any, /) list[_Item] [source]¶
Select and return items, without removing them, from the highest score down to score_threshold.
The values are guaranteed to be in order.
- pop() _Item [source]¶
Pop a single element which has the lowest score.
- Raises:
StopIteration – if there are no values to return.
- rpop() _Item [source]¶
Pop a single element which has the highest score.
- Raises:
StopIteration – if there are no values to return.
- pop_many(score_threshold: Any, /) list[_Item] [source]¶
Pop and return values with scores less than score_threshold.
The returned values are guaranteed to be in order. Returns an empty list if there are no such values.
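One common way to keep items ordered by score is binary search insertion with the standard bisect module. The class below is a hypothetical, much reduced illustration of that technique with a different add() signature; it is not how SortedStack is implemented.

```python
import bisect
from typing import Any


class MiniSortedStack:
    """Hypothetical, simplified stack keeping items ordered by score."""

    def __init__(self) -> None:
        self._scores: list[Any] = []
        self._items: list[Any] = []

    def add(self, item: Any, score: Any) -> None:
        # bisect_right keeps insertion order among items with equal scores.
        idx = bisect.bisect_right(self._scores, score)
        self._scores.insert(idx, score)
        self._items.insert(idx, item)

    def pop(self) -> Any:
        """Remove and return the item with the lowest score."""
        if not self._items:
            raise StopIteration
        self._scores.pop(0)
        return self._items.pop(0)

    def pop_many(self, score_threshold: Any) -> list[Any]:
        """Remove and return, in order, items scored below the threshold."""
        idx = bisect.bisect_left(self._scores, score_threshold)
        popped = self._items[:idx]
        del self._scores[:idx], self._items[:idx]
        return popped

    def __len__(self) -> int:
        return len(self._items)


stack = MiniSortedStack()
for item, score in [("dogs", 12), ("sobaki", 5), ("cats", 5), ("koty", 1)]:
    stack.add(item, score)
print(stack.pop_many(3))  # ['koty']
print(stack.pop())  # sobaki
print(len(stack))  # 2
```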
- class State[source]¶
Bases:
Generic[_Status], ABC
Awaitable state machine.
To create your state machine you must create a status list and assign it to your subclass.
>>> from enum import Enum
...
>>> class UserStatus(Enum):
...     # it's recommended to use the same values for both name and a value
...     INACTIVE = 'INACTIVE'
...     ACTIVE = 'ACTIVE'
...     BANNED = 'BANNED'
Now you can create and maintain the state of your object. Note that by default it uses the first status from the enum unless the state parameter is explicitly provided. You can get the current state using the get() method.
>>> class User:
...     def __init__(self):
...         self.state = State(UserStatus, UserStatus.INACTIVE)
...
>>> user = User()
>>> user.state.get().name
'INACTIVE'
>>> str(user.state)
'INACTIVE'
You can set the state using the set() method.
>>> user.state.set(UserStatus.ACTIVE)
It’s also possible to use the state change context to change an object state inside a function.
>>> with user.state:
...     user.state.set(UserStatus.BANNED)
...     user.state.set(UserStatus.INACTIVE)
...     raise ValueError('Unhandled error')
Traceback (most recent call last):
...
ValueError: Unhandled error
On any error, the last state before entering the context is restored.
>>> user.state.get().name
'ACTIVE'
Two object states can be compared.
>>> other_user = User()
>>> other_user.state.set(UserStatus.ACTIVE)
>>> other_user.state == user.state
True
You can also check if the state object has a particular inner state by calling the is_() method.
>>> other_user.state.is_(UserStatus.ACTIVE)
True
Of course, a state machine would be useless in an async context if there were no way to wait for a particular state. You can use wait() to wait for a particular state.
>>> async def wait_banned(_user: User):
...     await _user.state.wait(UserStatus.BANNED)
...     return 'Banned!'
Test example:
>>> import asyncio
>>> async def ban_user(_user: User):
...     _user.state.set(UserStatus.BANNED)
>>> async def _main():
...     return await asyncio.gather(wait_banned(user), ban_user(user))
>>> asyncio.run(_main())
['Banned!', None]
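For intuition about how a state can be awaited, here is a rough standalone sketch that keeps one asyncio.Event per status. AwaitableState and Status are hypothetical names, and the State class documented above may be implemented quite differently.

```python
import asyncio
from enum import Enum


class Status(Enum):
    INACTIVE = "INACTIVE"
    ACTIVE = "ACTIVE"


class AwaitableState:
    """Hypothetical minimal state holder with an awaitable wait()."""

    def __init__(self, status: Status) -> None:
        self._status = status
        # One event per status; only the current status has its event set.
        self._events = {s: asyncio.Event() for s in type(status)}
        self._events[status].set()

    def get(self) -> Status:
        return self._status

    def set(self, status: Status) -> None:
        self._events[self._status].clear()
        self._status = status
        self._events[status].set()

    async def wait(self, status: Status) -> None:
        """Block until the state equals the given status."""
        await self._events[status].wait()


async def main() -> list:
    state = AwaitableState(Status.INACTIVE)

    async def waiter() -> str:
        await state.wait(Status.ACTIVE)
        return "active"

    async def activate() -> None:
        state.set(Status.ACTIVE)

    return await asyncio.gather(waiter(), activate())


result = asyncio.run(main())
print(result)  # ['active', None]
```

Because set() is synchronous, it can be called from regular callbacks as well as from coroutines, while waiters are woken through the event.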