Skip to content
Open
35 changes: 35 additions & 0 deletions application/tests/web_main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,12 +489,46 @@ def test_smartlink(self) -> None:
self.maxDiff = None
collection = db.Node_collection().with_graph()
with self.app.test_client() as client:
# Case 1: No CREs connected
response = client.get(
"/smartlink/standard/foo/611",
headers={"Content-Type": "application/json"},
)
self.assertEqual(404, response.status_code)

# Case 2: Single CRE connected
# For testting , creating a new CRE and standard
cres = {
"cp": defs.CRE(id="111-545", description="CA", name="CP",tags=["tp"]),
}
standards={
"cwe1": defs.Standard(name="CWE1", sectionID="450"),
}
cres["cp"].add_link(
defs.Link(
ltype=defs.LinkTypes.LinkedTo, document=standards["cwe1"]
)
)
dcp=collection.add_cre(cres["cp"])
dcwe1=collection.add_node(standards["cwe1"])
collection.add_link(dcp, dcwe1, ltype=defs.LinkTypes.LinkedTo)


# Test the /smartlink route for single CRE connected
response = client.get(
"/smartlink/standard/CWE1/450",
headers={"Content-Type": "application/json"},
)
location = ""
for head in response.headers:
if head[0] == "Location":
location = head[1]

# Assert the redirection to the linked CRE
self.assertEqual(location, "/cre/111-545")
self.assertEqual(302, response.status_code)

# Case 3: Multi CRE connected
cres = {
"ca": defs.CRE(id="111-111", description="CA", name="CA", tags=["ta"]),
"cd": defs.CRE(id="222-222", description="CD", name="CD", tags=["td"]),
Expand Down Expand Up @@ -559,6 +593,7 @@ def test_smartlink(self) -> None:
self.assertEqual(302, response.status_code)

# negative test, this cwe does not exist, therefore we redirect to Mitre!
# Case 4: Redirect to external resource
response = client.get(
"/smartlink/standard/CWE/999",
headers={"Content-Type": "application/json"},
Expand Down
133 changes: 123 additions & 10 deletions application/web/web_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from alive_progress import alive_bar
from typing import Any
from application.utils import oscal_utils, redis

import networkx as nx
from typing import Dict, List, Tuple
from application.database.inmemory_graph import CRE_Graph
from rq import job, exceptions

from application.utils import spreadsheet_parsers
Expand Down Expand Up @@ -43,7 +45,7 @@
from application.utils.spreadsheet import write_csv
import oauthlib
import google.auth.transport.requests


ITEMS_PER_PAGE = 20

Expand All @@ -57,7 +59,7 @@

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)


class SupportedFormats(Enum):
Expand Down Expand Up @@ -105,6 +107,46 @@ def neo4j_not_running_rejection():
)


#Defining a function to fetch all links
def fetch_all_links(graph: Any, node_id: str) -> Dict[str, List[Tuple[str, str, Dict]]]:
"""
Fetch all links (incoming and outgoing) for a given node.

Args:
graph (Any): The in-memory graph (CRE_Graph or networkx.DiGraph).
node_id (str): The ID of the node.

Returns:
Dict[str, List[Tuple[str, str, Dict]]]: A dictionary with incoming and outgoing links.
"""
# Access the underlying networkx graph
if isinstance(graph, CRE_Graph):
nx_graph = graph.get_raw_graph()
else:
nx_graph = graph

# Normalize the node_id to match the format in the graph
if not node_id.startswith("Node: "):
node_id = f"Node: {node_id}"

logger.debug(f"Normalized node ID: {node_id}")

logger.debug(f"All nodes in the graph: {list(nx_graph.nodes)}")

if not nx_graph.has_node(node_id):
raise ValueError(f"Node {node_id} does not exist in the graph.")

# Fetch incoming links
incoming_links = list(nx_graph.in_edges(node_id, data=True))

# Fetch outgoing links
outgoing_links = list(nx_graph.out_edges(node_id, data=True))

return {
"incoming": incoming_links,
"outgoing": outgoing_links,
}

