Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72: Handling task retries in task SDK + execution API #45106

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Dec 20, 2024

closes: #44351

"Retries" are majorly handled in airflow 2.x in here: https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L3082-L3101.

The idea here is that in case a task is retry able, defined by https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L1054-L1073, the task is marked as "up_for_retry". Rest of the part is taken care by the scheduler loop normally if the ti state is marked correctly.

Coming to task sdk, we cannot perform validations such as https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L1054-L1073 in the task runner / sdk side because we do not have/ should not have access to the database.

image (7)

We can use the above state change diagram and handle the retry state while handling failed state. Instead of having API handler and states for "up_for_retry", we can handle it when we are handling failures - which we do by calling the https://github.com/apache/airflow/blob/main/airflow/api_fastapi/execution_api/routes/task_instances.py#L160-L212 API endpoint. If we can send in enough data to the api handler in the execution API, we should be able to handle the cases of retry well.

What needs to be done for porting this to task_sdk?

  1. Defining "try_number", "max_retries" for task instances ---> not needed because this is handled already in the scheduler side of things / parsing time and not at execution time, so we do not need to handle it. It is handled here https://github.com/apache/airflow/blob/main/airflow/models/dagrun.py#L1445-L1471 when a dag run is created and it is initialised with the initial values: max_tries(https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L1809) and try_number(https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L1808)

  2. We need to have a mechanism that can send a signal from the task runner if retries are defined. We will send this in this fashion:
    task runner informs the supervisor while failing that it needs to retry -> supervisor sends a normal request to the client (but with task_retries defined) -> client sends a normal API request (TITerminalStatePayload) to the execution API but with task_retries

  3. At the execution API, we receive the request and perform a check to check if the Ti is eligible for retry, if it is, we mark it as "up_for_retry", the rest of things are taken care by the scheduler.

Testing results

Right now the PR is meant to handle BaseException -- will extend to all other eligible TI exceptions in follow ups.

Scenario 1: With retries = 3 defined.

DAG:

import sys
from time import sleep

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta
from airflow.exceptions import AirflowTaskTimeout


def print_hello():
    1//0

with DAG(
    dag_id="abcd",
    schedule=None,
    catchup=False,
    tags=["demo"],
) as dag:
    hello_task = PythonOperator(
        task_id="say_hello",
        python_callable=print_hello,
        retries=3
    )

Rightly marked as "up_for_retry"
image (3)

TI details with max_tries
image (4)

Try number in grid view
image (5)

Scenario 2: With retries not defined.

DAG:

import sys
from time import sleep

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta
from airflow.exceptions import AirflowTaskTimeout


def print_hello():
    1//0

with DAG(
    dag_id="abcd",
    schedule=None,
    catchup=False,
    tags=["demo"],
) as dag:
    hello_task = PythonOperator(
        task_id="say_hello",
        python_callable=print_hello,
    )

Rightly marked as "failed"
image

Ti detiails with 0 max_tries:
image

Try number in grid view
image

============

Pending:

  • UT coverage for execution API for various scenarios
  • UT coverage for supervisor and task_runner, client
  • Extending to various other scenarios when retry is needed -- eg: AirflowTaskTimeout / AirflowException

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@amoghrajesh amoghrajesh requested review from kaxil and ashb and removed request for ephraimbuddy and pierrejeambrun December 20, 2024 10:26
@amoghrajesh amoghrajesh added the area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK label Dec 20, 2024
@amoghrajesh
Copy link
Contributor Author

If we agree on the approach, I will work on the tests.

@amoghrajesh amoghrajesh self-assigned this Dec 23, 2024
@amoghrajesh
Copy link
Contributor Author

amoghrajesh commented Dec 23, 2024

It probably might also be a good idea to de couple the entire payload construction out of TaskState. We might need to handle future cases where "fail" ti state has specific attributes like fail_stop for example.

Somewhat like:

class TaskState(BaseModel):
    """
    Update a task's state.

    If a process exits without sending one of these the state will be derived from the exit code:
    - 0 = SUCCESS
    - anything else = FAILED
    """

    state: TerminalTIState
    end_date: datetime | None = None
    type: Literal["TaskState"] = "TaskState"

class FailTask(BaseModel):
    """
    Update a task's state to failed. Inherits TaskState to be able to define attributes specific to
    failure state.
    """

    state: TerminalTIState.FAILED
    task_retries: int | None = None
    fail_stop: bool = False
    type: Literal["FailTask"] = "FailTask"

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove task_retries from the payloads

