Skip to content

Commit 3a113c2

Browse files
RCdeWit and marwan116 authored
Add target_job_queue to SubmitAnyscaleJob operator (#52)
Anyscale Job Queues is new functionality that supports submitting multiple workloads to the same cluster. An Anyscale customer requested this feature for the Airflow integration. See [Job Queues](https://docs.anyscale.com/1.0.0/jobs/job-queues) and the [Job Queue Spec](https://docs.anyscale.com/canary/reference/job-api#jobqueuespec). This change adds two optional parameters for submitting to existing job queues. It _does not_ implement creation of new job queues. Closes: #44 --------- Co-authored-by: marwan116 <sarieddine.marwan@gmail.com>
1 parent 6bbb4c7 commit 3a113c2

File tree

4 files changed

+80
-3
lines changed

4 files changed

+80
-3
lines changed

anyscale_provider/operators/anyscale.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from airflow.models import BaseOperator
1010
from airflow.utils.context import Context
1111
from anyscale.compute_config.models import ComputeConfig
12-
from anyscale.job.models import JobConfig, JobState
12+
from anyscale.job.models import JobConfig, JobQueueConfig, JobState
1313
from anyscale.service.models import RayGCSExternalStorageConfig, ServiceConfig, ServiceState
1414

1515
from anyscale_provider.hooks.anyscale import AnyscaleHook
@@ -26,6 +26,7 @@ class SubmitAnyscaleJob(BaseOperator):
2626
2727
:param conn_id: Required. The connection ID for Anyscale.
2828
:param entrypoint: Required. Command that will be run to execute the job, e.g., `python main.py`.
29+
:param job_queue_config: Optional. Allow users to specify JobQueueConfig (more information: https://docs.anyscale.com/platform/jobs/job-queues/ and https://docs.anyscale.com/reference/job-api/).
2930
:param name: Optional. Name of the job. Multiple jobs can be submitted with the same name.
3031
:param image_uri: Optional. URI of an existing image. Exclusive with `containerfile`.
3132
:param containerfile: Optional. The file path to a containerfile that will be built into an image before running the workload. Exclusive with `image_uri`.
@@ -81,6 +82,7 @@ def __init__(
8182
wait_for_completion: bool = True,
8283
job_timeout_seconds: float = 3600,
8384
poll_interval: float = 60,
85+
job_queue_config: JobQueueConfig | None = None,
8486
*args: Any,
8587
**kwargs: Any,
8688
) -> None:
@@ -103,6 +105,7 @@ def __init__(
103105
self.wait_for_completion = wait_for_completion
104106
self.job_timeout_seconds = job_timeout_seconds
105107
self.poll_interval = poll_interval
108+
self.job_queue_config = job_queue_config
106109

107110
self.job_id: str | None = None
108111

@@ -148,6 +151,7 @@ def execute(self, context: Context) -> str:
148151
"cloud": self.cloud,
149152
"project": self.project,
150153
"max_retries": self.max_retries,
154+
"job_queue_config": self.job_queue_config,
151155
}
152156

153157
self.log.info(f"Using Anyscale version {anyscale.__version__}")

docs/index.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ Hook
2222

2323
Operators
2424
~~~~~~~~~
25-
- **SubmitAnyscaleJob**: This operator submits a job to Anyscale. It takes configuration parameters for the job, such as the entrypoint, image URI, and compute configuration. The operator uses ``AnyscaleHook`` to handle the submission process.
25+
- **SubmitAnyscaleJob**: This operator submits a job to Anyscale. It takes configuration parameters for the job, such as the entrypoint, image URI, compute configuration, and job queue configuration. The operator uses ``AnyscaleHook`` to handle the submission process.
2626
- **RolloutAnyscaleService**: Similar to the job submission operator, this operator is designed to manage services on Anyscale. It can be used to deploy new services or update existing ones, leveraging ``AnyscaleHook`` for all interactions with the Anyscale API.
2727

2828
Triggers

example_dags/anyscale_job.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from airflow import DAG
55

66
from anyscale_provider.operators.anyscale import SubmitAnyscaleJob
7+
from anyscale.job.models import JobQueueConfig, JobQueueSpec, JobQueueExecutionMode
8+
79

810
default_args = {
911
"owner": "airflow",
@@ -44,6 +46,77 @@
4446
dag=dag,
4547
)
4648

49+
JOB_QUEUE_NAME = "test-job-queue-180s-idle-timeout"
50+
51+
submit_anyscale_job_with_new_job_queue = SubmitAnyscaleJob(
52+
task_id="submit_anyscale_job_with_new_job_queue",
53+
conn_id=ANYSCALE_CONN_ID,
54+
name="AstroJobWithJobQueue",
55+
image_uri="anyscale/image/airflow-integration-testing:1",
56+
compute_config="airflow-integration-testing:1",
57+
working_dir=str(FOLDER_PATH),
58+
entrypoint="python ray-job.py",
59+
max_retries=1,
60+
job_timeout_seconds=3000,
61+
poll_interval=30,
62+
dag=dag,
63+
job_queue_config=JobQueueConfig(
64+
priority=100,
65+
job_queue_spec=JobQueueSpec(
66+
name=JOB_QUEUE_NAME,
67+
execution_mode=JobQueueExecutionMode.PRIORITY,
68+
max_concurrency=5,
69+
idle_timeout_s=180,
70+
),
71+
),
72+
)
73+
74+
submit_anyscale_job_with_existing_job_queue = SubmitAnyscaleJob(
75+
task_id="submit_anyscale_job_with_existing_job_queue",
76+
conn_id=ANYSCALE_CONN_ID,
77+
name="AstroJobWithJobQueue",
78+
image_uri="anyscale/image/airflow-integration-testing:1",
79+
compute_config="airflow-integration-testing:1",
80+
working_dir=str(FOLDER_PATH),
81+
entrypoint="python ray-job.py",
82+
max_retries=1,
83+
job_timeout_seconds=3000,
84+
poll_interval=30,
85+
dag=dag,
86+
job_queue_config=JobQueueConfig(
87+
priority=100,
88+
# NOTE: This will reuse the existing job queue given it has the same spec
89+
job_queue_spec=JobQueueSpec(
90+
name=JOB_QUEUE_NAME,
91+
execution_mode=JobQueueExecutionMode.PRIORITY,
92+
max_concurrency=5,
93+
idle_timeout_s=180,
94+
),
95+
),
96+
)
97+
98+
submit_another_anyscale_job_with_existing_job_queue = SubmitAnyscaleJob(
99+
task_id="submit_another_anyscale_job_with_existing_job_queue",
100+
conn_id=ANYSCALE_CONN_ID,
101+
name="AstroJobWithJobQueue",
102+
image_uri="anyscale/image/airflow-integration-testing:1",
103+
compute_config="airflow-integration-testing:1",
104+
working_dir=str(FOLDER_PATH),
105+
entrypoint="python ray-job.py",
106+
max_retries=1,
107+
job_timeout_seconds=3000,
108+
poll_interval=30,
109+
dag=dag,
110+
job_queue_config=JobQueueConfig(
111+
priority=100,
112+
target_job_queue_name=JOB_QUEUE_NAME,
113+
),
114+
)
115+
47116

48117
# Defining the task sequence
49118
submit_anyscale_job
119+
submit_anyscale_job_with_new_job_queue >> [
120+
submit_anyscale_job_with_existing_job_queue,
121+
submit_another_anyscale_job_with_existing_job_queue,
122+
]

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ requires-python = ">=3.8, <3.13"
2626
dependencies = [
2727
"apache-airflow>=2.7",
2828
"pyyaml",
29-
"anyscale==0.24.44",
29+
"anyscale>=0.24.54",
3030
]
3131

3232
[project.urls]

0 commit comments

Comments (0)