Как выполнить запланированную задачу в Keycloak при запуске
10 февраля 2023 г.Keycloak и Kafka — две популярные технологии, которые широко используются в современных микросервисных архитектурах. Keycloak предоставляет централизованное решение для аутентификации и авторизации, а Kafka — это масштабируемая и высокопроизводительная система очередей сообщений.
В этой статье мы рассмотрим, как выполнить запланированную задачу в Keycloak при запуске на примере потребителя Kafka. Комбинируя эти две технологии, вы можете создать безопасную и масштабируемую микросервисную архитектуру, способную обрабатывать большие объемы данных.
Одной из возможностей Keycloak является выполнение запланированных задач, которые могут выполнять различные действия, такие как синхронизация данных, задачи обслуживания и многие другие. Keycloak позволяет создавать пользовательских поставщиков для расширения его функциональности. В нашем случае мы создадим собственный провайдер для запуска потребителя Kafka в качестве запланированной задачи.
ProviderEvents и PostMigrationEvent
API Keycloak предоставляет события, которые можно использовать для запуска запланированной задачи.
* ProviderEvents — это событие общего назначения, которое может запускаться на различных этапах жизненного цикла Keycloak, например при запуске, после проверки подлинности и после авторизации. Однако важно отметить, что ProviderEvents не поддерживается EventListenerProvider, который представляет собой компонент Keycloak, используемый для прослушивания событий.
* PostMigrationEvent является расширением ProviderEvents. PostMigrationEvent запускается после того, как Keycloak завершит миграцию базы данных. Это событие можно использовать для выполнения любых необходимых задач после миграции, таких как очистка старых данных или обновление индексов. Используя любое из этих событий, вы можете запланировать автоматический запуск задач в определенные моменты жизненного цикла Keycloak, что позволит вам выполнять действия, необходимые для вашего приложения.
Шаги для выполнения запланированной задачи в Keycloak при запуске
Создайте фиктивный EventListenerProvider
Этот класс будет просто заглушкой, поскольку EventListenerProvider не позволяет вам подписываться на ProviderEvents.
public class KeycloakConsumerEventListener implements EventListenerProvider {
private static final Logger log = Logger.getLogger(KeycloakConsumerEventListener.class);
public KeycloakConsumerEventListener(String topicKafka, Properties props) {
log.info("init custom event listener consumer");
}
@Override
public void onEvent(Event event) {
}
@Override
public void onEvent(AdminEvent adminEvent, boolean b) {
}
@Override
public void close() {
}
}
Создать EventListenerProviderFactory
Этот класс будет отвечать за создание экземпляров EventListenerProvider и запуск запланированных задач. В методе init мы определим конфиги для нашего потребителя.
public class KeycloakConsumerEventListenerFactory implements EventListenerProviderFactory {
private KeycloakConsumerEventListener keycloakConsumerEventListener;
private static final Logger log = Logger.getLogger(KeycloakConsumerEventListenerFactory.class);
private String topicKafka;
private String bootstrapServers;
private Properties properties;
private Consumer<String, String> consumer;
@Override
public EventListenerProvider create(KeycloakSession keycloakSession) {
if (keycloakConsumerEventListener == null) {
keycloakConsumerEventListener = new KeycloakConsumerEventListener(topicKafka, properties);
}
return keycloakConsumerEventListener;
}
@Override
public void init(Config.Scope scope) {
log.info("Init kafka consumer");
bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
topicKafka = System.getenv("KAFKA_TOPIC");
if (topicKafka == null || topicKafka.isEmpty()) {
throw new NullPointerException("topic is required.");
}
if (bootstrapServers == null || bootstrapServers.isEmpty()) {
throw new NullPointerException("bootstrapServers are required");
}
properties = getProperties();
}
@Override
public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
// define latter
}
@Override
public void close() {
}
@Override
public String getId() {
return "kafka-event-consumer";
}
private Properties getProperties() {
Properties propsKafka = new Properties();
propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
propsKafka.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsKafka.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsKafka.put(ConsumerConfig.GROUP_ID_CONFIG, "KeycloakKafkaExampleConsumer");
return propsKafka;
}
}
Переопределить метод postInit Factory
В методе postInit инициализируйте потребителя Kafka и запустите запланированную задачу с помощью PostMigrationEvent. Это гарантирует, что запланированная задача будет выполнена при запуске после того, как Keycloak завершит миграцию базы данных.
@Override
public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
keycloakSessionFactory.register(event -> {
if (event instanceof PostMigrationEvent) {
log.info("Init kafka consumer task");
if (consumer == null) {
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topicKafka));
}
KeycloakSession keycloakSession = keycloakSessionFactory.create();
TimerProviderFactory timerProviderFactory = (TimerProviderFactory) keycloakSessionFactory.getProviderFactory(TimerProvider.class);
//execute task every 10 seconds
timerProviderFactory.create(keycloakSession)
.scheduleTask(this::processRecords, 10000, "kafka-consumer");
}
});
}
private void processRecords(KeycloakSession session) {
log.debug("task execute");
final ConsumerRecords<String, String> consumerRecords =
this.consumer.poll(1000);
try {
consumerRecords.forEach(record -> {
System.out.printf("Consumer Record:(%s, %s, %d, %d)n",
record.key(), record.value(),
record.partition(), record.offset());
});
} finally {
consumer.commitAsync();
}
}
Заключение
Keycloak предоставляет мощный и гибкий способ запуска запланированных задач, которые могут выполнять различные действия, такие как синхронизация данных, задачи обслуживания и многие другие. Объединив его с Kafka, вы можете создать безопасную и масштабируемую микросервисную архитектуру, способную обрабатывать большие объемы данных. Следуя шагам, описанным в этой статье, вы можете выполнить запланированную задачу в Keycloak при запуске, используя в качестве примера потребителя Kafka. Это всего лишь один пример того, как вы можете расширить функциональность Keycloak, и те же шаги можно применить и для реализации других запланированных задач.
Оригинал