From 0330356ea2ab578167b901970ecd65edf28e257c Mon Sep 17 00:00:00 2001 From: TheNoxium Date: Sat, 15 Nov 2025 16:00:05 +0500 Subject: [PATCH] feat: settings, delete port validation --- api/api/db/logic/list_events.py | 94 ------------------- api/api/db/logic/node_link.py | 19 +++- api/api/db/logic/process_schema.py | 45 --------- api/api/db/logic/ps_node.py | 23 ++++- api/api/endpoints/process_schema.py | 35 +++---- api/api/endpoints/ps_node.py | 50 +++++----- api/api/schemas/process/process_schema.py | 22 +---- .../process/process_version_archive.py | 13 --- api/api/schemas/process/ps_node.py | 19 +--- api/api/services/endpoints/process_schema.py | 89 +++++++++++------- api/api/services/endpoints/ps_node.py | 35 +------ 11 files changed, 140 insertions(+), 304 deletions(-) delete mode 100644 api/api/schemas/process/process_version_archive.py diff --git a/api/api/db/logic/list_events.py b/api/api/db/logic/list_events.py index 85a7cef..b50cc00 100644 --- a/api/api/db/logic/list_events.py +++ b/api/api/db/logic/list_events.py @@ -121,100 +121,6 @@ async def get_list_events_page_DTO( ) -async def get_list_events_page_by_creator_id( - connection: AsyncConnection, creator_id: int, page: int, limit: int -) -> Optional[AllListEventResponse]: - """ - Получает список событий заданного создателя по значениям page и limit и creator_id. - """ - - first_event = page * limit - limit - query = ( - select( - list_events_table.c.id, - list_events_table.c.name, - list_events_table.c.title, - list_events_table.c.creator_id, - list_events_table.c.created_at, - list_events_table.c.schema, - list_events_table.c.state, - list_events_table.c.status, - ) - .where(list_events_table.c.creator_id == creator_id) # Фильтрация по creator_id - .order_by(list_events_table.c.id) - .offset(first_event) - .limit(limit) - ) - - count_query = ( - select(func.count()) - .select_from(list_events_table) - .where(list_events_table.c.creator_id == creator_id) # Фильтрация по creator_id - ) - - result = await connection.execute(query) - count_result = await connection.execute(count_query) - - events_data = result.mappings().all() - total_count = count_result.scalar() - total_pages = math.ceil(total_count / limit) - - # Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные - validated_list_event = all_list_event_adapter.validate_python(events_data) - - return AllListEventResponse( - list_event=validated_list_event, - amount_count=total_count, - amount_pages=total_pages, - current_page=page, - limit=limit, - ) - - -async def get_list_events_page(connection: AsyncConnection, page, limit) -> Optional[AllListEventResponse]: - """ - Получает список событий заданного создателя по значениям page и limit. - """ - - first_event = page * limit - (limit) - - query = ( - select( - list_events_table.c.id, - list_events_table.c.name, - list_events_table.c.title, - list_events_table.c.creator_id, - list_events_table.c.created_at, - list_events_table.c.schema, - list_events_table.c.state, - list_events_table.c.status, - ) - .order_by(list_events_table.c.id) - .offset(first_event) - .limit(limit) - ) - - count_query = select(func.count()).select_from(list_events_table) - - result = await connection.execute(query) - count_result = await connection.execute(count_query) - - events_data = result.mappings().all() - total_count = count_result.scalar() - total_pages = math.ceil(total_count / limit) - - # Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные - validated_list_event = all_list_event_adapter.validate_python(events_data) - - return AllListEventResponse( - list_event=validated_list_event, - amount_count=total_count, - amount_pages=total_pages, - current_page=page, - limit=limit, - ) - - async def get_list_events_by_name(connection: AsyncConnection, name: str) -> Optional[ListEvent]: """ Получает list events по name. diff --git a/api/api/db/logic/node_link.py b/api/api/db/logic/node_link.py index 85a5f0a..ca8d9b3 100644 --- a/api/api/db/logic/node_link.py +++ b/api/api/db/logic/node_link.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, List from datetime import datetime, timezone @@ -79,3 +79,20 @@ async def create_node_link_schema( await connection.commit() return await get_last_node_link_by_creator_and_ps_id(connection, creator_id, validated_link_schema.from_id) + + +async def get_all_node_links_by_next_node_ids(connection: AsyncConnection, next_node_ids: List[int]) -> List[NodeLink]: + """ + Получает все активные node_link для списка next_node_id одним запросом. + """ + if not next_node_ids: + return [] + + query = select(node_link_table).where( + node_link_table.c.next_node_id.in_(next_node_ids), node_link_table.c.status == NodeLinkStatus.ACTIVE.value + ) + + node_link_db_cursor = await connection.execute(query) + node_links_data = node_link_db_cursor.mappings().all() + + return [NodeLink.model_validate(link_data) for link_data in node_links_data] diff --git a/api/api/db/logic/process_schema.py b/api/api/db/logic/process_schema.py index c953eb8..ec097e1 100644 --- a/api/api/db/logic/process_schema.py +++ b/api/api/db/logic/process_schema.py @@ -141,51 +141,6 @@ async def update_process_schema_by_id(connection: AsyncConnection, update_values await connection.commit() -async def update_process_schema_settings_by_id( - connection: AsyncConnection, process_schema_id: int, node_data: Dict[str, Any] -): - """ - Добавляет новый узел в массив 'nodes' в настройках процесса. - Если массив 'nodes' не существует, создает его. - """ - # Получаем текущие settings - query = select(process_schema_table.c.settings).where(process_schema_table.c.id == process_schema_id) - result = await connection.execute(query) - current_settings = result.scalar_one_or_none() - - # Если settings пустые, создаем пустой словарь - if current_settings is None: - current_settings = {} - - # Инициализируем массив nodes, если его нет - if "nodes" not in current_settings: - current_settings["nodes"] = [] - - # Добавляем новый узел в массив - current_settings["nodes"].append(node_data) - - # Обновляем поле settings - await connection.execute( - process_schema_table.update() - .where(process_schema_table.c.id == process_schema_id) - .values(settings=current_settings) - ) - - await connection.commit() - - -async def get_last_created_process_schema(connection: AsyncConnection) -> Optional[int]: - """ - Получает ID последней созданной схемы процесса. - """ - query = select(process_schema_table.c.id).order_by(desc(process_schema_table.c.id)).limit(1) - - result = await connection.execute(query) - last_id = result.scalar_one_or_none() - - return last_id - - async def create_process_schema( connection: AsyncConnection, creator_id: int, title: str, description: str ) -> Optional[int]: diff --git a/api/api/db/logic/ps_node.py b/api/api/db/logic/ps_node.py index ad7389c..c77b860 100644 --- a/api/api/db/logic/ps_node.py +++ b/api/api/db/logic/ps_node.py @@ -1,4 +1,4 @@ -from typing import Optional, List +from typing import Optional, List, Dict, Any from datetime import datetime, timezone @@ -8,7 +8,6 @@ from sqlalchemy.ext.asyncio import AsyncConnection from orm.tables.process import ps_node_table, node_link_table, process_schema_table from api.schemas.process.ps_node import Ps_Node -from model_nodes.node_listen_models import ListenNodeCoreSchema from orm.tables.process import NodeStatus @@ -49,18 +48,34 @@ async def get_last_ps_node_by_creator_and_ps_id( return Ps_Node.model_validate(ps_node_data) +async def get_all_ps_nodes_by_ps_id(connection: AsyncConnection, ps_id: int) -> List[Ps_Node]: + """ + Получает все активные ps_node для данной process_schema. + """ + query = select(ps_node_table).where( + ps_node_table.c.ps_id == ps_id, ps_node_table.c.status == NodeStatus.ACTIVE.value + ) + + ps_node_db_cursor = await connection.execute(query) + ps_nodes_data = ps_node_db_cursor.mappings().all() + + return [Ps_Node.model_validate(node_data) for node_data in ps_nodes_data] + + async def create_ps_node_schema( connection: AsyncConnection, validated_schema, + node_descriptor, creator_id: int, -) -> Optional[ListenNodeCoreSchema]: + settings_payload: Optional[Dict[str, Any]] = None, +) -> Optional[Ps_Node]: """ Создает нове поле в таблице process_schema_table. """ query = insert(ps_node_table).values( ps_id=validated_schema.ps_id, node_type=validated_schema.node_type, - settings=validated_schema.data.model_dump(), + settings=settings_payload if settings_payload is not None else node_descriptor.model_dump(), creator_id=creator_id, created_at=datetime.now(timezone.utc), status=NodeStatus.ACTIVE.value, diff --git a/api/api/endpoints/process_schema.py b/api/api/endpoints/process_schema.py index 1902540..741b217 100644 --- a/api/api/endpoints/process_schema.py +++ b/api/api/endpoints/process_schema.py @@ -75,16 +75,16 @@ async def get_all_process_schema_endpoint( return to_camel_dict(process_schema_page.model_dump()) -@api_router.get("/{process_schema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema) +@api_router.get("/{process_schema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchemaResponse) async def get_process_schema_endpoint( process_schema_id: int, connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user), ): service = ProcessSchemaService(connection) - process_schema_validation = await service.get(process_schema_id) + process_schema_response = await service.get(process_schema_id) - if process_schema_validation is None: + if process_schema_response is None: raise create_operation_error( message="Process schema not found", status_code=status.HTTP_404_NOT_FOUND, @@ -92,16 +92,9 @@ async def get_process_schema_endpoint( ) validator = UserRoleValidator(connection) - await validator.validate_ownership(current_user, process_schema_validation.creator_id) + await validator.validate_ownership(current_user, process_schema_response.process_schema.creator_id) - if process_schema_id is None: - raise create_operation_error( - message="Process schema not found", - status_code=status.HTTP_404_NOT_FOUND, - details={"process_schema_id": process_schema_id}, - ) - - return to_camel_dict(process_schema_validation.model_dump()) + return to_camel_dict(process_schema_response.model_dump()) @api_router.post("", dependencies=[Depends(bearer_schema)], response_model=ProcessSchemaResponse) @@ -125,9 +118,9 @@ async def update_process_schema_endpoint( current_user=Depends(get_current_user), ): service = ProcessSchemaService(connection) - process_schema_validation = await service.get(process_schema_id) + process_schema_response = await service.get(process_schema_id) - if process_schema_validation is None: + if process_schema_response is None: raise create_operation_error( message="Process schema not found", status_code=status.HTTP_404_NOT_FOUND, @@ -135,14 +128,14 @@ async def update_process_schema_endpoint( ) validator = UserRoleValidator(connection) - await validator.validate_ownership(current_user, process_schema_validation.creator_id) + await validator.validate_ownership(current_user, process_schema_response.process_schema.creator_id) updated_values = process_schema_update.model_dump(by_alias=True, exclude_none=True) if not updated_values: - return process_schema_validation + return process_schema_response.process_schema - process_schema = await service.update(process_schema_id, updated_values, process_schema_validation) + process_schema = await service.update(process_schema_id, updated_values, process_schema_response.process_schema) return process_schema @@ -154,9 +147,9 @@ async def delete_process_schema_endpoint( current_user=Depends(get_current_user), ): service = ProcessSchemaService(connection) - process_schema_validation = await service.get(process_schema_id) + process_schema_response = await service.get(process_schema_id) - if process_schema_validation is None: + if process_schema_response is None: raise create_operation_error( message="Process schema not found", status_code=status.HTTP_404_NOT_FOUND, @@ -164,8 +157,8 @@ async def delete_process_schema_endpoint( ) validator = UserRoleValidator(connection) - await validator.validate_ownership(current_user, process_schema_validation.creator_id) + await validator.validate_ownership(current_user, process_schema_response.process_schema.creator_id) - await service.delete(process_schema_id, process_schema_validation) + await service.delete(process_schema_id, process_schema_response.process_schema) return HTTPException(status_code=status.HTTP_200_OK, detail="Process schema deleted successfully") diff --git a/api/api/endpoints/ps_node.py b/api/api/endpoints/ps_node.py index 815acfb..4f8fb2a 100644 --- a/api/api/endpoints/ps_node.py +++ b/api/api/endpoints/ps_node.py @@ -61,20 +61,20 @@ async def delete_ps_node_endpoint( details={"next_node_id": ps_node_delete_data.next_node_id}, ) - is_connected = await check_node_connection( - connection, ps_node_delete_data.node_id, ps_node_delete_data.next_node_id, int(ps_node_delete_data.port) - ) + # is_connected = await check_node_connection( + # connection, ps_node_delete_data.node_id, ps_node_delete_data.next_node_id, int(ps_node_delete_data.port) + # ) - if not is_connected: - raise create_validation_error( - message="Node connection validation failed", - status_code=status.HTTP_400_BAD_REQUEST, - details={ - "node_id": ps_node_delete_data.node_id, - "next_node_id": ps_node_delete_data.next_node_id, - "port": ps_node_delete_data.port, - }, - ) + # if not is_connected: + # raise create_validation_error( + # message="Node connection validation failed", + # status_code=status.HTTP_400_BAD_REQUEST, + # details={ + # "node_id": ps_node_delete_data.node_id, + # "next_node_id": ps_node_delete_data.next_node_id, + # "port": ps_node_delete_data.port, + # }, + # ) service = PsNodeService(connection) try: @@ -153,21 +153,21 @@ async def create_ps_node_endpoint( details={"parent_id": parent_id, "expected_ps_id": target_ps_id, "actual_ps_id": parent_node.ps_id}, ) - parent_port_number = node_instance_validated.parent_port_number + # parent_port_number = node_instance_validated.parent_port_number - parent_settings = parent_node.settings or {} - available_port_numbers = [] + # parent_settings = parent_node.settings or {} + # available_port_numbers = [] - for key, value in parent_settings.items(): - if "port" in key.lower() and isinstance(value, int): - available_port_numbers.append(value) + # for key, value in parent_settings.items(): + # if "port" in key.lower() and isinstance(value, int): + # available_port_numbers.append(value) - if parent_port_number not in available_port_numbers: - raise create_validation_error( - message="Parent port number is invalid", - status_code=status.HTTP_400_BAD_REQUEST, - details={"parent_id": parent_id, "parent_settings": parent_settings}, - ) + # if parent_port_number not in available_port_numbers: + # raise create_validation_error( + # message="Parent port number is invalid", + # status_code=status.HTTP_400_BAD_REQUEST, + # details={"parent_id": parent_id, "parent_settings": parent_settings}, + # ) service = PsNodeService(connection) try: diff --git a/api/api/schemas/process/process_schema.py b/api/api/schemas/process/process_schema.py index 9bc7721..65254fe 100644 --- a/api/api/schemas/process/process_schema.py +++ b/api/api/schemas/process/process_schema.py @@ -1,7 +1,7 @@ from datetime import datetime -from typing import Any, Dict, Optional, List +from typing import Any, Dict, List -from orm.tables.process import ProcessStatus, NodeType +from orm.tables.process import ProcessStatus from pydantic import Field from api.schemas.base import Base @@ -19,22 +19,6 @@ class ProcessSchema(Base): status: ProcessStatus -class ProcessSchemaSettingsNodeLink(Base): - id: int - link_name: str - parent_port_number: int - from_id: int - to_id: int - - -class ProcessSchemaSettingsNode(Base): - id: int - node_type: NodeType - from_node: Optional[Dict[str, Any]] = None - data: Dict[str, Any] # Переименовано с 'from' на 'from_node' - links: Optional[List[Dict[str, Any]]] = None - - class ProcessSchemaResponse(Base): process_schema: ProcessSchema - node_listen: Ps_NodeFrontResponse + nodes: List[Ps_NodeFrontResponse] diff --git a/api/api/schemas/process/process_version_archive.py b/api/api/schemas/process/process_version_archive.py deleted file mode 100644 index 38d369e..0000000 --- a/api/api/schemas/process/process_version_archive.py +++ /dev/null @@ -1,13 +0,0 @@ -from datetime import datetime -from typing import Any, Dict - -from api.schemas.base import Base - - -class ProcessStatusSchema(Base): - id: int - version: int - snapshot: Dict[str, Any] - owner_id: int - created_at: datetime - is_last: int diff --git a/api/api/schemas/process/ps_node.py b/api/api/schemas/process/ps_node.py index 49d7bb3..0722d9a 100644 --- a/api/api/schemas/process/ps_node.py +++ b/api/api/schemas/process/ps_node.py @@ -28,21 +28,6 @@ class Ps_Node(Base): status: NodeStatus -class Ps_NodeFrontResponseLink(Base): - id: int - link_name: str - parent_port_number: int - from_id: int - to_id: int - - -class Ps_NodeFrontResponseNode(Base): - id: int - node_type: NodeType - data: Dict[str, Any] # Переименовано с 'from' на 'from_node' - - class Ps_NodeFrontResponse(Base): - description: Optional[Dict[str, Any]] = None - node: Optional[Ps_NodeFrontResponseNode] = None - links: Optional[List[Dict[str, Any]]] = None + node: Optional[Dict[str, Any]] = None + link: Optional[List[Dict[str, Any]]] = None diff --git a/api/api/services/endpoints/process_schema.py b/api/api/services/endpoints/process_schema.py index 244693e..749ebaa 100644 --- a/api/api/services/endpoints/process_schema.py +++ b/api/api/services/endpoints/process_schema.py @@ -7,12 +7,12 @@ from api.db.logic.process_schema import ( get_process_schema_page_DTO, update_process_schema_by_id, create_process_schema, - update_process_schema_settings_by_id, ) -from api.db.logic.ps_node import create_ps_node_schema -from api.schemas.process.process_schema import ProcessSchema, ProcessSchemaSettingsNode +from api.db.logic.ps_node import create_ps_node_schema, get_all_ps_nodes_by_ps_id +from api.db.logic.node_link import get_all_node_links_by_next_node_ids +from api.schemas.process.process_schema import ProcessSchema, ProcessSchemaResponse from api.schemas.endpoints.process_schema import AllProcessSchemaResponse, ProcessSchemaFilterDTO, ProcessSchemaUpdate -from api.schemas.process.ps_node import Ps_NodeFrontResponse, Ps_NodeFrontResponseNode +from api.schemas.process.ps_node import Ps_NodeFrontResponse from orm.tables.process import NodeType from core import VorkNodeRegistry from model_nodes import ListenNodeData @@ -31,11 +31,40 @@ class ProcessSchemaService: """ return await get_process_schema_page_DTO(self.connection, filter_dto) - async def get(self, process_schema_id: int) -> Optional[ProcessSchema]: + async def get(self, process_schema_id: int) -> Optional[ProcessSchemaResponse]: """ - Получает схему процесса по ID. + Получает схему процесса по ID со всеми нодами и линками. """ - return await get_process_schema_by_id(self.connection, process_schema_id) + process_schema = await get_process_schema_by_id(self.connection, process_schema_id) + if process_schema is None: + return None + + nodes = await get_all_ps_nodes_by_ps_id(self.connection, process_schema_id) + + node_ids = [node.id for node in nodes] + all_links = await get_all_node_links_by_next_node_ids(self.connection, node_ids) + + links_by_node_id = {} + for link in all_links: + if link.next_node_id not in links_by_node_id: + links_by_node_id[link.next_node_id] = [] + links_by_node_id[link.next_node_id].append(link) + + nodes_response = [] + for node in nodes: + node_links = links_by_node_id.get(node.id, []) + links_list = [{"link": link.model_dump()} for link in node_links] + + ps_node_front_response = Ps_NodeFrontResponse( + node=node.model_dump(), + link=links_list, + ) + nodes_response.append(ps_node_front_response) + + return ProcessSchemaResponse( + process_schema=process_schema, + nodes=nodes_response, + ) async def create(self, creator_id: int) -> Dict[str, Any]: """ @@ -63,35 +92,27 @@ class ProcessSchemaService: validated_start_schema = start_node.validate() - db_start_schema = await create_ps_node_schema(self.connection, validated_start_schema, creator_id) - - node = ProcessSchemaSettingsNode( - id=db_start_schema.id, - node_type=NodeType.LISTEN.value, - data=validated_start_schema.data.model_dump(), - from_node=None, - links=None, - ) - - settings_dict = {"node": node.model_dump(mode="json")} - - await update_process_schema_settings_by_id(self.connection, process_schema_new.id, settings_dict) - - process_schema_new = await get_process_schema_by_id(self.connection, node_id) - - ps_node_front_response = Ps_NodeFrontResponse( - description=node_descriptor.model_dump(), - node=Ps_NodeFrontResponseNode( - id=db_start_schema.id, node_type=NodeType.LISTEN.value, data=validated_start_schema.data.model_dump() - ), - link=None, - ) - - response_data = { - "process_schema": process_schema_new.model_dump(), - "node_listen": ps_node_front_response.model_dump(), + start_settings_payload = { + **node_descriptor.model_dump(), + **validated_start_schema.data.model_dump(), } + db_start_schema = await create_ps_node_schema( + self.connection, + validated_start_schema, + node_descriptor, + creator_id, + start_settings_payload, + ) + + ps_node_front_response = Ps_NodeFrontResponse( + node=db_start_schema.model_dump(), + link=[], + ) + response_data = { + "process_schema": process_schema_new.model_dump(), + "nodes": [ps_node_front_response], # Список объектов, а не словарей + } return response_data async def update(self, process_schema_id: int, update_data: dict, process_schema: ProcessSchema) -> ProcessSchema: diff --git a/api/api/services/endpoints/ps_node.py b/api/api/services/endpoints/ps_node.py index 5711f9d..260afb7 100644 --- a/api/api/services/endpoints/ps_node.py +++ b/api/api/services/endpoints/ps_node.py @@ -7,12 +7,9 @@ from api.db.logic.ps_node import ( create_ps_node_schema, ) from api.db.logic.node_link import get_last_link_name_by_node_id, create_node_link_schema -from api.db.logic.process_schema import update_process_schema_settings_by_id -from api.schemas.process.process_schema import ProcessSchemaSettingsNodeLink, ProcessSchemaSettingsNode -from api.schemas.process.ps_node import Ps_NodeFrontResponse, Ps_NodeFrontResponseNode +from api.schemas.process.ps_node import Ps_NodeFrontResponse from core import VorkNodeRegistry, VorkNodeLink from model_nodes import VorkNodeLinkData -from api.utils.to_camel_dict import to_camel_dict class PsNodeService: @@ -45,7 +42,7 @@ class PsNodeService: node_instance = vork_node(data=ps_node_data, links=links) node_instance_validated = node_instance.validate() - db_ps_node = await create_ps_node_schema(self.connection, node_instance_validated, creator_id) + db_ps_node = await create_ps_node_schema(self.connection, node_instance_validated, node_descriptor, creator_id) link_name = await get_last_link_name_by_node_id(self.connection, db_ps_node.ps_id) link_data = VorkNodeLinkData( @@ -60,33 +57,9 @@ class PsNodeService: db_node_link = await create_node_link_schema(self.connection, validated_link, creator_id) - links_settings = ProcessSchemaSettingsNodeLink( - id=db_node_link.id, - link_name=db_node_link.link_name, - parent_port_number=db_node_link.link_point_id, - from_id=db_node_link.node_id, - to_id=db_node_link.next_node_id, - ) - - node_settings = ProcessSchemaSettingsNode( - id=db_ps_node.id, - node_type=db_ps_node.node_type, - data=node_instance_validated.data.model_dump(), - from_node=None, - links=[{"links": links_settings.model_dump()}], - ) - - settings_dict = {"node": node_settings.model_dump(mode="json")} - await update_process_schema_settings_by_id(self.connection, db_ps_node.ps_id, settings_dict) - ps_node_front_response = Ps_NodeFrontResponse( - description=node_descriptor.model_dump(), - node=Ps_NodeFrontResponseNode( - id=db_ps_node.id, - node_type=db_ps_node.node_type, - data=to_camel_dict(node_instance_validated.data.model_dump()), - ), - links=[{"links": links_settings.model_dump()}], + node=db_ps_node.model_dump(), + link=[{"link": db_node_link.model_dump()}], ) return ps_node_front_response