Как выполнить запланированную задачу в Keycloak при запуске

Как выполнить запланированную задачу в Keycloak при запуске

10 февраля 2023 г.

Keycloak и Kafka — две популярные технологии, которые широко используются в современных микросервисных архитектурах. Keycloak предоставляет централизованное решение для аутентификации и авторизации, а Kafka — это масштабируемая и высокопроизводительная система очередей сообщений.

В этой статье мы рассмотрим, как выполнить запланированную задачу в Keycloak при запуске на примере потребителя Kafka. Комбинируя эти две технологии, вы можете создать безопасную и масштабируемую микросервисную архитектуру, способную обрабатывать большие объемы данных.

Одной из возможностей Keycloak является выполнение запланированных задач, которые могут выполнять различные действия, такие как синхронизация данных, задачи обслуживания и многие другие. Keycloak позволяет создавать пользовательских поставщиков для расширения его функциональности. В нашем случае мы создадим собственный провайдер для запуска потребителя Kafka в качестве запланированной задачи.

ProviderEvents и PostMigrationEvent

API Keycloak предоставляет события, которые можно использовать для запуска запланированной задачи.

* ProviderEvents — это событие общего назначения, которое может запускаться на различных этапах жизненного цикла Keycloak, например при запуске, после проверки подлинности и после авторизации. Однако важно отметить, что ProviderEvents не поддерживается EventListenerProvider, который представляет собой компонент Keycloak, используемый для прослушивания событий.

* PostMigrationEvent является расширением ProviderEvents. PostMigrationEvent запускается после того, как Keycloak завершит миграцию базы данных. Это событие можно использовать для выполнения любых необходимых задач после миграции, таких как очистка старых данных или обновление индексов. Используя любое из этих событий, вы можете запланировать автоматический запуск задач в определенные моменты жизненного цикла Keycloak, что позволит вам выполнять действия, необходимые для вашего приложения.

Шаги для выполнения запланированной задачи в Keycloak при запуске

Создайте фиктивный EventListenerProvider

Этот класс будет просто заглушкой, поскольку EventListenerProvider не позволяет вам подписываться на ProviderEvents.

public class KeycloakConsumerEventListener implements EventListenerProvider {

    private static final Logger log = Logger.getLogger(KeycloakConsumerEventListener.class);


    public KeycloakConsumerEventListener(String topicKafka, Properties props) {
        log.info("init custom event listener consumer");
    }



    @Override
    public void onEvent(Event event) {

    }

    @Override
    public void onEvent(AdminEvent adminEvent, boolean b) {
    }

    @Override
    public void close() {

    }
}

Создать EventListenerProviderFactory

Этот класс будет отвечать за создание экземпляров EventListenerProvider и запуск запланированных задач. В методе init мы определим конфиги для нашего потребителя.

public class KeycloakConsumerEventListenerFactory implements EventListenerProviderFactory {
    private KeycloakConsumerEventListener keycloakConsumerEventListener;

    private static final Logger log = Logger.getLogger(KeycloakConsumerEventListenerFactory.class);

    private String topicKafka;
    private String bootstrapServers;

    private Properties properties;

    private Consumer<String, String> consumer;

    @Override
    public EventListenerProvider create(KeycloakSession keycloakSession) {
        if (keycloakConsumerEventListener == null) {
            keycloakConsumerEventListener = new KeycloakConsumerEventListener(topicKafka, properties);
        }
        return keycloakConsumerEventListener;
    }

    @Override
    public void init(Config.Scope scope) {
        log.info("Init kafka consumer");


        bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");


        topicKafka = System.getenv("KAFKA_TOPIC");

        if (topicKafka == null || topicKafka.isEmpty()) {
            throw new NullPointerException("topic is required.");
        }
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new NullPointerException("bootstrapServers are required");
        }

        properties = getProperties();

    }

    @Override
    public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
        // define latter
    }

    @Override
    public void close() {

    }

    @Override
    public String getId() {
        return "kafka-event-consumer";
    }

    private Properties getProperties() {
        Properties propsKafka = new Properties();
        propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        propsKafka.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsKafka.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsKafka.put(ConsumerConfig.GROUP_ID_CONFIG, "KeycloakKafkaExampleConsumer");


        return propsKafka;
    }
}

Переопределить метод postInit Factory

В методе postInit инициализируйте потребителя Kafka и запустите запланированную задачу с помощью PostMigrationEvent. Это гарантирует, что запланированная задача будет выполнена при запуске после того, как Keycloak завершит миграцию базы данных.

@Override
    public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
        keycloakSessionFactory.register(event -> {
            if (event instanceof PostMigrationEvent) {
                log.info("Init kafka consumer task");

                if (consumer == null) {
                    consumer = new KafkaConsumer<>(properties);

                    consumer.subscribe(Collections.singletonList(topicKafka));
                }

                KeycloakSession keycloakSession = keycloakSessionFactory.create();

                TimerProviderFactory timerProviderFactory = (TimerProviderFactory) keycloakSessionFactory.getProviderFactory(TimerProvider.class);
                //execute task every 10 seconds
                timerProviderFactory.create(keycloakSession)
                        .scheduleTask(this::processRecords, 10000, "kafka-consumer");
            }
        });
    }

    private void processRecords(KeycloakSession session) {
        log.debug("task execute");
        final ConsumerRecords<String, String> consumerRecords =
                this.consumer.poll(1000);

        try {
            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%s, %s, %d, %d)n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });
        } finally {
            consumer.commitAsync();
        }
    }

Заключение

Keycloak предоставляет мощный и гибкий способ запуска запланированных задач, которые могут выполнять различные действия, такие как синхронизация данных, задачи обслуживания и многие другие. Объединив его с Kafka, вы можете создать безопасную и масштабируемую микросервисную архитектуру, способную обрабатывать большие объемы данных. Следуя шагам, описанным в этой статье, вы можете выполнить запланированную задачу в Keycloak при запуске, используя в качестве примера потребителя Kafka. Это всего лишь один пример того, как вы можете расширить функциональность Keycloak, и те же шаги можно применить и для реализации других запланированных задач.


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE