Skip to content
Open
76 changes: 76 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,82 @@ def cleanup_old_snapshots(table_name: str, snapshot_ids: list[int]):
cleanup_old_snapshots("analytics.user_events", [12345, 67890, 11111])
```

#### Advanced Retention Strategies

PyIceberg provides additional retention helpers on `ExpireSnapshots` to balance safety and cleanup:

Key table properties used as defaults (all optional):

- `history.expire.max-snapshot-age-ms`: Default age threshold for `with_retention_policy`
- `history.expire.min-snapshots-to-keep`: Minimum total snapshots to retain
- `history.expire.max-ref-age-ms`: (Reserved for future protected ref/branch cleanup logic)

Protected snapshots (referenced by branches or tags) are never expired by these APIs.

Keep only the last N snapshots (plus protected):

```python
table.maintenance.expire_snapshots().retain_last_n(5).commit()
```

Expire older snapshots but always keep the most recent N and a safety floor:

```python
from datetime import datetime, timedelta

cutoff = int((datetime.now() - timedelta(days=7)).timestamp() * 1000)
table.maintenance.expire_snapshots().older_than_with_retention(
timestamp_ms=cutoff,
retain_last_n=3,
min_snapshots_to_keep=4,
).commit()
```

Unified policy that also reads table property defaults:

```python
# Uses table properties if arguments omitted
table.maintenance.expire_snapshots().with_retention_policy().commit()

# Override selectively
table.maintenance.expire_snapshots().with_retention_policy(
retain_last_n=2, # keep 2 newest regardless of age
min_snapshots_to_keep=5, # never go below 5 total
# timestamp_ms omitted -> falls back to history.expire.max-snapshot-age-ms if set
).commit()
```

##### Using a context manager

You can use a context manager to automatically commit on successful exit (and skip commit if an exception occurs):

```python
# Keep the 3 newest snapshots (plus protected refs) and enforce a floor of 8 total
with table.maintenance.expire_snapshots() as expire:
expire.with_retention_policy(retain_last_n=3, min_snapshots_to_keep=8)

# Only keep the last 5 snapshots
with table.maintenance.expire_snapshots() as expire:
expire.retain_last_n(5)

# Combine explicit cutoff with other guards
from datetime import datetime, timedelta
cutoff = int((datetime.now() - timedelta(days=14)).timestamp() * 1000)

