refactor(api): format with ruff

This commit is contained in:
Vladislav Syrochkin 2025-07-04 20:08:50 +05:00
parent 82fcd22faf
commit 60160d46be
7 changed files with 65 additions and 47 deletions

View File

@ -13,10 +13,12 @@ from api.db.tables.events import list_events_table
from api.schemas.events.list_events import ListEvent from api.schemas.events.list_events import ListEvent
from api.schemas.endpoints.list_events import all_list_event_adapter,AllListEventResponse from api.schemas.endpoints.list_events import all_list_event_adapter, AllListEventResponse
async def get_listevents_page_by_creator_id(connection: AsyncConnection, creator_id: int, page: int, limit: int) -> Optional[AllListEventResponse]: async def get_listevents_page_by_creator_id(
connection: AsyncConnection, creator_id: int, page: int, limit: int
) -> Optional[AllListEventResponse]:
""" """
Получает список событий заданного создателя по значениям page и limit и creator_id. Получает список событий заданного создателя по значениям page и limit и creator_id.
""" """
@ -55,8 +57,13 @@ async def get_listevents_page_by_creator_id(connection: AsyncConnection, creator
# Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные # Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные
validated_list_event = all_list_event_adapter.validate_python(events_data) validated_list_event = all_list_event_adapter.validate_python(events_data)
return AllListEventResponse(list_event=validated_list_event, amount_count=total_count, amount_pages=total_pages, current_page=page, return AllListEventResponse(
limit = limit) list_event=validated_list_event,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_listevents_page(connection: AsyncConnection, page, limit) -> Optional[AllListEventResponse]: async def get_listevents_page(connection: AsyncConnection, page, limit) -> Optional[AllListEventResponse]:
@ -94,8 +101,14 @@ async def get_listevents_page(connection: AsyncConnection, page, limit) -> Optio
# Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные # Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные
validated_list_event = all_list_event_adapter.validate_python(events_data) validated_list_event = all_list_event_adapter.validate_python(events_data)
return AllListEventResponse(list_event=validated_list_event, amount_count=total_count, amount_pages=total_pages,current_page=page, return AllListEventResponse(
limit = limit) list_event=validated_list_event,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_listevents_by_name(connection: AsyncConnection, name: str) -> Optional[ListEvent]: async def get_listevents_by_name(connection: AsyncConnection, name: str) -> Optional[ListEvent]:
""" """
@ -120,6 +133,7 @@ async def get_listevents_by_name(connection: AsyncConnection, name: str) -> Opti
return ListEvent.model_validate(listevents_data) return ListEvent.model_validate(listevents_data)
async def get_listevents_by_id(connection: AsyncConnection, id: int) -> Optional[ListEvent]: async def get_listevents_by_id(connection: AsyncConnection, id: int) -> Optional[ListEvent]:
""" """
Получает listevent по id. Получает listevent по id.
@ -143,16 +157,18 @@ async def get_listevents_by_id(connection: AsyncConnection, id: int) -> Optional
return ListEvent.model_validate(listevents_data) return ListEvent.model_validate(listevents_data)
async def update_listevents_by_id(connection: AsyncConnection, update_values, listevents): async def update_listevents_by_id(connection: AsyncConnection, update_values, listevents):
""" """
Вносит изменеия в нужное поле таблицы list_events_table. Вносит изменеия в нужное поле таблицы list_events_table.
""" """
await connection.execute(list_events_table.update().where(list_events_table.c.id == listevents.id).values(**update_values)) await connection.execute(
list_events_table.update().where(list_events_table.c.id == listevents.id).values(**update_values)
)
await connection.commit() await connection.commit()
async def create_listevents(connection: AsyncConnection, listevents: ListEvent, creator_id: int) -> Optional[ListEvent]: async def create_listevents(connection: AsyncConnection, listevents: ListEvent, creator_id: int) -> Optional[ListEvent]:
""" """
Создает нове поле в таблице list_events_table. Создает нове поле в таблице list_events_table.
@ -164,7 +180,7 @@ async def create_listevents(connection: AsyncConnection, listevents: ListEvent,
created_at=datetime.now(timezone.utc), created_at=datetime.now(timezone.utc),
schema=listevents.schema_, # добавлено поле schema schema=listevents.schema_, # добавлено поле schema
state=listevents.state.value, # добавлено поле state state=listevents.state.value, # добавлено поле state
status=listevents.status.value # добавлено поле status status=listevents.status.value, # добавлено поле status
) )
await connection.execute(query) await connection.execute(query)

View File

@ -4,7 +4,7 @@ from api.endpoints.account import api_router as account_router
from api.endpoints.keyring import api_router as keyring_router from api.endpoints.keyring import api_router as keyring_router
from api.endpoints.listevents import api_router as listevents_router from api.endpoints.listevents import api_router as listevents_router
list_of_routes = [auth_router, profile_router, account_router, keyring_router,listevents_router] list_of_routes = [auth_router, profile_router, account_router, keyring_router, listevents_router]
__all__ = [ __all__ = [
"list_of_routes", "list_of_routes",

View File

@ -10,9 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep from api.db.connection.session import get_connection_dep
from api.db.logic.account import ( from api.db.logic.account import get_user_by_login
get_user_by_login
)
from api.db.logic.listevents import ( from api.db.logic.listevents import (
get_listevents_by_name, get_listevents_by_name,
@ -20,8 +18,7 @@ from api.db.logic.listevents import (
create_listevents, create_listevents,
update_listevents_by_id, update_listevents_by_id,
get_listevents_page, get_listevents_page,
get_listevents_page_by_creator_id get_listevents_page_by_creator_id,
) )
@ -30,11 +27,14 @@ from api.db.tables.events import EventStatus
from api.schemas.base import bearer_schema from api.schemas.base import bearer_schema
from api.schemas.endpoints.list_events import ListEventUpdate,AllListEventResponse from api.schemas.endpoints.list_events import ListEventUpdate, AllListEventResponse
from api.services.auth import get_current_user from api.services.auth import get_current_user
from api.services.user_role_validation import db_user_role_validation_for_listevents_by_listevent_id,db_user_role_validation_for_listevents from api.services.user_role_validation import (
db_user_role_validation_for_listevents_by_listevent_id,
db_user_role_validation_for_listevents,
)
from api.services.update_data_validation import update_listevents_data_changes from api.services.update_data_validation import update_listevents_data_changes
@ -43,20 +43,17 @@ api_router = APIRouter(
tags=["list events"], tags=["list events"],
) )
@api_router.get("",
dependencies=[Depends(bearer_schema)], @api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllListEventResponse)
response_model=AllListEventResponse)
async def get_all_list_events( async def get_all_list_events(
page: int = 1, page: int = 1,
limit: int = 10, limit: int = 10,
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
authorize_user, page_flag = await db_user_role_validation_for_listevents(connection, current_user)
authorize_user,page_flag = await db_user_role_validation_for_listevents(connection, current_user)
if page_flag: if page_flag:
list_eventslist = await get_listevents_page(connection, page, limit) list_eventslist = await get_listevents_page(connection, page, limit)
print(list_eventslist) print(list_eventslist)
if list_eventslist is None: if list_eventslist is None:
@ -64,7 +61,7 @@ async def get_all_list_events(
return list_eventslist return list_eventslist
else: else:
list_events_list = await get_listevents_page_by_creator_id(connection,authorize_user.id, page, limit) list_events_list = await get_listevents_page_by_creator_id(connection, authorize_user.id, page, limit)
if list_events_list is None: if list_events_list is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
@ -72,23 +69,20 @@ async def get_all_list_events(
return list_events_list return list_events_list
@api_router.get("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent) @api_router.get("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def get_list_events( async def get_list_events(
listevents_id: int, listevents_id: int,
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user) current_user=Depends(get_current_user),
): ):
listevents_validation = await get_listevents_by_id(connection, listevents_id) listevents_validation = await get_listevents_by_id(connection, listevents_id)
if listevents_validation is None: if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(connection, current_user,listevents_validation.creator_id) authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(
connection, current_user, listevents_validation.creator_id
)
if listevents_id is None: if listevents_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
@ -100,9 +94,8 @@ async def get_list_events(
async def create_list_events( async def create_list_events(
listevents: ListEventUpdate, listevents: ListEventUpdate,
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user) current_user=Depends(get_current_user),
): ):
user_validation = await get_user_by_login(connection, current_user) user_validation = await get_user_by_login(connection, current_user)
listevents_validation = await get_listevents_by_name(connection, listevents.name) listevents_validation = await get_listevents_by_name(connection, listevents.name)
@ -116,6 +109,7 @@ async def create_list_events(
status_code=status.HTTP_400_BAD_REQUEST, detail="An List events with this information already exists." status_code=status.HTTP_400_BAD_REQUEST, detail="An List events with this information already exists."
) )
@api_router.put("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent) @api_router.put("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def update_listevents( async def update_listevents(
listevents_id: int, listevents_id: int,
@ -123,13 +117,14 @@ async def update_listevents(
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
listevents_validation = await get_listevents_by_id(connection, listevents_id) listevents_validation = await get_listevents_by_id(connection, listevents_id)
if listevents_validation is None: if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(connection, current_user,listevents_validation.creator_id) authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(
connection, current_user, listevents_validation.creator_id
)
update_values = update_listevents_data_changes(listevents_update, listevents_validation) update_values = update_listevents_data_changes(listevents_update, listevents_validation)
@ -151,17 +146,17 @@ async def delete_list_events(
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
listevents_validation = await get_listevents_by_id(connection, listevents_id) listevents_validation = await get_listevents_by_id(connection, listevents_id)
if listevents_validation is None: if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(connection, current_user,listevents_validation.creator_id) authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(
connection, current_user, listevents_validation.creator_id
)
listevents_update = ListEventUpdate(status=EventStatus.DELETED.value) listevents_update = ListEventUpdate(status=EventStatus.DELETED.value)
update_values = update_listevents_data_changes(listevents_update, listevents_validation) update_values = update_listevents_data_changes(listevents_update, listevents_validation)
if update_values is None: if update_values is None:

View File

@ -1,18 +1,19 @@
from pydantic import Field, TypeAdapter from pydantic import Field, TypeAdapter
from typing import Optional,Dict, Any, List from typing import Optional, Dict, Any, List
from datetime import datetime from datetime import datetime
from api.schemas.base import Base from api.schemas.base import Base
from api.db.tables.events import EventState,EventStatus from api.db.tables.events import EventState, EventStatus
class ListEventUpdate(Base): class ListEventUpdate(Base):
name: Optional[str] = Field(None, max_length=40) name: Optional[str] = Field(None, max_length=40)
title: Optional[str] = Field(None, max_length=64) title: Optional[str] = Field(None, max_length=64)
schema_: Optional[Dict[str, Any]]= Field(None, alias="schema") schema_: Optional[Dict[str, Any]] = Field(None, alias="schema")
state: Optional[EventState]= None state: Optional[EventState] = None
status: Optional[EventStatus]= None status: Optional[EventStatus] = None
class AllListEvent(Base): class AllListEvent(Base):
id: int id: int
@ -32,4 +33,5 @@ class AllListEventResponse(Base):
current_page: int current_page: int
limit: int limit: int
all_list_event_adapter = TypeAdapter(List[AllListEvent]) all_list_event_adapter = TypeAdapter(List[AllListEvent])

View File

@ -3,7 +3,7 @@ from typing import Dict, Any
from datetime import datetime from datetime import datetime
from api.schemas.base import Base from api.schemas.base import Base
from api.db.tables.events import EventState,EventStatus from api.db.tables.events import EventState, EventStatus
class ListEvent(Base): class ListEvent(Base):

View File

@ -75,6 +75,7 @@ def update_key_data_changes(update_data: AccountKeyringUpdate, key) -> Optional[
return changes if changes else None return changes if changes else None
def update_listevents_data_changes(update_data: ListEventUpdate, listevents) -> Optional[dict]: def update_listevents_data_changes(update_data: ListEventUpdate, listevents) -> Optional[dict]:
""" """
Сравнивает данные для обновления с текущими значениями listevents. Сравнивает данные для обновления с текущими значениями listevents.

View File

@ -12,16 +12,20 @@ async def db_user_role_validation(connection, current_user):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions")
return authorize_user return authorize_user
async def db_user_role_validation_for_listevents_by_listevent_id(connection, current_user,current_listevents_creator_id):
async def db_user_role_validation_for_listevents_by_listevent_id(
connection, current_user, current_listevents_creator_id
):
authorize_user = await get_user_by_login(connection, current_user) authorize_user = await get_user_by_login(connection, current_user)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}: if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
if authorize_user.id != current_listevents_creator_id: if authorize_user.id != current_listevents_creator_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions")
return authorize_user return authorize_user
async def db_user_role_validation_for_listevents(connection, current_user): async def db_user_role_validation_for_listevents(connection, current_user):
authorize_user = await get_user_by_login(connection, current_user) authorize_user = await get_user_by_login(connection, current_user)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}: if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
return authorize_user,False return authorize_user, False
else: else:
return authorize_user,True return authorize_user, True