fix: rename list events
This commit is contained in:
279
api/api/db/logic/list_events.py
Normal file
279
api/api/db/logic/list_events.py
Normal file
@@ -0,0 +1,279 @@
|
||||
from typing import Optional
|
||||
import math
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
from sqlalchemy import insert, select, func, or_, and_, asc, desc
|
||||
from sqlalchemy.ext.asyncio import AsyncConnection
|
||||
|
||||
from api.db.tables.events import list_events_table
|
||||
|
||||
from api.schemas.events.list_events import ListEvent
|
||||
|
||||
from api.schemas.endpoints.list_events import all_list_event_adapter, AllListEventResponse, ListEventFilterDTO
|
||||
|
||||
|
||||
async def get_list_events_page_DTO(
|
||||
connection: AsyncConnection, filter_dto: ListEventFilterDTO
|
||||
) -> Optional[AllListEventResponse]:
|
||||
"""
|
||||
Получает список событий с фильтрацией через DTO объект.
|
||||
Поддерживает:
|
||||
- пагинацию
|
||||
- полнотекстовый поиск (пропускает name при русских буквах)
|
||||
- фильтрацию по полям
|
||||
- сортировку
|
||||
"""
|
||||
page = filter_dto.pagination.get("page", 1)
|
||||
limit = filter_dto.pagination.get("limit", 10)
|
||||
offset = (page - 1) * limit
|
||||
|
||||
query = select(
|
||||
list_events_table.c.id,
|
||||
list_events_table.c.name,
|
||||
list_events_table.c.title,
|
||||
list_events_table.c.creator_id,
|
||||
list_events_table.c.created_at,
|
||||
list_events_table.c.schema.label("schema_"),
|
||||
list_events_table.c.state,
|
||||
list_events_table.c.status,
|
||||
)
|
||||
|
||||
if filter_dto.search:
|
||||
search_term = f"%{filter_dto.search}%"
|
||||
has_russian = any("\u0400" <= char <= "\u04ff" for char in filter_dto.search)
|
||||
|
||||
if has_russian:
|
||||
query = query.where(list_events_table.c.title.ilike(search_term))
|
||||
else:
|
||||
query = query.where(
|
||||
or_(list_events_table.c.title.ilike(search_term), list_events_table.c.name.ilike(search_term))
|
||||
)
|
||||
|
||||
filter_conditions = []
|
||||
if filter_dto.filters:
|
||||
for field, values in filter_dto.filters.items():
|
||||
column = getattr(list_events_table.c, field, None)
|
||||
if column is not None and values:
|
||||
if len(values) == 1:
|
||||
filter_conditions.append(column == values[0])
|
||||
else:
|
||||
filter_conditions.append(column.in_(values))
|
||||
|
||||
if filter_conditions:
|
||||
query = query.where(and_(*filter_conditions))
|
||||
|
||||
if filter_dto.order:
|
||||
order_field = filter_dto.order.get("field", "id")
|
||||
order_direction = filter_dto.order.get("direction", "asc")
|
||||
|
||||
if order_field.startswith("schema."):
|
||||
json_field = order_field[7:]
|
||||
column = list_events_table.c.schema[json_field].astext
|
||||
else:
|
||||
column = getattr(list_events_table.c, order_field, None)
|
||||
|
||||
if column is not None:
|
||||
if order_direction.lower() == "desc":
|
||||
query = query.order_by(desc(column))
|
||||
else:
|
||||
query = query.order_by(asc(column))
|
||||
else:
|
||||
query = query.order_by(list_events_table.c.id)
|
||||
|
||||
query = query.offset(offset).limit(limit)
|
||||
|
||||
count_query = select(func.count()).select_from(list_events_table)
|
||||
|
||||
if filter_dto.search:
|
||||
search_term = f"%{filter_dto.search}%"
|
||||
has_russian = any("\u0400" <= char <= "\u04ff" for char in filter_dto.search)
|
||||
|
||||
if has_russian:
|
||||
count_query = count_query.where(list_events_table.c.title.ilike(search_term))
|
||||
else:
|
||||
count_query = count_query.where(
|
||||
or_(list_events_table.c.title.ilike(search_term), list_events_table.c.name.ilike(search_term))
|
||||
)
|
||||
|
||||
if filter_conditions:
|
||||
count_query = count_query.where(and_(*filter_conditions))
|
||||
|
||||
result = await connection.execute(query)
|
||||
count_result = await connection.execute(count_query)
|
||||
|
||||
events_data = result.mappings().all()
|
||||
total_count = count_result.scalar()
|
||||
|
||||
if not total_count:
|
||||
return None
|
||||
|
||||
total_pages = math.ceil(total_count / limit)
|
||||
validated_events = all_list_event_adapter.validate_python(events_data)
|
||||
|
||||
return AllListEventResponse(
|
||||
list_event=validated_events,
|
||||
amount_count=total_count,
|
||||
amount_pages=total_pages,
|
||||
current_page=page,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
|
||||
async def get_list_events_page_by_creator_id(
|
||||
connection: AsyncConnection, creator_id: int, page: int, limit: int
|
||||
) -> Optional[AllListEventResponse]:
|
||||
"""
|
||||
Получает список событий заданного создателя по значениям page и limit и creator_id.
|
||||
"""
|
||||
|
||||
first_event = page * limit - limit
|
||||
query = (
|
||||
select(
|
||||
list_events_table.c.id,
|
||||
list_events_table.c.name,
|
||||
list_events_table.c.title,
|
||||
list_events_table.c.creator_id,
|
||||
list_events_table.c.created_at,
|
||||
list_events_table.c.schema,
|
||||
list_events_table.c.state,
|
||||
list_events_table.c.status,
|
||||
)
|
||||
.where(list_events_table.c.creator_id == creator_id) # Фильтрация по creator_id
|
||||
.order_by(list_events_table.c.id)
|
||||
.offset(first_event)
|
||||
.limit(limit)
|
||||
)
|
||||
|
||||
count_query = (
|
||||
select(func.count())
|
||||
.select_from(list_events_table)
|
||||
.where(list_events_table.c.creator_id == creator_id) # Фильтрация по creator_id
|
||||
)
|
||||
|
||||
result = await connection.execute(query)
|
||||
count_result = await connection.execute(count_query)
|
||||
|
||||
events_data = result.mappings().all()
|
||||
total_count = count_result.scalar()
|
||||
total_pages = math.ceil(total_count / limit)
|
||||
|
||||
# Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные
|
||||
validated_list_event = all_list_event_adapter.validate_python(events_data)
|
||||
|
||||
return AllListEventResponse(
|
||||
list_event=validated_list_event,
|
||||
amount_count=total_count,
|
||||
amount_pages=total_pages,
|
||||
current_page=page,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
|
||||
async def get_list_events_page(connection: AsyncConnection, page, limit) -> Optional[AllListEventResponse]:
|
||||
"""
|
||||
Получает список событий заданного создателя по значениям page и limit.
|
||||
"""
|
||||
|
||||
first_event = page * limit - (limit)
|
||||
|
||||
query = (
|
||||
select(
|
||||
list_events_table.c.id,
|
||||
list_events_table.c.name,
|
||||
list_events_table.c.title,
|
||||
list_events_table.c.creator_id,
|
||||
list_events_table.c.created_at,
|
||||
list_events_table.c.schema,
|
||||
list_events_table.c.state,
|
||||
list_events_table.c.status,
|
||||
)
|
||||
.order_by(list_events_table.c.id)
|
||||
.offset(first_event)
|
||||
.limit(limit)
|
||||
)
|
||||
|
||||
count_query = select(func.count()).select_from(list_events_table)
|
||||
|
||||
result = await connection.execute(query)
|
||||
count_result = await connection.execute(count_query)
|
||||
|
||||
events_data = result.mappings().all()
|
||||
total_count = count_result.scalar()
|
||||
total_pages = math.ceil(total_count / limit)
|
||||
|
||||
# Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные
|
||||
validated_list_event = all_list_event_adapter.validate_python(events_data)
|
||||
|
||||
return AllListEventResponse(
|
||||
list_event=validated_list_event,
|
||||
amount_count=total_count,
|
||||
amount_pages=total_pages,
|
||||
current_page=page,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
|
||||
async def get_list_events_by_name(connection: AsyncConnection, name: str) -> Optional[ListEvent]:
|
||||
"""
|
||||
Получает list events по name.
|
||||
"""
|
||||
query = select(list_events_table).where(list_events_table.c.name == name)
|
||||
|
||||
list_events_db_cursor = await connection.execute(query)
|
||||
|
||||
list_events_data = list_events_db_cursor.mappings().one_or_none()
|
||||
if not list_events_data:
|
||||
return None
|
||||
|
||||
return ListEvent.model_validate(list_events_data)
|
||||
|
||||
|
||||
async def get_list_events_by_id(connection: AsyncConnection, id: int) -> Optional[ListEvent]:
|
||||
"""
|
||||
Получает listevent по id.
|
||||
"""
|
||||
query = select(list_events_table).where(list_events_table.c.id == id)
|
||||
|
||||
list_events_db_cursor = await connection.execute(query)
|
||||
|
||||
list_events_data = list_events_db_cursor.mappings().one_or_none()
|
||||
if not list_events_data:
|
||||
return None
|
||||
|
||||
return ListEvent.model_validate(list_events_data)
|
||||
|
||||
|
||||
async def update_list_events_by_id(connection: AsyncConnection, update_values, list_events):
|
||||
"""
|
||||
Вносит изменеия в нужное поле таблицы list_events_table.
|
||||
"""
|
||||
await connection.execute(
|
||||
list_events_table.update().where(list_events_table.c.id == list_events.id).values(**update_values)
|
||||
)
|
||||
|
||||
await connection.commit()
|
||||
|
||||
|
||||
async def create_list_events(
|
||||
connection: AsyncConnection, list_events: ListEvent, creator_id: int
|
||||
) -> Optional[ListEvent]:
|
||||
"""
|
||||
Создает нове поле в таблице list_events_table.
|
||||
"""
|
||||
query = insert(list_events_table).values(
|
||||
name=list_events.name,
|
||||
title=list_events.title, # добавлено поле title
|
||||
creator_id=creator_id,
|
||||
created_at=datetime.now(timezone.utc),
|
||||
schema=list_events.schema_, # добавлено поле schema
|
||||
state=list_events.state.value, # добавлено поле state
|
||||
status=list_events.status.value, # добавлено поле status
|
||||
)
|
||||
|
||||
await connection.execute(query)
|
||||
|
||||
await connection.commit()
|
||||
|
||||
return list_events
|
||||
Reference in New Issue
Block a user