-
Notifications
You must be signed in to change notification settings - Fork 531
feat(exp): convert qualitative risk assessments to quantitative ones #2814
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
21b7be0
4fee1f9
9e06815
24f5af6
f2ff724
d1cdb46
6419e05
a08a4db
22e0147
bdf5b39
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1908,6 +1908,304 @@ def sync_to_applied_controls(self, request, pk): | |||||||||||||||||||||||||
| {"changes": RiskScenarioReadSerializer(changes, many=True).data} | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| @action( | ||||||||||||||||||||||||||
| detail=True, | ||||||||||||||||||||||||||
| methods=["post"], | ||||||||||||||||||||||||||
| url_path="convert_to_quantitative", | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| def convert_to_quantitative(self, request, pk): | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| Convert a qualitative risk assessment to a quantitative risk study. | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| Expected payload: | ||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||
| "probability_anchors": [{"index": 0, "value": 0.05}, ...], | ||||||||||||||||||||||||||
| "impact_anchors": [{"index": 0, "central_value": 25000}, ...], | ||||||||||||||||||||||||||
| "loss_threshold": 100000 | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
| from crq.models import ( | ||||||||||||||||||||||||||
| QuantitativeRiskStudy, | ||||||||||||||||||||||||||
| QuantitativeRiskScenario, | ||||||||||||||||||||||||||
| QuantitativeRiskHypothesis, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| risk_assessment: RiskAssessment = self.get_object() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Check permissions | ||||||||||||||||||||||||||
| if not RoleAssignment.is_access_allowed( | ||||||||||||||||||||||||||
| user=request.user, | ||||||||||||||||||||||||||
| perm=Permission.objects.get(codename="add_quantitativeriskstudy"), | ||||||||||||||||||||||||||
| folder=risk_assessment.folder, | ||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||
| return Response( | ||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||
| "detail": "You do not have permission to create quantitative risk studies in this domain." | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| status=status.HTTP_403_FORBIDDEN, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Validate payload | ||||||||||||||||||||||||||
| probability_anchors = request.data.get("probability_anchors", []) | ||||||||||||||||||||||||||
| impact_anchors = request.data.get("impact_anchors", []) | ||||||||||||||||||||||||||
| loss_threshold = request.data.get("loss_threshold") | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if not probability_anchors or not impact_anchors or loss_threshold is None: | ||||||||||||||||||||||||||
| return Response( | ||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||
| "detail": "Missing required fields: probability_anchors, impact_anchors, and loss_threshold" | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| status=status.HTTP_400_BAD_REQUEST, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Validate loss_threshold is a positive number | ||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||
| loss_threshold = float(loss_threshold) | ||||||||||||||||||||||||||
| if loss_threshold <= 0: | ||||||||||||||||||||||||||
| return Response( | ||||||||||||||||||||||||||
| {"detail": "loss_threshold must be greater than 0"}, | ||||||||||||||||||||||||||
| status=status.HTTP_400_BAD_REQUEST, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| except (TypeError, ValueError): | ||||||||||||||||||||||||||
| return Response( | ||||||||||||||||||||||||||
| {"detail": "loss_threshold must be a valid number"}, | ||||||||||||||||||||||||||
| status=status.HTTP_400_BAD_REQUEST, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Validate and build probability mapping | ||||||||||||||||||||||||||
| probability_map = {} | ||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||
| for item in probability_anchors: | ||||||||||||||||||||||||||
| idx = item["index"] | ||||||||||||||||||||||||||
| value = float(item["value"]) | ||||||||||||||||||||||||||
| if not (0 <= value <= 1): | ||||||||||||||||||||||||||
| return Response( | ||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||
| "detail": f"Probability value must be between 0 and 1, got {value} for index {idx}" | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| status=status.HTTP_400_BAD_REQUEST, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| probability_map[idx] = value | ||||||||||||||||||||||||||
| except (KeyError, TypeError, ValueError) as e: | ||||||||||||||||||||||||||
| logger.warning(f"Invalid probability anchor format: {e}") | ||||||||||||||||||||||||||
| return Response( | ||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||
| "detail": "Invalid probability anchor format. Each anchor must have 'index' and 'value' fields, with value between 0 and 1." | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| status=status.HTTP_400_BAD_REQUEST, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Validate and build impact mapping | ||||||||||||||||||||||||||
| # Using a narrow fixed ratio: roughly ±30% around central value | ||||||||||||||||||||||||||
| # LB = central_value * 0.7, UB = central_value * 1.43 | ||||||||||||||||||||||||||
| # This gives a spread factor of ~2x which minimizes overlap between adjacent levels | ||||||||||||||||||||||||||
| impact_map = {} | ||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||
| for item in impact_anchors: | ||||||||||||||||||||||||||
| idx = item["index"] | ||||||||||||||||||||||||||
| central_value = float(item["central_value"]) | ||||||||||||||||||||||||||
| if central_value <= 0: | ||||||||||||||||||||||||||
| return Response( | ||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||
| "detail": f"Impact central value must be positive, got {central_value} for index {idx}" | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| status=status.HTTP_400_BAD_REQUEST, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| impact_map[idx] = { | ||||||||||||||||||||||||||
| "lb": central_value * 0.7, | ||||||||||||||||||||||||||
| "ub": central_value * 1.43, | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| except (KeyError, TypeError, ValueError) as e: | ||||||||||||||||||||||||||
| logger.warning(f"Invalid impact anchor format: {e}") | ||||||||||||||||||||||||||
| return Response( | ||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||
| "detail": "Invalid impact anchor format. Each anchor must have 'index' and 'central_value' fields, with central_value greater than 0." | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| status=status.HTTP_400_BAD_REQUEST, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Generate ref_id for the quantitative study | ||||||||||||||||||||||||||
| # If source has ref_id, append _QUANT, otherwise leave blank | ||||||||||||||||||||||||||
| quant_ref_id = ( | ||||||||||||||||||||||||||
| f"{risk_assessment.ref_id}_QUANT" if risk_assessment.ref_id else "" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Create the quantitative risk study with [QUANT] prefix | ||||||||||||||||||||||||||
| quantitative_study = QuantitativeRiskStudy.objects.create( | ||||||||||||||||||||||||||
| name=f"[QUANT] {risk_assessment.name}", | ||||||||||||||||||||||||||
| description=f"Converted from qualitative risk assessment: {risk_assessment.name}\n\n{risk_assessment.description or ''}", | ||||||||||||||||||||||||||
| folder=risk_assessment.folder, | ||||||||||||||||||||||||||
| ref_id=quant_ref_id, | ||||||||||||||||||||||||||
| status=risk_assessment.status if risk_assessment.status else "planned", | ||||||||||||||||||||||||||
| loss_threshold=loss_threshold, | ||||||||||||||||||||||||||
| distribution_model="lognormal_ci90", | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Copy authors and reviewers | ||||||||||||||||||||||||||
| quantitative_study.authors.set(risk_assessment.authors.all()) | ||||||||||||||||||||||||||
| quantitative_study.reviewers.set(risk_assessment.reviewers.all()) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| scenarios_converted = 0 | ||||||||||||||||||||||||||
| scenarios_skipped = 0 | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Convert each risk scenario to a quantitative risk scenario | ||||||||||||||||||||||||||
| for scenario in risk_assessment.risk_scenarios.all(): | ||||||||||||||||||||||||||
| # Skip scenarios without threats or assets | ||||||||||||||||||||||||||
| if not scenario.threats.exists() and not scenario.assets.exists(): | ||||||||||||||||||||||||||
| logger.info( | ||||||||||||||||||||||||||
| f"Skipping scenario '{scenario.name}' - no threats or assets associated" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| scenarios_skipped += 1 | ||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||
|
Comment on lines
+2054
to
+2059
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: skip condition mismatches comment/intent. Comment says “without threats or assets”; using “and” only skips when both are missing. Use “or” to skip when either side is missing. Apply this diff: - # Skip scenarios without threats or assets
- if not scenario.threats.exists() and not scenario.assets.exists():
+ # Skip scenarios without threats or assets
+ if not scenario.threats.exists() or not scenario.assets.exists():📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Check if we have current probability and impact | ||||||||||||||||||||||||||
| current_proba = scenario.current_proba | ||||||||||||||||||||||||||
| current_impact = scenario.current_impact | ||||||||||||||||||||||||||
| has_current = ( | ||||||||||||||||||||||||||
| current_proba is not None | ||||||||||||||||||||||||||
| and current_impact is not None | ||||||||||||||||||||||||||
| and current_proba in probability_map | ||||||||||||||||||||||||||
| and current_impact in impact_map | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Check if we have residual probability and impact | ||||||||||||||||||||||||||
| residual_proba = scenario.residual_proba | ||||||||||||||||||||||||||
| residual_impact = scenario.residual_impact | ||||||||||||||||||||||||||
| has_residual = ( | ||||||||||||||||||||||||||
| residual_proba is not None | ||||||||||||||||||||||||||
| and residual_impact is not None | ||||||||||||||||||||||||||
| and residual_proba in probability_map | ||||||||||||||||||||||||||
| and residual_impact in impact_map | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Skip if we don't have at least current values | ||||||||||||||||||||||||||
| if not has_current and not has_residual: | ||||||||||||||||||||||||||
| logger.info( | ||||||||||||||||||||||||||
| f"Skipping scenario '{scenario.name}' - no valid probability/impact values" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| scenarios_skipped += 1 | ||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Create quantitative risk scenario with same name and ref_id | ||||||||||||||||||||||||||
| quant_scenario = QuantitativeRiskScenario.objects.create( | ||||||||||||||||||||||||||
| quantitative_risk_study=quantitative_study, | ||||||||||||||||||||||||||
| name=scenario.name, | ||||||||||||||||||||||||||
| description=scenario.description or "", | ||||||||||||||||||||||||||
| ref_id=scenario.ref_id or "", | ||||||||||||||||||||||||||
| folder=risk_assessment.folder, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Copy threats, assets, and owners | ||||||||||||||||||||||||||
| quant_scenario.threats.set(scenario.threats.all()) | ||||||||||||||||||||||||||
| quant_scenario.assets.set(scenario.assets.all()) | ||||||||||||||||||||||||||
| quant_scenario.vulnerabilities.set(scenario.vulnerabilities.all()) | ||||||||||||||||||||||||||
| quant_scenario.owner.set(scenario.owner.all()) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Delete the auto-created "baseline" current hypothesis | ||||||||||||||||||||||||||
| # (we'll create our own if needed) | ||||||||||||||||||||||||||
| QuantitativeRiskHypothesis.objects.filter( | ||||||||||||||||||||||||||
| quantitative_risk_scenario=quant_scenario, risk_stage="current" | ||||||||||||||||||||||||||
| ).delete() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Create current hypothesis if current values are set | ||||||||||||||||||||||||||
| if has_current: | ||||||||||||||||||||||||||
| probability_value = probability_map[current_proba] | ||||||||||||||||||||||||||
| impact_value = impact_map[current_impact] | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| current_hypothesis = QuantitativeRiskHypothesis.objects.create( | ||||||||||||||||||||||||||
| quantitative_risk_scenario=quant_scenario, | ||||||||||||||||||||||||||
| name="current", | ||||||||||||||||||||||||||
| risk_stage="current", | ||||||||||||||||||||||||||
| ref_id=scenario.ref_id or "", | ||||||||||||||||||||||||||
| folder=risk_assessment.folder, | ||||||||||||||||||||||||||
| parameters={ | ||||||||||||||||||||||||||
| "probability": probability_value, | ||||||||||||||||||||||||||
| "impact": { | ||||||||||||||||||||||||||
| "distribution": "LOGNORMAL-CI90", | ||||||||||||||||||||||||||
| "lb": impact_value["lb"], | ||||||||||||||||||||||||||
| "ub": impact_value["ub"], | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
Comment on lines
+2115
to
+2129
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. QuantitativeRiskHypothesis likely has no “name” field — creation will crash. Model summary lists fields; name is absent. Passing name=... will raise TypeError. Drop it. Apply this diff: - current_hypothesis = QuantitativeRiskHypothesis.objects.create(
+ current_hypothesis = QuantitativeRiskHypothesis.objects.create(
quantitative_risk_scenario=quant_scenario,
- name="current",
risk_stage="current",
ref_id=scenario.ref_id or "",
folder=risk_assessment.folder,
parameters={
"probability": probability_value,
"impact": {
"distribution": "LOGNORMAL-CI90",
"lb": impact_value["lb"],
"ub": impact_value["ub"],
},
},
)- residual_hypothesis = QuantitativeRiskHypothesis.objects.create(
+ residual_hypothesis = QuantitativeRiskHypothesis.objects.create(
quantitative_risk_scenario=quant_scenario,
- name="residual",
risk_stage="residual",
is_selected=True,
ref_id=f"{scenario.ref_id}_RES" if scenario.ref_id else "",
folder=risk_assessment.folder,
parameters={
"probability": residual_probability_value,
"impact": {
"distribution": "LOGNORMAL-CI90",
"lb": residual_impact_value["lb"],
"ub": residual_impact_value["ub"],
},
},
)Also applies to: 2148-2163 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Link to existing applied controls | ||||||||||||||||||||||||||
| if scenario.existing_applied_controls.exists(): | ||||||||||||||||||||||||||
| current_hypothesis.existing_applied_controls.set( | ||||||||||||||||||||||||||
| scenario.existing_applied_controls.all() | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| current_hypothesis.save() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Run simulation for the current hypothesis | ||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||
| current_hypothesis.run_simulation() | ||||||||||||||||||||||||||
| logger.info( | ||||||||||||||||||||||||||
| f"Simulation completed for current hypothesis: {scenario.name}" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||
| logger.error( | ||||||||||||||||||||||||||
| f"Failed to run simulation for scenario {scenario.name}: {str(e)}" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Create residual hypothesis if residual values are set | ||||||||||||||||||||||||||
| if has_residual: | ||||||||||||||||||||||||||
| residual_probability_value = probability_map[residual_proba] | ||||||||||||||||||||||||||
| residual_impact_value = impact_map[residual_impact] | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| residual_hypothesis = QuantitativeRiskHypothesis.objects.create( | ||||||||||||||||||||||||||
| quantitative_risk_scenario=quant_scenario, | ||||||||||||||||||||||||||
| name="residual", | ||||||||||||||||||||||||||
| risk_stage="residual", | ||||||||||||||||||||||||||
| is_selected=True, | ||||||||||||||||||||||||||
| ref_id=f"{scenario.ref_id}_RES" if scenario.ref_id else "", | ||||||||||||||||||||||||||
| folder=risk_assessment.folder, | ||||||||||||||||||||||||||
| parameters={ | ||||||||||||||||||||||||||
| "probability": residual_probability_value, | ||||||||||||||||||||||||||
| "impact": { | ||||||||||||||||||||||||||
| "distribution": "LOGNORMAL-CI90", | ||||||||||||||||||||||||||
| "lb": residual_impact_value["lb"], | ||||||||||||||||||||||||||
| "ub": residual_impact_value["ub"], | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Link to existing controls and added controls | ||||||||||||||||||||||||||
| if scenario.existing_applied_controls.exists(): | ||||||||||||||||||||||||||
| residual_hypothesis.existing_applied_controls.set( | ||||||||||||||||||||||||||
| scenario.existing_applied_controls.all() | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| if scenario.applied_controls.exists(): | ||||||||||||||||||||||||||
| residual_hypothesis.added_applied_controls.set( | ||||||||||||||||||||||||||
| scenario.applied_controls.all() | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| residual_hypothesis.save() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| # Run simulation for the residual hypothesis | ||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||
| residual_hypothesis.run_simulation() | ||||||||||||||||||||||||||
| logger.info( | ||||||||||||||||||||||||||
| f"Simulation completed for residual hypothesis: {scenario.name}" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||
| logger.error( | ||||||||||||||||||||||||||
| f"Failed to run simulation for residual scenario {scenario.name}: {str(e)}" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| scenarios_converted += 1 | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| logger.info( | ||||||||||||||||||||||||||
| f"Conversion completed: {scenarios_converted} scenarios converted, {scenarios_skipped} skipped" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| return Response( | ||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||
| "message": "Conversion successful", | ||||||||||||||||||||||||||
| "quantitative_risk_study_id": str(quantitative_study.id), | ||||||||||||||||||||||||||
| "scenarios_converted": scenarios_converted, | ||||||||||||||||||||||||||
| "scenarios_skipped": scenarios_skipped, | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| status=status.HTTP_201_CREATED, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def convert_date_to_timestamp(date): | ||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Coerce anchor index to int to avoid silent skips (key type mismatch).
idx is taken as-is from JSON; if frontend sends "0" (string), membership checks like current_proba in probability_map will fail. Cast index to int when building both maps.
Also applies to: 2003-2018
🤖 Prompt for AI Agents