Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 23 additions & 34 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,11 @@
PropertiesUpdateSummary,
)
from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
from pyiceberg.catalog.rest.response import _handle_non_200_response
from pyiceberg.catalog.rest.response import ErrorHandlers
from pyiceberg.exceptions import (
AuthorizationExpiredError,
CommitFailedException,
CommitStateUnknownException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchIdentifierError,
NoSuchNamespaceError,
NoSuchTableError,
NoSuchViewError,
TableAlreadyExistsError,
UnauthorizedError,
)
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
Expand All @@ -69,6 +62,7 @@
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
from pyiceberg.table.update import (
AssertCreate,
TableRequirement,
TableUpdate,
)
Expand Down Expand Up @@ -366,7 +360,7 @@ def _fetch_config(self) -> None:
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {})
ErrorHandlers.default_error_handler(exc)
config_response = ConfigResponse.model_validate_json(response.text)

config = config_response.defaults
Expand Down Expand Up @@ -524,7 +518,7 @@ def _create_table(
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {409: TableAlreadyExistsError, 404: NoSuchNamespaceError})
ErrorHandlers.table_error_handler(exc)
return TableResponse.model_validate_json(response.text)

@retry(**_RETRY_ARGS)
Expand Down Expand Up @@ -597,7 +591,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {409: TableAlreadyExistsError})
ErrorHandlers.table_error_handler(exc)

table_response = TableResponse.model_validate_json(response.text)
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
Expand All @@ -610,7 +604,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})
ErrorHandlers.namespace_error_handler(exc)
return [(*table.namespace, table.name) for table in ListTablesResponse.model_validate_json(response.text).identifiers]

@retry(**_RETRY_ARGS)
Expand All @@ -628,7 +622,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchTableError})
ErrorHandlers.table_error_handler(exc)

table_response = TableResponse.model_validate_json(response.text)
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
Expand All @@ -642,7 +636,7 @@ def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool =
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchTableError})
ErrorHandlers.table_error_handler(exc)

@retry(**_RETRY_ARGS)
def purge_table(self, identifier: Union[str, Identifier]) -> None:
Expand All @@ -658,7 +652,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchTableError, 409: TableAlreadyExistsError})
ErrorHandlers.table_error_handler(exc)

return self.load_table(to_identifier)

Expand All @@ -681,7 +675,7 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})
ErrorHandlers.view_error_handler(exc)
return [(*view.namespace, view.name) for view in ListViewsResponse.model_validate_json(response.text).identifiers]

@retry(**_RETRY_ARGS)
Expand Down Expand Up @@ -719,15 +713,10 @@ def commit_table(
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(
exc,
{
409: CommitFailedException,
500: CommitStateUnknownException,
502: CommitStateUnknownException,
504: CommitStateUnknownException,
},
)
if AssertCreate() in requirements:
ErrorHandlers.table_error_handler(exc)
else:
ErrorHandlers.commit_error_handler(exc)
return CommitTableResponse.model_validate_json(response.text)

@retry(**_RETRY_ARGS)
Expand All @@ -738,7 +727,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {409: NamespaceAlreadyExistsError})
ErrorHandlers.namespace_error_handler(exc)

@retry(**_RETRY_ARGS)
def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
Expand All @@ -748,7 +737,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError})
ErrorHandlers.drop_namespace_error_handler(exc)

@retry(**_RETRY_ARGS)
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
Expand All @@ -763,7 +752,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})
ErrorHandlers.namespace_error_handler(exc)

return ListNamespaceResponse.model_validate_json(response.text).namespaces

Expand All @@ -775,7 +764,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})
ErrorHandlers.namespace_error_handler(exc)

return NamespaceResponse.model_validate_json(response.text).properties

Expand All @@ -790,7 +779,7 @@ def update_namespace_properties(
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})
ErrorHandlers.namespace_error_handler(exc)
parsed_response = UpdateNamespacePropertiesResponse.model_validate_json(response.text)
return PropertiesUpdateSummary(
removed=parsed_response.removed,
Expand All @@ -812,7 +801,7 @@ def namespace_exists(self, namespace: Union[str, Identifier]) -> bool:
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {})
ErrorHandlers.namespace_error_handler(exc)

return False

Expand All @@ -838,7 +827,7 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {})
ErrorHandlers.table_error_handler(exc)

return False

Expand All @@ -863,7 +852,7 @@ def view_exists(self, identifier: Union[str, Identifier]) -> bool:
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {})
ErrorHandlers.view_error_handler(exc)

return False

Expand All @@ -875,7 +864,7 @@ def drop_view(self, identifier: Union[str]) -> None:
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchViewError})
ErrorHandlers.view_error_handler(exc)

