diff --git a/docker-compose.override.unit_tests_cicd.yml b/docker-compose.override.unit_tests_cicd.yml index 62f59d13769..e12e3142942 100644 --- a/docker-compose.override.unit_tests_cicd.yml +++ b/docker-compose.override.unit_tests_cicd.yml @@ -15,6 +15,7 @@ services: environment: PYTHONWARNINGS: error # We are strict about Warnings during testing DD_DEBUG: 'True' + DD_TEST: 'True' DD_LOG_LEVEL: 'ERROR' DD_TEST_DATABASE_NAME: ${DD_TEST_DATABASE_NAME:-test_defectdojo} DD_DATABASE_NAME: ${DD_TEST_DATABASE_NAME:-test_defectdojo} diff --git a/dojo/filters.py b/dojo/filters.py index 4b22d11560f..69bde7d0923 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -2043,6 +2043,78 @@ def set_related_object_fields(self): self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View) +class DynamicFindingGroupsFilter(FilterSet): + name = CharFilter(lookup_expr="icontains", label="Name") + severity = ChoiceFilter( + choices=[ + ("Low", "Low"), + ("Medium", "Medium"), + ("High", "High"), + ("Critical", "Critical"), + ], + label="Min Severity", + ) + engagement = ModelMultipleChoiceFilter(queryset=Engagement.objects.none(), label="Engagement") + product = ModelMultipleChoiceFilter(queryset=Product.objects.none(), label="Product") + + class Meta: + model = Finding + fields = ["name", "severity", "engagement", "product"] + + def __init__(self, *args, **kwargs): + self.user = kwargs.pop("user", None) + self.pid = kwargs.pop("pid", None) + super().__init__(*args, **kwargs) + self.set_related_object_fields() + + def set_related_object_fields(self): + if self.pid is not None: + self.form.fields["engagement"].queryset = Engagement.objects.filter(product_id=self.pid) + if "product" in self.form.fields: + del self.form.fields["product"] + else: + self.form.fields["product"].queryset = get_authorized_products(Permissions.Product_View) + self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View) + + +class DynamicFindingGroupsFindingsFilter(FilterSet): + name = CharFilter(lookup_expr="icontains", label="Name") + severity = MultipleChoiceFilter( + choices=[ + ("Low", "Low"), + ("Medium", "Medium"), + ("High", "High"), + ("Critical", "Critical"), + ], + label="Severity", + ) + vuln_id_from_tool = CharFilter(lookup_expr="icontains", label="Vulnerability Id From Tool") + reporter = ModelMultipleChoiceFilter(queryset=Dojo_User.objects.none(), label="Reporter") + active = ChoiceFilter(choices=[("Yes", "Yes"), ("No", "No")], label="Active") + engagement = ModelMultipleChoiceFilter(queryset=Engagement.objects.none(), label="Engagement") + product = ModelMultipleChoiceFilter(queryset=Product.objects.none(), label="Product") + + class Meta: + model = Finding + fields = ["name", "severity", "vuln_id_from_tool", "reporter", "active", "engagement", "product"] + + def __init__(self, *args, **kwargs): + self.user = kwargs.pop("user", None) + self.pid = kwargs.pop("pid", None) + super().__init__(*args, **kwargs) + self.set_related_object_fields() + + def set_related_object_fields(self): + if self.pid is not None: + self.form.fields["engagement"].queryset = Engagement.objects.filter(product_id=self.pid) + if "product" in self.form.fields: + del self.form.fields["product"] + else: + self.form.fields["product"].queryset = get_authorized_products(Permissions.Product_View) + self.form.fields["engagement"].queryset = get_authorized_engagements(Permissions.Engagement_View) + self.form.fields["reporter"].queryset = get_authorized_users(Permissions.Finding_View) + + class AcceptedFindingFilter(FindingFilter): risk_acceptance__created__date = DateRangeFilter(label="Acceptance Date") risk_acceptance__owner = ModelMultipleChoiceFilter( diff --git a/dojo/finding/helper.py b/dojo/finding/helper.py index 3f6a597a542..74df95adea8 100644 --- a/dojo/finding/helper.py +++ b/dojo/finding/helper.py @@ -15,6 +15,7 @@ from dojo.decorators import dojo_async_task, dojo_model_from_id, dojo_model_to_id from dojo.endpoint.utils import save_endpoints_to_add from dojo.file_uploads.helper import delete_related_files +from dojo.finding_group.redis import DynamicFindingGroups from dojo.models import ( Endpoint, Endpoint_Status, @@ -416,6 +417,8 @@ def finding_pre_delete(sender, instance, **kwargs): delete_related_notes(instance) delete_related_files(instance) + DynamicFindingGroups.set_last_finding_change() + def finding_delete(instance, **kwargs): logger.debug("finding delete, instance: %s", instance.id) @@ -446,6 +449,8 @@ def finding_delete(instance, **kwargs): logger.debug("finding delete: clearing found by") instance.found_by.clear() + DynamicFindingGroups.set_last_finding_change() + @receiver(post_delete, sender=Finding) def finding_post_delete(sender, instance, **kwargs): diff --git a/dojo/finding_group/README.md b/dojo/finding_group/README.md new file mode 100644 index 00000000000..77c2f3a867b --- /dev/null +++ b/dojo/finding_group/README.md @@ -0,0 +1,73 @@ +# Dynamic Finding Groups +This module manages dynamic grouping of Findings in DefectDojo. Findings can be grouped using different strategies (called GroupModes) such as: +- Grouping by the vuln_id_from_tool +- Grouping by the Finding title +- Grouping by the associated CVE + +The grouping is user-configurable through the UI and relies on Redis for fast storage and retrieval of groups. + +## How it works +When a user selects a grouping mode, the system builds and stores finding groups in Redis. These groups are refreshed automatically whenever new Findings are added or existing ones are modified. + +### Redis is used to: +- Store the mapping between Findings and their groups. +- Store the serialized representation of each DynamicFindingGroups object. +- Manage timestamps that help us detect if the stored groups are outdated. + +### Two global keys are important here: +- finding_groups_last_finding_change: Updated whenever a Finding is created/updated. +- finding_groups_last_update: Stores the last time a specific GroupMode was rebuilt. + +### When we rebuild groups: +Group rebuilding occurs in the following cases: +- The groups are missing in Redis, or +- The timestamps `finding_groups_last_finding_change` and `finding_groups_last_update` do not match. + +In practice, whenever a change occurs, the value of `last_finding_change` becomes more recent than `last_update`. At that point, the groups are rebuilt, and `last_update` is updated to match `last_finding_change`. + +The `last_update` entry stores the timestamp per mode. Whenever a user opens the tab for a given mode, the system compares the timestamps. If they differ, the groups are rebuilt; otherwise, the existing groups are reused. + +## Adding a new GroupMode +To add a new grouping strategy: + +1. Extend the GroupMode enum. Add a new entry, for example: +```python +class GroupMode(StrEnum): + VULN_ID_FROM_TOOL = "vuln_id_from_tool" + TITLE = "title" + CVE = "cve" + CUSTOM_TAG = "custom_tag" # ← New mode +``` + +2. Update `DynamicFindingGroups.get_group_names`. Define how the Finding should be grouped for the new mode: +```python +if mode == GroupMode.CUSTOM_TAG: + return finding.custom_tags.all().values_list("name", flat=True) +``` + +3. Expose the mode in the HTML select box. Edit `finding_groups_dynamic_list_snippet.html` to allow the user to select it: +```html + +``` + +## User selection in the UI +Users must explicitly choose a grouping mode in the UI for this feature to take effect. +The selection is available in `finding_groups_dynamic_list_snippet.html`: +```html +
+ + +
+``` +If no mode is selected, **Redis will not be used** and no dynamic finding groups will be built. + +## Summary +- Redis stores groups and manages synchronization via timestamps. +- Groups are rebuilt only when necessary. +- Adding a new GroupMode requires extending the enum, defining the grouping logic, and updating the HTML select box. +- Users must explicitly select a mode in the UI; otherwise, grouping is disabled. \ No newline at end of file diff --git a/dojo/finding_group/redis.py b/dojo/finding_group/redis.py new file mode 100644 index 00000000000..543a01317e1 --- /dev/null +++ b/dojo/finding_group/redis.py @@ -0,0 +1,226 @@ +import base64 +import json +import logging +import os +from dataclasses import asdict, dataclass, field +from datetime import datetime +from enum import StrEnum +from functools import lru_cache +from typing import Self + +import redis +from django.conf import settings +from django.utils.functional import cached_property + +from dojo.models import Finding + +logger = logging.getLogger(__name__) + +DD_TEST = os.getenv("DD_TEST", "False").lower() == "true" +USER_MODES_KEY = "finding_groups_user_modes" +LAST_FINDING_CHANGE = "finding_groups_last_finding_change" +LAST_FINDING_UPDATE = "finding_groups_last_update" + + +class GroupMode(StrEnum): + VULN_ID_FROM_TOOL = "vuln_id_from_tool" + TITLE = "title" + CVE = "cve" + + +@dataclass +class DynamicFindingGroups: + finding_group_id: str + name: str = "" + severity: str = "Info" + main_finding_id: int | None = None + finding_ids: set[int] = field(default_factory=set) + + def to_dict(self) -> dict: + data = asdict(self) + data["finding_ids"] = list(data["finding_ids"]) + return data + + @staticmethod + def from_dict(data: dict) -> Self: + data["finding_ids"] = set(data.get("finding_ids", [])) + return DynamicFindingGroups(**data) + + @staticmethod + def load_from_id(finding_group_id: str, fg_key: str) -> Self | None: + redis_client = get_redis_client() + finding_group_data = redis_client.hget(fg_key, finding_group_id) + if finding_group_data: + return DynamicFindingGroups.from_dict(json.loads(finding_group_data)) + return None + + def update_sev_sla(self, finding: Finding) -> None: + if Finding.get_number_severity(finding.severity) > Finding.get_number_severity(self.severity): + self.severity = finding.severity + self.main_finding_id = finding.id + + def add(self, finding: Finding) -> None: + self.update_sev_sla(finding) + self.finding_ids.add(finding.id) + + # This method is used when we filter findings in a finding group + def reconfig_finding_group(self) -> None: + self.severity = "Info" + findings = Finding.objects.filter(id__in=self.finding_ids) + for finding in findings: + self.update_sev_sla(finding) + + @staticmethod + def get_group_names(finding: Finding, mode: GroupMode) -> list[str] | None: + if mode == GroupMode.VULN_ID_FROM_TOOL: + if finding.vuln_id_from_tool: + return [finding.vuln_id_from_tool] + if mode == GroupMode.TITLE: + if finding.title: + return [finding.title] + if mode == GroupMode.CVE: + cves = [ + cve for cve in finding.vulnerability_id_set.values_list("vulnerability_id", flat=True) + if cve + ] + if cves: + return cves + return None + + @staticmethod + def get_fg_key(mode: GroupMode) -> str: + return f"finding_groups_{mode.value}" + + @staticmethod + def get_id_map_key(mode: GroupMode) -> str: + return f"finding_groups_id_to_finding_group_{mode.value}" + + @staticmethod + def set_last_finding_change() -> None: + if DD_TEST: + logger.info("Redis is not used in test environment, skipping.") + return + redis_client = get_redis_client() + redis_client.set(LAST_FINDING_CHANGE, datetime.now().isoformat()) + + @staticmethod + def set_last_update(mode: GroupMode, timestamp: datetime | None = None) -> None: + if timestamp is None: + return + redis_client = get_redis_client() + redis_client.hset(LAST_FINDING_UPDATE, mode.value, timestamp.isoformat()) + + @staticmethod + def add_finding(finding: Finding, mode: GroupMode) -> None: + finding_groups = DynamicFindingGroups.get_group_names(finding, mode) + if not finding_groups: + return + redis_client = get_redis_client() + for finding_group_name in finding_groups: + finding_group_id = base64.b64encode(finding_group_name.encode()).decode() + fg_key = DynamicFindingGroups.get_fg_key(mode) + id_map_key = DynamicFindingGroups.get_id_map_key(mode) + + finding_group = DynamicFindingGroups.load_from_id(finding_group_id, fg_key) + if not finding_group: + finding_group = DynamicFindingGroups( + finding_group_id=finding_group_id, + name=finding_group_name, + ) + + if finding.id not in finding_group.finding_ids: + finding_group.add(finding) + + redis_client.hset(fg_key, finding_group_id, json.dumps(finding_group.to_dict())) + group_ids_raw = redis_client.hget(id_map_key, finding.id) + group_ids = json.loads(group_ids_raw) if group_ids_raw else [] + if finding_group_id not in group_ids: + group_ids.append(finding_group_id) + redis_client.hset(id_map_key, finding.id, json.dumps(group_ids)) + + @cached_property + def sla_days_remaining_internal(self): + findings = Finding.objects.filter(id__in=self.finding_ids, active=True) + if not findings: + return None + return min([find.sla_days_remaining() for find in findings if find.sla_days_remaining()], default=None) + + @property + def sla_days_remaining(self) -> int | None: + return self.sla_days_remaining_internal + + +@lru_cache(maxsize=1) +def get_redis_client() -> redis.Redis: + host = getattr(settings, "REDIS_HOST", "redis") + port = getattr(settings, "REDIS_PORT", 6379) + return redis.Redis(host=host, port=port, decode_responses=True) + + +def get_user_mode(user_id: int) -> GroupMode | None: + redis_client = get_redis_client() + value = redis_client.hget(USER_MODES_KEY, str(user_id)) + if value and value not in [m.value for m in GroupMode]: + logger.warning(f"Invalid group mode '{value}' found in Redis for user {user_id}, resetting to None.") + redis_client.hdel(USER_MODES_KEY, str(user_id)) + return None + return GroupMode(value) if value else None + + +def set_user_mode(user_id: int, mode: GroupMode) -> None: + redis_client = get_redis_client() + redis_client.hset(USER_MODES_KEY, str(user_id), mode.value) + logger.info(f"User {user_id} dynamic finding groups mode set to {mode.value}") + + +def load_or_rebuild_finding_groups(mode: GroupMode) -> dict[str, DynamicFindingGroups]: + redis_client = get_redis_client() + fg_key = DynamicFindingGroups.get_fg_key(mode) + id_map_key = DynamicFindingGroups.get_id_map_key(mode) + + if not redis_client.exists(LAST_FINDING_CHANGE): + DynamicFindingGroups.set_last_finding_change() + last_finding_change_raw = redis_client.get(LAST_FINDING_CHANGE) + try: + last_finding_change_time = datetime.fromisoformat(last_finding_change_raw) + except ValueError: + logger.warning(f"Invalid datetime format in Redis for {LAST_FINDING_CHANGE}: {last_finding_change_raw}, resetting last finding change.") + DynamicFindingGroups.set_last_finding_change() + last_finding_change_raw = redis_client.get(LAST_FINDING_CHANGE) + last_finding_change_time = datetime.fromisoformat(last_finding_change_raw) if last_finding_change_raw else None + + try: + last_groups_update_time = redis_client.hget(LAST_FINDING_UPDATE, mode.value) + last_groups_update_time = datetime.fromisoformat(last_groups_update_time) if last_groups_update_time else None + except ValueError: + logger.warning(f"Invalid datetime format in Redis for {LAST_FINDING_UPDATE}: {last_groups_update_time}") + last_groups_update_time = None + + # Check if finding_groups and id_map exist in Redis + # Check if last update is the same as last finding change + # If not, rebuild them + if ( + not redis_client.exists(fg_key) + or not redis_client.exists(id_map_key) + or last_groups_update_time != last_finding_change_time + ): + if not last_finding_change_time: + logger.warning("Last finding change is not set, setting it to now.") + elif last_groups_update_time and last_finding_change_time < last_groups_update_time: + logger.warning("Last finding change is older than last update, they should be equal or last finding change should be newer.") + redis_client.delete(fg_key, id_map_key) + for finding in Finding.objects.all(): + DynamicFindingGroups.add_finding(finding, mode) + DynamicFindingGroups.set_last_update(mode, last_finding_change_time) + + return _load_finding_groups_from_redis(fg_key, redis_client) + + +def _load_finding_groups_from_redis(fg_key: str, redis_client: redis.Redis) -> dict[str, DynamicFindingGroups]: + finding_groups_data = redis_client.hgetall(fg_key) + if finding_groups_data: + return { + key: DynamicFindingGroups.from_dict(json.loads(value)) + for key, value in finding_groups_data.items() + } + return {} diff --git a/dojo/finding_group/urls.py b/dojo/finding_group/urls.py index 938d95ba9e9..b6280cb219e 100644 --- a/dojo/finding_group/urls.py +++ b/dojo/finding_group/urls.py @@ -1,6 +1,6 @@ from django.urls import re_path -from dojo.finding_group import views +from dojo.finding_group import views, views_dynamic urlpatterns = [ # finding group @@ -9,8 +9,14 @@ re_path(r"^finding_group/(?P\d+)/jira/push$", views.push_to_jira, name="finding_group_push_to_jira"), re_path(r"^finding_group/(?P\d+)/jira/unlink$", views.unlink_jira, name="finding_group_unlink_jira"), - # finding group list views + # static finding group list views re_path(r"^finding_group/all$", views.ListFindingGroups.as_view(), name="all_finding_groups"), re_path(r"^finding_group/open$", views.ListOpenFindingGroups.as_view(), name="open_finding_groups"), re_path(r"^finding_group/closed$", views.ListClosedFindingGroups.as_view(), name="closed_finding_groups"), + + # dynamic finding group list views + re_path(r"^dynamic_finding_group/all$", views_dynamic.ListDynamicFindingGroups.as_view(), name="all_dynamic_finding_groups"), + re_path(r"^dynamic_finding_group/open$", views_dynamic.ListOpenDynamicFindingGroups.as_view(), name="open_dynamic_finding_groups"), + re_path(r"^dynamic_finding_group/closed$", views_dynamic.ListClosedDynamicFindingGroups.as_view(), name="closed_dynamic_finding_groups"), + re_path(r"^dynamic_finding_group/(?P[^/]+)/findings$", views_dynamic.DynamicFindingGroupsFindings.as_view(), name="dynamic_finding_group_findings"), ] diff --git a/dojo/finding_group/views_dynamic.py b/dojo/finding_group/views_dynamic.py new file mode 100644 index 00000000000..00b1775d85c --- /dev/null +++ b/dojo/finding_group/views_dynamic.py @@ -0,0 +1,282 @@ +import logging + +from django.core.paginator import Paginator +from django.http import HttpRequest +from django.shortcuts import render +from django.views import View + +from dojo.authorization.roles_permissions import Permissions +from dojo.filters import DynamicFindingGroupsFilter, DynamicFindingGroupsFindingsFilter +from dojo.finding_group.redis import ( + GroupMode, + get_user_mode, + load_or_rebuild_finding_groups, + set_user_mode, +) +from dojo.forms import FindingBulkUpdateForm +from dojo.models import Finding, Global_Role +from dojo.product.queries import get_authorized_products +from dojo.utils import add_breadcrumb + +logger = logging.getLogger(__name__) + + +def paginate_queryset(queryset, request: HttpRequest): + page_size = request.GET.get("page_size", 25) # Default is 25 + paginator = Paginator(queryset, page_size) + page_number = request.GET.get("page") + return paginator.get_page(page_number) + + +class ListDynamicFindingGroups(View): + filter_name = "All" + + def get_template(self): + return "dojo/finding_groups_dynamic_list.html" + + def order_field(self, request: HttpRequest, finding_groups_findings_list): + order_field = request.GET.get("o") + if order_field: + reverse_order = order_field.startswith("-") + if reverse_order: + order_field = order_field[1:] + if order_field == "name": + finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.name, reverse=reverse_order) + elif order_field == "findings_count": + finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: len(x.finding_ids), reverse=reverse_order) + return finding_groups_findings_list + + def filters(self, request: HttpRequest): + name_filter = request.GET.get("name", "").lower() + min_severity_filter = request.GET.get("severity") + engagement_filter = request.GET.getlist("engagement") + product_filter = request.GET.getlist("product") + return name_filter, min_severity_filter, engagement_filter, product_filter + + def filter_finding_group(self, finding_group, request: HttpRequest): + name_filter, min_severity_filter, engagement_filter, product_filter = self.filters(request) + add_finding_group = True + if product_filter: + finding_group.finding_ids = set(finding_group.finding_ids) & set( + Finding.objects.filter(test__engagement__product__id__in=product_filter).values_list("id", flat=True), + ) + if engagement_filter: + finding_group.finding_ids = set(finding_group.finding_ids) & set( + Finding.objects.filter(test__engagement__id__in=engagement_filter).values_list("id", flat=True), + ) + finding_group.reconfig_finding_group() + if name_filter and name_filter not in finding_group.name.lower(): + add_finding_group = False + if min_severity_filter and Finding.get_number_severity(finding_group.severity) < Finding.get_number_severity(min_severity_filter): + add_finding_group = False + if not finding_group.finding_ids: + add_finding_group = False + return add_finding_group + + def get_findings(self, products): + finding_group_fids = { + fid for finding_group in self.finding_groups_map.values() for fid in finding_group.finding_ids + } + filters = {"id__in": finding_group_fids} + if products: + filters["test__engagement__product__in"] = products + user_findings_qs = Finding.objects.filter(**filters) + user_fids = set(user_findings_qs.values_list("id", flat=True)) + active_fids = set( + user_findings_qs.filter(active=True).values_list("id", flat=True), + ) + return user_fids, active_fids + + def get_finding_groups(self, request: HttpRequest, products=None): + """ + Retrieve all dynamic finding groups for the current user. + + Steps: + 1. Retrieve finding IDs relevant for the user (optionally filtered by products). + 2. Iterate over all finding groups in self.finding_groups_map. + 3. For each group: + - Restrict the group's findings to those the user can see. + - Apply additional filters based on the request. + - No additional filtering for active findings. + 4. Append groups that pass all filters to the result list. + 5. Order the resulting list according to the request via order_field and return. + """ + user_fids, _ = self.get_findings(products) + list_finding_group = [] + for finding_group in self.finding_groups_map.values(): + finding_group.finding_ids = set(finding_group.finding_ids) & user_fids + if self.filter_finding_group(finding_group, request): + list_finding_group.append(finding_group) + return self.order_field(request, list_finding_group) + + def get(self, request: HttpRequest): + global_role = Global_Role.objects.filter(user=request.user).first() + products = get_authorized_products(Permissions.Product_View) + mode_str = request.GET.get("mode", None) + user_id = request.user.id + if mode_str: + try: + mode = GroupMode(mode_str) + set_user_mode(user_id, mode) + except ValueError: + if mode_str is not None: + logger.warning(f"Invalid mode: {mode_str}") + mode = get_user_mode(user_id) + else: + mode = get_user_mode(user_id) + self.finding_groups_map = load_or_rebuild_finding_groups(mode=mode) if mode else {} + if request.user.is_superuser or (global_role and global_role.role): + finding_groups = self.get_finding_groups(request) + elif products.exists(): + finding_groups = self.get_finding_groups(request, products) + paginated_finding_groups = paginate_queryset(finding_groups, request) + + context = { + "filter_name": self.filter_name, + "mode": mode.value if mode else None, + "filtered": DynamicFindingGroupsFilter(request.GET), + "finding_groups": paginated_finding_groups, + } + + add_breadcrumb(title="Dynamic Finding Group", top_level=not len(request.GET), request=request) + return render(request, self.get_template(), context) + + +class ListOpenDynamicFindingGroups(ListDynamicFindingGroups): + filter_name = "Open" + + def get_finding_groups(self, request: HttpRequest, products=None): + """ + Retrieve dynamic finding groups containing at least one active finding. + + Steps: + 1. Retrieve finding IDs relevant for the user and the active subset. + 2. Iterate over all finding groups in self.finding_groups_map. + 3. For each group: + - Restrict the group's findings to those the user can see. + - Apply additional filters based on the request. + - Keep only groups with at least one active finding. + 4. Append groups that pass all filters to the result list. + 5. Order the resulting list according to the request via order_field and return. + """ + user_fids, active_fids = self.get_findings(products) + list_finding_group = [] + for finding_group in self.finding_groups_map.values(): + finding_group.finding_ids = set(finding_group.finding_ids) & user_fids + if self.filter_finding_group(finding_group, request): + if finding_group.finding_ids & active_fids: + list_finding_group.append(finding_group) + return self.order_field(request, list_finding_group) + + +class ListClosedDynamicFindingGroups(ListDynamicFindingGroups): + filter_name = "Closed" + + def get_finding_groups(self, request: HttpRequest, products=None): + """ + Retrieve dynamic finding groups containing no active findings. + + Steps: + 1. Retrieve finding IDs relevant for the user and the active subset. + 2. Iterate over all finding groups in self.finding_groups_map. + 3. For each group: + - Restrict the group's findings to those the user can see. + - Apply additional filters based on the request. + - Keep only groups with no active findings. + 4. Append groups that pass all filters to the result list. + 5. Order the resulting list according to the request via order_field and return. + """ + user_fids, active_fids = self.get_findings(products) + list_finding_group = [] + for finding_group in self.finding_groups_map.values(): + finding_group.finding_ids = set(finding_group.finding_ids) & user_fids + if self.filter_finding_group(finding_group, request): + if not (finding_group.finding_ids & active_fids): + list_finding_group.append(finding_group) + return self.order_field(request, list_finding_group) + + +class DynamicFindingGroupsFindings(View): + def get_template(self): + return "dojo/finding_group_dynamic_findings.html" + + def order_field(self, request: HttpRequest, finding_groups_findings_list): + order_field = request.GET.get("o") + if order_field: + reverse_order = order_field.startswith("-") + if reverse_order: + order_field = order_field[1:] + if order_field == "title": + finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.title, reverse=reverse_order) + elif order_field == "found_by": + finding_groups_findings_list = sorted(finding_groups_findings_list, key=lambda x: x.found_by.count(), reverse=reverse_order) + return finding_groups_findings_list + + def filters(self, request: HttpRequest): + name_filter = request.GET.get("name", "").lower() + severity_filter = request.GET.getlist("severity") + vuln_id_from_tool_filter = request.GET.get("vuln_id_from_tool") + reporter_filter = request.GET.getlist("reporter") + active_filter = request.GET.get("active") + engagement_filter = request.GET.getlist("engagement") + product_filter = request.GET.getlist("product") + return name_filter, severity_filter, vuln_id_from_tool_filter, reporter_filter, active_filter, engagement_filter, product_filter + + def filter_findings(self, findings, request: HttpRequest): + name_filter, severity_filter, vuln_id_from_tool_filter, reporter_filter, active_filter, engagement_filter, product_filter = self.filters(request) + filter_kwargs = {} + if name_filter: + filter_kwargs["title__icontains"] = name_filter + if severity_filter: + filter_kwargs["severity__in"] = severity_filter + if vuln_id_from_tool_filter: + filter_kwargs["vuln_id_from_tool__icontains"] = vuln_id_from_tool_filter + if reporter_filter: + filter_kwargs["reporter__id__in"] = reporter_filter + if active_filter: + filter_kwargs["active"] = (active_filter == "Yes") + if engagement_filter: + filter_kwargs["test__engagement__id__in"] = engagement_filter + if product_filter: + filter_kwargs["test__engagement__product__id__in"] = product_filter + return findings.filter(**filter_kwargs) + + def get_findings(self, request: HttpRequest, products=None): + finding_group = self.finding_groups_map.get(self.finding_group_id) + + # When the finding_group not exists + if not finding_group: + return None, [] + + list_findings = finding_group.finding_ids + if products: + findings = Finding.objects.filter(id__in=list_findings, test__engagement__product__in=products) + else: + findings = Finding.objects.filter(id__in=list_findings) + findings = self.filter_findings(findings, request) + return finding_group.name, self.order_field(request, findings) + + def get(self, request: HttpRequest, finding_group_id: int): + self.finding_group_id = finding_group_id + global_role = Global_Role.objects.filter(user=request.user).first() + products = get_authorized_products(Permissions.Product_View) + mode = get_user_mode(request.user.id) + self.finding_groups_map = load_or_rebuild_finding_groups(mode=mode) if mode else {} + if request.user.is_superuser or (global_role and global_role.role): + finding_group_name, findings = self.get_findings(request) + elif products.exists(): + finding_group_name, findings = self.get_findings(request, products) + else: + finding_group_name = None + paginated_findings = paginate_queryset(findings, request) + + context = { + "finding_group": finding_group_name, + "filtered": DynamicFindingGroupsFindingsFilter(request.GET), + "finding_group_id": self.finding_group_id, + "findings": paginated_findings, + "bulk_edit_form": FindingBulkUpdateForm(request.GET), + } + + add_breadcrumb(title="Dynamic Finding Group Findings", top_level=not len(request.GET), request=request) + return render(request, self.get_template(), context) diff --git a/dojo/models.py b/dojo/models.py index 9d3a238d9ca..1de85cbde37 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -2825,6 +2825,9 @@ def save(self, dedupe_option=True, rules_option=True, product_grading_option=Tru else: logger.debug("no options selected that require finding post processing") + from dojo.finding_group.redis import DynamicFindingGroups + DynamicFindingGroups.set_last_finding_change() + def get_absolute_url(self): from django.urls import reverse return reverse("view_finding", args=[str(self.id)]) @@ -3480,6 +3483,11 @@ class Vulnerability_Id(models.Model): def __str__(self): return self.vulnerability_id + def save(self, *args, **kwargs): + super().save(*args, **kwargs) + from dojo.finding_group.redis import DynamicFindingGroups + DynamicFindingGroups.set_last_finding_change() + def get_absolute_url(self): from django.urls import reverse return reverse("view_finding", args=[str(self.finding.id)]) diff --git a/dojo/static/dojo/css/dojo.css b/dojo/static/dojo/css/dojo.css index deba72474c9..7a102755ed5 100644 --- a/dojo/static/dojo/css/dojo.css +++ b/dojo/static/dojo/css/dojo.css @@ -1911,4 +1911,21 @@ input[type=number]::-webkit-outer-spin-button { color: ButtonText; } } + +.submenu-header { + font-weight: bold; + padding: 5px 15px; + color: #666; + background-color: #f5f5f5; +} + +.submenu-divider { + border-top: 1px solid #ddd; + margin: 5px 0; +} + +.nav-second-level li a:hover { + background-color: #e9ecef; + color: #000; +} \ No newline at end of file diff --git a/dojo/templates/base.html b/dojo/templates/base.html index 007612ce741..1afd72ae92d 100644 --- a/dojo/templates/base.html +++ b/dojo/templates/base.html @@ -339,12 +339,13 @@
  • -
  • diff --git a/dojo/templates/dojo/finding_group_dynamic_findings.html b/dojo/templates/dojo/finding_group_dynamic_findings.html new file mode 100644 index 00000000000..499ace71f60 --- /dev/null +++ b/dojo/templates/dojo/finding_group_dynamic_findings.html @@ -0,0 +1,8 @@ +{% extends "base.html" %} +{% load navigation_tags %} +{% load display_tags %} +{% load static %} +{% block content %} + {% comment %} Findings from Dynamic Finding Group {% endcomment %} + {% include "dojo/finding_group_dynamic_findings_list_snippet.html" %} +{% endblock %} diff --git a/dojo/templates/dojo/finding_group_dynamic_findings_list_snippet.html b/dojo/templates/dojo/finding_group_dynamic_findings_list_snippet.html new file mode 100644 index 00000000000..583585885a3 --- /dev/null +++ b/dojo/templates/dojo/finding_group_dynamic_findings_list_snippet.html @@ -0,0 +1,895 @@ +{% load navigation_tags %} +{% load display_tags %} +{% load authorization_tags %} +{% load get_endpoint_status %} +{% load static %} +{% load i18n %} +{% block finding_groups_findings_list %} +
    +
    +
    +
    +

    + {% if finding_group %} + {% blocktrans %}Findings for Dynamic Finding Group: {{ finding_group }}{% endblocktrans %} + {% else %} + {% blocktrans %}The finding_group ID {{ finding_group_id }} does not match any known finding_group.{% endblocktrans %} + {% endif %} + +

    +
    +
    + {% include "dojo/filter_snippet.html" with form=filtered.form %} +
    +
    + {% if findings %} +
    {% include "dojo/paging_snippet.html" with page=findings page_size=True %}
    + {% if not product_tab or product_tab and product_tab.product|has_object_permission:"Finding_Edit" %} + + {% endif %} +
    + + + + {% block header %} + {% if not product_tab or product_tab and product_tab.product|has_object_permission:"Finding_Edit" %} + + {% endif %} + + + + + + + + + {% endblock %} + + + + {% for finding in findings %} + + {% block body %} + {% if not product_tab or product_tab and product_tab.product|has_object_permission:"Finding_Edit" %} + + {% endif %} + + + + + + + + + {% endblock body %} + + {% endfor %} + +
    + + {% dojo_sort request "Name" "title" %} + {% trans "Severity" %} + {% trans "SLA" %}{% trans "Vulnerability Id From Tool" %}{% trans "Reporter" %}{% dojo_sort request "Found By" "found_by" %}{% trans "Status" %}
    +
    + +
    +
    +
    + +
    +
    + {% if finding.title %} + {{ finding.title }} + {% else %} + {{ finding.id }} + {% endif %} + + + {{ finding.severity }} + + + {{ finding|finding_sla }} + {{ finding.vuln_id_from_tool }} + {% if finding.reporter.get_full_name and finding.reporter.get_full_name.strip %} + {{ finding.reporter.get_full_name }} + {% else %} + {{ finding.reporter }} + {% endif %} + + {% if finding.found_by %} + {{ finding.found_by.all|join:", " }} + {% else %} + {{ finding.test.test_type }} + {% endif %} + + {{ finding|finding_display_status|safe }} {{ finding|import_history }} +
    +
    +
    + {% include "dojo/paging_snippet.html" with page=findings %} +
    + {% else %} +
    +

    + {% trans "No findings found." %} +

    +
    + {% endif %} +
    +
    +{% endblock %} +{% block postscript %} + + + + + {% include "dojo/filter_js_snippet.html" %} + {% include "dojo/snippets/selectpicker_in_dropdown.html" %} +{% endblock %} diff --git a/dojo/templates/dojo/finding_groups_dynamic_list.html b/dojo/templates/dojo/finding_groups_dynamic_list.html new file mode 100644 index 00000000000..80692f24287 --- /dev/null +++ b/dojo/templates/dojo/finding_groups_dynamic_list.html @@ -0,0 +1,8 @@ +{% extends "base.html" %} +{% load navigation_tags %} +{% load display_tags %} +{% load static %} +{% block content %} + {% comment %} All/Open/Closed Dynamic Finding Groups {% endcomment %} + {% include "dojo/finding_groups_dynamic_list_snippet.html" %} +{% endblock %} diff --git a/dojo/templates/dojo/finding_groups_dynamic_list_snippet.html b/dojo/templates/dojo/finding_groups_dynamic_list_snippet.html new file mode 100644 index 00000000000..08c9e992dc2 --- /dev/null +++ b/dojo/templates/dojo/finding_groups_dynamic_list_snippet.html @@ -0,0 +1,210 @@ +{% load navigation_tags %} +{% load display_tags %} +{% load authorization_tags %} +{% load get_endpoint_status %} +{% load static %} +{% load i18n %} +{% block finding_groups_list %} +
    +
    +
    +
    +

    + {% blocktrans %}{{ filter_name }} Finding Groups{% endblocktrans %} + +

    +
    +
    + {% include "dojo/filter_snippet.html" with form=filtered.form %} +
    +
    +
    + + +
    + {% if finding_groups %} +
    {% include "dojo/paging_snippet.html" with page=finding_groups page_size=True %}
    +
    + + + + {% block header %} + + + + + {% endblock %} + + + + {% for finding_group in finding_groups %} + + + + + + + {% endfor %} + +
    {% dojo_sort request "Name" "name" %} + {% trans "Severity" %} + {% trans "SLA" %}{% dojo_sort request "Findings Count" "findings_count" %}
    + + {{ finding_group.name }} + + + + {{ finding_group.severity }} + + + {{ finding_group.sla_days_remaining }} + + {{ finding_group.finding_ids|length }} +
    +
    +
    + {% include "dojo/paging_snippet.html" with page=finding_groups %} +
    + {% else %} +
    +

    + {% trans "No finding groups found." %} +

    +
    + {% endif %} +
    +
    +{% endblock %} +{% block postscript %} + + + + {% include "dojo/filter_js_snippet.html" %} + {% include "dojo/snippets/selectpicker_in_dropdown.html" %} +{% endblock %}