summary refs log tree commit diff
path: root/ripple/fossil/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ripple/fossil/src/lib.rs')
-rw-r--r-- ripple/fossil/src/lib.rs | 112
1 file changed, 87 insertions(+), 25 deletions(-)
diff --git a/ripple/fossil/src/lib.rs b/ripple/fossil/src/lib.rs
index 3b2b3f8..4cea16a 100644
--- a/ripple/fossil/src/lib.rs
+++ b/ripple/fossil/src/lib.rs
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: OSL-3.0
 
 use {
+	crate::chunker::Chunker,
 	anyhow::{Context, Result},
 	byteorder::{BigEndian, ByteOrder},
 	prost::Message,
@@ -20,13 +21,15 @@ pub mod store {
 	include!(concat!(env!("OUT_DIR"), "/fossil.store.rs"));
 }
 
-const CHUNK_BYTES: usize = 0x400;
+mod chunker;
+
 const DIGEST_BYTES: usize = blake3::OUT_LEN;
 
 pub struct Store {
 	meta: sled::Tree,
 	blobs: sled::Tree,
-	chunks: RefCell<fs::File>,
+	chunks: sled::Tree,
+	chunks_file: RefCell<fs::File>,
 	chunks_tail: Cell<u64>,
 }
 
@@ -37,8 +40,9 @@ impl Store {
 		let db = sled::open(path)?;
 		let meta = (&*db).clone();
 		let blobs = db.open_tree("blobs")?;
+		let chunks = db.open_tree("chunks")?;
 
-		let chunks = fs::OpenOptions::new()
+		let chunks_file = fs::OpenOptions::new()
 			.read(true)
 			.append(true)
 			.create(true)
@@ -49,12 +53,13 @@ impl Store {
 			.map(|v| BigEndian::read_u64(&v))
 			.unwrap_or_default();
 
-		chunks.set_len(chunks_tail)?;
+		chunks_file.set_len(chunks_tail)?;
 
 		Ok(Store {
 			blobs,
 			meta,
-			chunks: RefCell::new(chunks),
+			chunks,
+			chunks_file: RefCell::new(chunks_file),
 			chunks_tail: Cell::new(chunks_tail),
 		})
 	}
@@ -124,34 +129,66 @@ impl Store {
 	}
 
 	fn write_blob_inner(&self, ident: &Digest, outboard: Vec<u8>, data: &[u8]) {
-		let mut chunks_file = self.chunks.borrow_mut();
-		let offset = self.chunks_tail.get();
+		let mut chunks_file = self.chunks_file.borrow_mut();
+		let mut offset = self.chunks_tail.get();
+		let mut batch = sled::Batch::default();
 
-		chunks_file.write_all(data).unwrap();
-		let chunks_tail = offset + data.len() as u64;
+		let chunks = Chunker::from(data)
+			.map(|chunk_data| {
+				self.write_chunk(&mut chunks_file, &mut offset, &mut batch, chunk_data)
+			})
+			.collect::<Vec<_>>();
 
 		let blob_buf = store::Blob {
-			offset,
-			length: data.len() as u64,
+			chunks,
 			bao_inline: outboard,
 		}
 		.encode_to_vec();
 
 		let chunks_tail_buf = {
 			let mut buf = [0u8; 8];
-			BigEndian::write_u64(&mut buf, chunks_tail);
+			BigEndian::write_u64(&mut buf, offset);
 			buf
 		};
 
 		// TODO(edef): figure out fsync for durability
-		(&self.blobs, &self.meta)
-			.transaction(|(blobs, meta)| {
+		(&self.blobs, &self.chunks, &self.meta)
+			.transaction(|(blobs, chunks, meta)| {
+				chunks.apply_batch(&batch)?;
 				blobs.insert(&*ident.as_bytes(), &*blob_buf)?;
 				meta.insert("chunks_tail", &chunks_tail_buf)?;
 				Ok::<_, ConflictableTransactionError>(())
 			})
 			.unwrap();
-		self.chunks_tail.set(chunks_tail);
+		self.chunks_tail.set(offset);
+	}
+
+	fn write_chunk(
+		&self,
+		chunks_file: &mut fs::File,
+		offset: &mut u64,
+		batch: &mut sled::Batch,
+		data: &[u8],
+	) -> store::Chunk {
+		let ident = blake3::hash(data);
+		if let Some(chunk) = self.get_chunk(&ident) {
+			return chunk;
+		}
+
+		chunks_file.write_all(data).unwrap();
+		let chunk = store::Chunk {
+			offset: *offset,
+			length: data.len() as u32,
+		};
+		*offset += data.len() as u64;
+
+		batch.insert(ident.as_bytes(), chunk.encode_to_vec());
+		chunk
+	}
+
+	fn get_chunk(&self, ident: &Digest) -> Option<store::Chunk> {
+		let buf = self.chunks.get(&*ident.as_bytes()).unwrap()?;
+		Some(store::Chunk::decode(&*buf).unwrap())
 	}
 
 	pub fn read_blob(&self, ident: Digest) -> Vec<u8> {
@@ -168,15 +205,31 @@ impl Store {
 			.expect("blob not found");
 
 		let store::Blob {
-			offset,
-			length,
+			mut chunks,
 			bao_inline,
 		} = store::Blob::decode(&*buf).unwrap();
 
+		let mut blob_length: u64 = 0;
+		let chunks = chunks
+			.drain(..)
+			.map(|chunk| {
+				let chunk_offset = blob_length;
+				blob_length += chunk.length as u64;
+				(
+					chunk_offset,
+					Slice {
+						offset: chunk.offset,
+						length: chunk.length,
+					},
+				)
+			})
+			.collect();
+
 		Blob(bao::decode::Decoder::new_outboard(
 			RawBlob {
 				store: self,
-				slice: Slice { offset, length },
+				chunks,
+				length: blob_length,
 				position: 0,
 			},
 			io::Cursor::new(bao_inline),
@@ -211,29 +264,38 @@ impl io::Seek for Blob<'_> {
 #[derive(Debug)]
 struct Slice {
 	offset: u64,
-	length: u64,
+	length: u32,
 }
 
 struct RawBlob<'a> {
 	store: &'a Store,
-	slice: Slice,
+	chunks: BTreeMap<u64, Slice>,
+	length: u64,
 	position: u64,
 }
 
 impl io::Read for RawBlob<'_> {
 	fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+		let (&chunk_offset, chunk_slice) =
+			if let Some(entry) = self.chunks.range(..=self.position).next_back() {
+				entry
+			} else {
+				// empty blob
+				return Ok(0);
+			};
+
 		let prev_pos = self.position;
 		let next_pos = Ord::min(
 			self.position.saturating_add(dst.len() as u64),
-			self.slice.length,
+			chunk_offset + chunk_slice.length as u64,
 		);
 
 		let len = (next_pos - prev_pos) as usize;
 		let dst = &mut dst[..len];
 
-		let offset = self.slice.offset + prev_pos;
+		let offset = prev_pos - chunk_offset + chunk_slice.offset;
 		self.store
-			.chunks
+			.chunks_file
 			.borrow()
 			.read_exact_at(dst, offset)
 			.context("Couldn't read blob data")
@@ -257,12 +319,12 @@ impl io::Seek for RawBlob<'_> {
 	fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
 		let pos = match pos {
 			io::SeekFrom::Start(n) => Some(n),
-			io::SeekFrom::End(n) => checked_add_signed(self.slice.length, n),
+			io::SeekFrom::End(n) => checked_add_signed(self.length, n),
 			io::SeekFrom::Current(n) => checked_add_signed(self.position, n),
 		};
 
 		match pos {
-			Some(n) if n <= self.slice.length => {
+			Some(n) if n <= self.length => {
 				self.position = n;
 				Ok(self.position)
 			}