Многопотоковая обработка файлов

В работе по анализу данных иногда требуется обработать большое количество файлов, настолько большое, что обработка может занять неделю, другую. В этой статье вы познакомитесь с многопоточностью в Python и как я решал проблему обработки большого количества файлов.

Мне потребовалось преобразовать достаточно большое количество аудио файлов для дальнейшей обработки моделью по распознанию аудио (SpeechToText). Для преобразования речи в текст я использовал достаточно популярный инструмент - VOSK для Python. Чтобы речь правильно распознавалась моделью, аудио файлы должны быть определенного формата и частоты дискретизации (битрейт), а файлы, которые были в наличии, не подходили и их необходимо было предварительно подготовить. Для обработки использовалась консольная программа ffmpeg, которая могла изменять битрейт аудио дорожки на необходимый для распознания речи. Запустив ее в командной строке с определенными параметрами (подробно останавливаться на параметрах запуска я не буду, все подробно описано в документации на сайте к самой программе), я опишу только те которые использовал: Данный пример позволяет преобразовать один файл.

ffmpeg –i file.wav –ar 16000 –ac 1 –f s16le new_file.wav
  • i – имя исходного файла, который предстоит с конвертировать;
  • ar – частота дискредитации;
  • ac – количество каналов;
  • f – формат аудио;

С помощью Python собираем список файлов и поочередно отправляем ffmpeg для обработки и получаем необходимый результат.

files_path= '/' for subdir, dirs, files in os.walk(files_path): print(subdir) for f in tqdm(files): if f.endswith('.wav'): process = subprocess.Popen( ['ffmpeg', '-loglevel', 'quiet', '-i',file,'-ar', '16000', '-ac', '1', '-f', 's16le', file.replace(path, new_path) ])

Но, если файлов достаточно много и мы будем обрабатывать их все по порядку, то обработка займет очень большое время, а нам надо еще передать эти файлы для распознания. Чтобы сократить время преобразования, аудио можно распараллелить работу. Сама программа ffmpeg это не умеет, и здесь в дело вступает модуль multiprocessing, с помощью этой библиотеки можно из python запускать несколько потоков (или дочерних процессов).

import multiprocessing as mp #импортируем библиотеку

В итоге я написал простенький код, который постарался подробно прокомментировать:

if __name__ == "__main__": path='files/' new_path='new_files/' file_collection = collect_files(path) #функция списка получения файлов count_t=5 #максимальное кол-во потоков count_p=len(file_collection)//count_t+1 #кол-во файлов для обработки в одном потоке procs=list() #Массив процессов list_path=[] #Список файлов для отправки в поток count=0 #Счетчик кол-ва файлов в потоке for file_path in file_collection: count+=1 list_path.append(file_path) if count_p==count: proc=mp.Process(target=stt_file, args=(list_path)) procs.append(proc) proc.start() count=0 list_path=[] for proc in procs: proc.join()

В коде видно, как мы получаем большой список файлов для обработки, запускаем цикл и собираем небольшой список для отправки в поток, так же в коде определяется количество потоков, их количество можем определить сами, но не желательно делать их больше, чем количество ядер на компьютере.

Как уже обратили внимание из прочтения кода, поток запускается следующими строками:

proc=mp.Process(target=stt_file, args=(list_path, path, new_path,))) proc.start() procs.append(proc)

Немного о параметрах передаваемых в mp.Process:

  • target=stt_file – это функция внутри файла, которая будет запускаться
  • args=(list_path, path, new_path,) – параметры передаваемые в функцию

Так же в коде присутствует цикл по массиву procs, это нужно для того чтобы родительский процесс не завершался, пока не отработают все дочерние процессы.

Важно обратить внимание что, для правильной работы потоков, которые будет порождать наш скрипт, необходимо учесть одну вещь, основной код должен быть изолирован. Что это значит? Скрипт должен быть завернут в условие if __name__ == «__main__»: если этого не сделать, скрипт попросту не будет запускаться и будет выпадать ошибка:

if __name__ == '__main__': freeze_support() ... The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable.

Для примера я запустил обработку на сервере в 20 потоков, самая долгая обработка у потока заняла 25 мин., обработка в один поток оценочно займет более 8 часов(см. скрин ниже).

Многопотоковая обработка файлов
Многопотоковая обработка файлов

Используя этот подход, мне удалось выполнить задачу в кратчайшие сроки и преобразовать свыше 500 тыс. аудио файлов всего за неделю, в то время, если бы я обрабатывал файлы в один поток, то было бы потрачено свыше ста дней, и все благодаря модулю multiprocessing который позволил запустить обработку в несколько потоков. У меня на этом все, ставьте лайки если понравилась статья и пишите в комментарии насколько была полезна статья.

Код, используемый в этой статье, можно найти по ссылке на Github’е: https://github.com/madeyneko/multi_audio

44
3 комментария

Потоки и процессы — не одно и то же.

А если просто запускать ffmpeg через xargs с ограничением числа параллельных запусков?

Автор

Да, такой вариант тоже возможен. В статье указана часть кода для примера. В своем проекте был использован ffmpeg для преобразования и вывода в модель VOSK.