Files
connect/api/api/db/logic/process_schema.py
2025-10-22 16:01:22 +05:00

209 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Optional, Dict, Any
import math
from datetime import datetime, timezone
from sqlalchemy import insert, select, func, or_, and_, asc, desc
from sqlalchemy.ext.asyncio import AsyncConnection
from orm.tables.process import process_schema_table, ProcessStatus
from api.schemas.process.process_schema import ProcessSchema
from api.schemas.endpoints.process_schema import (
all_process_schema_adapter,
AllProcessSchemaResponse,
ProcessSchemaFilterDTO,
)
async def get_process_schema_page_DTO(
    connection: AsyncConnection, filter_dto: ProcessSchemaFilterDTO
) -> Optional[AllProcessSchemaResponse]:
    """
    Return one page of process schemas selected by the criteria in ``filter_dto``.

    Supports:
    - pagination (``pagination["page"]`` and ``pagination["limit"]``, defaulting to 1 and 10)
    - case-insensitive substring search over ``title`` and ``description``
    - filtering by column (equality for a single value, ``IN`` for several)
    - sorting by a single column, ascending unless the direction is "desc"

    Rows whose status equals "DELETED" are excluded unless the caller supplies
    a ``status`` key in ``filter_dto.filters``.

    Returns:
        The validated page together with the total row count and page count,
        or ``None`` when no row matches the search/filter conditions.
    """
    # Clamp both values: a limit of 0 would divide by zero when computing the
    # page count, and a page below 1 would produce a negative OFFSET.
    page = max(1, filter_dto.pagination.get("page", 1))
    limit = max(1, filter_dto.pagination.get("limit", 10))
    offset = (page - 1) * limit

    columns = process_schema_table.c

    # Build every WHERE condition once; the page query and the count query
    # must apply exactly the same set.
    conditions = []
    if filter_dto.search:
        search_term = f"%{filter_dto.search}%"
        conditions.append(or_(columns.title.ilike(search_term), columns.description.ilike(search_term)))

    if filter_dto.filters:
        for field, values in filter_dto.filters.items():
            # Look the name up as a column key. getattr() on the collection
            # would also match its own methods (keys, values, items, ...).
            column = columns.get(field)
            if column is None or not values:
                continue
            if len(values) == 1:
                conditions.append(column == values[0])
            else:
                conditions.append(column.in_(values))

    # Hide deleted schemas unless the caller filters on status explicitly.
    if filter_dto.filters is None or "status" not in filter_dto.filters:
        conditions.append(columns.status != "DELETED")

    # Default ordering is by id; it is also the fallback when the requested
    # sort field is not a column, so paging always has a defined order.
    order_clauses = [columns.id]
    if filter_dto.order:
        order_column = columns.get(filter_dto.order.get("field", "id"))
        if order_column is not None:
            order_direction = filter_dto.order.get("direction", "asc")
            if order_direction.lower() == "desc":
                order_clauses = [desc(order_column)]
            else:
                order_clauses = [asc(order_column)]
            if order_column is not columns.id:
                # Tie-breaker: rows equal on the sort column keep a fixed
                # relative order, so pages do not overlap or skip rows.
                order_clauses.append(columns.id)

    query = select(
        columns.id,
        columns.title,
        columns.description,
        columns.owner_id,
        columns.creator_id,
        columns.created_at,
        columns.settings,
        columns.status,
    )
    count_query = select(func.count()).select_from(process_schema_table)
    if conditions:
        combined = and_(*conditions)
        query = query.where(combined)
        count_query = count_query.where(combined)
    query = query.order_by(*order_clauses).offset(offset).limit(limit)

    result = await connection.execute(query)
    count_result = await connection.execute(count_query)
    rows = result.mappings().all()
    total_count = count_result.scalar()
    if not total_count:
        return None

    total_pages = math.ceil(total_count / limit)
    validated_process_schema = all_process_schema_adapter.validate_python(rows)
    return AllProcessSchemaResponse(
        process_schema=validated_process_schema,
        amount_count=total_count,
        amount_pages=total_pages,
        current_page=page,
        limit=limit,
    )
async def get_process_schema_by_id(connection: AsyncConnection, id: int) -> Optional[ProcessSchema]:
    """
    Fetch the process schema whose ``id`` column equals the given id.

    Returns the row validated as a ``ProcessSchema``, or ``None`` when the
    lookup yields nothing.
    """
    matches_id = process_schema_table.c.id == id
    cursor = await connection.execute(select(process_schema_table).where(matches_id))
    row = cursor.mappings().one_or_none()
    return ProcessSchema.model_validate(row) if row else None
async def update_process_schema_by_id(connection: AsyncConnection, update_values, process_schema):
    """
    Write ``update_values`` into the process_schema_table row identified by
    ``process_schema.id`` and commit.

    ``update_values`` is unpacked as keyword arguments, so its keys are the
    column names to set.
    """
    target_row = process_schema_table.c.id == process_schema.id
    statement = process_schema_table.update().where(target_row).values(**update_values)
    await connection.execute(statement)
    await connection.commit()
async def update_process_schema_settings_by_id(
    connection: AsyncConnection, process_schema_id: int, node_data: Dict[str, Any]
):
    """
    Append ``node_data`` to the 'nodes' list inside the schema's settings.

    The stored settings are read, treated as an empty dict when absent, given
    a 'nodes' list if they lack one, and then written back and committed.
    """
    # Read the settings currently stored for this schema.
    lookup = await connection.execute(
        select(process_schema_table.c.settings).where(process_schema_table.c.id == process_schema_id)
    )
    settings = lookup.scalar_one_or_none()

    # No stored settings: start from an empty mapping.
    if settings is None:
        settings = {}

    # Reuse the existing node list, or create it on first use.
    if "nodes" in settings:
        nodes = settings["nodes"]
    else:
        nodes = []
        settings["nodes"] = nodes
    nodes.append(node_data)

    # Write the modified settings back to the same row.
    write_back = (
        process_schema_table.update()
        .where(process_schema_table.c.id == process_schema_id)
        .values(settings=settings)
    )
    await connection.execute(write_back)
    await connection.commit()
async def get_last_created_process_schema(connection: AsyncConnection) -> Optional[int]:
    """
    Return the highest ``id`` present in process_schema_table.

    Returns ``None`` when the query yields no row.
    """
    id_column = process_schema_table.c.id
    newest_first = select(id_column).order_by(id_column.desc()).limit(1)
    cursor = await connection.execute(newest_first)
    return cursor.scalar_one_or_none()
async def create_process_schema(
    connection: AsyncConnection, creator_id: int, title: str, description: str
) -> Optional[int]:
    """
    Insert a new row into process_schema_table and commit.

    The creator is also recorded as the owner, ``created_at`` is the current
    UTC time, settings start as an empty dict and the status is ACTIVE.

    Returns the ``lastrowid`` reported for the insert.
    """
    new_row = {
        "title": title,
        "description": description,
        "owner_id": creator_id,
        "creator_id": creator_id,
        "created_at": datetime.now(timezone.utc),
        "settings": {},
        "status": ProcessStatus.ACTIVE.value,
    }
    outcome = await connection.execute(insert(process_schema_table).values(**new_row))
    await connection.commit()
    # NOTE(review): lastrowid comes from the underlying DBAPI cursor; confirm
    # the driver in use supports it.
    return outcome.lastrowid