diff --git a/docker-compose.override.unit_tests_cicd.yml b/docker-compose.override.unit_tests_cicd.yml
index 62f59d13769..e12e3142942 100644
--- a/docker-compose.override.unit_tests_cicd.yml
+++ b/docker-compose.override.unit_tests_cicd.yml
@@ -15,6 +15,7 @@ services:
environment:
PYTHONWARNINGS: error # We are strict about Warnings during testing
DD_DEBUG: 'True'
+ DD_TEST: 'True'
DD_LOG_LEVEL: 'ERROR'
DD_TEST_DATABASE_NAME: ${DD_TEST_DATABASE_NAME:-test_defectdojo}
DD_DATABASE_NAME: ${DD_TEST_DATABASE_NAME:-test_defectdojo}
diff --git a/dojo/filters.py b/dojo/filters.py
index 4b22d11560f..69bde7d0923 100644
--- a/dojo/filters.py
+++ b/dojo/filters.py
@@ -2043,6 +2043,78 @@ def set_related_object_fields(self):
self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View)
+class DynamicFindingGroupsFilter(FilterSet):
+ name = CharFilter(lookup_expr="icontains", label="Name")
+ severity = ChoiceFilter(
+ choices=[
+ ("Low", "Low"),
+ ("Medium", "Medium"),
+ ("High", "High"),
+ ("Critical", "Critical"),
+ ],
+ label="Min Severity",
+ )
+ engagement = ModelMultipleChoiceFilter(queryset=Engagement.objects.none(), label="Engagement")
+ product = ModelMultipleChoiceFilter(queryset=Product.objects.none(), label="Product")
+
+ class Meta:
+ model = Finding
+ fields = ["name", "severity", "engagement", "product"]
+
+ def __init__(self, *args, **kwargs):
+ self.user = kwargs.pop("user", None)
+ self.pid = kwargs.pop("pid", None)
+ super().__init__(*args, **kwargs)
+ self.set_related_object_fields()
+
+ def set_related_object_fields(self):
+ if self.pid is not None:
+ self.form.fields["engagement"].queryset = Engagement.objects.filter(product_id=self.pid)
+ if "product" in self.form.fields:
+ del self.form.fields["product"]
+ else:
+ self.form.fields["product"].queryset = get_authorized_products(Permissions.Product_View)
+ self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View)
+
+
+class DynamicFindingGroupsFindingsFilter(FilterSet):
+ name = CharFilter(lookup_expr="icontains", label="Name")
+ severity = MultipleChoiceFilter(
+ choices=[
+ ("Low", "Low"),
+ ("Medium", "Medium"),
+ ("High", "High"),
+ ("Critical", "Critical"),
+ ],
+ label="Severity",
+ )
+ vuln_id_from_tool = CharFilter(lookup_expr="icontains", label="Vulnerability Id From Tool")
+ reporter = ModelMultipleChoiceFilter(queryset=Dojo_User.objects.none(), label="Reporter")
+ active = ChoiceFilter(choices=[("Yes", "Yes"), ("No", "No")], label="Active")
+ engagement = ModelMultipleChoiceFilter(queryset=Engagement.objects.none(), label="Engagement")
+ product = ModelMultipleChoiceFilter(queryset=Product.objects.none(), label="Product")
+
+ class Meta:
+ model = Finding
+ fields = ["name", "severity", "vuln_id_from_tool", "reporter", "active", "engagement", "product"]
+
+ def __init__(self, *args, **kwargs):
+ self.user = kwargs.pop("user", None)
+ self.pid = kwargs.pop("pid", None)
+ super().__init__(*args, **kwargs)
+ self.set_related_object_fields()
+
+ def set_related_object_fields(self):
+ if self.pid is not None:
+ self.form.fields["engagement"].queryset = Engagement.objects.filter(product_id=self.pid)
+ if "product" in self.form.fields:
+ del self.form.fields["product"]
+ else:
+ self.form.fields["product"].queryset = get_authorized_products(Permissions.Product_View)
+ self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View)
+ self.form.fields["reporter"].queryset = get_authorized_users(Permissions.Finding_View)
+
+
class AcceptedFindingFilter(FindingFilter):
risk_acceptance__created__date = DateRangeFilter(label="Acceptance Date")
risk_acceptance__owner = ModelMultipleChoiceFilter(
diff --git a/dojo/finding/helper.py b/dojo/finding/helper.py
index 3f6a597a542..74df95adea8 100644
--- a/dojo/finding/helper.py
+++ b/dojo/finding/helper.py
@@ -15,6 +15,7 @@
from dojo.decorators import dojo_async_task, dojo_model_from_id, dojo_model_to_id
from dojo.endpoint.utils import save_endpoints_to_add
from dojo.file_uploads.helper import delete_related_files
+from dojo.finding_group.redis import DynamicFindingGroups
from dojo.models import (
Endpoint,
Endpoint_Status,
@@ -416,6 +417,8 @@ def finding_pre_delete(sender, instance, **kwargs):
delete_related_notes(instance)
delete_related_files(instance)
+ DynamicFindingGroups.set_last_finding_change()
+
def finding_delete(instance, **kwargs):
logger.debug("finding delete, instance: %s", instance.id)
@@ -446,6 +449,8 @@ def finding_delete(instance, **kwargs):
logger.debug("finding delete: clearing found by")
instance.found_by.clear()
+ DynamicFindingGroups.set_last_finding_change()
+
@receiver(post_delete, sender=Finding)
def finding_post_delete(sender, instance, **kwargs):
diff --git a/dojo/finding_group/README.md b/dojo/finding_group/README.md
new file mode 100644
index 00000000000..77c2f3a867b
--- /dev/null
+++ b/dojo/finding_group/README.md
@@ -0,0 +1,73 @@
+# Dynamic Finding Groups
+This module manages dynamic grouping of Findings in DefectDojo. Findings can be grouped using different strategies (called GroupModes) such as:
+- Grouping by the vuln_id_from_tool
+- Grouping by the Finding title
+- Grouping by the associated CVE
+
+The grouping is user-configurable through the UI and relies on Redis for fast storage and retrieval of groups.
+
+## How it works
+When a user selects a grouping mode, the system builds and stores finding groups in Redis. These groups are refreshed automatically whenever new Findings are added or existing ones are modified.
+
+### Redis is used to:
+- Store the mapping between Findings and their groups.
+- Store the serialized representation of each DynamicFindingGroups object.
+- Manage timestamps that help us detect if the stored groups are outdated.
+
+### Two global keys are important here:
+- finding_groups_last_finding_change: Updated whenever a Finding is created/updated.
+- finding_groups_last_update: Stores the last time a specific GroupMode was rebuilt.
+
+### When we rebuild groups:
+Group rebuilding occurs in the following cases:
+- The groups are missing in Redis, or
+- The timestamps `finding_groups_last_finding_change` and `finding_groups_last_update` do not match.
+
+In practice, whenever a change occurs, the value of `last_finding_change` becomes more recent than `last_update`. At that point, the groups are rebuilt, and `last_update` is updated to match `last_finding_change`.
+
+The `last_update` entry stores the timestamp per mode. Whenever a user opens the tab for a given mode, the system compares the timestamps. If they differ, the groups are rebuilt; otherwise, the existing groups are reused.
+
+## Adding a new GroupMode
+To add a new grouping strategy:
+
+1. Extend the GroupMode enum. Add a new entry, for example:
+```python
+class GroupMode(StrEnum):
+ VULN_ID_FROM_TOOL = "vuln_id_from_tool"
+ TITLE = "title"
+ CVE = "cve"
+ CUSTOM_TAG = "custom_tag" # ← New mode
+```
+
+2. Update `DynamicFindingGroups.get_group_names`. Define how the Finding should be grouped for the new mode:
+```python
+if mode == GroupMode.CUSTOM_TAG:
+ return finding.custom_tags.all().values_list("name", flat=True)
+```
+
+3. Expose the mode in the HTML select box. Edit `finding_groups_dynamic_list_snippet.html` to allow the user to select it:
+```html
+
+```
+
+## User selection in the UI
+Users must explicitly choose a grouping mode in the UI for this feature to take effect.
+The selection is available in `finding_groups_dynamic_list_snippet.html`:
+```html
+
+```
+If no mode is selected, **Redis will not be used** and no dynamic finding groups will be built.
+
+## Summary
+- Redis stores groups and manages synchronization via timestamps.
+- Groups are rebuilt only when necessary.
+- Adding a new GroupMode requires extending the enum, defining the grouping logic, and updating the HTML select box.
+- Users must explicitly select a mode in the UI; otherwise, grouping is disabled.
\ No newline at end of file
diff --git a/dojo/finding_group/redis.py b/dojo/finding_group/redis.py
new file mode 100644
index 00000000000..543a01317e1
--- /dev/null
+++ b/dojo/finding_group/redis.py
@@ -0,0 +1,226 @@
+import base64
+import json
+import logging
+import os
+from dataclasses import asdict, dataclass, field
+from datetime import datetime
+from enum import StrEnum
+from functools import lru_cache
+from typing import Self
+
+import redis
+from django.conf import settings
+from django.utils.functional import cached_property
+
+from dojo.models import Finding
+
+logger = logging.getLogger(__name__)
+
+DD_TEST = os.getenv("DD_TEST", "False").lower() == "true"
+USER_MODES_KEY = "finding_groups_user_modes"
+LAST_FINDING_CHANGE = "finding_groups_last_finding_change"
+LAST_FINDING_UPDATE = "finding_groups_last_update"
+
+
+class GroupMode(StrEnum):
+ VULN_ID_FROM_TOOL = "vuln_id_from_tool"
+ TITLE = "title"
+ CVE = "cve"
+
+
+@dataclass
+class DynamicFindingGroups:
+ finding_group_id: str
+ name: str = ""
+ severity: str = "Info"
+ main_finding_id: int | None = None
+ finding_ids: set[int] = field(default_factory=set)
+
+ def to_dict(self) -> dict:
+ data = asdict(self)
+ data["finding_ids"] = list(data["finding_ids"])
+ return data
+
+ @staticmethod
+ def from_dict(data: dict) -> Self:
+ data["finding_ids"] = set(data.get("finding_ids", []))
+ return DynamicFindingGroups(**data)
+
+ @staticmethod
+ def load_from_id(finding_group_id: str, fg_key: str) -> Self | None:
+ redis_client = get_redis_client()
+ finding_group_data = redis_client.hget(fg_key, finding_group_id)
+ if finding_group_data:
+ return DynamicFindingGroups.from_dict(json.loads(finding_group_data))
+ return None
+
+ def update_sev_sla(self, finding: Finding) -> None:
+ if Finding.get_number_severity(finding.severity) > Finding.get_number_severity(self.severity):
+ self.severity = finding.severity
+ self.main_finding_id = finding.id
+
+ def add(self, finding: Finding) -> None:
+ self.update_sev_sla(finding)
+ self.finding_ids.add(finding.id)
+
+ # This method is used when we filter findings in a finding group
+ def reconfig_finding_group(self) -> None:
+ self.severity = "Info"
+ findings = Finding.objects.filter(id__in=self.finding_ids)
+ for finding in findings:
+ self.update_sev_sla(finding)
+
+ @staticmethod
+ def get_group_names(finding: Finding, mode: GroupMode) -> list[str] | None:
+ if mode == GroupMode.VULN_ID_FROM_TOOL:
+ if finding.vuln_id_from_tool:
+ return [finding.vuln_id_from_tool]
+ if mode == GroupMode.TITLE:
+ if finding.title:
+ return [finding.title]
+ if mode == GroupMode.CVE:
+ cves = [
+ cve for cve in finding.vulnerability_id_set.values_list("vulnerability_id", flat=True)
+ if cve
+ ]
+ if cves:
+ return cves
+ return None
+
+ @staticmethod
+ def get_fg_key(mode: GroupMode) -> str:
+ return f"finding_groups_{mode.value}"
+
+ @staticmethod
+ def get_id_map_key(mode: GroupMode) -> str:
+ return f"finding_groups_id_to_finding_group_{mode.value}"
+
+ @staticmethod
+ def set_last_finding_change() -> None:
+ if DD_TEST:
+ logger.info("Redis is not used in test environment, skipping.")
+ return
+ redis_client = get_redis_client()
+ redis_client.set(LAST_FINDING_CHANGE, datetime.now().isoformat())
+
+ @staticmethod
+ def set_last_update(mode: GroupMode, timestamp: datetime | None = None) -> None:
+ if timestamp is None:
+ return
+ redis_client = get_redis_client()
+ redis_client.hset(LAST_FINDING_UPDATE, mode.value, timestamp.isoformat())
+
+ @staticmethod
+ def add_finding(finding: Finding, mode: GroupMode) -> None:
+ finding_groups = DynamicFindingGroups.get_group_names(finding, mode)
+ if not finding_groups:
+ return
+ redis_client = get_redis_client()
+ for finding_group_name in finding_groups:
+ finding_group_id = base64.b64encode(finding_group_name.encode()).decode()
+ fg_key = DynamicFindingGroups.get_fg_key(mode)
+ id_map_key = DynamicFindingGroups.get_id_map_key(mode)
+
+ finding_group = DynamicFindingGroups.load_from_id(finding_group_id, fg_key)
+ if not finding_group:
+ finding_group = DynamicFindingGroups(
+ finding_group_id=finding_group_id,
+ name=finding_group_name,
+ )
+
+ if finding.id not in finding_group.finding_ids:
+ finding_group.add(finding)
+
+ redis_client.hset(fg_key, finding_group_id, json.dumps(finding_group.to_dict()))
+ group_ids_raw = redis_client.hget(id_map_key, finding.id)
+ group_ids = json.loads(group_ids_raw) if group_ids_raw else []
+ if finding_group_id not in group_ids:
+ group_ids.append(finding_group_id)
+ redis_client.hset(id_map_key, finding.id, json.dumps(group_ids))
+
+ @cached_property
+ def sla_days_remaining_internal(self):
+ findings = Finding.objects.filter(id__in=self.finding_ids, active=True)
+ if not findings:
+ return None
+ return min([find.sla_days_remaining() for find in findings if find.sla_days_remaining()], default=None)
+
+ @property
+ def sla_days_remaining(self) -> int | None:
+ return self.sla_days_remaining_internal
+
+
+@lru_cache(maxsize=1)
+def get_redis_client() -> redis.Redis:
+ host = getattr(settings, "REDIS_HOST", "redis")
+ port = getattr(settings, "REDIS_PORT", 6379)
+ return redis.Redis(host=host, port=port, decode_responses=True)
+
+
+def get_user_mode(user_id: int) -> GroupMode | None:
+ redis_client = get_redis_client()
+ value = redis_client.hget(USER_MODES_KEY, str(user_id))
+ if value and value not in [m.value for m in GroupMode]:
+ logger.warning(f"Invalid group mode '{value}' found in Redis for user {user_id}, resetting to None.")
+ redis_client.hdel(USER_MODES_KEY, str(user_id))
+ return None
+ return GroupMode(value) if value else None
+
+
+def set_user_mode(user_id: int, mode: GroupMode) -> None:
+ redis_client = get_redis_client()
+ redis_client.hset(USER_MODES_KEY, str(user_id), mode.value)
+ logger.info(f"User {user_id} dynamic finding groups mode set to {mode.value}")
+
+
+def load_or_rebuild_finding_groups(mode: GroupMode) -> dict[str, DynamicFindingGroups]:
+ redis_client = get_redis_client()
+ fg_key = DynamicFindingGroups.get_fg_key(mode)
+ id_map_key = DynamicFindingGroups.get_id_map_key(mode)
+
+ if not redis_client.exists(LAST_FINDING_CHANGE):
+ DynamicFindingGroups.set_last_finding_change()
+ last_finding_change_raw = redis_client.get(LAST_FINDING_CHANGE)
+ try:
+ last_finding_change_time = datetime.fromisoformat(last_finding_change_raw)
+ except ValueError:
+ logger.warning(f"Invalid datetime format in Redis for {LAST_FINDING_CHANGE}: {last_finding_change_raw}, resetting last finding change.")
+ DynamicFindingGroups.set_last_finding_change()
+ last_finding_change_raw = redis_client.get(LAST_FINDING_CHANGE)
+ last_finding_change_time = datetime.fromisoformat(last_finding_change_raw) if last_finding_change_raw else None
+
+ try:
+ last_groups_update_time = redis_client.hget(LAST_FINDING_UPDATE, mode.value)
+ last_groups_update_time = datetime.fromisoformat(last_groups_update_time) if last_groups_update_time else None
+ except ValueError:
+ logger.warning(f"Invalid datetime format in Redis for {LAST_FINDING_UPDATE}: {last_groups_update_time}")
+ last_groups_update_time = None
+
+ # Check if finding_groups and id_map exist in Redis
+ # Check if last update is the same as last finding change
+ # If not, rebuild them
+ if (
+ not redis_client.exists(fg_key)
+ or not redis_client.exists(id_map_key)
+ or last_groups_update_time != last_finding_change_time
+ ):
+ if not last_finding_change_time:
+ logger.warning("Last finding change is not set, setting it to now.")
+ elif last_groups_update_time and last_finding_change_time < last_groups_update_time:
+ logger.warning("Last finding change is older than last update, they should be equal or last finding change should be newer.")
+ redis_client.delete(fg_key, id_map_key)
+ for finding in Finding.objects.all():
+ DynamicFindingGroups.add_finding(finding, mode)
+ DynamicFindingGroups.set_last_update(mode, last_finding_change_time)
+
+ return _load_finding_groups_from_redis(fg_key, redis_client)
+
+
+def _load_finding_groups_from_redis(fg_key: str, redis_client: redis.Redis) -> dict[str, DynamicFindingGroups]:
+ finding_groups_data = redis_client.hgetall(fg_key)
+ if finding_groups_data:
+ return {
+ key: DynamicFindingGroups.from_dict(json.loads(value))
+ for key, value in finding_groups_data.items()
+ }
+ return {}
diff --git a/dojo/finding_group/urls.py b/dojo/finding_group/urls.py
index 938d95ba9e9..b6280cb219e 100644
--- a/dojo/finding_group/urls.py
+++ b/dojo/finding_group/urls.py
@@ -1,6 +1,6 @@
from django.urls import re_path
-from dojo.finding_group import views
+from dojo.finding_group import views, views_dynamic
urlpatterns = [
# finding group
@@ -9,8 +9,14 @@
re_path(r"^finding_group/(?P\d+)/jira/push$", views.push_to_jira, name="finding_group_push_to_jira"),
re_path(r"^finding_group/(?P\d+)/jira/unlink$", views.unlink_jira, name="finding_group_unlink_jira"),
- # finding group list views
+ # static finding group list views
re_path(r"^finding_group/all$", views.ListFindingGroups.as_view(), name="all_finding_groups"),
re_path(r"^finding_group/open$", views.ListOpenFindingGroups.as_view(), name="open_finding_groups"),
re_path(r"^finding_group/closed$", views.ListClosedFindingGroups.as_view(), name="closed_finding_groups"),
+
+ # dynamic finding group list views
+ re_path(r"^dynamic_finding_group/all$", views_dynamic.ListDynamicFindingGroups.as_view(), name="all_dynamic_finding_groups"),
+ re_path(r"^dynamic_finding_group/open$", views_dynamic.ListOpenDynamicFindingGroups.as_view(), name="open_dynamic_finding_groups"),
+ re_path(r"^dynamic_finding_group/closed$", views_dynamic.ListClosedDynamicFindingGroups.as_view(), name="closed_dynamic_finding_groups"),
+ re_path(r"^dynamic_finding_group/(?P[^/]+)/findings$", views_dynamic.DynamicFindingGroupsFindings.as_view(), name="dynamic_finding_group_findings"),
]
diff --git a/dojo/finding_group/views_dynamic.py b/dojo/finding_group/views_dynamic.py
new file mode 100644
index 00000000000..00b1775d85c
--- /dev/null
+++ b/dojo/finding_group/views_dynamic.py
@@ -0,0 +1,282 @@
+import logging
+
+from django.core.paginator import Paginator
+from django.http import HttpRequest
+from django.shortcuts import render
+from django.views import View
+
+from dojo.authorization.roles_permissions import Permissions
+from dojo.filters import DynamicFindingGroupsFilter, DynamicFindingGroupsFindingsFilter
+from dojo.finding_group.redis import (
+ GroupMode,
+ get_user_mode,
+ load_or_rebuild_finding_groups,
+ set_user_mode,
+)
+from dojo.forms import FindingBulkUpdateForm
+from dojo.models import Finding, Global_Role
+from dojo.product.queries import get_authorized_products
+from dojo.utils import add_breadcrumb
+
+logger = logging.getLogger(__name__)
+
+
+def paginate_queryset(queryset, request: HttpRequest):
+ page_size = request.GET.get("page_size", 25) # Default is 25
+ paginator = Paginator(queryset, page_size)
+ page_number = request.GET.get("page")
+ return paginator.get_page(page_number)
+
+
+class ListDynamicFindingGroups(View):
+ filter_name = "All"
+
+ def get_template(self):
+ return "dojo/finding_groups_dynamic_list.html"
+
+ def order_field(self, request: HttpRequest, finding_groups_findings_list):
+ order_field = request.GET.get("o")
+ if order_field:
+ reverse_order = order_field.startswith("-")
+ if reverse_order:
+ order_field = order_field[1:]
+ if order_field == "name":
+ finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.name, reverse=reverse_order)
+ elif order_field == "findings_count":
+ finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: len(x.finding_ids), reverse=reverse_order)
+ return finding_groups_findings_list
+
+ def filters(self, request: HttpRequest):
+ name_filter = request.GET.get("name", "").lower()
+ min_severity_filter = request.GET.get("severity")
+ engagement_filter = request.GET.getlist("engagement")
+ product_filter = request.GET.getlist("product")
+ return name_filter, min_severity_filter, engagement_filter, product_filter
+
+ def filter_finding_group(self, finding_group, request: HttpRequest):
+ name_filter, min_severity_filter, engagement_filter, product_filter = self.filters(request)
+ add_finding_group = True
+ if product_filter:
+ finding_group.finding_ids = set(finding_group.finding_ids) & set(
+ Finding.objects.filter(test__engagement__product__id__in=product_filter).values_list("id", flat=True),
+ )
+ if engagement_filter:
+ finding_group.finding_ids = set(finding_group.finding_ids) & set(
+ Finding.objects.filter(test__engagement__id__in=engagement_filter).values_list("id", flat=True),
+ )
+ finding_group.reconfig_finding_group()
+ if name_filter and name_filter not in finding_group.name.lower():
+ add_finding_group = False
+ if min_severity_filter and Finding.get_number_severity(finding_group.severity) < Finding.get_number_severity(min_severity_filter):
+ add_finding_group = False
+ if not finding_group.finding_ids:
+ add_finding_group = False
+ return add_finding_group
+
+ def get_findings(self, products):
+ finding_group_fids = {
+ fid for finding_group in self.finding_groups_map.values() for fid in finding_group.finding_ids
+ }
+ filters = {"id__in": finding_group_fids}
+ if products:
+ filters["test__engagement__product__in"] = products
+ user_findings_qs = Finding.objects.filter(**filters)
+ user_fids = set(user_findings_qs.values_list("id", flat=True))
+ active_fids = set(
+ user_findings_qs.filter(active=True).values_list("id", flat=True),
+ )
+ return user_fids, active_fids
+
+ def get_finding_groups(self, request: HttpRequest, products=None):
+ """
+ Retrieve all dynamic finding groups for the current user.
+
+ Steps:
+ 1. Retrieve finding IDs relevant for the user (optionally filtered by products).
+ 2. Iterate over all finding groups in self.finding_groups_map.
+ 3. For each group:
+ - Restrict the group's findings to those the user can see.
+ - Apply additional filters based on the request.
+ - No additional filtering for active findings.
+ 4. Append groups that pass all filters to the result list.
+ 5. Order the resulting list according to the request via order_field and return.
+ """
+ user_fids, _ = self.get_findings(products)
+ list_finding_group = []
+ for finding_group in self.finding_groups_map.values():
+ finding_group.finding_ids = set(finding_group.finding_ids) & user_fids
+ if self.filter_finding_group(finding_group, request):
+ list_finding_group.append(finding_group)
+ return self.order_field(request, list_finding_group)
+
+ def get(self, request: HttpRequest):
+ global_role = Global_Role.objects.filter(user=request.user).first()
+ products = get_authorized_products(Permissions.Product_View)
+ mode_str = request.GET.get("mode", None)
+ user_id = request.user.id
+ if mode_str:
+ try:
+ mode = GroupMode(mode_str)
+ set_user_mode(user_id, mode)
+ except ValueError:
+ if mode_str is not None:
+ logger.warning(f"Invalid mode: {mode_str}")
+ mode = get_user_mode(user_id)
+ else:
+ mode = get_user_mode(user_id)
+ self.finding_groups_map = load_or_rebuild_finding_groups(mode=mode) if mode else {}
+ if request.user.is_superuser or (global_role and global_role.role):
+ finding_groups = self.get_finding_groups(request)
+ elif products.exists():
+ finding_groups = self.get_finding_groups(request, products)
+ paginated_finding_groups = paginate_queryset(finding_groups, request)
+
+ context = {
+ "filter_name": self.filter_name,
+ "mode": mode.value if mode else None,
+ "filtered": DynamicFindingGroupsFilter(request.GET),
+ "finding_groups": paginated_finding_groups,
+ }
+
+ add_breadcrumb(title="Dynamic Finding Group", top_level=not len(request.GET), request=request)
+ return render(request, self.get_template(), context)
+
+
+class ListOpenDynamicFindingGroups(ListDynamicFindingGroups):
+ filter_name = "Open"
+
+ def get_finding_groups(self, request: HttpRequest, products=None):
+ """
+ Retrieve dynamic finding groups containing at least one active finding.
+
+ Steps:
+ 1. Retrieve finding IDs relevant for the user and the active subset.
+ 2. Iterate over all finding groups in self.finding_groups_map.
+ 3. For each group:
+ - Restrict the group's findings to those the user can see.
+ - Apply additional filters based on the request.
+ - Keep only groups with at least one active finding.
+ 4. Append groups that pass all filters to the result list.
+ 5. Order the resulting list according to the request via order_field and return.
+ """
+ user_fids, active_fids = self.get_findings(products)
+ list_finding_group = []
+ for finding_group in self.finding_groups_map.values():
+ finding_group.finding_ids = set(finding_group.finding_ids) & user_fids
+ if self.filter_finding_group(finding_group, request):
+ if finding_group.finding_ids & active_fids:
+ list_finding_group.append(finding_group)
+ return self.order_field(request, list_finding_group)
+
+
+class ListClosedDynamicFindingGroups(ListDynamicFindingGroups):
+ filter_name = "Closed"
+
+ def get_finding_groups(self, request: HttpRequest, products=None):
+ """
+ Retrieve dynamic finding groups containing no active findings.
+
+ Steps:
+ 1. Retrieve finding IDs relevant for the user and the active subset.
+ 2. Iterate over all finding groups in self.finding_groups_map.
+ 3. For each group:
+ - Restrict the group's findings to those the user can see.
+ - Apply additional filters based on the request.
+ - Keep only groups with no active findings.
+ 4. Append groups that pass all filters to the result list.
+ 5. Order the resulting list according to the request via order_field and return.
+ """
+ user_fids, active_fids = self.get_findings(products)
+ list_finding_group = []
+ for finding_group in self.finding_groups_map.values():
+ finding_group.finding_ids = set(finding_group.finding_ids) & user_fids
+ if self.filter_finding_group(finding_group, request):
+ if not (finding_group.finding_ids & active_fids):
+ list_finding_group.append(finding_group)
+ return self.order_field(request, list_finding_group)
+
+
+class DynamicFindingGroupsFindings(View):
+ def get_template(self):
+ return "dojo/finding_group_dynamic_findings.html"
+
+ def order_field(self, request: HttpRequest, finding_groups_findings_list):
+ order_field = request.GET.get("o")
+ if order_field:
+ reverse_order = order_field.startswith("-")
+ if reverse_order:
+ order_field = order_field[1:]
+ if order_field == "title":
+ finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.title, reverse=reverse_order)
+ elif order_field == "found_by":
+ finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.found_by.count(), reverse=reverse_order)
+ return finding_groups_findings_list
+
+ def filters(self, request: HttpRequest):
+ name_filter = request.GET.get("name", "").lower()
+ severity_filter = request.GET.getlist("severity")
+ vuln_id_from_tool_filter = request.GET.get("vuln_id_from_tool")
+ reporter_filter = request.GET.getlist("reporter")
+ active_filter = request.GET.get("active")
+ engagement_filter = request.GET.getlist("engagement")
+ product_filter = request.GET.getlist("product")
+ return name_filter, severity_filter, vuln_id_from_tool_filter, reporter_filter, active_filter, engagement_filter, product_filter
+
+ def filter_findings(self, findings, request: HttpRequest):
+ name_filter, severity_filter, vuln_id_from_tool_filter, reporter_filter, active_filter, engagement_filter, product_filter = self.filters(request)
+ filter_kwargs = {}
+ if name_filter:
+ filter_kwargs["title__icontains"] = name_filter
+ if severity_filter:
+ filter_kwargs["severity__in"] = severity_filter
+ if vuln_id_from_tool_filter:
+ filter_kwargs["vuln_id_from_tool__icontains"] = vuln_id_from_tool_filter
+ if reporter_filter:
+ filter_kwargs["reporter__id__in"] = reporter_filter
+ if active_filter:
+ filter_kwargs["active"] = (active_filter == "Yes")
+ if engagement_filter:
+ filter_kwargs["test__engagement__id__in"] = engagement_filter
+ if product_filter:
+ filter_kwargs["test__engagement__product__id__in"] = product_filter
+ return findings.filter(**filter_kwargs)
+
+ def get_findings(self, request: HttpRequest, products=None):
+ finding_group = self.finding_groups_map.get(self.finding_group_id)
+
+ # When the finding_group not exists
+ if not finding_group:
+ return None, []
+
+ list_findings = finding_group.finding_ids
+ if products:
+ findings = Finding.objects.filter(id__in=list_findings, test__engagement__product__in=products)
+ else:
+ findings = Finding.objects.filter(id__in=list_findings)
+ findings = self.filter_findings(findings, request)
+ return finding_group.name, self.order_field(request, findings)
+
+ def get(self, request: HttpRequest, finding_group_id: int):
+ self.finding_group_id = finding_group_id
+ global_role = Global_Role.objects.filter(user=request.user).first()
+ products = get_authorized_products(Permissions.Product_View)
+ mode = get_user_mode(request.user.id)
+ self.finding_groups_map = load_or_rebuild_finding_groups(mode=mode) if mode else {}
+ if request.user.is_superuser or (global_role and global_role.role):
+ finding_group_name, findings = self.get_findings(request)
+ elif products.exists():
+ finding_group_name, findings = self.get_findings(request, products)
+ else:
+ finding_group_name = None
+ paginated_findings = paginate_queryset(findings, request)
+
+ context = {
+ "finding_group": finding_group_name,
+ "filtered": DynamicFindingGroupsFindingsFilter(request.GET),
+ "finding_group_id": self.finding_group_id,
+ "findings": paginated_findings,
+ "bulk_edit_form": FindingBulkUpdateForm(request.GET),
+ }
+
+ add_breadcrumb(title="Dynamic Finding Group Findings", top_level=not len(request.GET), request=request)
+ return render(request, self.get_template(), context)
diff --git a/dojo/models.py b/dojo/models.py
index 9d3a238d9ca..1de85cbde37 100644
--- a/dojo/models.py
+++ b/dojo/models.py
@@ -2825,6 +2825,9 @@ def save(self, dedupe_option=True, rules_option=True, product_grading_option=Tru
else:
logger.debug("no options selected that require finding post processing")
+ from dojo.finding_group.redis import DynamicFindingGroups
+ DynamicFindingGroups.set_last_finding_change()
+
def get_absolute_url(self):
from django.urls import reverse
return reverse("view_finding", args=[str(self.id)])
@@ -3480,6 +3483,11 @@ class Vulnerability_Id(models.Model):
def __str__(self):
return self.vulnerability_id
+ def save(self, *args, **kwargs):
+ super().save(*args, **kwargs)
+ from dojo.finding_group.redis import DynamicFindingGroups
+ DynamicFindingGroups.set_last_finding_change()
+
def get_absolute_url(self):
from django.urls import reverse
return reverse("view_finding", args=[str(self.finding.id)])
diff --git a/dojo/static/dojo/css/dojo.css b/dojo/static/dojo/css/dojo.css
index deba72474c9..7a102755ed5 100644
--- a/dojo/static/dojo/css/dojo.css
+++ b/dojo/static/dojo/css/dojo.css
@@ -1911,4 +1911,21 @@ input[type=number]::-webkit-outer-spin-button {
color: ButtonText;
}
}
+
+.submenu-header {
+ font-weight: bold;
+ padding: 5px 15px;
+ color: #666;
+ background-color: #f5f5f5;
+}
+
+.submenu-divider {
+ border-top: 1px solid #ddd;
+ margin: 5px 0;
+}
+
+.nav-second-level li a:hover {
+ background-color: #e9ecef;
+ color: #000;
+}
\ No newline at end of file
diff --git a/dojo/templates/base.html b/dojo/templates/base.html
index 007612ce741..1afd72ae92d 100644
--- a/dojo/templates/base.html
+++ b/dojo/templates/base.html
@@ -339,12 +339,13 @@
+ {% if finding_group %}
+ {% blocktrans %}Findings for Dynamic Finding Group: {{ finding_group }}{% endblocktrans %}
+ {% else %}
+ {% blocktrans %}The finding_group ID {{ finding_group_id }} does not match any known finding_group.{% endblocktrans %}
+ {% endif %}
+
+
+
+
+
+
+
+ {% include "dojo/filter_snippet.html" with form=filtered.form %}
+
+
+ {% if findings %}
+
{% include "dojo/paging_snippet.html" with page=findings page_size=True %}
+ {% if not product_tab or product_tab and product_tab.product|has_object_permission:"Finding_Edit" %}
+
+ {% if not product_tab or product_tab and product_tab.product|has_object_permission:"Finding_Edit" %}
+
+ {% endif %}
+
+ {% if product_tab and not 'DISABLE_FINDING_MERGE'|setting_enabled %}
+
+ {% endif %}
+ {% if not product_tab or product_tab and product_tab.product|has_object_permission:"Finding_Delete" %}
+
+ {% endif %}
+
+
+
{% trans "Choose wisely..." %}
+
+ {% if product_tab %}
+
+
+
+
+ {% endif %}
+
+
+
+
+ {% block header %}
+ {% if not product_tab or product_tab and product_tab.product|has_object_permission:"Finding_Edit" %}
+