feat: add TypeAdapter

This commit is contained in:
TheNoxium 2025-05-28 14:10:00 +05:00
parent 98a4692247
commit 320c13183f

View File

@ -3,6 +3,8 @@ import math
from datetime import datetime, timezone from datetime import datetime, timezone
from pydantic import TypeAdapter
from sqlalchemy import insert, select, func from sqlalchemy import insert, select, func
from sqlalchemy.ext.asyncio import AsyncConnection from sqlalchemy.ext.asyncio import AsyncConnection
from enum import Enum from enum import Enum
@ -10,33 +12,45 @@ from enum import Enum
from api.db.tables.account import account_table from api.db.tables.account import account_table
from api.schemas.account.account import User from api.schemas.account.account import User
from api.schemas.endpoints.account import AllUser,AllUserResponse from api.schemas.endpoints.account import AllUser, AllUserResponse
async def get_all_users_login_and_id(connection: AsyncConnection,page,limit) -> Optional[User]:
async def get_all_users_login_and_id(connection: AsyncConnection, page, limit) -> Optional[User]:
""" """
Получает id и login всех юзеров Получает id и login всех юзеров
""" """
first_user = page*limit-(limit-1)
last_user = first_user+(limit-1)
query = select(account_table.c.id, account_table.c.login).where(account_table.c.id.between(first_user, last_user )) user_adapter = TypeAdapter(AllUser)
response_adapter = TypeAdapter(AllUserResponse)
result = await connection.execute(query) first_user = page*limit-(limit)
users = result.fetchall() query = (
select(account_table.c.id, account_table.c.login)
user_list = [{'id': user.id, 'login': user.login} for user in users] .order_by(account_table.c.id)
.offset(first_user)
user_models = [AllUser(id=user['id'], login=user['login']) for user in user_list] .limit(limit)
)
count_query = select(func.count()).select_from(account_table) count_query = select(func.count()).select_from(account_table)
result = await connection.execute(query)
count_result = await connection.execute(count_query) count_result = await connection.execute(count_query)
amount_count = count_result.scalar()
amount_pages = math.ceil(amount_count / limit)
users_data = result.fetchall()
total_count = count_result.scalar()
total_pages = math.ceil(total_count / limit)
return AllUserResponse(users=user_models, amount_count=amount_count, amount_pages=amount_pages) validated_users = [
user_adapter.validate_python({"id": u.id, "login": u.login})
for u in users_data
]
return response_adapter.validate_python({
"users": validated_users,
"amount_count": total_count,
"amount_pages": total_pages
})
async def get_user_id(connection: AsyncConnection, id: int) -> Optional[User]: async def get_user_id(connection: AsyncConnection, id: int) -> Optional[User]:
@ -116,8 +130,6 @@ async def create_user(connection: AsyncConnection, user: User, creator_id: int)
status=user.status.value status=user.status.value
) )
await connection.execute(query) await connection.execute(query)
await connection.commit() await connection.commit()