diff --git a/api/api/db/logic/listevents.py b/api/api/db/logic/listevents.py index f667809..c318195 100644 --- a/api/api/db/logic/listevents.py +++ b/api/api/db/logic/listevents.py @@ -31,7 +31,7 @@ async def get_listevents_page_by_creator_id( list_events_table.c.title, list_events_table.c.creator_id, list_events_table.c.created_at, - list_events_table.c.schema_, + list_events_table.c.schema, list_events_table.c.state, list_events_table.c.status, ) diff --git a/api/api/db/logic/processschema.py b/api/api/db/logic/processschema.py new file mode 100644 index 0000000..5f34392 --- /dev/null +++ b/api/api/db/logic/processschema.py @@ -0,0 +1,184 @@ +from typing import Optional +import math + +from datetime import datetime, timezone + +from sqlalchemy import insert, select, func +from sqlalchemy.ext.asyncio import AsyncConnection +from enum import Enum + +from api.db.tables.process import process_schema_table + +from api.schemas.process.process_schema import ProcessSchema + +from api.schemas.endpoints.process_schema import all_process_schema_adapter, AllProcessSchemaResponse + +async def get_processschema_page_by_creator_id( + connection: AsyncConnection, creator_id: int, page: int, limit: int +) -> Optional[AllProcessSchemaResponse]: + """ + Получает список событий заданного создателя по значениям page и limit и creator_id. + """ + + first_schema = page * limit - limit + query = ( + select( + process_schema_table.c.id, + process_schema_table.c.title, + process_schema_table.c.description, + process_schema_table.c.owner_id, + process_schema_table.c.creator_id, + process_schema_table.c.created_at, + process_schema_table.c.settings, + process_schema_table.c.status, + ) + .where(process_schema_table.c.creator_id == creator_id) + .order_by(process_schema_table.c.id) + .offset(first_schema) + .limit(limit) + ) + + count_query = ( + select(func.count()) + .select_from(process_schema_table) + .where(process_schema_table.c.creator_id == creator_id) + ) + + result = await connection.execute(query) + count_result = await connection.execute(count_query) + + events_data = result.mappings().all() + total_count = count_result.scalar() + total_pages = math.ceil(total_count / limit) + + validated_process_schema = all_process_schema_adapter.validate_python(events_data) + + return AllProcessSchemaResponse( + process_schema=validated_process_schema, + amount_count=total_count, + amount_pages=total_pages, + current_page=page, + limit=limit, + ) + + +async def get_processschema_page(connection: AsyncConnection, page, limit) -> Optional[AllProcessSchemaResponse]: + """ + Получает список событий заданного создателя по значениям page и limit. + """ + + first_schema = page * limit - (limit) + + query = ( + select( + process_schema_table.c.id, + process_schema_table.c.title, + process_schema_table.c.description, + process_schema_table.c.owner_id, + process_schema_table.c.creator_id, + process_schema_table.c.created_at, + process_schema_table.c.settings, + process_schema_table.c.status, + ) + .order_by(process_schema_table.c.id) + .offset(first_schema) + .limit(limit) + ) + + count_query = select(func.count()).select_from(process_schema_table) + + result = await connection.execute(query) + count_result = await connection.execute(count_query) + + events_data = result.mappings().all() + total_count = count_result.scalar() + total_pages = math.ceil(total_count / limit) + + + validated_process_schema = all_process_schema_adapter.validate_python(events_data) + + return AllProcessSchemaResponse( + process_schema=validated_process_schema, + amount_count=total_count, + amount_pages=total_pages, + current_page=page, + limit=limit, + ) + + + +async def get_processschema_by_title(connection: AsyncConnection, title: str) -> Optional[ProcessSchema]: + """ + Получает process schema по title. + """ + query = select(process_schema_table).where(process_schema_table.c.title == title) + + processschema_db_cursor = await connection.execute(query) + processschema_db = processschema_db_cursor.one_or_none() + + if not processschema_db: + return None + + processschema_data = { + column.name: ( + getattr(processschema_db, column.name).name + if isinstance(getattr(processschema_db, column.name), Enum) + else getattr(processschema_db, column.name) + ) + for column in process_schema_table.columns + } + + return ProcessSchema.model_validate(processschema_data) + +async def get_processschema_by_id(connection: AsyncConnection, id: int) -> Optional[ProcessSchema]: + """ + Получает processschema по id. + """ + query = select(process_schema_table).where(process_schema_table.c.id == id) + + processschema_db_cursor = await connection.execute(query) + processschema_db = processschema_db_cursor.one_or_none() + + if not processschema_db: + return None + + processschema_data = { + column.name: ( + getattr(processschema_db, column.name).name + if isinstance(getattr(processschema_db, column.name), Enum) + else getattr(processschema_db, column.name) + ) + for column in process_schema_table.columns + } + + return ProcessSchema.model_validate(processschema_data) + +async def update_processschema_by_id(connection: AsyncConnection, update_values, processschema): + """ + Вносит изменеия в нужное поле таблицы process_schema_table. + """ + await connection.execute( + process_schema_table.update().where(process_schema_table.c.id == processschema.id).values(**update_values) + ) + + await connection.commit() + +async def create_processschema(connection: AsyncConnection, processschema: ProcessSchema, creator_id: int) -> Optional[ProcessSchema]: + """ + Создает нове поле в таблице process_schema_table. + """ + query = insert(process_schema_table).values( + title=processschema.title, + description=processschema.description, + owner_id=processschema.owner_id, + creator_id=creator_id, + created_at=datetime.now(timezone.utc), + settings=processschema.settings, + status=processschema.status.value, + ) + + await connection.execute(query) + + await connection.commit() + + return processschema diff --git a/api/api/db/tables/events.py b/api/api/db/tables/events.py index fba2e58..8b60caf 100644 --- a/api/api/db/tables/events.py +++ b/api/api/db/tables/events.py @@ -1,8 +1,7 @@ import enum -from sqlalchemy import Table, Column, Integer, String, Enum as SQLAEnum, JSON, ForeignKey, DateTime, Index +from sqlalchemy import Table, Column, String, Enum as SQLAEnum, JSON, ForeignKey, DateTime from sqlalchemy.sql import func -from enum import Enum, auto from api.db.sql_types import UnsignedInt diff --git a/api/api/db/tables/process.py b/api/api/db/tables/process.py index 379663c..7f9beec 100644 --- a/api/api/db/tables/process.py +++ b/api/api/db/tables/process.py @@ -3,7 +3,6 @@ import enum from sqlalchemy import ( Table, Column, - Integer, String, Text, Enum as SQLAEnum, @@ -14,7 +13,7 @@ from sqlalchemy import ( PrimaryKeyConstraint, ) from sqlalchemy.sql import func -from enum import Enum, auto +from enum import Enum from api.db.sql_types import UnsignedInt diff --git a/api/api/endpoints/__init__.py b/api/api/endpoints/__init__.py index a02a1c5..d0ecfc0 100644 --- a/api/api/endpoints/__init__.py +++ b/api/api/endpoints/__init__.py @@ -3,8 +3,9 @@ from api.endpoints.profile import api_router as profile_router from api.endpoints.account import api_router as account_router from api.endpoints.keyring import api_router as keyring_router from api.endpoints.listevents import api_router as listevents_router +from api.endpoints.processschema import api_router as processschema_router -list_of_routes = [auth_router, profile_router, account_router, keyring_router, listevents_router] +list_of_routes = [auth_router, profile_router, account_router, keyring_router, listevents_router,processschema_router] __all__ = [ "list_of_routes", diff --git a/api/api/endpoints/listevents.py b/api/api/endpoints/listevents.py index 0a5a0f7..d93274f 100644 --- a/api/api/endpoints/listevents.py +++ b/api/api/endpoints/listevents.py @@ -32,8 +32,8 @@ from api.schemas.endpoints.list_events import ListEventUpdate, AllListEventRespo from api.services.auth import get_current_user from api.services.user_role_validation import ( - db_user_role_validation_for_listevents_by_listevent_id, - db_user_role_validation_for_listevents, + db_user_role_validation_for_listevents_and_processschema_by_listevent_id, + db_user_role_validation_for_listevents_and_processschema, ) from api.services.update_data_validation import update_listevents_data_changes @@ -51,22 +51,22 @@ async def get_all_list_events( connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user), ): - authorize_user, page_flag = await db_user_role_validation_for_listevents(connection, current_user) + authorize_user, page_flag = await db_user_role_validation_for_listevents_and_processschema(connection, current_user) if page_flag: - list_eventslist = await get_listevents_page(connection, page, limit) - print(list_eventslist) - if list_eventslist is None: + list_eventspage = await get_listevents_page(connection, page, limit) + + if list_eventspage is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") - return list_eventslist + return list_eventspage else: - list_events_list = await get_listevents_page_by_creator_id(connection, authorize_user.id, page, limit) + list_events_page = await get_listevents_page_by_creator_id(connection, authorize_user.id, page, limit) - if list_events_list is None: + if list_events_page is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") - return list_events_list + return list_events_page @api_router.get("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent) @@ -80,7 +80,7 @@ async def get_list_events( if listevents_validation is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") - authorize_user = await db_user_role_validation_for_listevents_by_listevent_id( + authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id( connection, current_user, listevents_validation.creator_id ) @@ -111,7 +111,7 @@ async def create_list_events( @api_router.put("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent) -async def update_listevents( +async def update_list_events( listevents_id: int, listevents_update: ListEventUpdate, connection: AsyncConnection = Depends(get_connection_dep), @@ -122,7 +122,7 @@ async def update_listevents( if listevents_validation is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") - authorize_user = await db_user_role_validation_for_listevents_by_listevent_id( + authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id( connection, current_user, listevents_validation.creator_id ) @@ -151,7 +151,7 @@ async def delete_list_events( if listevents_validation is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") - authorize_user = await db_user_role_validation_for_listevents_by_listevent_id( + authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id( connection, current_user, listevents_validation.creator_id ) diff --git a/api/api/endpoints/processschema.py b/api/api/endpoints/processschema.py new file mode 100644 index 0000000..e0e67f8 --- /dev/null +++ b/api/api/endpoints/processschema.py @@ -0,0 +1,167 @@ +from fastapi import ( + APIRouter, + Depends, + HTTPException, + status, +) + +from sqlalchemy.ext.asyncio import AsyncConnection + +from api.db.connection.session import get_connection_dep + +from api.db.logic.account import get_user_by_login + +from api.db.logic.processschema import ( + get_processschema_by_title, + create_processschema, + get_processschema_by_id, + update_processschema_by_id, + get_processschema_page_by_creator_id, + get_processschema_page +) + +from api.schemas.process.process_schema import ProcessSchema + +from api.db.tables.process import ProcessStatus + +from api.schemas.base import bearer_schema + +from api.schemas.endpoints.process_schema import ProcessSchemaUpdate, AllProcessSchemaResponse + +from api.services.auth import get_current_user + +from api.services.user_role_validation import ( + db_user_role_validation_for_listevents_and_processschema_by_listevent_id, + db_user_role_validation_for_listevents_and_processschema, +) +from api.services.update_data_validation import update_processschema_data_changes + + +api_router = APIRouter( + prefix="/processschema", + tags=["process schema"], +) + + + +@api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllProcessSchemaResponse) +async def get_all_process_schema( + page: int = 1, + limit: int = 10, + connection: AsyncConnection = Depends(get_connection_dep), + current_user=Depends(get_current_user), +): + authorize_user, page_flag = await db_user_role_validation_for_listevents_and_processschema(connection, current_user) + + if page_flag: + process_schemapage = await get_processschema_page(connection, page, limit) + + if process_schemapage is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") + + return process_schemapage + else: + process_schema_page = await get_processschema_page_by_creator_id(connection, authorize_user.id, page, limit) + + if process_schema_page is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") + + return process_schema_page + + +@api_router.get("/{processschema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema) +async def get_process_schema( + processschema_id: int, + connection: AsyncConnection = Depends(get_connection_dep), + current_user=Depends(get_current_user), +): + processschema_validation = await get_processschema_by_id(connection, processschema_id) + + if processschema_validation is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") + + authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id( + connection, current_user, processschema_validation.creator_id + ) + + if processschema_id is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") + + return processschema_validation + +@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema) +async def create_process_schema( + processschema: ProcessSchemaUpdate, + connection: AsyncConnection = Depends(get_connection_dep), + current_user=Depends(get_current_user), +): + user_validation = await get_user_by_login(connection, current_user) + processschema_validation = await get_processschema_by_title(connection, processschema.title) + + if processschema_validation is None: + await create_processschema(connection, processschema, user_validation.id) + processschema_new = await get_processschema_by_title(connection, processschema.title) + return processschema_new + + else: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="An process schema with this information already exists." + ) + + +@api_router.put("/{processschema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema) +async def update_process_schema( + processschema_id: int, + processschema_update: ProcessSchemaUpdate, + connection: AsyncConnection = Depends(get_connection_dep), + current_user=Depends(get_current_user), +): + processschema_validation = await get_processschema_by_id(connection, processschema_id) + + if processschema_validation is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") + + authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id( + connection, current_user, processschema_validation.creator_id + ) + + update_values = update_processschema_data_changes(processschema_update, processschema_validation) + + if update_values is None: + return processschema_validation + + processschema_update_data = ProcessSchema.model_validate({**processschema_validation.model_dump(), **update_values}) + + await update_processschema_by_id(connection, update_values, processschema_validation) + + processschema = await get_processschema_by_id(connection, processschema_id) + + return processschema + +@api_router.delete("/{processschema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema) +async def delete_process_schema( + processschema_id: int, + connection: AsyncConnection = Depends(get_connection_dep), + current_user=Depends(get_current_user), +): + processschema_validation = await get_processschema_by_id(connection, processschema_id) + + if processschema_validation is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") + + authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id( + connection, current_user, processschema_validation.creator_id + ) + + processschema_update = ProcessSchemaUpdate(status=ProcessStatus.DELETED.value) + + update_values = update_processschema_data_changes(processschema_update, processschema_validation) + + if update_values is None: + return processschema_validation + + await update_processschema_by_id(connection, update_values, processschema_validation) + + processschema = await get_processschema_by_id(connection, processschema_id) + + return processschema diff --git a/api/api/endpoints/profile.py b/api/api/endpoints/profile.py index 8f13a98..43ecf7c 100644 --- a/api/api/endpoints/profile.py +++ b/api/api/endpoints/profile.py @@ -1,11 +1,7 @@ from fastapi import ( APIRouter, - Body, Depends, - Form, HTTPException, - Request, - Response, status, ) diff --git a/api/api/schemas/endpoints/account_keyring.py b/api/api/schemas/endpoints/account_keyring.py index 382c9e8..49eab9e 100644 --- a/api/api/schemas/endpoints/account_keyring.py +++ b/api/api/schemas/endpoints/account_keyring.py @@ -1,9 +1,6 @@ -import datetime from typing import Optional from pydantic import Field -from datetime import datetime from api.db.tables.account import KeyType, KeyStatus - from api.schemas.base import Base diff --git a/api/api/schemas/endpoints/process_schema.py b/api/api/schemas/endpoints/process_schema.py new file mode 100644 index 0000000..3ba927c --- /dev/null +++ b/api/api/schemas/endpoints/process_schema.py @@ -0,0 +1,36 @@ +from pydantic import Field, TypeAdapter +from typing import Optional, Dict, Any, List +from datetime import datetime + + +from api.schemas.base import Base +from api.db.tables.process import ProcessStatus + + +class ProcessSchemaUpdate(Base): + title: Optional[str] = Field(None, max_length=100) + description: Optional[str] = None + owner_id: Optional[int] = None + settings: Optional[Dict[str, Any]] = None + status: Optional[ProcessStatus] = None + +class AllProcessSchema(Base): + id: int + title: str = Field(..., max_length=100) + description: str + owner_id: int + creator_id: int + created_at: datetime + settings: Dict[str, Any] + status: ProcessStatus + + +class AllProcessSchemaResponse(Base): + process_schema: List[AllProcessSchema] + amount_count: int + amount_pages: int + current_page: int + limit: int + + +all_process_schema_adapter = TypeAdapter(List[AllProcessSchema]) diff --git a/api/api/schemas/process/node_link.py b/api/api/schemas/process/node_link.py index f67a9f9..5ad9e1d 100644 --- a/api/api/schemas/process/node_link.py +++ b/api/api/schemas/process/node_link.py @@ -1,16 +1,9 @@ from pydantic import Field from typing import Dict, Any from datetime import datetime -from enum import Enum from api.schemas.base import Base - - -class Status(Enum): - ACTIVE = "Active" - STOPPING = "Stopping" - STOPPED = "Stopped" - DELETED = "Deleted" +from api.db.tables.process import NodeStatus class MyModel(Base): @@ -21,4 +14,4 @@ class MyModel(Base): settings: Dict[str, Any] creator_id: int created_at: datetime - status: Status + status: NodeStatus diff --git a/api/api/schemas/process/process_schema.py b/api/api/schemas/process/process_schema.py index 70f670b..364a624 100644 --- a/api/api/schemas/process/process_schema.py +++ b/api/api/schemas/process/process_schema.py @@ -1,17 +1,9 @@ from pydantic import Field from typing import Dict, Any from datetime import datetime -from enum import Enum from api.schemas.base import Base - - -class Status(Enum): - ACTIVE = "Active" - STOPPING = "Stopping" - STOPPED = "Stopped" - DELETED = "Deleted" - +from api.db.tables.process import ProcessStatus class ProcessSchema(Base): id: int @@ -21,4 +13,4 @@ class ProcessSchema(Base): creator_id: int created_at: datetime settings: Dict[str, Any] - status: Status + status: ProcessStatus diff --git a/api/api/schemas/process/ps_node.py b/api/api/schemas/process/ps_node.py index 0a47f38..3e75c6c 100644 --- a/api/api/schemas/process/ps_node.py +++ b/api/api/schemas/process/ps_node.py @@ -1,19 +1,8 @@ from datetime import datetime from typing import Dict, Any -from enum import Enum from api.schemas.base import Base - - -class NodeType(Enum): - pass - - -class Status(Enum): - ACTIVE = "Active" - DISABLED = "Disabled" - DELETED = "Deleted" - +from api.db.tables.process import NodeType,NodeStatus class Ps_Node(Base): id: int @@ -22,4 +11,4 @@ class Ps_Node(Base): settings: dict creator_id: Dict[str, Any] created_at: datetime - status: Status + status: NodeStatus diff --git a/api/api/services/update_data_validation.py b/api/api/services/update_data_validation.py index b6ebba7..7b114a9 100644 --- a/api/api/services/update_data_validation.py +++ b/api/api/services/update_data_validation.py @@ -6,7 +6,8 @@ from api.schemas.endpoints.account_keyring import AccountKeyringUpdate from api.db.tables.account import AccountRole, AccountStatus from api.schemas.endpoints.list_events import ListEventUpdate from api.db.tables.events import EventState, EventStatus - +from api.schemas.endpoints.process_schema import ProcessSchemaUpdate +from api.db.tables.process import ProcessStatus def update_user_data_changes(update_data: UserUpdate, user) -> Optional[dict]: """ @@ -108,3 +109,36 @@ def update_listevents_data_changes(update_data: ListEventUpdate, listevents) -> changes[field] = new_value return changes if changes else None + +def update_processschema_data_changes(update_data: ProcessSchemaUpdate, processschema) -> Optional[dict]: + """ + Сравнивает данные для обновления с текущими значениями processschema. + Возвращает: + - None, если нет изменений + - Словарь {поле: новое_значение} для измененных полей + """ + update_values = {} + changes = {} + + for field, value in update_data.model_dump(exclude_unset=True).items(): + if value is None: + continue + + if isinstance(value, (ProcessStatus)): + update_values[field] = value.value + else: + update_values[field] = value + + for field, new_value in update_values.items(): + if not hasattr(processschema, field): + continue + + current_value = getattr(processschema, field) + + if isinstance(current_value, Enum): + current_value = current_value.value + + if current_value != new_value: + changes[field] = new_value + + return changes if changes else None diff --git a/api/api/services/user_role_validation.py b/api/api/services/user_role_validation.py index 8414ae9..657bd7e 100644 --- a/api/api/services/user_role_validation.py +++ b/api/api/services/user_role_validation.py @@ -13,7 +13,7 @@ async def db_user_role_validation(connection, current_user): return authorize_user -async def db_user_role_validation_for_listevents_by_listevent_id( +async def db_user_role_validation_for_listevents_and_processschema_by_listevent_id( connection, current_user, current_listevents_creator_id ): authorize_user = await get_user_by_login(connection, current_user) @@ -23,7 +23,7 @@ async def db_user_role_validation_for_listevents_by_listevent_id( return authorize_user -async def db_user_role_validation_for_listevents(connection, current_user): +async def db_user_role_validation_for_listevents_and_processschema(connection, current_user): authorize_user = await get_user_by_login(connection, current_user) if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}: return authorize_user, False