Add disk usage check before downloading files #19041
@@ -1,6 +1,7 @@
 import json
 import logging
 import os
+import shutil
 import signal
 import tempfile
 import traceback
@@ -10,7 +11,6 @@
 from datetime import datetime
 from multiprocessing import Process, Queue
 from queue import Empty
-from shutil import copyfile, rmtree
 from time import sleep, time
 from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union
 from urllib import parse
@@ -101,6 +101,16 @@ def _wait_for_file_to_exist(s3: S3Client, obj: parse.ParseResult, sleep_time: in
                 raise e


+def _wait_for_disk_usage_higher_than_threshold(threshold_in_gb: int = 25, sleep_time: int = 3) -> None:
+    usage = shutil.disk_usage("/")
+
+    while (usage.free / 1000 / 1000 / 1000) <= threshold_in_gb:
+        sleep(sleep_time)
+        usage = shutil.disk_usage("/")
+
+    return
+
+
 def _download_data_target(input_dir: Dir, cache_dir: str, queue_in: Queue, queue_out: Queue) -> None:
     """This function is used to download data from a remote directory to a cache directory to optimise reading."""
     s3 = S3Client()
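The new helper polls `shutil.disk_usage` until the free space on the root filesystem, converted to decimal GB, rises above the threshold. For reference, this is what the underlying call returns (a quick illustrative snippet, not part of the diff):

```python
import shutil

# shutil.disk_usage returns a named tuple (total, used, free), all in bytes,
# for the filesystem that contains the given path.
usage = shutil.disk_usage("/")
free_gb = usage.free / 1000 / 1000 / 1000  # decimal GB, matching the diff's conversion
print(f"free space on /: {free_gb:.1f} GB")
```

Note the threshold is compared in decimal GB (10^9 bytes) rather than GiB (2^30 bytes).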
@@ -123,7 +133,11 @@ def _download_data_target(input_dir: Dir, cache_dir: str, queue_in: Queue, queue
                 continue

         if input_dir.url is not None or input_dir.path is not None:
-            # 6. Download all the required paths to unblock the current index
+            if input_dir.url:
+                # 6. Wait for the removers to catch up when we are downloading data.
+                _wait_for_disk_usage_higher_than_threshold(25)
+
+            # 7. Download all the required paths to unblock the current index
             for path in paths:
                 local_path = path.replace(input_dir.path, cache_dir)
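The gate only fires for remote inputs (`input_dir.url`), where the downloader can outrun the remover processes that evict already-consumed chunks. A hedged sketch of the pattern, with a hypothetical `download_chunk` stub standing in for the S3/local-copy logic of `_download_data_target`:

```python
import shutil
from time import sleep
from typing import List


def _wait_for_disk_usage_higher_than_threshold(threshold_in_gb: int = 25, sleep_time: int = 3) -> None:
    # Same loop as in the diff: block while free space is at or below the threshold.
    while shutil.disk_usage("/").free / 1000 / 1000 / 1000 <= threshold_in_gb:
        sleep(sleep_time)


def download_chunk(path: str, cache_dir: str) -> None:
    """Hypothetical stand-in for the per-file S3 download / local copy."""


def download_batch(paths: List[str], cache_dir: str, is_remote: bool) -> None:
    if is_remote:
        # Let the removers catch up before pulling the next batch, so the
        # cache volume never fills up while earlier chunks await deletion.
        _wait_for_disk_usage_higher_than_threshold(25)

    for path in paths:
        download_chunk(path, cache_dir)
```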
@@ -141,7 +155,7 @@ def _download_data_target(input_dir: Dir, cache_dir: str, queue_in: Queue, queue
                         s3.client.download_fileobj(obj.netloc, obj.path.lstrip("/"), f)

                 elif os.path.isfile(path):
-                    copyfile(path, local_path)
+                    shutil.copyfile(path, local_path)
                 else:
                     raise ValueError(f"The provided {input_dir.url} isn't supported.")
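This hunk and the remaining ones are a mechanical follow-up to the import change: with `import shutil` at the top, `copyfile` and `rmtree` are referenced through the module, consistent with the new `shutil.disk_usage` call. As a reminder, `shutil.copyfile` copies file contents to an explicit destination path (it does not accept a directory and does not copy metadata); a small illustrative example:

```python
import os
import shutil
import tempfile

src_dir, dst_dir = tempfile.mkdtemp(), tempfile.mkdtemp()
src = os.path.join(src_dir, "chunk-0.bin")
with open(src, "wb") as f:
    f.write(b"\x00" * 1024)

# Destination must be the full target file path, not a directory.
dst = os.path.join(dst_dir, "chunk-0.bin")
shutil.copyfile(src, dst)
assert os.path.getsize(dst) == 1024
```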
@@ -198,7 +212,7 @@ def _upload_fn(upload_queue: Queue, remove_queue: Queue, cache_dir: str, output_
             except Exception as e:
                 print(e)
         elif os.path.isdir(output_dir.path):
-            copyfile(local_filepath, os.path.join(output_dir.path, os.path.basename(local_filepath)))
+            shutil.copyfile(local_filepath, os.path.join(output_dir.path, os.path.basename(local_filepath)))
         else:
             raise ValueError(f"The provided {output_dir.path} isn't supported.")
@@ -686,7 +700,7 @@ def _upload_index(self, output_dir: Dir, cache_dir: str, num_nodes: int, node_ra
                 local_filepath, obj.netloc, os.path.join(obj.path.lstrip("/"), os.path.basename(local_filepath))
             )
         elif os.path.isdir(output_dir.path):
-            copyfile(local_filepath, os.path.join(output_dir.path, os.path.basename(local_filepath)))
+            shutil.copyfile(local_filepath, os.path.join(output_dir.path, os.path.basename(local_filepath)))

         if num_nodes == 1 or node_rank is None:
             return
@@ -707,7 +721,7 @@ def _upload_index(self, output_dir: Dir, cache_dir: str, num_nodes: int, node_ra
             with open(node_index_filepath, "wb") as f:
                 s3.client.download_fileobj(obj.netloc, obj.path.lstrip("/"), f)
         elif os.path.isdir(output_dir.path):
-            copyfile(remote_filepath, node_index_filepath)
+            shutil.copyfile(remote_filepath, node_index_filepath)

         merge_cache = Cache(cache_dir, chunk_bytes=1)
         merge_cache._merge_no_wait()
@@ -948,15 +962,15 @@ def _cleanup_cache(self) -> None:

         # Cleanup the cache dir folder to avoid corrupted files from previous run to be there.
         if os.path.exists(cache_dir):
-            rmtree(cache_dir, ignore_errors=True)
+            shutil.rmtree(cache_dir, ignore_errors=True)

         os.makedirs(cache_dir, exist_ok=True)

         cache_data_dir = _get_cache_data_dir()

         # Cleanup the cache data folder to avoid corrupted files from previous run to be there.
         if os.path.exists(cache_data_dir):
-            rmtree(cache_data_dir, ignore_errors=True)
+            shutil.rmtree(cache_data_dir, ignore_errors=True)

         os.makedirs(cache_data_dir, exist_ok=True)
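The cleanup here is an idempotent reset: drop whatever a previous (possibly crashed) run left behind, ignoring errors, then recreate the directory. A small standalone sketch of the same pattern (the helper name is illustrative, not from the diff):

```python
import os
import shutil


def reset_dir(path: str) -> None:
    # Drop any leftover, possibly corrupted files from a previous run...
    if os.path.exists(path):
        shutil.rmtree(path, ignore_errors=True)
    # ...and recreate the directory so later writes never fail on a missing folder.
    os.makedirs(path, exist_ok=True)


if __name__ == "__main__":
    reset_dir("/tmp/lightning_cache_demo")
```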