Mapping_ya_tracker/mapping.py

124 lines
4.0 KiB
Python

import csv
import json
import os
import sys
import logging
import requests
# from yandex_tracker_client import TrackerClient
# YANDEX_TRACKER_TOKEN = 'y0_AgAAAABAmFP8AArkXwAAAADzJvmiOpP3VkBkQXyHUPEkimPtj5LvovE'
# ORG_ID = '7095769'
# TESTIMPORT = 'https://tracker.yandex.ru/pages/projects/5'
logging.basicConfig(level=logging.DEBUG, filename='mapping.log', filemode='w')
# client = TrackerClient(
# token='YANDEX_TRACKER_TOKEN',
# org_id='ORG_ID'
# )
base_dir = os.path.dirname(os.path.realpath(__file__))
asana_data_json = os.path.join(base_dir, './data/data_asana.json')
users_data = os.path.join(base_dir, './data/users.mapping.asana2ya.csv')
sys.stdout.reconfigure(encoding='utf-8')
# ниже функция для случаев, когда не будет готового файла для импорта
# def get_data_from_asana():
# """ Получение данных из Asana """
# headers = {
# 'Authorization': 'ASANA_TOKEN',
# }
# response = requests.get('https://api.asana.com/projects/PROJECT_ID/tasks', headers=headers)
# data = response.json()
# return data
def get_assignee_data():
""" Получение данных для поля assignee """
mapping = {}
with open(users_data, 'r', encoding="utf8") as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
asana_id = row['ID']
ya_login = row['ya_login']
mapping[asana_id] = ya_login
return mapping
assignee = get_assignee_data()
def transform_data(data):
""" Преобразование данных из Asana в ЯндексТрекер в совместимом формате """
transformed_data = []
for task in data["data"]:
transformed_task = {
'summary': task['name'],
'description': task['notes'],
'createdAt': task['created_at'],
'deadline': task['due_on'],
# 'assignee': task['assignee'],
# 'milestone': task['section'],
# 'memberships': task['memberships'],
# 'parent': task[''],
# 'status': task['completed'],
}
transformed_data.append(transformed_task)
return transformed_data
def create_tasks_in_tracker(data):
""" Создание задач в ЯндексТрекер """
base_url = 'https://api.tracker.yandex.net/v2/issues/_import'
headers = {
'Host': 'api.tracker.yandex.net',
# 'Authorization': 'OAuth y0_AgAEA7qkB3KjAArTqQAAAADzHp7Ut0nhAmx5Q_25b1jpqiuBXRIJksk',
'Authorization': 'OAuth y0_AgAEA7qkB3KjAArkXwAAAADzMlP9oR1lwMzBS2e94jHzdnII8Laxi7k',
'X-Org-ID': '7095769',
'Content-Type': 'appication/json',
}
for task in data:
ya_assignee = assignee[task['assignee']['gid']] if task.get('assignee', 0) !=0 else 'dr.cyrill'
payload = {
'queue': 'TESTIMPORT', # Вынести в переменную
'summary': task['summary'],
'description': task['description'],
'createdAt': task['createdAt'],
'createdBy': 'dr.cyrill@heado.ru',
'deadline': task['deadline'],
'assignee': ya_assignee,
# 'milestone': task['milestone'],
# 'memberships': task['memberships'],
# 'parent': task['parent'],
# 'status': task['status'],
}
response = requests.post(
base_url,
headers=headers,
data=json.dumps(payload),
)
if response.status_code == 201:
print('Задача успешно создана в ЯндексТрекер')
else:
print(
'Ошибка при создании задачи в ЯндексТрекер:',
response.content,
)
# asana_data = get_data_from_asana()
file = open(asana_data_json, "r", encoding="utf8")
json_data = json.loads(file.read())
yandex_tracker_data = transform_data(json_data)
create_tasks_in_tracker(yandex_tracker_data)