10 Commits

Author SHA1 Message Date
TheNoxium
d9e88d66f7 fix: rename list events 2025-08-04 16:15:35 +05:00
TheNoxium
3c300f7c7a fix: rename process schema 2025-08-02 22:05:29 +05:00
TheNoxium
c66c21eb14 feat:dto accaunt endpoint, dto list events endpoint 2025-08-02 21:42:06 +05:00
TheNoxium
a9ecaadad6 feat: porcess-schema endpoint DTO 2025-07-31 21:53:33 +05:00
TheNoxium
e50bb32470 fix:api name "process-schema" 2025-07-31 20:33:45 +05:00
TheNoxium
2030d54b2c fix: ruff format 2025-07-30 14:04:59 +05:00
TheNoxium
b82960faf3 fix: name,updated data, db data mapping 2025-07-29 22:57:31 +05:00
TheNoxium
e0887c240f feat: CRUD ProcessSchema 2025-07-27 22:17:36 +05:00
5e3c3b4672 Merge pull request 'VORKOUT-14: format api with ruff' (#15) from VORKOUT-14 into master
Reviewed-on: #15
2025-07-04 20:09:54 +05:00
d03600b23d Merge pull request 'feat: CRUD ListEvent' (#14) from VORKOUT-14 into master
Reviewed-on: #14
Reviewed-by: Vladislav Syrochkin <vlad.dev@heado.ru>
2025-07-04 18:06:03 +05:00
23 changed files with 1020 additions and 596 deletions

View File

@@ -3,44 +3,108 @@ from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from sqlalchemy import func, insert, select
from sqlalchemy import insert, select, func, or_, and_, asc, desc
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.tables.account import account_table
from api.schemas.account.account import User
from api.schemas.endpoints.account import all_user_adapter, AllUser, AllUserResponse, UserCreate
from api.schemas.endpoints.account import all_user_adapter, AllUser, AllUserResponse, UserCreate, UserFilterDTO
async def get_user_accaunt_page(connection: AsyncConnection, page, limit) -> Optional[AllUserResponse]:
async def get_user_account_page_DTO(
connection: AsyncConnection, filter_dto: UserFilterDTO
) -> Optional[AllUserResponse]:
"""
Получает список ползовелей заданных значениями page, limit.
Получает список пользователей с пагинацией, фильтрацией и сортировкой через DTO объект.
Поддерживает:
- пагинацию
- поиск
- фильтрацию по полям
- сортировку
"""
first_user = page * limit - (limit)
page = filter_dto.pagination.get("page", 1)
limit = filter_dto.pagination.get("limit", 10)
offset = (page - 1) * limit
query = (
select(
query = select(
account_table.c.id,
account_table.c.name,
account_table.c.login,
account_table.c.email,
account_table.c.bind_tenant_id,
account_table.c.role,
account_table.c.meta,
account_table.c.creator_id,
account_table.c.created_at,
account_table.c.status,
)
.order_by(account_table.c.id)
.offset(first_user)
.limit(limit)
# Поиск
if filter_dto.search:
search_term = f"%{filter_dto.search}%"
query = query.where(
or_(
account_table.c.name.ilike(search_term),
account_table.c.login.ilike(search_term),
account_table.c.email.ilike(search_term),
)
)
# Фильтрацию
filter_conditions = []
if filter_dto.filters:
for field, values in filter_dto.filters.items():
column = getattr(account_table.c, field, None)
if column is not None and values:
if len(values) == 1:
filter_conditions.append(column == values[0])
else:
filter_conditions.append(column.in_(values))
if filter_conditions:
query = query.where(and_(*filter_conditions))
# Сортировка
if filter_dto.order:
order_field = filter_dto.order.get("field", "id")
order_direction = filter_dto.order.get("direction", "asc")
column = getattr(account_table.c, order_field, None)
if column is not None:
if order_direction.lower() == "desc":
query = query.order_by(desc(column))
else:
query = query.order_by(asc(column))
else:
query = query.order_by(account_table.c.id)
query = query.offset(offset).limit(limit)
count_query = select(func.count()).select_from(account_table)
if filter_dto.search:
search_term = f"%{filter_dto.search}%"
count_query = count_query.where(
or_(
account_table.c.name.ilike(search_term),
account_table.c.login.ilike(search_term),
account_table.c.email.ilike(search_term),
)
)
if filter_conditions:
count_query = count_query.where(and_(*filter_conditions))
result = await connection.execute(query)
count_result = await connection.execute(count_query)
users_data = result.mappings().all()
total_count = count_result.scalar()
if not total_count:
return None
total_pages = math.ceil(total_count / limit)
validated_users = all_user_adapter.validate_python(users_data)
@@ -66,7 +130,7 @@ async def get_user_by_id(connection: AsyncConnection, user_id: int) -> Optional[
if not user:
return None
return AllUser.model_validate(user)
return User.model_validate(user)
async def get_user_by_login(connection: AsyncConnection, login: str) -> Optional[User]:
@@ -76,20 +140,10 @@ async def get_user_by_login(connection: AsyncConnection, login: str) -> Optional
query = select(account_table).where(account_table.c.login == login)
user_db_cursor = await connection.execute(query)
user_db = user_db_cursor.one_or_none()
if not user_db:
user_data = user_db_cursor.mappings().one_or_none()
if not user_data:
return None
user_data = {
column.name: (
getattr(user_db, column.name).name
if isinstance(getattr(user_db, column.name), Enum)
else getattr(user_db, column.name)
)
for column in account_table.columns
}
return User.model_validate(user_data)

View File

@@ -18,20 +18,11 @@ async def get_key_by_id(connection: AsyncConnection, key_id: str) -> Optional[Ac
query = select(account_keyring_table).where(account_keyring_table.c.key_id == key_id)
user_db_cursor = await connection.execute(query)
user_db = user_db_cursor.one_or_none()
if not user_db:
user_data = user_db_cursor.mappings().one_or_none()
if not user_data:
return None
user_data = {
column.name: (
getattr(user_db, column.name).name
if isinstance(getattr(user_db, column.name), Enum)
else getattr(user_db, column.name)
)
for column in account_keyring_table.columns
}
return AccountKeyring.model_validate(user_data)

View File

@@ -0,0 +1,279 @@
from typing import Optional
import math
from datetime import datetime, timezone
from sqlalchemy import insert, select, func, or_, and_, asc, desc
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.tables.events import list_events_table
from api.schemas.events.list_events import ListEvent
from api.schemas.endpoints.list_events import all_list_event_adapter, AllListEventResponse, ListEventFilterDTO
async def get_list_events_page_DTO(
connection: AsyncConnection, filter_dto: ListEventFilterDTO
) -> Optional[AllListEventResponse]:
"""
Получает список событий с фильтрацией через DTO объект.
Поддерживает:
- пагинацию
- полнотекстовый поиск (пропускает name при русских буквах)
- фильтрацию по полям
- сортировку
"""
page = filter_dto.pagination.get("page", 1)
limit = filter_dto.pagination.get("limit", 10)
offset = (page - 1) * limit
query = select(
list_events_table.c.id,
list_events_table.c.name,
list_events_table.c.title,
list_events_table.c.creator_id,
list_events_table.c.created_at,
list_events_table.c.schema.label("schema_"),
list_events_table.c.state,
list_events_table.c.status,
)
if filter_dto.search:
search_term = f"%{filter_dto.search}%"
has_russian = any("\u0400" <= char <= "\u04ff" for char in filter_dto.search)
if has_russian:
query = query.where(list_events_table.c.title.ilike(search_term))
else:
query = query.where(
or_(list_events_table.c.title.ilike(search_term), list_events_table.c.name.ilike(search_term))
)
filter_conditions = []
if filter_dto.filters:
for field, values in filter_dto.filters.items():
column = getattr(list_events_table.c, field, None)
if column is not None and values:
if len(values) == 1:
filter_conditions.append(column == values[0])
else:
filter_conditions.append(column.in_(values))
if filter_conditions:
query = query.where(and_(*filter_conditions))
if filter_dto.order:
order_field = filter_dto.order.get("field", "id")
order_direction = filter_dto.order.get("direction", "asc")
if order_field.startswith("schema."):
json_field = order_field[7:]
column = list_events_table.c.schema[json_field].astext
else:
column = getattr(list_events_table.c, order_field, None)
if column is not None:
if order_direction.lower() == "desc":
query = query.order_by(desc(column))
else:
query = query.order_by(asc(column))
else:
query = query.order_by(list_events_table.c.id)
query = query.offset(offset).limit(limit)
count_query = select(func.count()).select_from(list_events_table)
if filter_dto.search:
search_term = f"%{filter_dto.search}%"
has_russian = any("\u0400" <= char <= "\u04ff" for char in filter_dto.search)
if has_russian:
count_query = count_query.where(list_events_table.c.title.ilike(search_term))
else:
count_query = count_query.where(
or_(list_events_table.c.title.ilike(search_term), list_events_table.c.name.ilike(search_term))
)
if filter_conditions:
count_query = count_query.where(and_(*filter_conditions))
result = await connection.execute(query)
count_result = await connection.execute(count_query)
events_data = result.mappings().all()
total_count = count_result.scalar()
if not total_count:
return None
total_pages = math.ceil(total_count / limit)
validated_events = all_list_event_adapter.validate_python(events_data)
return AllListEventResponse(
list_event=validated_events,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_list_events_page_by_creator_id(
connection: AsyncConnection, creator_id: int, page: int, limit: int
) -> Optional[AllListEventResponse]:
"""
Получает список событий заданного создателя по значениям page и limit и creator_id.
"""
first_event = page * limit - limit
query = (
select(
list_events_table.c.id,
list_events_table.c.name,
list_events_table.c.title,
list_events_table.c.creator_id,
list_events_table.c.created_at,
list_events_table.c.schema,
list_events_table.c.state,
list_events_table.c.status,
)
.where(list_events_table.c.creator_id == creator_id) # Фильтрация по creator_id
.order_by(list_events_table.c.id)
.offset(first_event)
.limit(limit)
)
count_query = (
select(func.count())
.select_from(list_events_table)
.where(list_events_table.c.creator_id == creator_id) # Фильтрация по creator_id
)
result = await connection.execute(query)
count_result = await connection.execute(count_query)
events_data = result.mappings().all()
total_count = count_result.scalar()
total_pages = math.ceil(total_count / limit)
# Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные
validated_list_event = all_list_event_adapter.validate_python(events_data)
return AllListEventResponse(
list_event=validated_list_event,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_list_events_page(connection: AsyncConnection, page, limit) -> Optional[AllListEventResponse]:
"""
Получает список событий заданного создателя по значениям page и limit.
"""
first_event = page * limit - (limit)
query = (
select(
list_events_table.c.id,
list_events_table.c.name,
list_events_table.c.title,
list_events_table.c.creator_id,
list_events_table.c.created_at,
list_events_table.c.schema,
list_events_table.c.state,
list_events_table.c.status,
)
.order_by(list_events_table.c.id)
.offset(first_event)
.limit(limit)
)
count_query = select(func.count()).select_from(list_events_table)
result = await connection.execute(query)
count_result = await connection.execute(count_query)
events_data = result.mappings().all()
total_count = count_result.scalar()
total_pages = math.ceil(total_count / limit)
# Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные
validated_list_event = all_list_event_adapter.validate_python(events_data)
return AllListEventResponse(
list_event=validated_list_event,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_list_events_by_name(connection: AsyncConnection, name: str) -> Optional[ListEvent]:
"""
Получает list events по name.
"""
query = select(list_events_table).where(list_events_table.c.name == name)
list_events_db_cursor = await connection.execute(query)
list_events_data = list_events_db_cursor.mappings().one_or_none()
if not list_events_data:
return None
return ListEvent.model_validate(list_events_data)
async def get_list_events_by_id(connection: AsyncConnection, id: int) -> Optional[ListEvent]:
"""
Получает listevent по id.
"""
query = select(list_events_table).where(list_events_table.c.id == id)
list_events_db_cursor = await connection.execute(query)
list_events_data = list_events_db_cursor.mappings().one_or_none()
if not list_events_data:
return None
return ListEvent.model_validate(list_events_data)
async def update_list_events_by_id(connection: AsyncConnection, update_values, list_events):
"""
Вносит изменеия в нужное поле таблицы list_events_table.
"""
await connection.execute(
list_events_table.update().where(list_events_table.c.id == list_events.id).values(**update_values)
)
await connection.commit()
async def create_list_events(
connection: AsyncConnection, list_events: ListEvent, creator_id: int
) -> Optional[ListEvent]:
"""
Создает нове поле в таблице list_events_table.
"""
query = insert(list_events_table).values(
name=list_events.name,
title=list_events.title, # добавлено поле title
creator_id=creator_id,
created_at=datetime.now(timezone.utc),
schema=list_events.schema_, # добавлено поле schema
state=list_events.state.value, # добавлено поле state
status=list_events.status.value, # добавлено поле status
)
await connection.execute(query)
await connection.commit()
return list_events

View File

@@ -1,190 +0,0 @@
from typing import Optional
import math
from datetime import datetime, timezone
from sqlalchemy import insert, select, func
from sqlalchemy.ext.asyncio import AsyncConnection
from enum import Enum
from api.db.tables.events import list_events_table
from api.schemas.events.list_events import ListEvent
from api.schemas.endpoints.list_events import all_list_event_adapter, AllListEventResponse
async def get_listevents_page_by_creator_id(
connection: AsyncConnection, creator_id: int, page: int, limit: int
) -> Optional[AllListEventResponse]:
"""
Получает список событий заданного создателя по значениям page и limit и creator_id.
"""
first_event = page * limit - limit
query = (
select(
list_events_table.c.id,
list_events_table.c.name,
list_events_table.c.title,
list_events_table.c.creator_id,
list_events_table.c.created_at,
list_events_table.c.schema_,
list_events_table.c.state,
list_events_table.c.status,
)
.where(list_events_table.c.creator_id == creator_id) # Фильтрация по creator_id
.order_by(list_events_table.c.id)
.offset(first_event)
.limit(limit)
)
count_query = (
select(func.count())
.select_from(list_events_table)
.where(list_events_table.c.creator_id == creator_id) # Фильтрация по creator_id
)
result = await connection.execute(query)
count_result = await connection.execute(count_query)
events_data = result.mappings().all()
total_count = count_result.scalar()
total_pages = math.ceil(total_count / limit)
# Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные
validated_list_event = all_list_event_adapter.validate_python(events_data)
return AllListEventResponse(
list_event=validated_list_event,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_listevents_page(connection: AsyncConnection, page, limit) -> Optional[AllListEventResponse]:
"""
Получает список событий заданного создателя по значениям page и limit.
"""
first_event = page * limit - (limit)
query = (
select(
list_events_table.c.id,
list_events_table.c.name,
list_events_table.c.title,
list_events_table.c.creator_id,
list_events_table.c.created_at,
list_events_table.c.schema,
list_events_table.c.state,
list_events_table.c.status,
)
.order_by(list_events_table.c.id)
.offset(first_event)
.limit(limit)
)
count_query = select(func.count()).select_from(list_events_table)
result = await connection.execute(query)
count_result = await connection.execute(count_query)
events_data = result.mappings().all()
total_count = count_result.scalar()
total_pages = math.ceil(total_count / limit)
# Здесь предполагается, что all_list_event_adapter.validate_python корректно обрабатывает данные
validated_list_event = all_list_event_adapter.validate_python(events_data)
return AllListEventResponse(
list_event=validated_list_event,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_listevents_by_name(connection: AsyncConnection, name: str) -> Optional[ListEvent]:
"""
Получает list events по name.
"""
query = select(list_events_table).where(list_events_table.c.name == name)
listevents_db_cursor = await connection.execute(query)
listevents_db = listevents_db_cursor.one_or_none()
if not listevents_db:
return None
listevents_data = {
column.name: (
getattr(listevents_db, column.name).name
if isinstance(getattr(listevents_db, column.name), Enum)
else getattr(listevents_db, column.name)
)
for column in list_events_table.columns
}
return ListEvent.model_validate(listevents_data)
async def get_listevents_by_id(connection: AsyncConnection, id: int) -> Optional[ListEvent]:
"""
Получает listevent по id.
"""
query = select(list_events_table).where(list_events_table.c.id == id)
listevents_db_cursor = await connection.execute(query)
listevents_db = listevents_db_cursor.one_or_none()
if not listevents_db:
return None
listevents_data = {
column.name: (
getattr(listevents_db, column.name).name
if isinstance(getattr(listevents_db, column.name), Enum)
else getattr(listevents_db, column.name)
)
for column in list_events_table.columns
}
return ListEvent.model_validate(listevents_data)
async def update_listevents_by_id(connection: AsyncConnection, update_values, listevents):
"""
Вносит изменеия в нужное поле таблицы list_events_table.
"""
await connection.execute(
list_events_table.update().where(list_events_table.c.id == listevents.id).values(**update_values)
)
await connection.commit()
async def create_listevents(connection: AsyncConnection, listevents: ListEvent, creator_id: int) -> Optional[ListEvent]:
"""
Создает нове поле в таблице list_events_table.
"""
query = insert(list_events_table).values(
name=listevents.name,
title=listevents.title, # добавлено поле title
creator_id=creator_id,
created_at=datetime.now(timezone.utc),
schema=listevents.schema_, # добавлено поле schema
state=listevents.state.value, # добавлено поле state
status=listevents.status.value, # добавлено поле status
)
await connection.execute(query)
await connection.commit()
return listevents

View File

@@ -0,0 +1,175 @@
from typing import Optional
import math
from datetime import datetime, timezone
from sqlalchemy import insert, select, func, or_, and_, asc, desc
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.tables.process import process_schema_table
from api.schemas.process.process_schema import ProcessSchema
from api.schemas.endpoints.process_schema import (
all_process_schema_adapter,
AllProcessSchemaResponse,
ProcessSchemaFilterDTO,
)
async def get_process_schema_page_DTO(
connection: AsyncConnection, filter_dto: ProcessSchemaFilterDTO
) -> Optional[AllProcessSchemaResponse]:
"""
Получает список схем процессов с комплексной фильтрацией через DTO объект.
Поддерживает:
- пагинацию
- поиск
- фильтрацию по полям
- сортировку
"""
page = filter_dto.pagination.get("page", 1)
limit = filter_dto.pagination.get("limit", 10)
offset = (page - 1) * limit
query = select(
process_schema_table.c.id,
process_schema_table.c.title,
process_schema_table.c.description,
process_schema_table.c.owner_id,
process_schema_table.c.creator_id,
process_schema_table.c.created_at,
process_schema_table.c.settings,
process_schema_table.c.status,
)
if filter_dto.search:
search_term = f"%{filter_dto.search}%"
query = query.where(
or_(process_schema_table.c.title.ilike(search_term), process_schema_table.c.description.ilike(search_term))
)
if filter_dto.filters:
filter_conditions = []
for field, values in filter_dto.filters.items():
column = getattr(process_schema_table.c, field, None)
if column is not None and values:
if len(values) == 1:
filter_conditions.append(column == values[0])
else:
filter_conditions.append(column.in_(values))
if filter_conditions:
query = query.where(and_(*filter_conditions))
if filter_dto.order:
order_field = filter_dto.order.get("field", "id")
order_direction = filter_dto.order.get("direction", "asc")
column = getattr(process_schema_table.c, order_field, None)
if column is not None:
if order_direction.lower() == "desc":
query = query.order_by(desc(column))
else:
query = query.order_by(asc(column))
else:
query = query.order_by(process_schema_table.c.id)
query = query.offset(offset).limit(limit)
count_query = select(func.count()).select_from(process_schema_table)
if filter_dto.search:
search_term = f"%{filter_dto.search}%"
count_query = count_query.where(
or_(process_schema_table.c.title.ilike(search_term), process_schema_table.c.description.ilike(search_term))
)
if filter_dto.filters and filter_conditions:
count_query = count_query.where(and_(*filter_conditions))
result = await connection.execute(query)
count_result = await connection.execute(count_query)
events_data = result.mappings().all()
total_count = count_result.scalar()
if not total_count:
return None
total_pages = math.ceil(total_count / limit)
validated_process_schema = all_process_schema_adapter.validate_python(events_data)
return AllProcessSchemaResponse(
process_schema=validated_process_schema,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_process_schema_by_title(connection: AsyncConnection, title: str) -> Optional[ProcessSchema]:
"""
Получает process schema по title.
"""
query = select(process_schema_table).where(process_schema_table.c.title == title)
process_schema_db_cursor = await connection.execute(query)
process_schema_data = process_schema_db_cursor.mappings().one_or_none()
if not process_schema_data:
return None
return ProcessSchema.model_validate(process_schema_data)
async def get_process_schema_by_id(connection: AsyncConnection, id: int) -> Optional[ProcessSchema]:
"""
Получает process_schema по id.
"""
query = select(process_schema_table).where(process_schema_table.c.id == id)
process_schema_db_cursor = await connection.execute(query)
process_schema_data = process_schema_db_cursor.mappings().one_or_none()
if not process_schema_data:
return None
return ProcessSchema.model_validate(process_schema_data)
async def update_process_schema_by_id(connection: AsyncConnection, update_values, process_schema):
"""
Вносит изменеия в нужное поле таблицы process_schema_table.
"""
await connection.execute(
process_schema_table.update().where(process_schema_table.c.id == process_schema.id).values(**update_values)
)
await connection.commit()
async def create_process_schema(
connection: AsyncConnection, process_schema: ProcessSchema, creator_id: int
) -> Optional[ProcessSchema]:
"""
Создает нове поле в таблице process_schema_table.
"""
query = insert(process_schema_table).values(
title=process_schema.title,
description=process_schema.description,
owner_id=process_schema.owner_id,
creator_id=creator_id,
created_at=datetime.now(timezone.utc),
settings=process_schema.settings,
status=process_schema.status.value,
)
await connection.execute(query)
await connection.commit()
return process_schema

View File

@@ -1,8 +1,7 @@
import enum
from sqlalchemy import Table, Column, Integer, String, Enum as SQLAEnum, JSON, ForeignKey, DateTime, Index
from sqlalchemy import Table, Column, String, Enum as SQLAEnum, JSON, ForeignKey, DateTime
from sqlalchemy.sql import func
from enum import Enum, auto
from api.db.sql_types import UnsignedInt

View File

@@ -3,7 +3,6 @@ import enum
from sqlalchemy import (
Table,
Column,
Integer,
String,
Text,
Enum as SQLAEnum,
@@ -14,7 +13,7 @@ from sqlalchemy import (
PrimaryKeyConstraint,
)
from sqlalchemy.sql import func
from enum import Enum, auto
from enum import Enum
from api.db.sql_types import UnsignedInt

View File

@@ -2,9 +2,10 @@ from api.endpoints.auth import api_router as auth_router
from api.endpoints.profile import api_router as profile_router
from api.endpoints.account import api_router as account_router
from api.endpoints.keyring import api_router as keyring_router
from api.endpoints.listevents import api_router as listevents_router
from api.endpoints.list_events import api_router as listevents_router
from api.endpoints.process_schema import api_router as processschema_router
list_of_routes = [auth_router, profile_router, account_router, keyring_router, listevents_router]
list_of_routes = [auth_router, profile_router, account_router, keyring_router, listevents_router, processschema_router]
__all__ = [
"list_of_routes",

View File

@@ -1,15 +1,12 @@
from fastapi import (
APIRouter,
Depends,
HTTPException,
status,
)
from fastapi import APIRouter, Depends, HTTPException, status, Query
from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep
from api.db.logic.account import (
create_user,
get_user_accaunt_page,
get_user_account_page_DTO,
get_user_by_id,
get_user_by_login,
update_user_by_id,
@@ -18,9 +15,8 @@ from api.db.logic.keyring import create_password_key, update_password_key
from api.db.tables.account import AccountStatus
from api.schemas.account.account import User
from api.schemas.base import bearer_schema
from api.schemas.endpoints.account import AllUser, AllUserResponse, UserCreate, UserUpdate
from api.schemas.endpoints.account import AllUserResponse, UserCreate, UserUpdate, UserFilterDTO
from api.services.auth import get_current_user
from api.services.update_data_validation import update_user_data_changes
from api.services.user_role_validation import db_user_role_validation
api_router = APIRouter(
@@ -31,14 +27,33 @@ api_router = APIRouter(
@api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllUserResponse)
async def get_all_account(
page: int = 1,
limit: int = 10,
page: int = Query(1, description="Page number", gt=0),
limit: int = Query(10, description="КNumber of items per page", gt=0),
search: Optional[str] = Query(None, description="Search term to filter by name or login or email"),
status_filter: Optional[List[str]] = Query(None, description="Filter by status"),
role_filter: Optional[List[str]] = Query(None, description="Filter by role"),
creator_id: Optional[int] = Query(None, description="Filter by creator id"),
order_field: Optional[str] = Query("id", description="Field to sort by"),
order_direction: Optional[str] = Query("asc", description="Sort direction (asc/desc)"),
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
user_list = await get_user_accaunt_page(connection, page, limit)
filters = {
**({"status": status_filter} if status_filter else {}),
**({"role": role_filter} if role_filter else {}),
**({"creator_id": [str(creator_id)]} if creator_id else {}),
}
filter_dto = UserFilterDTO(
pagination={"page": page, "limit": limit},
search=search,
order={"field": order_field, "direction": order_direction},
filters=filters if filters else None,
)
user_list = await get_user_account_page_DTO(connection, filter_dto)
if user_list is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Accounts not found")
@@ -46,7 +61,7 @@ async def get_all_account(
return user_list
@api_router.get("/{user_id}", dependencies=[Depends(bearer_schema)], response_model=UserUpdate)
@api_router.get("/{user_id}", dependencies=[Depends(bearer_schema)], response_model=User)
async def get_account(
user_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
@@ -62,7 +77,7 @@ async def get_account(
return user
@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=AllUser)
@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=User)
async def create_account(
user: UserCreate,
connection: AsyncConnection = Depends(get_connection_dep),
@@ -98,14 +113,12 @@ async def update_account(
if user_update.password is not None:
await update_password_key(connection, user.id, user_update.password)
update_values = update_user_data_changes(user_update, user)
updated_values = user_update.model_dump(by_alias=True, exclude_none=True)
if update_values is None:
if not updated_values:
return user
user_update_data = UserUpdate.model_validate({**user.model_dump(), **update_values})
await update_user_by_id(connection, update_values, user)
await update_user_by_id(connection, updated_values, user)
user = await get_user_by_id(connection, user_id)
@@ -126,12 +139,12 @@ async def delete_account(
user_update = UserUpdate(status=AccountStatus.DELETED.value)
update_values = update_user_data_changes(user_update, user)
updated_values = user_update.model_dump(by_alias=True, exclude_none=True)
if update_values is None:
if not updated_values:
return user
await update_user_by_id(connection, update_values, user)
await update_user_by_id(connection, updated_values, user)
user = await get_user_by_id(connection, user_id)

View File

@@ -24,7 +24,6 @@ from api.schemas.account.account_keyring import AccountKeyring
from api.services.auth import get_current_user
from api.services.user_role_validation import db_user_role_validation
from api.services.update_data_validation import update_key_data_changes
api_router = APIRouter(
@@ -87,14 +86,12 @@ async def update_keyring(
if keyring is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="keyring not found")
update_values = update_key_data_changes(keyring_update, keyring)
updated_values = keyring_update.model_dump(by_alias=True, exclude_none=True)
if update_values is None:
if not updated_values:
return keyring
keyring_update_data = AccountKeyring.model_validate({**keyring.model_dump(), **update_values})
await update_key_by_id(connection, update_values, keyring)
await update_key_by_id(connection, updated_values, keyring)
keyring = await get_key_by_id(connection, key_id)
@@ -116,12 +113,12 @@ async def delete_keyring(
keyring_update = AccountKeyringUpdate(status=KeyStatus.DELETED.value)
update_values = update_key_data_changes(keyring_update, keyring)
updated_values = keyring_update.model_dump(by_alias=True, exclude_none=True)
if update_values is None:
if not updated_values:
return keyring
await update_key_by_id(connection, update_values, keyring)
await update_key_by_id(connection, updated_values, keyring)
keyring = await get_key_by_id(connection, key_id)

View File

@@ -0,0 +1,179 @@
from fastapi import APIRouter, Depends, HTTPException, status, Query
from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep
from api.db.logic.account import get_user_by_login
from api.db.logic.list_events import (
get_list_events_by_name,
get_list_events_by_id,
create_list_events,
update_list_events_by_id,
get_list_events_page_DTO,
)
from api.schemas.events.list_events import ListEvent
from api.db.tables.events import EventStatus
from api.schemas.base import bearer_schema
from api.schemas.endpoints.list_events import ListEventUpdate, AllListEventResponse, ListEventFilterDTO
from api.services.auth import get_current_user
from api.services.user_role_validation import (
db_user_role_validation_for_list_events_and_process_schema_by_list_event_id,
db_user_role_validation_for_list_events_and_process_schema,
)
api_router = APIRouter(
prefix="/list_events",
tags=["list events"],
)
@api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllListEventResponse)
async def get_all_list_events(
page: int = Query(1, description="Page number", gt=0),
limit: int = Query(10, description="Number of items per page", gt=0),
search: Optional[str] = Query(None, description="Search term to filter by title or name"),
order_field: Optional[str] = Query("id", description="Field to sort by"),
order_direction: Optional[str] = Query("asc", description="Sort direction (asc/desc)"),
status_filter: Optional[List[str]] = Query(None, description="Filter by status"),
state_filter: Optional[List[str]] = Query(None, description="Filter by state"),
creator_id: Optional[int] = Query(None, description="Filter by creator id"),
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
filters = {
**({"status": status_filter} if status_filter else {}),
**({"state": state_filter} if state_filter else {}),
**({"creator_id": [str(creator_id)]} if creator_id else {}),
}
filter_dto = ListEventFilterDTO(
pagination={"page": page, "limit": limit},
search=search,
order={"field": order_field, "direction": order_direction},
filters=filters if filters else None,
)
authorize_user, page_flag = await db_user_role_validation_for_list_events_and_process_schema(
connection, current_user
)
if not page_flag:
if filter_dto.filters is None:
filter_dto.filters = {}
filter_dto.filters["creator_id"] = [str(authorize_user.id)]
list_events_page = await get_list_events_page_DTO(connection, filter_dto)
if list_events_page is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
return list_events_page
@api_router.get("/{list_events_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def get_list_events(
list_events_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
list_events_validation = await get_list_events_by_id(connection, list_events_id)
if list_events_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, list_events_validation.creator_id
)
if list_events_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
return list_events_validation
@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def create_list_events(
list_events: ListEventUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
user_validation = await get_user_by_login(connection, current_user)
list_events_validation = await get_list_events_by_name(connection, list_events.name)
if list_events_validation is None:
await create_list_events(connection, list_events, user_validation.id)
list_events_new = await get_list_events_by_name(connection, list_events.name)
return list_events_new
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="An List events with this information already exists."
)
@api_router.put("/{list_events_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def update_list_events(
list_events_id: int,
list_events_update: ListEventUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
list_events_validation = await get_list_events_by_id(connection, list_events_id)
if list_events_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, list_events_validation.creator_id
)
updated_values = list_events_update.model_dump(by_alias=True, exclude_none=True)
if not updated_values:
return list_events_validation
await update_list_events_by_id(connection, updated_values, list_events_validation)
list_events = await get_list_events_by_id(connection, list_events_id)
return list_events
@api_router.delete("/{list_events_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def delete_list_events(
list_events_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
list_events_validation = await get_list_events_by_id(connection, list_events_id)
if list_events_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, list_events_validation.creator_id
)
list_events_update = ListEventUpdate(status=EventStatus.DELETED.value)
updated_values = list_events_update.model_dump(by_alias=True, exclude_none=True)
if not updated_values:
return list_events_validation
await update_list_events_by_id(connection, updated_values, list_events_validation)
list_events = await get_list_events_by_id(connection, list_events_id)
return list_events

View File

@@ -1,169 +0,0 @@
from fastapi import (
APIRouter,
Depends,
HTTPException,
status,
)
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep
from api.db.logic.account import get_user_by_login
from api.db.logic.listevents import (
get_listevents_by_name,
get_listevents_by_id,
create_listevents,
update_listevents_by_id,
get_listevents_page,
get_listevents_page_by_creator_id,
)
from api.schemas.events.list_events import ListEvent
from api.db.tables.events import EventStatus
from api.schemas.base import bearer_schema
from api.schemas.endpoints.list_events import ListEventUpdate, AllListEventResponse
from api.services.auth import get_current_user
from api.services.user_role_validation import (
db_user_role_validation_for_listevents_by_listevent_id,
db_user_role_validation_for_listevents,
)
from api.services.update_data_validation import update_listevents_data_changes
api_router = APIRouter(
prefix="/listevents",
tags=["list events"],
)
@api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllListEventResponse)
async def get_all_list_events(
page: int = 1,
limit: int = 10,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user, page_flag = await db_user_role_validation_for_listevents(connection, current_user)
if page_flag:
list_eventslist = await get_listevents_page(connection, page, limit)
print(list_eventslist)
if list_eventslist is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
return list_eventslist
else:
list_events_list = await get_listevents_page_by_creator_id(connection, authorize_user.id, page, limit)
if list_events_list is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
return list_events_list
@api_router.get("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def get_list_events(
listevents_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
listevents_validation = await get_listevents_by_id(connection, listevents_id)
if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(
connection, current_user, listevents_validation.creator_id
)
if listevents_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
return listevents_validation
@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def create_list_events(
listevents: ListEventUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
user_validation = await get_user_by_login(connection, current_user)
listevents_validation = await get_listevents_by_name(connection, listevents.name)
if listevents_validation is None:
await create_listevents(connection, listevents, user_validation.id)
listevents_new = await get_listevents_by_name(connection, listevents.name)
return listevents_new
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="An List events with this information already exists."
)
@api_router.put("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def update_listevents(
listevents_id: int,
listevents_update: ListEventUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
listevents_validation = await get_listevents_by_id(connection, listevents_id)
if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(
connection, current_user, listevents_validation.creator_id
)
update_values = update_listevents_data_changes(listevents_update, listevents_validation)
if update_values is None:
return listevents_validation
listevents_update_data = ListEvent.model_validate({**listevents_validation.model_dump(), **update_values})
await update_listevents_by_id(connection, update_values, listevents_validation)
listevents = await get_listevents_by_id(connection, listevents_id)
return listevents
@api_router.delete("/{listevents_id}", dependencies=[Depends(bearer_schema)], response_model=ListEvent)
async def delete_list_events(
listevents_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
listevents_validation = await get_listevents_by_id(connection, listevents_id)
if listevents_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
authorize_user = await db_user_role_validation_for_listevents_by_listevent_id(
connection, current_user, listevents_validation.creator_id
)
listevents_update = ListEventUpdate(status=EventStatus.DELETED.value)
update_values = update_listevents_data_changes(listevents_update, listevents_validation)
if update_values is None:
return listevents_validation
await update_listevents_by_id(connection, update_values, listevents_validation)
listevents = await get_listevents_by_id(connection, listevents_id)
return listevents

View File

@@ -0,0 +1,178 @@
from fastapi import APIRouter, Depends, HTTPException, status, Query
from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep
from api.db.logic.account import get_user_by_login
from api.db.logic.process_schema import (
get_process_schema_by_title,
create_process_schema,
get_process_schema_by_id,
update_process_schema_by_id,
get_process_schema_page_DTO,
)
from api.schemas.process.process_schema import ProcessSchema
from api.db.tables.process import ProcessStatus
from api.schemas.base import bearer_schema
from api.schemas.endpoints.process_schema import ProcessSchemaUpdate, AllProcessSchemaResponse, ProcessSchemaFilterDTO
from api.services.auth import get_current_user
from api.services.user_role_validation import (
db_user_role_validation_for_list_events_and_process_schema_by_list_event_id,
db_user_role_validation_for_list_events_and_process_schema,
)
api_router = APIRouter(
prefix="/process_schema",
tags=["process schema"],
)
@api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllProcessSchemaResponse)
async def get_all_process_schema(
page: int = Query(1, description="Page number", gt=0),
limit: int = Query(10, description="Number of items per page", gt=0),
search: Optional[str] = Query(None, description="Search term to filter by title or description"),
order_field: Optional[str] = Query("id", description="Field to sort by"),
order_direction: Optional[str] = Query("asc", description="Sort direction (asc/desc)"),
status_filter: Optional[List[str]] = Query(None, description="Filter by status"),
owner_id: Optional[List[str]] = Query(None, description="Filter by owner id"),
connection: AsyncConnection = Depends(get_connection_dep),
creator_id: Optional[int] = Query(None, description="Filter by creator id"),
current_user=Depends(get_current_user),
):
filters = {
**({"status": status_filter} if status_filter else {}),
**({"owner_id": owner_id} if owner_id else {}),
**({"creator_id": [str(creator_id)]} if creator_id else {}),
}
filter_dto = ProcessSchemaFilterDTO(
pagination={"page": page, "limit": limit},
search=search,
order={"field": order_field, "direction": order_direction},
filters=filters if filters else None,
)
authorize_user, page_flag = await db_user_role_validation_for_list_events_and_process_schema(
connection, current_user
)
if not page_flag:
if filter_dto.filters is None:
filter_dto.filters = {}
filter_dto.filters["creator_id"] = [str(authorize_user.id)]
process_schema_page = await get_process_schema_page_DTO(connection, filter_dto)
if process_schema_page is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
return process_schema_page
@api_router.get("/{process_schema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def get_process_schema(
process_schema_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
process_schema_validation = await get_process_schema_by_id(connection, process_schema_id)
if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, process_schema_validation.creator_id
)
if process_schema_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
return process_schema_validation
@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def create_processschema(
process_schema: ProcessSchemaUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
user_validation = await get_user_by_login(connection, current_user)
process_schema_validation = await get_process_schema_by_title(connection, process_schema.title)
if process_schema_validation is None:
await create_process_schema(connection, process_schema, user_validation.id)
process_schema_new = await get_process_schema_by_title(connection, process_schema.title)
return process_schema_new
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="An process schema with this information already exists."
)
@api_router.put("/{process_schema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def update_process_schema(
process_schema_id: int,
process_schema_update: ProcessSchemaUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
process_schema_validation = await get_process_schema_by_id(connection, process_schema_id)
if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, process_schema_validation.creator_id
)
updated_values = process_schema_update.model_dump(by_alias=True, exclude_none=True)
if not updated_values:
return process_schema_validation
await update_process_schema_by_id(connection, updated_values, process_schema_validation)
process_schema = await get_process_schema_by_id(connection, process_schema_id)
return process_schema
@api_router.delete("/{process_schema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def delete_process_schema(
process_schema_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
process_schema_validation = await get_process_schema_by_id(connection, process_schema_id)
if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, process_schema_validation.creator_id
)
process_schema_update = ProcessSchemaUpdate(status=ProcessStatus.DELETED.value)
updated_values = process_schema_update.model_dump(by_alias=True, exclude_none=True)
if not updated_values:
return process_schema_validation
await update_process_schema_by_id(connection, updated_values, process_schema_validation)
process_schema = await get_process_schema_by_id(connection, process_schema_id)
return process_schema

View File

@@ -1,11 +1,7 @@
from fastapi import (
APIRouter,
Body,
Depends,
Form,
HTTPException,
Request,
Response,
status,
)
@@ -16,7 +12,6 @@ from api.db.connection.session import get_connection_dep
from api.db.logic.account import get_user_by_id, update_user_by_id, get_user_by_login
from api.schemas.base import bearer_schema
from api.services.auth import get_current_user
from api.services.update_data_validation import update_user_data_changes
from api.schemas.endpoints.account import UserUpdate
from api.schemas.account.account import User
@@ -42,7 +37,7 @@ async def get_profile(
@api_router.put("", dependencies=[Depends(bearer_schema)], response_model=User)
async def update_profile(
user_updata: UserUpdate,
user_update: UserUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
@@ -50,13 +45,13 @@ async def update_profile(
if user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Account not found")
if user_updata.role == None and user_updata.login == None:
update_values = update_user_data_changes(user_updata, user)
if user_update.role is None and user_update.login is None:
updated_values = user_update.model_dump(by_alias=True, exclude_none=True)
if update_values is None:
if updated_values is None:
return user
await update_user_by_id(connection, update_values, user)
await update_user_by_id(connection, updated_values, user)
user = await get_user_by_id(connection, user.id)

View File

@@ -1,5 +1,5 @@
from datetime import datetime
from typing import List, Optional
from typing import List, Optional, Dict
from pydantic import EmailStr, Field, TypeAdapter
@@ -36,6 +36,8 @@ class AllUser(Base):
email: Optional[EmailStr] = None
bind_tenant_id: Optional[str] = None
role: AccountRole
meta: Optional[dict] = None
creator_id: Optional[int] = None
created_at: datetime
status: AccountStatus
@@ -49,3 +51,10 @@ class AllUserResponse(Base):
all_user_adapter = TypeAdapter(List[AllUser])
class UserFilterDTO(Base):
pagination: Dict[str, int]
search: Optional[str] = None
order: Optional[Dict[str, str]] = None
filters: Optional[Dict[str, List[str]]] = None

View File

@@ -1,9 +1,6 @@
import datetime
from typing import Optional
from pydantic import Field
from datetime import datetime
from api.db.tables.account import KeyType, KeyStatus
from api.schemas.base import Base

View File

@@ -35,3 +35,10 @@ class AllListEventResponse(Base):
all_list_event_adapter = TypeAdapter(List[AllListEvent])
class ListEventFilterDTO(Base):
pagination: Dict[str, int]
search: Optional[str] = None
order: Optional[Dict[str, str]] = None
filters: Optional[Dict[str, List[str]]] = None

View File

@@ -0,0 +1,44 @@
from pydantic import Field, TypeAdapter
from typing import Optional, Dict, Any, List
from datetime import datetime
from api.schemas.base import Base
from api.db.tables.process import ProcessStatus
class ProcessSchemaUpdate(Base):
title: Optional[str] = Field(None, max_length=100)
description: Optional[str] = None
owner_id: Optional[int] = None
settings: Optional[Dict[str, Any]] = None
status: Optional[ProcessStatus] = None
class AllProcessSchema(Base):
id: int
title: str = Field(..., max_length=100)
description: str
owner_id: int
creator_id: int
created_at: datetime
settings: Dict[str, Any]
status: ProcessStatus
class AllProcessSchemaResponse(Base):
process_schema: List[AllProcessSchema]
amount_count: int
amount_pages: int
current_page: int
limit: int
all_process_schema_adapter = TypeAdapter(List[AllProcessSchema])
class ProcessSchemaFilterDTO(Base):
pagination: Dict[str, int]
search: Optional[str] = None
order: Optional[Dict[str, str]] = None
filters: Optional[Dict[str, List[str]]] = None

View File

@@ -1,16 +1,9 @@
from pydantic import Field
from typing import Dict, Any
from datetime import datetime
from enum import Enum
from api.schemas.base import Base
class Status(Enum):
ACTIVE = "Active"
STOPPING = "Stopping"
STOPPED = "Stopped"
DELETED = "Deleted"
from api.db.tables.process import NodeStatus
class MyModel(Base):
@@ -21,4 +14,4 @@ class MyModel(Base):
settings: Dict[str, Any]
creator_id: int
created_at: datetime
status: Status
status: NodeStatus

View File

@@ -1,16 +1,9 @@
from pydantic import Field
from typing import Dict, Any
from datetime import datetime
from enum import Enum
from api.schemas.base import Base
class Status(Enum):
ACTIVE = "Active"
STOPPING = "Stopping"
STOPPED = "Stopped"
DELETED = "Deleted"
from api.db.tables.process import ProcessStatus
class ProcessSchema(Base):
@@ -21,4 +14,4 @@ class ProcessSchema(Base):
creator_id: int
created_at: datetime
settings: Dict[str, Any]
status: Status
status: ProcessStatus

View File

@@ -1,18 +1,8 @@
from datetime import datetime
from typing import Dict, Any
from enum import Enum
from api.schemas.base import Base
class NodeType(Enum):
pass
class Status(Enum):
ACTIVE = "Active"
DISABLED = "Disabled"
DELETED = "Deleted"
from api.db.tables.process import NodeType, NodeStatus
class Ps_Node(Base):
@@ -22,4 +12,4 @@ class Ps_Node(Base):
settings: dict
creator_id: Dict[str, Any]
created_at: datetime
status: Status
status: NodeStatus

View File

@@ -1,110 +0,0 @@
from enum import Enum
from typing import Optional
from api.schemas.endpoints.account import UserUpdate
from api.db.tables.account import KeyType, KeyStatus
from api.schemas.endpoints.account_keyring import AccountKeyringUpdate
from api.db.tables.account import AccountRole, AccountStatus
from api.schemas.endpoints.list_events import ListEventUpdate
from api.db.tables.events import EventState, EventStatus
def update_user_data_changes(update_data: UserUpdate, user) -> Optional[dict]:
"""
Сравнивает данные для обновления с текущими значениями пользователя.
Возвращает:
- None, если нет изменений
- Словарь {поле: новое_значение} для измененных полей
"""
update_values = {}
changes = {}
for field, value in update_data.model_dump(exclude_unset=True).items():
if value is None:
continue
if isinstance(value, (AccountRole, AccountStatus)):
update_values[field] = value.value
else:
update_values[field] = value
for field, new_value in update_values.items():
if not hasattr(user, field):
continue
current_value = getattr(user, field)
if isinstance(current_value, Enum):
current_value = current_value.value
if current_value != new_value:
changes[field] = new_value
return changes if changes else None
def update_key_data_changes(update_data: AccountKeyringUpdate, key) -> Optional[dict]:
"""
Сравнивает данные для обновления с текущими значениями пользователя.
Возвращает:
- None, если нет изменений
- Словарь {поле: новое_значение} для измененных полей
"""
update_values = {}
changes = {}
for field, value in update_data.model_dump(exclude_unset=True).items():
if value is None:
continue
if isinstance(value, (KeyType, KeyStatus)):
update_values[field] = value.value
else:
update_values[field] = value
for field, new_value in update_values.items():
if not hasattr(key, field):
continue
current_value = getattr(key, field)
if isinstance(current_value, Enum):
current_value = current_value.value
if current_value != new_value:
changes[field] = new_value
return changes if changes else None
def update_listevents_data_changes(update_data: ListEventUpdate, listevents) -> Optional[dict]:
"""
Сравнивает данные для обновления с текущими значениями listevents.
Возвращает:
- None, если нет изменений
- Словарь {поле: новое_значение} для измененных полей
"""
update_values = {}
changes = {}
for field, value in update_data.model_dump(exclude_unset=True).items():
if value is None:
continue
if isinstance(value, (EventState, EventStatus)):
update_values[field] = value.value
else:
update_values[field] = value
for field, new_value in update_values.items():
if not hasattr(listevents, field):
continue
current_value = getattr(listevents, field)
if isinstance(current_value, Enum):
current_value = current_value.value
if current_value != new_value:
changes[field] = new_value
return changes if changes else None

View File

@@ -13,7 +13,7 @@ async def db_user_role_validation(connection, current_user):
return authorize_user
async def db_user_role_validation_for_listevents_by_listevent_id(
async def db_user_role_validation_for_list_events_and_process_schema_by_list_event_id(
connection, current_user, current_listevents_creator_id
):
authorize_user = await get_user_by_login(connection, current_user)
@@ -23,7 +23,7 @@ async def db_user_role_validation_for_listevents_by_listevent_id(
return authorize_user
async def db_user_role_validation_for_listevents(connection, current_user):
async def db_user_role_validation_for_list_events_and_process_schema(connection, current_user):
authorize_user = await get_user_by_login(connection, current_user)
if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}:
return authorize_user, False