diff --git a/django_celery_results/migrations/0015_chordcounter_new_sub_tasks_taskresult_new_meta_and_more.py b/django_celery_results/migrations/0015_chordcounter_new_sub_tasks_taskresult_new_meta_and_more.py new file mode 100644 index 00000000..bcc825ca --- /dev/null +++ b/django_celery_results/migrations/0015_chordcounter_new_sub_tasks_taskresult_new_meta_and_more.py @@ -0,0 +1,33 @@ +# Generated by Django 5.2.4 on 2025-07-09 09:20 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_results', '0014_alter_taskresult_status'), + ] + + operations = [ + migrations.AddField( + model_name='chordcounter', + name='new_sub_tasks', + field=models.JSONField(default=None, help_text='JSON serialized list of task result tuples. use .group_result() to decode'), + ), + migrations.AddField( + model_name='taskresult', + name='new_meta', + field=models.JSONField(default=None, editable=False, help_text='JSON meta information about the task, such as information on child tasks', null=True, verbose_name='Task Meta Information'), + ), + migrations.AddField( + model_name='taskresult', + name='new_task_args', + field=models.JSONField(help_text='JSON representation of the positional arguments used with the task', null=True, verbose_name='Task Positional Arguments'), + ), + migrations.AddField( + model_name='taskresult', + name='new_task_kwargs', + field=models.JSONField(help_text='JSON representation of the named arguments used with the task', null=True, verbose_name='Task Named Arguments'), + ), + ] diff --git a/django_celery_results/migrations/0016_make_copy_of_taskresult_textfields.py b/django_celery_results/migrations/0016_make_copy_of_taskresult_textfields.py new file mode 100644 index 00000000..05e888db --- /dev/null +++ b/django_celery_results/migrations/0016_make_copy_of_taskresult_textfields.py @@ -0,0 +1,81 @@ +import json +import logging + +from django.db import migrations, transaction + +logger = logging.getLogger(__name__) + + +def safe_json_loads(value, default=None): + """Safely parse JSON string with fallback.""" + if not value: # Handles None, empty string, etc. + return default + return json.loads(value) + + +def make_copy_of_taskresult_textfields(apps, schema_editor): + TaskResult = apps.get_model('django_celery_results', 'TaskResult') + + total_count = TaskResult.objects.count() + logger.info(f"Starting migration for {total_count} TaskResult records") + + batch_size = 500 + processed_count = 0 + error_count = 0 + last_id = 0 + + while True: + with transaction.atomic(): + # Get next batch using cursor pagination + batch = list( + TaskResult.objects.filter(id__gt=last_id) + .order_by('id')[:batch_size] + ) + + if not batch: + break + + updates = [] + + for obj in batch: + try: + # Parse JSON fields with appropriate defaults + obj.new_task_args = safe_json_loads(obj.task_args) + obj.new_task_kwargs = safe_json_loads(obj.task_kwargs) + obj.new_meta = safe_json_loads(obj.meta) + + updates.append(obj) + + except Exception as e: + error_count += 1 + logger.error(f"Error processing TaskResult ID {obj.id}: {e}") + continue + + if updates: + TaskResult.objects.bulk_update( + updates, + ['new_task_args', 'new_task_kwargs', 'new_meta'] + ) + processed_count += len(updates) + + last_id = batch[-1].id + + # Progress logging + progress = (processed_count / total_count) * 100 if total_count > 0 else 0 + logger.info(f"Processed {processed_count}/{total_count} records ({progress:.1f}%)") + + logger.info(f"Migration completed. Successfully processed {processed_count} records, " + f"{error_count} errors encountered") + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_results', '0015_chordcounter_new_sub_tasks_taskresult_new_meta_and_more'), + ] + + operations = [ + migrations.RunPython( + make_copy_of_taskresult_textfields, + migrations.RunPython.noop + ) + ] diff --git a/django_celery_results/migrations/0017_make_copy_of_chordcounter_textfields.py b/django_celery_results/migrations/0017_make_copy_of_chordcounter_textfields.py new file mode 100644 index 00000000..e944ab8f --- /dev/null +++ b/django_celery_results/migrations/0017_make_copy_of_chordcounter_textfields.py @@ -0,0 +1,79 @@ +import json +import logging + +from django.db import migrations, transaction + +logger = logging.getLogger(__name__) + + +def safe_json_loads(value, default=None): + """Safely parse JSON string with fallback.""" + if not value: # Handles None, empty string, etc. + return default + return json.loads(value) + + +def make_copy_of_chordcounter_textfields(apps, schema_editor): + chord_counter = apps.get_model('django_celery_results', 'ChordCounter') + + total_count = chord_counter.objects.count() + logger.info(f"Starting migration for {total_count} ChordCounter records") + + batch_size = 500 + processed_count = 0 + error_count = 0 + last_id = 0 + + while True: + with transaction.atomic(): + # Get next batch using cursor pagination + batch = list( + chord_counter.objects.filter(id__gt=last_id) + .order_by('id')[:batch_size] + ) + + if not batch: + break + + updates = [] + + for obj in batch: + try: + # Parse JSON fields with appropriate defaults + obj.new_sub_tasks = safe_json_loads(obj.sub_tasks) + updates.append(obj) + + except Exception as e: + error_count += 1 + logger.error(f"Error processing ChordCounter ID {obj.id}: {e}") + continue + + if updates: + chord_counter.objects.bulk_update( + updates, + ['new_sub_tasks'] + ) + processed_count += len(updates) + + last_id = batch[-1].id + + # Progress logging + progress = (processed_count / total_count) * 100 if total_count > 0 else 0 + logger.info(f"Processed {processed_count}/{total_count} records ({progress:.1f}%)") + + logger.info(f"Migration completed. Successfully processed {processed_count} records, " + f"{error_count} errors encountered") + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_results', '0016_make_copy_of_taskresult_textfields'), + ] + + operations = [ + migrations.RunPython( + make_copy_of_chordcounter_textfields, + migrations.RunPython.noop + ) + ] diff --git a/django_celery_results/migrations/0018_remove_chordcounter_new_sub_tasks_and_more.py b/django_celery_results/migrations/0018_remove_chordcounter_new_sub_tasks_and_more.py new file mode 100644 index 00000000..9b900f3a --- /dev/null +++ b/django_celery_results/migrations/0018_remove_chordcounter_new_sub_tasks_and_more.py @@ -0,0 +1,50 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_results', '0017_make_copy_of_chordcounter_textfields'), + ] + + operations = [ + # Remove the old fields + migrations.RemoveField( + model_name='chordcounter', + name='sub_tasks', + ), + migrations.RemoveField( + model_name='taskresult', + name='meta', + ), + migrations.RemoveField( + model_name='taskresult', + name='task_args', + ), + migrations.RemoveField( + model_name='taskresult', + name='task_kwargs', + ), + + # Rename the new_ fields to their non-prefixed versions + migrations.RenameField( + model_name='chordcounter', + old_name='new_sub_tasks', + new_name='sub_tasks', + ), + migrations.RenameField( + model_name='taskresult', + old_name='new_meta', + new_name='meta', + ), + migrations.RenameField( + model_name='taskresult', + old_name='new_task_args', + new_name='task_args', + ), + migrations.RenameField( + model_name='taskresult', + old_name='new_task_kwargs', + new_name='task_kwargs', + ), + ] diff --git a/django_celery_results/models.py b/django_celery_results/models.py index b472a5af..5e5affa1 100644 --- a/django_celery_results/models.py +++ b/django_celery_results/models.py @@ -39,12 +39,12 @@ class TaskResult(models.Model): ), verbose_name=_('Task Name'), help_text=_('Name of the Task which was run')) - task_args = models.TextField( + task_args = models.JSONField( null=True, verbose_name=_('Task Positional Arguments'), help_text=_('JSON representation of the positional arguments ' 'used with the task')) - task_kwargs = models.TextField( + task_kwargs = models.JSONField( null=True, verbose_name=_('Task Named Arguments'), help_text=_('JSON representation of the named arguments ' @@ -86,7 +86,7 @@ class TaskResult(models.Model): blank=True, null=True, verbose_name=_('Traceback'), help_text=_('Text of the traceback if the task generated one')) - meta = models.TextField( + meta = models.JSONField( null=True, default=None, editable=False, verbose_name=_('Task Meta Information'), help_text=_('JSON meta information about the task, ' @@ -148,7 +148,8 @@ class ChordCounter(models.Model): verbose_name=_("Group ID"), help_text=_("Celery ID for the Chord header group"), ) - sub_tasks = models.TextField( + sub_tasks = models.JSONField( + default=None, help_text=_( "JSON serialized list of task result tuples. " "use .group_result() to decode"