
[SPARK-50648][CORE] when the job is cancelled during shuffle retry in parent stage, might leave behind zombie running tasks #49270

Open · wants to merge 6 commits into base: master

Conversation

@yabola (Contributor) commented on Dec 23, 2024

What changes were proposed in this pull request?

This is a problem that Spark has always had. See the next section for the scenario in which it occurs.

When a job is cancelled, some of its tasks may still be left running.
The reason is that when DAGScheduler#handleTaskCompletion encounters a FetchFailed, it calls markStageAsFinished(failedStage, errorMessage = Some(failureMessage)), which removes the stage from DAGScheduler#runningStages, but it does not call killAllTaskAttempts.
DAGScheduler#cancelRunningIndependentStages then only looks at runningStages, so cancelling the job leaves zombie shuffle tasks behind, occupying cluster resources.
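To make the gap concrete, here is a minimal, self-contained sketch of the bookkeeping described above. It is a simplified model, not the real DAGScheduler code: the stage sets and kill logic are illustrative stand-ins.

```scala
import scala.collection.mutable

// Simplified model of the bookkeeping described above, NOT the real
// DAGScheduler code: the sets and methods only track state, to show
// where the zombie tasks come from.
object ZombieTaskSketch {
  case class Stage(id: Int)

  val runningStages = mutable.Set[Stage]()       // stages the scheduler thinks are running
  val waitingStages = mutable.Set[Stage]()       // stages queued for (re)submission
  val stagesWithLiveTasks = mutable.Set[Stage]() // stages that still have launched tasks

  // On FetchFailed, handleTaskCompletion calls markStageAsFinished: the failed
  // stage leaves runningStages and is queued for resubmission together with the
  // map stage, but its already-launched tasks are NOT killed.
  def onFetchFailed(failedStage: Stage, mapStage: Stage): Unit = {
    runningStages -= failedStage
    waitingStages += failedStage
    waitingStages += mapStage
    // stagesWithLiveTasks still contains failedStage
  }

  // cancelRunningIndependentStages only inspects runningStages, so the old
  // tasks of the resubmitted stage survive the cancellation as zombies.
  def cancelJob(): Set[Stage] = {
    stagesWithLiveTasks --= runningStages   // these get killAllTaskAttempts
    runningStages.clear()
    stagesWithLiveTasks.toSet               // zombies left behind
  }

  def main(args: Array[String]): Unit = {
    val stage1 = Stage(1)
    val stage2 = Stage(2)
    runningStages += stage2                 // stage1 finished earlier, stage2 running
    stagesWithLiveTasks += stage2
    onFetchFailed(failedStage = stage2, mapStage = stage1)
    runningStages += stage1                 // stage1-retry starts running
    stagesWithLiveTasks += stage1
    println(s"zombie stages after cancel: ${cancelJob()}")  // Set(Stage(2))
  }
}
```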

Why are the changes needed?

Assume a job is stage1 -> stage2. When a FetchFailed occurs during stage2, both stage1 and stage2 are resubmitted (stage2 may still have some tasks running even after it is resubmitted; this is expected, because those tasks may eventually succeed and avoid a retry).

But if the SQL query is cancelled during the execution of stage1-retry, the tasks in stage1 and stage1-retry can all be killed, while the tasks that were already running in stage2 keep running and cannot be killed. These tasks can greatly affect cluster stability and occupy resources.

Does this PR introduce any user-facing change?

No

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Dec 23, 2024
@yabola yabola changed the title [SPARK-50648] [core] when the job is cancelled during shuffle retry in parent stage, might leave behind zombie tasks [SPARK-50648] [core] when the job is cancelled during shuffle retry in parent stage, might leave behind zombie running tasks Dec 23, 2024
@HyukjinKwon HyukjinKwon changed the title [SPARK-50648] [core] when the job is cancelled during shuffle retry in parent stage, might leave behind zombie running tasks [SPARK-50648][CORE] when the job is cancelled during shuffle retry in parent stage, might leave behind zombie running tasks Dec 24, 2024
@wangyum (Member) commented on Dec 24, 2024

cc @cloud-fan

@@ -2937,7 +2937,9 @@ private[spark] class DAGScheduler(
} else {
// This stage is only used by the job, so finish the stage if it is running.
val stage = stageIdToStage(stageId)
if (runningStages.contains(stage)) {
val isRunningStage = runningStages.contains(stage) ||
(waitingStages.contains(stage) && taskScheduler.hasRunningTasks(stageId))
Contributor:

what if we just kill all waiting stages? Does taskScheduler.killAllTaskAttempts handle it well?

Contributor:

and can we add a special flag to indicate the waiting stages that are submitted due to retry?

@yabola (Contributor, Author), Dec 24, 2024:

> what if we just kill all waiting stages? Does taskScheduler.killAllTaskAttempts handle it well?

I tested it: if killAllTaskAttempts is called on the normally generated waiting stages, the stage status is displayed as FAILED (it was SKIPPED before); killAllTaskAttempts itself does not go wrong.
Always killing waiting stages seems to be the safer approach (those tasks really shouldn't be running anymore), but it may generate unnecessary stage-failed events compared with before.

> and can we add a special flag to indicate the waiting stages that are submitted due to retry?

Yes, we can add a flag; please see the updated code.
Actually, there is a trade-off in deciding which waiting stages to kill. The range of choices, from broadest to narrowest, is (a simplified sketch of these conditions follows the list):

  • kill all waiting stages
  • kill a waiting stage that has already failed (stage#failedAttemptIds > 0)
  • kill a waiting stage that was resubmitted due to a fetch failure (stage#resubmitInFetchFailed)
  • kill only a waiting stage that still has running tasks (this might not be enough?)
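A standalone sketch contrasting these four options as a single decision function; StageState, KillPolicy, and the hasRunningTasks hook are illustrative stand-ins for the real scheduler state, not Spark's API:

```scala
// Illustrative comparison of the four candidate kill conditions above.
// StageState and the policy objects are stand-ins, not Spark's real types.
object KillPolicySketch {
  case class StageState(
      id: Int,
      running: Boolean,               // in runningStages
      waiting: Boolean,               // in waitingStages
      failedAttempts: Int,            // size of stage.failedAttemptIds
      resubmitInFetchFailed: Boolean) // flag proposed in this PR

  sealed trait KillPolicy
  case object KillAllWaiting            extends KillPolicy
  case object KillWaitingWithFailures   extends KillPolicy
  case object KillWaitingAfterFetchFail extends KillPolicy
  case object KillWaitingWithLiveTasks  extends KillPolicy

  def shouldKill(s: StageState, policy: KillPolicy,
                 hasRunningTasks: Int => Boolean): Boolean =
    s.running || (s.waiting && (policy match {
      case KillAllWaiting            => true
      case KillWaitingWithFailures   => s.failedAttempts > 0
      case KillWaitingAfterFetchFail => s.resubmitInFetchFailed
      case KillWaitingWithLiveTasks  => hasRunningTasks(s.id)
    }))
}
```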

@cloud-fan (Contributor) commented:
This is a good catch! cc @jiangxb1987 @Ngone51

@github-actions github-actions bot added the SQL label Dec 24, 2024
@@ -2248,7 +2250,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
// original result task 1.0 succeed
runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success, 42))
sc.listenerBus.waitUntilEmpty()
assert(completedStage === List(0, 1, 1, 0))
assert(completedStage === List(0, 1, 1, 0, 1))
Contributor (Author):

The last element added is for the resubmitted stage (FetchFailed), which is in waitingStages. We now kill it, so one more SparkListenerStageCompleted event is posted (see markStageAsFinished).

Comment on lines 2190 to 2191
failedStage.markResubmitInFetchFailed()
mapStage.markResubmitInFetchFailed()
Contributor:

nit: markResubmitInFetchFailed -> setResubmitByFetchFailure.

Also, please reset this to false when the stage transitions to running, so that the scheduler state stays consistent: in submitMissingTasks preferably, or in submitStage.

@yabola (Contributor, Author), Dec 25, 2024:

Done, this makes the variable's meaning more accurate.
In addition, when submitMissingTasks runs, the stage is added to runningStages, so from that point we can always cancel the job normally.
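A minimal sketch of the flag lifecycle being discussed here, using an illustrative class rather than Spark's real Stage:

```scala
// Illustrative flag lifecycle (not Spark's actual Stage class): the flag is set
// when a stage is resubmitted because of a fetch failure and cleared when the
// retry actually starts running, i.e. when submitMissingTasks puts the stage
// back into runningStages.
class StageSketch(val id: Int) {
  private var _resubmitInFetchFailed = false

  def resubmitInFetchFailed: Boolean = _resubmitInFetchFailed

  // called from the FetchFailed branch of handleTaskCompletion
  def setResubmitByFetchFailure(): Unit = { _resubmitInFetchFailed = true }

  // called from submitMissingTasks, where the stage joins runningStages again
  def clearResubmitByFetchFailure(): Unit = { _resubmitInFetchFailed = false }
}
```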

@@ -121,4 +121,6 @@ private[spark] trait TaskScheduler {
*/
def applicationAttemptId(): Option[String]


def hasRunningTasks(stageId: Int): Boolean
Contributor:

Why do we need this method?

@yabola (Contributor, Author), Dec 25, 2024:

Please see this.
This is the code I wrote at the beginning (kill only the waiting stages that still have running tasks).
I'm not sure which way is better; I will delete this method if we choose another option.
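For context, a self-contained sketch of the idea behind such a method is below. The map name mirrors TaskSchedulerImpl's taskSetsByStageIdAndAttempt and the stub mirrors TaskSetManager.runningTasks, but this is a guess at how the helper could look, not a quote of the PR's actual implementation.

```scala
// Hypothetical, self-contained sketch: a stage still has running tasks if any
// task-set attempt for that stage reports a positive running-task count.
// TaskSetManagerStub and the map below are stand-ins, not Spark's real classes.
object HasRunningTasksSketch {
  import scala.collection.mutable

  final case class TaskSetManagerStub(runningTasks: Int)

  // stageId -> (stageAttemptId -> task set manager)
  val taskSetsByStageIdAndAttempt =
    mutable.HashMap[Int, mutable.HashMap[Int, TaskSetManagerStub]]()

  def hasRunningTasks(stageId: Int): Boolean =
    taskSetsByStageIdAndAttempt.get(stageId)
      .exists(_.values.exists(_.runningTasks > 0))

  def main(args: Array[String]): Unit = {
    taskSetsByStageIdAndAttempt(2) =
      mutable.HashMap(0 -> TaskSetManagerStub(runningTasks = 3))
    println(hasRunningTasks(2)) // true
    println(hasRunningTasks(1)) // false
  }
}
```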

@@ -2937,7 +2938,9 @@ private[spark] class DAGScheduler(
} else {
// This stage is only used by the job, so finish the stage if it is running.
val stage = stageIdToStage(stageId)
if (runningStages.contains(stage)) {
val shouldKill = runningStages.contains(stage) ||
(waitingStages.contains(stage) && stage.resubmitInFetchFailed)
Member:

Shall we check the failedAttemptIds instead?

Suggested change:
- (waitingStages.contains(stage) && stage.resubmitInFetchFailed)
+ stage.failedAttemptIds.nonEmpty

Contributor (Author):

Yes, we can. Please see this.
I'm not sure which way is better.
