Mapping_ya_tracker/mapping.py

191 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import json
import os
import sys
import requests
from logging_config import setup_logging
logger = setup_logging(__name__)
base_dir = os.path.dirname(os.path.realpath(__file__))
asana_data_json = os.path.join(base_dir, './data/data_asana.json')
users_data = os.path.join(base_dir, './data/users.mapping.asana2ya.csv')
sys.stdin.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
QUEUE_NAME = 'TESTIMPORT'
def get_assignee_data():
""" Получение данных для поля assignee """
mapping = {}
with open(users_data, 'r', encoding="utf8") as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
asana_id = row['ID']
ya_login = row['ya_login']
mapping[asana_id] = ya_login
return mapping
assignee = get_assignee_data()
def get_task_status(task):
""" Получение данных о статусе задачи """
if task.get('completed') is True:
status = 'resolved'
completed_at = task.get('completed_at', '')
else:
status = 'open'
completed_at = ''
return status, completed_at
# здесь функция для дальнейшей работы с parent
def get_parent_task_ids(task, ya_imported_task_response):
""" Получение соответствий между идентификаторами подзадач в Асане и Яндекс Трекере """
parent_task_ids = {}
if task.get('parent') and 'gid' in task['parent']:
parent_gid = task['parent']['gid']
if parent_gid in ya_imported_task_response:
parent_id = ya_imported_task_response[parent_gid]
parent_task_ids[task['gid']] = parent_id
return parent_task_ids
def transform_data(data):
""" Преобразование данных из Asana в ЯндексТрекер в совместимом формате """
transformed_data = []
for task in data["data"]:
status, completed_at = get_task_status(task)
transformed_task = {
'summary': task['name'],
'description': task['notes'],
'createdAt': task['created_at'],
'deadline': task['due_on'],
'assignee': task['assignee'],
'status': status,
'completedAt': completed_at,
'parent': task['parent'] # это я добавила, чтоб parent добавить
}
transformed_data.append(transformed_task)
return transformed_data
def create_tasks_in_tracker(data):
""" Создание задач в ЯндексТрекер """
base_url = 'https://api.tracker.yandex.net/v2/issues/_import'
headers = {
'Host': 'api.tracker.yandex.net',
'Authorization': 'OAuth y0_AgAEA7qkB3KjAArkXwAAAADzMlP9oR1lwMzBS2e94jHzdnII8Laxi7k',
'X-Org-ID': '7095769',
'Content-Type': 'appication/json',
}
datalen = len(data)
logger.info('Import started. Task count: %d', datalen)
# этот датасет тоже для parent добавлен
ya_imported_task_response = {}
for task in data:
if task.get('assignee') and 'gid' in task['assignee']:
ya_assignee = assignee.get(task['assignee']['gid'], 'dr.cyrill')
else:
ya_assignee = 'dr.cyrill'
payload = {
'queue': QUEUE_NAME,
'summary': task['summary'],
'description': task['description'],
'createdAt': task['createdAt'],
'createdBy': 'dr.cyrill',
'deadline': task['deadline'],
'assignee': ya_assignee,
'status': task['status'],
}
logger.debug('Request: %s', json.dumps(payload))
response = requests.post(
base_url,
headers=headers,
data=json.dumps(payload),
)
if response.status_code == 201:
print('Задача успешно создана в ЯндексТрекер')
# здесь добавлено про родительскую задачу
if 'gid' in task:
ya_imported_task_response[task['gid']] = response.json()['id']
assign_parent_task(response.json()['id'], task['parent']['gid'])
else:
print(
'Ошибка при создании задачи в ЯндексТрекер:',
response.content,
)
logger.debug('Response: %s', response.content)
# вот здесь тоже про родительскую задачу добавлено
parent_task_ids = get_parent_task_ids(task, ya_imported_task_response)
for gid, parent_id in parent_task_ids.items():
assign_parent_task(ya_imported_task_response[gid], parent_id)
#и эта функция для родительской задачи написана
def assign_parent_task(task, parent_id):
""" Назначение родительской задачи """
base_url = f'https://api.tracker.yandex.net/v2/issues/{task[id]}'
headers = {
'Host': 'api.tracker.yandex.net',
'Authorization': 'OAuth y0_AgAEA7qkB3KjAArkXwAAAADzMlP9oR1lwMzBS2e94jHzdnII8Laxi7k',
'X-Org-ID': '7095769',
'Content-Type': 'application/json',
}
payload = {
'parent': parent_id,
}
logger.debug('Request: %s', json.dumps(payload))
response = requests.patch(
base_url,
headers=headers,
data=json.dumps(payload),
)
if response.status_code == 200:
print('Родительская задача успешно назначена')
else:
print('Ошибка при назначении родительской задачи:', response.content)
logger.debug('Response: %s', response.content)
# здесь я закончила добавлять про родительскую задачу с большой надеждой
file = open(asana_data_json, "r", encoding="utf8")
json_data = json.loads(file.read())
yandex_tracker_data = transform_data(json_data)
create_tasks_in_tracker(yandex_tracker_data)