Параллельная обработка и преобразование JSON-фалов в Pandas

Исходными данными являлись 10 тыс. json-файлов, в каждом из которых было около 3 тыс. конечных узлов. При последовательной загрузке всех файлов и преобразовании в pandas при помощи json_normalize время выполнения составило 10 минут, что довольно долго.

В связи с тем, что данную операцию необходимо было выполнить несколько раз на различных наборах данных, было принято решение ускорить процесс преобразования за счет реализации механизма распараллеливания и реализовать его как полноценный настраиваемый скрипт.

Для создания параллельных вычислений важны два условия: разделимость и независимость данных. Каждый из json-файлов представляет одну строку результирующей таблицы, что позволяет спокойно разделить их на партиции. Также для преобразования одного файла не нужен какой-либо другой, что удовлетворяет условию независимости данных. Условия соблюдены, можно приступать к коду.

Импорт библиотек:

from tqdm import tqdm from pathlib import Path import json from multiprocessing import Pool, RLock import pandas as pd import pickle import argparse

Весь алгоритм программы разделен на следующие блоки:

  • Парсинг аргументов
  • Получение всех путей до файлов и разделение их на n групп (Для n подпроцессов)
  • Запуск процессов, выполняющих выгрузку и преобразование в pandas
  • Объединение результатов выполнения подпроцессов в единый фрейм и сохранение

1. Парсинг аргументов

Для того, чтобы скрипт можно было удобно переиспользовать с другими настройками, необходимо определить входные параметры скрипта с помощью библиотеки argparse. Отделить инициализацию парсера аргументов от основной логики скрипта, описав ее в функции get_arg_pareser:

def get_arg_pareser(): parser = argparse.ArgumentParser(description= 'From json files creates pd.DataFrame') parser.add_argument('-i', '--input-folder', type=str, help='input data folder', required=True) parser.add_argument('-o', '--output-file', type=str, default=r'output.pickle', help='output.pickle') parser.add_argument('-e', '--n-executors', type=int, default=8, help='number of subprocesses (default: 8)') return parser

Вызвать его в блоке __main__:

args = get_arg_pareser().parse_args() N_GROUPS = args.n_executors jsons_folder_path = args.input_folder output_file = args.output_file

2. Пути до файлов

При помощи библиотеки Pathlib можно получить все пути до входных файлов:

f_paths = list(Path(jsons_folder_path).glob('*.json'))

Теперь разделить данные на n частей, пронумеровав каждую партицию (Для красивого отслеживания выполнения процесса)

in_group = len(f_paths) // N_GROUPS + 1 inp_args = [f_paths[i:i + in_group] for i in range(0, len(f_paths), in_group)] inp_args = list(enumerate(inp_args))

3. Распараллеливание

В любой задаче, в которой необходимо создать распараллеливание, необходимо определить логику выполнения подпроцесса в отдельном блоке (функции). Создать функцию one_process_execution, определяющую обработку одной партиции данных.

Функция в рамках одного процесса выгружает все json файлы партиции, сохраняя два массива: названия файлов (indexes) и сами данные (res_array). После выгрузки всех данных, при помощи функции pd.json_normalize, преобразую список словарей в таблицу и выставлю имена файлов как индекс.

Важным замечанием является то, что функцию json_normalize следует выполнять именно на массиве словарей внутри подпроцессов, а не на каждом отдельном файле. Если же поставить преобразование в pandas каждого файла отдельно и итерировано добавлять в pd.DataFrame по строчке, то это замедлит выполнение в 3 раза. Главное правило преобразования чего-либо в pandas, делать саму трансформацию как можно позднее.

def one_process_execution(pid, f_paths): res_arr = [] indexes = [] tqdm_text = '#' + f'{pid}'.zfill(3) with tqdm(total=len(f_paths), position=pid+1, desc=tqdm_text) as pbar: for path in f_paths: with open(str(path), 'r') as f: d = json.load(f) indexes.append(path.stem) res_arr.append(d) pbar.update(1) df = pd.json_normalize(res_arr).assign(index=indexes).set_index('index') print(f'Subproc {pid} done') return df

Теперь, когда у меня есть разделенные данные и есть описание функции подпроцесса – время приступать к созданию пула (контейнера) процессов. Для реализации параллельных вычислений в Python используется библиотека multiprocessing. При помощи класса Pool инициализирую контейнер задач, указав необходимое число подпроцессов и дополнительные параметры для работы отображения статуса выполнения. Далее необходимо заполнить этот пул описанной выше функции one_process_execution с входными данными, сформированными на шаге 1. Добавление задач происходит с помощью ключевого слова apply_async, определяющего поведение выполнения наших процессов.

pool = Pool(processes=N_GROUPS, initializer=tqdm.set_lock, initargs=(RLock(),)) jobs = [pool.apply_async(one_process_execution, args=x) for x in inp_args]

Следует отметить, что метод apply_async не запускает сам процесс, а лишь добавляет его в список задач. Таким образом, в переменной jobs хранятся определения подпроцессов, готовых к началу выполнения.

Для запуска вычислений надо у каждого элемента массива вызвать метод get, который после выполнения вернет результат нашей функции, по мере выполнения. Вызов этого метода для каждого элемента массива задач (Рис. 1):

# run pool df_lists = list(map(lambda x: x.get(), jobs))
Рисунок 1 - Статус выполнения программы
Рисунок 1 - Статус выполнения программы

4. Объединение и сохранение

Теперь дело за малым — осталось объединить все датафреймы в один единый и сохранить:

res_df = pd.concat(df_lists) # save with open(output_file, 'wb') as f: pickle.dump(res_df, f)

Заключение

Таким образом, был создан скрипт, позволяющий преобразовывать большое количество json файлов в единый датафрейм, используя технологии распараллеливания вычислений. Данная реализация преобразовала те же 10 тысяч файлов в единый датафрейм за 2 минуты, тем самым ускорив процесс в 5 раз.

Полноценный скрипт можно посмотреть на моей личной странице github. Он был реализован таким образом, что может быть запущен в двух режимах:

  1. Как скрипт, сохраняющий результат как pickle файл
  2. Как подключаемый модуль, функция main которого возвращает сгенерированный pd.DataFrame.
55
4 комментария

не понятно, зачем здесь петухон, он же медленный. Если не нужен мастер процесс (шареная память), то если не быстрее, то примерно так же будет работать php pcntl. Если все же шареная память нужна - нода + worker_threads. Если нужно запускаться нативно на всех системах, без установки окружений go.

Ответить

Добрый день! В данной статье описываются возможности именно python для преобразования в известный аналитикам pandas Dataframe. Не спорю, что можно придумать и более изощренные варианты распределенной обработки на других языках, сохранять в csv и далее загружать питоном в pandas)

Ответить

Комментарий недоступен

Ответить