connect/api/api/db/logic/account.py

127 lines
3.7 KiB
Python

import math
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from sqlalchemy import func, insert, select
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.tables.account import account_table
from api.schemas.account.account import User
from api.schemas.endpoints.account import all_user_adapter, AllUser, AllUserResponse, UserCreate
async def get_user_accaunt_page(connection: AsyncConnection, page, limit) -> Optional[AllUserResponse]:
"""
Получает список ползовелей заданных значениями page, limit.
"""
first_user = page * limit - (limit)
query = (
select(
account_table.c.id,
account_table.c.name,
account_table.c.login,
account_table.c.email,
account_table.c.bind_tenant_id,
account_table.c.role,
account_table.c.created_at,
account_table.c.status,
)
.order_by(account_table.c.id)
.offset(first_user)
.limit(limit)
)
count_query = select(func.count()).select_from(account_table)
result = await connection.execute(query)
count_result = await connection.execute(count_query)
users_data = result.mappings().all()
total_count = count_result.scalar()
total_pages = math.ceil(total_count / limit)
validated_users = all_user_adapter.validate_python(users_data)
return AllUserResponse(
users=validated_users,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_user_by_id(connection: AsyncConnection, user_id: int) -> Optional[AllUser]:
"""
Получает юзера по id.
"""
query = select(account_table).where(account_table.c.id == user_id)
user_db_cursor = await connection.execute(query)
user = user_db_cursor.mappings().one_or_none()
if not user:
return None
return AllUser.model_validate(user)
async def get_user_by_login(connection: AsyncConnection, login: str) -> Optional[User]:
"""
Получает юзера по login.
"""
query = select(account_table).where(account_table.c.login == login)
user_db_cursor = await connection.execute(query)
user_db = user_db_cursor.one_or_none()
if not user_db:
return None
user_data = {
column.name: (
getattr(user_db, column.name).name
if isinstance(getattr(user_db, column.name), Enum)
else getattr(user_db, column.name)
)
for column in account_table.columns
}
return User.model_validate(user_data)
async def update_user_by_id(connection: AsyncConnection, update_values, user) -> Optional[User]:
"""
Вносит изменеия в нужное поле таблицы account_table.
"""
await connection.execute(account_table.update().where(account_table.c.id == user.id).values(**update_values))
await connection.commit()
async def create_user(connection: AsyncConnection, user: UserCreate, creator_id: int) -> Optional[AllUser]:
"""
Создает нове поле в таблице account_table.
"""
query = insert(account_table).values(
name=user.name,
login=user.login,
email=user.email,
bind_tenant_id=user.bind_tenant_id,
role=user.role.value,
meta=user.meta or {},
creator_id=creator_id,
created_at=datetime.now(timezone.utc),
status=user.status.value,
)
res = await connection.execute(query)
await connection.commit()
new_user = await get_user_by_id(connection, res.lastrowid)
return new_user