@app.route("/rest/v1/id/<creid>", methods=["GET"])
@app.route("/rest/v1/name/<crename>", methods=["GET"])
def find_cre(creid: str = None, crename: str = None) -> Any: # refer
Expand Down Expand Up @@ -457,7 +499,9 @@ def smartlink(
if posthog:
posthog.capture(f"smartlink", f"name:{name}")

database = db.Node_collection()
# database = db.Node_collection()
database = db.Node_collection().with_graph()
graph = database.graph
opt_version = request.args.get("version")
# match ntype to the credoctypes case-insensitive
typ = [t.value for t in defs.Credoctypes if t.value.lower() == ntype.lower()]
Expand All @@ -466,9 +510,10 @@ def smartlink(
page = 1
items_per_page = 1
found_section_id = False
logger.debug(f"Fetching nodes with name: {name}, sectionID: {section}")
_, nodes, _ = database.get_nodes_with_pagination(
name=name,
section=section,
sectionID=section,
page=int(page),
items_per_page=int(items_per_page),
version=opt_version,
Expand All @@ -485,25 +530,93 @@ def smartlink(
ntype=doctype,
)
found_section_id = True
if nodes and len(nodes[0].links):

logger.debug(f"Nodes fetched: {nodes}")

# # If only one node is connected, redirect directly to its CRE page
# if nodes and len(nodes) == 1:
# node = nodes[0]
# logger.debug(f"Checking links for node {node.id}: {node.links}")
# if len(node.links) == 1:
# linked_cre = node.links[0].document
# logger.debug(f"Redirecting to linked CRE: {linked_cre.id}")
# return redirect(f"/cre/{linked_cre.id}")
# logger.debug(f"No linked CREs found for node {node.id}, using fallback redirection.")
# return redirect(f"/cre/{node.name}:{node.section or ''}")

# # If multiple nodes are connected, redirect to the generic node page
# if nodes and len(nodes[0].links) > 1:
# logger.info(
# f"Found node of type {ntype}, name {name}, and section {section}, redirecting to generic node page."
# )
# if found_section_id:
# return redirect(f"/node/{ntype}/{name}/sectionid/{section}")
# return redirect(f"/node/{ntype}/{name}/section/{section}")

# Preprocess to aggregate all linked CREs (direct and internal links)


# all_linked_cres = []
# if nodes:
# node = nodes[0]
# logger.debug(f"Checking links for node {node.id}: {node.links}")
# all_linked_cres.extend([link.document for link in node.links])
# for link in node.links:
# if hasattr(link.document, "links"):
# for internal_link in link.document.links:
# if internal_link.document not in all_linked_cres:
# all_linked_cres.append(internal_link.document)
all_linked_cres = []
if nodes:
node = nodes[0]
logger.debug(f"Checking links for node {node.id}: {node.links}")

# Fetch all links (incoming and outgoing) for the node
all_links = fetch_all_links(graph, node.id)
total_links = len(all_links["incoming"]) + len(all_links["outgoing"])
logger.debug(f"Total links for node {node.id}: {total_links}")

# Preprocess to aggregate all linked CREs (direct and internal links)

all_linked_cres.extend([link.document for link in node.links])
for link in node.links:
if hasattr(link.document, "links"):
for internal_link in link.document.links:
if internal_link.document not in all_linked_cres:
all_linked_cres.append(internal_link.document)
# Remove duplicates

all_linked_cres = list({cre.id: cre for cre in all_linked_cres}.values())
logger.debug(f"All linked CREs for node {node.id}: {[cre.id for cre in all_linked_cres]}")

# If only one linked CRE, redirect directly to its CRE page
if len(all_linked_cres) == 1:
linked_cre = all_linked_cres[0]
logger.debug(f"Redirecting to linked CRE: {linked_cre.id}")
return redirect(f"/cre/{linked_cre.id}")

# If multiple linked CREs, redirect to the generic node page
if len(all_linked_cres) > 1:
logger.info(
f"found node of type {ntype}, name {name} and section {section}, redirecting to opencre"
f"Found multiple linked CREs for node {node.id}, redirecting to generic node page."
)
if found_section_id:
return redirect(f"/node/{ntype}/{name}/sectionid/{section}")
return redirect(f"/node/{ntype}/{name}/section/{section}")


# If no nodes are found, attempt to redirect to an external resource
elif doctype == defs.Credoctypes.Standard.value and redirectors.redirect(
name, section
):
logger.info(
f"did not find node of type {ntype}, name {name} and section {section}, redirecting to external resource"
f"Did not find node of type {ntype}, name {name}, and section {section}, redirecting to external resource."
)
return redirect(redirectors.redirect(name, section))
else:
logger.warning(f"not sure what happened, 404")
logger.warning(f"No nodes or CREs found for name: {name}, section: {section}. Returning 404.")
return abort(404, "Document does not exist")


@app.route("/rest/v1/deeplink/<ntype>/<name>", methods=["GET"])
@app.route("/rest/v1/deeplink/<name>", methods=["GET"])
@app.route("/deeplink/<ntype>/<name>", methods=["GET"])
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,6 @@
"style": "module",
"parser": "typescript"
}
}
},
"packageManager": "[email protected]+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e"
}