Mapping_ya_tracker/mapping.py
2023-11-29 14:08:36 +05:00

126 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import json
import os
import sys
import logging
import requests
logging.basicConfig(level=logging.DEBUG, filename='mapping.log', filemode='w')
base_dir = os.path.dirname(os.path.realpath(__file__))
asana_data_json = os.path.join(base_dir, './data/data_asana.json')
users_data = os.path.join(base_dir, './data/users.mapping.asana2ya.csv')
sys.stdout.reconfigure(encoding='utf-8')
# ниже функция для случаев, когда не будет готового файла для импорта
# def get_data_from_asana():
# """ Получение данных из Asana """
# headers = {
# 'Authorization': 'ASANA_TOKEN',
# }
# response = requests.get('https://api.asana.com/projects/PROJECT_ID/tasks', headers=headers)
# data = response.json()
# return data
def get_assignee_data():
""" Получение данных для поля assignee """
mapping = {}
with open(users_data, 'r', encoding="utf8") as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
asana_id = row['ID']
ya_login = row['ya_login']
mapping[asana_id] = ya_login
return mapping
assignee = get_assignee_data()
def get_task_status(task):
""" Получение данных о статусе задачи """
if task.get('completed') is True:
status = 'resolved'
completed_at = task.get('completed_at', '')
else:
status = 'open'
completed_at = ''
return status, completed_at
def transform_data(data):
""" Преобразование данных из Asana в ЯндексТрекер в совместимом формате """
transformed_data = []
for task in data["data"]:
status, completed_at = get_task_status(task)
transformed_task = {
'summary': task['name'],
'description': task['notes'],
'createdAt': task['created_at'],
'deadline': task['due_on'],
'assignee': task['assignee'],
'status': status,
'completedAt': completed_at,
}
transformed_data.append(transformed_task)
return transformed_data
def create_tasks_in_tracker(data):
""" Создание задач в ЯндексТрекер """
base_url = 'https://api.tracker.yandex.net/v2/issues/_import'
headers = {
'Host': 'api.tracker.yandex.net',
'Authorization': 'OAuth y0_AgAEA7qkB3KjAArkXwAAAADzMlP9oR1lwMzBS2e94jHzdnII8Laxi7k',
'X-Org-ID': '7095769',
'Content-Type': 'appication/json',
}
for task in data:
if task.get('assignee') and 'gid' in task['assignee']:
ya_assignee = assignee.get(task['assignee']['gid'], 'dr.cyrill')
else:
ya_assignee = 'dr.cyrill'
payload = {
'queue': 'TESTIMPORT', # Вынести в переменную
'summary': task['summary'],
'description': task['description'],
'createdAt': task['createdAt'],
'createdBy': 'dr.cyrill',
'deadline': task['deadline'],
'assignee': ya_assignee,
# 'milestone': task['milestone'],
# 'memberships': task['memberships'],
# 'parent': task['parent'],
'status': task['status'],
}
response = requests.post(
base_url,
headers=headers,
data=json.dumps(payload),
)
if response.status_code == 201:
print('Задача успешно создана в ЯндексТрекер')
else:
print(
'Ошибка при создании задачи в ЯндексТрекер:',
response.content,
)
# asana_data = get_data_from_asana()
file = open(asana_data_json, "r", encoding="utf8")
json_data = json.loads(file.read())
yandex_tracker_data = transform_data(json_data)
create_tasks_in_tracker(yandex_tracker_data)