-
Notifications
You must be signed in to change notification settings - Fork 468
feat: add custom qualifications #2446
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
3bdf22e
085ea7b
05f1c0c
12f8eeb
ffb5647
21e778f
41abf75
4b8d574
0338930
5fb360a
54f2337
1d30d82
020e35b
4ccdbb6
69c498b
d46f0de
fe38f72
37e5490
27a50d0
1bedf83
79d1d49
c32e231
e6d5ab4
530b0f7
028e4cb
96edf33
69a335e
4ae0c28
78cc138
20c167f
30a974d
6a05f75
a88d51e
a70e1ab
5c4a46c
4cf66c5
3bf4cb6
673bafc
5b93e9c
2f5bec4
0c0ce56
97d693b
34403fe
e40b2e4
28d6a10
0ae96a7
7c824ea
c7dee95
3033b1c
28c41bf
5c565b4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
# Generated by Django 5.1.10 on 2025-08-25 21:14 | ||
|
||
import django.db.models.deletion | ||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
dependencies = [ | ||
("core", "0091_appliedcontrol_objectives_and_more"), | ||
] | ||
|
||
operations = [ | ||
migrations.AddField( | ||
model_name="qualification", | ||
name="library", | ||
field=models.ForeignKey( | ||
blank=True, | ||
null=True, | ||
on_delete=django.db.models.deletion.CASCADE, | ||
related_name="qualifications", | ||
to="core.loadedlibrary", | ||
), | ||
), | ||
migrations.AlterField( | ||
model_name="qualification", | ||
name="is_published", | ||
field=models.BooleanField(default=True, verbose_name="published"), | ||
), | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
RiskMatrix, | ||
ReferenceControl, | ||
Threat, | ||
Qualification, | ||
) | ||
from django.db import transaction | ||
from iam.models import Folder | ||
|
@@ -341,7 +342,6 @@ class ThreatImporter: | |
|
||
def __init__(self, threat_data: dict): | ||
self.threat_data = threat_data | ||
self._object = None | ||
|
||
def is_valid(self) -> Union[str, None]: | ||
if missing_fields := self.REQUIRED_FIELDS - set(self.threat_data.keys()): | ||
|
@@ -411,6 +411,38 @@ def import_reference_control(self, library_object: LoadedLibrary): | |
) | ||
|
||
|
||
class QualificationImporter: | ||
REQUIRED_FIELDS = {"ref_id", "urn", "name"} | ||
|
||
def __init__(self, qualification_data: dict): | ||
self.qualification_data = qualification_data | ||
|
||
def is_valid(self) -> Union[str, None]: | ||
if missing_fields := self.REQUIRED_FIELDS - set(self.qualification_data.keys()): | ||
return "Missing the following fields : {}".format(", ".join(missing_fields)) | ||
|
||
def import_qualification(self, library_object: LoadedLibrary): | ||
Qualification.objects.create( | ||
library=library_object, | ||
urn=self.qualification_data["urn"].lower(), | ||
ref_id=self.qualification_data["ref_id"], | ||
name=self.qualification_data["name"], | ||
description=self.qualification_data.get("description"), | ||
abbreviation=self.qualification_data.get("abbreviation"), | ||
qualification_ordering=self.qualification_data.get( | ||
"qualification_ordering", 0 | ||
), | ||
security_objective_ordering=self.qualification_data.get( | ||
"security_objective_ordering", 0 | ||
), | ||
provider=library_object.provider, | ||
is_published=True, | ||
locale=library_object.locale, | ||
translations=self.qualification_data.get("translations", {}), | ||
default_locale=library_object.default_locale, # Change this in the future ? | ||
) | ||
|
||
Comment on lines
+410
to
+440
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent 🧩 Analysis chainMake qualification import idempotent and add basic field validation.
class QualificationImporter:
@@
def import_qualification(self, library_object: LoadedLibrary):
- Qualification.objects.create(
- library=library_object,
- urn=self.qualification_data["urn"].lower(),
- ref_id=self.qualification_data["ref_id"],
- name=self.qualification_data["name"],
- description=self.qualification_data.get("description"),
- abbreviation=self.qualification_data.get("abbreviation"),
- qualification_ordering=self.qualification_data.get(
- "qualification_ordering", 0
- ),
- security_objective_ordering=self.qualification_data.get(
- "security_objective_ordering", 0
- ),
- provider=library_object.provider,
- is_published=True,
- locale=library_object.locale,
- translations=self.qualification_data.get("translations", {}),
- default_locale=library_object.default_locale, # Change this in the future ?
- )
+ abbr = self.qualification_data.get("abbreviation")
+ if abbr is not None and len(abbr) > 20:
+ raise ValueError("abbreviation must be <= 20 characters")
+
+ Qualification.objects.update_or_create(
+ urn=self.qualification_data["urn"].lower(),
+ locale=library_object.locale,
+ defaults=dict(
+ library=library_object,
+ ref_id=self.qualification_data["ref_id"],
+ name=self.qualification_data["name"],
+ description=self.qualification_data.get("description"),
+ abbreviation=abbr,
+ qualification_ordering=self.qualification_data.get(
+ "qualification_ordering", 0
+ ),
+ security_objective_ordering=self.qualification_data.get(
+ "security_objective_ordering", 0
+ ),
+ provider=library_object.provider,
+ is_published=True,
+ translations=self.qualification_data.get("translations", {}),
+ default_locale=library_object.default_locale, # Change this in the future ?
+ ),
+ ) If other importers intentionally create fresh rows (non-idempotent) and rely on a cleanup step, keep consistency. Do you want me to align Threat/ReferenceControl importers to the same idempotent pattern? 🏁 Script executed: #!/bin/bash
# Inspect Qualification model definition
rg -n "class Qualification" -C3 backend/library/models.py
# List all Importer classes in utils
rg -n "class .*Importer" -C3 backend/library/utils.py
# Inspect import methods implementation in utils
rg -n -C3 "def import_" backend/library/utils.py Length of output: 4966 🏁 Script executed: #!/bin/bash
# Locate Qualification model definition
rg -n "class Qualification" -C3 .
# Find abbreviation field definitions across models
rg -n "abbreviation" -C3 . Length of output: 133386 QualificationImporter: Use
Also, ThreatImporter and ReferenceControlImporter still use |
||
|
||
# The couple (URN, locale) is unique. ===> Check this in the future | ||
class RiskMatrixImporter: | ||
REQUIRED_FIELDS = {"ref_id", "urn", "json_definition"} | ||
|
@@ -463,6 +495,7 @@ class LibraryImporter: | |
OBJECT_FIELDS = [ | ||
"threats", | ||
"reference_controls", | ||
"qualifications", | ||
"risk_matrix", # This field name is deprecated | ||
"risk_matrices", | ||
"framework", # This field name is deprecated | ||
|
@@ -480,6 +513,7 @@ def __init__(self, library: StoredLibrary): | |
self._library = library | ||
self._frameworks = [] | ||
self._threats = [] | ||
self._qualifications = [] | ||
self._reference_controls = [] | ||
self._risk_matrices = [] | ||
self._requirement_mapping_sets = [] | ||
|
@@ -538,6 +572,28 @@ def init_reference_controls( | |
invalid_reference_control_error, | ||
) | ||
|
||
def init_qualifications(self, qualifications: List[dict]) -> Union[str, None]: | ||
qualification_importers = [] | ||
import_errors = [] | ||
for index, qualification_data in enumerate(qualifications): | ||
qualification_importer = QualificationImporter(qualification_data) | ||
qualification_importers.append(qualification_importer) | ||
if (qualification_error := qualification_importer.is_valid()) is not None: | ||
import_errors.append((index, qualification_error)) | ||
|
||
self._qualifications = qualification_importers | ||
|
||
if import_errors: | ||
# We will have to think about error message internationalization later | ||
invalid_qualification_index, invalid_qualification_error = import_errors[0] | ||
return "[QUALIFICATION_ERROR] {} invalid qualification{} detected, the {}{} qualification has the following error : {}".format( | ||
len(import_errors), | ||
"s" if len(import_errors) > 1 else "", | ||
invalid_qualification_index + 1, | ||
{1: "st", 2: "nd", 3: "rd"}.get(invalid_qualification_index + 1, "th"), | ||
invalid_qualification_error, | ||
) | ||
|
||
def init_risk_matrices(self, risk_matrices: List[dict]) -> Union[str, None]: | ||
risk_matrix_importers = [] | ||
import_errors = [] | ||
|
@@ -676,6 +732,16 @@ def init(self) -> Union[str, None]: | |
logger.error("Threat import error", error=threat_import_error) | ||
return threat_import_error | ||
|
||
if "qualifications" in library_objects: | ||
qualification_data = library_objects["qualifications"] | ||
if ( | ||
qualification_import_error := self.init_qualifications( | ||
qualification_data | ||
) | ||
) is not None: | ||
logger.error("Threat import error", error=qualification_import_error) | ||
return qualification_import_error | ||
|
||
if "risk_matrix" in library_objects and "risk_matrices" in library_objects: | ||
return "A library can't have both 'risk_matrix' and 'risk_matrices' objects fields." | ||
|
||
|
@@ -767,6 +833,9 @@ def import_objects(self, library_object: LoadedLibrary): | |
for reference_control in self._reference_controls: | ||
reference_control.import_reference_control(library_object) | ||
|
||
for qualification in self._qualifications: | ||
qualification.import_qualification(library_object) | ||
|
||
for risk_matrix in self._risk_matrices: | ||
risk_matrix.import_risk_matrix(library_object) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# Generated by Django 5.1.10 on 2025-08-25 21:14 | ||
|
||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
dependencies = [ | ||
("core", "0092_qualification_library_and_more"), | ||
("resilience", "0002_businessimpactanalysis_is_locked"), | ||
] | ||
|
||
operations = [ | ||
migrations.AlterField( | ||
model_name="escalationthreshold", | ||
name="qualifications", | ||
field=models.ManyToManyField( | ||
blank=True, | ||
related_name="escalation_thresholds", | ||
to="core.qualification", | ||
), | ||
), | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reference_count may over-count due to M2M duplicates — use distinct().
Counting Incidents/FearedEvents/EscalationThresholds that reference any Qualification from this library via M2M can produce duplicate rows in the join. Align with the earlier distinct() usage in this method.
Apply this diff:
📝 Committable suggestion
🤖 Prompt for AI Agents