From 1fcbea0a7a438268e90e2ee7602c17f99c22ca03 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Fri, 29 Aug 2025 18:15:13 -0700 Subject: [PATCH 1/2] Add ErrorHandler class, ViewAlreadyExistsError, fix unit tests --- pyiceberg/catalog/rest/__init__.py | 57 ++++++--------- pyiceberg/catalog/rest/response.py | 112 +++++++++++++++++++++++------ pyiceberg/exceptions.py | 4 ++ tests/catalog/test_rest.py | 40 ++++++++++- tests/integration/test_deletes.py | 2 +- 5 files changed, 156 insertions(+), 59 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 207b3c4ce2..1e1036ee97 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -40,18 +40,11 @@ PropertiesUpdateSummary, ) from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager -from pyiceberg.catalog.rest.response import _handle_non_200_response +from pyiceberg.catalog.rest.response import ErrorHandlers from pyiceberg.exceptions import ( AuthorizationExpiredError, - CommitFailedException, - CommitStateUnknownException, - NamespaceAlreadyExistsError, - NamespaceNotEmptyError, NoSuchIdentifierError, NoSuchNamespaceError, - NoSuchTableError, - NoSuchViewError, - TableAlreadyExistsError, UnauthorizedError, ) from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN @@ -69,6 +62,7 @@ from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids from pyiceberg.table.update import ( + AssertCreate, TableRequirement, TableUpdate, ) @@ -366,7 +360,7 @@ def _fetch_config(self) -> None: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {}) + ErrorHandlers.default_error_handler(exc) config_response = ConfigResponse.model_validate_json(response.text) config = config_response.defaults @@ -524,7 +518,7 @@ def _create_table( 
try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {409: TableAlreadyExistsError, 404: NoSuchNamespaceError}) + ErrorHandlers.table_error_handler(exc) return TableResponse.model_validate_json(response.text) @retry(**_RETRY_ARGS) @@ -597,7 +591,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {409: TableAlreadyExistsError}) + ErrorHandlers.table_error_handler(exc) table_response = TableResponse.model_validate_json(response.text) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) @@ -610,7 +604,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + ErrorHandlers.namespace_error_handler(exc) return [(*table.namespace, table.name) for table in ListTablesResponse.model_validate_json(response.text).identifiers] @retry(**_RETRY_ARGS) @@ -628,7 +622,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchTableError}) + ErrorHandlers.table_error_handler(exc) table_response = TableResponse.model_validate_json(response.text) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) @@ -642,7 +636,7 @@ def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchTableError}) + ErrorHandlers.table_error_handler(exc) @retry(**_RETRY_ARGS) def purge_table(self, identifier: Union[str, Identifier]) -> None: @@ -658,7 +652,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U try: response.raise_for_status() except HTTPError as exc: - 
_handle_non_200_response(exc, {404: NoSuchTableError, 409: TableAlreadyExistsError})
+            ErrorHandlers.table_error_handler(exc)
 
         return self.load_table(to_identifier)
 
@@ -681,7 +675,7 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
         try:
             response.raise_for_status()
         except HTTPError as exc:
-            _handle_non_200_response(exc, {404: NoSuchNamespaceError})
+            ErrorHandlers.namespace_error_handler(exc)  # 404 -> NoSuchNamespaceError as before, same handler as list_tables
         return [(*view.namespace, view.name) for view in ListViewsResponse.model_validate_json(response.text).identifiers]
 
     @retry(**_RETRY_ARGS)
@@ -719,15 +713,10 @@ def commit_table(
         try:
             response.raise_for_status()
         except HTTPError as exc:
-            _handle_non_200_response(
-                exc,
-                {
-                    409: CommitFailedException,
-                    500: CommitStateUnknownException,
-                    502: CommitStateUnknownException,
-                    504: CommitStateUnknownException,
-                },
-            )
+            if AssertCreate() in requirements:
+                ErrorHandlers.table_error_handler(exc)
+            else:
+                ErrorHandlers.commit_error_handler(exc)
         return CommitTableResponse.model_validate_json(response.text)
 
     @retry(**_RETRY_ARGS)
@@ -738,7 +727,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper
         try:
             response.raise_for_status()
         except HTTPError as exc:
-            _handle_non_200_response(exc, {409: NamespaceAlreadyExistsError})
+            ErrorHandlers.namespace_error_handler(exc)
 
     @retry(**_RETRY_ARGS)
     def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
@@ -748,7 +737,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
         try:
             response.raise_for_status()
         except HTTPError as exc:
-            _handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError})
+            ErrorHandlers.drop_namespace_error_handler(exc)
 
     @retry(**_RETRY_ARGS)
     def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
@@ -763,7 +752,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi
         try:
             response.raise_for_status()
         except HTTPError as exc:
