Files
connect/api/api/db/logic/list_events.py
2025-11-15 16:00:05 +05:00

186 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Optional
import math
from datetime import datetime, timezone
from sqlalchemy import insert, select, func, or_, and_, asc, desc
from sqlalchemy.ext.asyncio import AsyncConnection
from orm.tables.events import list_events_table
from api.schemas.events.list_events import ListEvent
from api.schemas.endpoints.list_events import all_list_event_adapter, AllListEventResponse, ListEventFilterDTO
async def get_list_events_page_DTO(
connection: AsyncConnection, filter_dto: ListEventFilterDTO
) -> Optional[AllListEventResponse]:
"""
Получает список событий с фильтрацией через DTO объект.
Поддерживает:
- пагинацию
- полнотекстовый поиск (пропускает name при русских буквах)
- фильтрацию по полям
- сортировку
"""
page = filter_dto.pagination.get("page", 1)
limit = filter_dto.pagination.get("limit", 10)
offset = (page - 1) * limit
query = select(
list_events_table.c.id,
list_events_table.c.name,
list_events_table.c.title,
list_events_table.c.creator_id,
list_events_table.c.created_at,
list_events_table.c.schema.label("schema_"),
list_events_table.c.state,
list_events_table.c.status,
)
if filter_dto.search:
search_term = f"%{filter_dto.search}%"
has_russian = any("\u0400" <= char <= "\u04ff" for char in filter_dto.search)
if has_russian:
query = query.where(list_events_table.c.title.ilike(search_term))
else:
query = query.where(
or_(list_events_table.c.title.ilike(search_term), list_events_table.c.name.ilike(search_term))
)
filter_conditions = []
if filter_dto.filters:
for field, values in filter_dto.filters.items():
column = getattr(list_events_table.c, field, None)
if column is not None and values:
if len(values) == 1:
filter_conditions.append(column == values[0])
else:
filter_conditions.append(column.in_(values))
if filter_conditions:
query = query.where(and_(*filter_conditions))
if filter_dto.order:
order_field = filter_dto.order.get("field", "id")
order_direction = filter_dto.order.get("direction", "asc")
if order_field.startswith("schema."):
json_field = order_field[7:]
column = list_events_table.c.schema[json_field].astext
else:
column = getattr(list_events_table.c, order_field, None)
if column is not None:
if order_direction.lower() == "desc":
query = query.order_by(desc(column))
else:
query = query.order_by(asc(column))
else:
query = query.order_by(list_events_table.c.id)
query = query.offset(offset).limit(limit)
count_query = select(func.count()).select_from(list_events_table)
if filter_dto.search:
search_term = f"%{filter_dto.search}%"
has_russian = any("\u0400" <= char <= "\u04ff" for char in filter_dto.search)
if has_russian:
count_query = count_query.where(list_events_table.c.title.ilike(search_term))
else:
count_query = count_query.where(
or_(list_events_table.c.title.ilike(search_term), list_events_table.c.name.ilike(search_term))
)
if filter_conditions:
count_query = count_query.where(and_(*filter_conditions))
result = await connection.execute(query)
count_result = await connection.execute(count_query)
events_data = result.mappings().all()
total_count = count_result.scalar()
if not total_count:
return None
total_pages = math.ceil(total_count / limit)
validated_events = all_list_event_adapter.validate_python(events_data)
return AllListEventResponse(
list_event=validated_events,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_list_events_by_name(connection: AsyncConnection, name: str) -> Optional[ListEvent]:
"""
Получает list events по name.
"""
query = select(list_events_table).where(list_events_table.c.name == name)
list_events_db_cursor = await connection.execute(query)
list_events_data = list_events_db_cursor.mappings().one_or_none()
if not list_events_data:
return None
return ListEvent.model_validate(list_events_data)
async def get_list_events_by_id(connection: AsyncConnection, id: int) -> Optional[ListEvent]:
"""
Получает listevent по id.
"""
query = select(list_events_table).where(list_events_table.c.id == id)
list_events_db_cursor = await connection.execute(query)
list_events_data = list_events_db_cursor.mappings().one_or_none()
if not list_events_data:
return None
return ListEvent.model_validate(list_events_data)
async def update_list_events_by_id(connection: AsyncConnection, update_values, list_events):
"""
Вносит изменеия в нужное поле таблицы list_events_table.
"""
await connection.execute(
list_events_table.update().where(list_events_table.c.id == list_events.id).values(**update_values)
)
await connection.commit()
async def create_list_events(
connection: AsyncConnection, list_events: ListEvent, creator_id: int
) -> Optional[ListEvent]:
"""
Создает нове поле в таблице list_events_table.
"""
query = insert(list_events_table).values(
name=list_events.name,
title=list_events.title, # добавлено поле title
creator_id=creator_id,
created_at=datetime.now(timezone.utc),
schema=list_events.schema_, # добавлено поле schema
state=list_events.state.value, # добавлено поле state
status=list_events.status.value, # добавлено поле status
)
await connection.execute(query)
await connection.commit()
return list_events