|
3 | 3 | import importlib
|
4 | 4 | import json
|
5 | 5 | import os
|
| 6 | +import uuid |
6 | 7 | from datetime import datetime
|
| 8 | +from pathlib import Path |
7 | 9 |
|
8 | 10 | import texttable
|
9 | 11 |
|
10 |
| -from nettacker import logger |
| 12 | +from nettacker import logger, all_module_severity_and_desc |
11 | 13 | from nettacker.config import Config, version_info
|
12 | 14 | from nettacker.core.die import die_failure
|
13 | 15 | from nettacker.core.messages import messages as _
|
@@ -119,6 +121,99 @@ def create_compare_text_table(results):
|
119 | 121 | return table.draw() + "\n\n"
|
120 | 122 |
|
121 | 123 |
|
def create_dd_specific_json(all_scan_logs):
    """
    Convert scan logs into a DefectDojo "generic findings import" JSON string.

    Each log entry becomes one finding. The module's numeric severity
    (looked up in ``all_module_severity_and_desc``, CVSS-like 0-10 scale)
    is bucketed onto DefectDojo's five-level scale.

    Args:
        all_scan_logs: list of scan-event dicts; the keys consumed are
            module_name, date, port, event, json_event, target, scan_id.

    Returns:
        str: JSON document of the form ``{"findings": [...]}``.
    """
    severity_mapping = {1: "Info", 2: "Low", 3: "Medium", 4: "High", 5: "Critical"}

    findings = []

    for log in all_scan_logs:
        module_name = log["module_name"].strip()
        # Timestamps may or may not carry microseconds depending on the
        # emitter; accept both shapes instead of crashing on the second.
        raw_date = log["date"]
        try:
            parsed_date = datetime.strptime(raw_date, "%Y-%m-%d %H:%M:%S.%f")
        except ValueError:
            parsed_date = datetime.strptime(raw_date, "%Y-%m-%d %H:%M:%S")
        date = parsed_date.strftime("%m/%d/%Y")
        port = str(log.get("port", "")).strip()
        impact = log.get("event", "").strip()
        severity_justification = log.get("json_event", "").strip()
        service = log.get("target", "").strip()
        # Fall back to a fresh UUID so every finding stays uniquely keyed
        # even when the log lacks a scan_id.
        unique_id = log.get("scan_id", uuid.uuid4().hex)

        metadata = all_module_severity_and_desc.get(module_name, {})
        severity_raw = metadata.get("severity", 0)
        description = metadata.get("desc", "")
        # Bucket the 0-10 raw severity into DefectDojo's 5 named levels;
        # 0 / missing metadata maps to "Info".
        if severity_raw >= 9:
            severity = severity_mapping[5]
        elif severity_raw >= 7:
            severity = severity_mapping[4]
        elif severity_raw >= 4:
            severity = severity_mapping[3]
        elif severity_raw > 0:
            severity = severity_mapping[2]
        else:
            severity = severity_mapping[1]

        findings.append(
            {
                "date": date,
                "title": module_name,
                "description": description.strip(),
                "severity": severity,
                "param": port,
                "impact": impact,
                "severity_justification": severity_justification,
                "service": service,
                "unique_id_from_tool": unique_id,
                # Nettacker results come from live scanning, not static analysis.
                "static_finding": False,
                "dynamic_finding": True,
            }
        )

    return json.dumps({"findings": findings}, indent=4)
| 169 | + |
| 170 | + |
def create_sarif_report(all_scan_logs):
    """
    Render scan logs as a SARIF 2.1.0 JSON document.

    Mapping per log entry:
        ruleId                      <- module_name
        message.text                <- event
        locations[0].physicalLocation.artifactLocation.uri <- target
        properties.scan_id / date / json_event <- corresponding log fields

    Args:
        all_scan_logs: list of scan-event dicts.

    Returns:
        str: SARIF report serialized with 2-space indentation.
    """
    # Build every SARIF result entry up front, one per log record.
    results = [
        {
            "ruleId": entry["module_name"],
            "message": {"text": entry["event"]},
            "locations": [
                {"physicalLocation": {"artifactLocation": {"uri": entry["target"]}}}
            ],
            "properties": {
                "scan_id": entry["scan_id"],
                "date": entry["date"],
                "json_event": entry["json_event"],
            },
        }
        for entry in all_scan_logs
    ]

    report = {
        "$schema": "https://json.schemastore.org/sarif-2.1.0.json",
        "version": "2.1.0",
        "runs": [
            {
                "tool": {
                    "driver": {
                        "name": "Nettacker",
                        "version": "0.4.0",
                        "informationUri": "https://github.com/OWASP/Nettacker",
                    }
                },
                "results": results,
            }
        ],
    }

    return json.dumps(report, indent=2)
