Files
connect/api/api/db/logic/account.py

181 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import math
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from sqlalchemy import insert, select, func, or_, and_, asc, desc
from sqlalchemy.ext.asyncio import AsyncConnection
from orm.tables.account import account_table
from api.schemas.account.account import User
from api.schemas.endpoints.account import all_user_adapter, AllUser, AllUserResponse, UserCreate, UserFilterDTO
def _account_column(field):
    """Return the ``account_table`` column named *field*, or ``None`` if there is no such column."""
    if not isinstance(field, str):
        return None
    # Use the column collection's own lookup: ``getattr(account_table.c, ...)`` also
    # resolves non-column attributes of the collection (``keys``, ``values``, ...).
    return account_table.c.get(field)


def _account_where_conditions(filter_dto):
    """Build the WHERE conditions shared by the page query and the count query."""
    conditions = []

    # Free-text search: ILIKE substring match on name, login or email.
    if filter_dto.search:
        pattern = f"%{filter_dto.search}%"
        conditions.append(
            or_(
                account_table.c.name.ilike(pattern),
                account_table.c.login.ilike(pattern),
                account_table.c.email.ilike(pattern),
            )
        )

    # Field filters: a single value becomes an equality test, several become IN (...).
    # Unknown field names and empty value lists are skipped.
    for field, values in (filter_dto.filters or {}).items():
        column = _account_column(field)
        if column is None or not values:
            continue
        if len(values) == 1:
            conditions.append(column == values[0])
        else:
            conditions.append(column.in_(values))

    return conditions


async def get_user_account_page_DTO(
    connection: AsyncConnection, filter_dto: UserFilterDTO
) -> Optional[AllUserResponse]:
    """
    Return one page of accounts selected by the filter DTO.

    Supports pagination, text search, per-field filtering and sorting.

    Args:
        connection: Async connection used to run the count query and the page query.
        filter_dto: Carries ``pagination`` (``page`` / ``limit``), ``search``,
            ``filters`` (field name -> accepted values) and ``order``
            (``field`` / ``direction``).

    Returns:
        An ``AllUserResponse`` with the page rows and the paging totals, or ``None``
        when no account matches the search and filters.
    """
    pagination = filter_dto.pagination or {}
    # Missing, zero or negative values fall back to safe bounds: a zero limit would
    # divide by zero when computing the page count, and a page below 1 would
    # produce a negative OFFSET.
    page = max(pagination.get("page") or 1, 1)
    limit = pagination.get("limit") or 10
    if limit < 1:
        limit = 10
    offset = (page - 1) * limit

    conditions = _account_where_conditions(filter_dto)

    # Count first: when nothing matches there is no need to fetch a page at all.
    count_query = select(func.count()).select_from(account_table)
    if conditions:
        count_query = count_query.where(and_(*conditions))
    total_count = (await connection.execute(count_query)).scalar()
    if not total_count:
        return None

    # Sorting: an unknown or missing sort field falls back to ``id`` so that
    # OFFSET/LIMIT paging is always applied to an ordered result.
    order = filter_dto.order or {}
    sort_column = _account_column(order.get("field", "id"))
    if sort_column is None:
        sort_column = account_table.c.id
    descending = str(order.get("direction") or "asc").lower() == "desc"
    ordering = [desc(sort_column) if descending else asc(sort_column)]
    if sort_column is not account_table.c.id:
        # Tie-break on id so rows with equal sort values keep a stable order across pages.
        ordering.append(account_table.c.id)

    page_query = select(
        account_table.c.id,
        account_table.c.name,
        account_table.c.login,
        account_table.c.email,
        account_table.c.bind_tenant_id,
        account_table.c.role,
        account_table.c.meta,
        account_table.c.creator_id,
        account_table.c.created_at,
        account_table.c.status,
    )
    if conditions:
        page_query = page_query.where(and_(*conditions))
    page_query = page_query.order_by(*ordering).offset(offset).limit(limit)

    rows = (await connection.execute(page_query)).mappings().all()

    return AllUserResponse(
        users=all_user_adapter.validate_python(rows),
        amount_count=total_count,
        amount_pages=math.ceil(total_count / limit),
        current_page=page,
        limit=limit,
    )
async def get_user_by_id(connection: AsyncConnection, user_id: int) -> Optional[User]:
    """
    Fetch a single account by its id.

    Args:
        connection: Async connection used to run the query.
        user_id: Value matched against ``account_table.c.id``.

    Returns:
        The row validated into a ``User``, or ``None`` when no row has that id.
    """
    query = select(account_table).where(account_table.c.id == user_id)
    result = await connection.execute(query)
    row = result.mappings().one_or_none()
    if row is None:
        return None
    # The value returned is a ``User`` (same as get_user_by_login), so the
    # return annotation says ``User`` as well.
    return User.model_validate(row)
async def get_user_by_login(connection: AsyncConnection, login: str) -> Optional[User]:
    """
    Look up an account by its login.

    Returns the matching row validated into a ``User``, or ``None`` when no
    row has that login.
    """
    login_matches = account_table.c.login == login
    cursor = await connection.execute(select(account_table).where(login_matches))
    row = cursor.mappings().one_or_none()
    return User.model_validate(row) if row else None
async def update_user_by_id(connection: AsyncConnection, update_values, user) -> Optional[User]:
    """
    Apply *update_values* to the account row of *user* and return the refreshed account.

    Args:
        connection: Async connection used to run the UPDATE and the follow-up read.
        update_values: Mapping of ``account_table`` column names to their new values.
        user: Object whose ``id`` identifies the row to update.

    Returns:
        The account as stored after the update, or ``None`` when no row has that id.
    """
    # With nothing to set there is no UPDATE to issue; skip straight to the read.
    if update_values:
        await connection.execute(
            account_table.update().where(account_table.c.id == user.id).values(**update_values)
        )
        await connection.commit()
    # Re-read the row so the function returns what its annotation promises,
    # the same way create_user returns the stored account.
    return await get_user_by_id(connection, user.id)
async def create_user(connection: AsyncConnection, user: UserCreate, creator_id: int) -> Optional[User]:
    """
    Insert a new row into ``account_table`` and return the stored account.

    Args:
        connection: Async connection used to run the INSERT and the follow-up read.
        user: Data for the new account; ``role`` and ``status`` are stored by their
            ``.value`` and a falsy ``meta`` is stored as an empty dict.
        creator_id: Stored in the ``creator_id`` column of the new row.

    Returns:
        The newly created account as read back by ``get_user_by_id``.
    """
    query = insert(account_table).values(
        name=user.name,
        login=user.login,
        email=user.email,
        bind_tenant_id=user.bind_tenant_id,
        role=user.role.value,
        meta=user.meta or {},
        creator_id=creator_id,
        created_at=datetime.now(timezone.utc),
        status=user.status.value,
    )
    result = await connection.execute(query)
    await connection.commit()
    # ``inserted_primary_key`` is SQLAlchemy's backend-neutral accessor for the new
    # row's key; ``lastrowid`` only works on DBAPIs that expose it.
    # NOTE(review): assumes ``id`` is the table's only primary-key column — confirm.
    new_user_id = result.inserted_primary_key[0]
    return await get_user_by_id(connection, new_user_id)