refactor(core): general refactor for subtasks and their nested mapping

This commit is contained in:
cyrill 2023-12-08 17:55:38 +05:00
parent fe39119518
commit 0b668f85c5

View File

@ -22,6 +22,14 @@ sys.stdout.reconfigure(encoding='utf-8')
QUEUE_NAME = 'TESTIMPORT'
headers = {
'Host': 'api.tracker.yandex.net',
'Authorization': 'OAuth y0_AgAEA7qkB3KjAArkXwAAAADzMlP9oR1lwMzBS2e94jHzdnII8Laxi7k',
'X-Org-ID': '7095769',
'Content-Type': 'appication/json',
}
def get_assignee_data():
""" Получение данных для поля assignee """
@ -71,7 +79,7 @@ def transform_data(data):
""" Преобразование данных из Asana в ЯндексТрекер в совместимом формате """
transformed_data = []
for task in data["data"]:
for task in data:
status, completed_at = get_task_status(task)
transformed_task = {
'gid':task['gid'],
@ -82,23 +90,60 @@ def transform_data(data):
'assignee': task['assignee'],
'status': status,
'completedAt': completed_at,
'parent': task['parent']
'parent': task['parent'],
'subtasks' : transform_data(task['subtasks'])
# 'parent': task['parent'] # это я добавила, чтоб parent добавить
}
transformed_data.append(transformed_task)
return transformed_data
def create_task(task):
base_url = 'https://api.tracker.yandex.net/v2/issues/_import'
print(task)
if task.get('assignee') and 'gid' in task['assignee']:
ya_assignee = assignee.get(task['assignee']['gid'], 'dr.cyrill')
else:
ya_assignee = 'dr.cyrill'
payload = {
'queue': QUEUE_NAME,
'summary': task['summary'],
'description': task['description'],
'createdAt': task['createdAt'],
'createdBy': 'dr.cyrill',
'deadline': task['deadline'],
'assignee': ya_assignee,
'status': task['status'],
}
logger.debug('Request: %s', json.dumps(payload))
response = requests.post(
base_url,
headers=headers,
data=json.dumps(payload),
params=urlencode({'parent': task['parent']}) # добавила параметр для parent в url-запрос
)
if response.status_code == 201:
print('Задача успешно создана в ЯндексТрекер')
# здесь добавлено про родительскую задачу
return response.json()['id']
else:
print(
'Ошибка при создании задачи в ЯндексТрекер:',
response.content,
)
logger.debug('Response: %s', response.content)
def create_tasks_in_tracker(data, limit=10):
""" Создание задач в ЯндексТрекер """
base_url = 'https://api.tracker.yandex.net/v2/issues/_import'
headers = {
'Host': 'api.tracker.yandex.net',
'Authorization': 'OAuth y0_AgAEA7qkB3KjAArkXwAAAADzMlP9oR1lwMzBS2e94jHzdnII8Laxi7k',
'X-Org-ID': '7095769',
'Content-Type': 'appication/json',
}
datalen = (len(data) if limit==-1 else min(limit, len(data)))
logger.info('Import started. Task count: %d', datalen)
@ -107,57 +152,19 @@ def create_tasks_in_tracker(data, limit=10):
for i in range(datalen):
task = data[i]
task_id = create_task(task)
logger.info('Created task id = %s',task_id)
for subtask in task['subtasks']:
logger.info('Found subtask gid = %s for id = %s',subtask['gid'], task_id)
subtask_id = create_task(subtask)
assign_parent_task(subtask_id, task_id)
if task.get('assignee') and 'gid' in task['assignee']:
ya_assignee = assignee.get(task['assignee']['gid'], 'dr.cyrill')
else:
ya_assignee = 'dr.cyrill'
payload = {
'queue': QUEUE_NAME,
'summary': task['summary'],
'description': task['description'],
'createdAt': task['createdAt'],
'createdBy': 'dr.cyrill',
'deadline': task['deadline'],
'assignee': ya_assignee,
'status': task['status'],
}
logger.debug('Request: %s', json.dumps(payload))
response = requests.post(
base_url,
headers=headers,
data=json.dumps(payload),
params=urlencode({'parent': task['parent']}) # добавила параметр для parent в url-запрос
)
if response.status_code == 201:
print('Задача успешно создана в ЯндексТрекер')
# здесь добавлено про родительскую задачу
if 'gid' in task:
ya_imported_task_response[task['gid']] = response.json()['id']
logger.info('Got respnse for asana task gid = %s: ya.task.id = %s',task['gid'],ya_imported_task_response[task['gid']])
if task['parent'] is not None:
assign_parent_task(ya_imported_task_response[task['gid']], task['parent']['gid'])
else:
print(
'Ошибка при создании задачи в ЯндексТрекер:',
response.content,
)
logger.debug('Response: %s', response.content)
# вот здесь тоже про родительскую задачу добавлено
parent_task_ids = get_parent_task_ids(task, ya_imported_task_response)
#parent_task_ids = get_parent_task_ids(task, ya_imported_task_response)
for gid, parent_id in parent_task_ids.items():
assign_parent_task(ya_imported_task_response[gid], parent_id)
#for gid, parent_id in parent_task_ids.items():
# assign_parent_task(ya_imported_task_response[gid], parent_id)
#и эта функция для родительской задачи написана
@ -186,6 +193,7 @@ def assign_parent_task(task_id, parent_id):
if response.status_code == 200:
print('Родительская задача успешно назначена')
logger.info('Task %s successfully assigned a parent',task_id)
else:
print('Ошибка при назначении родительской задачи:', response.content)
@ -198,5 +206,5 @@ def assign_parent_task(task_id, parent_id):
file = open(asana_data_json, "r", encoding="utf8")
json_data = json.loads(file.read())
yandex_tracker_data = transform_data(json_data)
yandex_tracker_data = transform_data(json_data['data'])
create_tasks_in_tracker(yandex_tracker_data, limit=3)