Implement database reconnection with automatic retry logic #1315
Merged

13 commits:
- 450e48d Add database reconnection integration test (davidkopp)
- 6e79a3e Implement database reconnection with automatic retry logic (davidkopp)
- 8bb0b4b Simplify database retry configuration (davidkopp)
- a6597b2 Move log_with_timestamp helper to test_functions.py (davidkopp)
- a39e2ff Add unit tests for with_db_retry decorator (davidkopp)
- 19eff9e Cleanup (davidkopp)
- cb877ec Add integration tests for DB query methods (davidkopp)
- 16657ec Add retry decorator to copy_from and import_csv method (davidkopp)
- b6086a9 Enhance run_until function with generator-based pause/resume capability (davidkopp)
- 8d33a94 Simplified test case by using run_steps instead of threading (davidkopp)
- 1369e24 Cleanup (davidkopp)
- 61b5ba6 Disable connection health check explicitly to None [skip ci] (davidkopp)
- 9fd6cc6 Introducing query_multi (ArneTR)
New test file (@@ -0,0 +1,144 @@):

```python
import unittest
from unittest.mock import Mock, patch
import io
import psycopg
from lib.db import with_db_retry, DB


class TestWithDbRetryDecorator(unittest.TestCase):
    """Test the @with_db_retry decorator using mocks to simulate various error conditions.

    These tests verify the retry logic, timeout behavior, and error classification
    without requiring actual database connections.
    """

    def setUp(self):
        class MockDB:
            def __init__(self):
                self._pool = Mock()

            def _create_pool(self):
                self._pool = Mock()

            @with_db_retry
            def failing_method(self):
                raise psycopg.OperationalError("connection refused")

            @with_db_retry
            def non_retryable_method(self):
                raise psycopg.DatabaseError("syntax error at or near")

        self.mock_db = MockDB()

    @patch('time.time')
    @patch('time.sleep')
    def test_retry_on_retryable_errors(self, mock_sleep, mock_time):
        # Mock time progression: start=0, while_loop=1, elapsed_check=2, timeout_while=350, final_msg=350
        mock_time.side_effect = [0, 1, 2, 350, 350]

        with self.assertRaises(psycopg.OperationalError):
            self.mock_db.failing_method()

        # Verify that sleep was called (indicating a retry attempt)
        self.assertTrue(mock_sleep.called)

    def test_non_retryable_errors(self):
        with self.assertRaises(psycopg.DatabaseError) as cm:
            self.mock_db.non_retryable_method()

        self.assertIn("syntax error", str(cm.exception))

    @patch('time.time')
    @patch('time.sleep')
    @patch('builtins.print')
    def test_timeout_behavior(self, mock_print, mock_sleep, mock_time):
        # Mock time: start=0, while_check=1, elapsed_check=350 (timeout)
        mock_time.side_effect = [0, 1, 350]

        with self.assertRaises(psycopg.OperationalError) as cm:
            self.mock_db.failing_method()

        # Original error is raised
        self.assertIn("connection refused", str(cm.exception))

        # But timeout message is printed
        timeout_call = None
        for call in mock_print.call_args_list:
            if "Database retry timeout" in str(call):
                timeout_call = call
                break
        self.assertIsNotNone(timeout_call)

        # Sleep should not be called since timeout occurs before sleep
        self.assertFalse(mock_sleep.called)


class TestDbIntegration(unittest.TestCase):
    """Integration tests for DB class methods using real database connections.

    These tests verify actual database operations against a test PostgreSQL database without mocking.
    """

    def setUp(self):
        self.db = DB()
        self.table_name = "test_integration_table"

    def test_basic_query_execution(self):
        result = self.db.query(f"CREATE TABLE {self.table_name} (id INT, name TEXT)")
        self.assertIn("CREATE TABLE", result)

    def test_fetch_one_operation(self):
        self.db.query(f"CREATE TABLE {self.table_name} (id INT, name TEXT)")
        self.db.query(f"INSERT INTO {self.table_name} VALUES (1, 'test')")

        result = self.db.fetch_one(f"SELECT id, name FROM {self.table_name} WHERE id = 1")
        self.assertEqual(result[0], 1)
        self.assertEqual(result[1], 'test')

    def test_fetch_all_operation(self):
        self.db.query(f"CREATE TABLE {self.table_name} (id INT, name TEXT)")
        self.db.query(f"INSERT INTO {self.table_name} VALUES (1, 'test1'), (2, 'test2')")

        results = self.db.fetch_all(f"SELECT id, name FROM {self.table_name} ORDER BY id")
        self.assertEqual(len(results), 2)
        self.assertEqual(results[0][0], 1)
        self.assertEqual(results[1][0], 2)

    def test_parameter_binding(self):
        self.db.query(f"CREATE TABLE {self.table_name} (id INT, name TEXT)")

        self.db.query(f"INSERT INTO {self.table_name} VALUES (%s, %s)", (1, 'param_test'))
        result = self.db.fetch_one(f"SELECT name FROM {self.table_name} WHERE id = %s", (1,))
        self.assertEqual(result[0], 'param_test')

    def test_fetch_mode_dict(self):
        self.db.query(f"CREATE TABLE {self.table_name} (id INT, name TEXT)")
        self.db.query(f"INSERT INTO {self.table_name} VALUES (1, 'dict_test')")

        result = self.db.fetch_one(f"SELECT id, name FROM {self.table_name}", fetch_mode='dict')
        self.assertIsInstance(result, dict)
        self.assertEqual(result['id'], 1)
        self.assertEqual(result['name'], 'dict_test')

    def test_error_handling_invalid_sql(self):
        with self.assertRaises(psycopg.DatabaseError):
            self.db.query("INVALID SQL STATEMENT")

    def test_copy_from_csv_data(self):
        self.db.query(f"CREATE TABLE {self.table_name} (id INT, name TEXT, value NUMERIC)")

        csv_data = io.StringIO("1,test1,10.5\n2,test2,20.7\n")
        columns = ['id', 'name', 'value']

        self.db.copy_from(csv_data, self.table_name, columns)

        results = self.db.fetch_all(f"SELECT id, name, value FROM {self.table_name} ORDER BY id")
        self.assertEqual(len(results), 2)
        self.assertEqual(results[0][0], 1)
        self.assertEqual(results[0][1], 'test1')
        self.assertEqual(results[1][0], 2)
        self.assertEqual(results[1][1], 'test2')


if __name__ == '__main__':
    unittest.main()
```
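The decorator under test lives in `lib/db.py`, which is not part of this excerpt. For orientation, here is a minimal sketch of a retry decorator consistent with what these tests exercise; the 300-second budget, the message-based error classification, and the helper names are assumptions for illustration, not the actual GMT implementation:

```python
import random
import time
from functools import wraps


def with_db_retry(func):
    """Retry a DB method on connection-level errors with exponential backoff.

    Sketch only: the real implementation classifies psycopg error types;
    here we classify by message text to stay self-contained.
    """
    TIMEOUT = 300     # assumed total retry budget in seconds
    BASE_DELAY = 1    # assumed first backoff step in seconds

    def is_retryable(exc):
        # Assumption for this sketch: connection-level failures are retryable.
        return "connection" in str(exc).lower()

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        start = time.time()
        attempt = 0
        while True:
            try:
                return func(self, *args, **kwargs)
            except Exception as exc:
                if not is_retryable(exc):
                    raise  # non-retryable errors propagate immediately
                if time.time() - start >= TIMEOUT:
                    print(f"Database retry timeout after {TIMEOUT}s")
                    raise  # give up and re-raise the original error
                self._create_pool()  # re-establish the connection pool
                # Exponential backoff with jitter: 1s, 2s, 4s, 8s, ...
                time.sleep(BASE_DELAY * (2 ** attempt) + random.uniform(0, 1))
                attempt += 1
    return wrapper
```

The unit tests above patch `time.time` and `time.sleep` so that this loop can be driven through its retry and timeout branches without real waiting.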
The function looks good, but I think it has a design issue.

If the connection is not on auto-commit, you might be reconnecting and then retrying only a part of the transaction. This can be a tricky problem to debug.

Example code:

In order to find a solution here, I believe either the queries need to be buffered ... or we could decide to still fail ... this, however, would defeat the purpose of this PR entirely, I feel.

It is currently unclear what the best implementation is without a clear view of how complex query buffering would be. Please investigate / make a concept.

Also, the `copy()` statement is currently not profiting from the retry logic. This would need to be covered too. To extend this question: are there any other query methods we should think of covering to future-proof?

How does the DB behave on keep-alive queries? Will they also lead to a reconnect? To my understanding, the postgres adapter does regular keep-alives. If they fail, it will also complain. Can we hook into this?
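To make the partial-retry hazard concrete, here is a self-contained simulation; `FakeConn` is invented for illustration and does not model any real driver. With auto-commit off, earlier statements live only in the server-side open transaction, so a reconnect discards them, and naively retrying just the failed statement commits an incomplete result:

```python
class FakeConn:
    """Toy stand-in for a DB connection holding an open transaction."""
    def __init__(self):
        self.committed = []   # rows visible after COMMIT
        self.in_tx = []       # rows buffered in the open transaction
        self.fail_next = False

    def execute(self, row):
        if self.fail_next:
            self.fail_next = False
            raise ConnectionError("connection lost")
        self.in_tx.append(row)

    def commit(self):
        self.committed += self.in_tx
        self.in_tx = []

conn = FakeConn()
conn.execute("INSERT 1")       # first statement of one logical transaction
conn.fail_next = True
try:
    conn.execute("INSERT 2")   # connection drops mid-transaction
except ConnectionError:
    conn = FakeConn()          # naive retry: reconnecting discards in_tx
    conn.execute("INSERT 2")   # only the failed statement is replayed
conn.commit()
print(conn.committed)          # prints ['INSERT 2'] -- "INSERT 1" is silently lost
```

This is exactly why retrying is only unconditionally safe when each statement commits on its own, or when the whole transaction is buffered and replayed from the start.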
1. Multi-Statement Queries

The issue you describe is only relevant if GMT uses multi-statement queries, correct? I did not find any usage of the multi-statement query feature. Did I overlook something?

If multi-statement queries are really not relevant for GMT, the easiest solution would be to remove the following code block:

And to consider enabling the `autocommit` feature. If the multi-statement query feature is needed, I would do a more comprehensive analysis on how to become transactional.

2. Cover copy method

Done.

3. PostgreSQL keep-alive queries?

There are no automatic keep-alive queries:
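If multi-statement transactions are indeed unused, enabling auto-commit where the pool creates connections would remove the partial-retry hazard, because every retried statement is then a complete unit of work. A minimal configuration sketch, assuming psycopg 3 with psycopg_pool; the connection string is a placeholder:

```python
from psycopg_pool import ConnectionPool

# Placeholder DSN. kwargs are forwarded to psycopg.connect(), so every
# pooled connection is created with autocommit=True and each statement
# commits on its own.
pool = ConnectionPool(
    "postgresql://localhost/gmt_test",
    kwargs={"autocommit": True},
)
```

This is a configuration fragment only; it requires a reachable PostgreSQL instance to actually open connections.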
Amendment to the third point, "PostgreSQL keep-alive queries?"

psycopg3 includes a built-in `ConnectionPool` retry mechanism. It is disabled by default, but can be enabled with the parameter `check`:

Comparison

How does it compare to the custom `@with_db_retry` decorator implementation?

psycopg3 Built-in Limitations

- Measurement Interference: `check=ConnectionPool.check_connection` executes `conn.execute("")` on every connection borrow.
- Limited Error Scope: it only guards the connection checkout, not errors raised during `cur.execute()`, and failures surface as `PoolTimeout` exceptions with limited context.

Custom `@with_db_retry` Advantages

- Measurement Integrity: no extra statements are issued on connection borrow.
- Superior Error Handling: errors raised during `cur.execute()` are also covered.
- Similar Retry Performance: both mechanisms use comparable exponential backoff (1s, 2s, 4s, 8s...) with jitter.

Conclusion

Use the custom `@with_db_retry` mechanism because:

To be explicit that we don't want to use the check functionality of psycopg3, we should set it to `None`:
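A minimal sketch of what setting the check explicitly could look like, matching commit 61b5ba6; the connection string is a placeholder, and `check=None` is already the psycopg_pool default, so this line is purely documentary:

```python
from psycopg_pool import ConnectionPool

# check=None is the default, but stating it explicitly documents that we
# rely on the custom @with_db_retry decorator rather than the pool's own
# per-borrow health check (ConnectionPool.check_connection).
pool = ConnectionPool(
    "postgresql://localhost/gmt_test",  # placeholder connection string
    check=None,
)
```

This is a configuration fragment only; constructing the pool requires a reachable PostgreSQL instance.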