111 lines
4.4 KiB
Python
111 lines
4.4 KiB
Python
from typing import Optional, Dict, Any
|
||
from sqlalchemy.ext.asyncio import AsyncConnection
|
||
from orm.tables.process import ProcessStatus
|
||
|
||
from api.db.logic.process_schema import (
|
||
get_process_schema_by_id,
|
||
get_process_schema_page_DTO,
|
||
update_process_schema_by_id,
|
||
create_process_schema,
|
||
update_process_schema_settings_by_id,
|
||
)
|
||
from api.db.logic.ps_node import create_ps_node_schema
|
||
from api.schemas.process.process_schema import ProcessSchema, ProcessSchemaSettingsNode
|
||
from api.schemas.endpoints.process_schema import AllProcessSchemaResponse, ProcessSchemaFilterDTO, ProcessSchemaUpdate
|
||
from api.schemas.process.ps_node import Ps_NodeFrontResponse, Ps_NodeFrontResponseNode
|
||
from orm.tables.process import NodeType
|
||
from core import VorkNodeRegistry
|
||
from model_nodes import ListenNodeData
|
||
from api.utils.node_counter import increment_node_counter
|
||
|
||
|
||
class ProcessSchemaService:
|
||
"""Сервис для работы с process schema"""
|
||
|
||
def __init__(self, connection: AsyncConnection):
|
||
self.connection = connection
|
||
|
||
async def list(self, filter_dto: ProcessSchemaFilterDTO) -> Optional[AllProcessSchemaResponse]:
|
||
"""
|
||
Получает список схем процессов с пагинацией и фильтрацией.
|
||
"""
|
||
return await get_process_schema_page_DTO(self.connection, filter_dto)
|
||
|
||
async def get(self, process_schema_id: int) -> Optional[ProcessSchema]:
|
||
"""
|
||
Получает схему процесса по ID.
|
||
"""
|
||
return await get_process_schema_by_id(self.connection, process_schema_id)
|
||
|
||
async def create(self, creator_id: int) -> Dict[str, Any]:
|
||
"""
|
||
Создаёт новую схему процесса с начальной нодой LISTEN.
|
||
"""
|
||
current_node_counter = increment_node_counter()
|
||
title = f"Новая схема {current_node_counter}"
|
||
description = "Default description"
|
||
|
||
node_id = await create_process_schema(self.connection, creator_id, title, description)
|
||
|
||
process_schema_new = await get_process_schema_by_id(self.connection, node_id)
|
||
|
||
start_node_data = ListenNodeData(ps_id=process_schema_new.id, node_type=NodeType.START.value, is_start="True")
|
||
|
||
start_node_links = {}
|
||
|
||
registery = VorkNodeRegistry()
|
||
|
||
vork_node = registery.get("LISTEN")
|
||
|
||
node_descriptor = vork_node.form()
|
||
|
||
start_node = vork_node(data=start_node_data.model_dump(), links=start_node_links)
|
||
|
||
validated_start_schema = start_node.validate()
|
||
|
||
db_start_schema = await create_ps_node_schema(self.connection, validated_start_schema, creator_id)
|
||
|
||
node = ProcessSchemaSettingsNode(
|
||
id=db_start_schema.id,
|
||
node_type=NodeType.LISTEN.value,
|
||
data=validated_start_schema.data.model_dump(),
|
||
from_node=None,
|
||
links=None,
|
||
)
|
||
|
||
settings_dict = {"node": node.model_dump(mode="json")}
|
||
|
||
await update_process_schema_settings_by_id(self.connection, process_schema_new.id, settings_dict)
|
||
|
||
process_schema_new = await get_process_schema_by_id(self.connection, node_id)
|
||
|
||
ps_node_front_response = Ps_NodeFrontResponse(
|
||
description=node_descriptor.model_dump(),
|
||
node=Ps_NodeFrontResponseNode(
|
||
id=db_start_schema.id, node_type=NodeType.LISTEN.value, data=validated_start_schema.data.model_dump()
|
||
),
|
||
link=None,
|
||
)
|
||
|
||
response_data = {
|
||
"process_schema": process_schema_new.model_dump(),
|
||
"node_listen": ps_node_front_response.model_dump(),
|
||
}
|
||
|
||
return response_data
|
||
|
||
async def update(self, process_schema_id: int, update_data: dict, process_schema: ProcessSchema) -> ProcessSchema:
|
||
"""
|
||
Обновляет данные схемы процесса.
|
||
"""
|
||
await update_process_schema_by_id(self.connection, update_data, process_schema)
|
||
return await get_process_schema_by_id(self.connection, process_schema_id)
|
||
|
||
async def delete(self, process_schema_id: int, process_schema: ProcessSchema) -> None:
|
||
"""
|
||
Помечает схему процесса как удалённую.
|
||
"""
|
||
process_schema_update = ProcessSchemaUpdate(status=ProcessStatus.DELETED.value)
|
||
update_data = process_schema_update.model_dump(by_alias=True, exclude_none=True)
|
||
await update_process_schema_by_id(self.connection, update_data, process_schema)
|