-
-
Notifications
You must be signed in to change notification settings - Fork 505
Floss Capa Refactor #2933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Floss Capa Refactor #2933
Changes from all commits
5639151
87b8e89
63a5226
fd200af
6978938
b93481a
da6eb26
f4bc110
f06c049
c57d234
045bc30
2370697
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,40 +1,251 @@ | ||
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl | ||
# See the file 'LICENSE' for copying permission. | ||
from typing import Dict | ||
|
||
from api_app.analyzers_manager.classes import DockerBasedAnalyzer, FileAnalyzer | ||
import json | ||
import logging | ||
import os | ||
import shutil | ||
import subprocess | ||
from shlex import quote | ||
from zipfile import ZipFile | ||
|
||
import requests | ||
from django.conf import settings | ||
from django.utils import timezone | ||
|
||
class CapaInfo(FileAnalyzer, DockerBasedAnalyzer): | ||
name: str = "Capa" | ||
url: str = "http://malware_tools_analyzers:4002/capa" | ||
# interval between http request polling | ||
poll_distance: int = 10 | ||
# http request polling max number of tries | ||
max_tries: int = 60 | ||
# here, max_tries * poll_distance = 10 minutes | ||
timeout: int = 60 * 9 | ||
# whereas subprocess timeout is kept as 60 * 9 = 9 minutes | ||
from api_app.analyzers_manager.classes import FileAnalyzer | ||
from api_app.analyzers_manager.exceptions import AnalyzerRunException | ||
from api_app.analyzers_manager.models import AnalyzerRulesFileVersion, PythonModule | ||
from tests.mock_utils import if_mock_connections, patch | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
# Base working directory for all capa assets under Django's media root.
BASE_LOCATION = f"{settings.MEDIA_ROOT}/capa"
# Directory where the capa-rules archive is extracted.
RULES_LOCATION = f"{BASE_LOCATION}/capa-rules"
# Directory holding the FLIRT signature files used by capa.
SIGNATURE_LOCATION = f"{BASE_LOCATION}/sigs"
# Local path of the downloaded capa-rules zip archive.
RULES_FILE = f"{RULES_LOCATION}/capa_rules.zip"
# Base URL for tagged capa-rules release archives (version + ".zip" appended).
RULES_URL = "https://github.com/mandiant/capa-rules/archive/refs/tags/"
|
||
|
||
class CapaInfo(FileAnalyzer):
    # When True, analyze the sample as shellcode (adds "-f sc32"/"-f sc64").
    shellcode: bool
    # Target architecture; "64" selects sc64, any other value falls back to sc32.
    arch: str
    # Max seconds allowed for the capa subprocess (passed to subprocess.run).
    timeout: float = 15
    # When True, re-download signatures even if SIGNATURE_LOCATION already exists.
    force_pull_signatures: bool = False