with table.maintenance.expire_snapshots() as expire:
expire.older_than_with_retention(timestamp_ms=cutoff, retain_last_n=2, min_snapshots_to_keep=6)
```

Parameter interaction rules:

- `retain_last_n` snapshots are always kept (plus protected refs)
- `timestamp_ms` limits expiration candidates to snapshots strictly older than the given timestamp
- `min_snapshots_to_keep` stops expiration once the floor would be violated
- If all of (`timestamp_ms`, `retain_last_n`, `min_snapshots_to_keep`) are None in `with_retention_policy` and no table property defaults are set, nothing is expired
- Passing invalid values (`< 1`) for counts raises `ValueError`

Safety tip: Start with a higher `min_snapshots_to_keep` value when first enabling automated cleanup.

## Views

PyIceberg supports view operations.
Expand Down
180 changes: 180 additions & 0 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,3 +1005,183 @@ def older_than(self, dt: datetime) -> "ExpireSnapshots":
if snapshot.timestamp_ms < expire_from and snapshot.snapshot_id not in protected_ids:
self._snapshot_ids_to_expire.add(snapshot.snapshot_id)
return self

def older_than_with_retention(
    self, timestamp_ms: int, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None
) -> ExpireSnapshots:
    """Expire all unprotected snapshots with a timestamp older than a given value, with retention strategies.

    Args:
        timestamp_ms: Only snapshots with timestamp_ms < this value will be expired.
        retain_last_n: Always keep the last N snapshots regardless of age.
        min_snapshots_to_keep: Minimum number of snapshots to keep in total.

    Returns:
        This for method chaining.

    Raises:
        ValueError: If retain_last_n or min_snapshots_to_keep is less than 1.
    """
    # Validate the counts up front: a zero or negative value would otherwise reach
    # list slicing in the selection helpers, where negative bounds do not mean "none".
    if retain_last_n is not None and retain_last_n < 1:
        raise ValueError("retain_last_n must be at least 1")

    if min_snapshots_to_keep is not None and min_snapshots_to_keep < 1:
        raise ValueError("min_snapshots_to_keep must be at least 1")

    snapshots_to_expire = self._get_snapshots_to_expire_with_retention(
        timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep
    )
    self._snapshot_ids_to_expire.update(snapshots_to_expire)
    return self

def with_retention_policy(
    self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None
) -> ExpireSnapshots:
    """Comprehensive snapshot expiration with multiple retention strategies.

    This method provides a unified interface for snapshot expiration with various
    retention policies to ensure operational resilience while allowing space reclamation.

    The method will use table properties as defaults if they are set:
    - history.expire.max-snapshot-age-ms: When timestamp_ms is not provided, the cutoff
      becomes "now minus this age" (the property is a duration, not a timestamp).
    - history.expire.min-snapshots-to-keep: Default for min_snapshots_to_keep if not provided
    - history.expire.max-ref-age-ms: Read alongside the others but currently not used here.

    Args:
        timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration.
            If None, derived from the history.expire.max-snapshot-age-ms table property if set.
        retain_last_n: Always keep the last N snapshots regardless of age.
            Useful when regular snapshot creation occurs and users want to keep
            the last few for rollback purposes.
        min_snapshots_to_keep: Minimum number of snapshots to keep in total.
            Acts as a guardrail to prevent aggressive expiration logic.
            If None, will use history.expire.min-snapshots-to-keep table property if set.

    Returns:
        This for method chaining.

    Raises:
        ValueError: If retain_last_n or min_snapshots_to_keep is less than 1.

    Examples:
        # Use table property defaults
        table.maintenance.expire_snapshots().with_retention_policy().commit()

        # Override defaults with explicit values
        table.maintenance.expire_snapshots().with_retention_policy(
            timestamp_ms=1234567890000,
            retain_last_n=10,
            min_snapshots_to_keep=5
        ).commit()
    """
    import time

    # Get default values from table properties
    default_max_age, default_min_snapshots, _ = self._get_expiration_properties()

    # The max-age property is a duration in milliseconds; turn it into an absolute
    # epoch-millisecond cutoff so it is comparable with snapshot.timestamp_ms.
    if timestamp_ms is None and default_max_age is not None:
        timestamp_ms = int(time.time() * 1000) - default_max_age

    if min_snapshots_to_keep is None:
        min_snapshots_to_keep = default_min_snapshots

    # If no expiration criteria are provided, don't expire anything
    if timestamp_ms is None and retain_last_n is None and min_snapshots_to_keep is None:
        return self

    if retain_last_n is not None and retain_last_n < 1:
        raise ValueError("retain_last_n must be at least 1")

    if min_snapshots_to_keep is not None and min_snapshots_to_keep < 1:
        raise ValueError("min_snapshots_to_keep must be at least 1")

    snapshots_to_expire = self._get_snapshots_to_expire_with_retention(
        timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep
    )
    self._snapshot_ids_to_expire.update(snapshots_to_expire)
    return self

def retain_last_n(self, n: int) -> ExpireSnapshots:
    """Keep only the last N snapshots globally across all branches, expiring all others.

    Note: This method considers snapshots globally across the entire table history,
    not per-branch. Whatever `_get_snapshots_to_keep` reports as protected is kept
    in addition to the N most recent snapshots.

    Args:
        n: Number of most recent snapshots to keep globally.

    Returns:
        This for method chaining.

    Raises:
        ValueError: If n is less than 1.
    """
    if n < 1:
        raise ValueError("Number of snapshots to retain must be at least 1")

    snapshots_to_keep = self._get_snapshots_to_keep(retain_last_n=n)

    # Everything that is not explicitly kept gets queued for expiration.
    self._snapshot_ids_to_expire.update(
        snapshot.snapshot_id
        for snapshot in self._transaction.table_metadata.snapshots
        if snapshot.snapshot_id not in snapshots_to_keep
    )
    return self

def _get_snapshots_to_keep(self, retain_last_n: Optional[int] = None) -> Set[int]:
    """Collect the IDs of snapshots that must survive expiration.

    The result starts from the set returned by `_get_protected_snapshot_ids()` and,
    when a retention count is given, is extended with the newest snapshots by
    `timestamp_ms`.

    Args:
        retain_last_n: How many of the most recent snapshots to add to the kept set.
            When None, only the protected IDs are returned.

    Returns:
        Set of snapshot IDs to keep.
    """
    keep_ids = self._get_protected_snapshot_ids()

    if retain_last_n is None:
        return keep_ids

    # Newest first, so the leading slice is the "last N" snapshots.
    newest_first = sorted(
        self._transaction.table_metadata.snapshots,
        key=lambda snap: snap.timestamp_ms,
        reverse=True,
    )
    for snap in newest_first[:retain_last_n]:
        keep_ids.add(snap.snapshot_id)

    return keep_ids

def _get_snapshots_to_expire_with_retention(
    self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None
) -> List[int]:
    """Get snapshots to expire considering retention strategies.

    Args:
        timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration.
        retain_last_n: Always keep the last N snapshots regardless of age.
        min_snapshots_to_keep: Minimum number of snapshots to keep in total.

    Returns:
        List of snapshot IDs to expire, oldest first.
    """
    snapshots_to_keep = self._get_snapshots_to_keep(retain_last_n=retain_last_n)
    all_snapshots = self._transaction.table_metadata.snapshots

    # Apply the keep-set and the timestamp constraint
    candidates_for_expiration = [
        snapshot
        for snapshot in all_snapshots
        if snapshot.snapshot_id not in snapshots_to_keep and (timestamp_ms is None or snapshot.timestamp_ms < timestamp_ms)
    ]

    # Sort candidates by timestamp (oldest first) so the oldest are expired first
    candidates_for_expiration.sort(key=lambda s: s.timestamp_ms)

    # Apply min_snapshots_to_keep constraint
    if min_snapshots_to_keep is not None:
        # Clamp at zero: when the floor exceeds the number of existing snapshots the
        # difference is negative, and a negative slice bound would still return a
        # prefix of the candidates instead of nothing.
        max_to_expire = max(0, len(all_snapshots) - min_snapshots_to_keep)
        candidates_for_expiration = candidates_for_expiration[:max_to_expire]

    return [candidate.snapshot_id for candidate in candidates_for_expiration]

def _get_expiration_properties(self) -> tuple[Optional[int], Optional[int], Optional[int]]:
    """Read the snapshot-expiration defaults from the table's properties.

    Each value is whatever `property_as_int` yields for the corresponding key
    (presumably None when the property is not set -- confirm against that helper).

    Returns:
        Tuple of (max_snapshot_age_ms, min_snapshots_to_keep, max_ref_age_ms)
    """
    # Imported locally, as in the original; presumably to avoid a circular import.
    from pyiceberg.table import TableProperties

    table_props = self._transaction.table_metadata.properties

    return (
        property_as_int(table_props, TableProperties.MAX_SNAPSHOT_AGE_MS),
        property_as_int(table_props, TableProperties.MIN_SNAPSHOTS_TO_KEEP),
        property_as_int(table_props, "history.expire.max-ref-age-ms"),
    )
Loading