feat: delete ps node #20

Open
ivan.dev wants to merge 6 commits from VORKOUT-29 into master
14 changed files with 619 additions and 121 deletions

View File

@@ -0,0 +1,52 @@
"""add_cascade_delete_to_node_link_foreign_keys
Revision ID: 80840e78631e
Revises: cc3b95f1f99d
Create Date: 2025-10-26 18:47:24.004327
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '80840e78631e'
down_revision: Union[str, None] = 'cc3b95f1f99d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# Drop existing foreign key constraints
# Note: These constraint names are MySQL auto-generated names
# If they differ, check with: SHOW CREATE TABLE node_link;
op.drop_constraint('node_link_ibfk_2', 'node_link', type_='foreignkey') # next_node_id
op.drop_constraint('node_link_ibfk_3', 'node_link', type_='foreignkey') # node_id
# Add new foreign key constraints with CASCADE
op.create_foreign_key(
'fk_node_link_next_node_id_cascade',
'node_link', 'ps_node',
['next_node_id'], ['id'],
ondelete='CASCADE'
)
op.create_foreign_key(
'fk_node_link_node_id_cascade',
'node_link', 'ps_node',
['node_id'], ['id'],
ondelete='CASCADE'
)
def downgrade() -> None:
"""Downgrade schema."""
# Drop CASCADE foreign key constraints
op.drop_constraint('fk_node_link_next_node_id_cascade', 'node_link', type_='foreignkey')
op.drop_constraint('fk_node_link_node_id_cascade', 'node_link', type_='foreignkey')
# Restore original foreign key constraints without CASCADE
op.create_foreign_key('node_link_ibfk_2', 'node_link', 'ps_node', ['next_node_id'], ['id'])
op.create_foreign_key('node_link_ibfk_3', 'node_link', 'ps_node', ['node_id'], ['id'])

View File

@@ -114,6 +114,7 @@ async def get_process_schema_page_DTO(
limit=limit,
)
async def get_process_schema_by_id(connection: AsyncConnection, id: int) -> Optional[ProcessSchema]:
"""
Получает process_schema по id.

View File

