VORKOUT-14: format api with ruff #15

Merged
vlad.dev merged 1 commits from VORKOUT-14 into master 2025-07-04 20:09:54 +05:00
7 changed files with 65 additions and 47 deletions

View File

@ -13,10 +13,12 @@ from api.db.tables.events import list_events_table
from api.schemas.events.list_events import ListEvent
from api.schemas.endpoints.list_events import all_list_event_adapter,AllListEventResponse
from api.schemas.endpoints.list_events import all_list_event_adapter, AllListEventResponse
async def get_listevents_page_by_creator_id(connection: AsyncConnection, creator_id: int, page: int, limit: int) -> Optional[AllListEventResponse]:
async def get_listevents_page_by_creator_id(
connection: AsyncConnection, creator_id: int, page: int, limit: int
) -> Optional[AllListEventResponse]:
"""
Получает список событий заданного создателя по значениям page и limit и creator_id.
"""
@ -55,8 +57,13 @@ async def get_listevents_page_by_creator_id(connection: AsyncConnection, creator
# Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные
validated_list_event = all_list_event_adapter.validate_python(events_data)
return AllListEventResponse(list_event=validated_list_event, amount_count=total_count, amount_pages=total_pages, current_page=page,
limit = limit)
return AllListEventResponse(
list_event=validated_list_event,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_listevents_page(connection: AsyncConnection, page, limit) -> Optional[AllListEventResponse]:
@ -94,8 +101,14 @@ async def get_listevents_page(connection: AsyncConnection, page, limit) -> Optio
# Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные
validated_list_event = all_list_event_adapter.validate_python(events_data)
return AllListEventResponse(list_event=validated_list_event, amount_count=total_count, amount_pages=total_pages,current_page=page,
limit = limit)
return AllListEventResponse(
list_event=validated_list_event,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_listevents_by_name(connection: AsyncConnection, name: str) -> Optional[ListEvent]:
"""
@ -120,6 +133,7 @@ async def get_listevents_by_name(connection: AsyncConnection, name: str) -> Opti
return ListEvent.model_validate(listevents_data)
async def get_listevents_by_id(connection: AsyncConnection, id: int) -> Optional[ListEvent]:
"""
Получает listevent по id.
@ -143,16 +157,18 @@ async def get_listevents_by_id(connection: AsyncConnection, id: int) -> Optional
return ListEvent.model_validate(listevents_data)
async def update_listevents_by_id(connection: AsyncConnection, update_values, listevents):
"""
Вносит изменеия в нужное поле таблицы list_events_table.
"""
await connection.execute(list_events_table.update().where(list_events_table.c.id == listevents.id).values(**update_values))
await connection.execute(
list_events_table.update().where(list_events_table.c.id == listevents.id).values(**update_values)
)
await connection.commit()
async def create_listevents(connection: AsyncConnection, listevents: ListEvent, creator_id: int) -> Optional[ListEvent]:
"""
Создает нове поле в таблице list_events_table.
@ -164,7 +180,7 @@ async def create_listevents(connection: AsyncConnection, listevents: ListEvent,
created_at=datetime.now(timezone.utc),
schema=listevents.schema_, # добавлено поле schema
state=listevents.state.value, # добавлено поле state
status=listevents.status.value # добавлено поле status
status=listevents.status.value, # добавлено поле status
)
await connection.execute(query)

View File

@ -4,7 +4,7 @@ from api.endpoints.account import api_router as account_router
from api.endpoints.keyring import api_router as keyring_router
from api.endpoints.listevents import api_router as listevents_router
list_of_routes = [auth_router, profile_router, account_router, keyring_router,listevents_router]
list_of_routes = [auth_router, profile_router, account_router, keyring_router, listevents_router]
__all__ = [
"list_of_routes",

View File

@ -10,9 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep
from api.db.logic.account import (
get_user_by_login
)
from api.db.logic.account import get_user_by_login
from api.db.logic.listevents import (
get_listevents_by_name,
@ -20,8 +18,7 @@ from api.db.logic.listevents import (
create_listevents,
update_listevents_by_id,
get_listevents_page,
get_listevents_page_by_creator_id
get_listevents_page_by_creator_id,
)
@ -30,11 +27,14 @@ from api.db.tables.events import EventStatus
from api.schemas.base import bearer_schema
from api.schemas.endpoints.list_events import ListEventUpdate,AllListEventResponse
from api.schemas.endpoints.list_events import ListEventUpdate, AllListEventResponse
from api.services.auth import get_current_user
from api.services.user_role_validation import db_user_role_validation_for_listevents_by_listevent_id,db_user_role_validation_for_listevents
from api.services.user_role_validation import (
db_user_role_validation_for_listevents_by_listevent_id,
db_user_role_validation_for_listevents,
)
from api.services.update_data_validation import update_listevents_data_changes
@ -43,20 +43,17 @@ api_router = APIRouter(
tags=["list events"],
)
@api_router.get("",
dependencies=[Depends(bearer_schema)],
response_model=AllListEventResponse)
@api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllListEventResponse)
async def get_all_list_events(
page: int = 1,
limit: int = 10,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user,page_flag = await db_user_role_validation_for_listevents(connection, current_user)
authorize_user, page_flag = await db_user_role_validation_for_listevents(connection, current_user)
if page_flag:
list_eventslist = await get_listevents_page(connection, page, limit)
print(list_eventslist)
if list_eventslist is None:
@ -64,7 +61,7 @@ async def get_all_list_events(
return list_eventslist
else:
list_events_list = await get_listevents_page_by_creator_id(connection,authorize_user.id, page, limit)
list_events_list = await get_listevents_page_by_creator_id(connection, authorize_user.id, page, limit)
if list_events_list is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
@ -72,23 +69,20 @@ async def get_all_list_events(
return list_events_list
@api_router.get("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def get_list_events(
listevents_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user)
current_user=Depends(get_current_user),
):
listevents_validation = await get_listevents_by_id(connection, listevents_id)
if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(connection, current_user,listevents_validation.creator_id)
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(
connection, current_user, listevents_validation.creator_id
)
if listevents_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
@ -100,9 +94,8 @@ async def get_list_events(
async def create_list_events(
listevents: ListEventUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user)
current_user=Depends(get_current_user),
):
user_validation = await get_user_by_login(connection, current_user)
listevents_validation = await get_listevents_by_name(connection, listevents.name)
@ -116,6 +109,7 @@ async def create_list_events(
status_code=status.HTTP_400_BAD_REQUEST, detail="An List events with this information already exists."
)
@api_router.put("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def update_listevents(
listevents_id: int,
@ -123,13 +117,14 @@ async def update_listevents(
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
listevents_validation = await get_listevents_by_id(connection, listevents_id)
if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(connection, current_user,listevents_validation.creator_id)
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(
connection, current_user, listevents_validation.creator_id
)
update_values = update_listevents_data_changes(listevents_update, listevents_validation)
@ -151,17 +146,17 @@ async def delete_list_events(
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
listevents_validation = await get_listevents_by_id(connection, listevents_id)
if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(connection, current_user,listevents_validation.creator_id)
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(
connection, current_user, listevents_validation.creator_id
)
listevents_update = ListEventUpdate(status=EventStatus.DELETED.value)
update_values = update_listevents_data_changes(listevents_update, listevents_validation)
if update_values is None:

View File

@ -1,18 +1,19 @@
from pydantic import Field, TypeAdapter
from typing import Optional,Dict, Any, List
from typing import Optional, Dict, Any, List
from datetime import datetime
from api.schemas.base import Base
from api.db.tables.events import EventState,EventStatus
from api.db.tables.events import EventState, EventStatus
class ListEventUpdate(Base):
name: Optional[str] = Field(None, max_length=40)
title: Optional[str] = Field(None, max_length=64)
schema_: Optional[Dict[str, Any]]= Field(None, alias="schema")
state: Optional[EventState]= None
status: Optional[EventStatus]= None
schema_: Optional[Dict[str, Any]] = Field(None, alias="schema")
state: Optional[EventState] = None
status: Optional[EventStatus] = None
class AllListEvent(Base):
id: int
@ -32,4 +33,5 @@ class AllListEventResponse(Base):
current_page: int
limit: int
all_list_event_adapter = TypeAdapter(List[AllListEvent])

View File

@ -3,7 +3,7 @@ from typing import Dict, Any
from datetime import datetime
from api.schemas.base import Base
from api.db.tables.events import EventState,EventStatus
from api.db.tables.events import EventState, EventStatus
class ListEvent(Base):

View File

@ -75,6 +75,7 @@ def update_key_data_changes(update_data: AccountKeyringUpdate, key) -> Optional[
return changes if changes else None
def update_listevents_data_changes(update_data: ListEventUpdate, listevents) -> Optional[dict]:
"""
Сравнивает данные для обновления с текущими значениями listevents.

View File

@ -12,16 +12,20 @@ async def db_user_role_validation(connection, current_user):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions")
return authorize_user
async def db_user_role_validation_for_listevents_by_listevent_id(connection, current_user,current_listevents_creator_id):
async def db_user_role_validation_for_listevents_by_listevent_id(
connection, current_user, current_listevents_creator_id
):
authorize_user = await get_user_by_login(connection, current_user)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
if authorize_user.id != current_listevents_creator_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions")
return authorize_user
async def db_user_role_validation_for_listevents(connection, current_user):
authorize_user = await get_user_by_login(connection, current_user)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
return authorize_user,False
return authorize_user, False
else:
return authorize_user,True
return authorize_user, True