Skip to content

Commit 6bbb4c7

Browse files
committed
Fix example DAG sample_anyscale_service_workflow (#62)
The example DAG `sample_anyscale_service_workflow` previously defined a UUID that was expected to be used in different tasks while maintaining the same value. This approach does not work in most setups of Airflow, especially in a distributed environment, since the DAG is reparsed at each executor, resulting in different UUIDs. This is an example of what happened even when trying to run this DAG locally: <img width="1189" alt="Screenshot 2025-03-20 at 14 56 55" src="https://github.com/user-attachments/assets/dca5fd21-c3cd-4296-8e39-36d17395b683" /> Where the second task mentions it cannot find the Anyscale service `AstroService-CICD-9220ff9a-ede3-47ee-a583-0f1bcd9581cc`. ``` File "/Users/tati/Code/astro-provider-anyscale/dags/anyscale_service.py", line 52, in terminate_service result = hook.terminate_service(service_name=SERVICE_NAME, time_delay=5) File "/Users/tati/Code/astro-provider-anyscale/venv/lib/python3.9/site-packages/anyscale_provider/hooks/anyscale.py", line 141, in terminate_service raise AirflowException(f"Service termination failed with error: {e}") airflow.exceptions.AirflowException: Service termination failed with error: Service with name 'AstroService-CICD-9220ff9a-ede3-47ee-a583-0f1bcd9581cc' was not found. ``` While the first task succeeds, mentioning it successfully created the service `AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297`, which has a different UUID. ``` [2025-03-20, 14:19:46 UTC] {taskinstance.py:289} INFO - Pausing task as DEFERRED. dag_id=sample_anyscale_service_workflow, task_id=rollout_anyscale_service, run_id=manual__2025-03-20T14:19:36.827994+00:00, execution_date=20250320T141936, start_date=20250320T141938 [2025-03-20, 14:19:46 UTC] {taskinstance.py:341} ▶ Post task execution logs [2025-03-20, 14:19:48 UTC] {anyscale.py:174} INFO - Monitoring service AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 every 30 seconds to reach RUNNING [2025-03-20, 14:19:48 UTC] {anyscale.py:109} INFO - Fetching service status for Service: AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 [2025-03-20, 14:19:48 UTC] {base.py:84} INFO - Retrieving connection 'anyscale_conn' [2025-03-20, 14:19:48 UTC] {anyscale.py:36} INFO - Using Anyscale connection_id: anyscale_conn [2025-03-20, 14:19:54 UTC] {anyscale.py:233} INFO - Current service state for AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 is: STARTING [2025-03-20, 14:20:24 UTC] {anyscale.py:109} INFO - Fetching service status for Service: AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 [2025-03-20, 14:20:26 UTC] {anyscale.py:233} INFO - Current service state for AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 is: STARTING [2025-03-20, 14:20:56 UTC] {anyscale.py:109} INFO - Fetching service status for Service: AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 [2025-03-20, 14:20:58 UTC] {anyscale.py:233} INFO - Current service state for AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 is: STARTING [2025-03-20, 14:21:28 UTC] {anyscale.py:109} INFO - Fetching service status for Service: AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 [2025-03-20, 14:21:29 UTC] {anyscale.py:233} INFO - Current service state for AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 is: STARTING [2025-03-20, 14:21:59 UTC] {anyscale.py:109} INFO - Fetching service status for Service: AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 [2025-03-20, 14:22:01 UTC] {anyscale.py:233} INFO - Current service state for AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 is: STARTING [2025-03-20, 14:22:31 UTC] {anyscale.py:109} INFO - Fetching service status for Service: AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 [2025-03-20, 14:22:33 UTC] {anyscale.py:233} INFO - Current service state for AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 is: STARTING [2025-03-20, 14:23:03 UTC] {anyscale.py:109} INFO - Fetching service status for Service: AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 [2025-03-20, 14:23:05 UTC] {anyscale.py:233} INFO - Current service state for AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 is: STARTING [2025-03-20, 14:23:35 UTC] {anyscale.py:109} INFO - Fetching service status for Service: AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 [2025-03-20, 14:23:38 UTC] {anyscale.py:233} INFO - Current service state for AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 is: STARTING [2025-03-20, 14:24:08 UTC] {anyscale.py:109} INFO - Fetching service status for Service: AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 [2025-03-20, 14:24:10 UTC] {anyscale.py:233} INFO - Current service state for AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 is: RUNNING [2025-03-20, 14:24:10 UTC] {anyscale.py:109} INFO - Fetching service status for Service: AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297 [2025-03-20, 14:24:12 UTC] {triggerer_job_runner.py:631} INFO - Trigger sample_anyscale_service_workflow/manual__2025-03-20T14:19:36.827994+00:00/rollout_anyscale_service/-1/1 (ID 1) fired: TriggerEvent<{'state': <ServiceState.RUNNING: 'RUNNING'>, 'message': 'Service deployment succeeded', 'service_name': 'AstroService-CICD-a3f7d907-318a-4cd8-98b4-a53340a78297'}> ``` This PR solves the issue by allowing users to specify a fixed ID that can be reused across multiple tasks of the same DAG. After this fix, the DAG run successfully: ![Screenshot 2025-03-20 at 15 08 41](https://github.com/user-attachments/assets/67868cdc-5b28-499f-83a0-835cee32fed8)
1 parent 4ff5913 commit 6bbb4c7

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

.github/workflows/test.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,11 @@ jobs:
104104
105105
- name: Test Anyscale against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }}
106106
run: |
107-
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration
107+
ID="${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}"
108+
# Service names must be no longer than 57 characters and can only contain alphanumeric characters, dashes, and underscores
109+
ID_CLEANED=$(echo $ID | tr '.' '-' | tr '/' '-') # Replace dots and forward slashes with hyphens so it is a valid Service name
110+
echo ASTRO_ANYSCALE_PROVIDER_SERVICE_ID
111+
ASTRO_ANYSCALE_PROVIDER_SERVICE_ID=$ID_CLEANED hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration
108112
env:
109113
ANYSCALE_CLI_TOKEN: ${{ secrets.ANYSCALE_CLI_TOKEN }}
110114
- name: Upload coverage to Github

example_dags/anyscale_service.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
import uuid
23
from datetime import datetime
34

@@ -19,7 +20,8 @@
1920

2021
# Define the Anyscale connection
2122
ANYSCALE_CONN_ID = "anyscale_conn"
22-
SERVICE_NAME = f"AstroService-CICD-{uuid.uuid4()}"
23+
service_id = os.getenv("ASTRO_ANYSCALE_PROVIDER_SERVICE_ID", {uuid.uuid4()})
24+
SERVICE_NAME = f"AstroService-CICD-{service_id}"
2325

2426
dag = DAG(
2527
"sample_anyscale_service_workflow",

0 commit comments

Comments
 (0)