Skip to content

Commit 7d2ad9f

Browse files
committed
implement External DataLoader
1 parent 60df8f9 commit 7d2ad9f

File tree

3 files changed

+199
-1
lines changed

3 files changed

+199
-1
lines changed
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
use std::io::Read;
2+
3+
use once_cell::sync::Lazy;
4+
5+
/// To register a new external data loader, simply add an executable in your $PATH whose name
6+
/// starts with this prefix.
7+
pub const EXTERNAL_DATA_LOADER_PREFIX: &str = "rerun-loader";
8+
9+
/// Keeps track of the paths all external executable [`crate::DataLoader`]s.
10+
///
11+
/// Lazy initialized the first time a file is opened by running a full scan of the `$PATH`.
12+
///
13+
/// External loaders are _not_ registered on a per-extension basis: we want users to be able to
14+
/// filter data on a much more fine-grained basis that just file extensions (e.g. checking the file
15+
/// itself for magic bytes).
16+
pub static EXTERNAL_LOADER_PATHS: Lazy<Vec<std::path::PathBuf>> = Lazy::new(|| {
17+
re_tracing::profile_function!();
18+
19+
use walkdir::WalkDir;
20+
21+
let dirpaths = std::env::var("PATH")
22+
.ok()
23+
.into_iter()
24+
.flat_map(|paths| paths.split(':').map(ToOwned::to_owned).collect::<Vec<_>>())
25+
.map(std::path::PathBuf::from);
26+
27+
let executables: ahash::HashSet<_> = dirpaths
28+
.into_iter()
29+
.flat_map(|dirpath| {
30+
WalkDir::new(dirpath).into_iter().filter_map(|entry| {
31+
let Ok(entry) = entry else {
32+
return None;
33+
};
34+
let filepath = entry.path();
35+
let is_rerun_loader = filepath.file_name().map_or(false, |filename| {
36+
filename
37+
.to_string_lossy()
38+
.starts_with(EXTERNAL_DATA_LOADER_PREFIX)
39+
});
40+
(filepath.is_file() && is_rerun_loader).then(|| filepath.to_owned())
41+
})
42+
})
43+
.collect();
44+
45+
// NOTE: We call all available loaders and do so in parallel: order is irrelevant here.
46+
executables.into_iter().collect()
47+
});
48+
49+
/// Iterator over all registered external [`DataLoader`]s.
50+
#[inline]
51+
pub fn iter_external_loaders() -> impl ExactSizeIterator<Item = std::path::PathBuf> {
52+
EXTERNAL_LOADER_PATHS.iter().cloned()
53+
}
54+
55+
// ---
56+
57+
/// A [`crate::DataLoader`] that forwards the path to load to all executables present in
58+
/// the user's `PATH` with name a name that starts with [`EXTERNAL_DATA_LOADER_PREFIX`].
59+
///
60+
/// The external loaders are expected to log rrd data to their standard output.
61+
///
62+
/// Refer to our `external_data_loader` example for more information.
63+
pub struct ExternalDataLoader;
64+
65+
impl crate::DataLoader for ExternalDataLoader {
66+
#[inline]
67+
fn name(&self) -> String {
68+
"rerun.data_loaders.External".into()
69+
}
70+
71+
fn load_from_file(
72+
&self,
73+
store_id: re_log_types::StoreId,
74+
filepath: std::path::PathBuf,
75+
tx: std::sync::mpsc::Sender<crate::LoadedData>,
76+
) -> Result<(), crate::DataLoaderError> {
77+
use std::process::{Command, Stdio};
78+
79+
re_tracing::profile_function!(filepath.display().to_string());
80+
81+
for exe in EXTERNAL_LOADER_PATHS.iter() {
82+
let store_id = store_id.clone();
83+
let filepath = filepath.clone();
84+
let tx = tx.clone();
85+
86+
// NOTE: spawn is fine, the entire loader is native-only.
87+
rayon::spawn(move || {
88+
let child = Command::new(exe)
89+
.arg(filepath.clone())
90+
.args(["--recording-id".to_owned(), store_id.to_string()])
91+
.stdout(Stdio::piped())
92+
.stderr(Stdio::piped())
93+
.spawn();
94+
95+
let mut child = match child {
96+
Ok(child) => child,
97+
Err(err) => {
98+
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
99+
return;
100+
}
101+
};
102+
103+
let Some(stdout) = child.stdout.take() else {
104+
let reason = "stdout unreachable";
105+
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
106+
return;
107+
};
108+
let Some(stderr) = child.stderr.take() else {
109+
let reason = "stderr unreachable";
110+
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
111+
return;
112+
};
113+
114+
re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);
115+
116+
let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
117+
let stdout = std::io::BufReader::new(stdout);
118+
let decoder = match re_log_encoding::decoder::Decoder::new(version_policy, stdout) {
119+
Ok(decoder) => decoder,
120+
Err(err) => {
121+
re_log::error!(?filepath, loader = ?exe, %err, "Failed to decode external loader's output");
122+
return;
123+
}
124+
};
125+
126+
decode_and_stream(&filepath, &tx, decoder);
127+
128+
let status = match child.wait() {
129+
Ok(output) => output,
130+
Err(err) => {
131+
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
132+
return;
133+
}
134+
};
135+
136+
if !status.success() {
137+
let mut stderr = std::io::BufReader::new(stderr);
138+
let mut reason = String::new();
139+
stderr.read_to_string(&mut reason).ok();
140+
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
141+
}
142+
});
143+
}
144+
145+
Ok(())
146+
}
147+
148+
#[inline]
149+
fn load_from_file_contents(
150+
&self,
151+
_store_id: re_log_types::StoreId,
152+
_path: std::path::PathBuf,
153+
_contents: std::borrow::Cow<'_, [u8]>,
154+
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
155+
) -> Result<(), crate::DataLoaderError> {
156+
// TODO(cmc): You could imagine a world where plugins can be streamed rrd data via their
157+
// standard input… but today is not world.
158+
Ok(()) // simply not interested
159+
}
160+
}
161+
162+
fn decode_and_stream<R: std::io::Read>(
163+
filepath: &std::path::Path,
164+
tx: &std::sync::mpsc::Sender<crate::LoadedData>,
165+
decoder: re_log_encoding::decoder::Decoder<R>,
166+
) {
167+
re_tracing::profile_function!(filepath.display().to_string());
168+
169+
for msg in decoder {
170+
let msg = match msg {
171+
Ok(msg) => msg,
172+
Err(err) => {
173+
re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
174+
continue;
175+
}
176+
};
177+
if tx.send(msg.into()).is_err() {
178+
break; // The other end has decided to hang up, not our problem.
179+
}
180+
}
181+
}