-
_handle_non_200_response(exc, {404: NoSuchNamespaceError}) + ErrorHandlers.namespace_error_handler(exc) return ListNamespaceResponse.model_validate_json(response.text).namespaces @@ -775,7 +764,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + ErrorHandlers.namespace_error_handler(exc) return NamespaceResponse.model_validate_json(response.text).properties @@ -790,7 +779,7 @@ def update_namespace_properties( try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + ErrorHandlers.namespace_error_handler(exc) parsed_response = UpdateNamespacePropertiesResponse.model_validate_json(response.text) return PropertiesUpdateSummary( removed=parsed_response.removed, @@ -812,7 +801,7 @@ def namespace_exists(self, namespace: Union[str, Identifier]) -> bool: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {}) + ErrorHandlers.namespace_error_handler(exc) return False @@ -838,7 +827,7 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {}) + ErrorHandlers.table_error_handler(exc) return False @@ -863,7 +852,7 @@ def view_exists(self, identifier: Union[str, Identifier]) -> bool: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {}) + ErrorHandlers.view_error_handler(exc) return False @@ -875,7 +864,7 @@ def drop_view(self, identifier: Union[str]) -> None: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchViewError}) + ErrorHandlers.view_error_handler(exc) def close(self) -> None: """Close the catalog and release Session connection adapters. 
diff --git a/pyiceberg/catalog/rest/response.py b/pyiceberg/catalog/rest/response.py index 8f23af8c35..61b38bedd2 100644 --- a/pyiceberg/catalog/rest/response.py +++ b/pyiceberg/catalog/rest/response.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. from json import JSONDecodeError -from typing import Dict, Literal, Optional, Type +from typing import Dict, Literal, Optional, Type, TypeAlias from pydantic import Field, ValidationError from requests import HTTPError @@ -23,12 +23,21 @@ from pyiceberg.exceptions import ( AuthorizationExpiredError, BadRequestError, + CommitFailedException, + CommitStateUnknownException, ForbiddenError, + NamespaceAlreadyExistsError, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + NoSuchViewError, OAuthError, RESTError, ServerError, ServiceUnavailableError, + TableAlreadyExistsError, UnauthorizedError, + ViewAlreadyExistsError, ) from pyiceberg.typedef import IcebergBaseModel @@ -60,33 +69,92 @@ class OAuthErrorResponse(IcebergBaseModel): error_uri: Optional[str] = None -def _handle_non_200_response(exc: HTTPError, error_handler: Dict[int, Type[Exception]]) -> None: +_ErrorHandler: TypeAlias = Dict[int, Type[Exception]] + + +class ErrorHandlers: + """ + Utility class providing static methods to handle HTTP errors for table, namespace, and view operations. + + Maps HTTP error responses to appropriate custom exceptions, ensuring consistent error handling. 
+    """
+
+    @staticmethod
+    def default_error_handler(exc: HTTPError) -> None:
+        _handle_non_200_response(exc, {})  # empty map: no operation-specific overrides
+
+    @staticmethod
+    def namespace_error_handler(exc: HTTPError) -> None:
+        handler: _ErrorHandler = {
+            400: BadRequestError,
+            404: NoSuchNamespaceError,
+            409: NamespaceAlreadyExistsError,
+            422: RESTError,
+        }
+
+        if exc.response is not None and "NamespaceNotEmpty" in exc.response.text:
+            handler[400] = NamespaceNotEmptyError  # body names NamespaceNotEmpty: replaces the 400 default above
+
+        _handle_non_200_response(exc, handler)  # raises ValueError if exc.response is None
+
+    @staticmethod
+    def drop_namespace_error_handler(exc: HTTPError) -> None:
+        handler: _ErrorHandler = {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError}
+
+        _handle_non_200_response(exc, handler)
+
+    @staticmethod
+    def table_error_handler(exc: HTTPError) -> None:
+        handler: _ErrorHandler = {404: NoSuchTableError, 409: TableAlreadyExistsError}
+
+        if exc.response is not None and "NoSuchNamespace" in exc.response.text:
+            handler[404] = NoSuchNamespaceError  # body names NoSuchNamespace: replaces the 404 default above
+
+        _handle_non_200_response(exc, handler)  # raises ValueError if exc.response is None
+
+    @staticmethod
+    def commit_error_handler(exc: HTTPError) -> None:
+        handler: _ErrorHandler = {
+            404: NoSuchTableError,
+            409: CommitFailedException,
+            500: CommitStateUnknownException,
+            502: CommitStateUnknownException,
+            503: CommitStateUnknownException,
+            504: CommitStateUnknownException,
+        }
+
+        _handle_non_200_response(exc, handler)
+
+    @staticmethod
+    def view_error_handler(exc: HTTPError) -> None:
+        handler: _ErrorHandler = {404: NoSuchViewError, 409: ViewAlreadyExistsError}
+
+        if exc.response is not None and "NoSuchNamespace" in exc.response.text:
+            handler[404] = NoSuchNamespaceError  # body names NoSuchNamespace: replaces the 404 default above
+
+        _handle_non_200_response(exc, handler)  # raises ValueError if exc.response is None
+
+
+def _handle_non_200_response(exc: HTTPError, handler: _ErrorHandler) -> None:
     exception: Type[Exception]
 
     if exc.response is None:
         raise ValueError("Did not receive a response")
 
     code = exc.response.status_code
-    if code in error_handler:
-        exception = error_handler[code]
-    elif code == 400:
-        exception = BadRequestError
-    elif code == 401:
-        exception = UnauthorizedError
-    elif code == 403:
-        exception = ForbiddenError
-    elif code
== 422: - exception = RESTError - elif code == 419: - exception = AuthorizationExpiredError - elif code == 501: - exception = NotImplementedError - elif code == 503: - exception = ServiceUnavailableError - elif 500 <= code < 600: - exception = ServerError - else: - exception = RESTError + + default_handler: _ErrorHandler = { + 400: BadRequestError, + 401: UnauthorizedError, + 403: ForbiddenError, + 419: AuthorizationExpiredError, + 422: RESTError, + 501: NotImplementedError, + 503: ServiceUnavailableError, + } + + # Merge handler passed with default handler map, if no match exception will be ServerError or RESTError + exception = handler.get(code, default_handler.get(code, ServerError if 500 <= code < 600 else RESTError)) try: if exception == OAuthError: diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py index c80f104e46..d525ac7219 100644 --- a/pyiceberg/exceptions.py +++ b/pyiceberg/exceptions.py @@ -44,6 +44,10 @@ class NoSuchViewError(Exception): """Raises when the view can't be found in the REST catalog.""" +class ViewAlreadyExistsError(Exception): + """Raised when creating a view with a name that already exists.""" + + class NoSuchIdentifierError(Exception): """Raises when the identifier can't be found in the REST catalog.""" diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 223c6d2f9e..87d800d235 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -942,7 +942,7 @@ def test_load_table_404(rest_mock: Mocker) -> None: json={ "error": { "message": "Table does not exist: examples.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", - "type": "NoSuchNamespaceErrorException", + "type": "NoSuchTableError", "code": 404, } }, @@ -954,6 +954,23 @@ def test_load_table_404(rest_mock: Mocker) -> None: RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).load_table(("fokko", "does_not_exists")) assert "Table does not exist" in str(e.value) +def test_load_table_404_non_existent_namespace(rest_mock: 
Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/does_not_exists", + json={ + "error": { + "message": "Table does not exist: examples.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", + "type": "NoSuchNamespaceError", + "code": 404, + } + }, + status_code=404, + request_headers=TEST_HEADERS, + ) + + with pytest.raises(NoSuchNamespaceError) as e: + RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).load_table(("fokko", "does_not_exists")) + assert "Table does not exist" in str(e.value) def test_table_exists_200(rest_mock: Mocker) -> None: rest_mock.head( @@ -1003,7 +1020,7 @@ def test_drop_table_404(rest_mock: Mocker) -> None: json={ "error": { "message": "Table does not exist: fokko.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", - "type": "NoSuchNamespaceErrorException", + "type": "NoSuchTableError", "code": 404, } }, @@ -1016,6 +1033,25 @@ def test_drop_table_404(rest_mock: Mocker) -> None: assert "Table does not exist" in str(e.value) +def test_drop_table_404_non_existent_namespace(rest_mock: Mocker) -> None: + rest_mock.delete( + f"{TEST_URI}v1/namespaces/fokko/tables/does_not_exists", + json={ + "error": { + "message": "Table does not exist: fokko.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", + "type": "NoSuchNamespaceErrorException", + "code": 404, + } + }, + status_code=404, + request_headers=TEST_HEADERS, + ) + + with pytest.raises(NoSuchNamespaceError) as e: + RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).drop_table(("fokko", "does_not_exists")) + assert "Table does not exist" in str(e.value) + + def test_create_table_200( rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any] ) -> None: diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 21c3d12999..90517edbb6 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -43,7 +43,7 @@ def 
run_spark_commands(spark: SparkSession, sqls: List[str]) -> None: def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]: identifier = "default.__test_table" arrow_table = pa.Table.from_arrays([pa.array([1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e"])], names=["idx", "value"]) - test_table = session_catalog.create_table( + test_table = session_catalog.create_table_if_not_exists( identifier, schema=Schema( NestedField(1, "idx", LongType()), From b922e1f7398cf2e414bbd6b4bf0577b0cb7cce1a Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Sat, 30 Aug 2025 14:29:05 -0700 Subject: [PATCH 2/2] lint --- tests/catalog/test_rest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 87d800d235..212e655765 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -954,6 +954,7 @@ def test_load_table_404(rest_mock: Mocker) -> None: RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).load_table(("fokko", "does_not_exists")) assert "Table does not exist" in str(e.value) + def test_load_table_404_non_existent_namespace(rest_mock: Mocker) -> None: rest_mock.get( f"{TEST_URI}v1/namespaces/fokko/tables/does_not_exists", @@ -972,6 +973,7 @@ def test_load_table_404_non_existent_namespace(rest_mock: Mocker) -> None: RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).load_table(("fokko", "does_not_exists")) assert "Table does not exist" in str(e.value) + def test_table_exists_200(rest_mock: Mocker) -> None: rest_mock.head( f"{TEST_URI}v1/namespaces/fokko/tables/table",