{"id":14276,"url":"\/distributions\/14276\/click?bit=1&hash=721b78297d313f451e61a17537482715c74771bae8c8ce438ed30c5ac3bb4196","title":"\u0418\u043d\u0432\u0435\u0441\u0442\u0438\u0440\u043e\u0432\u0430\u0442\u044c \u0432 \u043b\u044e\u0431\u043e\u0439 \u0442\u043e\u0432\u0430\u0440 \u0438\u043b\u0438 \u0443\u0441\u043b\u0443\u0433\u0443 \u0431\u0435\u0437 \u0431\u0438\u0440\u0436\u0438","buttonText":"","imageUuid":""}

DataFrame API – придётся учить

При написании запросов во многих базах данных используется, в целом, схожий синтаксис, могут отличаться названия функций, типов данных и т.п. Научившись работать, например, в MS SQL Server затем можно довольно быстро освоиться в Teradata или Greenplum. Особняком стоит PySpark – это API Apache Spark, применяемый для распределенной обработки больших данных. Его отличие в том, что при написании запросов могут использоваться разные техники, в том числе:

1. Использование SQL запросов

2. Использование DataFrame API.

Если первый способ будет понятен подавляющему большинству пользователей баз данных, то для применения второго требуется некоторое время, чтобы его освоить. В данной статье я приведу примеры аналогов основных конструкций, используемых в SQL, написанных на синтаксисе Spark DataFrame.

Перед работой в PySpark рекомендую выполнить импорт всех функций и типов данных:

from pyspark.sql.functions import * from pyspark.sql.types import *

Пример выбора всех строк из таблицы, написанного на языке SQL в PySpark:

tbl = spark.sql("select * from tablename")

spark.sql – означает, что будет использован текст SQL запроса. Сам запрос передается в качестве аргумента.

Причем при запуске данной строки набор данных сгенерирован не будет поскольку в Spark используются «ленивые» вычисления. Для выполнения данного запроса необходимо выполнить, так называемое, действие (action): .show(), .count(), .take(), .write, .toPandas() и т.д.

Например, отображение 10 строк из таблицы, поля «field_1», «field_2»:

tbl = spark.sql("select field_1, field_2 from tablename") tbl.show(10)

При использовании PySpark DataFrame API текст запроса не используется, а вместо spark.sql применяется spark.table:

tbl = spark.table("tablename").select('field_1','field_2') tbl.show(10)

В select() указываем названия полей, которые необходимо выгрузить.

Выборка по условию:

Из таблицы «tablename» берем строки, которые содержат значения = ‘Condition_1’ в поле «field_2» и значения = ‘Condition_2’

SQL:

tbl = spark.sql("select * from tablename where filed_1 = 'Condition_1' and field_2 = 'Condition_2'")

PySpark Dataframe API:

tbl = spark.table("tablename").filter("filed_1 = 'Condition_1' and field_2 = 'Condition_2'") или tbl = spark.table("tablename")\ .filter("filed_1 = 'Condition_1'")\ .filter("field_2 = 'Condition_2'")

Так же вместо filter() можно использовать where() – это одно и то же. Синтаксис указания условий точно такой же, как в SQL.

Сортировка и удаление дубликатов:

Из таблицы «tablename» два поля «date_field» и «oper_amt», сортируем по убыванию дат («date_field»), убираем дублирующиеся строки.

SQL:

tbl = spark.sql("select distinct date_field, oper_amt from tablename order by date_field desc")

PySpark Dataframe API:

tbl = spark.table("tablename").select('date_field', 'oper_amt').orderBy(desc('date_field')).distinct() .orderBy() – сортировка. desc() – по убыванию, если требуется сортировка по возрастанию, то ничего не указывается. .distinct () – удаление дублирующихся строк.

Добавление колонки с одинаковым значением:

Добавим столбец, который называется «new_field» и содержит одинаковое значение - ‘Some_text’

SQL:

tbl = spark.sql("select date_field, 'Some_text' as new_field from tablename")

PySpark Dataframe API:

tbl = spark.table("tablename").withColumn("new_field", lit('Some_text')) или tbl = spark.table("tablename") tbl = tbl.withColumn("new_field", lit('Some_text')) lit() – функция для использования константы в качестве значений.

Добавление колонки с case-when:

