10
10
11
11
import redis
12
12
from django .conf import settings
13
- from django .utils .safestring import mark_safe
13
+ from django .utils .functional import cached_property
14
14
15
15
from dojo .models import Finding
16
16
17
17
logger = logging .getLogger (__name__ )
18
18
19
19
DD_TEST = os .getenv ("DD_TEST" , "False" ).lower () == "true"
20
- SEVERITY_ORDER = {
21
- "Critical" : 5 ,
22
- "High" : 4 ,
23
- "Medium" : 3 ,
24
- "Low" : 2 ,
25
- "Info" : 1 ,
26
- }
27
20
USER_MODES_KEY = "finding_groups_user_modes"
28
- SYSTEM_CHANGE = "finding_groups_last_finding_change"
29
- LAST_UPDATE = "finding_groups_last_update"
21
+ LAST_FINDING_CHANGE = "finding_groups_last_finding_change"
22
+ LAST_FINDING_UPDATE = "finding_groups_last_update"
30
23
31
24
32
25
class GroupMode (StrEnum ):
@@ -41,7 +34,6 @@ class DynamicFindingGroups:
41
34
name : str = ""
42
35
severity : str = "Info"
43
36
main_finding_id : int | None = None
44
- sla_finding_id : int | None = None
45
37
finding_ids : set [int ] = field (default_factory = set )
46
38
47
39
def to_dict (self ) -> dict :
@@ -63,12 +55,9 @@ def load_from_id(finding_group_id: str, fg_key: str) -> Self | None:
63
55
return None
64
56
65
57
def update_sev_sla (self , finding : Finding ) -> None :
66
- if SEVERITY_ORDER [ finding .severity ] > SEVERITY_ORDER [ self .severity ] :
58
+ if Finding . get_number_severity ( finding .severity ) > Finding . get_number_severity ( self .severity ) :
67
59
self .severity = finding .severity
68
60
self .main_finding_id = finding .id
69
- if finding .active and finding .sla_days_remaining ():
70
- if not self .sla_finding_id or finding .sla_days_remaining () < Finding .objects .get (id = self .sla_finding_id ).sla_days_remaining ():
71
- self .sla_finding_id = finding .id
72
61
73
62
def add (self , finding : Finding ) -> None :
74
63
self .update_sev_sla (finding )
@@ -77,21 +66,23 @@ def add(self, finding: Finding) -> None:
77
66
# This method is used when we filter findings in a finding group
78
67
def reconfig_finding_group (self ) -> None :
79
68
self .severity = "Info"
80
- self .sla_finding_id = None
81
69
findings = Finding .objects .filter (id__in = self .finding_ids )
82
70
for finding in findings :
83
71
self .update_sev_sla (finding )
84
72
85
73
@staticmethod
86
74
def get_group_names (finding : Finding , mode : GroupMode ) -> list [str ] | None :
87
75
if mode == GroupMode .VULN_ID_FROM_TOOL :
88
- return [finding .vuln_id_from_tool ]
76
+ if finding .vuln_id_from_tool :
77
+ return [finding .vuln_id_from_tool ]
89
78
if mode == GroupMode .TITLE :
90
- return [finding .title ]
79
+ if finding .title :
80
+ return [finding .title ]
91
81
if mode == GroupMode .CVE :
92
- cves = list (
93
- finding .vulnerability_id_set .values_list ("vulnerability_id" , flat = True ),
94
- )
82
+ cves = [
83
+ cve for cve in finding .vulnerability_id_set .values_list ("vulnerability_id" , flat = True )
84
+ if cve
85
+ ]
95
86
if cves :
96
87
return cves
97
88
return None
@@ -110,14 +101,14 @@ def set_last_finding_change() -> None:
110
101
logger .info ("Redis is not used in test environment, skipping." )
111
102
return
112
103
redis_client = get_redis_client ()
113
- redis_client .set (SYSTEM_CHANGE , datetime .now ().isoformat ())
104
+ redis_client .set (LAST_FINDING_CHANGE , datetime .now ().isoformat ())
114
105
115
106
@staticmethod
116
107
def set_last_update (mode : GroupMode , timestamp : datetime | None = None ) -> None :
117
108
if timestamp is None :
118
109
return
119
110
redis_client = get_redis_client ()
120
- redis_client .hset (LAST_UPDATE , mode .value , timestamp .isoformat ())
111
+ redis_client .hset (LAST_FINDING_UPDATE , mode .value , timestamp .isoformat ())
121
112
122
113
@staticmethod
123
114
def add_finding (finding : Finding , mode : GroupMode ) -> None :
@@ -147,37 +138,16 @@ def add_finding(finding: Finding, mode: GroupMode) -> None:
147
138
group_ids .append (finding_group_id )
148
139
redis_client .hset (id_map_key , finding .id , json .dumps (group_ids ))
149
140
150
- # This method is used in finding_groups table to show SLA
151
- def get_days_remaining (self ) -> str :
152
- if self .sla_finding_id :
153
- finding = Finding .objects .filter (id = self .sla_finding_id ).first ()
154
- days_remaining = finding .sla_days_remaining ()
155
- severity = finding .severity
156
- sla_start_date = finding .get_sla_start_date ().strftime ("%b %d, %Y" )
157
- status = "age-green"
158
- status_text = f"Remediation for { severity .lower ()} findings due in { days_remaining } days or less (started { sla_start_date } )"
159
- if days_remaining and days_remaining < 0 :
160
- status = "age-red"
161
- status_text = f"Overdue: Remediation for { severity .lower ()} findings overdue { days_remaining } days (started { sla_start_date } )"
162
- days_remaining = abs (days_remaining )
163
- elif any (
164
- Finding .objects .filter (
165
- id__in = self .finding_ids ,
166
- active = True ,
167
- ),
168
- ):
169
- status = "severity-Info"
170
- status_text = "No SLA set, but at least one finding is active"
171
- days_remaining = "No SLA"
172
- else :
173
- status = "age-blue"
174
- status_text = "No active finding"
175
- days_remaining = "Concluded"
176
- title = (
177
- f'<a class="has-popover" data-toggle="tooltip" data-placement="bottom" title="" href="#" data-content="{ status_text } ">'
178
- f'<span class="label severity { status } ">{ days_remaining } </span></a>'
179
- )
180
- return mark_safe (title )
141
+ @cached_property
142
+ def sla_days_remaining_internal (self ):
143
+ findings = Finding .objects .filter (id__in = self .finding_ids , active = True )
144
+ if not findings :
145
+ return None
146
+ return min ([find .sla_days_remaining () for find in findings if find .sla_days_remaining ()], default = None )
147
+
148
+ @property
149
+ def sla_days_remaining (self ) -> int | None :
150
+ return self .sla_days_remaining_internal
181
151
182
152
183
153
@lru_cache (maxsize = 1 )
@@ -208,22 +178,22 @@ def load_or_rebuild_finding_groups(mode: GroupMode) -> dict[str, DynamicFindingG
208
178
fg_key = DynamicFindingGroups .get_fg_key (mode )
209
179
id_map_key = DynamicFindingGroups .get_id_map_key (mode )
210
180
211
- if not redis_client .exists (SYSTEM_CHANGE ):
181
+ if not redis_client .exists (LAST_FINDING_CHANGE ):
212
182
DynamicFindingGroups .set_last_finding_change ()
213
- last_finding_change_raw = redis_client .get (SYSTEM_CHANGE )
183
+ last_finding_change_raw = redis_client .get (LAST_FINDING_CHANGE )
214
184
try :
215
185
last_finding_change_time = datetime .fromisoformat (last_finding_change_raw )
216
186
except ValueError :
217
- logger .warning (f"Invalid datetime format in Redis for { SYSTEM_CHANGE } : { last_finding_change_raw } , resetting last finding change." )
187
+ logger .warning (f"Invalid datetime format in Redis for { LAST_FINDING_CHANGE } : { last_finding_change_raw } , resetting last finding change." )
218
188
DynamicFindingGroups .set_last_finding_change ()
219
- last_finding_change_raw = redis_client .get (SYSTEM_CHANGE )
189
+ last_finding_change_raw = redis_client .get (LAST_FINDING_CHANGE )
220
190
last_finding_change_time = datetime .fromisoformat (last_finding_change_raw ) if last_finding_change_raw else None
221
191
222
192
try :
223
- last_groups_update_time = redis_client .hget (LAST_UPDATE , mode .value )
193
+ last_groups_update_time = redis_client .hget (LAST_FINDING_UPDATE , mode .value )
224
194
last_groups_update_time = datetime .fromisoformat (last_groups_update_time ) if last_groups_update_time else None
225
195
except ValueError :
226
- logger .warning (f"Invalid datetime format in Redis for { LAST_UPDATE } : { last_groups_update_time } " )
196
+ logger .warning (f"Invalid datetime format in Redis for { LAST_FINDING_UPDATE } : { last_groups_update_time } " )
227
197
last_groups_update_time = None
228
198
229
199
# Check if finding_groups and id_map exist in Redis
0 commit comments