summary refs log tree commit diff
path: root/ripple/fossil/src/transactor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ripple/fossil/src/transactor.rs')
-rw-r--r--ripple/fossil/src/transactor.rs127
1 files changed, 127 insertions, 0 deletions
diff --git a/ripple/fossil/src/transactor.rs b/ripple/fossil/src/transactor.rs
new file mode 100644
index 0000000..f13e63b
--- /dev/null
+++ b/ripple/fossil/src/transactor.rs
@@ -0,0 +1,127 @@
+// SPDX-FileCopyrightText: edef <edef@unfathomable.blue>
+// SPDX-License-Identifier: OSL-3.0
+
+use std::{
+	sync::{Arc, Condvar, Mutex},
+	thread::{self, JoinHandle},
+};
+
+#[derive(Debug)]
+pub struct Transactor {
+	link: Arc<Link>,
+	syncer: Option<JoinHandle<()>>,
+}
+
+impl Transactor {
+	pub fn new(tail: u64, f: impl FnMut(&State) + Send + 'static) -> Arc<Self> {
+		let link = Link::new(tail);
+
+		let syncer = {
+			let link = link.clone();
+			thread::spawn(move || link.run(f))
+		};
+
+		Arc::new(Self {
+			link,
+			syncer: Some(syncer),
+		})
+	}
+
+	pub fn write(self: &Arc<Self>, tx: u64, tail: u64) {
+		self.link.write(tx, tail)
+	}
+
+	pub fn wait(self: &Arc<Self>, tx: u64) {
+		self.link.wait(tx)
+	}
+}
+
+impl Drop for Transactor {
+	fn drop(&mut self) {
+		if let Some(syncer) = self.syncer.take() {
+			self.link.close();
+			syncer.join().unwrap();
+		}
+	}
+}
+
+#[derive(Debug)]
+struct Link {
+	state: Mutex<State>,
+	written: Condvar,
+	synced: Condvar,
+}
+
+#[derive(Debug, Clone)]
+pub struct State {
+	pub synced_tx: u64,
+	pub written_tx: u64,
+	pub written_tail: u64,
+}
+
+impl Link {
+	fn new(tail: u64) -> Arc<Self> {
+		Arc::new(Self {
+			state: Mutex::new(State {
+				synced_tx: 0,
+				written_tx: 0,
+				written_tail: tail,
+			}),
+			written: Condvar::new(),
+			synced: Condvar::new(),
+		})
+	}
+
+	fn write(self: &Arc<Self>, tx: u64, tail: u64) {
+		{
+			let mut state = self.state.lock().unwrap();
+			assert!(state.written_tx < !0, "already closed");
+			assert!(state.written_tx < tx, "duplicate transaction ID");
+
+			state.written_tx = tx;
+			state.written_tail = tail;
+		}
+
+		self.written.notify_one();
+	}
+
+	fn close(self: &Arc<Self>) {
+		{
+			let mut state = self.state.lock().unwrap();
+			state.written_tx = !0;
+		}
+
+		self.written.notify_one();
+	}
+
+	fn wait(self: &Arc<Self>, tx: u64) {
+		let state = self.state.lock().unwrap();
+
+		drop(
+			self.synced
+				.wait_while(state, |state| state.synced_tx < tx)
+				.unwrap(),
+		);
+	}
+
+	fn run(self: &Arc<Self>, mut f: impl FnMut(&State)) {
+		let mut state = self.state.lock().unwrap();
+
+		while state.synced_tx < !0 {
+			state = self
+				.written
+				.wait_while(state, |state| state.synced_tx == state.written_tx)
+				.unwrap();
+
+			let state_seen = state.clone();
+			drop(state);
+
+			f(&state_seen);
+
+			state = self.state.lock().unwrap();
+			state.synced_tx = state_seen.written_tx;
+
+			self.synced.notify_all();
+		}
+	}
+}