Custom Kafka Deserializer и Spring’овый контекст. Как инжектить в статические поля

Привет, меня зовут Николай Пискунов, я ведущий разработчик в подразделении Big Data. Мы в beeline cloud регулярно сталкиваемся с задачами, требующими кастомных решений, поэтому сегодня расскажу, как инжектить в статические поля. Как всегда — на примерах. Поехали.

Custom Kafka Deserializer и Spring’овый контекст. Как инжектить в статические поля

На практике десериализаторов, представленных в ядре Spring, хватает в 99% случаев. Но бывают ситуации, когда всё же требуется описать свою логику предобработки входящего сообщения.

Для наглядности предлагаю рассмотреть гипотетический случай, когда на вход Kafka подается строка лога с задаваемым разделителем. В этой строке нас интересует лишь часть сообщения: мы будем делить строку по определенному символу и собирать из получившегося массива новую строку.

Задавать разделитель и индексы в файле application.properties мы планируем так:

kafka.message.deserializer.delimiter=, kafka.message.deserializer.indexes=0,2,3,5

Сразу скажу, что org.apache.kafka.common.serialization.Deserializer и все его реализации не знают о спринговом контексте ничего — от слова совсем. Даже если попытаться сделать из него бин, пометив соответствующей аннотацией, например @Component.

Для этого напишем сам Deserializer и попробуем прямо в него заинжектить требуемые настройки, используя спринговую аннотацию @Value:

@Slf4j @Component public class SmartStringDeserializer implements Deserializer<String> { @Value("${kafka.message.deserializer.delimiter}") String delimiter; @Value("${kafka.message.deserializer.indexes}") List<Integer> indexes; @Override public void configure(Map<String, ?> configs, boolean isKey) { Deserializer.super.configure(configs, isKey); } @Override public String deserialize(String s, byte[] bytes) { return setValueByIndex(bytes); } @Override public void close() { Deserializer.super.close(); } private String setValueByIndex(byte[] bytes) { String msg = new String(bytes, StandardCharsets.UTF_8); LOGGER.debug("Income message: {}", msg); if (indexes.getFirst() == -1) { return msg; } String[] incomeMessage = msg.split(delimiter); String formattedIncomeMessage = indexes.stream() .map(index -> incomeMessage[index]) .collect(Collectors.joining(delimiter)); LOGGER.debug("Consumed message: {}", formattedIncomeMessage); return formattedIncomeMessage; } }

К сожалению, нам вернется null при попытке получить значения:

Custom Kafka Deserializer и Spring’овый контекст. Как инжектить в статические поля

А если мы проставим стандартные значения в @Values, результат будет таким же:

@Value("${kafka.message.deserializer.delimiter:,}")

Но можно использовать статические поля, описав их в соседнем классе. Вот только надо придумать, как в них заинжектить данные из спрингового контекста. Конечно, это должен быть спринговый бин, например @Component, и в нем мы определим статические поля, включая поля, которые будут заполняться данными из properties:

@Component public class DeserializerUtils { public static String DELIMITER; public static List<Integer> INDEXES; private String delimiter; private List<Integer> indexes; … }

Теперь напишем сеттеры, они-то и будут присваивать значения статическим полям:

@Component public class DeserializerUtils { public static String DELIMITER; public static List<Integer> INDEXES; private String delimiter; private List<Integer> indexes; @Value("${kafka.message.deserializer.delimiter:,}") private void setDelimiter(String delimiter){ DeserializerUtils.DELIMITER = delimiter; } @Value("${kafka.message.deserializer.indexes:-1}") private void setIndexes(List<Integer> indexes){ DeserializerUtils.INDEXES = indexes; } }

Важно помнить, что @Values необходимо навешивать над методами, иначе не заработает. И, если вы решите обновить контекст без перезапуска приложения, например, поправив значения в файле конфигураций, обновятся и статические поля.

Теперь перепишем сам десериалайзер:

@Slf4j public class SmartStringDeserializer implements Deserializer<String> { @Override public void configure(Map<String, ?> configs, boolean isKey) { Deserializer.super.configure(configs, isKey); } @Override public String deserialize(String s, byte[] bytes) { return setValueByIndex(bytes); } @Override public void close() { Deserializer.super.close(); } private String setValueByIndex(byte[] bytes) { String delimiter = DeserializerUtils.DELIMITER; List<Integer> indexes = DeserializerUtils.INDEXES; String msg = new String(bytes, StandardCharsets.UTF_8); LOGGER.debug("Income message: {}", msg); if (indexes.getFirst() == -1) { return msg; } String[] incomeMessage = msg.split(delimiter); String formattedIncomeMessage = indexes.stream() .map(index -> incomeMessage[index]) .collect(Collectors.joining(delimiter)); LOGGER.debug("Consumed message: {}", formattedIncomeMessage); return formattedIncomeMessage; } }

А вот теперь мы можем конфигурировать Deserializer, используя спринговый контекст:

Custom Kafka Deserializer и Spring’овый контекст. Как инжектить в статические поля

У такого подхода есть свои минусы. По факту у нас дублируются данные в хипе, которые не удалит GC ввиду того, что первые хранятся в контексте спринга, вторые — в статических полях. Поэтому старайтесь инжектить конкретные бины и не делайте так:

public static ApplicationContext CONTEXT; private ApplicationContext context; private void setIndexes(ApplicationContext context){ DeserializerUtils.CONTEXT = context; }

beeline cloud — secure cloud provider.

Разрабатываем облачные решения, чтобы вы предоставляли клиентам лучшие сервисы.

Начать дискуссию