|
||
def _check_if_latest_version(self, latest_version: str) -> bool: | ||
|
||
analyzer_rules_file_version = AnalyzerRulesFileVersion.objects.filter( | ||
python_module=self.python_module | ||
).first() | ||
|
||
if analyzer_rules_file_version is None: | ||
return False | ||
|
||
return latest_version == analyzer_rules_file_version.last_downloaded_version | ||
|
||
@classmethod | ||
def _update_rules_file_version(cls, latest_version: str, file_url: str): | ||
capa_module = PythonModule.objects.get( | ||
module="capa_info.CapaInfo", | ||
base_path="api_app.analyzers_manager.file_analyzers", | ||
) | ||
|
||
_, created = AnalyzerRulesFileVersion.objects.update_or_create( | ||
python_module=capa_module, | ||
defaults={ | ||
"last_downloaded_version": latest_version, | ||
"download_url": file_url, | ||
"downloaded_at": timezone.now(), | ||
}, | ||
) | ||
|
||
if created: | ||
logger.info(f"Created new entry for {capa_module} rules file version") | ||
else: | ||
logger.info(f"Updated existing entry for {capa_module} rules file version") | ||
|
||
@classmethod | ||
def _unzip_rules(cls): | ||
logger.info(f"Extracting rules at {RULES_LOCATION}") | ||
with ZipFile(RULES_FILE, mode="r") as archive: | ||
archive.extractall( | ||
RULES_LOCATION | ||
) # this will overwrite any existing directory | ||
logger.info("Rules have been succesfully extracted") | ||
|
||
@classmethod | ||
def _download_rules(cls, latest_version: str): | ||
|
||
if os.path.exists(RULES_LOCATION): | ||
logger.info(f"Removing existing rules at {RULES_LOCATION}") | ||
shutil.rmtree(RULES_LOCATION) | ||
|
||
os.makedirs(RULES_LOCATION) | ||
logger.info(f"Created fresh rules directory at {RULES_LOCATION}") | ||
|
||
file_to_download = latest_version + ".zip" | ||
file_url = RULES_URL + file_to_download | ||
try: | ||
|
||
response = requests.get(file_url, stream=True) | ||
logger.info( | ||
f"Started downloading rules with version: {latest_version} from {file_url}" | ||
) | ||
with open(RULES_FILE, mode="wb+") as file: | ||
for chunk in response.iter_content(chunk_size=10 * 1024): | ||
file.write(chunk) | ||
|
||
cls._update_rules_file_version(latest_version, file_url) | ||
logger.info(f"Bumped up version number in db to {latest_version}") | ||
|
||
except Exception as e: | ||
logger.error(f"Failed to download rules with error: {e}") | ||
raise AnalyzerRunException("Failed to download rules") | ||
|
||
logger.info( | ||
f"Rules with version: {latest_version} have been successfully downloaded at {RULES_LOCATION}" | ||
) | ||
|
||
@classmethod | ||
def _download_signatures(cls) -> None: | ||
logger.info(f"Downloading signatures at {SIGNATURE_LOCATION} now") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the "signatures" are less important than the rules ones. These are almost never updated while the "rules" are are updated often. Plus, most of the time, we don't want these signatures to execute either cause it would slow the Capa execution. The rules are always necessary because they are the core part of the tool while these one could not be necessary. Because of that, I would not re-update them once they are here, like you already do. But we need another additional parameter for the user to enable them explicitly otherwise it would be better if these signatures would be disabled by default. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Regarding your point, Actually I've tried executing the flare-capa without the signatures but it threw an error, when only executed with rules. So, I feel the signatures are necessary for it's execution. Though, I can definitely make changes in the code that the signatures are only downloaded once or updated on-demand by the user. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. made the changes such that signatures are only downloaded the first or whenever |
||
|
||
if os.path.exists(SIGNATURE_LOCATION): | ||
logger.info(f"Removing existing signatures at {SIGNATURE_LOCATION}") | ||
shutil.rmtree(SIGNATURE_LOCATION) | ||
|
||
os.makedirs(SIGNATURE_LOCATION) | ||
logger.info(f"Created fresh signatures directory at {SIGNATURE_LOCATION}") | ||
|
||
signatures_url = "https://api.github.com/repos/mandiant/capa/contents/sigs" | ||
try: | ||
response = requests.get(signatures_url) | ||
signatures_list = response.json() | ||
|
||
for signature in signatures_list: | ||
|
||
def config(self, runtime_configuration: Dict): | ||
super().config(runtime_configuration) | ||
self.args = [] | ||
if self.arch != "64": | ||
self.arch = "32" | ||
if self.shellcode: | ||
self.args.append("-f") | ||
self.args.append("sc" + self.arch) | ||
filename = signature["name"] | ||
download_url = signature["download_url"] | ||
|
||
signature_file_path = os.path.join(SIGNATURE_LOCATION, filename) | ||
|
||
sig_content = requests.get(download_url, stream=True) | ||
with open(signature_file_path, mode="wb") as file: | ||
for chunk in sig_content.iter_content(chunk_size=10 * 1024): | ||
file.write(chunk) | ||
|
||
except Exception as e: | ||
logger.error(f"Failed to download signature: {e}") | ||
raise AnalyzerRunException("Failed to update signatures") | ||
logger.info("Successfully updated signatures") | ||
|
||
@classmethod | ||
def update(cls) -> bool: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we need also to set up a cronjob to automatically update these rules cause otherwise they will be obsolete soon because they are updated only once when those files are not available while on the repositories they update them continously. If I remember correctly, you should update the "update_schedule" value in the migration for both of the analyzers. See how "Maxmind" analyzer if configured for an example of an already working one There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've update the migration file with appropriate cron values. |
||
try: | ||
logger.info("Updating capa rules") | ||
response = requests.get( | ||
"https://api.github.com/repos/mandiant/capa-rules/releases/latest" | ||
) | ||
latest_version = response.json()["tag_name"] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you please log the version? plus, an ideal solution would be to save the latest downloaded version in a new separate model. Then, when a new update is requested, compare the new version with the latest downloaded one and abort in case the version is the same. This would optimize the caching mechanism There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure this can be good optimisation, I'll make the appropriate changes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. created new model |
||
cls._download_rules(latest_version) | ||
cls._unzip_rules() | ||
logger.info("Successfully updated capa rules") | ||
|
||
return True | ||
|
||
except Exception as e: | ||
logger.error(f"Failed to update capa rules with error: {e}") | ||
|
||
return False | ||
|
||
def run(self): | ||
# get binary | ||
binary = self.read_file_bytes() | ||
# make request data | ||
fname = str(self.filename).replace("/", "_").replace(" ", "_") | ||
args = [f"@{fname}", *self.args] | ||
req_data = {"args": args, "timeout": self.timeout} | ||
req_files = {fname: binary} | ||
|
||
return self._docker_run(req_data, req_files) | ||
try: | ||
|
||
response = requests.get( | ||
"https://api.github.com/repos/mandiant/capa-rules/releases/latest" | ||
) | ||
latest_version = response.json()["tag_name"] | ||
|
||
update_status = ( | ||
True if self._check_if_latest_version(latest_version) else self.update() | ||
) | ||
|
||
if self.force_pull_signatures or not os.path.isdir(SIGNATURE_LOCATION): | ||
self._download_signatures() | ||
|
||
if not (os.path.isdir(RULES_LOCATION)) and not update_status: | ||
|
||
raise AnalyzerRunException("Couldn't update capa rules") | ||
|
||
command: list[str] = ["/usr/local/bin/capa", "--quiet", "--json"] | ||
fgibertoni marked this conversation as resolved.
Show resolved
Hide resolved
|
||
shell_code_arch = "sc64" if self.arch == "64" else "sc32" | ||
if self.shellcode: | ||
command.append("-f") | ||
command.append(shell_code_arch) | ||
|
||
# Setting default capa-rules path | ||
command.append("-r") | ||
command.append(RULES_LOCATION) | ||
|
||
# Setting default signatures location | ||
command.append("-s") | ||
command.append(SIGNATURE_LOCATION) | ||
|
||
command.append(quote(self.filepath)) | ||
|
||
logger.info( | ||
f"Starting CAPA analysis for {self.filename} with hash: {self.md5} and command: {command}" | ||
) | ||
|
||
process: subprocess.CompletedProcess = subprocess.run( | ||
command, | ||
fgibertoni marked this conversation as resolved.
Show resolved
Hide resolved
|
||
capture_output=True, | ||
text=True, | ||
timeout=self.timeout, | ||
check=True, | ||
) | ||
|
||
result = json.loads(process.stdout) | ||
fgibertoni marked this conversation as resolved.
Show resolved
Hide resolved
|
||
result["command_executed"] = command | ||
result["rules_version"] = latest_version | ||
|
||
logger.info( | ||
f"CAPA analysis successfully completed for file: {self.filename} with hash {self.md5}" | ||
) | ||
|
||
except subprocess.CalledProcessError as e: | ||
stderr = e.stderr | ||
logger.info( | ||
f"Capa Info failed to run for {self.filename} with hash: {self.md5} with command {e}" | ||
) | ||
raise AnalyzerRunException( | ||
f" Analyzer for {self.filename} with hash: {self.md5} failed with error: {stderr}" | ||
) | ||
|
||
return result | ||
|
||
@classmethod | ||
def _monkeypatch(cls): | ||
response_from_command = subprocess.CompletedProcess( | ||
args=[ | ||
"capa", | ||
"--quiet", | ||
"--json", | ||
"-r", | ||
"/opt/deploy/files_required/capa/capa-rules", | ||
"-s", | ||
"/opt/deploy/files_required/capa/sigs", | ||
"/opt/deploy/files_required/06ebf06587b38784e2af42dd5fbe56e5", | ||
], | ||
returncode=0, | ||
stdout='{"meta": {}, "rules": {"contain obfuscated stackstrings": {}, "enumerate PE sections":{}}}', | ||
stderr="", | ||
) | ||
patches = [ | ||
if_mock_connections( | ||
patch.object(CapaInfo, "update", return_value=True), | ||
patch("subprocess.run", return_value=response_from_command), | ||
) | ||
] | ||
return super()._monkeypatch(patches) |
Uh oh!
There was an error while loading. Please reload this page.