Добавим поле «operation_name» по условию, если значение в поле «operation_id» равно 1, то ‘Category_1’, иначе ‘Category_2’

SQL:

tbl = spark.sql("select operation_id, case when operation_id = 1 then 'Category_1' else 'Category_2' end as operation_name from tablename")

PySpark Dataframe API:

tbl = spark.table("tablename") tbl = tbl.withColumn("operation_name", when(col('operation_id') == 1, 'Category_1').otherwise('Category_2')) when() – указывается условие, а после запятой – значение, если условие соблюдается. otherwise() – значение, которое будет записано. Если условие не выполняется.

Так же внутри when() можно указывать несколько условий, для их соединения используются побитовые операторы (& - и, | - или), а каждое условие заключается в скобки:

tbl = tbl.withColumn("operation_name", when((col('operation_id') == 1) | (col('date_field') >= '2022-01-01'), 'Category_1').otherwise('Category_2'))

Внутрь otherwise можно добавлять еще один when():

tbl = tbl.withColumn("operation_name", when(col('operation_id') == 1, 'Category_1').otherwise(when(col('operation_id') == 2, 'Category_2').otherwise('Category_3')))

Функции аггрегации:

Посчитаем общую сумму транзакций за каждый день.

SQL:

tbl = spark.sql("select date_field, sum(oper_amt) as sum_oper from tablename group by date_field")

PySpark Dataframe API:

tbl = spark.table("tablename").groupBy("date_field").sum("oper_amt") .groupBy() – в скобках указываются поля аггрегации. .sum() – функция аггрегации.

Пример указан для функции sum, аналогично применяются и другие функции : count(), min(), max() и т.п.

Оконные функции:

Выведем строку, которая содержит максимальную сумму операции по каждой «operation_id».

SQL:

tbl = spark.sql('''select operation_id, date_field, oper_amt from ( select operation_id, date_field, oper_amt, row_number() over(partition by operation_id order by oper_amt desc) as rn from tablename group by date_field ) tbl where rn = 1''')

PySpark Dataframe API:

from pyspark.sql.window import Window tbl = spark.table('table_name')\ .withColumn('rn',row_number().over(Window.partitionBy('operation_id').orderBy(desc('oper_amt'))))\ .select('operation_id','date_field','oper_amt','rn')\ .filter("rn = 1")\ .drop("rn")

Join-ы:

К таблице «tablename_1» добавим название операции, которое найдено в таблице «tablename_2» по полю «operation_id».

SQL:

tbl = spark.sql('''select t1.operation_id, t1.date_field, t2.operation_name from tablename_1 t1 join tablename_2 t2 on t1.operation_id = t2.operation_id ''')

PySpark Dataframe API:

t1 = spark.table('tablename_1') t2 = spark.table('tablename_2') result = t1.join(t2, t1.operation_id == t2.operation_id, 'inner')\ .select(t1.operation_id, t1.date_field, t2.operation_name)

'inner' – способ соединения таблиц, соответствует SQL ‘inner join’ или просто ‘join’, остаются строки из обеих таблиц, в которых найдено совпадение.

Так же есть следующие способы join-ов:

'leftouter'– соответствует SQL ‘left join’ (‘left outer join’), остаются все строки из первой (левой) таблицы, добавляются строки из второй (правой) таблицы, где найдено совпадение. Если совпадение не найдено, то ставится Null.

'rightouter'– соответствует SQL ‘right join’ (‘right outer join’), остаются все строки из правой таблицы, добавляются строки из левой таблицы, где найдено совпадение. Если совпадение не найдено, то ставится Null.

'fullouter'– соответствует SQL ‘full join’ (‘full outer join’), остаются все строки из левой и правой таблиц, где найдено совпадение. Если совпадение не найдено, то ставится Null.

'leftsemi'– остаются строки из левой таблицы, где найдены совпадения, из правой таблицы поля не берутся.

'leftanti'– остаются строки из левой таблицы, где НЕ найдены совпадения, из правой таблицы поля не берутся.

Основное и главное преимущество использования DataFrame API - это унифицированный формат работы с базами данных, который одинаково хорошо отрабатывается и в PySpark и в Java, а так же ряде других актуальных движках, в которых так или иначе, приходится подключать вычисления BigData. В минусы можно записать только необходимость выучить новый синтаксис.

0
Комментарии
-3 комментариев
Раскрывать всегда