3 Commits

Author SHA1 Message Date
TheNoxium
4aa2c12a48 feat: listen - start node 2025-09-26 15:09:09 +05:00
TheNoxium
0ab9727223 fix: camel case response 2025-09-15 18:51:02 +05:00
TheNoxium
1d367a1bad feat: node enpoin, node_start process_schema 2025-09-12 22:35:46 +05:00
22 changed files with 277 additions and 796 deletions

View File

@@ -1,52 +0,0 @@
"""add_cascade_delete_to_node_link_foreign_keys
Revision ID: 80840e78631e
Revises: cc3b95f1f99d
Create Date: 2025-10-26 18:47:24.004327
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '80840e78631e'
down_revision: Union[str, None] = 'cc3b95f1f99d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# Drop existing foreign key constraints
# Note: These constraint names are MySQL auto-generated names
# If they differ, check with: SHOW CREATE TABLE node_link;
op.drop_constraint('node_link_ibfk_2', 'node_link', type_='foreignkey') # next_node_id
op.drop_constraint('node_link_ibfk_3', 'node_link', type_='foreignkey') # node_id
# Add new foreign key constraints with CASCADE
op.create_foreign_key(
'fk_node_link_next_node_id_cascade',
'node_link', 'ps_node',
['next_node_id'], ['id'],
ondelete='CASCADE'
)
op.create_foreign_key(
'fk_node_link_node_id_cascade',
'node_link', 'ps_node',
['node_id'], ['id'],
ondelete='CASCADE'
)
def downgrade() -> None:
"""Downgrade schema."""
# Drop CASCADE foreign key constraints
op.drop_constraint('fk_node_link_next_node_id_cascade', 'node_link', type_='foreignkey')
op.drop_constraint('fk_node_link_node_id_cascade', 'node_link', type_='foreignkey')
# Restore original foreign key constraints without CASCADE
op.create_foreign_key('node_link_ibfk_2', 'node_link', 'ps_node', ['next_node_id'], ['id'])
op.create_foreign_key('node_link_ibfk_3', 'node_link', 'ps_node', ['node_id'], ['id'])

View File

@@ -11,18 +11,20 @@ from orm.tables.process import ps_node_table, node_link_table
from orm.tables.process import NodeLinkStatus from orm.tables.process import NodeLinkStatus
async def get_last_link_name_by_node_id(connection: AsyncConnection, ps_id: int) -> Optional[str]:
async def get_last_link_name_by_node_id(
connection: AsyncConnection, ps_id: int
) -> Optional[str]:
""" """
Получает link_name из последней записи node_link по ps_id. Получает link_name из последней записи node_link по ps_id.
Находит все node_id в ps_node по ps_id, затем ищет связи в node_link Находит все node_id в ps_node по ps_id, затем ищет связи в node_link
и возвращает link_name из самой последней записи. и возвращает link_name из самой последней записи.
""" """
query = ( query = select(node_link_table.c.link_name).where(
select(node_link_table.c.link_name) node_link_table.c.node_id.in_(
.where(node_link_table.c.node_id.in_(select(ps_node_table.c.id).where(ps_node_table.c.ps_id == ps_id))) select(ps_node_table.c.id).where(ps_node_table.c.ps_id == ps_id)
.order_by(desc(node_link_table.c.created_at))
.limit(1)
) )
).order_by(desc(node_link_table.c.created_at)).limit(1)
result = await connection.execute(query) result = await connection.execute(query)
link_name = result.scalar_one_or_none() link_name = result.scalar_one_or_none()
@@ -36,15 +38,12 @@ async def get_last_node_link_by_creator_and_ps_id(
""" """
Получает последнюю созданную node_link для данного создателя и процесса. Получает последнюю созданную node_link для данного создателя и процесса.
""" """
query = ( query = select(node_link_table).where(
select(node_link_table)
.where(
node_link_table.c.creator_id == creator_id, node_link_table.c.creator_id == creator_id,
node_link_table.c.node_id.in_(select(ps_node_table.c.id).where(ps_node_table.c.id == node_link_id)), node_link_table.c.node_id.in_(
) select(ps_node_table.c.id).where(ps_node_table.c.id == node_link_id)
.order_by(desc(node_link_table.c.created_at))
.limit(1)
) )
).order_by(desc(node_link_table.c.created_at)).limit(1)
node_link_db_cursor = await connection.execute(query) node_link_db_cursor = await connection.execute(query)
node_link_data = node_link_db_cursor.mappings().one_or_none() node_link_data = node_link_db_cursor.mappings().one_or_none()
@@ -56,9 +55,7 @@ async def get_last_node_link_by_creator_and_ps_id(
async def create_node_link_schema( async def create_node_link_schema(
connection: AsyncConnection, connection: AsyncConnection, validated_link_schema, creator_id: int,
validated_link_schema,
creator_id: int,
) -> Optional[NodeLink]: ) -> Optional[NodeLink]:
""" """
Создает нове поле в таблице process_schema_table. Создает нове поле в таблице process_schema_table.

View File

@@ -6,7 +6,7 @@ from datetime import datetime, timezone
from sqlalchemy import insert, select, func, or_, and_, asc, desc from sqlalchemy import insert, select, func, or_, and_, asc, desc
from sqlalchemy.ext.asyncio import AsyncConnection from sqlalchemy.ext.asyncio import AsyncConnection
from orm.tables.process import process_schema_table, ProcessStatus from orm.tables.process import process_schema_table
from api.schemas.process.process_schema import ProcessSchema from api.schemas.process.process_schema import ProcessSchema
@@ -50,9 +50,8 @@ async def get_process_schema_page_DTO(
or_(process_schema_table.c.title.ilike(search_term), process_schema_table.c.description.ilike(search_term)) or_(process_schema_table.c.title.ilike(search_term), process_schema_table.c.description.ilike(search_term))
) )
filter_conditions = []
if filter_dto.filters: if filter_dto.filters:
filter_conditions = []
for field, values in filter_dto.filters.items(): for field, values in filter_dto.filters.items():
column = getattr(process_schema_table.c, field, None) column = getattr(process_schema_table.c, field, None)
if column is not None and values: if column is not None and values:
@@ -61,9 +60,6 @@ async def get_process_schema_page_DTO(
else: else:
filter_conditions.append(column.in_(values)) filter_conditions.append(column.in_(values))
if filter_dto.filters is None or "status" not in filter_dto.filters:
filter_conditions.append(process_schema_table.c.status != "DELETED")
if filter_conditions: if filter_conditions:
query = query.where(and_(*filter_conditions)) query = query.where(and_(*filter_conditions))
@@ -90,7 +86,7 @@ async def get_process_schema_page_DTO(
or_(process_schema_table.c.title.ilike(search_term), process_schema_table.c.description.ilike(search_term)) or_(process_schema_table.c.title.ilike(search_term), process_schema_table.c.description.ilike(search_term))
) )
if filter_conditions: if filter_dto.filters and filter_conditions:
count_query = count_query.where(and_(*filter_conditions)) count_query = count_query.where(and_(*filter_conditions))
result = await connection.execute(query) result = await connection.execute(query)
@@ -115,6 +111,21 @@ async def get_process_schema_page_DTO(
) )
async def get_process_schema_by_title(connection: AsyncConnection, title: str) -> Optional[ProcessSchema]:
"""
Получает process schema по title.
"""
query = select(process_schema_table).where(process_schema_table.c.title == title)
process_schema_db_cursor = await connection.execute(query)
process_schema_data = process_schema_db_cursor.mappings().one_or_none()
if not process_schema_data:
return None
return ProcessSchema.model_validate(process_schema_data)
async def get_process_schema_by_id(connection: AsyncConnection, id: int) -> Optional[ProcessSchema]: async def get_process_schema_by_id(connection: AsyncConnection, id: int) -> Optional[ProcessSchema]:
""" """
Получает process_schema по id. Получает process_schema по id.
@@ -141,8 +152,11 @@ async def update_process_schema_by_id(connection: AsyncConnection, update_values
await connection.commit() await connection.commit()
async def update_process_schema_settings_by_id( async def update_process_schema_settings_by_id(
connection: AsyncConnection, process_schema_id: int, node_data: Dict[str, Any] connection: AsyncConnection,
process_schema_id: int,
node_data: Dict[str, Any]
): ):
""" """
Добавляет новый узел в массив 'nodes' в настройках процесса. Добавляет новый узел в массив 'nodes' в настройках процесса.
@@ -174,35 +188,24 @@ async def update_process_schema_settings_by_id(
await connection.commit() await connection.commit()
async def get_last_created_process_schema(connection: AsyncConnection) -> Optional[int]:
"""
Получает ID последней созданной схемы процесса.
"""
query = select(process_schema_table.c.id).order_by(desc(process_schema_table.c.id)).limit(1)
result = await connection.execute(query)
last_id = result.scalar_one_or_none()
return last_id
async def create_process_schema( async def create_process_schema(
connection: AsyncConnection, creator_id: int, title: str, description: str connection: AsyncConnection, process_schema: ProcessSchema, creator_id: int
) -> Optional[int]: ) -> Optional[ProcessSchema]:
""" """
Создает новое поле в таблице process_schema_table. Создает нове поле в таблице process_schema_table.
""" """
query = insert(process_schema_table).values( query = insert(process_schema_table).values(
title=title, title=process_schema.title,
description=description, description=process_schema.description,
owner_id=creator_id, owner_id=process_schema.owner_id,
creator_id=creator_id, creator_id=creator_id,
created_at=datetime.now(timezone.utc), created_at=datetime.now(timezone.utc),
settings={}, settings=process_schema.settings,
status=ProcessStatus.ACTIVE.value, status=process_schema.status.value,
) )
result = await connection.execute(query) await connection.execute(query)
await connection.commit() await connection.commit()
return result.lastrowid return process_schema

View File

@@ -1,15 +1,15 @@
from typing import Optional, List from typing import Optional
from datetime import datetime, timezone from datetime import datetime, timezone
from sqlalchemy import insert, select, desc, and_, delete, update from sqlalchemy import insert, select, desc
from sqlalchemy.ext.asyncio import AsyncConnection from sqlalchemy.ext.asyncio import AsyncConnection
from orm.tables.process import ps_node_table, node_link_table, process_schema_table from orm.tables.process import ps_node_table
from api.schemas.process.ps_node import Ps_Node from api.schemas.process.ps_node import Ps_Node
from model_nodes.node_listen_models import ListenNodeCoreSchema from model_nodes.node_start_models import StartNodeCoreSchema
from orm.tables.process import NodeStatus from orm.tables.process import NodeStatus,NodeType
async def get_ps_node_by_id(connection: AsyncConnection, id: int) -> Optional[Ps_Node]: async def get_ps_node_by_id(connection: AsyncConnection, id: int) -> Optional[Ps_Node]:
@@ -27,18 +27,54 @@ async def get_ps_node_by_id(connection: AsyncConnection, id: int) -> Optional[Ps
return Ps_Node.model_validate(ps_node_data) return Ps_Node.model_validate(ps_node_data)
async def get_ps_node_by_type_and_ps_id(connection: AsyncConnection, node_type: str, ps_id: int) -> Optional[Ps_Node]:
"""
Получает ps_node по node_type и ps_id.
"""
query = select(ps_node_table).where(
ps_node_table.c.node_type == node_type,
ps_node_table.c.ps_id == ps_id
)
ps_node_db_cursor = await connection.execute(query)
ps_node_data = ps_node_db_cursor.mappings().one_or_none()
if not ps_node_data:
return None
return Ps_Node.model_validate(ps_node_data)
async def create_ps_node_start_schema(
connection: AsyncConnection, validated_start_schema: StartNodeCoreSchema, creator_id: int
) -> Optional[Ps_Node]:
"""
Создает нове поле в таблице process_schema_table.
"""
query = insert(ps_node_table).values(
ps_id=validated_start_schema.ps_id,
node_type=NodeType.START.value,
settings=validated_start_schema.data.model_dump(),
creator_id=creator_id,
created_at=datetime.now(timezone.utc),
status=NodeStatus.ACTIVE.value,
)
await connection.execute(query)
await connection.commit()
# return validated_start_schema
async def get_last_ps_node_by_creator_and_ps_id( async def get_last_ps_node_by_creator_and_ps_id(
connection: AsyncConnection, creator_id: int, ps_id: int connection: AsyncConnection, creator_id: int, ps_id: int
) -> Optional[Ps_Node]: ) -> Optional[Ps_Node]:
""" """
Получает последнюю созданную ps_node для данного создателя и процесса. Получает последнюю созданную ps_node для данного создателя и процесса.
""" """
query = ( query = select(ps_node_table).where(
select(ps_node_table) ps_node_table.c.creator_id == creator_id,
.where(ps_node_table.c.creator_id == creator_id, ps_node_table.c.ps_id == ps_id) ps_node_table.c.ps_id == ps_id
.order_by(desc(ps_node_table.c.created_at)) ).order_by(desc(ps_node_table.c.created_at)).limit(1)
.limit(1)
)
ps_node_db_cursor = await connection.execute(query) ps_node_db_cursor = await connection.execute(query)
ps_node_data = ps_node_db_cursor.mappings().one_or_none() ps_node_data = ps_node_db_cursor.mappings().one_or_none()
@@ -48,12 +84,9 @@ async def get_last_ps_node_by_creator_and_ps_id(
return Ps_Node.model_validate(ps_node_data) return Ps_Node.model_validate(ps_node_data)
async def create_ps_node_schema( async def create_ps_node_schema(
connection: AsyncConnection, connection: AsyncConnection, validated_schema, creator_id: int,
validated_schema, ) -> Optional[Ps_Node]:
creator_id: int,
) -> Optional[ListenNodeCoreSchema]:
""" """
Создает нове поле в таблице process_schema_table. Создает нове поле в таблице process_schema_table.
""" """
@@ -71,121 +104,3 @@ async def create_ps_node_schema(
await connection.commit() await connection.commit()
return await get_last_ps_node_by_creator_and_ps_id(connection, creator_id, validated_schema.ps_id) return await get_last_ps_node_by_creator_and_ps_id(connection, creator_id, validated_schema.ps_id)
async def check_node_connection(connection: AsyncConnection, node_id: int, next_node_id: int, port: int) -> bool:
"""
Проверяет, подключен ли next_node_id к node_id через указанный порт.
"""
query = select(node_link_table).where(
and_(
node_link_table.c.node_id == node_id,
node_link_table.c.next_node_id == next_node_id,
node_link_table.c.link_point_id == port,
)
)
result = await connection.execute(query)
return result.mappings().first() is not None
async def get_nodes_for_deletion_ordered(connection: AsyncConnection, node_id: int) -> List[int]:
"""
Рекурсивно находит ВСЕ дочерние узлы и возвращает их ID в правильном порядке:
от самых глубоких к корневым.
"""
all_child_nodes = []
visited_nodes = set()
# тут надо будет поработать с зацикливанием на вышестояющую ноду, сейчас вышестоящая нода если она уже была учтена, не будет занесена в спиок на удаление.
async def find_children_with_depth(current_node_id: int, current_depth: int):
if current_node_id in visited_nodes:
return
visited_nodes.add(current_node_id)
query = (
select(ps_node_table)
.join(node_link_table, ps_node_table.c.id == node_link_table.c.next_node_id)
.where(node_link_table.c.node_id == current_node_id)
)
result = await connection.execute(query)
child_nodes = result.mappings().all()
for node_data in child_nodes:
node = Ps_Node.model_validate(node_data)
all_child_nodes.append((node, current_depth + 1))
await find_children_with_depth(node.id, current_depth + 1)
await find_children_with_depth(node_id, 0)
all_child_nodes.sort(key=lambda x: x[1], reverse=True)
ordered_node_ids = [node.id for node, depth in all_child_nodes]
ordered_node_ids.append(node_id)
return ordered_node_ids
async def delete_ps_nodes_delete_handler(connection: AsyncConnection, node_ids: List[int]) -> List[int]:
"""
Очищает settings и удаляет ноды для каждого ps_id.
Возвращает список успешно удаленных ID нод.
"""
if not node_ids:
return []
ps_id_rows = await connection.execute(
select(ps_node_table.c.id, ps_node_table.c.ps_id).where(ps_node_table.c.id.in_(node_ids))
)
rows = ps_id_rows.mappings().all()
if not rows:
return []
ps_to_node_ids = {}
for r in rows:
ps_to_node_ids.setdefault(r["ps_id"], []).append(r["id"])
deleted_all = []
for ps_id, ids_in_ps in ps_to_node_ids.items():
await remove_nodes_from_process_schema_settings(connection, ps_id, ids_in_ps)
result = await connection.execute(delete(ps_node_table).where(ps_node_table.c.id.in_(ids_in_ps)))
if result.rowcount and result.rowcount > 0:
deleted_all.extend(ids_in_ps)
else:
raise Exception(f"Failed to delete nodes for ps_id={ps_id}")
await connection.commit()
return deleted_all
async def remove_nodes_from_process_schema_settings(connection: AsyncConnection, ps_id: int, node_ids: List[int]):
"""
Удаляет ноды из поля settings в таблице process_schema по списку node_ids.
"""
from api.db.logic.process_schema import get_process_schema_by_id
process_schema = await get_process_schema_by_id(connection, ps_id)
if not process_schema or not process_schema.settings:
return
settings = process_schema.settings
if "nodes" in settings and isinstance(settings["nodes"], list):
node_ids_set = set(node_ids)
settings["nodes"] = [
node_item
for node_item in settings["nodes"]
if not (
isinstance(node_item, dict)
and "node" in node_item
and isinstance(node_item["node"], dict)
and node_item["node"].get("id") in node_ids_set
)
]
await connection.execute(
update(process_schema_table).where(process_schema_table.c.id == ps_id).values(settings=settings)
)

View File

@@ -6,15 +6,7 @@ from api.endpoints.list_events import api_router as listevents_router
from api.endpoints.process_schema import api_router as processschema_router from api.endpoints.process_schema import api_router as processschema_router
from api.endpoints.ps_node import api_router as ps_node_router from api.endpoints.ps_node import api_router as ps_node_router
list_of_routes = [ list_of_routes = [auth_router, profile_router, account_router, keyring_router, listevents_router, processschema_router, ps_node_router]
auth_router,
profile_router,
account_router,
keyring_router,
listevents_router,
processschema_router,
ps_node_router,
]
__all__ = [ __all__ = [
"list_of_routes", "list_of_routes",

View File

@@ -17,7 +17,7 @@ from api.schemas.account.account import User
from api.schemas.base import bearer_schema from api.schemas.base import bearer_schema
from api.schemas.endpoints.account import AllUserResponse, UserCreate, UserFilterDTO, UserUpdate from api.schemas.endpoints.account import AllUserResponse, UserCreate, UserFilterDTO, UserUpdate
from api.services.auth import get_current_user from api.services.auth import get_current_user
from api.services.user_role_validation import UserRoleValidator from api.services.user_role_validation import db_user_role_validation
api_router = APIRouter( api_router = APIRouter(
prefix="/account", prefix="/account",
@@ -38,8 +38,7 @@ async def get_all_account_endpoint(
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation(connection, current_user)
authorize_user = await validator.validate_admin(current_user)
filters = { filters = {
**({"status": status_filter} if status_filter else {}), **({"status": status_filter} if status_filter else {}),
@@ -68,8 +67,7 @@ async def get_account_endpoint(
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation(connection, current_user)
authorize_user = await validator.validate_admin(current_user)
user = await get_user_by_id(connection, user_id) user = await get_user_by_id(connection, user_id)
@@ -85,8 +83,7 @@ async def create_account_endpoint(
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation(connection, current_user)
authorize_user = await validator.validate_admin(current_user)
user_validation = await get_user_by_login(connection, user.login) user_validation = await get_user_by_login(connection, user.login)
@@ -107,8 +104,7 @@ async def update_account_endpoint(
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation(connection, current_user)
authorize_user = await validator.validate_admin(current_user)
user = await get_user_by_id(connection, user_id) user = await get_user_by_id(connection, user_id)
if user is None: if user is None:
@@ -135,8 +131,7 @@ async def delete_account_endpoint(
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation(connection, current_user)
authorize_user = await validator.validate_admin(current_user)
user = await get_user_by_id(connection, user_id) user = await get_user_by_id(connection, user_id)
if user is None: if user is None:

View File

@@ -13,7 +13,7 @@ from api.schemas.account.account_keyring import AccountKeyring
from api.schemas.base import bearer_schema from api.schemas.base import bearer_schema
from api.schemas.endpoints.account_keyring import AccountKeyringUpdate from api.schemas.endpoints.account_keyring import AccountKeyringUpdate
from api.services.auth import get_current_user from api.services.auth import get_current_user
from api.services.user_role_validation import UserRoleValidator from api.services.user_role_validation import db_user_role_validation
api_router = APIRouter( api_router = APIRouter(
prefix="/keyring", prefix="/keyring",
@@ -22,11 +22,10 @@ api_router = APIRouter(
@api_router.get("/{user_id}/{key_id}", dependencies=[Depends(bearer_schema)], response_model=AccountKeyring) @api_router.get("/{user_id}/{key_id}", dependencies=[Depends(bearer_schema)], response_model=AccountKeyring)
async def get_keyring_endpoint( async def get_keyring_endpoint (
key_id: str, connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user) key_id: str, connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user)
): ):
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation(connection, current_user)
authorize_user = await validator.validate_admin(current_user)
keyring = await get_key_by_id(connection, key_id) keyring = await get_key_by_id(connection, key_id)
@@ -44,8 +43,7 @@ async def create_keyring_endpoint(
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation(connection, current_user)
authorize_user = await validator.validate_admin(current_user)
keyring = await get_key_by_id(connection, key_id) keyring = await get_key_by_id(connection, key_id)
@@ -71,8 +69,7 @@ async def update_keyring_endpoint(
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation(connection, current_user)
authorize_user = await validator.validate_admin(current_user)
keyring = await get_key_by_id(connection, key_id) keyring = await get_key_by_id(connection, key_id)
if keyring is None: if keyring is None:
@@ -97,8 +94,7 @@ async def delete_keyring_endpoint(
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation(connection, current_user)
authorize_user = await validator.validate_admin(current_user)
keyring = await get_key_by_id(connection, key_id) keyring = await get_key_by_id(connection, key_id)
if keyring is None: if keyring is None:

View File

@@ -17,7 +17,10 @@ from api.schemas.base import bearer_schema
from api.schemas.endpoints.list_events import AllListEventResponse, ListEventFilterDTO, ListEventUpdate from api.schemas.endpoints.list_events import AllListEventResponse, ListEventFilterDTO, ListEventUpdate
from api.schemas.events.list_events import ListEvent from api.schemas.events.list_events import ListEvent
from api.services.auth import get_current_user from api.services.auth import get_current_user
from api.services.user_role_validation import UserRoleValidator from api.services.user_role_validation import (
db_user_role_validation_for_list_events_and_process_schema,
db_user_role_validation_for_list_events_and_process_schema_by_list_event_id,
)
api_router = APIRouter( api_router = APIRouter(
prefix="/list_events", prefix="/list_events",
@@ -51,8 +54,9 @@ async def get_all_list_events_endpoint(
filters=filters if filters else None, filters=filters if filters else None,
) )
validator = UserRoleValidator(connection) authorize_user, page_flag = await db_user_role_validation_for_list_events_and_process_schema(
authorize_user, page_flag = await validator.get_user(current_user) connection, current_user
)
if not page_flag: if not page_flag:
if filter_dto.filters is None: if filter_dto.filters is None:
@@ -78,8 +82,9 @@ async def get_list_events_endpoint(
if list_events_validation is None: if list_events_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
authorize_user = await validator.validate_ownership(current_user, list_events_validation.creator_id) connection, current_user, list_events_validation.creator_id
)
if list_events_id is None: if list_events_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
@@ -119,8 +124,9 @@ async def update_list_events(
if list_events_validation is None: if list_events_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
authorize_user = await validator.validate_ownership(current_user, list_events_validation.creator_id) connection, current_user, list_events_validation.creator_id
)
updated_values = list_events_update.model_dump(by_alias=True, exclude_none=True) updated_values = list_events_update.model_dump(by_alias=True, exclude_none=True)
@@ -145,8 +151,9 @@ async def delete_list_events_endpoint(
if list_events_validation is None: if list_events_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
authorize_user = await validator.validate_ownership(current_user, list_events_validation.creator_id) connection, current_user, list_events_validation.creator_id
)
list_events_update = ListEventUpdate(status=EventStatus.DELETED.value) list_events_update = ListEventUpdate(status=EventStatus.DELETED.value)

View File

@@ -9,6 +9,7 @@ from api.db.logic.account import get_user_by_login
from api.db.logic.process_schema import ( from api.db.logic.process_schema import (
create_process_schema, create_process_schema,
get_process_schema_by_id, get_process_schema_by_id,
get_process_schema_by_title,
get_process_schema_page_DTO, get_process_schema_page_DTO,
update_process_schema_by_id, update_process_schema_by_id,
) )
@@ -18,10 +19,14 @@ from api.schemas.process.process_schema import ProcessSchema, ProcessSchemaSetti
from api.schemas.process.ps_node import Ps_NodeFrontResponseNode from api.schemas.process.ps_node import Ps_NodeFrontResponseNode
from api.schemas.process.ps_node import Ps_NodeFrontResponse from api.schemas.process.ps_node import Ps_NodeFrontResponse
from api.services.auth import get_current_user from api.services.auth import get_current_user
from api.services.user_role_validation import UserRoleValidator from api.services.user_role_validation import (
db_user_role_validation_for_list_events_and_process_schema,
db_user_role_validation_for_list_events_and_process_schema_by_list_event_id,
)
from api.db.logic.ps_node import create_ps_node_schema from api.db.logic.ps_node import create_ps_node_schema
from api.db.logic.process_schema import update_process_schema_settings_by_id from api.db.logic.process_schema import update_process_schema_settings_by_id
from orm.tables.process import NodeType from orm.tables.process import NodeType
@@ -32,8 +37,6 @@ from core import VorkNodeRegistry
from model_nodes import ListenNodeData from model_nodes import ListenNodeData
from api.utils.node_counter import increment_node_counter
api_router = APIRouter( api_router = APIRouter(
prefix="/process_schema", prefix="/process_schema",
@@ -41,7 +44,10 @@ api_router = APIRouter(
) )
@api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllProcessSchemaResponse) @api_router.get("", dependencies=[Depends(bearer_schema)],
# response_model=AllProcessSchemaResponse
)
async def get_all_process_schema_endpoint( async def get_all_process_schema_endpoint(
page: int = Query(1, description="Page number", gt=0), page: int = Query(1, description="Page number", gt=0),
limit: int = Query(10, description="Number of items per page", gt=0), limit: int = Query(10, description="Number of items per page", gt=0),
@@ -50,20 +56,12 @@ async def get_all_process_schema_endpoint(
order_direction: Optional[str] = Query("asc", description="Sort direction (asc/desc)"), order_direction: Optional[str] = Query("asc", description="Sort direction (asc/desc)"),
status_filter: Optional[List[str]] = Query(None, description="Filter by status"), status_filter: Optional[List[str]] = Query(None, description="Filter by status"),
owner_id: Optional[List[str]] = Query(None, description="Filter by owner id"), owner_id: Optional[List[str]] = Query(None, description="Filter by owner id"),
show_deleted: bool = Query(False, description="Show only deleted schemas"),
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
creator_id: Optional[int] = Query(None, description="Filter by creator id"), creator_id: Optional[int] = Query(None, description="Filter by creator id"),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
if show_deleted:
status_to_filter = ["DELETED"]
elif status_filter:
status_to_filter = status_filter
else:
status_to_filter = None
filters = { filters = {
**({"status": status_to_filter} if status_to_filter else {}), **({"status": status_filter} if status_filter else {}),
**({"owner_id": owner_id} if owner_id else {}), **({"owner_id": owner_id} if owner_id else {}),
**({"creator_id": [str(creator_id)]} if creator_id else {}), **({"creator_id": [str(creator_id)]} if creator_id else {}),
} }
@@ -75,8 +73,9 @@ async def get_all_process_schema_endpoint(
filters=filters if filters else None, filters=filters if filters else None,
) )
validator = UserRoleValidator(connection) authorize_user, page_flag = await db_user_role_validation_for_list_events_and_process_schema(
authorize_user, page_flag = await validator.get_user(current_user) connection, current_user
)
if not page_flag: if not page_flag:
if filter_dto.filters is None: if filter_dto.filters is None:
@@ -102,8 +101,9 @@ async def get_process_schema_endpoint(
if process_schema_validation is None: if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
authorize_user = await validator.validate_ownership(current_user, process_schema_validation.creator_id) connection, current_user, process_schema_validation.creator_id
)
if process_schema_id is None: if process_schema_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
@@ -111,23 +111,27 @@ async def get_process_schema_endpoint(
return to_camel_dict(process_schema_validation.model_dump()) return to_camel_dict(process_schema_validation.model_dump())
@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=ProcessSchemaResponse) @api_router.post("", dependencies=[Depends(bearer_schema)],
response_model=ProcessSchemaResponse
)
async def create_processschema_endpoint( async def create_processschema_endpoint(
process_schema: ProcessSchemaUpdate,
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
user_validation = await get_user_by_login(connection, current_user) user_validation = await get_user_by_login(connection, current_user)
process_schema_validation = await get_process_schema_by_title(connection, process_schema.title)
current_node_counter = increment_node_counter() if process_schema_validation is None:
title = f"Новая схема {current_node_counter}"
description = "Default description" await create_process_schema(connection, process_schema, user_validation.id)
process_schema_new = await get_process_schema_by_title(connection, process_schema.title)
node_id = await create_process_schema(connection, user_validation.id, title, description) start_node_data = ListenNodeData(
ps_id=process_schema_new.id,
process_schema_new = await get_process_schema_by_id(connection, node_id) node_type=NodeType.START.value,
is_start="True"
start_node_data = ListenNodeData(ps_id=process_schema_new.id, node_type=NodeType.START.value, is_start="True") )
start_node_links = {} start_node_links = {}
@@ -137,10 +141,13 @@ async def create_processschema_endpoint(
node_descriptor = vork_node.form() node_descriptor = vork_node.form()
start_node = vork_node(data=start_node_data.model_dump(), links=start_node_links) start_node = vork_node(data=start_node_data.model_dump(), links=start_node_links)
validated_start_schema = start_node.validate() validated_start_schema = start_node.validate()
print(validated_start_schema)
db_start_schema = await create_ps_node_schema(connection, validated_start_schema, user_validation.id) db_start_schema = await create_ps_node_schema(connection, validated_start_schema, user_validation.id)
node = ProcessSchemaSettingsNode( node = ProcessSchemaSettingsNode(
@@ -148,31 +155,36 @@ async def create_processschema_endpoint(
node_type=NodeType.LISTEN.value, node_type=NodeType.LISTEN.value,
data=validated_start_schema.data.model_dump(), data=validated_start_schema.data.model_dump(),
from_node=None, from_node=None,
links=None, links=None)
)
settings_dict = {"node": node.model_dump(mode="json")} settings_dict = {"node": node.model_dump(mode='json')}
await update_process_schema_settings_by_id(connection, process_schema_new.id, settings_dict) await update_process_schema_settings_by_id(connection, process_schema_new.id, settings_dict)
process_schema_new = await get_process_schema_by_id(connection, node_id) process_schema_new = await get_process_schema_by_title(connection, process_schema.title)
ps_node_front_response = Ps_NodeFrontResponse( ps_node_front_response = Ps_NodeFrontResponse(
description=node_descriptor.model_dump(), description=node_descriptor,
node=Ps_NodeFrontResponseNode( node=Ps_NodeFrontResponseNode(
id=db_start_schema.id, node_type=NodeType.LISTEN.value, data=validated_start_schema.data.model_dump() id=db_start_schema.id,
), node_type=NodeType.LISTEN.value,
link=None, data=validated_start_schema.data.model_dump()),
) link=None)
response_data = { response_data = {
"process_schema": process_schema_new.model_dump(), "process_schema": process_schema_new.model_dump(),
"node_listen": ps_node_front_response.model_dump(), "node_listen": ps_node_front_response.model_dump()}
}
return to_camel_dict(response_data) return to_camel_dict(response_data)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="An process schema with this information already exists."
)
@api_router.put("/{process_schema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema) @api_router.put("/{process_schema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def update_process_schema_endpoint( async def update_process_schema_endpoint(
process_schema_id: int, process_schema_id: int,
@@ -185,8 +197,9 @@ async def update_process_schema_endpoint(
if process_schema_validation is None: if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
authorize_user = await validator.validate_ownership(current_user, process_schema_validation.creator_id) connection, current_user, process_schema_validation.creator_id
)
updated_values = process_schema_update.model_dump(by_alias=True, exclude_none=True) updated_values = process_schema_update.model_dump(by_alias=True, exclude_none=True)
@@ -200,7 +213,7 @@ async def update_process_schema_endpoint(
return process_schema return process_schema
@api_router.delete("/{process_schema_id}", dependencies=[Depends(bearer_schema)], status_code=status.HTTP_200_OK) @api_router.delete("/{process_schema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def delete_process_schema_endpoint( async def delete_process_schema_endpoint(
process_schema_id: int, process_schema_id: int,
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
@@ -211,8 +224,9 @@ async def delete_process_schema_endpoint(
if process_schema_validation is None: if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
validator = UserRoleValidator(connection) authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
authorize_user = await validator.validate_ownership(current_user, process_schema_validation.creator_id) connection, current_user, process_schema_validation.creator_id
)
process_schema_update = ProcessSchemaUpdate(status=ProcessStatus.DELETED.value) process_schema_update = ProcessSchemaUpdate(status=ProcessStatus.DELETED.value)
@@ -223,6 +237,6 @@ async def delete_process_schema_endpoint(
await update_process_schema_by_id(connection, updated_values, process_schema_validation) await update_process_schema_by_id(connection, updated_values, process_schema_validation)
await get_process_schema_by_id(connection, process_schema_id) process_schema = await get_process_schema_by_id(connection, process_schema_id)
return HTTPException(status_code=status.HTTP_200_OK, detail="Process schema deleted successfully") return process_schema

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, status from fastapi import APIRouter, Depends, HTTPException,status
from sqlalchemy.ext.asyncio import AsyncConnection from sqlalchemy.ext.asyncio import AsyncConnection
@@ -7,28 +7,23 @@ from api.db.logic.account import get_user_by_login
from api.schemas.base import bearer_schema from api.schemas.base import bearer_schema
from api.schemas.process.process_schema import ProcessSchemaSettingsNodeLink, ProcessSchemaSettingsNode from api.schemas.process.process_schema import ProcessSchemaSettingsNodeLink, ProcessSchemaSettingsNode
from api.schemas.process.ps_node import Ps_NodeFrontResponseNode, Ps_NodeRequest, Ps_NodeDeleteRequest from api.schemas.process.ps_node import Ps_NodeFrontResponseNode, Ps_NodeRequest
from api.schemas.process.ps_node import Ps_NodeFrontResponse from api.schemas.process.ps_node import Ps_NodeFrontResponse
from api.services.auth import get_current_user from api.services.auth import get_current_user
from api.db.logic.ps_node import ( from api.db.logic.ps_node import create_ps_node_schema
create_ps_node_schema,
get_ps_node_by_id,
check_node_connection,
get_nodes_for_deletion_ordered,
delete_ps_nodes_delete_handler,
)
from api.db.logic.node_link import get_last_link_name_by_node_id, create_node_link_schema from api.db.logic.node_link import get_last_link_name_by_node_id, create_node_link_schema
from api.db.logic.process_schema import update_process_schema_settings_by_id, get_process_schema_by_id from api.db.logic.process_schema import update_process_schema_settings_by_id
from api.services.user_role_validation import UserRoleValidator
from core import VorkNodeRegistry, VorkNodeLink from core import VorkNodeRegistry, VorkNodeLink
from model_nodes import VorkNodeLinkData from model_nodes import VorkNodeLinkData
from api.utils.to_camel_dict import to_camel_dict from api.utils.to_camel_dict import to_camel_dict
from api.error import create_operation_error, create_access_error, create_validation_error, create_server_error
api_router = APIRouter( api_router = APIRouter(
@@ -37,113 +32,22 @@ api_router = APIRouter(
) )
@api_router.delete("", dependencies=[Depends(bearer_schema)], status_code=status.HTTP_200_OK) @api_router.post("", dependencies=[Depends(bearer_schema)],response_model=Ps_NodeFrontResponse)
async def delete_ps_node_endpoint(
ps_node_delete_data: Ps_NodeDeleteRequest,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
process_schema = await get_process_schema_by_id(connection, ps_node_delete_data.schema_id)
if process_schema is None:
raise create_operation_error(
message="Process schema not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"schema_id": ps_node_delete_data.schema_id},
)
validator = UserRoleValidator(connection)
try:
await validator.validate_ownership(current_user, process_schema.creator_id)
except Exception as e:
raise create_access_error(
message="Access denied",
status_code=status.HTTP_403_FORBIDDEN,
details={"user_id": current_user, "schema_creator_id": process_schema.creator_id, "reason": str(e)},
)
ps_node = await get_ps_node_by_id(connection, ps_node_delete_data.node_id)
if ps_node is None:
raise create_operation_error(
message="PS node not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"node_id": ps_node_delete_data.node_id},
)
next_ps_node = await get_ps_node_by_id(connection, ps_node_delete_data.next_node_id)
if next_ps_node is None:
raise create_operation_error(
message="Next PS node not found",
status_code=status.HTTP_400_BAD_REQUEST,
details={"next_node_id": ps_node_delete_data.next_node_id},
)
is_connected = await check_node_connection(
connection, ps_node_delete_data.node_id, ps_node_delete_data.next_node_id, int(ps_node_delete_data.port)
)
if not is_connected:
raise create_validation_error(
message="Node connection validation failed",
status_code=status.HTTP_400_BAD_REQUEST,
details={
"node_id": ps_node_delete_data.node_id,
"next_node_id": ps_node_delete_data.next_node_id,
"port": ps_node_delete_data.port,
},
)
ordered_node_ids = await get_nodes_for_deletion_ordered(connection, ps_node_delete_data.next_node_id)
try:
deleted_node_ids = await delete_ps_nodes_delete_handler(connection, ordered_node_ids)
except Exception as e:
raise create_server_error(
message="Failed to delete nodes",
status_code=500,
details={"error": str(e), "ordered_node_ids": ordered_node_ids},
)
return {
"deleted_node_ids": deleted_node_ids,
}
@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=Ps_NodeFrontResponse)
async def create_ps_node_endpoint( async def create_ps_node_endpoint(
ps_node: Ps_NodeRequest, ps_node: Ps_NodeRequest,
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
user_validation = await get_user_by_login(connection, current_user) user_validation = await get_user_by_login(connection, current_user)
process_schema = await get_process_schema_by_id(connection, ps_node.data["ps_id"])
if process_schema is None:
raise create_operation_error(
message="Process schema not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"schema_id": ps_node.data["ps_id"]},
)
validator = UserRoleValidator(connection)
try:
await validator.validate_ownership(current_user, process_schema.creator_id)
except Exception as e:
raise create_access_error(
message="Access denied",
status_code=status.HTTP_403_FORBIDDEN,
details={"user_id": current_user, "schema_creator_id": process_schema.creator_id, "reason": str(e)},
)
registery = VorkNodeRegistry() registery = VorkNodeRegistry()
vork_node = registery.get(ps_node.data["node_type"]) vork_node = registery.get(ps_node.data["node_type"])
if vork_node is None: if vork_node is None:
raise create_operation_error( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Node not found")
message="Node type not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"node_type": ps_node.data["node_type"]},
)
node_descriptor = vork_node.form() node_descriptor = vork_node.form()
try: try:
@@ -152,67 +56,34 @@ async def create_ps_node_endpoint(
node_instance_validated = node_instance.validate() node_instance_validated = node_instance.validate()
except Exception as e: except Exception as e:
raise create_validation_error( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
message="Node validation failed",
status_code=status.HTTP_400_BAD_REQUEST,
details={"error": str(e)},
)
# Проверка: parent_id принадлежит тому же ps_id
parent_id = node_instance_validated.parent_id
target_ps_id = ps_node.data["ps_id"]
parent_node = await get_ps_node_by_id(connection, parent_id)
if parent_node is None:
raise create_operation_error(
message="Parent PS node not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"parent_id": parent_id},
)
if parent_node.ps_id != target_ps_id:
raise create_validation_error(
message="Parent PS node belongs to another process schema",
status_code=status.HTTP_400_BAD_REQUEST,
details={"parent_id": parent_id, "expected_ps_id": target_ps_id, "actual_ps_id": parent_node.ps_id},
)
# Проверка: у родительской ноды есть указанный порт
parent_port_number = node_instance_validated.parent_port_number
# Извлекаем номера портов из settings родительской ноды
parent_settings = parent_node.settings or {}
available_port_numbers = []
# Ищем все ключи, содержащие "port" в названии
for key, value in parent_settings.items():
if "port" in key.lower() and isinstance(value, int):
available_port_numbers.append(value)
# Проверяем, что указанный порт существует в settings
if parent_port_number not in available_port_numbers:
raise create_validation_error(
message="Parent port number is invalid",
status_code=status.HTTP_400_BAD_REQUEST,
details={"parent_id": parent_id, "parent_settings": parent_settings},
)
try:
db_ps_node = await create_ps_node_schema(connection, node_instance_validated, user_validation.id) db_ps_node = await create_ps_node_schema(connection, node_instance_validated, user_validation.id)
link_name = await get_last_link_name_by_node_id(connection, db_ps_node.ps_id) link_name = await get_last_link_name_by_node_id(connection, db_ps_node.ps_id)
link_data = VorkNodeLinkData( link_data = VorkNodeLinkData(
parent_port_number=node_instance_validated.parent_port_number, parent_port_number=node_instance_validated.parent_port_number,
to_id=db_ps_node.id, to_id=db_ps_node.id,
from_id=node_instance_validated.parent_id, from_id=node_instance_validated.parent_id,
last_link_name=link_name, last_link_name=link_name)
)
link = VorkNodeLink(data=link_data.model_dump()) link = VorkNodeLink(data=link_data.model_dump())
validated_link = link.validate() validated_link = link.validate()
db_node_link = await create_node_link_schema(connection, validated_link, user_validation.id) db_node_link = await create_node_link_schema(connection, validated_link, user_validation.id)
links_settings = ProcessSchemaSettingsNodeLink( links_settings = ProcessSchemaSettingsNodeLink(
id=db_node_link.id, id=db_node_link.id,
link_name=db_node_link.link_name, link_name=db_node_link.link_name,
@@ -226,28 +97,21 @@ async def create_ps_node_endpoint(
node_type=db_ps_node.node_type, node_type=db_ps_node.node_type,
data=node_instance_validated.data.model_dump(), data=node_instance_validated.data.model_dump(),
from_node=None, from_node=None,
links=[{"links": links_settings.model_dump()}], links=links_settings.model_dump())
)
settings_dict = {"node": node_settings.model_dump(mode="json")}
settings_dict = {"node": node_settings.model_dump(mode='json')}
await update_process_schema_settings_by_id(connection, db_ps_node.ps_id, settings_dict) await update_process_schema_settings_by_id(connection, db_ps_node.ps_id, settings_dict)
except Exception as e:
raise create_server_error(
message="Failed to create node",
status_code=500,
details={"error": str(e)},
)
ps_node_front_response = Ps_NodeFrontResponse( ps_node_front_response = Ps_NodeFrontResponse(
description=node_descriptor.model_dump(), description=node_descriptor,
node=Ps_NodeFrontResponseNode( node=Ps_NodeFrontResponseNode(
id=db_ps_node.id, id=db_ps_node.id,
node_type=db_ps_node.node_type, node_type=db_ps_node.node_type,
data=to_camel_dict(node_instance_validated.data.model_dump()), data=to_camel_dict(node_instance_validated.data.model_dump())),
), link=links_settings.model_dump())
links=[{"links": links_settings.model_dump()}],
)
return ps_node_front_response return ps_node_front_response

View File

@@ -1,26 +0,0 @@
"""
Модуль для обработки ошибок API.
"""
from .error_model.error_types import ServerError, AccessError, OperationError, ValidationError, ErrorType
from .error_handlers import (
handle_api_error,
create_server_error,
create_access_error,
create_operation_error,
create_validation_error,
)
__all__ = [
"ServerError",
"AccessError",
"OperationError",
"ValidationError",
"ErrorType",
"handle_api_error",
"create_server_error",
"create_access_error",
"create_operation_error",
"create_validation_error",
]

View File

@@ -1,54 +0,0 @@
"""
Обработчики ошибок для API.
"""
from typing import Optional, Dict, Any
from fastapi import HTTPException
from .error_model.error_types import ServerError, AccessError, OperationError, ValidationError, ErrorType
def handle_api_error(
error_type: ErrorType, message: str, status_code: int, details: Optional[Dict[str, Any]] = None
) -> HTTPException:
"""
Функция для создания HTTPException с правильной структурой ошибки.
"""
match error_type:
case ErrorType.SERVER:
error = ServerError(message=message, details=details)
case ErrorType.ACCESS:
error = AccessError(message=message, details=details)
case ErrorType.OPERATION:
error = OperationError(message=message, details=details)
case ErrorType.VALIDATION:
error = ValidationError(message=message, details=details)
case _:
error = ServerError(message=message, details=details)
return HTTPException(status_code=status_code, detail=error.model_dump(mode="json"))
def create_server_error(
message: str, status_code: int = 500, details: Optional[Dict[str, Any]] = None
) -> HTTPException:
return handle_api_error(error_type=ErrorType.SERVER, message=message, status_code=status_code, details=details)
def create_access_error(
message: str, status_code: int = 403, details: Optional[Dict[str, Any]] = None
) -> HTTPException:
return handle_api_error(error_type=ErrorType.ACCESS, message=message, status_code=status_code, details=details)
def create_operation_error(
message: str, status_code: int = 400, details: Optional[Dict[str, Any]] = None
) -> HTTPException:
return handle_api_error(error_type=ErrorType.OPERATION, message=message, status_code=status_code, details=details)
def create_validation_error(
message: str, status_code: int = 422, details: Optional[Dict[str, Any]] = None
) -> HTTPException:
return handle_api_error(error_type=ErrorType.VALIDATION, message=message, status_code=status_code, details=details)

View File

@@ -1,7 +0,0 @@
"""
Модели ошибок для API.
"""
from .error_types import ServerError, AccessError, OperationError, ValidationError, ErrorType
__all__ = ["ServerError", "AccessError", "OperationError", "ValidationError", "ErrorType"]

View File

@@ -1,61 +0,0 @@
"""
Типизированные модели ошибок для API.
"""
from enum import Enum
from typing import Optional, Dict, Any
from pydantic import BaseModel
class ErrorType(str, Enum):
"""
Типы ошибок API.
"""
SERVER = "SERVER"
ACCESS = "ACCESS"
OPERATION = "OPERATION"
VALIDATION = "VALIDATION"
class BaseError(BaseModel):
"""
Базовая модель ошибки.
"""
error_type: ErrorType
message: str
details: Optional[Dict[str, Any]] = None
class ServerError(BaseError):
"""
Критические серверные ошибки (БД, соединения и прочие неприятности).
"""
error_type: ErrorType = ErrorType.SERVER
class AccessError(BaseError):
"""
Ошибки доступа (несоответствие тенантности, ролям доступа).
"""
error_type: ErrorType = ErrorType.ACCESS
class OperationError(BaseError):
"""
Ошибки операции (несоответствие прохождению верификации, ошибки в датасете).
"""
error_type: ErrorType = ErrorType.OPERATION
class ValidationError(BaseError):
"""
Ошибки валидации (несоответствие первичной валидации).
"""
error_type: ErrorType = ErrorType.VALIDATION
field_errors: Optional[Dict[str, str]] = None

View File

@@ -10,7 +10,7 @@ from api.schemas.base import Base
class ProcessSchemaUpdate(Base): class ProcessSchemaUpdate(Base):
title: Optional[str] = Field(None, max_length=100) title: Optional[str] = Field(None, max_length=100)
description: Optional[str] = None description: Optional[str] = None
# owner_id: Optional[int] = None owner_id: Optional[int] = None
settings: Optional[Dict[str, Any]] = None settings: Optional[Dict[str, Any]] = None
status: Optional[ProcessStatus] = None status: Optional[ProcessStatus] = None

View File

@@ -1,5 +1,5 @@
from datetime import datetime from datetime import datetime
from typing import Any, Dict, Optional, List from typing import Any, Dict, Optional
from orm.tables.process import ProcessStatus, NodeType from orm.tables.process import ProcessStatus, NodeType
from pydantic import Field from pydantic import Field
@@ -31,8 +31,8 @@ class ProcessSchemaSettingsNode(Base):
id: int id: int
node_type: NodeType node_type: NodeType
from_node: Optional[Dict[str, Any]] = None from_node: Optional[Dict[str, Any]] = None
data: Dict[str, Any] # Переименовано с 'from' на 'from_node' data: Dict[str, Any]# Переименовано с 'from' на 'from_node'
links: Optional[List[Dict[str, Any]]] = None links: Optional[ProcessSchemaSettingsNodeLink] = None
class ProcessSchemaResponse(Base): class ProcessSchemaResponse(Base):

View File

@@ -1,23 +1,16 @@
from datetime import datetime from datetime import datetime
from typing import Any, Dict, Optional, List from typing import Any, Dict, Optional
from orm.tables.process import NodeStatus, NodeType from orm.tables.process import NodeStatus, NodeType
from pydantic import Field
from api.schemas.base import Base from api.schemas.base import Base
class Ps_NodeDeleteRequest(Base):
schema_id: int
node_id: int
port: str
next_node_id: int
class Ps_NodeRequest(Base): class Ps_NodeRequest(Base):
data: Dict[str, Any] data: Dict[str, Any]
links: Dict[str, Any] links: Dict[str, Any]
class Ps_Node(Base): class Ps_Node(Base):
id: int id: int
ps_id: int ps_id: int
@@ -28,6 +21,7 @@ class Ps_Node(Base):
status: NodeStatus status: NodeStatus
class Ps_NodeFrontResponseLink(Base): class Ps_NodeFrontResponseLink(Base):
id: int id: int
link_name: str link_name: str
@@ -39,10 +33,10 @@ class Ps_NodeFrontResponseLink(Base):
class Ps_NodeFrontResponseNode(Base): class Ps_NodeFrontResponseNode(Base):
id: int id: int
node_type: NodeType node_type: NodeType
data: Dict[str, Any] # Переименовано с 'from' на 'from_node' data: Dict[str, Any]# Переименовано с 'from' на 'from_node'
class Ps_NodeFrontResponse(Base): class Ps_NodeFrontResponse(Base):
description: Optional[Dict[str, Any]] = None description: Optional[Dict[str, Any]] = None
node: Optional[Ps_NodeFrontResponseNode] = None node: Optional[Ps_NodeFrontResponseNode] = None
links: Optional[List[Dict[str, Any]]] = None link: Optional[Ps_NodeFrontResponseLink] = None

View File

@@ -1,75 +1,32 @@
from fastapi import status from fastapi import (
HTTPException,
status,
)
from orm.tables.account import AccountRole from orm.tables.account import AccountRole
from api.db.logic.account import get_user_by_login from api.db.logic.account import get_user_by_login
from api.error import create_operation_error, create_access_error
class UserRoleValidator: async def db_user_role_validation(connection, current_user):
"""Валидатор ролей пользователей""" authorize_user = await get_user_by_login(connection, current_user)
def __init__(self, connection):
self.connection = connection
async def validate_admin(self, current_user: int):
"""Проверяет права администратора или владельца"""
try:
authorize_user = await get_user_by_login(self.connection, current_user)
except Exception as e:
raise create_operation_error(
message="User not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"user_id": current_user, "error": str(e)},
)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}: if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
raise create_access_error( raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions")
message="Insufficient permissions",
status_code=status.HTTP_403_FORBIDDEN,
details={
"user_id": current_user,
"user_role": authorize_user.role.value,
"required_roles": [AccountRole.OWNER.value, AccountRole.ADMIN.value],
},
)
return authorize_user return authorize_user
async def validate_ownership(self, current_user: int, resource_owner_id: int):
"""Проверяет владение ресурсом или права администратора"""
try:
authorize_user = await get_user_by_login(self.connection, current_user)
except Exception as e:
raise create_operation_error(
message="User not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"user_id": current_user, "error": str(e)},
)
async def db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, current_listevents_creator_id
):
authorize_user = await get_user_by_login(connection, current_user)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}: if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
if authorize_user.id != resource_owner_id: if authorize_user.id != current_listevents_creator_id:
raise create_access_error( raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions")
message="Access denied",
status_code=status.HTTP_403_FORBIDDEN,
details={
"user_id": current_user,
"resource_owner_id": resource_owner_id,
"user_role": authorize_user.role.value,
"reason": "User is not the owner and does not have admin privileges",
},
)
return authorize_user return authorize_user
async def get_user(self, current_user: int):
"""Получает пользователя с админ-статусом"""
try:
authorize_user = await get_user_by_login(self.connection, current_user)
except Exception as e:
raise create_operation_error(
message="User not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"user_id": current_user, "error": str(e)},
)
is_admin = authorize_user.role in {AccountRole.OWNER, AccountRole.ADMIN} async def db_user_role_validation_for_list_events_and_process_schema(connection, current_user):
return authorize_user, is_admin authorize_user = await get_user_by_login(connection, current_user)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
return authorize_user, False
else:
return authorize_user, True

View File

@@ -1,52 +0,0 @@
import json
from pathlib import Path
from typing import Dict
# Путь к файлу счётчика (в корне проекта)
COUNTER_FILE_PATH = Path(__file__).parent.parent.parent / "node_counter.json"
def get_node_counter() -> int:
"""
Открывает JSON файл и возвращает значение node_counter.
Если файл не существует, создаёт его со значением по умолчанию 0.
Returns:
int: Текущее значение счётчика узлов
"""
if not COUNTER_FILE_PATH.exists():
initial_data: Dict[str, int] = {"node_counter": 0}
with open(COUNTER_FILE_PATH, "w", encoding="utf-8") as f:
json.dump(initial_data, f, indent=2, ensure_ascii=False)
return 0
try:
with open(COUNTER_FILE_PATH, "r", encoding="utf-8") as f:
data = json.load(f)
return data.get("node_counter", 0)
except (json.JSONDecodeError, IOError):
initial_data = {"node_counter": 0}
with open(COUNTER_FILE_PATH, "w", encoding="utf-8") as f:
json.dump(initial_data, f, indent=2, ensure_ascii=False)
return 0
def increment_node_counter() -> int:
"""
Увеличивает значение node_counter на 1, сохраняет в файл и возвращает новое значение.
Returns:
int: Новое значение счётчика (старое значение + 1)
"""
current_value = get_node_counter()
new_value = current_value + 1
data: Dict[str, int] = {"node_counter": new_value}
with open(COUNTER_FILE_PATH, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return new_value

View File

@@ -1,6 +1,5 @@
from pydantic.alias_generators import to_camel from pydantic.alias_generators import to_camel
def to_camel_dict(obj): def to_camel_dict(obj):
if isinstance(obj, dict): if isinstance(obj, dict):
return {to_camel(key): to_camel_dict(value) for key, value in obj.items()} return {to_camel(key): to_camel_dict(value) for key, value in obj.items()}

2
api/poetry.lock generated
View File

@@ -2020,7 +2020,7 @@ sqlalchemy = "^2.0.43"
type = "git" type = "git"
url = "http://88.86.199.167:3000/Nox/CORE.git" url = "http://88.86.199.167:3000/Nox/CORE.git"
reference = "HEAD" reference = "HEAD"
resolved_reference = "43c139512928ab3a4767e771c8e41e39930599ad" resolved_reference = "b3896e8b5dbed2d609c8ac257419d5492c9e7b8d"
[[package]] [[package]]
name = "watchfiles" name = "watchfiles"

View File

@@ -18,7 +18,7 @@ dependencies = [
"python-multipart (>=0.0.20,<0.0.21)", "python-multipart (>=0.0.20,<0.0.21)",
"requests (>=2.31.0,<3.0.0)", "requests (>=2.31.0,<3.0.0)",
"fastapi-jwt-auth @ git+https://github.com/vvpreo/fastapi-jwt-auth", "fastapi-jwt-auth @ git+https://github.com/vvpreo/fastapi-jwt-auth",
"vork-core @ git+http://88.86.199.167:3000/Nox/CORE.git", "core-library @ git+https://gitea.heado.ru/Vorkout/core.git@VORKOUT-18",
] ]