summary refs log tree commit diff
path: root/ripple/fossil/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ripple/fossil/src/lib.rs')
-rw-r--r--  ripple/fossil/src/lib.rs  111
1 file changed, 95 insertions, 16 deletions
diff --git a/ripple/fossil/src/lib.rs b/ripple/fossil/src/lib.rs
index 6420437..af931b0 100644
--- a/ripple/fossil/src/lib.rs
+++ b/ripple/fossil/src/lib.rs
@@ -3,6 +3,7 @@
 
 pub use crate::chunker::Chunker;
 use {
+	crate::transactor::Transactor,
 	anyhow::{anyhow, bail, Context, Result},
 	byteorder::{BigEndian, ByteOrder},
 	prost::Message,
@@ -15,6 +16,7 @@ use {
 		os::unix::prelude::*,
 		path::Path,
 		str,
+		sync::Arc,
 	},
 };
 
@@ -23,15 +25,19 @@ pub mod store {
 }
 
 mod chunker;
+mod transactor;
 
 const DIGEST_BYTES: usize = blake3::OUT_LEN;
 
 pub struct Store {
-	meta: sled::Tree,
 	blobs: sled::Tree,
+	blobs_dirty: sled::Tree,
 	chunks: sled::Tree,
+	chunks_dirty: sled::Tree,
 	chunks_file: RefCell<fs::File>,
 	chunks_tail: Cell<u64>,
+	transactor: Arc<Transactor>,
+	last_tx_id: Cell<u64>,
 }
 
 impl Store {
@@ -41,7 +47,9 @@ impl Store {
 		let db = sled::open(path)?;
 		let meta = (&*db).clone();
 		let blobs = db.open_tree("blobs")?;
+		let blobs_dirty = db.open_tree("blobs_dirty")?;
 		let chunks = db.open_tree("chunks")?;
+		let chunks_dirty = db.open_tree("chunks_dirty")?;
 
 		let chunks_file = fs::OpenOptions::new()
 			.read(true)
@@ -54,25 +62,79 @@ impl Store {
 			.map(|v| BigEndian::read_u64(&v))
 			.unwrap_or_default();
 
-		chunks_file.set_len(chunks_tail)?;
+		// clear out dirty state after an unclean shutdown
+		{
+			chunks_file.set_len(chunks_tail)?;
+
+			for blob in &blobs_dirty {
+				let (tx, blob) = blob?;
+				blobs.remove(blob)?;
+				blobs_dirty.remove(tx)?;
+			}
+
+			for chunk in &chunks_dirty {
+				let (tx, chunk) = chunk?;
+				chunks.remove(chunk)?;
+				chunks_dirty.remove(tx)?;
+			}
+		}
+
+		let transactor = {
+			let chunks_file = chunks_file.try_clone()?;
+
+			let blobs_dirty = blobs_dirty.clone();
+			let chunks_dirty = chunks_dirty.clone();
+
+			Transactor::new(chunks_tail, move |state| {
+				chunks_file.sync_all().unwrap();
+
+				let written_tx = state.written_tx.to_be_bytes();
+
+				let mut blobs_batch = sled::Batch::default();
+				for tx in blobs_dirty.range(..=written_tx) {
+					let (tx, _) = tx.unwrap();
+					blobs_batch.remove(tx);
+				}
+
+				let mut chunks_batch = sled::Batch::default();
+				for tx in chunks_dirty.range(..=written_tx) {
+					let (tx, _) = tx.unwrap();
+					chunks_batch.remove(tx);
+				}
+
+				(&blobs_dirty, &chunks_dirty, &meta)
+					.transaction(|(blobs_dirty, chunks_dirty, meta)| {
+						blobs_dirty.apply_batch(&blobs_batch)?;
+						chunks_dirty.apply_batch(&chunks_batch)?;
+						meta.insert("chunks_tail", &state.written_tail.to_be_bytes())?;
+						Ok::<_, ConflictableTransactionError>(())
+					})
+					.unwrap();
+
+				db.flush().unwrap();
+			})
+		};
 
 		Ok(Store {
 			blobs,
-			meta,
+			blobs_dirty,
 			chunks,
+			chunks_dirty,
 			chunks_file: RefCell::new(chunks_file),
 			chunks_tail: Cell::new(chunks_tail),
+			transactor,
+			last_tx_id: Cell::new(0),
 		})
 	}
 
+	fn next_tx_id(&self) -> u64 {
+		let id = self.last_tx_id.get() + 1;
+		self.last_tx_id.set(id);
+		id
+	}
+
 	fn flush(&self) {
-		// NOTE: sled *can* flush without us explicitly asking for it, so it's
-		// possible for the store to end up containing pointers to chunks that
-		// aren't fsynced yet. The easiest fix is to always `chunks_file.sync_data()`
-		// before we write anything to the database, but that's kind of a performance hazard.
-		// TODO(edef): keep pending and known-durable blobs/chunks separate in the database
-		self.chunks_file.borrow_mut().sync_data().unwrap();
-		self.meta.flush().unwrap();
+		self.transactor.wait(self.last_tx_id.get());
 	}
 
 	pub fn add_git_tree(&self, repo: &git2::Repository, tree: git2::Oid) -> DirectoryRef {
@@ -209,11 +271,19 @@ impl Store {
 	fn write_blob_inner(&self, ident: &Digest, outboard: Vec<u8>, data: &[u8]) {
 		let mut chunks_file = self.chunks_file.borrow_mut();
 		let mut offset = self.chunks_tail.get();
+
 		let mut batch = sled::Batch::default();
+		let mut batch_dirty = sled::Batch::default();
 
 		let chunks = Chunker::from(data)
 			.map(|chunk_data| {
-				self.write_chunk(&mut chunks_file, &mut offset, &mut batch, chunk_data)
+				self.write_chunk(
+					&mut chunks_file,
+					&mut offset,
+					&mut batch,
+					&mut batch_dirty,
+					chunk_data,
+				)
 			})
 			.collect::<Vec<_>>();
 
@@ -223,17 +293,24 @@ impl Store {
 		}
 		.encode_to_vec();
 
-		let chunks_tail_buf = offset.to_be_bytes();
-
-		(&self.blobs, &self.chunks, &self.meta)
-			.transaction(|(blobs, chunks, meta)| {
+		(
+			&self.blobs,
+			&self.blobs_dirty,
+			&self.chunks,
+			&self.chunks_dirty,
+		)
+			.transaction(|(blobs, blobs_dirty, chunks, chunks_dirty)| {
 				chunks.apply_batch(&batch)?;
+				chunks_dirty.apply_batch(&batch_dirty)?;
 				blobs.insert(&*ident.as_bytes(), &*blob_buf)?;
-				meta.insert("chunks_tail", &chunks_tail_buf)?;
+				blobs_dirty.insert(&self.next_tx_id().to_be_bytes(), &*ident.as_bytes())?;
 				Ok::<_, ConflictableTransactionError>(())
 			})
 			.unwrap();
+
 		self.chunks_tail.set(offset);
+		self.transactor
+			.write(self.last_tx_id.get(), self.chunks_tail.get());
 	}
 
 	fn write_chunk(
@@ -241,6 +318,7 @@ impl Store {
 		chunks_file: &mut fs::File,
 		offset: &mut u64,
 		batch: &mut sled::Batch,
+		batch_dirty: &mut sled::Batch,
 		data: &[u8],
 	) -> store::Chunk {
 		let ident = blake3::hash(data);
@@ -256,6 +334,7 @@ impl Store {
 		*offset += data.len() as u64;
 
 		batch.insert(ident.as_bytes(), chunk.encode_to_vec());
+		batch_dirty.insert(&self.next_tx_id().to_be_bytes(), ident.as_bytes());
 		chunk
 	}