Mapping_ya_tracker/mapping.py

211 lines
6.7 KiB
Python
Raw Normal View History

import csv
import json
import os
import sys
import time
import requests
2023-11-24 13:33:17 +05:00
from logging_config import setup_logging
from urllib.parse import urlencode
logger = setup_logging(__name__)
2023-11-27 21:22:57 +05:00
base_dir = os.path.dirname(os.path.realpath(__file__))
asana_data_json = os.path.join(base_dir, './data/data_asana.json')
users_data = os.path.join(base_dir, './data/users.mapping.asana2ya.csv')
sys.stdin.reconfigure(encoding='utf-8')
sys.stdout.reconfigure(encoding='utf-8')
QUEUE_NAME = 'TESTIMPORT'
headers = {
'Host': 'api.tracker.yandex.net',
'Authorization': 'OAuth y0_AgAEA7qkB3KjAArkXwAAAADzMlP9oR1lwMzBS2e94jHzdnII8Laxi7k',
'X-Org-ID': '7095769',
'Content-Type': 'appication/json',
}
def get_assignee_data():
""" Получение данных для поля assignee """
mapping = {}
with open(users_data, 'r', encoding="utf8") as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
asana_id = row['ID']
ya_login = row['ya_login']
mapping[asana_id] = ya_login
return mapping
assignee = get_assignee_data()
2023-11-29 14:08:36 +05:00
def get_task_status(task):
""" Получение данных о статусе задачи """
if task.get('completed') is True:
status = 'closed'
2023-11-29 14:08:36 +05:00
completed_at = task.get('completed_at', '')
else:
status = 'open'
completed_at = ''
return status, completed_at
# здесь функция для дальнейшей работы с parent
def get_parent_task_ids(task, ya_imported_task_response):
""" Получение соответствий между идентификаторами подзадач в Асане и Яндекс Трекере """
parent_task_ids = {}
if task.get('parent') and 'gid' in task['parent']:
parent_gid = task['parent']['gid']
if parent_gid in ya_imported_task_response:
parent_id = ya_imported_task_response[parent_gid]
parent_task_ids[task['gid']] = parent_id
return parent_task_ids
2023-11-29 14:08:36 +05:00
def transform_data(data):
""" Преобразование данных из Asana в ЯндексТрекер в совместимом формате """
transformed_data = []
for task in data:
2023-11-29 14:08:36 +05:00
status, completed_at = get_task_status(task)
transformed_task = {
'gid':task['gid'],
'summary': task['name'],
'description': task['notes'],
'createdAt': task['created_at'],
'deadline': task['due_on'],
'assignee': task['assignee'],
2023-11-29 14:08:36 +05:00
'status': status,
'completedAt': completed_at,
'parent': task['parent'],
'subtasks' : transform_data(task['subtasks'])
# 'parent': task['parent'] # это я добавила, чтоб parent добавить
}
transformed_data.append(transformed_task)
return transformed_data
def create_task(task):
base_url = 'https://api.tracker.yandex.net/v2/issues/_import'
print(task)
if task.get('assignee') and 'gid' in task['assignee']:
ya_assignee = assignee.get(task['assignee']['gid'], 'dr.cyrill')
else:
ya_assignee = 'dr.cyrill'
payload = {
'queue': QUEUE_NAME,
'summary': task['summary'],
'description': task['description'],
'createdAt': task['createdAt'],
'createdBy': 'dr.cyrill',
'deadline': task['deadline'],
'assignee': ya_assignee,
'status': task['status'],
}
logger.debug('Request: %s', json.dumps(payload))
response = requests.post(
base_url,
headers=headers,
data=json.dumps(payload),
params=urlencode({'parent': task['parent']}) # добавила параметр для parent в url-запрос
)
if response.status_code == 201:
print('Задача успешно создана в ЯндексТрекер')
# здесь добавлено про родительскую задачу
return response.json()['id']
else:
print(
'Ошибка при создании задачи в ЯндексТрекер:',
response.content,
)
logger.debug('Response: %s', response.content)
def create_tasks_in_tracker(data, limit=10):
""" Создание задач в ЯндексТрекер """
datalen = (len(data) if limit==-1 else min(limit, len(data)))
logger.info('Import started. Task count: %d', datalen)
# этот датасет тоже для parent добавлен
ya_imported_task_response = {}
for i in range(datalen):
task = data[i]
task_id = create_task(task)
logger.info('Created task id = %s',task_id)
for subtask in task['subtasks']:
logger.info('Found subtask gid = %s for id = %s',subtask['gid'], task_id)
subtask_id = create_task(subtask)
assign_parent_task(subtask_id, task_id)
# вот здесь тоже про родительскую задачу добавлено
#parent_task_ids = get_parent_task_ids(task, ya_imported_task_response)
#for gid, parent_id in parent_task_ids.items():
# assign_parent_task(ya_imported_task_response[gid], parent_id)
2023-12-05 18:26:18 +05:00
#и эта функция для родительской задачи написана
def assign_parent_task(task_id, parent_id):
""" Назначение родительской задачи """
base_url = f'https://api.tracker.yandex.net/v2/issues/{task_id}'
headers = {
'Host': 'api.tracker.yandex.net',
'Authorization': 'OAuth y0_AgAEA7qkB3KjAArkXwAAAADzMlP9oR1lwMzBS2e94jHzdnII8Laxi7k',
'X-Org-ID': '7095769',
'Content-Type': 'application/json',
}
payload = {
'parent': parent_id,
}
logger.debug('Request: %s', json.dumps(payload))
response = requests.patch(
base_url,
headers=headers,
data=json.dumps(payload),
)
if response.status_code == 200:
print('Родительская задача успешно назначена')
logger.info('Task %s successfully assigned a parent',task_id)
else:
print('Ошибка при назначении родительской задачи:', response.content)
logger.debug('Response: %s', response.content)
time.sleep(1) # добавляем задержку перед следующим запросом к API
# здесь я закончила добавлять про родительскую задачу с большой надеждой
2023-12-05 18:26:18 +05:00
file = open(asana_data_json, "r", encoding="utf8")
json_data = json.loads(file.read())
yandex_tracker_data = transform_data(json_data['data'])
create_tasks_in_tracker(yandex_tracker_data, limit=3)