connect/api/api/db/logic/account.py
2025-05-30 08:51:49 +05:00

132 lines
3.5 KiB
Python

from typing import Optional
import math
from datetime import datetime, timezone
from sqlalchemy import insert, select, func
from sqlalchemy.ext.asyncio import AsyncConnection
from enum import Enum
from api.db.tables.account import account_table
from api.schemas.account.account import User
from api.schemas.endpoints.account import AllUserResponse, all_user_adapter
async def get_all_users_login_and_id(connection: AsyncConnection, page, limit) -> Optional[User]:
"""
Получает id и login всех юзеров
"""
first_user = page*limit-(limit)
query = (
select(account_table.c.id, account_table.c.login)
.order_by(account_table.c.id)
.offset(first_user)
.limit(limit)
)
count_query = select(func.count()).select_from(account_table)
result = await connection.execute(query)
count_result = await connection.execute(count_query)
users_data = result.fetchall()
total_count = count_result.scalar()
total_pages = math.ceil(total_count / limit)
validated_users = all_user_adapter.validate_python(
[{"id": u.id, "login": u.login} for u in users_data]
)
return AllUserResponse(
users=validated_users,
amount_count=total_count,
amount_pages=total_pages
)
async def get_user_id(connection: AsyncConnection, id: int) -> Optional[User]:
"""
Получает юзера по id.
"""
query = (
select(account_table)
.where(account_table.c.id == id)
)
user_db_cursor = await connection.execute(query)
user_db = user_db_cursor.one_or_none()
if not user_db:
return None
user_data = {
column.name: (getattr(user_db, column.name).name if isinstance(
getattr(user_db, column.name), Enum) else getattr(user_db, column.name))
for column in account_table.columns
}
return User.model_validate(user_data)
async def get_user_login(connection: AsyncConnection, login: str) -> Optional[User]:
"""
Получает юзера по login.
"""
query = (
select(account_table)
.where(account_table.c.login == login)
)
user_db_cursor = await connection.execute(query)
user_db = user_db_cursor.one_or_none()
if not user_db:
return None
user_data = {
column.name: (getattr(user_db, column.name).name if isinstance(
getattr(user_db, column.name), Enum) else getattr(user_db, column.name))
for column in account_table.columns
}
return User.model_validate(user_data)
async def update_user_id(connection: AsyncConnection, update_values, user) -> Optional[User]:
"""
Вносит изменеия в нужное поле таблицы account_table.
"""
await connection.execute(
account_table.update()
.where(account_table.c.id == user.id)
.values(**update_values)
)
await connection.commit()
async def create_user(connection: AsyncConnection, user: User, creator_id: int) -> Optional[User]:
"""
Создает нове поле в таблице account_table.
"""
query = insert(account_table).values(
name=user.name,
login=user.login,
email=user.email,
bind_tenant_id=user.bind_tenant_id,
role=user.role.value,
meta=user.meta,
creator_id=creator_id,
created_at=datetime.now(timezone.utc),
status=user.status.value
)
await connection.execute(query)
await connection.commit()
return user