5
5
import logging
6
6
import os
7
7
import subprocess
8
+ from shlex import quote
8
9
from zipfile import ZipFile
9
10
10
11
import requests
@@ -45,11 +46,17 @@ def _download_rules(cls, latest_version: str):
45
46
46
47
file_to_download = latest_version + ".zip"
47
48
file_url = RULES_URL + file_to_download
48
- response = requests .get (file_url , stream = True )
49
- logger .info (f"Started downloading rules from { file_url } " )
50
- with open (RULES_FILE , mode = "wb+" ) as file :
51
- for chunk in response .iter_content (chunk_size = 10 * 1024 ):
52
- file .write (chunk )
49
+ try :
50
+
51
+ response = requests .get (file_url , stream = True )
52
+ logger .info (f"Started downloading rules from { file_url } " )
53
+ with open (RULES_FILE , mode = "wb+" ) as file :
54
+ for chunk in response .iter_content (chunk_size = 10 * 1024 ):
55
+ file .write (chunk )
56
+
57
+ except Exception as e :
58
+ logger .error (f"Failed to download rules with error: { e } " )
59
+ raise AnalyzerRunException ("Failed to download rules" )
53
60
54
61
logger .info (f"Rules have been successfully downloaded at { RULES_LOCATION } " )
55
62
@@ -61,28 +68,23 @@ def _download_signatures(cls) -> None:
61
68
os .makedirs (SIGNATURE_LOCATION )
62
69
63
70
signatures_url = "https://api.github.com/repos/mandiant/capa/contents/sigs"
64
- response = requests .get (signatures_url )
65
- signatures_list = response .json ()
66
-
67
- for signature in signatures_list :
68
- try :
69
- subprocess .run (
70
- [
71
- "/usr/bin/wget" ,
72
- "-P" ,
73
- SIGNATURE_LOCATION ,
74
- signature ["download_url" ],
75
- ],
76
- check = True ,
77
- capture_output = True ,
78
- )
71
+ try :
72
+ response = requests .get (signatures_url )
73
+ signatures_list = response .json ()
79
74
80
- except subprocess .CalledProcessError as e :
81
- stderr = e .stderr
82
- logger .error (f"Failed to download signature: { e } " )
83
- raise AnalyzerRunException (
84
- f"Failed to update signatures due to error: { stderr } "
85
- )
75
+ for signature in signatures_list :
76
+
77
+ filename = signature ["name" ]
78
+ download_url = signature ["download_url" ]
79
+
80
+ sig_content = requests .get (download_url , stream = True )
81
+ with open (filename , mode = "wb" ) as file :
82
+ for chunk in sig_content .iter_content (chunk_size = 10 * 1024 ):
83
+ file .write (chunk )
84
+
85
+ except Exception as e :
86
+ logger .error (f"Failed to download signature: { e } " )
87
+ raise AnalyzerRunException ("Failed to update signatures" )
86
88
logger .info ("Successfully updated singatures" )
87
89
88
90
@classmethod
@@ -132,7 +134,7 @@ def run(self):
132
134
command .append ("-s" )
133
135
command .append (SIGNATURE_LOCATION )
134
136
135
- command .append (self .filepath )
137
+ command .append (quote ( self .filepath ) )
136
138
137
139
logger .info (f"Starting CAPA analysis for { self .filename } " )
138
140
0 commit comments