summary refs log tree commit diff
path: root/ripple
diff options
context:
space:
mode:
authoredef <edef@unfathomable.blue>2022-04-19 00:57:34 +0000
committeredef <edef@unfathomable.blue>2022-04-19 01:18:15 +0000
commit7831d9e831c923bf4577da714b220948e4295d34 (patch)
treea8e9595d4c6e64626099d86862489a4db69c47aa /ripple
parent78c53cf327090dbfe70c9e73001dfde3bae8ddc5 (diff)
ripple/fossil: prepare for seekable, streaming blob reading
This implements blob reading in terms of RawBlob, a fairly naive
streaming blob reader. For now, we still only use it for simple
one-shot reads.

Change-Id: Iecd4f926412b474ca6f3dde8c6055c0c3781301f
Diffstat (limited to 'ripple')
-rw-r--r--ripple/fossil/src/lib.rs105
1 files changed, 89 insertions, 16 deletions
diff --git a/ripple/fossil/src/lib.rs b/ripple/fossil/src/lib.rs
index d7818cd..e65253b 100644
--- a/ripple/fossil/src/lib.rs
+++ b/ripple/fossil/src/lib.rs
@@ -4,7 +4,13 @@
 use {
 	byteorder::{BigEndian, ByteOrder},
 	prost::Message,
-	std::{collections::BTreeMap, fs, io, os::unix::fs::PermissionsExt, path::Path},
+	std::{
+		collections::BTreeMap,
+		fs,
+		io::{self, BufRead, Read},
+		os::unix::fs::PermissionsExt,
+		path::Path,
+	},
 };
 
 pub mod store {
@@ -75,10 +81,10 @@ impl Store {
 	}
 
 	fn write_blob(&self, data: &[u8]) -> Digest {
-		let digest = {
+		let ident = {
 			let mut h = blake3::Hasher::new();
 			h.update_with_join::<blake3::join::RayonJoin>(data);
-			*h.finalize().as_bytes()
+			h.finalize()
 		};
 
 		// TODO(edef): maybe don't use the default tree?
@@ -88,28 +94,26 @@ impl Store {
 		self.db
 			.transaction::<_, _, sled::Error>(|db| {
 				for (n, chunk) in data.chunks(CHUNK_BYTES).enumerate() {
-					let mut key = [0u8; DIGEST_BYTES + OFFSET_BYTES];
-					key[..DIGEST_BYTES].copy_from_slice(&digest);
-					BigEndian::write_u32(&mut key[DIGEST_BYTES..], n as u32);
-					db.insert(&key[..], chunk)?;
+					db.insert(chunk_key(&ident, n as u32).as_slice(), chunk)?;
 				}
 				Ok(())
 			})
 			.unwrap();
 
-		digest.into()
+		ident.into()
 	}
 
-	pub fn read_blob(&self, r#ref: Digest) -> Vec<u8> {
+	pub fn read_blob(&self, ident: Digest) -> Vec<u8> {
 		let mut buffer = Vec::new();
-		let mut h = blake3::Hasher::new();
-		for element in self.db.scan_prefix(r#ref.as_bytes()) {
-			let (_, chunk) = element.unwrap();
-			h.update(&chunk);
-			buffer.extend_from_slice(&chunk);
-		}
+		self.raw_blob(ident).read_to_end(&mut buffer).unwrap();
+
+		let computed_ident = {
+			let mut h = blake3::Hasher::new();
+			h.update_with_join::<blake3::join::RayonJoin>(&buffer);
+			h.finalize()
+		};
 
-		if h.finalize() != r#ref {
+		if computed_ident != ident {
 			if buffer.is_empty() {
 				panic!("blob not found");
 			}
@@ -118,6 +122,75 @@ impl Store {
 
 		buffer
 	}
+
+	fn raw_blob(&self, ident: Digest) -> RawBlob<'_> {
+		RawBlob {
+			store: self,
+			ident,
+			buf: None,
+			off: 0,
+		}
+	}
+}
+
+fn chunk_key(ident: &Digest, chunk: u32) -> [u8; DIGEST_BYTES + OFFSET_BYTES] {
+	let mut key = [0u8; DIGEST_BYTES + OFFSET_BYTES];
+	key[..DIGEST_BYTES].copy_from_slice(ident.as_bytes());
+	BigEndian::write_u32(&mut key[DIGEST_BYTES..], chunk as u32);
+	key
+}
+
+fn chunk_id(offset: u64) -> u32 {
+	(offset / CHUNK_BYTES as u64).try_into().unwrap()
+}
+
+struct RawBlob<'a> {
+	store: &'a Store,
+	ident: Digest,
+	/// current chunk
+	buf: Option<sled::IVec>,
+	/// reader offset
+	/// LSBs are intra-chunk, MSBs are chunk number
+	off: u64,
+}
+
+impl io::BufRead for RawBlob<'_> {
+	fn fill_buf(&mut self) -> io::Result<&[u8]> {
+		let buf = match self.buf {
+			Some(ref buf) => buf,
+			None => {
+				let chunk = chunk_id(self.off);
+				match self.store.db.get(chunk_key(&self.ident, chunk))? {
+					None => return Ok(&[]),
+					Some(contents) => self.buf.insert(contents),
+				}
+			}
+		};
+
+		let off = (self.off % CHUNK_BYTES as u64) as usize;
+		Ok(buf.get(off..).unwrap_or_default())
+	}
+
+	fn consume(&mut self, amt: usize) {
+		let prev_offset = self.off;
+		let next_offset = self.off.saturating_add(amt as u64);
+
+		if chunk_id(next_offset) != chunk_id(prev_offset) {
+			self.buf.take();
+		}
+
+		self.off = next_offset;
+	}
+}
+
+impl io::Read for RawBlob<'_> {
+	fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+		let src = self.fill_buf()?;
+		let len = Ord::min(src.len(), dst.len());
+		dst[..len].copy_from_slice(&src[..len]);
+		self.consume(len);
+		Ok(len)
+	}
 }
 
 pub type Digest = blake3::Hash;