Быстрый старт в Apache Spark ML

Apache Spark – фреймворк, предоставляющий API для выполнения распределенной обработки данных. Его также можно использовать для разработки моделей машинного обучения и искусственного интеллекта, что позволит распараллеливать процессы обработки по нодам вычислительного кластера, влияя при этом на скорость обучения.

Apache Spark поддерживает языки Scala, Python, Java и R. PySpark – это Python API для использования Apache Spark. В данном материале рассмотрю реализацию модели градиентного бустинга над решающими деревьями (GBT) на примере популярного датасета табличных данных – Titanic (https://www.kaggle.com/competitions/titanic/code?competitionId=3136&searchQuery=pyspark). Реализованный пример поможет снизить пороги входа для использования Spark ML.

Spark ML является основной библиотекой для разработки моделей машинного обучения в Apache Spark. С помощью данного модуля можно решать множество задач: регрессия, классификация, кластеризация, снижение размерности. Можно также обрабатывать пропущенные значения, искать и устранять выбросы. Для борьбы с переобучением Spark ML поддерживает стандартную L1 и L2 регуляризацию. Для графовых структур Apache Spark имеет модуль GraphX, а для потоковой обработки данных Spark Streaming. Фреймворк активно развивается для достижения максимально быстрых результатов обучения моделей искусственного интеллекта.

Датасет Titanic содержит в себе характеристики человека, такие как пол, возраст, статус билета, цена билета, номер кабины и др. Необходимо предсказать выжил ли человек в результате крушения корабля или нет, основываясь на входных данных. Таким образом, реализую начало сессии PySpark и осуществлю просмотр первых десяти строк датасета. На данном этапе будет использоваться Spark DataFrame.

from pyspark.sql import SparkSession spark = SparkSession.builder.appName('example').getOrCreate() df = spark.read.csv('/train.csv', header=True, inferSchema=True) df.show(10)

Аналогичным образом можно выбрать только необходимые колонки для просмотра данных.

df1.select('Survived', 'Name', 'Parch').show(1)

Далее проведу обзорный взгляд на данные с помощью формирования дополнительных датафреймов. Таким образом, посчитаю сколько человек выжило и нет.

df.groupBy('Survived').count().show() +--------+-----+ |Survived|count| +--------+-----+ | 1| 342| | 0| 549|

Сколько человек мужчин и сколько человек женщин.

df.groupBy('Survived').pivot('Sex').count().show() +--------+------+----+ |Survived|female|male| +--------+------+----+ | 1| 233| 109| | 0| 81| 468|

Можно посмотреть сколько пропущенных значений наблюдается в датасете.

for col in df1.columns: print(col, df.filter(df[col].isNull()).count())

Теперь перейду непосредственно к части Spark построения модели. Использую StringIndexer для перевода текстовой фичи – пол, в категориальную переменную.

from pyspark.ml.feature import StringIndexer new_index = StringIndexer(inputCols=['Sex'], outputCols=['SexNum']) stringIndex_model = new_index.fit(df) df_ = stringIndex_model.transform(df).drop('Sex')

Далее буду использовать инициализацию VectorAssembler для формирования матрицы объясняющих переменных и целевой переменой. VectorAssembler формирует один векторный столбец из заданного ему на вход списка столбцов (фичей + целевой переменной). Не может работать со строковыми данными.

from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler(inputCols=df_.columns[1:], outputCol='features') df_ = assembler.transform(df_).select('features', 'Survived')

После этого стандартным образом делю выборку на тренировочную и тестовую в пропорции 70% и 30%.

train_df, test_df = df_.randomSplit([0.7, 0.3])

Теперь могу перейти к обучению модели и определению метрики качества построенной модели. Для этого сперва необходимо инициализировать экземпляр MulticlassClassificationEvaluator(), в который необходимо передать целевую переменную и метрику качества. Для оценки данной модели буду использовать метрику accuracy, так как она является распространенной и легко применимой.

evaluation = MulticlassClassificationEvaluator(labelCol='Survived', metricName='accuracy')

Теперь обучаю модель градиентного бустинга.

gradient_boosting = GBTClassifier(labelCol='Survived') model = gradient_boosting.fit(train_df) pred = model.transform(test_df) evaluation.evaluate(pred)

В результате построения такой модели метрика получилась 0,85.

Таким образом, в рамках данной задачи были продемонстрированы первоначальная обработка данных датасета, а также обучение модели градиентного бустинга. Показаны базовые трансформации и действия, необходимые для получения результата обучения модели, что служит хорошим и быстрым стартом для понимания работы Spark ML.

5
Начать дискуссию