crates/re_data_source/src/data_loader/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ static BUILTIN_LOADERS: Lazy<Vec<Arc<dyn DataLoader>>> = Lazy::new(|| {
162162
Arc::new(RrdLoader) as Arc<dyn DataLoader>,
163163
Arc::new(ArchetypeLoader),
164164
Arc::new(DirectoryLoader),
165+
#[cfg(not(target_arch = "wasm32"))]
166+
Arc::new(ExternalDataLoader),
165167
]
166168
});
167169

@@ -177,6 +179,14 @@ mod loader_archetype;
177179
mod loader_directory;
178180
mod loader_rrd;
179181

182+
#[cfg(not(target_arch = "wasm32"))]
183+
mod loader_external;
184+
180185
pub use self::loader_archetype::ArchetypeLoader;
181186
pub use self::loader_directory::DirectoryLoader;
182187
pub use self::loader_rrd::RrdLoader;
188+
189+
#[cfg(not(target_arch = "wasm32"))]
190+
pub(crate) use self::loader_external::EXTERNAL_LOADER_PATHS;
191+
#[cfg(not(target_arch = "wasm32"))]
192+
pub use self::loader_external::{ExternalDataLoader, EXTERNAL_DATA_LOADER_PREFIX};

crates/re_data_source/src/load_file.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,17 @@ pub(crate) fn load(
137137
is_dir: bool,
138138
contents: Option<std::borrow::Cow<'_, [u8]>>,
139139
) -> Result<std::sync::mpsc::Receiver<LoadedData>, DataLoaderError> {
140+
#[cfg(target_arch = "wasm32")]
141+
let no_external_loaders = true;
142+
#[cfg(not(target_arch = "wasm32"))]
143+
let no_external_loaders = crate::data_loader::EXTERNAL_LOADER_PATHS.is_empty();
144+
140145
let extension = extension(path);
141146
let is_builtin = is_builtin(path, is_dir);
142147

143-
if !is_builtin {
148+
// If there are no external loaders registered (which is always the case on wasm) and we don't
149+
// have a builtin loader for it, then we know for a fact that we won't be able to load it.
150+
if !is_builtin && no_external_loaders {
144151
return if extension.is_empty() {
145152
Err(anyhow::anyhow!("files without extensions (file.XXX) are not supported").into())
146153
} else {

0 commit comments

Comments
 (0)