-
Notifications
You must be signed in to change notification settings - Fork 358
feat: add snapshot expiration methods with retention strategies #2369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ForeverAngry
wants to merge
7
commits into
apache:main
Choose a base branch
from
ForeverAngry:feat/retention-strategies
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+449
−0
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
3da1528
feat: add snapshot expiration methods with retention strategies
ForeverAngry a8fe03d
chore: documented new retention methods
ForeverAngry a1a126f
test: enhance snapshot expiration tests with retention policy validat…
ForeverAngry 2c6eb0b
feat: add context manager examples for snapshot expiration with reten…
ForeverAngry 855e22a
chore: update snapshot.py for code cleanup and organization
ForeverAngry 29ae3a8
Fixed: doc string for `retain_last_n` to properly describe the function.
ForeverAngry ad556e3
Re-trigger CI/CD after fixing linting issues
ForeverAngry File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1005,3 +1005,183 @@ def older_than(self, dt: datetime) -> "ExpireSnapshots": | |
if snapshot.timestamp_ms < expire_from and snapshot.snapshot_id not in protected_ids: | ||
self._snapshot_ids_to_expire.add(snapshot.snapshot_id) | ||
return self | ||
|
||
def older_than_with_retention( | ||
self, timestamp_ms: int, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None | ||
) -> ExpireSnapshots: | ||
"""Expire all unprotected snapshots with a timestamp older than a given value, with retention strategies. | ||
|
||
Args: | ||
timestamp_ms: Only snapshots with timestamp_ms < this value will be expired. | ||
retain_last_n: Always keep the last N snapshots regardless of age. | ||
min_snapshots_to_keep: Minimum number of snapshots to keep in total. | ||
|
||
Returns: | ||
This for method chaining. | ||
""" | ||
snapshots_to_expire = self._get_snapshots_to_expire_with_retention( | ||
timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep | ||
) | ||
self._snapshot_ids_to_expire.update(snapshots_to_expire) | ||
return self | ||
|
||
def with_retention_policy( | ||
self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None | ||
) -> ExpireSnapshots: | ||
"""Comprehensive snapshot expiration with multiple retention strategies. | ||
|
||
This method provides a unified interface for snapshot expiration with various | ||
retention policies to ensure operational resilience while allowing space reclamation. | ||
|
||
The method will use table properties as defaults if they are set: | ||
- history.expire.max-snapshot-age-ms: Default for timestamp_ms if not provided | ||
- history.expire.min-snapshots-to-keep: Default for min_snapshots_to_keep if not provided | ||
- history.expire.max-ref-age-ms: Used for ref expiration (branches/tags) | ||
|
||
Args: | ||
timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration. | ||
If None, will use history.expire.max-snapshot-age-ms table property if set. | ||
retain_last_n: Always keep the last N snapshots regardless of age. | ||
Useful when regular snapshot creation occurs and users want to keep | ||
the last few for rollback purposes. | ||
min_snapshots_to_keep: Minimum number of snapshots to keep in total. | ||
Acts as a guardrail to prevent aggressive expiration logic. | ||
If None, will use history.expire.min-snapshots-to-keep table property if set. | ||
|
||
Returns: | ||
This for method chaining. | ||
|
||
Raises: | ||
ValueError: If retain_last_n or min_snapshots_to_keep is less than 1. | ||
|
||
Examples: | ||
# Use table property defaults | ||
table.expire_snapshots().with_retention_policy().commit() | ||
|
||
# Override defaults with explicit values | ||
table.expire_snapshots().with_retention_policy( | ||
timestamp_ms=1234567890000, | ||
retain_last_n=10, | ||
min_snapshots_to_keep=5 | ||
).commit() | ||
""" | ||
# Get default values from table properties | ||
default_max_age, default_min_snapshots, _ = self._get_expiration_properties() | ||
|
||
# Use defaults from table properties if not explicitly provided | ||
if timestamp_ms is None: | ||
timestamp_ms = default_max_age | ||
|
||
if min_snapshots_to_keep is None: | ||
min_snapshots_to_keep = default_min_snapshots | ||
|
||
# If no expiration criteria are provided, don't expire anything | ||
if timestamp_ms is None and retain_last_n is None and min_snapshots_to_keep is None: | ||
return self | ||
|
||
if retain_last_n is not None and retain_last_n < 1: | ||
raise ValueError("retain_last_n must be at least 1") | ||
|
||
if min_snapshots_to_keep is not None and min_snapshots_to_keep < 1: | ||
raise ValueError("min_snapshots_to_keep must be at least 1") | ||
|
||
snapshots_to_expire = self._get_snapshots_to_expire_with_retention( | ||
timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep | ||
) | ||
self._snapshot_ids_to_expire.update(snapshots_to_expire) | ||
return self | ||
|
||
def retain_last_n(self, n: int) -> ExpireSnapshots: | ||
"""Keep only the last N snapshots globally across all branches, expiring all others. | ||
|
||
Note: This method considers snapshots globally across the entire table history, | ||
not per-branch. Protected snapshots (branch/tag heads) are always preserved | ||
regardless of the retention count. | ||
|
||
Args: | ||
n: Number of most recent snapshots to keep globally. | ||
|
||
Returns: | ||
This for method chaining. | ||
|
||
Raises: | ||
ValueError: If n is less than 1. | ||
""" | ||
if n < 1: | ||
raise ValueError("Number of snapshots to retain must be at least 1") | ||
|
||
snapshots_to_keep = self._get_snapshots_to_keep(retain_last_n=n) | ||
snapshots_to_expire = [ | ||
id for snapshot in self._transaction.table_metadata.snapshots if (id := snapshot.snapshot_id) not in snapshots_to_keep | ||
] | ||
|
||
self._snapshot_ids_to_expire.update(snapshots_to_expire) | ||
return self | ||
|
||
def _get_snapshots_to_keep(self, retain_last_n: Optional[int] = None) -> Set[int]: | ||
"""Get set of snapshot IDs that should be kept based on protection and retention rules. | ||
|
||
Args: | ||
retain_last_n: Number of most recent snapshots to keep. | ||
|
||
Returns: | ||
Set of snapshot IDs to keep. | ||
""" | ||
snapshots_to_keep = self._get_protected_snapshot_ids() | ||
|
||
if retain_last_n is not None: | ||
# Sort snapshots by timestamp (most recent first), and get most recent N | ||
sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) | ||
snapshots_to_keep.update(snapshot.snapshot_id for snapshot in sorted_snapshots[:retain_last_n]) | ||
|
||
return snapshots_to_keep | ||
|
||
def _get_snapshots_to_expire_with_retention( | ||
self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None | ||
) -> List[int]: | ||
"""Get snapshots to expire considering retention strategies. | ||
|
||
Args: | ||
timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration. | ||
retain_last_n: Always keep the last N snapshots regardless of age. | ||
min_snapshots_to_keep: Minimum number of snapshots to keep in total. | ||
|
||
Returns: | ||
List of snapshot IDs to expire. | ||
""" | ||
snapshots_to_keep = self._get_snapshots_to_keep(retain_last_n=retain_last_n) | ||
|
||
# Apply timestamp constraint | ||
candidates_for_expiration = [] | ||
for snapshot in self._transaction.table_metadata.snapshots: | ||
if snapshot.snapshot_id not in snapshots_to_keep and (timestamp_ms is None or snapshot.timestamp_ms < timestamp_ms): | ||
candidates_for_expiration.append(snapshot) | ||
Comment on lines
+1156
to
+1158
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. make more pythonic with comprehension? |
||
|
||
# Sort candidates by timestamp (oldest first) for potential expiration | ||
candidates_for_expiration.sort(key=lambda s: s.timestamp_ms) | ||
|
||
# Apply min_snapshots_to_keep constraint | ||
if min_snapshots_to_keep is not None: | ||
total_snapshots = len(self._transaction.table_metadata.snapshots) | ||
max_to_expire = total_snapshots - min_snapshots_to_keep | ||
snapshots_to_expire = [candidate.snapshot_id for candidate in candidates_for_expiration[:max_to_expire]] | ||
else: | ||
snapshots_to_expire = [candidate.snapshot_id for candidate in candidates_for_expiration] | ||
|
||
return snapshots_to_expire | ||
|
||
def _get_expiration_properties(self) -> tuple[Optional[int], Optional[int], Optional[int]]: | ||
"""Get the default expiration properties from table properties. | ||
|
||
Returns: | ||
Tuple of (max_snapshot_age_ms, min_snapshots_to_keep, max_ref_age_ms) | ||
""" | ||
from pyiceberg.table import TableProperties | ||
|
||
properties = self._transaction.table_metadata.properties | ||
|
||
max_snapshot_age = property_as_int(properties, TableProperties.MAX_SNAPSHOT_AGE_MS) | ||
min_snapshots_to_keep = property_as_int(properties, TableProperties.MIN_SNAPSHOTS_TO_KEEP) | ||
max_ref_age_ms = property_as_int(properties, "history.expire.max-ref-age-ms") | ||
|
||
return max_snapshot_age, min_snapshots_to_keep, max_ref_age_ms |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.