Как отслеживать статус задачи YARN в DolphinScheduler

Как отслеживать статус задачи YARN в DolphinScheduler

3 ноября 2024 г.

Фон

В DolphinScheduler для задач YARN, таких как MapReduce (MR), Spark, Flink и даже Shell, первоначальный подход заключался в определении статуса задачи на основе идентификатора приложения при обнаружении задачи YARN. Это означает, что вместо того, чтобы полагаться исключительно на статус клиентского процесса, DolphinScheduler также учитывал статус YARN для принятия решения о состоянии задачи.

Позднее сообщество переработало этот процесс (что стало шагом в правильном направлении, но все еще не завершено), что привело к некоторым проблемам.

Например, в режиме Flink Stream Application, где клиент отсоединен, клиентская оболочка немедленно завершает работу, заставляя DolphinScheduler помечать задачу как успешную. Однако задача на YARN все еще выполняется, и DolphinScheduler больше не может отслеживать ее статус на YARN.

Итак, как мы можем реализовать отслеживание статуса задачи YARN в DolphinScheduler?

Примечание: этот пример основан на версии 3.2.1.

Диаграмма взаимосвязи рабочих задач

Сначала давайте рассмотрим принцип взаимосвязи рабочих задач в DolphinScheduler.

  • AbstractTask: в основном определяет базовый интерфейс жизненного цикла задачи, такой как инициализация, обработка и отмена.

  • AbstractRemoteTask: реализует метод handle, демонстрируя шаблон проектирования метода шаблона и извлекая три основных метода интерфейса:submitApplication, trackApplicationStatus, иcancelApplication.

  • AbstractYarnTask: Для задач YARN,AbstractYarnTaskабстрагируется, иsubmitApplication, trackApplicationStatus, иcancelApplicationпрямой доступ к API YARN.

Реализация отслеживания статуса YARN в AbstractYarnTask

TheAbstractYarnTaskможет реализовать отслеживание статуса YARN. Полный код см. вorg.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask:

