DataFrame API – придётся учить
При написании запросов во многих базах данных используется, в целом, схожий синтаксис, могут отличаться названия функций, типов данных и т.п. Научившись работать, например, в MS SQL Server затем можно довольно быстро освоиться в Teradata или Greenplum. Особняком стоит PySpark – это API Apache Spark, применяемый для распределенной обработки больших данных. Его отличие в том, что при написании запросов могут использоваться разные техники, в том числе:
1. Использование SQL запросов
2. Использование DataFrame API.
Если первый способ будет понятен подавляющему большинству пользователей баз данных, то для применения второго требуется некоторое время, чтобы его освоить. В данной статье я приведу примеры аналогов основных конструкций, используемых в SQL, написанных на синтаксисе Spark DataFrame.
Перед работой в PySpark рекомендую выполнить импорт всех функций и типов данных:
Пример выбора всех строк из таблицы, написанного на языке SQL в PySpark:
spark.sql – означает, что будет использован текст SQL запроса. Сам запрос передается в качестве аргумента.
Причем при запуске данной строки набор данных сгенерирован не будет поскольку в Spark используются «ленивые» вычисления. Для выполнения данного запроса необходимо выполнить, так называемое, действие (action): .show(), .count(), .take(), .write, .toPandas() и т.д.
Например, отображение 10 строк из таблицы, поля «field_1», «field_2»:
При использовании PySpark DataFrame API текст запроса не используется, а вместо spark.sql применяется spark.table:
В select() указываем названия полей, которые необходимо выгрузить.
Выборка по условию:
Из таблицы «tablename» берем строки, которые содержат значения = ‘Condition_1’ в поле «field_2» и значения = ‘Condition_2’
SQL:
PySpark Dataframe API:
Так же вместо filter() можно использовать where() – это одно и то же. Синтаксис указания условий точно такой же, как в SQL.
Сортировка и удаление дубликатов:
Из таблицы «tablename» два поля «date_field» и «oper_amt», сортируем по убыванию дат («date_field»), убираем дублирующиеся строки.
SQL:
PySpark Dataframe API:
Добавление колонки с одинаковым значением:
Добавим столбец, который называется «new_field» и содержит одинаковое значение - ‘Some_text’
SQL:
PySpark Dataframe API:
Добавление колонки с case-when:
Добавим поле «operation_name» по условию, если значение в поле «operation_id» равно 1, то ‘Category_1’, иначе ‘Category_2’
SQL:
PySpark Dataframe API:
Так же внутри when() можно указывать несколько условий, для их соединения используются побитовые операторы (& - и, | - или), а каждое условие заключается в скобки:
Внутрь otherwise можно добавлять еще один when():
Функции аггрегации:
Посчитаем общую сумму транзакций за каждый день.
SQL:
PySpark Dataframe API:
Пример указан для функции sum, аналогично применяются и другие функции : count(), min(), max() и т.п.
Оконные функции:
Выведем строку, которая содержит максимальную сумму операции по каждой «operation_id».
SQL:
PySpark Dataframe API:
Join-ы:
К таблице «tablename_1» добавим название операции, которое найдено в таблице «tablename_2» по полю «operation_id».
SQL:
PySpark Dataframe API:
'inner' – способ соединения таблиц, соответствует SQL ‘inner join’ или просто ‘join’, остаются строки из обеих таблиц, в которых найдено совпадение.
Так же есть следующие способы join-ов:
'leftouter'– соответствует SQL ‘left join’ (‘left outer join’), остаются все строки из первой (левой) таблицы, добавляются строки из второй (правой) таблицы, где найдено совпадение. Если совпадение не найдено, то ставится Null.
'rightouter'– соответствует SQL ‘right join’ (‘right outer join’), остаются все строки из правой таблицы, добавляются строки из левой таблицы, где найдено совпадение. Если совпадение не найдено, то ставится Null.
'fullouter'– соответствует SQL ‘full join’ (‘full outer join’), остаются все строки из левой и правой таблиц, где найдено совпадение. Если совпадение не найдено, то ставится Null.
'leftsemi'– остаются строки из левой таблицы, где найдены совпадения, из правой таблицы поля не берутся.
'leftanti'– остаются строки из левой таблицы, где НЕ найдены совпадения, из правой таблицы поля не берутся.
Основное и главное преимущество использования DataFrame API - это унифицированный формат работы с базами данных, который одинаково хорошо отрабатывается и в PySpark и в Java, а так же ряде других актуальных движках, в которых так или иначе, приходится подключать вычисления BigData. В минусы можно записать только необходимость выучить новый синтаксис.