@@ -1,15 +1,15 @@
from typing import Optional
from typing import Optional, List
from datetime import datetime, timezone
from sqlalchemy import insert, select, desc
from sqlalchemy import insert, select, desc, and_, delete, update
from sqlalchemy.ext.asyncio import AsyncConnection
from orm.tables.process import ps_node_table
from orm.tables.process import ps_node_table, node_link_table, process_schema_table
from api.schemas.process.ps_node import Ps_Node
from model_nodes.node_listen_models import ListenNodeCoreSchema
from orm.tables.process import NodeStatus, NodeType
from orm.tables.process import NodeStatus
async def get_ps_node_by_id(connection: AsyncConnection, id: int) -> Optional[Ps_Node]:
@@ -27,21 +27,6 @@ async def get_ps_node_by_id(connection: AsyncConnection, id: int) -> Optional[Ps
return Ps_Node.model_validate(ps_node_data)
async def get_ps_node_by_type_and_ps_id(connection: AsyncConnection, node_type: str, ps_id: int) -> Optional[Ps_Node]:
"""
Получает ps_node по node_type и ps_id.
"""
query = select(ps_node_table).where(ps_node_table.c.node_type == node_type, ps_node_table.c.ps_id == ps_id)
ps_node_db_cursor = await connection.execute(query)
ps_node_data = ps_node_db_cursor.mappings().one_or_none()
if not ps_node_data:
return None
return Ps_Node.model_validate(ps_node_data)
async def get_last_ps_node_by_creator_and_ps_id(
connection: AsyncConnection, creator_id: int, ps_id: int
) -> Optional[Ps_Node]:
@@ -86,3 +71,121 @@ async def create_ps_node_schema(
await connection.commit()
return await get_last_ps_node_by_creator_and_ps_id(connection, creator_id, validated_schema.ps_id)
async def check_node_connection(connection: AsyncConnection, node_id: int, next_node_id: int, port: int) -> bool:
"""
Проверяет, подключен ли next_node_id к node_id через указанный порт.
"""
query = select(node_link_table).where(
and_(
node_link_table.c.node_id == node_id,
node_link_table.c.next_node_id == next_node_id,
node_link_table.c.link_point_id == port,
)
)
result = await connection.execute(query)
return result.mappings().first() is not None
async def get_nodes_for_deletion_ordered(connection: AsyncConnection, node_id: int) -> List[int]:
"""
Рекурсивно находит ВСЕ дочерние узлы и возвращает их ID в правильном порядке:
от самых глубоких к корневым.
"""
all_child_nodes = []
visited_nodes = set()
# тут надо будет поработать с зацикливанием на вышестояющую ноду, сейчас вышестоящая нода если она уже была учтена, не будет занесена в спиок на удаление.
async def find_children_with_depth(current_node_id: int, current_depth: int):
if current_node_id in visited_nodes:
return
visited_nodes.add(current_node_id)
query = (
select(ps_node_table)
ivan.dev marked this conversation as resolved
Review

У меня вопрос - а насколько много запросов тут может проходить?
Это я отсылаю к проблеме N + 1.

У меня вопрос - а насколько много запросов тут может проходить? Это я отсылаю к проблеме N + 1.
Review

Особой нагрузки тут не предвидится, как минимум в разрезе редактирования тела схемы процесса. Речь про максимум пару десятков узлов, в исключительных случаях может быть побольше.

Особой нагрузки тут не предвидится, как минимум в разрезе редактирования тела схемы процесса. Речь про максимум пару десятков узлов, в исключительных случаях может быть побольше.
Review

Даже если речь идёт о паре десятков узлов, то это уже получается 20 похожих запросов подряд.

Даже если речь идёт о паре десятков узлов, то это уже получается 20 похожих запросов подряд.
Review

С этим трудно спорить.
В целом, если учитывать латентность сетевую и парсинг SQL, оно уложится в 0.1-0.3сек на подобных данных.

Но согласен, что это антипаттерн с т.з. работы с БД.

С этим трудно спорить. В целом, если учитывать латентность сетевую и парсинг SQL, оно уложится в 0.1-0.3сек на подобных данных. Но согласен, что это антипаттерн с т.з. работы с БД.
.join(node_link_table, ps_node_table.c.id == node_link_table.c.next_node_id)
.where(node_link_table.c.node_id == current_node_id)
)
result = await connection.execute(query)
child_nodes = result.mappings().all()
for node_data in child_nodes:
node = Ps_Node.model_validate(node_data)
all_child_nodes.append((node, current_depth + 1))
await find_children_with_depth(node.id, current_depth + 1)
await find_children_with_depth(node_id, 0)
all_child_nodes.sort(key=lambda x: x[1], reverse=True)
ordered_node_ids = [node.id for node, depth in all_child_nodes]
ordered_node_ids.append(node_id)
return ordered_node_ids
async def delete_ps_nodes_delete_handler(connection: AsyncConnection, node_ids: List[int]) -> List[int]:
"""
Очищает settings и удаляет ноды для каждого ps_id.
Возвращает список успешно удаленных ID нод.
"""
if not node_ids:
return []
ps_id_rows = await connection.execute(
select(ps_node_table.c.id, ps_node_table.c.ps_id).where(ps_node_table.c.id.in_(node_ids))
)
rows = ps_id_rows.mappings().all()
if not rows:
return []
ps_to_node_ids = {}
for r in rows:
ps_to_node_ids.setdefault(r["ps_id"], []).append(r["id"])
deleted_all = []
for ps_id, ids_in_ps in ps_to_node_ids.items():
await remove_nodes_from_process_schema_settings(connection, ps_id, ids_in_ps)
result = await connection.execute(delete(ps_node_table).where(ps_node_table.c.id.in_(ids_in_ps)))
if result.rowcount and result.rowcount > 0:
deleted_all.extend(ids_in_ps)
else:
raise Exception(f"Failed to delete nodes for ps_id={ps_id}")
await connection.commit()
return deleted_all
async def remove_nodes_from_process_schema_settings(connection: AsyncConnection, ps_id: int, node_ids: List[int]):
"""
Удаляет ноды из поля settings в таблице process_schema по списку node_ids.
"""
from api.db.logic.process_schema import get_process_schema_by_id
process_schema = await get_process_schema_by_id(connection, ps_id)
if not process_schema or not process_schema.settings:
return
settings = process_schema.settings
if "nodes" in settings and isinstance(settings["nodes"], list):
node_ids_set = set(node_ids)
settings["nodes"] = [
node_item
for node_item in settings["nodes"]
if not (
isinstance(node_item, dict)
and "node" in node_item
and isinstance(node_item["node"], dict)
and node_item["node"].get("id") in node_ids_set
)
ivan.dev marked this conversation as resolved
Review

Ну и здесь на каждый id будет по несколько отдельных запросов. Неэффективно.

Ну и здесь на каждый id будет по несколько отдельных запросов. Неэффективно.
]
await connection.execute(
update(process_schema_table).where(process_schema_table.c.id == ps_id).values(settings=settings)
)

View File

@@ -17,7 +17,7 @@ from api.schemas.account.account import User
from api.schemas.base import bearer_schema
from api.schemas.endpoints.account import AllUserResponse, UserCreate, UserFilterDTO, UserUpdate
from api.services.auth import get_current_user
from api.services.user_role_validation import db_user_role_validation
from api.services.user_role_validation import UserRoleValidator
api_router = APIRouter(
prefix="/account",
@@ -38,7 +38,8 @@ async def get_all_account_endpoint(
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_admin(current_user)
filters = {
**({"status": status_filter} if status_filter else {}),
@@ -67,7 +68,8 @@ async def get_account_endpoint(
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_admin(current_user)
user = await get_user_by_id(connection, user_id)
@@ -83,7 +85,8 @@ async def create_account_endpoint(
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_admin(current_user)
user_validation = await get_user_by_login(connection, user.login)
@@ -104,7 +107,8 @@ async def update_account_endpoint(
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_admin(current_user)
user = await get_user_by_id(connection, user_id)
if user is None:
@@ -131,7 +135,8 @@ async def delete_account_endpoint(
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_admin(current_user)
user = await get_user_by_id(connection, user_id)
if user is None:

View File

@@ -13,7 +13,7 @@ from api.schemas.account.account_keyring import AccountKeyring
from api.schemas.base import bearer_schema
from api.schemas.endpoints.account_keyring import AccountKeyringUpdate
from api.services.auth import get_current_user
from api.services.user_role_validation import db_user_role_validation
from api.services.user_role_validation import UserRoleValidator
api_router = APIRouter(
prefix="/keyring",
@@ -25,7 +25,8 @@ api_router = APIRouter(
async def get_keyring_endpoint(
key_id: str, connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user)
):
authorize_user = await db_user_role_validation(connection, current_user)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_admin(current_user)
keyring = await get_key_by_id(connection, key_id)
@@ -43,7 +44,8 @@ async def create_keyring_endpoint(
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_admin(current_user)
keyring = await get_key_by_id(connection, key_id)
@@ -69,7 +71,8 @@ async def update_keyring_endpoint(
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_admin(current_user)
keyring = await get_key_by_id(connection, key_id)
if keyring is None:
@@ -94,7 +97,8 @@ async def delete_keyring_endpoint(
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_admin(current_user)
keyring = await get_key_by_id(connection, key_id)
if keyring is None:

View File

@@ -17,10 +17,7 @@ from api.schemas.base import bearer_schema
from api.schemas.endpoints.list_events import AllListEventResponse, ListEventFilterDTO, ListEventUpdate
from api.schemas.events.list_events import ListEvent
from api.services.auth import get_current_user
from api.services.user_role_validation import (
db_user_role_validation_for_list_events_and_process_schema,
db_user_role_validation_for_list_events_and_process_schema_by_list_event_id,
)
from api.services.user_role_validation import UserRoleValidator
api_router = APIRouter(
prefix="/list_events",
@@ -54,9 +51,8 @@ async def get_all_list_events_endpoint(
filters=filters if filters else None,
)
authorize_user, page_flag = await db_user_role_validation_for_list_events_and_process_schema(
connection, current_user
)
validator = UserRoleValidator(connection)
authorize_user, page_flag = await validator.get_user(current_user)
if not page_flag:
if filter_dto.filters is None:
@@ -82,9 +78,8 @@ async def get_list_events_endpoint(
if list_events_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, list_events_validation.creator_id
)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_ownership(current_user, list_events_validation.creator_id)
if list_events_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
@@ -124,9 +119,8 @@ async def update_list_events(
if list_events_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, list_events_validation.creator_id
)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_ownership(current_user, list_events_validation.creator_id)
updated_values = list_events_update.model_dump(by_alias=True, exclude_none=True)
@@ -151,9 +145,8 @@ async def delete_list_events_endpoint(
if list_events_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, list_events_validation.creator_id
)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_ownership(current_user, list_events_validation.creator_id)
list_events_update = ListEventUpdate(status=EventStatus.DELETED.value)

View File

@@ -18,10 +18,7 @@ from api.schemas.process.process_schema import ProcessSchema, ProcessSchemaSetti
from api.schemas.process.ps_node import Ps_NodeFrontResponseNode
from api.schemas.process.ps_node import Ps_NodeFrontResponse
from api.services.auth import get_current_user
from api.services.user_role_validation import (
db_user_role_validation_for_list_events_and_process_schema,
db_user_role_validation_for_list_events_and_process_schema_by_list_event_id,
)
from api.services.user_role_validation import UserRoleValidator
from api.db.logic.ps_node import create_ps_node_schema
@@ -78,9 +75,8 @@ async def get_all_process_schema_endpoint(
filters=filters if filters else None,
)
authorize_user, page_flag = await db_user_role_validation_for_list_events_and_process_schema(
connection, current_user
)
validator = UserRoleValidator(connection)
authorize_user, page_flag = await validator.get_user(current_user)
if not page_flag:
if filter_dto.filters is None:
@@ -106,9 +102,8 @@ async def get_process_schema_endpoint(
if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, process_schema_validation.creator_id
)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_ownership(current_user, process_schema_validation.creator_id)
if process_schema_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
@@ -146,8 +141,6 @@ async def create_processschema_endpoint(
validated_start_schema = start_node.validate()
print(validated_start_schema)
db_start_schema = await create_ps_node_schema(connection, validated_start_schema, user_validation.id)
node = ProcessSchemaSettingsNode(
@@ -192,9 +185,8 @@ async def update_process_schema_endpoint(
if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, process_schema_validation.creator_id
)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_ownership(current_user, process_schema_validation.creator_id)
updated_values = process_schema_update.model_dump(by_alias=True, exclude_none=True)
@@ -219,9 +211,8 @@ async def delete_process_schema_endpoint(
if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, process_schema_validation.creator_id
)
validator = UserRoleValidator(connection)
authorize_user = await validator.validate_ownership(current_user, process_schema_validation.creator_id)
process_schema_update = ProcessSchemaUpdate(status=ProcessStatus.DELETED.value)

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, status
from sqlalchemy.ext.asyncio import AsyncConnection
@@ -7,20 +7,28 @@ from api.db.logic.account import get_user_by_login
from api.schemas.base import bearer_schema
from api.schemas.process.process_schema import ProcessSchemaSettingsNodeLink, ProcessSchemaSettingsNode
from api.schemas.process.ps_node import Ps_NodeFrontResponseNode, Ps_NodeRequest
from api.schemas.process.ps_node import Ps_NodeFrontResponseNode, Ps_NodeRequest, Ps_NodeDeleteRequest
from api.schemas.process.ps_node import Ps_NodeFrontResponse
from api.services.auth import get_current_user
from api.db.logic.ps_node import create_ps_node_schema
from api.db.logic.ps_node import (
create_ps_node_schema,
get_ps_node_by_id,
check_node_connection,
get_nodes_for_deletion_ordered,
delete_ps_nodes_delete_handler,
)
from api.db.logic.node_link import get_last_link_name_by_node_id, create_node_link_schema
from api.db.logic.process_schema import update_process_schema_settings_by_id
from api.db.logic.process_schema import update_process_schema_settings_by_id, get_process_schema_by_id
from api.services.user_role_validation import UserRoleValidator
from core import VorkNodeRegistry, VorkNodeLink
from model_nodes import VorkNodeLinkData
from api.utils.to_camel_dict import to_camel_dict
from api.error import create_operation_error, create_access_error, create_validation_error, create_server_error
api_router = APIRouter(
@@ -29,6 +37,77 @@ api_router = APIRouter(
)
@api_router.delete("", dependencies=[Depends(bearer_schema)], status_code=status.HTTP_200_OK)
async def delete_ps_node_endpoint(
ps_node_delete_data: Ps_NodeDeleteRequest,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
process_schema = await get_process_schema_by_id(connection, ps_node_delete_data.schema_id)
if process_schema is None:
raise create_operation_error(
message="Process schema not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"schema_id": ps_node_delete_data.schema_id},
)
validator = UserRoleValidator(connection)
try:
await validator.validate_ownership(current_user, process_schema.creator_id)
except Exception as e:
ivan.dev marked this conversation as resolved
Review

Что за любовь к длиннейшим названиям?)

Что за любовь к длиннейшим названиям?)
Review

Это LLM, мне кажется )

Это LLM, мне кажется )
Review

В каком то из прошлых ПРов раз было замечание о неинформативности названий функций, мне переименовать?

В каком то из прошлых ПРов раз было замечание о неинформативности названий функций, мне переименовать?
Review

Это предмет крупного рефакторинга. В основном наблюдение говорит об избыточности семантики в названиях функций, т.к. при корректном подходе к проектированию (инкапсюляция) нет необходимости повторять в методе контекстную часть как минимум (т.е. если метод сидит в классе ps_node, то повторять это в названии скорее тавтология).

Парадигма ООП и частично ФП за сохранение читаемости в условиях умолчаний.

В большинстве случаев для соблюдения уникальности семантики названия метода и переменной должно хватать схемы Императив + Объект (save_file), Предикат + Императив (can_edit, is_valid()), в исключительных случаях (если это общая библиотека функций и ресурсов) можно использовать Функция/Объект верхнего уровня + Императив + Целевой Объект.

Всё - три слова и контекст должны составлять уникальную семантику. Если не получается - значит есть проблемы в композиции и стоит сначала решить их.

Это предмет крупного рефакторинга. В основном наблюдение говорит об избыточности семантики в названиях функций, т.к. при корректном подходе к проектированию (инкапсюляция) нет необходимости повторять в методе контекстную часть как минимум (т.е. если метод сидит в классе ps_node, то повторять это в названии скорее тавтология). Парадигма ООП и частично ФП за сохранение читаемости в условиях умолчаний. В большинстве случаев для соблюдения уникальности семантики названия метода и переменной должно хватать схемы Императив + Объект (save_file), Предикат + Императив (can_edit, is_valid()), в исключительных случаях (если это общая библиотека функций и ресурсов) можно использовать Функция/Объект верхнего уровня + Императив + Целевой Объект. Всё - три слова и контекст должны составлять уникальную семантику. Если не получается - значит есть проблемы в композиции и стоит сначала решить их.
Review

Понял

Понял
Review

Согласен. Семантика в названиях должна быть отображена, но это не значит, что для её выражения требуется по 10 слов, лишь главная суть для текущего контекста.

Согласен. Семантика в названиях должна быть отображена, но это не значит, что для её выражения требуется по 10 слов, лишь главная суть для текущего контекста.
raise create_access_error(
message="Access denied",
status_code=status.HTTP_403_FORBIDDEN,
details={"user_id": current_user, "schema_creator_id": process_schema.creator_id, "reason": str(e)},
)
ps_node = await get_ps_node_by_id(connection, ps_node_delete_data.node_id)
if ps_node is None:
raise create_operation_error(
message="PS node not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"node_id": ps_node_delete_data.node_id},
)
next_ps_node = await get_ps_node_by_id(connection, ps_node_delete_data.next_node_id)
if next_ps_node is None:
raise create_operation_error(
message="Next PS node not found",
status_code=status.HTTP_400_BAD_REQUEST,
details={"next_node_id": ps_node_delete_data.next_node_id},
)
is_connected = await check_node_connection(
connection, ps_node_delete_data.node_id, ps_node_delete_data.next_node_id, int(ps_node_delete_data.port)
)
if not is_connected:
raise create_validation_error(
message="Node connection validation failed",
status_code=status.HTTP_400_BAD_REQUEST,
details={
"node_id": ps_node_delete_data.node_id,
"next_node_id": ps_node_delete_data.next_node_id,
"port": ps_node_delete_data.port,
},
)
ordered_node_ids = await get_nodes_for_deletion_ordered(connection, ps_node_delete_data.next_node_id)
try:
deleted_node_ids = await delete_ps_nodes_delete_handler(connection, ordered_node_ids)
except Exception as e:
raise create_server_error(
message="Failed to delete nodes",
status_code=500,
details={"error": str(e), "ordered_node_ids": ordered_node_ids},
)
return {
"deleted_node_ids": deleted_node_ids,
}
@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=Ps_NodeFrontResponse)
async def create_ps_node_endpoint(
ps_node: Ps_NodeRequest,
@@ -37,12 +116,34 @@ async def create_ps_node_endpoint(
):
user_validation = await get_user_by_login(connection, current_user)
process_schema = await get_process_schema_by_id(connection, ps_node.data["ps_id"])
if process_schema is None:
raise create_operation_error(
message="Process schema not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"schema_id": ps_node.data["ps_id"]},
)
validator = UserRoleValidator(connection)
try:
await validator.validate_ownership(current_user, process_schema.creator_id)
except Exception as e:
raise create_access_error(
message="Access denied",
status_code=status.HTTP_403_FORBIDDEN,
details={"user_id": current_user, "schema_creator_id": process_schema.creator_id, "reason": str(e)},
)
registery = VorkNodeRegistry()
vork_node = registery.get(ps_node.data["node_type"])
if vork_node is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Node not found")
raise create_operation_error(
message="Node type not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"node_type": ps_node.data["node_type"]},
)
node_descriptor = vork_node.form()
try:
@@ -51,43 +152,93 @@ async def create_ps_node_endpoint(
node_instance_validated = node_instance.validate()
except Exception as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
raise create_validation_error(
message="Node validation failed",
status_code=status.HTTP_400_BAD_REQUEST,
details={"error": str(e)},
)
db_ps_node = await create_ps_node_schema(connection, node_instance_validated, user_validation.id)
link_name = await get_last_link_name_by_node_id(connection, db_ps_node.ps_id)
# Проверка: parent_id принадлежит тому же ps_id
parent_id = node_instance_validated.parent_id
target_ps_id = ps_node.data["ps_id"]
link_data = VorkNodeLinkData(
parent_port_number=node_instance_validated.parent_port_number,
to_id=db_ps_node.id,
from_id=node_instance_validated.parent_id,
last_link_name=link_name,
)
parent_node = await get_ps_node_by_id(connection, parent_id)
if parent_node is None:
raise create_operation_error(
message="Parent PS node not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"parent_id": parent_id},
)
if parent_node.ps_id != target_ps_id:
raise create_validation_error(
message="Parent PS node belongs to another process schema",
status_code=status.HTTP_400_BAD_REQUEST,
details={"parent_id": parent_id, "expected_ps_id": target_ps_id, "actual_ps_id": parent_node.ps_id},
)
link = VorkNodeLink(data=link_data.model_dump())
# Проверка: у родительской ноды есть указанный порт
parent_port_number = node_instance_validated.parent_port_number
validated_link = link.validate()
# Извлекаем номера портов из settings родительской ноды
parent_settings = parent_node.settings or {}
available_port_numbers = []
db_node_link = await create_node_link_schema(connection, validated_link, user_validation.id)
# Ищем все ключи, содержащие "port" в названии
for key, value in parent_settings.items():
if "port" in key.lower() and isinstance(value, int):
available_port_numbers.append(value)
links_settings = ProcessSchemaSettingsNodeLink(
id=db_node_link.id,
link_name=db_node_link.link_name,
parent_port_number=db_node_link.link_point_id,
from_id=db_node_link.node_id,
to_id=db_node_link.next_node_id,
)
# Проверяем, что указанный порт существует в settings
if parent_port_number not in available_port_numbers:
raise create_validation_error(
message="Parent port number is invalid",
status_code=status.HTTP_400_BAD_REQUEST,
details={"parent_id": parent_id, "parent_settings": parent_settings},
)
node_settings = ProcessSchemaSettingsNode(
id=db_ps_node.id,
node_type=db_ps_node.node_type,
data=node_instance_validated.data.model_dump(),
from_node=None,
links=[{"links": links_settings.model_dump()}],
)
try:
db_ps_node = await create_ps_node_schema(connection, node_instance_validated, user_validation.id)
link_name = await get_last_link_name_by_node_id(connection, db_ps_node.ps_id)
settings_dict = {"node": node_settings.model_dump(mode="json")}
link_data = VorkNodeLinkData(
parent_port_number=node_instance_validated.parent_port_number,
to_id=db_ps_node.id,
from_id=node_instance_validated.parent_id,
last_link_name=link_name,
)
await update_process_schema_settings_by_id(connection, db_ps_node.ps_id, settings_dict)
link = VorkNodeLink(data=link_data.model_dump())
validated_link = link.validate()
db_node_link = await create_node_link_schema(connection, validated_link, user_validation.id)
links_settings = ProcessSchemaSettingsNodeLink(
id=db_node_link.id,
link_name=db_node_link.link_name,
parent_port_number=db_node_link.link_point_id,
from_id=db_node_link.node_id,
to_id=db_node_link.next_node_id,
)
node_settings = ProcessSchemaSettingsNode(
id=db_ps_node.id,
node_type=db_ps_node.node_type,
data=node_instance_validated.data.model_dump(),
from_node=None,
links=[{"links": links_settings.model_dump()}],
)
settings_dict = {"node": node_settings.model_dump(mode="json")}
await update_process_schema_settings_by_id(connection, db_ps_node.ps_id, settings_dict)
except Exception as e:
raise create_server_error(
message="Failed to create node",
status_code=500,
details={"error": str(e)},
)
ps_node_front_response = Ps_NodeFrontResponse(
description=node_descriptor.model_dump(),

26
api/api/error/__init__.py Normal file
View File

@@ -0,0 +1,26 @@
"""
Модуль для обработки ошибок API.
"""
from .error_model.error_types import ServerError, AccessError, OperationError, ValidationError, ErrorType
from .error_handlers import (
handle_api_error,
create_server_error,
create_access_error,
create_operation_error,
create_validation_error,
)
__all__ = [
"ServerError",
"AccessError",
"OperationError",
"ValidationError",
"ErrorType",
"handle_api_error",
"create_server_error",
"create_access_error",
"create_operation_error",
"create_validation_error",
]

View File

@@ -0,0 +1,54 @@
"""
Обработчики ошибок для API.
"""
from typing import Optional, Dict, Any
from fastapi import HTTPException
from .error_model.error_types import ServerError, AccessError, OperationError, ValidationError, ErrorType
def handle_api_error(
error_type: ErrorType, message: str, status_code: int, details: Optional[Dict[str, Any]] = None
) -> HTTPException:
"""
Функция для создания HTTPException с правильной структурой ошибки.
"""
match error_type:
case ErrorType.SERVER:
error = ServerError(message=message, details=details)
case ErrorType.ACCESS:
error = AccessError(message=message, details=details)
case ErrorType.OPERATION:
error = OperationError(message=message, details=details)
case ErrorType.VALIDATION:
error = ValidationError(message=message, details=details)
case _:
error = ServerError(message=message, details=details)
return HTTPException(status_code=status_code, detail=error.model_dump(mode="json"))
def create_server_error(
message: str, status_code: int = 500, details: Optional[Dict[str, Any]] = None
) -> HTTPException:
return handle_api_error(error_type=ErrorType.SERVER, message=message, status_code=status_code, details=details)
def create_access_error(
message: str, status_code: int = 403, details: Optional[Dict[str, Any]] = None
) -> HTTPException:
return handle_api_error(error_type=ErrorType.ACCESS, message=message, status_code=status_code, details=details)
def create_operation_error(
message: str, status_code: int = 400, details: Optional[Dict[str, Any]] = None
) -> HTTPException:
return handle_api_error(error_type=ErrorType.OPERATION, message=message, status_code=status_code, details=details)
def create_validation_error(
message: str, status_code: int = 422, details: Optional[Dict[str, Any]] = None
) -> HTTPException:
return handle_api_error(error_type=ErrorType.VALIDATION, message=message, status_code=status_code, details=details)

View File

@@ -0,0 +1,7 @@
"""
Модели ошибок для API.
"""
from .error_types import ServerError, AccessError, OperationError, ValidationError, ErrorType
__all__ = ["ServerError", "AccessError", "OperationError", "ValidationError", "ErrorType"]

View File

@@ -0,0 +1,61 @@
"""
Типизированные модели ошибок для API.
"""
from enum import Enum
from typing import Optional, Dict, Any
from pydantic import BaseModel
class ErrorType(str, Enum):
"""
Типы ошибок API.
"""
SERVER = "SERVER"
ACCESS = "ACCESS"
OPERATION = "OPERATION"
VALIDATION = "VALIDATION"
class BaseError(BaseModel):
"""
Базовая модель ошибки.
"""
error_type: ErrorType
message: str
details: Optional[Dict[str, Any]] = None
class ServerError(BaseError):
"""
Критические серверные ошибки (БД, соединения и прочие неприятности).
"""
error_type: ErrorType = ErrorType.SERVER
class AccessError(BaseError):
"""
Ошибки доступа (несоответствие тенантности, ролям доступа).
"""
error_type: ErrorType = ErrorType.ACCESS
class OperationError(BaseError):
"""
Ошибки операции (несоответствие прохождению верификации, ошибки в датасете).
"""
error_type: ErrorType = ErrorType.OPERATION
class ValidationError(BaseError):
"""
Ошибки валидации (несоответствие первичной валидации).
"""
error_type: ErrorType = ErrorType.VALIDATION
field_errors: Optional[Dict[str, str]] = None

View File

@@ -6,6 +6,13 @@ from orm.tables.process import NodeStatus, NodeType
from api.schemas.base import Base
class Ps_NodeDeleteRequest(Base):
schema_id: int
node_id: int
port: str
next_node_id: int
class Ps_NodeRequest(Base):
data: Dict[str, Any]
links: Dict[str, Any]

View File

@@ -1,32 +1,75 @@
from fastapi import (
HTTPException,
status,
)
from fastapi import status
from orm.tables.account import AccountRole
from api.db.logic.account import get_user_by_login
from api.error import create_operation_error, create_access_error
async def db_user_role_validation(connection, current_user):
authorize_user = await get_user_by_login(connection, current_user)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions")
return authorize_user
class UserRoleValidator:
"""Валидатор ролей пользователей"""
def __init__(self, connection):
self.connection = connection
async def db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, current_listevents_creator_id
):
authorize_user = await get_user_by_login(connection, current_user)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
if authorize_user.id != current_listevents_creator_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions")
return authorize_user
async def validate_admin(self, current_user: int):
"""Проверяет права администратора или владельца"""
try:
authorize_user = await get_user_by_login(self.connection, current_user)
except Exception as e:
raise create_operation_error(
message="User not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"user_id": current_user, "error": str(e)},
)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
raise create_access_error(
message="Insufficient permissions",
status_code=status.HTTP_403_FORBIDDEN,
details={
"user_id": current_user,
"user_role": authorize_user.role.value,
"required_roles": [AccountRole.OWNER.value, AccountRole.ADMIN.value],
},
)
async def db_user_role_validation_for_list_events_and_process_schema(connection, current_user):
authorize_user = await get_user_by_login(connection, current_user)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
return authorize_user, False
else:
return authorize_user, True
return authorize_user
async def validate_ownership(self, current_user: int, resource_owner_id: int):
"""Проверяет владение ресурсом или права администратора"""
try:
authorize_user = await get_user_by_login(self.connection, current_user)
except Exception as e:
raise create_operation_error(
message="User not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"user_id": current_user, "error": str(e)},
)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
if authorize_user.id != resource_owner_id:
raise create_access_error(
message="Access denied",
status_code=status.HTTP_403_FORBIDDEN,
details={
"user_id": current_user,
"resource_owner_id": resource_owner_id,
"user_role": authorize_user.role.value,
"reason": "User is not the owner and does not have admin privileges",
},
)
return authorize_user
async def get_user(self, current_user: int):
"""Получает пользователя с админ-статусом"""
try:
authorize_user = await get_user_by_login(self.connection, current_user)
except Exception as e:
raise create_operation_error(
message="User not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"user_id": current_user, "error": str(e)},
)
is_admin = authorize_user.role in {AccountRole.OWNER, AccountRole.ADMIN}
return authorize_user, is_admin