public abstract class AbstractYarnTask extends AbstractRemoteTask {
private static final int MAX_RETRY_ATTEMPTS = 3;
    private ShellCommandExecutor shellCommandExecutor;
    public AbstractYarnTask(TaskExecutionContext taskRequest) {
        super(taskRequest);
        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest);
    }
    @Override
    public void submitApplication() throws TaskException {
        try {
            IShellInterceptorBuilder shellActuatorBuilder =
                    ShellInterceptorBuilderFactory.newBuilder()
                            .properties(getProperties())
                            .appendScript(getScript().replaceAll("\\r\\n", System.lineSeparator()));
            TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, null);
            setExitStatusCode(response.getExitStatusCode());
            setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
            setProcessId(response.getProcessId());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            log.info("The current yarn task has been interrupted", ex);
            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
            throw new TaskException("The current yarn task has been interrupted", ex);
        } catch (Exception e) {
            log.error("yarn process failure", e);
            exitStatusCode = -1;
            throw new TaskException("Execute task failed", e);
        }
    }
    @Override
    public void trackApplicationStatus() throws TaskException {
        if (StringUtils.isEmpty(appIds)) {
            return;
        }
        List<String> appIdList = Arrays.asList(appIds.split(","));
        boolean continueTracking = true;
        while (continueTracking) {
            Map<String, YarnState> yarnStateMap = new HashMap<>();
            for (String appId : appIdList) {
                if (StringUtils.isEmpty(appId)) {
                    continue;
                }
                boolean hadoopSecurityAuthStartupState =
                        PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
                String yarnStateJson = fetchYarnStateJsonWithRetry(appId, hadoopSecurityAuthStartupState);
                if (StringUtils.isNotEmpty(yarnStateJson)) {
                    String appJson = JSONUtils.getNodeString(yarnStateJson, "app");
                    YarnTask yarnTask = JSONUtils.parseObject(appJson, YarnTask.class);
                    log.info("yarnTask : {}", yarnTask);
                    yarnStateMap.put(yarnTask.getId(), YarnState.of(yarnTask.getState()));
                }
            }
            YarnState yarnTaskOverallStatus = YarnTaskStatusChecker.getYarnTaskOverallStatus(yarnStateMap);
            if (yarnTaskOverallStatus.isFinalState()) {
                handleFinalState(yarnTaskOverallStatus);
                continueTracking = false;
            } else {
                try {
                    TimeUnit.MILLISECONDS.sleep(SLEEP_TIME_MILLIS * 10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
    }
    private String fetchYarnStateJsonWithRetry(String appId,
                                               boolean hadoopSecurityAuthStartupState) throws TaskException {
        int retryCount = 0;
        while (retryCount < MAX_RETRY_ATTEMPTS) {
            try {
                return fetchYarnStateJson(appId, hadoopSecurityAuthStartupState);
            } catch (Exception e) {
                retryCount++;
                log.error("Failed to fetch or parse Yarn state for appId: {}. Attempt: {}/{}",
                        appId, retryCount, MAX_RETRY_ATTEMPTS, e);
                if (retryCount >= MAX_RETRY_ATTEMPTS) {
                    throw new TaskException("Failed to fetch Yarn state after "
                            + MAX_RETRY_ATTEMPTS + " attempts for appId: " + appId, e);
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(SLEEP_TIME_MILLIS);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
        return null;
    }
    private void handleFinalState(YarnState yarnState) {
        switch (yarnState) {
            case FINISHED:
                setExitStatusCode(EXIT_CODE_SUCCESS);
                break;
            case KILLED:
                setExitStatusCode(EXIT_CODE_KILL);
                break;
            default:
                setExitStatusCode(EXIT_CODE_FAILURE);
                break;
        }
    }
    private String fetchYarnStateJson(String appId, boolean hadoopSecurityAuthStartupState) throws Exception {
        return hadoopSecurityAuthStartupState
                ? KerberosHttpClient.get(getApplicationUrl(appId))
                : HttpUtils.get(getApplicationUrl(appId));
    }
    static class YarnTaskStatusChecker {
        public static YarnState getYarnTaskOverallStatus(Map<String, YarnState> yarnTaskMap) {
            boolean hasKilled = yarnTaskMap.values().stream()
                    .anyMatch(state -> state == YarnState.KILLED);
            if (hasKilled) {
                return YarnState.KILLED;
            }
            boolean hasFailed = yarnTaskMap.values().stream()
                    .anyMatch(state -> state == YarnState.FAILED);
            if (hasFailed) {
                return YarnState.FAILED;
            }
            boolean allFINISHED = yarnTaskMap.values().stream()
                    .allMatch(state -> state == YarnState.FINISHED);
            if (allFINISHED) {
                return YarnState.FINISHED;
            }
            boolean hasRunning = yarnTaskMap.values().stream()
                    .anyMatch(state -> state == YarnState.RUNNING);
            if (hasRunning) {
                return YarnState.RUNNING;
            }
            boolean hasSubmitting = yarnTaskMap.values().stream()
                    .anyMatch(state -> state == YarnState.NEW || state == YarnState.NEW_SAVING
                            || state == YarnState.SUBMITTED || state == YarnState.ACCEPTED);
            if (hasSubmitting) {
                return YarnState.SUBMITTING;
            }
            return YarnState.UNKNOWN;
        }
    }
}

Здесь основная логика заключается в том, что вместо того, чтобы переопределятьhandleПри использовании метода напрямую задачам YARN необходимо реализовать только два основных интерфейса:submitApplicationиtrackApplicationStatus.cancelApplicationметод в идеале должен быть делегированYarnApplicationManager(в настоящее время эта интеграция отсутствует, но не влияет на функциональность).

Отображение ApplicationId для потоковых задач на фронтенде

Файл:dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts

Обертывание ApplicationId как URL-адреса YARN на бэкэнде

Файл:dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

Файл:dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

Файл:dolphinscheduler-common/src/main/resources/common.properties

Файл:dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java

Файл:dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageProperties.java

Окончательный вид пользовательского интерфейса:

Примечание: вам нужно будет вручную вставить URL-адрес; приведенный выше код не включает эту функцию.

Отслеживание проблем

Здесь есть проблема с состоянием. Существует три состояния: FINISHED, FAILED и KILLED. Однако в состоянии FINISHED есть также FinalStatus, и «завершено» не обязательно означает успех. В FINISHED на самом деле есть состояния SUCCEEDED, FAILED и KILLED. По сути, FINISHED не может рассматриваться как конечное состояние в DolphinScheduler, и требуется дополнительная оценка.

В коде дляorg.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask#handleFinalState:

private void handleFinalState(YarnState yarnState) {
    switch (yarnState) {
        case FINISHED:
            setExitStatusCode(EXIT_CODE_SUCCESS);
            break;
        case KILLED:
            setExitStatusCode(EXIT_CODE_KILL);
            break;
        default:
            setExitStatusCode(EXIT_CODE_FAILURE);
            break;
    }
}

Использование HTTP для завершения задачи

curl -X PUT -d '{"state":"KILLED"}' \
-H "Content-Type: application/json" \
http://xx.xx.xx.xx:8088/ws/v1/cluster/apps/application_1694766249884_1098/state?user.name=hdfs

Примечание: необходимо указатьuser.name, в противном случае задача может быть не завершена успешно.


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE