diff --git a/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx b/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx index 22462c6022c86..9d8e3ed403bdb 100644 --- a/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx @@ -17,6 +17,7 @@ * under the License. */ import { ReactFlowProvider } from "@xyflow/react"; +import { useState } from "react"; import { FiBarChart, FiCode } from "react-icons/fi"; import { LuChartColumn } from "react-icons/lu"; import { MdDetails, MdOutlineEventNote } from "react-icons/md"; @@ -27,6 +28,7 @@ import { useDagServiceGetDagDetails, useDagsServiceRecentDagRuns } from "openapi import type { DAGWithLatestDagRunsResponse } from "openapi/requests/types.gen"; import { TaskIcon } from "src/assets/TaskIcon"; import { DetailsLayout } from "src/layouts/Details/DetailsLayout"; +import { useRefreshOnNewDagRuns } from "src/queries/useRefreshOnNewDagRuns"; import { isStatePending, useAutoRefresh } from "src/utils"; import { Header } from "./Header"; @@ -53,6 +55,7 @@ export const Dag = () => { }); const refetchInterval = useAutoRefresh({ dagId }); + const [hasPendingRuns, setHasPendingRuns] = useState(false); // TODO: replace with with a list dag runs by dag id request const { @@ -61,14 +64,20 @@ export const Dag = () => { isLoading: isLoadingRuns, } = useDagsServiceRecentDagRuns({ dagIds: [dagId], dagRunsLimit: 1 }, undefined, { enabled: Boolean(dagId), - refetchInterval: (query) => - query.state.data?.dags - .find((recentDag) => recentDag.dag_id === dagId) - ?.latest_dag_runs.some((run) => isStatePending(run.state)) - ? refetchInterval - : false, + refetchInterval: (query) => { + setHasPendingRuns( + query.state.data?.dags + .find((recentDag) => recentDag.dag_id === dagId) + ?.latest_dag_runs.some((run) => isStatePending(run.state)), + ); + + return hasPendingRuns ? refetchInterval : false; + }, }); + // Ensures continuous refresh to detect new runs when there's no pending state and new runs are initiated from other page + useRefreshOnNewDagRuns(dagId, hasPendingRuns); + let dagWithRuns = runsData?.dags.find((recentDag) => recentDag.dag_id === dagId); if (dagWithRuns === undefined && dag !== undefined) { diff --git a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts new file mode 100644 index 0000000000000..4c792993da5b9 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts @@ -0,0 +1,66 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueryClient } from "@tanstack/react-query"; +import { useEffect, useRef } from "react"; + +import { + useDagRunServiceGetDagRuns, + useDagServiceGetDagDetailsKey, + UseDagRunServiceGetDagRunsKeyFn, + UseDagServiceGetDagDetailsKeyFn, + useDagsServiceRecentDagRunsKey, + UseGridServiceGridDataKeyFn, + UseTaskInstanceServiceGetTaskInstancesKeyFn, +} from "openapi/queries"; + +import { useConfig } from "./useConfig"; + +export const useRefreshOnNewDagRuns = (dagId: string, hasPendingRuns: boolean | undefined) => { + const queryClient = useQueryClient(); + const previousDagRunIdRef = useRef(); + const autoRefreshInterval = useConfig("auto_refresh_interval") as number; + + const { data } = useDagRunServiceGetDagRuns({ dagId, limit: 1, orderBy: "-run_after" }, undefined, { + enabled: Boolean(dagId) && !hasPendingRuns, + refetchInterval: Boolean(autoRefreshInterval) ? autoRefreshInterval * 1000 : 5000, + }); + + useEffect(() => { + const latestDagRun = data?.dag_runs[0]; + + const latestDagRunId = latestDagRun?.dag_run_id; + + if ((latestDagRunId ?? "") && previousDagRunIdRef.current !== latestDagRunId) { + previousDagRunIdRef.current = latestDagRunId; + + const queryKeys = [ + [useDagsServiceRecentDagRunsKey], + [useDagServiceGetDagDetailsKey], + UseDagServiceGetDagDetailsKeyFn({ dagId }, [{ dagId }]), + UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]), + UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" }, [{ dagId, dagRunId: "~" }]), + UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]), + ]; + + queryKeys.forEach((key) => { + void queryClient.invalidateQueries({ queryKey: key }); + }); + } + }, [data, dagId, queryClient]); +};