Из Kafka в Elasticsearch с помощью sink-коннектора OpenSearch: практический пример
Как передать JSON-документы из топика Kafka в Elasticsearch, используя OpenSearch Sink Connector. Подробная демонстрация с настройкой и регистрацией коннектора в Kafka Connect.
Настройка sink-коннектора и отправка в Kafka Connect
Как передать данные из Kafka в Elasticsearch, я уже показывала здесь, развернув экземпляр Kafka в облаке на платформе Upstash. Однако, с 1 октября 2024 года Upstash перестала поддерживать коннекторы, а также с марта 2025 года вообще отключает инстансы этой платформы потоковой передачи событий. Поэтому мне пришлось развернуть образ от Confluent со всеми сопутствующими сервисами (GUI на AKHQ, Connect, Schema Registry) в Docker на своем локальном компьютере. На этом локальном развертывании, получив данные из PostgreSQL с помощью JDBC-коннектора источника, я решила передать их в облачный Elasticsearch 7.10.2, экземпляр которого у меня развернут в сервисе Bonsai.
Скачав с хаба Confluent архив Self-Hosted ElasticSearch Sink Connector confluentinc-kafka-connect-elasticsearch-14.1.2, его надо развернуть в папке jars, которая находится в директории с конфигурационным YAML-файлом контейнеров docker-compose. После этого можно настраивать конфигурацию коннектора и отправлять его в Connect, передав JSON-файл конфигурации с помощью POST-запроса.
Конфигурация моего коннектора к Elasticsearch 7.10.2, описанная в файле elastic-sink-connector.json, выглядела так:
Но попытка отправить коннектор с этой конфигурацией в Kafka Connect с помощью инструкции
выдала ошибку
Эта ошибка связана с несовместимостью между используемой версией Elasticsearch и ожидаемой версией от коннектора. В частности, Confluent Elasticsearch Sink Connector требует коммерческую, а не OSS-версию Elasticsearch, которая поддерживает определённые функции. Поэтому вместо sink-коннектора к Elasticsearch пришлось использовать его открытую альтернативу: Opensearch. OpenSearch представляет собой вариацию Elasticsearch и Kibana, созданную после изменения лицензии Elasticsearch. Он предоставляет аналогичные функции для поиска и аналитики данных. Sink-коннектор OpenSearch работает аналогично Confluent Elasticsearch Sink Connector, позволяя передавать данные из топика в документо-ориентированную БД для индексации и поиска.
Скачав с Github архив opensearch-connector-for-apache-kafka-3.1.1 и распаковав его в папку jars, я настроила следующую конфигурацию коннектора:
Отправка коннектора с этой конфигурацией в Kafka Connect уже не выдает ошибок, а регистрирует коннектор и успешно запускает коннектор, что сразу видно в GUI.
Поскольку в топике ishop-product-product уже есть данные в виде JSON-документов, полученные с помощью JDBC-коннектора из PostgreSQL, они сразу же передаются в Elasticsearch. Посмотреть это можно в интерфейсе сервиса Bonsai, отправив POST-запрос к автоматически созданному индексу в API поиска Elasticsearch:
Таким образом, при использовании собственного развертывания Kafka Connect, надо очень тщательно обращать внимание на варианты коннекторов и их совместимость с разными версиями систем-источников и приемников.
Освойте администрирование и эксплуатацию Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Статья:
Курсы:
Наш сайт:
Копирование, размножение, распространение, перепечатка (целиком или частично), или иное использование материала допускается только с письменного разрешения правообладателя ООО "УЦ Коммерсант"