{"id":14291,"url":"\/distributions\/14291\/click?bit=1&hash=257d5375fbb462be671b713a7a4184bd5d4f9c6ce46e0d204104db0e88eadadd","hash":"257d5375fbb462be671b713a7a4184bd5d4f9c6ce46e0d204104db0e88eadadd","title":"\u0420\u0435\u043a\u043b\u0430\u043c\u0430 \u043d\u0430 Ozon \u0434\u043b\u044f \u0442\u0435\u0445, \u043a\u0442\u043e \u043d\u0438\u0447\u0435\u0433\u043e \u0442\u0430\u043c \u043d\u0435 \u043f\u0440\u043e\u0434\u0430\u0451\u0442","buttonText":"","imageUuid":""}

Как потратить 7 лет на написание 4 строк кода на Python?

В данной статье я хочу рассказать историю о тестировании.

Я работаю в команде, которая разрабатывает распределённую систему, написанную на Python. В нашей компании у нас есть довольно большой набор интеграционных тестов, которые выполняются с помощью Jenkins. По состоянию на январь 2023 года, выполнение всего пакета занимает около 5 часов. Мы реализуем все тесты на каждом PR, поэтому они обязательно должны выполняться параллельно. Эта история об эволюции способа их оптимизации.

Начало

В начале проекта,примерно в 2013 году, мы решили использовать программу для тестирования - nose. В то время это было неплохое решение; nose всё ещё был жив и популярен, и у нас был некоторый опыт работы с ним.

Выполнение интеграционных тестов, поначалу, было приятным и лёгким:

nosetests tests/integration/

Когда это стало раздражающе долгим (это было примерно в 2015 году), мы предприняли нашу первую попытку использовать распределённое тестирования. Наша идея была проста: создать список всех файлов, содержащих тесты, и разделить его на 3 части. Например, для 4 тестовых файлов, таких как test_a.py , test_b.py , test_c.py , test_d.py мы бы имели:

  • part 0 с test_a.py
  • part 1 с test_b.py
  • part2 с test_c.py и test_d.py

Это фрагмент из split_test_files.py (обратите внимание на старый синтаксис Python 2...):

part = int(sys.argv[1]) - 1 processes = int(sys.argv[2]) files = sys.stdin.read().split(' ') start = len(files) * part / processes if processes == part - 1: end = None else: end = len(files) * (part + 1) / processes print ' '.join(files[start:end])

...который был использован следующим образом:

PROCESSES=3 ALL_TEST_FILES=tests/integration/*.py for part_id in $(seq 1 ${PROCESSES}); do nosetests $(echo ${ALL_TEST_FILES} | python ./scripts/split_test_files.py ${part_id} ${PROCESSES}) & done

С этого момента “тестовая часть” означает один из процессов, которые выполняют тесты.

Мы решили, что наилучшее значение для $PROCESSES равно 3, поскольку наши интеграционные тесты являются тяжёлыми, и их выполнение в 3 отдельных процессах на одной машине создаёт большую нагрузку. Таким образом, PROCESSES=3 был не идеальным выбором, а скорее максимальным параллелизмом, который мы могли получить в то время.

Мы становимся умнее

Оказалось (что не было большим сюрпризом), что c файлом split_test_files.py иногда возникали ошибки. Наивное разделение на 3 части создало распределение, которое не обязательно было хорошо сбалансированным. Каждый тестовый файл может содержать разное количество тестов, и каждый тест может иметь разную продолжительность. Нашей следующей мыслью было связать продолжительность теста с размером тестового файла. Подробный алгоритм, основанный на размере файла, был следующим:

size_for_file = {file_name: get_file_size(file_name) for file_name in file_names} file_names_sorted_by_size = sorted(file_names, key=lambda x: size_for_file[x]) file_names_for_part = defaultdict(list) for i, file_name in enumerate(file_names_sorted_by_size): file_names_for_part[i % parts].append(file_name) for part, file_names_in_part in file_names_for_part.items(): print(' '.join(file_names_in_part))

Bash выполнил nosetests для каждой выходной строки.

По сути, мы попытались разделить файлы на подмножества одинакового общего размера. Конечно, этот метод также не был идеальным: тесты из маленьких файлов могут занимать много времени, а тесты из больших файлов могут быть быстрыми. Рассмотрим большой файл с одним коротким тестом, но множеством вспомогательных функций, множеством комментариев и т.д.; аналогично рассмотрим небольшой файл с тяжёлым и длинным циклом, повторяющим множество тестовых примеров. На самом деле, у нас были такие файлы в репозитории, поэтому нам нужна была ещё одна оптимизация.

Улучшение механизма

Следующая идея состояла в том, чтобы создать специальную строку в начале каждого тестового файла, которая более или менее описывала бы продолжительность тестов в файле. Итак, мы помещаем в начало каждого тестового файла данную строку:

# EXPECTED_TEST_FILE_DURATION_SEC=X

Чтобы, примерно, понимать время продолжительности теста каждого файла, возьмите первую строку, прочитайте ожидаемую продолжительность и используйте её для оценки. Мы использовали наивный алгоритм, помещающий целые числа в N сегментов:

class TestPart: def __init__(self): self.filenames = [] self.expected_len = 0 def estimated_test_length(file_name): with open(file_name, 'r') as f: l = f.readline() if 'EXPECTED_TEST_FILE_DURATION_SEC' in l[0]: return int(l.split('=')[1]) [...] [...] size_for_file = {file_name: estimated_test_length(file_name) for file_name in file_names} file_names_sorted_by_size = sorted(file_names, key=lambda x: size_for_file[x], reverse=True) test_parts = [TestPart() for _ in range(args.parts)] for file_name in file_names_sorted_by_size: test_part = smallest_test_part(test_parts) test_part.filenames.append(file_name) test_part.expected_len += size_for_file[file_name] for test_part in test_parts: print(' '.join(test_part.filenames))

Этот алгоритм сортирует файлы от самого большого к самому маленькому и помещает их в наименее занятую корзину. Если значения, которые возвращают эти заголовки,близки к истинному времени выполнения, такой алгоритм даёт хорошо сбалансированное распределение. Но! Возникли две новые проблемы: как эффективно рассчитать истинное время выполнения теста и как гарантировать, что эти заголовки останутся актуальными?

Автоматизация процесса

Сначала мы устанавливали значение EXPECTED_TEST_FILE_DURATION_SEC в соответствии с нашим внутренним чутьем. Но это было громоздко и ненадёжно. В 2017 году мы создали инструмент ( test-length-analyzer.py), который вычислял среднюю продолжительность выполнения каждого тестового файла. Вот его фрагмент кода:

durations=defaultdict(list) for build_id in range(args.build_id_start, args.build_id_end + 1): durations_by_filename = analyze_build(args.test_url, build_id) for filename in durations_by_filename: durations[filename].append(durations_by_filename[filename]) for filename in durations: print('{filename}: {median_dur}'.format(filename=filename, median_dur=median(durations[filename])))

Затем нам нужно было автоматически обновлять файлы с тестами, поэтому кто-то внедрил test-length-analyzer.py --update обновления, которые заменили значения EXPECTED_TEST_FILE_DURATION_SEC во всех файлах интеграционных тестов.

А затем наступила эра таких коммитов, как:

  • update expected tests lengths
  • update test duration
  • test duration update
  • updates tests' lengths
  • update expected test file durations
  • и так далее...

Распределение процессов по множеству механизмов

Ограничение в 3 процесса тестирования со временем становилось всё более болезненным. Таким образом, нам пришла в голову идея использовать Multi-configuration Job Jenkins для выполнения множества экземпляров nosetests, потенциально на многих механизмах. Этот плагин позволил нам добавить пользовательские оси, параметризованные произвольными значениями (здесь: идентификаторы деталей), к которым можно было получить доступ в каждом воплощении задания.

Мы произвольно установили количество тестовых частей равным 8. Это было намного больше, чем 3, и процесс выполнялся проще, чтобы не загромождать всю инфраструктуру Jenkins , когда 2 или 3 человека одновременно выполняли тесты по своим запросам на извлечение.

Нам нужно было только убедиться, что алгоритм, сгенерировавший тестовое распределение, был детерминированным. К счастью, split_test_files.py всегда генерируется один и тот же вывод для одного и того же ввода. Оставалась одна проблема: как обеспечить актуальность длины тестов?

Полностью автоматизированное распределение тестов

В 2018 году мы добавили ещё одно задание Jenkins , которое выполняло test-length-analyzer.py один раз в день. Оно сохраняло длины тестов в файл JSON, который позже был проанализирован split_test_files.py в каждом выполнении nosetests. Это остановило лавину расточительных и избыточных коммитов и избавило нас от большого количества ручной работы.

Прикосновение к сложной информатике

Можно было бы подумать, что наш алгоритм вычисления распределения был наивным, но на самом деле это было решение серьёзной теоретической проблемы, называемой Multiway Number Partitioning Problem, которая является NP-трудностью. Наша наивная реализация называется Greedy Number Partitioning, и (точнее) мы использовали её автономную версию, называемую LPT scheduling. Она может возвращать приближение типа 4/3 (что означает, что решение не хуже, чем 4/3 от оптимального решения), но эксперименты показали, что этого будет достаточно для наших нужд. Мы решили не исследовать более сложные алгоритмы, такие как алгоритм Kamarkar-Karp algorithm или Multifit algorithm, который дает аппроксимацию 13/11. Хотя это было заманчиво.

Решение новой проблемы

И вот мы достигли точки, когда всё было автоматизировано, продолжительность тестов была актуальной, распределения вычислялись детерминированным образом, все тестовые части охватывали все тесты, а результаты тестов были даже красиво представлены Jenkins. Реализация всего механизма заняла около 1000 строк кода (ой!), и было похоже, что на этом наша работа закончена.

На самом деле, не совсем. Была ещё одна проблема. Мы проводили тесты на разных машинах, и у этих машин были разные наборы аппаратного обеспечения. У нас было несколько быстрых машин (с меньшим количеством, но более быстрыми ядрами процессора) и несколько параллельно ориентированных хостов (с большим количеством, но более медленных ядер). Следовательно, если тестовая часть была выполнена на быстром компьютере, все тесты были значительно быстрее, чем среднее время выполнения, и вся тестовая часть завершалась раньше. Если тестовая часть выполнялась на более медленной машине, все тесты были значительно медленнее, чем среднее время, и вся тестовая часть завершалась позже. Различия были значительными , поэтому нам потребовалась ещё одна оптимизация.

Что мы сделали дальше, так это добавили ещё один уровень абстракции, вычисляя всё на каждой машине. На первый взгляд это может показаться простым, но было (как всегда) предостережение: тестовые части не обязательно запускаются в одно и то же время. У Jenkins ограниченное количество исполнителей на машину, поэтому, когда все исполнители на всех машинах заняты, тестовая часть помещается в очередь и ожидает, пока какой-нибудь исполнитель не станет доступен. Итак, теперь перестало быть очевидным, как обеспечить вариант, при котором все части совместно использовали бы одни и те же знания о тестовом распределении, поскольку алгоритм выдавал разные выходные данные для разных наборов машин.

Мы решили синхронизировать тестовые части. Мы внедрили другой алгоритм, на этот раз распределённый, который определял, какие части теста выполняются достаточно рано, чтобы получить точную ожидаемую продолжительность теста, а какие части выполняются слишком поздно и должны оцениваться по среднему значению медиан на машину…

Мы внесли ещё много улучшений:

  • У нас были графики времени выполнения для каждой машины.
  • Мы распространяли тестовые классы (не целые тестовые файлы).
  • Другие наборы тестов (например, сервисные тесты) со временем росли, и возникла необходимость распространять и их.

По сути, оборудование всё росло и росло. Его нужно было поддерживать, в нём были ошибки, и чем сложнее оно было, тем больше расстраивались разработчики, которые его использовали. Всё взывало к упрощению.

Эксплуатация Pytest!

Упрощение произошло в 2020 году. В начале пандемии мы перешли на удалённую работу, и у некоторых из нас было свободное время по вечерам. Мы перенесли всю нашу тестовую среду в pytest. Мы долго обдумывали это, но было трудно найти достаточно времени, чтобы перерыть все кроличьи норы. В конце концов, примерно в мае 2020 года nose исчез навсегда.

pytest задуман таким образом, чтобы разработчик мог легко писать новые плагины. Код плагина регистрируется с помощью хуков, например, если вы хотели пропустить все тесты, которые заканчиваются на букву y, вы просто написали такой хук:

def pytest_runtest_setup(item): if item.nodeid[-1] == 'y': pytest.skip("test name ends with 'y', skipping")

pytest может выполнять это на этапе настройки, перед запуском каждого теста. Вы можете найти подробную информацию в документации pytest.

Итак, отлично, мы могли бы пропустить тест, если бы его выполнял какой-то другой процесс. Нам просто нужно было синхронизировать тестовые части. К счастью, в нашей тестовой инфраструктуре есть общая точка монтирования, которая видна через NFS каждому работнику Jenkins. Нам просто нужно было синхронизировать тестовые процессы с файлом блокировки и иметь другой файл, который использовался бы в качестве реестра выполненных тестов. Весь хук был более или менее следующим:

class DistributeTestsPlugin: def __init__(self, config): self.part_id = config.getvalue("part_id") self.synchronization_file = os.path.join(config.getvalue("synchronization_dir"), config.getvalue("build_id")) self.synchronization_file_lock = FileLock(f"{self.synchronization_file}.lock") self.registered_tests : Dict[str, int] = {} def pytest_runtest_setup(self, item): with self.synchronization_file_lock.acquire(timeout=30): with open(self.synchronization_file, "r") as f: self.registered_tests = self._parse_distributions(f.read()) if item.nodeid in self.registered_tests.keys(): other_part = self.registered_tests[item.nodeid] pytest.skip(f"part {other_part}") with open(self.synchronization_file, "a") as f: f.write(self._pickle_item(item)) self.registered_tests[item.nodeid] = self.part_id

Это было значительно проще, чем все алгоритмы аппроксимации, машинно-ориентированное время выполнения, автономное планирование тестирования и так далее. Большое спасибо pytest!

Рассмотрим пример выполнение теста из 5 частей. Если первая часть завершилась неудачей перед выполнением какого-либо теста (например, произошел сбой всего хоста, произошла ошибка настройки репозитория), остальные четыре части разделили бы все тесты между собой.

К сожалению, реальность редко бывает такой простой. Несмотря на приведённый выше фрагмент кода, в плагине было ещё 100 строк, состоящих из ведения журнала, множества проверок работоспособности и поддержки распространения по тестовым классам. В нём также было одно важное предположение о том, что все тестовые части выполняют тесты точно в одном и том же порядке. Но все же миру, существовавшему до pytest, требовалось более 1000 строк, так что упрощение было поразительным.

Новая реализация

Вышеприведённая версия просуществовала около года. В 2021 году мы столкнулись с другой проблемой. В какой-то новой версии библиотеки, которая обеспечивала реализацию класса FileLock, была ошибка, которая появлялась при блокировке через NFS. Мы решили переписать наш плагин, чтобы он вообще не использовал блокировку файлов. Новая реализация ловко избежала блокировки, и её единственным предположением было то, что одна функция write() выполнялась атомарно.

Я не буду вставлять эту версию алгоритма здесь, так как она довольно длинная и очень трудная для понимания. Кроме того, в ней также были некоторые ошибки, которые незаметно сохранялись в коде около года. Эти ошибки были ужасными: два процесса иногда (очень редко) могли записывать в файл с одинаковым смещением, и один мог стереть несколько байтов, только что записанных другим. Кроме того, оказалось, что наше предположение об атомарности единственной функции write() было ошибочным… что чётко написано в официальном Linux NFS FAQ: Протокол NFS не поддерживает атомарную запись с добавлением, поэтому записи с добавлением никогда не являются атомарными в NFS для любой платформы. Нам снова пришлось отступить от сложностей, о которых мы только что написали.

Эксплуатация redis!

Когда были выявлены проблемы с NFS (незадолго до Рождества 2022 года), мы обсудили возможность использования redis. Мы думали об этом решении раньше, но не хотели добавлять ещё один сервис, на который полагалась большая часть платформы тестирования. Однако, в конце концов, мы решили попробовать. Это подтолкнуло нас к следующему решению:

class DistributeTestsPlugin: def __init__(self, config): self.test_order: list[str] = [] self.synchronization_id = config.getvalue('synchronization_id') self.part_id = config.getvalue("part_id") self.redis = redis.Redis(config.getvalue('redis_url')) self.redis_lock = self.redis.lock(name=self.synchronization_id + '.lock') self.redis_parts_name = f"{self.synchronization_id}.parts" self.redis_queue_name = f"{self.synchronization_id}.deque" self.redis_test_to_part_id_dict_name = f"{self.synchronization_id}.dict" def pytest_collection_modifyitems(self, session, config, items): self.test_order = [item.nodeid for item in items] with self.redis_lock: if self.redis.scard(self.redis_parts_name) == 0: self.redis.lpush(self.redis_queue_name, *self.test_order) self.redis.sadd(self.redis_parts_name, self.part_id) self.test_order = [item.nodeid for item in items] self._fetch_next_test() def pytest_runtest_setup(self, item): if item.nodeid != self.next_test_to_process: assigned_to_part_id = self._get_part_id_for_test(item.nodeid) pytest.skip(f"part {assigned_to_part_id}") def pytest_runtest_logfinish(self, nodeid, location): if self.next_test_to_process == nodeid: self._fetch_next_test() def _fetch_next_test(self): self.next_test_to_process = self.redis.rpop(self.redis_queue_name) if self.next_test_to_process is not None: # there is still a test to be processed self.next_test_to_process = self.next_test_to_process.decode('utf-8') self.redis.set(self.redis_test_to_part_id_dict_name + self.next_test_to_process, self.part_id, ex=24*3600) def _get_part_id_for_test(self, nodeid): start = time.time() while time.time() - start < 30: res = self.redis.get(self.redis_test_to_part_id_dict_name + nodeid) if res is None: time.sleep(0.5) continue return res.decode('utf-8')

Алгоритм кратко изложен в двух предложениях: перед всеми тестами каждый процесс проверяет (используя redis), является ли он первым. Если да, то он заполняет очередь тестирования. Позже все части получают свои тесты из очереди.

Если вы присмотритесь повнимательнее, то легко сможете увидеть некоторые несовершенства:

  • Нам нужно было синхронизироваться с самого начала.
  • Нам нужно было активно ждать, чтобы проверить, какая часть на самом деле выполняет тест.
  • Нам нужно было использовать три разных хука.
  • Нам нужно было убедиться, что все части выполняют тесты точно в одном и том же порядке.

Может быть, это всё ещё было слишком сложно?

Всё ещё слишком сложно

Внимательно изучив этот алгоритм, мы обнаружили, что на самом деле никакая очередь не нужна. Мы могли бы просто проверить, был ли тест выполнен другой частью! Синхронизация была необходима только при получении значения из redis:

class DistributeTestsPlugin: def __init__(self, config): self.synchronization_id = config.getvalue('synchronization_id') self.part_id = config.getvalue("part_id") self.redis = redis.Redis(config.getvalue('redis_url')) self._current_part_id = None def _try_register(self, test_id): with self.redis.pipeline() as p: p.watch(test_id) if (ret := p.get(test_id)) is None: p.multi() p.set(test_id, self.part_id, ex=24*3600) try: p.execute() return self.part_id except redis.WatchError: ret = self.redis.get(test_id) return ret.decode('utf-8') def pytest_runtest_logstart(self, nodeid, location): self._current_part_id = self._try_register(self.synchronization_id + nodeid) def pytest_runtest_setup(self, item): if self._current_part_id != self.part_id: pytest.skip(f"part {self._current_part_id}")

Приведённый выше код должен синхронизироваться только в одной точке: при фактическом получении значения части, которая выполняет тест, и при установке этого значения. redis предлагает приятную концепцию транзакций, которая может реализовать оптимистичную блокировку, как это сделано выше. Значение можно просмотреть и изменить только в том случае, если никто другой не изменил его во время просмотра.

С помощью этой реализации мы были освобождены от предположения, что каждая тестовая часть пытается выполнять тесты в одном и том же порядке. Мы просто проверили, был ли тест зарезервирован другой частью: если нет, мы могли бы выполнить его. Так просто, но в то же время так гениально!

Снова улучшаем наш плагин

Мы очень гордились последней версией реализации плагина. Однако, кто-то снова задал тот же вопрос: не могли бы мы упростить его ещё больше? И ответ был: Да, могли бы. Вместо использования WATCH мы могли бы просто выполнить setnx(), который является своего рода операцией тестирования и установки. Как показано ниже:

class DistributeTestsPlugin: def __init__(self, config): self.synchronization_id = config.getvalue('synchronization_id') self.part_id = config.getvalue("part_id") self.redis = redis.Redis(config.getvalue('redis_url')) def pytest_runtest_setup(self, item): redis_test_key = self.synchronization_id + item.nodeid if self.redis.setnx(redis_test_key, self.part_id) == 0: pytest.skip(f"part {self.redis.get(redis_test_key).decode('utf-8')}") self.redis.expire(redis_test_key, 24 * 3600)

Решающий (и единственный необходимый) хук состоит из 4 линий.

Подождите, неужели это действительно так просто?

Э-э, да. И это немного неловко признавать. У меня есть только одно объяснение: нам просто нужно было больше времени, чтобы сделать его короче. И быть среди таких людей, как Блез Паскаль, - большая привилегия. :)

Наш 7-летний способ интеграционного тестирования в одной таблице:

Размещение продукта

Четыре Строки теперь выпущены как проект с открытым исходным кодом и доступны здесь.

Статья была взята из этого источника:

0
Комментарии
-3 комментариев
Раскрывать всегда