import csv import json import os import sys import requests from logging_config import setup_logging logger = setup_logging(__name__) base_dir = os.path.dirname(os.path.realpath(__file__)) asana_data_json = os.path.join(base_dir, './data/data_asana.json') users_data = os.path.join(base_dir, './data/users.mapping.asana2ya.csv') sys.stdin.reconfigure(encoding='utf-8') sys.stdout.reconfigure(encoding='utf-8') QUEUE_NAME = 'TESTIMPORT' def get_assignee_data(): """ Получение данных для поля assignee """ mapping = {} with open(users_data, 'r', encoding="utf8") as csv_file: reader = csv.DictReader(csv_file) for row in reader: asana_id = row['ID'] ya_login = row['ya_login'] mapping[asana_id] = ya_login return mapping assignee = get_assignee_data() def get_task_status(task): """ Получение данных о статусе задачи """ if task.get('completed') is True: status = 'resolved' completed_at = task.get('completed_at', '') else: status = 'open' completed_at = '' return status, completed_at # здесь функция для дальнейшей работы с parent def get_parent_task_ids(data, ya_imported_task_response): """ Получение соответствий между идентификаторами подзадач в Асане и Яндекс Трекере """ parent_task_ids = {} for task in data["data"]: if task.get('parent') and 'gid' in task['parent']: parent_gid = task['parent']['gid'] if parent_gid in ya_imported_task_response: parent_id = ya_imported_task_response[parent_gid] parent_task_ids[task['gid']] = parent_id return parent_task_ids def transform_data(data): """ Преобразование данных из Asana в ЯндексТрекер в совместимом формате """ transformed_data = [] for task in data["data"]: status, completed_at = get_task_status(task) transformed_task = { 'summary': task['name'], 'description': task['notes'], 'createdAt': task['created_at'], 'deadline': task['due_on'], 'assignee': task['assignee'], 'status': status, 'completedAt': completed_at, } transformed_data.append(transformed_task) return transformed_data def create_tasks_in_tracker(data): """ Создание задач в ЯндексТрекер """ base_url = 'https://api.tracker.yandex.net/v2/issues/_import' headers = { 'Host': 'api.tracker.yandex.net', 'Authorization': 'OAuth y0_AgAEA7qkB3KjAArkXwAAAADzMlP9oR1lwMzBS2e94jHzdnII8Laxi7k', 'X-Org-ID': '7095769', 'Content-Type': 'appication/json', } datalen = len(data) logger.info('Import started. Task count: %d', datalen) # этот датасет тоже для parent добавлен ya_imported_task_response = {} for task in data: if 'gid' not in task: continue if task.get('assignee') and 'gid' in task['assignee']: ya_assignee = assignee.get(task['assignee']['gid'], 'dr.cyrill') else: ya_assignee = 'dr.cyrill' payload = { 'queue': QUEUE_NAME, 'summary': task['summary'], 'description': task['description'], 'createdAt': task['createdAt'], 'createdBy': 'dr.cyrill', 'deadline': task['deadline'], 'assignee': ya_assignee, 'status': task['status'], } logger.debug('Request: %s', json.dumps(payload)) response = requests.post( base_url, headers=headers, data=json.dumps(payload), ) if response.status_code == 201: print('Задача успешно создана в ЯндексТрекер') # здесь добавлено про родительскую задачу ya_imported_task_response[task['gid']] = response.json()['id'] else: print( 'Ошибка при создании задачи в ЯндексТрекер:', response.content, ) logger.debug('Response: %s', response.content) # вот здесь тоже про родительскую задачу добавлено parent_task_ids = get_parent_task_ids(data, ya_imported_task_response) for gid, parent_id in parent_task_ids.items(): assign_parent_task(gid, parent_id) #и эта функция для родительской задачи написана def assign_parent_task(gid, parent_id): """ Назначение родительской задачи """ base_url = f'https://api.tracker.yandex.net/v2/issues/{gid}' headers = { 'Host': 'api.tracker.yandex.net', 'Authorization': 'OAuth y0_AgAEA7qkB3KjAArkXwAAAADzMlP9oR1lwMzBS2e94jHzdnII8Laxi7k', 'X-Org-ID': '7095769', 'Content-Type': 'application/json', } payload = { 'parent': parent_id, } logger.debug('Request: %s', json.dumps(payload)) response = requests.patch( base_url, headers=headers, data=json.dumps(payload), ) if response.status_code == 200: print('Родительская задача успешно назначена') else: print('Ошибка при назначении родительской задачи:', response.content) logger.debug('Response: %s', response.content) # здесь я закончила добавлять про родительскую задачу с большой надеждой file = open(asana_data_json, "r", encoding="utf8") json_data = json.loads(file.read()) yandex_tracker_data = transform_data(json_data) create_tasks_in_tracker(yandex_tracker_data)