| 215 | + |
| 216 | + |
122 | 217 | def create_report(options, scan_id):
|
123 | 218 | """
|
124 | 219 | sort all events, create log file in HTML/TEXT/JSON and remove old logs
|
@@ -179,25 +274,34 @@ def create_report(options, scan_id):
|
179 | 274 | + "</p>"
|
180 | 275 | + log_data.json_parse_js
|
181 | 276 | )
|
182 |
| - with open(report_path_filename, "w", encoding="utf-8") as report_file: |
| 277 | + with Path(report_path_filename).open("w", encoding="utf-8") as report_file: |
183 | 278 | report_file.write(html_table_content + "\n")
|
184 |
| - report_file.close() |
| 279 | + |
| 280 | + elif len(report_path_filename) >= 5 and report_path_filename[-8:].lower() == ".dd.json": |
| 281 | + with Path(report_path_filename).open("w", encoding="utf-8") as report_file: |
| 282 | + dd_content_json = create_dd_specific_json(all_scan_logs) |
| 283 | + report_file.write(dd_content_json + "\n") |
| 284 | + |
185 | 285 | elif len(report_path_filename) >= 5 and report_path_filename[-5:] == ".json":
|
186 |
| - with open(report_path_filename, "w", encoding="utf-8") as report_file: |
| 286 | + with Path(report_path_filename).open("w", encoding="utf-8") as report_file: |
187 | 287 | report_file.write(str(json.dumps(all_scan_logs)) + "\n")
|
188 |
| - report_file.close() |
| 288 | + |
| 289 | + elif len(report_path_filename) >= 6 and report_path_filename[-6:].lower() == ".sarif": |
| 290 | + with Path(report_path_filename).open("w", encoding="utf-8") as report_file: |
| 291 | + sarif_content = create_sarif_report(all_scan_logs) |
| 292 | + report_file.write(sarif_content + "\n") |
| 293 | + |
189 | 294 | elif len(report_path_filename) >= 5 and report_path_filename[-4:] == ".csv":
|
190 | 295 | keys = all_scan_logs[0].keys()
|
191 |
| - with open(report_path_filename, "a") as csvfile: |
| 296 | + with Path(report_path_filename).open("a") as csvfile: |
192 | 297 | writer = csv.DictWriter(csvfile, fieldnames=keys)
|
193 | 298 | writer.writeheader()
|
194 | 299 | for log_list in all_scan_logs:
|
195 | 300 | dict_data = {key: value for key, value in log_list.items() if key in keys}
|
196 | 301 | writer.writerow(dict_data)
|
197 |
| - csvfile.close() |
198 | 302 |
|
199 | 303 | else:
|
200 |
| - with open(report_path_filename, "w", encoding="utf-8") as report_file: |
| 304 | + with Path(report_path_filename).open("w", encoding="utf-8") as report_file: |
201 | 305 | report_file.write(build_text_table(all_scan_logs))
|
202 | 306 |
|
203 | 307 | log.write(build_text_table(all_scan_logs))
|
@@ -278,20 +382,20 @@ def get_modules_ports(item):
|
278 | 382 | len(fullpath) >= 4 and fullpath[-4:] == ".htm"
|
279 | 383 | ):
|
280 | 384 | html_report = build_compare_report(compare_results)
|
281 |
| - with open(fullpath, "w", encoding="utf-8") as compare_report: |
| 385 | + with Path(fullpath).open("w", encoding="utf-8") as compare_report: |
282 | 386 | compare_report.write(html_report + "\n")
|
283 | 387 | elif len(fullpath) >= 5 and fullpath[-5:] == ".json":
|
284 |
| - with open(fullpath, "w", encoding="utf-8") as compare_report: |
| 388 | + with Path(fullpath).open("w", encoding="utf-8") as compare_report: |
285 | 389 | compare_report.write(str(json.dumps(compare_results)) + "\n")
|
286 | 390 | elif len(fullpath) >= 5 and fullpath[-4:] == ".csv":
|
287 | 391 | keys = compare_results.keys()
|
288 |
| - with open(fullpath, "a") as csvfile: |
| 392 | + with Path(fullpath).open("a") as csvfile: |
289 | 393 | writer = csv.DictWriter(csvfile, fieldnames=keys)
|
290 | 394 | if csvfile.tell() == 0:
|
291 | 395 | writer.writeheader()
|
292 | 396 | writer.writerow(compare_results)
|
293 | 397 | else:
|
294 |
| - with open(fullpath, "w", encoding="utf-8") as compare_report: |
| 398 | + with Path(fullpath).open("w", encoding="utf-8") as compare_report: |
295 | 399 | compare_report.write(create_compare_text_table(compare_results))
|
296 | 400 |
|
297 | 401 | log.write(create_compare_text_table(compare_results))
|
|
0 commit comments