diff --git a/sentry_sdk/integrations/concurrent.py b/sentry_sdk/integrations/concurrent.py index 2bd1b0b793..c79595f777 100644 --- a/sentry_sdk/integrations/concurrent.py +++ b/sentry_sdk/integrations/concurrent.py @@ -5,6 +5,7 @@ import sentry_sdk from sentry_sdk.integrations import Integration from sentry_sdk.scope import use_isolation_scope, use_scope +from sentry_sdk.utils import event_from_exception from typing import TYPE_CHECKING @@ -28,7 +29,7 @@ def setup_once(): def _wrap_submit_call(func): - # type: (Any) -> Any + # type: (Callable[..., Future[Any]]) -> Callable[..., Future[Any]] """ Wrap submit call to propagate scopes on task submission. """ @@ -49,6 +50,30 @@ def wrapped_fn(*args, **kwargs): with use_scope(current_scope): return fn(*args, **kwargs) - return func(self, wrapped_fn, *args, **kwargs) + future = func(self, wrapped_fn, *args, **kwargs) + + def report_exceptions(future): + # type: (Future[Any]) -> None + exception = future.exception() + integration = sentry_sdk.get_client().get_integration(ConcurrentIntegration) + + if ( + exception is None + or integration is None + or not integration.record_exceptions_on_futures + ): + return + + event, hint = event_from_exception( + exception, + client_options=sentry_sdk.get_client().options, + mechanism={"type": "concurrent", "handled": False}, + ) + sentry_sdk.capture_event(event, hint=hint) + + if integration.record_exceptions_on_futures: + future.add_done_callback(report_exceptions) + + return future return sentry_submit diff --git a/tests/integrations/concurrent/test_concurrent.py b/tests/integrations/concurrent/test_concurrent.py index 74a0d0bcbc..21f2594c5c 100644 --- a/tests/integrations/concurrent/test_concurrent.py +++ b/tests/integrations/concurrent/test_concurrent.py @@ -2,14 +2,149 @@ from concurrent import futures from concurrent.futures import Future, ThreadPoolExecutor +import pytest + import sentry_sdk from sentry_sdk.integrations.concurrent import ConcurrentIntegration +from sentry_sdk.integrations.dedupe import DedupeIntegration +from sentry_sdk.integrations.excepthook import ExcepthookIntegration +from sentry_sdk.integrations.threading import ThreadingIntegration original_submit = ThreadPoolExecutor.submit original_set_exception = Future.set_exception +@pytest.mark.parametrize("record_exceptions_on_futures", (True, False)) +def test_handles_exceptions(sentry_init, capture_events, record_exceptions_on_futures): + sentry_init( + default_integrations=False, + integrations=[ + ConcurrentIntegration( + record_exceptions_on_futures=record_exceptions_on_futures + ) + ], + ) + events = capture_events() + + def crash(): + 1 / 0 + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(crash) + with pytest.raises(ZeroDivisionError): + future.result() + + if record_exceptions_on_futures: + (event,) = events + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" + assert exception["mechanism"]["type"] == "concurrent" + assert not exception["mechanism"]["handled"] + else: + assert not events + + +# ThreadPoolExecutor uses threading, but catches exceptions before the Sentry threading integration +@pytest.mark.parametrize( + "potentially_conflicting_integrations", + [ + [ThreadingIntegration(propagate_scope=True)], + [ThreadingIntegration(propagate_scope=False)], + [], + ], +) +def test_threading_enabled_no_duplicate( + sentry_init, capture_events, potentially_conflicting_integrations +): + sentry_init( + default_integrations=False, + integrations=[ + ConcurrentIntegration(), + ] + + potentially_conflicting_integrations, + ) + events = capture_events() + + def crash(): + 1 / 0 + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(crash) + with pytest.raises(ZeroDivisionError): + future.result() + + (event,) = events + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" + assert exception["mechanism"]["type"] == "concurrent" + assert not exception["mechanism"]["handled"] + + +def test_concurrent_deduplicates( + sentry_init, capture_events, capture_record_lost_event_calls +): + sentry_init( + default_integrations=False, + integrations=[ + ExcepthookIntegration(), + DedupeIntegration(), + ConcurrentIntegration(), + ], + ) + events = capture_events() + record_lost_event_calls = capture_record_lost_event_calls() + + def crash(): + 1 / 0 + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(crash) + try: + future.result() + except Exception: + sentry_sdk.capture_exception() + + (event,) = events + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" + + (lost_event_call,) = record_lost_event_calls + assert lost_event_call == ("event_processor", "error", None, 1) + + +def test_propagates_tag(sentry_init, capture_events): + sentry_init( + default_integrations=False, + integrations=[ConcurrentIntegration()], + ) + events = capture_events() + + def stage1(): + sentry_sdk.get_isolation_scope().set_tag("stage1", "true") + with futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(stage2) + with pytest.raises(ZeroDivisionError): + future.result() + + def stage2(): + 1 / 0 + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(stage1) + future.result() + + (event,) = events + + (exception,) = event["exception"]["values"] + + assert exception["type"] == "ZeroDivisionError" + assert exception["mechanism"]["type"] == "concurrent" + assert not exception["mechanism"]["handled"] + + assert event["tags"]["stage1"] == "true" + + def test_propagates_threadpool_scope(sentry_init, capture_events): sentry_init( default_integrations=False, @@ -65,6 +200,23 @@ def double(number): assert event["spans"][3]["trace_id"] == event["spans"][0]["trace_id"] +def test_double_patching(sentry_init, capture_events): + sentry_init(integrations=[ConcurrentIntegration()]) + events = capture_events() + + def run(): + 1 / 0 + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + for _ in range(10): + executor.submit(run) + + assert len(events) == 10 + for event in events: + (exception,) = event["exception"]["values"] + assert exception["type"] == "ZeroDivisionError" + + def test_scope_data_not_leaked_in_executor(sentry_init): sentry_init( integrations=[ConcurrentIntegration()],