def close(self) -> None:
"""Close the catalog and release Session connection adapters.
Expand Down
112 changes: 90 additions & 22 deletions pyiceberg/catalog/rest/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,29 @@
# specific language governing permissions and limitations
# under the License.
from json import JSONDecodeError
from typing import Dict, Literal, Optional, Type
from typing import Dict, Literal, Optional, Type, TypeAlias

from pydantic import Field, ValidationError
from requests import HTTPError

from pyiceberg.exceptions import (
AuthorizationExpiredError,
BadRequestError,
CommitFailedException,
CommitStateUnknownException,
ForbiddenError,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
NoSuchTableError,
NoSuchViewError,
OAuthError,
RESTError,
ServerError,
ServiceUnavailableError,
TableAlreadyExistsError,
UnauthorizedError,
ViewAlreadyExistsError,
)
from pyiceberg.typedef import IcebergBaseModel

Expand Down Expand Up @@ -60,33 +69,92 @@ class OAuthErrorResponse(IcebergBaseModel):
error_uri: Optional[str] = None


def _handle_non_200_response(exc: HTTPError, error_handler: Dict[int, Type[Exception]]) -> None:
_ErrorHandler: TypeAlias = Dict[int, Type[Exception]]


class ErrorHandlers:
"""
Utility class providing static methods to handle HTTP errors for table, namespace, and view operations.

Maps HTTP error responses to appropriate custom exceptions, ensuring consistent error handling.
"""

@staticmethod
def default_error_handler(exc: HTTPError) -> None:
_handle_non_200_response(exc, {})

@staticmethod
def namespace_error_handler(exc: HTTPError) -> None:
handler: _ErrorHandler = {
400: BadRequestError,
404: NoSuchNamespaceError,
409: NamespaceAlreadyExistsError,
422: RESTError,
}

if "NamespaceNotEmpty" in exc.response.text:
handler[400] = NamespaceNotEmptyError

_handle_non_200_response(exc, handler)

@staticmethod
def drop_namespace_error_handler(exc: HTTPError) -> None:
handler: _ErrorHandler = {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError}

_handle_non_200_response(exc, handler)

@staticmethod
def table_error_handler(exc: HTTPError) -> None:
handler: _ErrorHandler = {404: NoSuchTableError, 409: TableAlreadyExistsError}

if "NoSuchNamespace" in exc.response.text:
handler[404] = NoSuchNamespaceError

_handle_non_200_response(exc, handler)

@staticmethod
def commit_error_handler(exc: HTTPError) -> None:
handler: _ErrorHandler = {
404: NoSuchTableError,
409: CommitFailedException,
500: CommitStateUnknownException,
502: CommitStateUnknownException,
503: CommitStateUnknownException,
504: CommitStateUnknownException,
}

_handle_non_200_response(exc, handler)

@staticmethod
def view_error_handler(exc: HTTPError) -> None:
handler: _ErrorHandler = {404: NoSuchViewError, 409: ViewAlreadyExistsError}

if "NoSuchNamespace" in exc.response.text:
handler[404] = NoSuchNamespaceError

_handle_non_200_response(exc, handler)


def _handle_non_200_response(exc: HTTPError, handler: _ErrorHandler) -> None:
exception: Type[Exception]

if exc.response is None:
raise ValueError("Did not receive a response")

code = exc.response.status_code
if code in error_handler:
exception = error_handler[code]
elif code == 400:
exception = BadRequestError
elif code == 401:
exception = UnauthorizedError
elif code == 403:
exception = ForbiddenError
elif code == 422:
exception = RESTError
elif code == 419:
exception = AuthorizationExpiredError
elif code == 501:
exception = NotImplementedError
elif code == 503:
exception = ServiceUnavailableError
elif 500 <= code < 600:
exception = ServerError
else:
exception = RESTError

default_handler: _ErrorHandler = {
400: BadRequestError,
401: UnauthorizedError,
403: ForbiddenError,
419: AuthorizationExpiredError,
422: RESTError,
501: NotImplementedError,
503: ServiceUnavailableError,
}

# Merge handler passed with default handler map, if no match exception will be ServerError or RESTError
exception = handler.get(code, default_handler.get(code, ServerError if 500 <= code < 600 else RESTError))

try:
if exception == OAuthError:
Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class NoSuchViewError(Exception):
"""Raises when the view can't be found in the REST catalog."""


class ViewAlreadyExistsError(Exception):
"""Raised when creating a view with a name that already exists."""


class NoSuchIdentifierError(Exception):
"""Raises when the identifier can't be found in the REST catalog."""

Expand Down
Loading