Как создать и удалить таблицы в Apache Hadoop c использованием PySpark

Сегодня я расскажу, как затратив минимум усилий при работе с большими данными, справиться с задачей создания таблиц с нужными параметрами, а также, как удалить сразу большое количество потерявших актуальность таблиц.

Текущую деятельность уже невозможно представить без использования больших данных. Предлагаю рассмотреть некоторые методы оптимизации своих усилий в работе с большими данными при решении задач создания и удаления таблиц. Но для начала разберемся в терминах.

Apache Hadoop – это пакет утилит, библиотек и фреймворков, его используют для построения систем, которые работают с Big Data. Он позволяет хранить и обрабатывать данные для выгрузки в другие сервисы.

Hive – система хранения данных, которая помогает запрашивать большие наборы данных в HDFS, использует HQL (Hive Query Language) – язык запросов Hive. Для того чтобы использовать данные выборок Apache Hadoop для дальнейшей их обработки и анализа, можно сохранять их в свои пользовательские таблицы. Существует несколько способов создания таблиц. Самый «лёгкий» способ сохранить данные - написать в Hive QL запрос:

Create table database_lab.mytable as Select a.* from databasename.tablename a;

Здесь и далее указан «простой select», но в вашем случае это может быть сложный запрос с большим количеством таблиц (с использованием join). Выбираем таблицу, структуру которой хотим воссоздать. Данный способ нежелателен, так как таблица будет создаваться в процессе запроса. Другой способ – создать совсем новую таблицу:

Create table database_lab.mytable (column1 string, column2 bigint,……. , columnN int);

После этого уже можно будет заливать выбранные данные в готовую таблицу:

Insert into database_lab.mytable Select a.* from databasename.tablename a;

В данном методе самой трудоёмкой задачей является описание полей и их тип данных. Для решения этой задачи я предлагаю использовать PySpark. Запускаю пользовательскую сессию PySpark с помощью Jupyter NoteBook. Устанавливаю конфигурацию параметров SparkContext и подключаюсь к Apache Hadoop c использованием PySpark:

import os import sys from PySpark import SparkConf from PySpark.sql import HiveContext, SQLContext conf = SparkConf().setAppName('Create Drop Table').setMaster('yarn') conf.setAll([ ## здесь ваши настройки ]) spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate() hivecontext = HiveContext(spark) sqlContext =SQLContext(hivecontext)

Следующий шаг – напишу запрос, с помощью которого получаю данные для анализа:

query = “Select a.* from databasename.tablename a”

Создаю spark DataFrame, эта операция не нагружает систему, так как фактически к данным обращения нет:

df_sql = sqlContext.sql(query)

Набираю следующую команду:

df_sql

DataFrame[column1 : string, column2: bigint,……. , columnN : int]

Видим практически готовую структуру для создания таблицы. Сделаю несколько преобразований и получаю готовый скрипт для создания пустой таблицы:

# возьмём это значение и преобразуем в строку df_sq_str = str(df_sql) df_sq_str

‘DataFrame[column1 : string, column2: bigint,……. , columnN : int]’

# заменим значения “DataFrame[, ], :” columns_table = df_sq_str.replace(‘DataFrame[’,’’) .replace(‘]’,’’) .replace(‘:’,’’) # напишем команду запроса для создания таблицы hql_create_table = ‘Create table database_lab.mytable ’ + ‘(‘ + columns_table + ‘)’ hql_create_table

‘Create table database_lab.mytable (column1 string, column2 bigint,……. , columnN int)’

На последнем этапе запускаю готовую команду. Это можно осуществить как в PySpark, так и в Hive.

Рано или поздно наступает момент, когда данные таблиц уже использованы и потеряли свою актуальность. Рассмотрю, как удалить свои таблицы с неактуальными данными. Если у вас не так много таблиц в Hadoop, то проблем с удалением не возникает.

Набираю в Hive QL:

Drop table database_lab.mytable;

Однако, когда их больше 1000, и кроме ваших таблиц есть таблицы других пользователей, то возникает вопрос как быстро удалить уже не нужные вам таблицы. Для этого необходимо при создании таблиц стандартизировать их название. Например:

tmp_IvanovVS_kreditfl_2022_08_08_1

где: tmp – временная (промежуточная),

IvanovVS – идентификатор принадлежности, kreditfl – тип данных, 2022_08_08 – дата создания, 1 – номер таблицы, если их больше 1.

Для удаления таблиц снова воспользуюсь возможностями PySpark. Запускаю пользовательскую сессию PySpark с помощью Jupyter NoteBook, устанавливаю конфигурацию параметров SparkContext и подключаюсь к Apache Hadoop c использованием PySpark как указано выше. Создаю Spark DataFrame таблиц базы данных:

df_table_list = spark.sql(“””show tables in database_lab”””) #Выберем списком названия таблиц tables_all = [row.tablename for row in df_table_list] #создадим список моих таблиц tables_me = [] for table_name in tables_all: if ‘IvanovVS’ in table_name: tables_me.append(table_name)

Теперь можно удалить все таблицы:

for table_name in tables_me: query =”drop table database_lab.” + table_name df_drop_table = sqlContext.sql(query) print(‘Удалена таблица {0}’.format(table_name))

Таким образом будут удалены все ваши таблицы. Можно также выбирать таблицы для удаления с помощью дополнительных параметров, заложенных в названии таблиц.

В результате, используя предложенные методы PySpark, можно значительно упростить работу по созданию и удалению собственных таблиц в Apache Hadoop.

22
Начать дискуссию