@@ -59,6 +59,8 @@ class TITerminalStatePayload(BaseModel):
end_date: UtcDateTime
"""When the task completed executing"""

task_retries: int | None = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the fact that this shows up in the API makes it too confusing to client authors, so lets work out a way to remove this

@@ -359,3 +364,23 @@ def ti_put_rtif(
_update_rtif(task_instance, put_rtif_payload, session)

return {"message": "Rendered task instance fields successfully set"}


def _is_eligible_to_retry(task_instance: TI, task_retries: int | None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things;

  1. This should probably be a method on TI. It might already exist as a method actually
  2. I don't grok what we are doing with the task_retries parameter -- why do we need to pass it at all? When is task_instance.try_number <= task_instance.max_tries not right/good enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 1., there isn't a method on TI for this, I agree there should be one. Right now we have this https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L1054-L1073

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding 2., looking at the code, I think its just safe coding.

We do this:

def _is_eligible_to_retry(*, task_instance: TaskInstance):
    """
    Is task instance is eligible for retry.

    :param task_instance: the task instance

    :meta private:
    """
    if task_instance.state == TaskInstanceState.RESTARTING:
        # If a task is cleared when running, it goes into RESTARTING state and is always
        # eligible for retry
        return True
    if not getattr(task_instance, "task", None):
        # Couldn't load the task, don't know number of retries, guess:
        return task_instance.try_number <= task_instance.max_tries

    if TYPE_CHECKING:
        assert task_instance.task

    return task_instance.task.retries and task_instance.try_number <= task_instance.max_tries

So I think we check in the last line, if there are retries defined and task_instance.try_number <= task_instance.max_tries is True, then retry. The reason is probably because retry can be defined as NONE.

retries: int | None = DEFAULT_RETRIES,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we do this though? That's what I don't get? With the advent of DAG versioning I don't think we can really get in the state where we can't find a task for a TI. And if we do: lets just error, if we can't find it then we can't re-run it really anyway.

Comment on lines +523 to +524
print("The exit code is", self._exit_code)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is a debug print left over

Suggested change
print("The exit code is", self._exit_code)

Comment on lines +721 to +728
if msg.task_retries:
self.client.task_instances.finish(
id=self.id,
state=self.final_state,
when=datetime.now(tz=timezone.utc),
task_retries=msg.task_retries,
)
self._should_retry = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What triggers this path? I don't really think this is needed/I can't work out when it's not always triggered. This feels like something we should handle only on the server side, and not include this extra info at all from the client

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have this in the task runner:

        msg = TaskState(state=TerminalTIState.FAILED, end_date=datetime.now(tz=timezone.utc))
        if not getattr(ti, "task", None):
            # We do not know about retries, let's mark it -1, so that the execution api can make a guess
            msg.task_retries = -1
        else:
            # `None` indicates no retries provided, the default is anyway 0 which evaluates to false
            msg.task_retries = ti.task.retries or None

The task runner sets the value to -1 if it didn't have a "task", asking the server to guess it, if it sets a non None, it wants to retry.

Now when the request comes to supervisor, this part comes in:

            if msg.task_retries:
                self.client.task_instances.finish(
                    id=self.id,
                    state=self.final_state,
                    when=datetime.now(tz=timezone.utc),
                    task_retries=msg.task_retries,
                )
                self._should_retry = True

This simply means, if I HAVE to retry, let me send a finish call, but with retries defined. Then we set a variable self._should_retry to indicate that retry request has been sent, so that we dont send it in wait(), like this:

        if self.final_state in TerminalTIState and not self._should_retry:
            self.client.task_instances.finish(
                id=self.id, state=self.final_state, when=datetime.now(tz=timezone.utc), task_retries=None
            )

task_id="test_ti_update_state_to_retry_when_restarting",
state=State.RESTARTING,
)
session.commit()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does nothing I think, or at the very least you should pass session on to create_task_instance too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The create_task_instancecreates a ti and returns it, which we commit right after, no need to pass session, all the tests in the file have same pattern and here is the ti with the state being set, running on debug mode

image

json={
"state": State.FAILED,
"end_date": DEFAULT_END_DATE.isoformat(),
"task_retries": retries,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this feels like a very leaky abstraction, lets remove it, and if we want a "fail and don't retry" lets have that be something else rather than overload task_retires in a non-obvious manner.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Handle Task retries in Task SDK
2 participants