feat:dto accaunt endpoint, dto list events endpoint

This commit is contained in:
TheNoxium
2025-08-02 21:42:06 +05:00
parent a9ecaadad6
commit c66c21eb14
9 changed files with 309 additions and 98 deletions

View File

@@ -3,44 +3,108 @@ from datetime import datetime, timezone
from enum import Enum from enum import Enum
from typing import Optional from typing import Optional
from sqlalchemy import func, insert, select from sqlalchemy import insert, select, func, or_, and_, asc, desc
from sqlalchemy.ext.asyncio import AsyncConnection from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.tables.account import account_table from api.db.tables.account import account_table
from api.schemas.account.account import User from api.schemas.account.account import User
from api.schemas.endpoints.account import all_user_adapter, AllUser, AllUserResponse, UserCreate from api.schemas.endpoints.account import all_user_adapter, AllUser, AllUserResponse, UserCreate, UserFilterDTO
async def get_user_accaunt_page(connection: AsyncConnection, page, limit) -> Optional[AllUserResponse]: async def get_user_account_page_DTO(
connection: AsyncConnection, filter_dto: UserFilterDTO
) -> Optional[AllUserResponse]:
""" """
Получает список ползовелей заданных значениями page, limit. Получает список пользователей с пагинацией, фильтрацией и сортировкой через DTO объект.
Поддерживает:
- пагинацию
- поиск
- фильтрацию по полям
- сортировку
""" """
first_user = page * limit - (limit) page = filter_dto.pagination.get("page", 1)
limit = filter_dto.pagination.get("limit", 10)
offset = (page - 1) * limit
query = ( query = select(
select(
account_table.c.id, account_table.c.id,
account_table.c.name, account_table.c.name,
account_table.c.login, account_table.c.login,
account_table.c.email, account_table.c.email,
account_table.c.bind_tenant_id, account_table.c.bind_tenant_id,
account_table.c.role, account_table.c.role,
account_table.c.meta,
account_table.c.creator_id,
account_table.c.created_at, account_table.c.created_at,
account_table.c.status, account_table.c.status,
) )
.order_by(account_table.c.id)
.offset(first_user) # Поиск
.limit(limit) if filter_dto.search:
search_term = f"%{filter_dto.search}%"
query = query.where(
or_(
account_table.c.name.ilike(search_term),
account_table.c.login.ilike(search_term),
account_table.c.email.ilike(search_term),
)
) )
# Фильтрацию
filter_conditions = []
if filter_dto.filters:
for field, values in filter_dto.filters.items():
column = getattr(account_table.c, field, None)
if column is not None and values:
if len(values) == 1:
filter_conditions.append(column == values[0])
else:
filter_conditions.append(column.in_(values))
if filter_conditions:
query = query.where(and_(*filter_conditions))
# Сортировка
if filter_dto.order:
order_field = filter_dto.order.get("field", "id")
order_direction = filter_dto.order.get("direction", "asc")
column = getattr(account_table.c, order_field, None)
if column is not None:
if order_direction.lower() == "desc":
query = query.order_by(desc(column))
else:
query = query.order_by(asc(column))
else:
query = query.order_by(account_table.c.id)
query = query.offset(offset).limit(limit)
count_query = select(func.count()).select_from(account_table) count_query = select(func.count()).select_from(account_table)
if filter_dto.search:
search_term = f"%{filter_dto.search}%"
count_query = count_query.where(
or_(
account_table.c.name.ilike(search_term),
account_table.c.login.ilike(search_term),
account_table.c.email.ilike(search_term),
)
)
if filter_conditions:
count_query = count_query.where(and_(*filter_conditions))
result = await connection.execute(query) result = await connection.execute(query)
count_result = await connection.execute(count_query) count_result = await connection.execute(count_query)
users_data = result.mappings().all() users_data = result.mappings().all()
total_count = count_result.scalar() total_count = count_result.scalar()
if not total_count:
return None
total_pages = math.ceil(total_count / limit) total_pages = math.ceil(total_count / limit)
validated_users = all_user_adapter.validate_python(users_data) validated_users = all_user_adapter.validate_python(users_data)
@@ -66,7 +130,7 @@ async def get_user_by_id(connection: AsyncConnection, user_id: int) -> Optional[
if not user: if not user:
return None return None
return AllUser.model_validate(user) return User.model_validate(user)
async def get_user_by_login(connection: AsyncConnection, login: str) -> Optional[User]: async def get_user_by_login(connection: AsyncConnection, login: str) -> Optional[User]:

View File

@@ -3,17 +3,122 @@ import math
from datetime import datetime, timezone from datetime import datetime, timezone
from sqlalchemy import insert, select, func
from sqlalchemy import insert, select, func, or_, and_, asc, desc
from sqlalchemy.ext.asyncio import AsyncConnection from sqlalchemy.ext.asyncio import AsyncConnection
from enum import Enum
from api.db.tables.events import list_events_table from api.db.tables.events import list_events_table
from api.schemas.events.list_events import ListEvent from api.schemas.events.list_events import ListEvent
from api.schemas.endpoints.list_events import all_list_event_adapter, AllListEventResponse, ListEventFilterDTO
from api.schemas.endpoints.list_events import all_list_event_adapter, AllListEventResponse
async def get_listevents_page_DTO(
connection: AsyncConnection, filter_dto: ListEventFilterDTO
) -> Optional[AllListEventResponse]:
"""
Получает список событий с фильтрацией через DTO объект.
Поддерживает:
- пагинацию
- полнотекстовый поиск (пропускает name при русских буквах)
- фильтрацию по полям
- сортировку
"""
page = filter_dto.pagination.get("page", 1)
limit = filter_dto.pagination.get("limit", 10)
offset = (page - 1) * limit
query = select(
list_events_table.c.id,
list_events_table.c.name,
list_events_table.c.title,
list_events_table.c.creator_id,
list_events_table.c.created_at,
list_events_table.c.schema.label("schema_"),
list_events_table.c.state,
list_events_table.c.status,
)
if filter_dto.search:
search_term = f"%{filter_dto.search}%"
has_russian = any("\u0400" <= char <= "\u04ff" for char in filter_dto.search)
if has_russian:
query = query.where(list_events_table.c.title.ilike(search_term))
else:
query = query.where(
or_(list_events_table.c.title.ilike(search_term), list_events_table.c.name.ilike(search_term))
)
filter_conditions = []
if filter_dto.filters:
for field, values in filter_dto.filters.items():
column = getattr(list_events_table.c, field, None)
if column is not None and values:
if len(values) == 1:
filter_conditions.append(column == values[0])
else:
filter_conditions.append(column.in_(values))
if filter_conditions:
query = query.where(and_(*filter_conditions))
if filter_dto.order:
order_field = filter_dto.order.get("field", "id")
order_direction = filter_dto.order.get("direction", "asc")
if order_field.startswith("schema."):
json_field = order_field[7:]
column = list_events_table.c.schema[json_field].astext
else:
column = getattr(list_events_table.c, order_field, None)
if column is not None:
if order_direction.lower() == "desc":
query = query.order_by(desc(column))
else:
query = query.order_by(asc(column))
else:
query = query.order_by(list_events_table.c.id)
query = query.offset(offset).limit(limit)
count_query = select(func.count()).select_from(list_events_table)
if filter_dto.search:
search_term = f"%{filter_dto.search}%"
has_russian = any("\u0400" <= char <= "\u04ff" for char in filter_dto.search)
if has_russian:
count_query = count_query.where(list_events_table.c.title.ilike(search_term))
else:
count_query = count_query.where(
or_(list_events_table.c.title.ilike(search_term), list_events_table.c.name.ilike(search_term))
)
if filter_conditions:
count_query = count_query.where(and_(*filter_conditions))
result = await connection.execute(query)
count_result = await connection.execute(count_query)
events_data = result.mappings().all()
total_count = count_result.scalar()
if not total_count:
return None
total_pages = math.ceil(total_count / limit)
validated_events = all_list_event_adapter.validate_python(events_data)
return AllListEventResponse(
list_event=validated_events,
amount_count=total_count,
amount_pages=total_pages,
current_page=page,
limit=limit,
)
async def get_listevents_page_by_creator_id( async def get_listevents_page_by_creator_id(

View File

@@ -10,16 +10,27 @@ from api.db.tables.process import process_schema_table
from api.schemas.process.process_schema import ProcessSchema from api.schemas.process.process_schema import ProcessSchema
from api.schemas.endpoints.process_schema import all_process_schema_adapter, AllProcessSchemaResponse, ProcessSchemaFilterDTO from api.schemas.endpoints.process_schema import (
all_process_schema_adapter,
AllProcessSchemaResponse,
ProcessSchemaFilterDTO,
)
async def get_process_schema_page(connection: AsyncConnection, filter_dto: ProcessSchemaFilterDTO) -> Optional[AllProcessSchemaResponse]: async def get_process_schema_page_DTO(
connection: AsyncConnection, filter_dto: ProcessSchemaFilterDTO
) -> Optional[AllProcessSchemaResponse]:
""" """
Получает список схем процессов с комплексной фильтрацией через DTO объект. Получает список схем процессов с комплексной фильтрацией через DTO объект.
Поддерживает:
- пагинацию
- поиск
- фильтрацию по полям
- сортировку
""" """
page = filter_dto.pagination.get('page', 1) page = filter_dto.pagination.get("page", 1)
limit = filter_dto.pagination.get('limit', 10) limit = filter_dto.pagination.get("limit", 10)
offset = (page - 1) * limit offset = (page - 1) * limit
query = select( query = select(
@@ -36,10 +47,7 @@ async def get_process_schema_page(connection: AsyncConnection, filter_dto: Proce
if filter_dto.search: if filter_dto.search:
search_term = f"%{filter_dto.search}%" search_term = f"%{filter_dto.search}%"
query = query.where( query = query.where(
or_( or_(process_schema_table.c.title.ilike(search_term), process_schema_table.c.description.ilike(search_term))
process_schema_table.c.title.ilike(search_term),
process_schema_table.c.description.ilike(search_term)
)
) )
if filter_dto.filters: if filter_dto.filters:
@@ -56,12 +64,12 @@ async def get_process_schema_page(connection: AsyncConnection, filter_dto: Proce
query = query.where(and_(*filter_conditions)) query = query.where(and_(*filter_conditions))
if filter_dto.order: if filter_dto.order:
order_field = filter_dto.order.get('field', 'id') order_field = filter_dto.order.get("field", "id")
order_direction = filter_dto.order.get('direction', 'asc') order_direction = filter_dto.order.get("direction", "asc")
column = getattr(process_schema_table.c, order_field, None) column = getattr(process_schema_table.c, order_field, None)
if column is not None: if column is not None:
if order_direction.lower() == 'desc': if order_direction.lower() == "desc":
query = query.order_by(desc(column)) query = query.order_by(desc(column))
else: else:
query = query.order_by(asc(column)) query = query.order_by(asc(column))
@@ -75,10 +83,7 @@ async def get_process_schema_page(connection: AsyncConnection, filter_dto: Proce
if filter_dto.search: if filter_dto.search:
search_term = f"%{filter_dto.search}%" search_term = f"%{filter_dto.search}%"
count_query = count_query.where( count_query = count_query.where(
or_( or_(process_schema_table.c.title.ilike(search_term), process_schema_table.c.description.ilike(search_term))
process_schema_table.c.title.ilike(search_term),
process_schema_table.c.description.ilike(search_term)
)
) )
if filter_dto.filters and filter_conditions: if filter_dto.filters and filter_conditions:
@@ -105,6 +110,7 @@ async def get_process_schema_page(connection: AsyncConnection, filter_dto: Proce
limit=limit, limit=limit,
) )
async def get_process_schema_by_title(connection: AsyncConnection, title: str) -> Optional[ProcessSchema]: async def get_process_schema_by_title(connection: AsyncConnection, title: str) -> Optional[ProcessSchema]:
""" """
Получает process schema по title. Получает process schema по title.

View File

@@ -1,15 +1,12 @@
from fastapi import ( from fastapi import APIRouter, Depends, HTTPException, status, Query
APIRouter,
Depends, from typing import Optional, List
HTTPException,
status,
)
from sqlalchemy.ext.asyncio import AsyncConnection from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep from api.db.connection.session import get_connection_dep
from api.db.logic.account import ( from api.db.logic.account import (
create_user, create_user,
get_user_accaunt_page, get_user_account_page_DTO,
get_user_by_id, get_user_by_id,
get_user_by_login, get_user_by_login,
update_user_by_id, update_user_by_id,
@@ -18,7 +15,7 @@ from api.db.logic.keyring import create_password_key, update_password_key
from api.db.tables.account import AccountStatus from api.db.tables.account import AccountStatus
from api.schemas.account.account import User from api.schemas.account.account import User
from api.schemas.base import bearer_schema from api.schemas.base import bearer_schema
from api.schemas.endpoints.account import AllUser, AllUserResponse, UserCreate, UserUpdate from api.schemas.endpoints.account import AllUserResponse, UserCreate, UserUpdate, UserFilterDTO
from api.services.auth import get_current_user from api.services.auth import get_current_user
from api.services.user_role_validation import db_user_role_validation from api.services.user_role_validation import db_user_role_validation
@@ -30,14 +27,33 @@ api_router = APIRouter(
@api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllUserResponse) @api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllUserResponse)
async def get_all_account( async def get_all_account(
page: int = 1, page: int = Query(1, description="Page number", gt=0),
limit: int = 10, limit: int = Query(10, description="КNumber of items per page", gt=0),
search: Optional[str] = Query(None, description="Search term to filter by name or login or email"),
status_filter: Optional[List[str]] = Query(None, description="Filter by status"),
role_filter: Optional[List[str]] = Query(None, description="Filter by role"),
creator_id: Optional[int] = Query(None, description="Filter by creator id"),
order_field: Optional[str] = Query("id", description="Field to sort by"),
order_direction: Optional[str] = Query("asc", description="Sort direction (asc/desc)"),
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
authorize_user = await db_user_role_validation(connection, current_user) authorize_user = await db_user_role_validation(connection, current_user)
user_list = await get_user_accaunt_page(connection, page, limit) filters = {
**({"status": status_filter} if status_filter else {}),
**({"role": role_filter} if role_filter else {}),
**({"creator_id": [str(creator_id)]} if creator_id else {}),
}
filter_dto = UserFilterDTO(
pagination={"page": page, "limit": limit},
search=search,
order={"field": order_field, "direction": order_direction},
filters=filters if filters else None,
)
user_list = await get_user_account_page_DTO(connection, filter_dto)
if user_list is None: if user_list is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Accounts not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Accounts not found")
@@ -45,7 +61,7 @@ async def get_all_account(
return user_list return user_list
@api_router.get("/{user_id}", dependencies=[Depends(bearer_schema)], response_model=UserUpdate) @api_router.get("/{user_id}", dependencies=[Depends(bearer_schema)], response_model=User)
async def get_account( async def get_account(
user_id: int, user_id: int,
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
@@ -61,7 +77,7 @@ async def get_account(
return user return user
@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=AllUser) @api_router.post("", dependencies=[Depends(bearer_schema)], response_model=User)
async def create_account( async def create_account(
user: UserCreate, user: UserCreate,
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),

View File

@@ -1,10 +1,6 @@
from fastapi import ( from fastapi import APIRouter, Depends, HTTPException, status, Query
APIRouter,
Depends,
HTTPException,
status,
)
from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncConnection from sqlalchemy.ext.asyncio import AsyncConnection
@@ -17,8 +13,7 @@ from api.db.logic.listevents import (
get_listevents_by_id, get_listevents_by_id,
create_listevents, create_listevents,
update_listevents_by_id, update_listevents_by_id,
get_listevents_page, get_listevents_page_DTO,
get_listevents_page_by_creator_id,
) )
@@ -27,7 +22,7 @@ from api.db.tables.events import EventStatus
from api.schemas.base import bearer_schema from api.schemas.base import bearer_schema
from api.schemas.endpoints.list_events import ListEventUpdate, AllListEventResponse from api.schemas.endpoints.list_events import ListEventUpdate, AllListEventResponse, ListEventFilterDTO
from api.services.auth import get_current_user from api.services.auth import get_current_user
@@ -37,7 +32,6 @@ from api.services.user_role_validation import (
) )
api_router = APIRouter( api_router = APIRouter(
prefix="/listevents", prefix="/listevents",
tags=["list events"], tags=["list events"],
@@ -46,22 +40,38 @@ api_router = APIRouter(
@api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllListEventResponse) @api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllListEventResponse)
async def get_all_list_events( async def get_all_list_events(
page: int = 1, page: int = Query(1, description="Page number", gt=0),
limit: int = 10, limit: int = Query(10, description="Number of items per page", gt=0),
search: Optional[str] = Query(None, description="Search term to filter by title or name"),
order_field: Optional[str] = Query("id", description="Field to sort by"),
order_direction: Optional[str] = Query("asc", description="Sort direction (asc/desc)"),
status_filter: Optional[List[str]] = Query(None, description="Filter by status"),
state_filter: Optional[List[str]] = Query(None, description="Filter by state"),
creator_id: Optional[int] = Query(None, description="Filter by creator id"),
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
filters = {
**({"status": status_filter} if status_filter else {}),
**({"creator_id": [str(creator_id)]} if creator_id else {}),
**({"state": state_filter} if state_filter else {}),
}
filter_dto = ListEventFilterDTO(
pagination={"page": page, "limit": limit},
search=search,
order={"field": order_field, "direction": order_direction},
filters=filters if filters else None,
)
authorize_user, page_flag = await db_user_role_validation_for_listevents_and_processschema(connection, current_user) authorize_user, page_flag = await db_user_role_validation_for_listevents_and_processschema(connection, current_user)
if page_flag: if not page_flag:
list_events_page = await get_listevents_page(connection, page, limit) if filter_dto.filters is None:
filter_dto.filters = {}
filter_dto.filters["creator_id"] = [str(authorize_user.id)]
if list_events_page is None: list_events_page = await get_listevents_page_DTO(connection, filter_dto)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")
return list_events_page
else:
list_events_page = await get_listevents_page_by_creator_id(connection, authorize_user.id, page, limit)
if list_events_page is None: if list_events_page is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found")

View File

@@ -1,10 +1,4 @@
from fastapi import ( from fastapi import APIRouter, Depends, HTTPException, status, Query
APIRouter,
Depends,
HTTPException,
status,
Query
)
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List
from sqlalchemy.ext.asyncio import AsyncConnection from sqlalchemy.ext.asyncio import AsyncConnection
@@ -18,7 +12,7 @@ from api.db.logic.processschema import (
create_process_schema, create_process_schema,
get_process_schema_by_id, get_process_schema_by_id,
update_process_schema_by_id, update_process_schema_by_id,
get_process_schema_page, get_process_schema_page_DTO,
) )
from api.schemas.process.process_schema import ProcessSchema from api.schemas.process.process_schema import ProcessSchema
@@ -51,12 +45,11 @@ async def get_all_process_schema(
order_field: Optional[str] = Query("id", description="Field to sort by"), order_field: Optional[str] = Query("id", description="Field to sort by"),
order_direction: Optional[str] = Query("asc", description="Sort direction (asc/desc)"), order_direction: Optional[str] = Query("asc", description="Sort direction (asc/desc)"),
status_filter: Optional[List[str]] = Query(None, description="Filter by status"), status_filter: Optional[List[str]] = Query(None, description="Filter by status"),
owner_id: Optional[List[str]] = Query(None, description="Filter by owner ID"), owner_id: Optional[List[str]] = Query(None, description="Filter by owner id"),
connection: AsyncConnection = Depends(get_connection_dep), connection: AsyncConnection = Depends(get_connection_dep),
creator_id: Optional[int] = Query(None, description="Filter by creator ID"), creator_id: Optional[int] = Query(None, description="Filter by creator id"),
current_user=Depends(get_current_user), current_user=Depends(get_current_user),
): ):
filters = { filters = {
**({"status": status_filter} if status_filter else {}), **({"status": status_filter} if status_filter else {}),
**({"owner_id": owner_id} if owner_id else {}), **({"owner_id": owner_id} if owner_id else {}),
@@ -67,7 +60,7 @@ async def get_all_process_schema(
pagination={"page": page, "limit": limit}, pagination={"page": page, "limit": limit},
search=search, search=search,
order={"field": order_field, "direction": order_direction}, order={"field": order_field, "direction": order_direction},
filters=filters if filters else None filters=filters if filters else None,
) )
authorize_user, page_flag = await db_user_role_validation_for_listevents_and_processschema(connection, current_user) authorize_user, page_flag = await db_user_role_validation_for_listevents_and_processschema(connection, current_user)
@@ -77,13 +70,10 @@ async def get_all_process_schema(
filter_dto.filters = {} filter_dto.filters = {}
filter_dto.filters["creator_id"] = [str(authorize_user.id)] filter_dto.filters["creator_id"] = [str(authorize_user.id)]
process_schema_page = await get_process_schema_page(connection, filter_dto) process_schema_page = await get_process_schema_page_DTO(connection, filter_dto)
if process_schema_page is None: if process_schema_page is None:
raise HTTPException( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
status_code=status.HTTP_404_NOT_FOUND,
detail="Process schema not found"
)
return process_schema_page return process_schema_page

View File

@@ -1,5 +1,5 @@
from datetime import datetime from datetime import datetime
from typing import List, Optional from typing import List, Optional, Dict
from pydantic import EmailStr, Field, TypeAdapter from pydantic import EmailStr, Field, TypeAdapter
@@ -36,6 +36,8 @@ class AllUser(Base):
email: Optional[EmailStr] = None email: Optional[EmailStr] = None
bind_tenant_id: Optional[str] = None bind_tenant_id: Optional[str] = None
role: AccountRole role: AccountRole
meta: Optional[dict] = None
creator_id: Optional[int] = None
created_at: datetime created_at: datetime
status: AccountStatus status: AccountStatus
@@ -49,3 +51,10 @@ class AllUserResponse(Base):
all_user_adapter = TypeAdapter(List[AllUser]) all_user_adapter = TypeAdapter(List[AllUser])
class UserFilterDTO(Base):
pagination: Dict[str, int]
search: Optional[str] = None
order: Optional[Dict[str, str]] = None
filters: Optional[Dict[str, List[str]]] = None

View File

@@ -35,3 +35,15 @@ class AllListEventResponse(Base):
all_list_event_adapter = TypeAdapter(List[AllListEvent]) all_list_event_adapter = TypeAdapter(List[AllListEvent])
class ListEventFilterDTO(Base):
pagination: Dict[str, int] = Field(
default={"page": 1, "limit": 10},
description="Пагинация (номер страницы и количество элементов)",
)
search: Optional[str] = Field(default=None, description="Поиск по текстовым полям (name, title)")
order: Optional[Dict[str, str]] = Field(
default={"field": "id", "direction": "asc"}, description="Сортировка (поле и направление)"
)
filters: Optional[Dict[str, List[str]]] = Field(default=None, description="Фильтрация по точным значениям")

View File

@@ -37,9 +37,8 @@ class AllProcessSchemaResponse(Base):
all_process_schema_adapter = TypeAdapter(List[AllProcessSchema]) all_process_schema_adapter = TypeAdapter(List[AllProcessSchema])
# DTO объект для фильтрации
class ProcessSchemaFilterDTO(Base): class ProcessSchemaFilterDTO(Base):
pagination: Dict[str, int] # {page: 1, limit: 10} pagination: Dict[str, int]
search: Optional[str] = None search: Optional[str] = None
order: Optional[Dict[str, str]] = None # {field: "id", direction: "asc"} order: Optional[Dict[str, str]] = None
filters: Optional[Dict[str, List[str]]] = None # {"status": ["active"], "owner_id": ["123"]} filters: Optional[Dict[str, List[str]]] = None