Обработка больших данных средствами PySpark SQL

Наиболее популярными инструментами для сбора и обработки больших данных являются Hadoop MapReduce и Apache Spark. Оба этих инструмента имеют свои плюсы и минусы, однако в данной статье мы будем рассматривать Apache Spark, в частности библиотеку для python PySpark.

Любая программа на PySpark начинается с определения конфигурации запускаемой задачи. В PySpark конфигурация задается классом SparkConf. Основными конфигурационными параметрами данного класса являются методы setAppName(‘SparkJobName’) – задает наименование Spark job. И метод setMaster(connection_url) – задает ссылку на подключение к кластеру данных. Данный параметр принимает следующие параметры:

  • URI кластера,

  • ‘yarn-client’ — указатель на использование yarn менеджера задач,

  • ‘local/local[*]’– указатель на использование локального компьютера в качестве хранилища данных.

Остальные конфигурационные параметры задаются методом set(‘name_conf_param,’ value’), принимающим на вход 2 параметра:

  • Параметр конфигурирования spark job;

  • Значение этого параметра.
from pyspark import SparkContext, SparkConf, HiveContext conf = SparkConf().setAppName('SparkContextExampleNew')\ .setMaster("yarn-client")\ .set('spark.dynamicAllocation.enabled', 'false')\ .set('spark.ui.showConsoleProgress','true')\

После того, как определили параметры конфигурации, необходимо вызвать метод SparkContext и передать в него параметры SparkConf:

sc = SparkContext.getOrCreate(conf=conf)

Для работы непосредственно с кластером данных удобно использовать не так давно добавленное API SparkSql, позволяющее более просто, а главное, в большинстве случаев, более оптимально производить различные манипуляции с данными.

Для работы с данным API на кластере необходимо инициализировать HiveContext:

sqlc = HiveContext(sc)

Ниже представлены простейшие операции, такие как:

  • Select
  • Update
  • Truncate
  • Delete
  • Create
#create sqlc.sql(f'create table default.test_table (id bigint, name string)').show() #insert numbers = [ 1, 2, 3, 4, 5, ] for num in numbers: sqlc.sql(f'insert into default.test_table (id, name) set ({num}, value_{str(num)})').show() #select df = sqlc.sql(f'select * from default.test_table').collect() df_.coalesce(1).write.csv(f'some_file.csv', sep=';') #преобразование RDD Dataframe в Pandas Dataframe df_ = sqlc.sql(f'select * from default.test_table)').toPandas() df_.head() #update sqlc.sql(f'update default.test_table set name = a.name + '_0' from default.test_table a').show() sqlc.sql(f'select * from default.test_table').show() #truncate sqlc.sql(f'truncate table default.test_table').show() sqlc.sql(f'select * from default.test_table').show() #delete sqlc.sql(f'drop table default.test_table').show()

Помимо вышеизложенных операций, pyspark позволяет выгружать данные в различные форматы 2 способами:
используя встроенные методы загрузки RDD Dataframe, либо преобразуя RDD Dataframe в Pandas Dataframe методом Pandas:

#select df = sqlc.sql(f'select * from default.test_table').collect() df_.coalesce(1).write.csv(f'some_file.csv', sep=';') #преобразование RDD Dataframe в Pandas Dataframe df_ = sqlc.sql(f'select * from default.test_table').toPandas() df_.to_csv(f'some_file.csv', sep=';')

После того, как наша задача полностью отработает, необходимо завершить spark job и освободить ресурсы кластера. В pyspark за это отвечает метод stop() класса SparkContext.

sc.stop()

Так же стоит отметить следующее, если spark job по тем или иным причинам упал, то его необходимо так же завершить методом stop.

По итогу мы имеем достаточно мощный и быстрый инструмент, позволяющий, не изучая специализированные методологии и среды разработки экосистемы Hadoop, эффективно и просто обрабатывать огромные объемы данных, чем он и привлек нас.

Начать дискуссию