feat: CRUD ProcessSchema #16

Merged
ivan.dev merged 8 commits from VORKOUT-17 into master 2025-08-05 21:37:07 +05:00
15 changed files with 449 additions and 62 deletions
Showing only changes of commit e0887c240f - Show all commits

View File

@@ -31,7 +31,7 @@ async def get_listevents_page_by_creator_id(
list_events_table.c.title, list_events_table.c.title,
list_events_table.c.creator_id, list_events_table.c.creator_id,
list_events_table.c.created_at, list_events_table.c.created_at,
list_events_table.c.schema_, list_events_table.c.schema,
list_events_table.c.state, list_events_table.c.state,
list_events_table.c.status, list_events_table.c.status,
) )

View File

@@ -0,0 +1,184 @@
from typing import Optional
import math
from datetime import datetime, timezone
from sqlalchemy import insert, select, func
from sqlalchemy.ext.asyncio import AsyncConnection
from enum import Enum
from api.db.tables.process import process_schema_table
from api.schemas.process.process_schema import ProcessSchema
from api.schemas.endpoints.process_schema import all_process_schema_adapter, AllProcessSchemaResponse
async def get_processschema_page_by_creator_id(
ivan.dev marked this conversation as resolved Outdated

Может, переименуем processschema в process_schema, а то как-то 3 s странновато выглядят?

Может, переименуем `processschema` в `process_schema`, а то как-то 3 s странновато выглядят?
connection: AsyncConnection, creator_id: int, page: int, limit: int
) -> Optional[AllProcessSchemaResponse]:
"""
Получает список событий заданного создателя по значениям page и limit и creator_id.
"""
first_schema = page * limit - limit
query = (
select(
process_schema_table.c.id,
process_schema_table.c.title,
process_schema_table.c.description,
process_schema_table.c.owner_id,
process_schema_table.c.creator_id,
process_schema_table.c.created_at,
process_schema_table.c.settings,
process_schema_table.c.status,
)
.where(process_schema_table.c.creator_id == creator_id)
.order_by(process_schema_table.c.id)
.offset(first_schema)
.limit(limit)
)
count_query = (
select(func.count())
.select_from(process_schema_table)
.where(process_schema_table.c.creator_id == creator_id)
)
result = await connection.execute(query)
count_result = await connection.execute(count_query)
events_data = result.mappings().all()
total_count = count_result.scalar()
total_pages = math.ceil(total_count / limit)
validated_process_schema = all_process_schema_adapter.validate_python(events_data)
return AllProcessSchemaResponse(
process_schema=validated_process_schema,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
ivan.dev marked this conversation as resolved Outdated

А если какая-то дополнительная фильтрация будет над этим списком - отдельным методом будет реализовываться?

Это я к тому, что я бы через DTO расширяемый объект фильтрации тут проводил, у которого по базе limit[offset,count], search и order, к примеру

А если какая-то дополнительная фильтрация будет над этим списком - отдельным методом будет реализовываться? Это я к тому, что я бы через DTO расширяемый объект фильтрации тут проводил, у которого по базе limit[offset,count], search и order, к примеру
async def get_processschema_page(connection: AsyncConnection, page, limit) -> Optional[AllProcessSchemaResponse]:
"""
Получает список событий заданного создателя по значениям page и limit.
ivan.dev marked this conversation as resolved Outdated

Тут, наверное, тогда не "заданного", если я правильно понял

Тут, наверное, тогда не "заданного", если я правильно понял
"""
first_schema = page * limit - (limit)
query = (
select(
process_schema_table.c.id,
process_schema_table.c.title,
process_schema_table.c.description,
process_schema_table.c.owner_id,
process_schema_table.c.creator_id,
process_schema_table.c.created_at,
process_schema_table.c.settings,
process_schema_table.c.status,
)
.order_by(process_schema_table.c.id)
.offset(first_schema)
.limit(limit)
)
count_query = select(func.count()).select_from(process_schema_table)
result = await connection.execute(query)
count_result = await connection.execute(count_query)
events_data = result.mappings().all()
total_count = count_result.scalar()
total_pages = math.ceil(total_count / limit)
validated_process_schema = all_process_schema_adapter.validate_python(events_data)
return AllProcessSchemaResponse(
process_schema=validated_process_schema,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_processschema_by_title(connection: AsyncConnection, title: str) -> Optional[ProcessSchema]:
"""
Получает process schema по title.
"""
query = select(process_schema_table).where(process_schema_table.c.title == title)
processschema_db_cursor = await connection.execute(query)
processschema_db = processschema_db_cursor.one_or_none()
if not processschema_db:
return None
processschema_data = {
ivan.dev marked this conversation as resolved Outdated

Тут можно упростить, написал ниже

Тут можно упростить, написал ниже
column.name: (
getattr(processschema_db, column.name).name
if isinstance(getattr(processschema_db, column.name), Enum)
else getattr(processschema_db, column.name)
)
for column in process_schema_table.columns
}
return ProcessSchema.model_validate(processschema_data)
async def get_processschema_by_id(connection: AsyncConnection, id: int) -> Optional[ProcessSchema]:
"""
Получает processschema по id.
"""
query = select(process_schema_table).where(process_schema_table.c.id == id)
processschema_db_cursor = await connection.execute(query)
processschema_db = processschema_db_cursor.one_or_none()
if not processschema_db:
return None
processschema_data = {
ivan.dev marked this conversation as resolved Outdated

Давай тут перейдем к схеме попроще

processschema_data = processschema_db_cursor.mappings().one_or_none()
if not processschema_data:
    return None
Давай тут перейдем к схеме попроще ```python processschema_data = processschema_db_cursor.mappings().one_or_none() if not processschema_data: return None ```
column.name: (
getattr(processschema_db, column.name).name
if isinstance(getattr(processschema_db, column.name), Enum)
else getattr(processschema_db, column.name)
)
for column in process_schema_table.columns
}
return ProcessSchema.model_validate(processschema_data)
async def update_processschema_by_id(connection: AsyncConnection, update_values, processschema):
"""
Вносит изменеия в нужное поле таблицы process_schema_table.
"""
await connection.execute(
process_schema_table.update().where(process_schema_table.c.id == processschema.id).values(**update_values)
)
await connection.commit()
async def create_processschema(connection: AsyncConnection, processschema: ProcessSchema, creator_id: int) -> Optional[ProcessSchema]:
"""
Создает нове поле в таблице process_schema_table.
"""
query = insert(process_schema_table).values(
title=processschema.title,
description=processschema.description,
owner_id=processschema.owner_id,
creator_id=creator_id,
created_at=datetime.now(timezone.utc),
settings=processschema.settings,
status=processschema.status.value,
)
await connection.execute(query)
await connection.commit()
return processschema

View File

@@ -1,8 +1,7 @@
import enum import enum
from sqlalchemy import Table, Column, Integer, String, Enum as SQLAEnum, JSON, ForeignKey, DateTime, Index from sqlalchemy import Table, Column, String, Enum as SQLAEnum, JSON, ForeignKey, DateTime
from sqlalchemy.sql import func from sqlalchemy.sql import func
from enum import Enum, auto
from api.db.sql_types import UnsignedInt from api.db.sql_types import UnsignedInt

View File

@@ -3,7 +3,6 @@ import enum
from sqlalchemy import ( from sqlalchemy import (
Table, Table,
Column, Column,
Integer,
String, String,
Text, Text,
Enum as SQLAEnum, Enum as SQLAEnum,
@@ -14,7 +13,7 @@ from sqlalchemy import (
PrimaryKeyConstraint, PrimaryKeyConstraint,
) )
from sqlalchemy.sql import func from sqlalchemy.sql import func
from enum import Enum, auto from enum import Enum
from api.db.sql_types import UnsignedInt from api.db.sql_types import UnsignedInt

View File

@@ -3,8 +3,9 @@ from api.endpoints.profile import api_router as profile_router
from api.endpoints.account import api_router as account_router from api.endpoints.account import api_router as account_router
from api.endpoints.keyring import api_router as keyring_router from api.endpoints.keyring import api_router as keyring_router
from api.endpoints.listevents import api_router as listevents_router from api.endpoints.listevents import api_router as listevents_router
from api.endpoints.processschema import api_router as processschema_router
list_of_routes = [auth_router, profile_router, account_router, keyring_router, listevents_router] list_of_routes = [auth_router, profile_router, account_router, keyring_router, listevents_router,processschema_router]
__all__ = [ __all__ = [
"list_of_routes", "list_of_routes",

View File

@@ -32,8 +32,8 @@ from api.schemas.endpoints.list_events import ListEventUpdate, AllListEventRespo
from api.services.auth import get_current_user from api.services.auth import get_current_user
from api.services.user_role_validation import ( from api.services.user_role_validation import (
db_user_role_validation_for_listevents_by_listevent_id, db_user_role_validation_for_listevents_and_processschema_by_listevent_id,
db_user_role_validation_for_listevents, db_user_role_validation_for_listevents_and_processschema,
) )
from api.services.update_data_validation import update_listevents_data_changes from api.services.update_data_validation import update_listevents_data_changes
@@ -51,22 +51,22 @@ async def get_all_list_events(
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
authorize_user, page_flag = await db_user_role_validation_for_listevents(connection, current_user) authorize_user, page_flag = await db_user_role_validation_for_listevents_and_processschema(connection, current_user)
if page_flag: if page_flag:
list_eventslist = await get_listevents_page(connection, page, limit) list_eventspage = await get_listevents_page(connection, page, limit)
ivan.dev marked this conversation as resolved Outdated

list_events_page?

`list_events_page`?
print(list_eventslist)
if list_eventslist is None: if list_eventspage is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
return list_eventslist return list_eventspage
else: else:
list_events_list = await get_listevents_page_by_creator_id(connection, authorize_user.id, page, limit) list_events_page = await get_listevents_page_by_creator_id(connection, authorize_user.id, page, limit)
if list_events_list is None: if list_events_page is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
return list_events_list return list_events_page
@api_router.get("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent) @api_router.get("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
@@ -80,7 +80,7 @@ async def get_list_events(
if listevents_validation is None: if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id( authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id(
connection, current_user, listevents_validation.creator_id connection, current_user, listevents_validation.creator_id
) )
@@ -111,7 +111,7 @@ async def create_list_events(
@api_router.put("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent) @api_router.put("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def update_listevents( async def update_list_events(
listevents_id: int, listevents_id: int,
listevents_update: ListEventUpdate, listevents_update: ListEventUpdate,
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
@@ -122,7 +122,7 @@ async def update_listevents(
if listevents_validation is None: if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id( authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id(
connection, current_user, listevents_validation.creator_id connection, current_user, listevents_validation.creator_id
) )
@@ -151,7 +151,7 @@ async def delete_list_events(
if listevents_validation is None: if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id( authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id(
connection, current_user, listevents_validation.creator_id connection, current_user, listevents_validation.creator_id
) )
ivan.dev marked this conversation as resolved Outdated

Давай получение полей для обновления тоже упростим
Посмотри еще где можно, вроде пару мест точно видел)

updated_values = listevents_update.model_dump(by_alias=True, exclude_none=True)

by_alias сделал потому, что при имени schema была ошибка совпадения имен (schema_)

В текущей вариации обновления полей не работает как раз таки со схемой

Давай получение полей для обновления тоже упростим Посмотри еще где можно, вроде пару мест точно видел) ```python updated_values = listevents_update.model_dump(by_alias=True, exclude_none=True) ``` by_alias сделал потому, что при имени schema была ошибка совпадения имен (schema_) В текущей вариации обновления полей не работает как раз таки со схемой

View File

@@ -0,0 +1,167 @@
from fastapi import (
APIRouter,
Depends,
HTTPException,
status,
)
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep
from api.db.logic.account import get_user_by_login
from api.db.logic.processschema import (
get_processschema_by_title,
create_processschema,
get_processschema_by_id,
update_processschema_by_id,
get_processschema_page_by_creator_id,
get_processschema_page
)
from api.schemas.process.process_schema import ProcessSchema
from api.db.tables.process import ProcessStatus
from api.schemas.base import bearer_schema
from api.schemas.endpoints.process_schema import ProcessSchemaUpdate, AllProcessSchemaResponse
from api.services.auth import get_current_user
from api.services.user_role_validation import (
db_user_role_validation_for_listevents_and_processschema_by_listevent_id,
db_user_role_validation_for_listevents_and_processschema,
)
from api.services.update_data_validation import update_processschema_data_changes
api_router = APIRouter(
prefix="/processschema",
ivan.dev marked this conversation as resolved Outdated

Может все-таки везде будем делить process-schema на 2 слова? Просто уж очень странно это выглядит один

Может все-таки везде будем делить `process-schema` на 2 слова? Просто уж очень странно это выглядит один

Может все-таки везде будем делить process-schema на 2 слова? Просто уж очень странно это выглядит один

Цепочками с дефисом или нижним подчёркиванием удобнее читать и меньше шансов словить ошибку восприятия, ага

> Может все-таки везде будем делить `process-schema` на 2 слова? Просто уж очень странно это выглядит один Цепочками с дефисом или нижним подчёркиванием удобнее читать и меньше шансов словить ошибку восприятия, ага
tags=["process schema"],
)
@api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllProcessSchemaResponse)
async def get_all_process_schema(
page: int = 1,
limit: int = 10,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user, page_flag = await db_user_role_validation_for_listevents_and_processschema(connection, current_user)
if page_flag:
process_schemapage = await get_processschema_page(connection, page, limit)
if process_schemapage is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
return process_schemapage
else:
process_schema_page = await get_processschema_page_by_creator_id(connection, authorize_user.id, page, limit)
if process_schema_page is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
return process_schema_page
@api_router.get("/{processschema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def get_process_schema(
processschema_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
processschema_validation = await get_processschema_by_id(connection, processschema_id)
if processschema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id(
connection, current_user, processschema_validation.creator_id
)
if processschema_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
return processschema_validation
@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def create_process_schema(
processschema: ProcessSchemaUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
user_validation = await get_user_by_login(connection, current_user)
processschema_validation = await get_processschema_by_title(connection, processschema.title)
if processschema_validation is None:
await create_processschema(connection, processschema, user_validation.id)
processschema_new = await get_processschema_by_title(connection, processschema.title)
return processschema_new
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="An process schema with this information already exists."
)
@api_router.put("/{processschema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def update_process_schema(
processschema_id: int,
processschema_update: ProcessSchemaUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
processschema_validation = await get_processschema_by_id(connection, processschema_id)
if processschema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id(
connection, current_user, processschema_validation.creator_id
)
update_values = update_processschema_data_changes(processschema_update, processschema_validation)
if update_values is None:
return processschema_validation
processschema_update_data = ProcessSchema.model_validate({**processschema_validation.model_dump(), **update_values})
await update_processschema_by_id(connection, update_values, processschema_validation)
processschema = await get_processschema_by_id(connection, processschema_id)
return processschema
@api_router.delete("/{processschema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def delete_process_schema(
processschema_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
processschema_validation = await get_processschema_by_id(connection, processschema_id)
if processschema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id(
connection, current_user, processschema_validation.creator_id
)
processschema_update = ProcessSchemaUpdate(status=ProcessStatus.DELETED.value)
update_values = update_processschema_data_changes(processschema_update, processschema_validation)
if update_values is None:
return processschema_validation
ivan.dev marked this conversation as resolved Outdated

Получится привести все использования этого слова во всех файлах к одному виду? О чём писали в комментариях выше.

Получится привести все использования этого слова во всех файлах к одному виду? О чём писали в комментариях выше.
await update_processschema_by_id(connection, update_values, processschema_validation)
processschema = await get_processschema_by_id(connection, processschema_id)
return processschema

View File

@@ -1,11 +1,7 @@
from fastapi import ( from fastapi import (
APIRouter, APIRouter,
Body,
Depends, Depends,
Form,
HTTPException, HTTPException,
Request,
Response,
status, status,
) )

View File

@@ -1,9 +1,6 @@
import datetime
from typing import Optional from typing import Optional
from pydantic import Field from pydantic import Field
from datetime import datetime
from api.db.tables.account import KeyType, KeyStatus from api.db.tables.account import KeyType, KeyStatus
from api.schemas.base import Base from api.schemas.base import Base

View File

@@ -0,0 +1,36 @@
from pydantic import Field, TypeAdapter
from typing import Optional, Dict, Any, List
from datetime import datetime
from api.schemas.base import Base
from api.db.tables.process import ProcessStatus
class ProcessSchemaUpdate(Base):
title: Optional[str] = Field(None, max_length=100)
description: Optional[str] = None
owner_id: Optional[int] = None
settings: Optional[Dict[str, Any]] = None
status: Optional[ProcessStatus] = None
class AllProcessSchema(Base):
id: int
title: str = Field(..., max_length=100)
description: str
owner_id: int
creator_id: int
created_at: datetime
settings: Dict[str, Any]
status: ProcessStatus
class AllProcessSchemaResponse(Base):
process_schema: List[AllProcessSchema]
amount_count: int
amount_pages: int
current_page: int
limit: int
all_process_schema_adapter = TypeAdapter(List[AllProcessSchema])

View File

@@ -1,16 +1,9 @@
from pydantic import Field from pydantic import Field
from typing import Dict, Any from typing import Dict, Any
from datetime import datetime from datetime import datetime
from enum import Enum
from api.schemas.base import Base from api.schemas.base import Base
from api.db.tables.process import NodeStatus
class Status(Enum):
ACTIVE = "Active"
STOPPING = "Stopping"
STOPPED = "Stopped"
DELETED = "Deleted"
class MyModel(Base): class MyModel(Base):
@@ -21,4 +14,4 @@ class MyModel(Base):
settings: Dict[str, Any] settings: Dict[str, Any]
creator_id: int creator_id: int
created_at: datetime created_at: datetime
status: Status status: NodeStatus

View File

@@ -1,17 +1,9 @@
from pydantic import Field from pydantic import Field
from typing import Dict, Any from typing import Dict, Any
from datetime import datetime from datetime import datetime
from enum import Enum
from api.schemas.base import Base from api.schemas.base import Base
from api.db.tables.process import ProcessStatus
class Status(Enum):
ACTIVE = "Active"
STOPPING = "Stopping"
STOPPED = "Stopped"
DELETED = "Deleted"
class ProcessSchema(Base): class ProcessSchema(Base):
id: int id: int
@@ -21,4 +13,4 @@ class ProcessSchema(Base):
creator_id: int creator_id: int
created_at: datetime created_at: datetime
settings: Dict[str, Any] settings: Dict[str, Any]
status: Status status: ProcessStatus

View File

@@ -1,19 +1,8 @@
from datetime import datetime from datetime import datetime
from typing import Dict, Any from typing import Dict, Any
from enum import Enum
from api.schemas.base import Base from api.schemas.base import Base
from api.db.tables.process import NodeType,NodeStatus
class NodeType(Enum):
pass
class Status(Enum):
ACTIVE = "Active"
DISABLED = "Disabled"
DELETED = "Deleted"
class Ps_Node(Base): class Ps_Node(Base):
id: int id: int
@@ -22,4 +11,4 @@ class Ps_Node(Base):
settings: dict settings: dict
creator_id: Dict[str, Any] creator_id: Dict[str, Any]
created_at: datetime created_at: datetime
status: Status status: NodeStatus

View File

@@ -6,7 +6,8 @@ from api.schemas.endpoints.account_keyring import AccountKeyringUpdate
from api.db.tables.account import AccountRole, AccountStatus from api.db.tables.account import AccountRole, AccountStatus
from api.schemas.endpoints.list_events import ListEventUpdate from api.schemas.endpoints.list_events import ListEventUpdate
from api.db.tables.events import EventState, EventStatus from api.db.tables.events import EventState, EventStatus
from api.schemas.endpoints.process_schema import ProcessSchemaUpdate
from api.db.tables.process import ProcessStatus
def update_user_data_changes(update_data: UserUpdate, user) -> Optional[dict]: def update_user_data_changes(update_data: UserUpdate, user) -> Optional[dict]:
""" """
@@ -108,3 +109,36 @@ def update_listevents_data_changes(update_data: ListEventUpdate, listevents) ->
changes[field] = new_value changes[field] = new_value
return changes if changes else None return changes if changes else None
def update_processschema_data_changes(update_data: ProcessSchemaUpdate, processschema) -> Optional[dict]:
"""
Сравнивает данные для обновления с текущими значениями processschema.
Возвращает:
- None, если нет изменений
- Словарь {поле: новое_значение} для измененных полей
"""
update_values = {}
changes = {}
for field, value in update_data.model_dump(exclude_unset=True).items():
if value is None:
continue
if isinstance(value, (ProcessStatus)):
update_values[field] = value.value
else:
update_values[field] = value
for field, new_value in update_values.items():
if not hasattr(processschema, field):
continue
current_value = getattr(processschema, field)
if isinstance(current_value, Enum):
current_value = current_value.value
if current_value != new_value:
changes[field] = new_value
return changes if changes else None

View File

@@ -13,7 +13,7 @@ async def db_user_role_validation(connection, current_user):
return authorize_user return authorize_user
async def db_user_role_validation_for_listevents_by_listevent_id( async def db_user_role_validation_for_listevents_and_processschema_by_listevent_id(
connection, current_user, current_listevents_creator_id connection, current_user, current_listevents_creator_id
): ):
authorize_user = await get_user_by_login(connection, current_user) authorize_user = await get_user_by_login(connection, current_user)
@@ -23,7 +23,7 @@ async def db_user_role_validation_for_listevents_by_listevent_id(
return authorize_user return authorize_user
async def db_user_role_validation_for_listevents(connection, current_user): async def db_user_role_validation_for_listevents_and_processschema(connection, current_user):
authorize_user = await get_user_by_login(connection, current_user) authorize_user = await get_user_by_login(connection, current_user)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}: if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
return authorize_user, False return authorize_user, False