import csv import json import os import sys import logging import requests # from yandex_tracker_client import TrackerClient # YANDEX_TRACKER_TOKEN = 'y0_AgAAAABAmFP8AArkXwAAAADzJvmiOpP3VkBkQXyHUPEkimPtj5LvovE' # ORG_ID = '7095769' # TESTIMPORT = 'https://tracker.yandex.ru/pages/projects/5' logging.basicConfig(level=logging.DEBUG, filename='mapping.log', filemode='w') # client = TrackerClient( # token='YANDEX_TRACKER_TOKEN', # org_id='ORG_ID' # ) base_dir = os.path.dirname(os.path.realpath(__file__)) asana_data_json = os.path.join(base_dir, './data/data_asana.json') users_data = os.path.join(base_dir, './data/users.mapping.asana2ya.csv') sys.stdout.reconfigure(encoding='utf-8') # ниже функция для случаев, когда не будет готового файла для импорта # def get_data_from_asana(): # """ Получение данных из Asana """ # headers = { # 'Authorization': 'ASANA_TOKEN', # } # response = requests.get('https://api.asana.com/projects/PROJECT_ID/tasks', headers=headers) # data = response.json() # return data def get_assignee_data(): """ Получение данных для поля assignee """ mapping = {} with open(users_data, 'r', encoding="utf8") as csv_file: reader = csv.DictReader(csv_file) for row in reader: asana_id = row['ID'] ya_login = row['ya_login'] mapping[asana_id] = ya_login return mapping assignee = get_assignee_data() def transform_data(data): """ Преобразование данных из Asana в ЯндексТрекер в совместимом формате """ transformed_data = [] for task in data["data"]: transformed_task = { 'summary': task['name'], 'description': task['notes'], 'createdAt': task['created_at'], 'deadline': task['due_on'], # 'assignee': task['assignee'], # 'milestone': task['section'], # 'memberships': task['memberships'], # 'parent': task[''], # 'status': task['completed'], } transformed_data.append(transformed_task) return transformed_data def create_tasks_in_tracker(data): """ Создание задач в ЯндексТрекер """ base_url = 'https://api.tracker.yandex.net/v2/issues/_import' headers = { 'Host': 'api.tracker.yandex.net', # 'Authorization': 'OAuth y0_AgAEA7qkB3KjAArTqQAAAADzHp7Ut0nhAmx5Q_25b1jpqiuBXRIJksk', 'Authorization': 'OAuth y0_AgAEA7qkB3KjAArkXwAAAADzMlP9oR1lwMzBS2e94jHzdnII8Laxi7k', 'X-Org-ID': '7095769', 'Content-Type': 'appication/json', } for task in data: ya_assignee = assignee[task['assignee']['gid']] if task.get('assignee', 0) !=0 else 'dr.cyrill' payload = { 'queue': 'TESTIMPORT', # Вынести в переменную 'summary': task['summary'], 'description': task['description'], 'createdAt': task['createdAt'], 'createdBy': 'dr.cyrill', 'deadline': task['deadline'], 'assignee': ya_assignee, # 'milestone': task['milestone'], # 'memberships': task['memberships'], # 'parent': task['parent'], # 'status': task['status'], } response = requests.post( base_url, headers=headers, data=json.dumps(payload), ) if response.status_code == 201: print('Задача успешно создана в ЯндексТрекер') else: print( 'Ошибка при создании задачи в ЯндексТрекер:', response.content, ) # asana_data = get_data_from_asana() file = open(asana_data_json, "r", encoding="utf8") json_data = json.loads(file.read()) yandex_tracker_data = transform_data(json_data) create_tasks_in_tracker(yandex_tracker_data)