feat: new ps node endpoints

This commit is contained in:
TheNoxium
2025-12-02 12:17:27 +05:30
parent fe7f8c6511
commit 362ff7babe
5 changed files with 214 additions and 17 deletions

View File

@@ -88,20 +88,12 @@ async def create_ps_node_schema(
return await get_last_ps_node_by_creator_and_ps_id(connection, creator_id, validated_schema.ps_id) return await get_last_ps_node_by_creator_and_ps_id(connection, creator_id, validated_schema.ps_id)
async def check_node_connection(connection: AsyncConnection, node_id: int, next_node_id: int, port: int) -> bool: async def update_ps_node_by_id(connection: AsyncConnection, update_values: dict, ps_node: Ps_Node) -> None:
""" """
Проверяет, подключен ли next_node_id к node_id через указанный порт. Обновляет данные PS ноды в таблице ps_node_table.
""" """
query = select(node_link_table).where( await connection.execute(update(ps_node_table).where(ps_node_table.c.id == ps_node.id).values(**update_values))
and_( await connection.commit()
node_link_table.c.node_id == node_id,
node_link_table.c.next_node_id == next_node_id,
node_link_table.c.link_point_id == port,
)
)
result = await connection.execute(query)
return result.mappings().first() is not None
async def get_nodes_for_deletion_ordered(connection: AsyncConnection, node_id: int) -> list[int]: async def get_nodes_for_deletion_ordered(connection: AsyncConnection, node_id: int) -> list[int]:

View File

@@ -5,9 +5,15 @@ from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep from api.db.connection.session import get_connection_dep
from api.db.logic.account import get_user_by_login from api.db.logic.account import get_user_by_login
from api.schemas.base import bearer_schema from api.schemas.base import bearer_schema
from api.schemas.process.ps_node import Ps_NodeFrontResponse, Ps_NodeRequest, Ps_NodeDeleteRequest from api.schemas.process.ps_node import (
Ps_NodeFrontResponse,
Ps_NodeRequest,
Ps_NodeDeleteRequest,
AllAvailableNodesResponse,
Ps_NodeUpdate,
)
from api.services.auth import get_current_user from api.services.auth import get_current_user
from api.db.logic.ps_node import get_ps_node_by_id, check_node_connection from api.db.logic.ps_node import get_ps_node_by_id
from api.db.logic.process_schema import get_process_schema_by_id from api.db.logic.process_schema import get_process_schema_by_id
from api.services.user_role_validation import UserRoleValidator from api.services.user_role_validation import UserRoleValidator
from core import VorkNodeRegistry from core import VorkNodeRegistry
@@ -149,3 +155,140 @@ async def create_ps_node_endpoint(
) )
return ps_node_front_response return ps_node_front_response
@api_router.get("/available", dependencies=[Depends(bearer_schema)], response_model=AllAvailableNodesResponse)
async def get_all_available_nodes_endpoint(
current_user=Depends(get_current_user),
):
registry = VorkNodeRegistry()
all_nodes = registry.get_all()
node_ids = list(all_nodes.keys())
return AllAvailableNodesResponse(nodes=node_ids)
@api_router.get("/{node_id}", dependencies=[Depends(bearer_schema)], response_model=Ps_NodeFrontResponse)
async def get_ps_node_endpoint(
node_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
service = PsNodeService(connection)
ps_node_response = await service.get(node_id)
if ps_node_response is None:
raise create_operation_error(
message="PS node not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"node_id": node_id},
)
process_schema = await get_process_schema_by_id(connection, ps_node_response.node["ps_id"])
if process_schema is None:
raise create_operation_error(
message="Process schema not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"schema_id": ps_node_response.node["ps_id"]},
)
validator = UserRoleValidator(connection)
try:
await validator.validate_ownership(current_user, process_schema.creator_id)
except Exception as e:
raise create_access_error(
message="Access denied",
status_code=status.HTTP_403_FORBIDDEN,
details={"user_id": current_user, "schema_creator_id": process_schema.creator_id, "reason": str(e)},
)
return ps_node_response
@api_router.put("", dependencies=[Depends(bearer_schema)], response_model=Ps_NodeFrontResponse)
async def update_ps_node_endpoint(
ps_node_update: Ps_NodeUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
"""
Обновляет settings PS ноды.
"""
process_schema = await get_process_schema_by_id(connection, ps_node_update.schema_id)
if process_schema is None:
raise create_operation_error(
message="Process schema not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"schema_id": ps_node_update.schema_id},
)
validator = UserRoleValidator(connection)
try:
await validator.validate_ownership(current_user, process_schema.creator_id)
except Exception as e:
raise create_access_error(
message="Access denied",
status_code=status.HTTP_403_FORBIDDEN,
details={"user_id": current_user, "schema_creator_id": process_schema.creator_id, "reason": str(e)},
)
ps_node = await get_ps_node_by_id(connection, ps_node_update.node_id)
if ps_node is None:
raise create_operation_error(
message="PS node not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"node_id": ps_node_update.node_id},
)
if ps_node.ps_id != ps_node_update.schema_id:
raise create_validation_error(
message="PS node does not belong to the specified schema",
status_code=status.HTTP_400_BAD_REQUEST,
details={
"node_id": ps_node_update.node_id,
"expected_schema_id": ps_node_update.schema_id,
"actual_schema_id": ps_node.ps_id,
},
)
# Получаем дескриптор формы для проверки constraint
registry = VorkNodeRegistry()
node_type_str = ps_node.node_type.value if hasattr(ps_node.node_type, "value") else str(ps_node.node_type)
vork_node = registry.get(node_type_str)
if vork_node is None:
raise create_operation_error(
message="Node type not found",
status_code=status.HTTP_404_NOT_FOUND,
details={"node_type": node_type_str},
)
form_descriptor = vork_node.form()
if hasattr(form_descriptor, "model_dump"):
form_descriptor_dict = form_descriptor.model_dump()
else:
form_descriptor_dict = form_descriptor
service = PsNodeService(connection)
constraint_valid = service.check_constraint(form_descriptor_dict, ps_node_update.data)
if not constraint_valid:
raise create_validation_error(
message="Constraint validation failed",
status_code=status.HTTP_400_BAD_REQUEST,
details={"node_id": ps_node_update.node_id},
)
try:
updated_ps_node = await service.update(ps_node_update.node_id, ps_node_update.data, ps_node)
except Exception as e:
raise create_server_error(
message="Failed to update node",
status_code=500,
details={"error": str(e), "node_id": ps_node_update.node_id},
)
return updated_ps_node

View File

@@ -28,6 +28,16 @@ class Ps_Node(Base):
status: NodeStatus status: NodeStatus
class Ps_NodeUpdate(Base):
schema_id: int
node_id: int
data: dict[str, Any]
class Ps_NodeFrontResponse(Base): class Ps_NodeFrontResponse(Base):
node: dict[str, Any] | None = None node: dict[str, Any] | None = None
link: list[dict[str, Any]] | None = None link: list[dict[str, Any]] | None = None
class AllAvailableNodesResponse(Base):
nodes: list[str]

View File

@@ -78,7 +78,7 @@ class ProcessSchemaService:
process_schema_new = await get_process_schema_by_id(self.connection, node_id) process_schema_new = await get_process_schema_by_id(self.connection, node_id)
start_node_data = ListenNodeData(ps_id=process_schema_new.id, node_type=NodeType.START.value, is_start="True") start_node_data = ListenNodeData(ps_id=process_schema_new.id, node_type=NodeType.LISTEN.value, is_start="True")
start_node_links = {} start_node_links = {}

View File

@@ -5,9 +5,15 @@ from api.db.logic.ps_node import (
get_nodes_for_deletion_ordered, get_nodes_for_deletion_ordered,
delete_ps_nodes_delete_handler, delete_ps_nodes_delete_handler,
create_ps_node_schema, create_ps_node_schema,
get_ps_node_by_id,
) )
from api.db.logic.node_link import get_last_link_name_by_node_id, create_node_link_schema from api.db.logic.node_link import (
from api.schemas.process.ps_node import Ps_NodeFrontResponse get_last_link_name_by_node_id,
create_node_link_schema,
get_all_node_links_by_next_node_ids,
)
from api.db.logic.ps_node import update_ps_node_by_id
from api.schemas.process.ps_node import Ps_NodeFrontResponse, Ps_Node
from core import VorkNodeRegistry, VorkNodeLink from core import VorkNodeRegistry, VorkNodeLink
from model_nodes import VorkNodeLinkData from model_nodes import VorkNodeLinkData
@@ -63,3 +69,49 @@ class PsNodeService:
) )
return ps_node_front_response return ps_node_front_response
async def get(self, node_id: int) -> Ps_NodeFrontResponse | None:
"""
Получает конкретную ноду по ID с её вышестоящими линками.
"""
ps_node = await get_ps_node_by_id(self.connection, node_id)
if ps_node is None:
return None
node_links = await get_all_node_links_by_next_node_ids(self.connection, [node_id])
links_list = [{"link": link.model_dump()} for link in node_links]
return Ps_NodeFrontResponse(
node=ps_node.model_dump(),
link=links_list,
)
async def update(self, node_id: int, update_data: dict[str, Any], ps_node: Ps_Node) -> Ps_NodeFrontResponse:
"""
Обновляет данные PS ноды (settings).
"""
await update_ps_node_by_id(self.connection, {"settings": update_data}, ps_node)
node_links = await get_all_node_links_by_next_node_ids(self.connection, [node_id])
links_list = [{"link": link.model_dump()} for link in node_links]
updated_node_dict = ps_node.model_dump()
updated_node_dict["settings"] = update_data
return Ps_NodeFrontResponse(
node=updated_node_dict,
link=links_list,
)
def check_constraint(self, form_descriptor: dict[str, Any], settings_data: dict[str, Any]) -> bool:
"""
Валидирует значения инпутов по constraint .
На данный момент это заглушка, всегда возвращающая True,
так как в дескрипторе пока нет constraint.
"""
return True