-
-
Notifications
You must be signed in to change notification settings - Fork 506
Floss Capa Refactor #2933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Floss Capa Refactor #2933
Changes from 10 commits
5639151
87b8e89
63a5226
fd200af
6978938
b93481a
da6eb26
f4bc110
f06c049
c57d234
045bc30
2370697
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,40 +1,253 @@ | ||
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl | ||
# See the file 'LICENSE' for copying permission. | ||
from typing import Dict | ||
|
||
from api_app.analyzers_manager.classes import DockerBasedAnalyzer, FileAnalyzer | ||
import json | ||
import logging | ||
import os | ||
import shutil | ||
import subprocess | ||
from shlex import quote | ||
from zipfile import ZipFile | ||
|
||
import requests | ||
from django.conf import settings | ||
from django.utils import timezone | ||
|
||
class CapaInfo(FileAnalyzer, DockerBasedAnalyzer): | ||
name: str = "Capa" | ||
url: str = "http://malware_tools_analyzers:4002/capa" | ||
# interval between http request polling | ||
poll_distance: int = 10 | ||
# http request polling max number of tries | ||
max_tries: int = 60 | ||
# here, max_tries * poll_distance = 10 minutes | ||
timeout: int = 60 * 9 | ||
# whereas subprocess timeout is kept as 60 * 9 = 9 minutes | ||
from api_app.analyzers_manager.classes import FileAnalyzer | ||
from api_app.analyzers_manager.exceptions import AnalyzerRunException | ||
from api_app.analyzers_manager.models import AnalyzerRulesFileVersion, PythonModule | ||
from tests.mock_utils import if_mock_connections, patch | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
BASE_LOCATION = f"{settings.MEDIA_ROOT}/capa" | ||
RULES_LOCATION = f"{BASE_LOCATION}/capa-rules" | ||
SIGNATURE_LOCATION = f"{BASE_LOCATION}/sigs" | ||
RULES_FILE = f"{RULES_LOCATION}/capa_rules.zip" | ||
RULES_URL = "https://github.com/mandiant/capa-rules/archive/refs/tags/" | ||
|
||
|
||
class CapaInfo(FileAnalyzer): | ||
shellcode: bool | ||
arch: str | ||
timeout: float = 15 | ||
force_pull_signatures: bool = False | ||
|
||
def _check_if_latest_version(self, latest_version: str) -> bool: | ||
|
||
analyzer_rules_file_version = AnalyzerRulesFileVersion.objects.filter( | ||
python_module=self.python_module | ||
).first() | ||
|
||
if analyzer_rules_file_version is None: | ||
return False | ||
|
||
return latest_version == analyzer_rules_file_version.last_downloaded_version | ||
|
||
@classmethod | ||
def _update_rules_file_version(cls, latest_version: str, file_url: str): | ||
capa_module = PythonModule.objects.get( | ||
module="capa_info.CapaInfo", | ||
base_path="api_app.analyzers_manager.file_analyzers", | ||
) | ||
|
||
analyzer_rules_file_version, created = ( | ||
AnalyzerRulesFileVersion.objects.update_or_create( | ||
python_module=capa_module, | ||
defaults={ | ||
"last_downloaded_version": latest_version, | ||
"download_url": file_url, | ||
"downloaded_at": timezone.now(), | ||
}, | ||
) | ||
) | ||
|
||
if created: | ||
logger.info(f"Created new entry for {capa_module} rules file version") | ||
else: | ||
logger.info(f"Updated existing entry for {capa_module} rules file version") | ||
|
||
@classmethod | ||
def _unzip_rules(cls): | ||
logger.info(f"Extracting rules at {RULES_LOCATION}") | ||
with ZipFile(RULES_FILE, mode="r") as archive: | ||
archive.extractall( | ||
RULES_LOCATION | ||
) # this will overwrite any existing directory | ||
logger.info("Rules have been succesfully extracted") | ||
|
||
@classmethod | ||
def _download_rules(cls, latest_version: str): | ||
|
||
if os.path.exists(RULES_LOCATION): | ||
logger.info(f"Removing existing rules at {RULES_LOCATION}") | ||
shutil.rmtree(RULES_LOCATION) | ||
|
||
os.makedirs(RULES_LOCATION) | ||
logger.info(f"Created fresh rules directory at {RULES_LOCATION}") | ||
|
||
file_to_download = latest_version + ".zip" | ||
file_url = RULES_URL + file_to_download | ||
try: | ||
|
||
response = requests.get(file_url, stream=True) | ||
logger.info( | ||
f"Started downloading rules with version: {latest_version} from {file_url}" | ||
) | ||
with open(RULES_FILE, mode="wb+") as file: | ||
for chunk in response.iter_content(chunk_size=10 * 1024): | ||
file.write(chunk) | ||
fgibertoni marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
cls._update_rules_file_version(latest_version, file_url) | ||
logger.info(f"Bumped up version number in db to {latest_version}") | ||
|
||
except Exception as e: | ||
logger.error(f"Failed to download rules with error: {e}") | ||
raise AnalyzerRunException("Failed to download rules") | ||
|
||
logger.info( | ||
f"Rules with version: {latest_version} have been successfully downloaded at {RULES_LOCATION}" | ||
) | ||
|
||
@classmethod | ||
def _download_signatures(cls) -> None: | ||
logger.info(f"Downloading signatures at {SIGNATURE_LOCATION} now") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the "signatures" are less important than the rules ones. These are almost never updated while the "rules" are are updated often. Plus, most of the time, we don't want these signatures to execute either cause it would slow the Capa execution. The rules are always necessary because they are the core part of the tool while these one could not be necessary. Because of that, I would not re-update them once they are here, like you already do. But we need another additional parameter for the user to enable them explicitly otherwise it would be better if these signatures would be disabled by default. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Regarding your point, Actually I've tried executing the flare-capa without the signatures but it threw an error, when only executed with rules. So, I feel the signatures are necessary for it's execution. Though, I can definitely make changes in the code that the signatures are only downloaded once or updated on-demand by the user. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. made the changes such that signatures are only downloaded the first or whenever |
||
|
||
if os.path.exists(SIGNATURE_LOCATION): | ||
logger.info(f"Removing existing signatures at {SIGNATURE_LOCATION}") | ||
shutil.rmtree(SIGNATURE_LOCATION) | ||
|
||
os.makedirs(SIGNATURE_LOCATION) | ||
logger.info(f"Created fresh signatures directory at {SIGNATURE_LOCATION}") | ||
|
||
signatures_url = "https://api.github.com/repos/mandiant/capa/contents/sigs" | ||
try: | ||
response = requests.get(signatures_url) | ||
signatures_list = response.json() | ||
|
||
for signature in signatures_list: | ||
|
||
def config(self, runtime_configuration: Dict): | ||
super().config(runtime_configuration) | ||
self.args = [] | ||
if self.arch != "64": | ||
self.arch = "32" | ||
if self.shellcode: | ||
self.args.append("-f") | ||
self.args.append("sc" + self.arch) | ||
filename = signature["name"] | ||
download_url = signature["download_url"] | ||
|
||
signature_file_path = os.path.join(SIGNATURE_LOCATION, filename) | ||
|
||
sig_content = requests.get(download_url, stream=True) | ||
with open(signature_file_path, mode="wb") as file: | ||
for chunk in sig_content.iter_content(chunk_size=10 * 1024): | ||
file.write(chunk) | ||
|
||
except Exception as e: | ||
logger.error(f"Failed to download signature: {e}") | ||
raise AnalyzerRunException("Failed to update signatures") | ||
logger.info("Successfully updated signatures") | ||
|
||
@classmethod | ||
def update(cls) -> bool: | ||
mlodic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try: | ||
logger.info("Updating capa rules") | ||
response = requests.get( | ||
"https://api.github.com/repos/mandiant/capa-rules/releases/latest" | ||
) | ||
latest_version = response.json()["tag_name"] | ||
mlodic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
cls._download_rules(latest_version) | ||
cls._unzip_rules() | ||
logger.info("Successfully updated capa rules") | ||
|
||
return True | ||
|
||
except Exception as e: | ||
logger.error(f"Failed to update capa rules with error: {e}") | ||
|
||
return False | ||
|
||
def run(self): | ||
# get binary | ||
binary = self.read_file_bytes() | ||
# make request data | ||
fname = str(self.filename).replace("/", "_").replace(" ", "_") | ||
args = [f"@{fname}", *self.args] | ||
req_data = {"args": args, "timeout": self.timeout} | ||
req_files = {fname: binary} | ||
|
||
return self._docker_run(req_data, req_files) | ||
try: | ||
|
||
response = requests.get( | ||
"https://api.github.com/repos/mandiant/capa-rules/releases/latest" | ||
) | ||
latest_version = response.json()["tag_name"] | ||
|
||
update_status = ( | ||
True if self._check_if_latest_version(latest_version) else self.update() | ||
) | ||
|
||
if self.force_pull_signatures or not os.path.isdir(SIGNATURE_LOCATION): | ||
self._download_signatures() | ||
|
||
if not (os.path.isdir(RULES_LOCATION)) and not update_status: | ||
|
||
raise AnalyzerRunException("Couldn't update capa rules") | ||
|
||
command: list[str] = ["/usr/local/bin/capa", "--quiet", "--json"] | ||
fgibertoni marked this conversation as resolved.
Show resolved
Hide resolved
|
||
shell_code_arch = "sc64" if self.arch == "64" else "sc32" | ||
if self.shellcode: | ||
command.append("-f") | ||
command.append(shell_code_arch) | ||
|
||
# Setting default capa-rules path | ||
command.append("-r") | ||
command.append(RULES_LOCATION) | ||
|
||
# Setting default signatures location | ||
command.append("-s") | ||
command.append(SIGNATURE_LOCATION) | ||
|
||
command.append(quote(self.filepath)) | ||
|
||
logger.info( | ||
f"Starting CAPA analysis for {self.filename} with hash: {self.md5} and command: {command}" | ||
) | ||
|
||
process: subprocess.CompletedProcess = subprocess.run( | ||
command, | ||
fgibertoni marked this conversation as resolved.
Show resolved
Hide resolved
|
||
capture_output=True, | ||
text=True, | ||
timeout=self.timeout, | ||
check=True, | ||
) | ||
|
||
result = json.loads(process.stdout) | ||
fgibertoni marked this conversation as resolved.
Show resolved
Hide resolved
|
||
result["command_executed"] = command | ||
result["rules_version"] = latest_version | ||
|
||
logger.info( | ||
f"CAPA analysis successfully completed for file: {self.filename} with hash {self.md5}" | ||
) | ||
|
||
except subprocess.CalledProcessError as e: | ||
stderr = e.stderr | ||
logger.info( | ||
f"Capa Info failed to run for {self.filename} with hash: {self.md5} with command {e}" | ||
) | ||
raise AnalyzerRunException( | ||
f" Analyzer for {self.filename} with hash: {self.md5} failed with error: {stderr}" | ||
) | ||
|
||
return result | ||
|
||
@classmethod | ||
def _monkeypatch(cls): | ||
response_from_command = subprocess.CompletedProcess( | ||
args=[ | ||
"capa", | ||
"--quiet", | ||
"--json", | ||
"-r", | ||
"/opt/deploy/files_required/capa/capa-rules", | ||
"-s", | ||
"/opt/deploy/files_required/capa/sigs", | ||
"/opt/deploy/files_required/06ebf06587b38784e2af42dd5fbe56e5", | ||
], | ||
returncode=0, | ||
stdout='{"meta": {}, "rules": {"contain obfuscated stackstrings": {}, "enumerate PE sections":{}}}', | ||
stderr="", | ||
) | ||
patches = [ | ||
if_mock_connections( | ||
patch.object(CapaInfo, "update", return_value=True), | ||
patch("subprocess.run", return_value=response_from_command), | ||
) | ||
] | ||
return super()._monkeypatch(patches) | ||
Comment on lines
+228
to
+251
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can be removed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add a log file here before this comparison. In particular it would great to write there the new and the old version number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, makes sense.