connect/api/api/db/logic/keyring.py
2025-07-02 12:23:22 +05:00

105 lines
3.5 KiB
Python

from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional
from sqlalchemy import insert, select, update
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.tables.account import account_keyring_table, KeyStatus, KeyType
from api.schemas.account.account_keyring import AccountKeyring
from api.utils.hasher import hasher
async def get_key_by_id(connection: AsyncConnection, key_id: str) -> Optional[AccountKeyring]:
"""
Получает key по key_id.
"""
query = select(account_keyring_table).where(account_keyring_table.c.key_id == key_id)
user_db_cursor = await connection.execute(query)
user_db = user_db_cursor.one_or_none()
if not user_db:
return None
user_data = {
column.name: (
getattr(user_db, column.name).name
if isinstance(getattr(user_db, column.name), Enum)
else getattr(user_db, column.name)
)
for column in account_keyring_table.columns
}
return AccountKeyring.model_validate(user_data)
async def update_key_by_id(connection: AsyncConnection, update_values, key) -> Optional[AccountKeyring]:
"""
Вносит изменеия в нужное поле таблицы account_keyring_table.
"""
await connection.execute(
account_keyring_table.update().where(account_keyring_table.c.key_id == key.key_id).values(**update_values)
)
await connection.commit()
async def create_key(connection: AsyncConnection, key: AccountKeyring, key_id: int) -> Optional[AccountKeyring]:
"""
Создает нове поле в таблице account_keyring_table).
"""
query = insert(account_keyring_table).values(
owner_id=key.owner_id,
key_type=key.key_type.value,
key_id=key_id,
key_value=key.key_value,
created_at=datetime.now(timezone.utc),
expiry=key.expiry,
status=key.status.value,
)
key.created_at = datetime.now(timezone.utc)
key.key_id = key_id
await connection.execute(query)
await connection.commit()
return key
async def create_password_key(connection: AsyncConnection, password: str | None, owner_id: int):
if password is None:
password = hasher.generate_password()
hashed_password = hasher.hash_data(password)
stmt = mysql_insert(account_keyring_table).values(
owner_id=owner_id,
key_type=KeyType.PASSWORD.value,
key_id="PASSWORD",
key_value=hashed_password,
created_at=datetime.now(timezone.utc),
expiry=datetime.now(timezone.utc) + timedelta(days=365),
status=KeyStatus.ACTIVE,
)
stmt.on_duplicate_key_update(key_value=hashed_password)
await connection.execute(stmt)
await connection.commit()
async def update_password_key(connection: AsyncConnection, owner_id: int, password: str):
stmt = select(account_keyring_table).where(account_keyring_table.c.owner_id == owner_id)
result = await connection.execute(stmt)
keyring = result.one_or_none()
if not keyring:
await create_password_key(connection, password, owner_id)
else:
stmt = (
update(account_keyring_table)
.values(key_value=hasher.hash_data(password), expiry=datetime.now(timezone.utc) + timedelta(days=365))
.where(account_keyring_table.c.owner_id == owner_id)
)
await connection.execute(stmt)
await connection.commit()