// SPDX-FileCopyrightText: edef
// SPDX-License-Identifier: OSL-3.0

use std::{
    sync::{Arc, Condvar, Mutex},
    thread::{self, JoinHandle},
};

/// Pairs writers with a background "syncer" thread.
///
/// Writers publish a transaction ID (and an accompanying `tail` value) with
/// [`Transactor::write`]; the syncer thread invokes the callback passed to
/// [`Transactor::new`] whenever the published transaction differs from the
/// last one it processed, and [`Transactor::wait`] blocks until a given
/// transaction has been processed.
#[derive(Debug)]
pub struct Transactor {
    /// State shared with the syncer thread.
    link: Arc<Link>,
    /// Handle of the syncer thread; taken (left as `None`) when dropped.
    syncer: Option<JoinHandle<()>>,
}

impl Transactor {
    /// Creates a transactor and spawns its syncer thread.
    ///
    /// `tail` is the initial value of [`State::written_tail`]. `f` is called
    /// on the syncer thread, without the internal lock held, with a snapshot
    /// of the state taken when the syncer noticed that `written_tx` differs
    /// from `synced_tx`. Once `f` returns, `synced_tx` is set to the
    /// snapshot's `written_tx` and all waiters are woken.
    ///
    /// When the transactor is dropped, `f` is called one final time with
    /// `written_tx == u64::MAX` before the thread exits.
    ///
    /// # Panics
    ///
    /// A panic inside `f` terminates the syncer thread without advancing
    /// `synced_tx`, so callers blocked in [`Transactor::wait`] are not woken.
    /// The panic is re-raised when the transactor is dropped.
    pub fn new(tail: u64, f: impl FnMut(&State) + Send + 'static) -> Arc<Self> {
        let link = Link::new(tail);

        let syncer = {
            let link = Arc::clone(&link);
            thread::spawn(move || link.run(f))
        };

        Arc::new(Self {
            link,
            syncer: Some(syncer),
        })
    }

    /// Publishes transaction `tx` together with its `tail` value and wakes
    /// the syncer thread.
    ///
    /// `u64::MAX` is used internally as the "closed" marker, so writing
    /// `tx == u64::MAX` has the same effect as closing the transactor.
    ///
    /// # Panics
    ///
    /// Panics with "already closed" if the transactor has been closed, and
    /// with "duplicate transaction ID" if `tx` is not strictly greater than
    /// the previously written transaction ID.
    pub fn write(self: &Arc<Self>, tx: u64, tail: u64) {
        self.link.write(tx, tail)
    }

    /// Blocks until `synced_tx >= tx`, i.e. until the callback has returned
    /// for transaction `tx` or a later one (or the transactor was closed).
    pub fn wait(self: &Arc<Self>, tx: u64) {
        self.link.wait(tx)
    }
}

impl Drop for Transactor {
    /// Closes the link and joins the syncer thread.
    fn drop(&mut self) {
        if let Some(syncer) = self.syncer.take() {
            self.link.close();

            // Surface a panic from the syncer thread, but never while this
            // thread is already unwinding: a second panic inside `drop`
            // would abort the whole process.
            match syncer.join() {
                Err(payload) if !thread::panicking() => std::panic::resume_unwind(payload),
                _ => {}
            }
        }
    }
}

/// Shared state plus the condition variables used to hand work to the syncer
/// thread (`written`) and to report completion back to waiters (`synced`).
#[derive(Debug)]
struct Link {
    state: Mutex<State>,
    /// Signalled after `written_tx` changes.
    written: Condvar,
    /// Signalled after `synced_tx` changes.
    synced: Condvar,
}

/// Snapshot of the transactor's progress, as handed to the sync callback.
#[derive(Debug, Clone)]
pub struct State {
    /// ID of the last transaction for which the callback has returned.
    pub synced_tx: u64,
    /// ID most recently passed to `write`, or `u64::MAX` once closed.
    pub written_tx: u64,
    /// `tail` value passed with the most recent `write` (initially the
    /// `tail` given at construction).
    pub written_tail: u64,
}

impl Link {
    /// Value of `written_tx` that tells the syncer thread to shut down.
    const CLOSED: u64 = u64::MAX;

    /// Creates a link with no transactions written or synced yet.
    fn new(tail: u64) -> Arc<Self> {
        Arc::new(Self {
            state: Mutex::new(State {
                synced_tx: 0,
                written_tx: 0,
                written_tail: tail,
            }),
            written: Condvar::new(),
            synced: Condvar::new(),
        })
    }

    /// Records `tx`/`tail` as the latest written transaction and wakes the
    /// syncer. Panics if the link is closed or `tx` does not increase.
    fn write(self: &Arc<Self>, tx: u64, tail: u64) {
        {
            let mut state = self.state.lock().unwrap();
            let last = state.written_tx;

            if last == Self::CLOSED || last >= tx {
                // Release the lock before panicking so that a caller bug
                // does not poison the mutex for every other thread.
                drop(state);
                assert!(last < Self::CLOSED, "already closed");
                panic!("duplicate transaction ID");
            }

            state.written_tx = tx;
            state.written_tail = tail;
        }

        self.written.notify_one();
    }

    /// Marks the link as closed and wakes the syncer so it can finish.
    fn close(self: &Arc<Self>) {
        {
            let mut state = self.state.lock().unwrap();
            state.written_tx = Self::CLOSED;
        }

        self.written.notify_one();
    }

    /// Blocks the caller until `synced_tx` has reached at least `tx`.
    fn wait(self: &Arc<Self>, tx: u64) {
        let state = self.state.lock().unwrap();

        drop(
            self.synced
                .wait_while(state, |state| state.synced_tx < tx)
                .unwrap(),
        );
    }

    /// Body of the syncer thread: repeatedly waits for `written_tx` to move
    /// away from `synced_tx`, runs `f` on a snapshot, then publishes the
    /// snapshot's `written_tx` as `synced_tx`. Returns once `synced_tx`
    /// reaches the closed marker.
    fn run(self: &Arc<Self>, mut f: impl FnMut(&State)) {
        let mut state = self.state.lock().unwrap();

        while state.synced_tx < Self::CLOSED {
            state = self
                .written
                .wait_while(state, |state| state.synced_tx == state.written_tx)
                .unwrap();

            // Run the callback on a copy with the lock released, so writers
            // and waiters are not blocked for the duration of `f`.
            let state_seen = state.clone();
            drop(state);

            f(&state_seen);

            state = self.state.lock().unwrap();
            state.synced_tx = state_seen.written_tx;
            self.synced.notify_all();
        }
    }
}