summary refs log tree commit diff
path: root/ripple/fossil/src/transactor.rs
blob: f13e63bb9d000b347a44be31299230ab79e8b619 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// SPDX-FileCopyrightText: edef <edef@unfathomable.blue>
// SPDX-License-Identifier: OSL-3.0

use std::{
	sync::{Arc, Condvar, Mutex},
	thread::{self, JoinHandle},
};

#[derive(Debug)]
pub struct Transactor {
	link: Arc<Link>,
	syncer: Option<JoinHandle<()>>,
}

impl Transactor {
	pub fn new(tail: u64, f: impl FnMut(&State) + Send + 'static) -> Arc<Self> {
		let link = Link::new(tail);

		let syncer = {
			let link = link.clone();
			thread::spawn(move || link.run(f))
		};

		Arc::new(Self {
			link,
			syncer: Some(syncer),
		})
	}

	pub fn write(self: &Arc<Self>, tx: u64, tail: u64) {
		self.link.write(tx, tail)
	}

	pub fn wait(self: &Arc<Self>, tx: u64) {
		self.link.wait(tx)
	}
}

impl Drop for Transactor {
	fn drop(&mut self) {
		if let Some(syncer) = self.syncer.take() {
			self.link.close();
			syncer.join().unwrap();
		}
	}
}

#[derive(Debug)]
struct Link {
	state: Mutex<State>,
	written: Condvar,
	synced: Condvar,
}

#[derive(Debug, Clone)]
pub struct State {
	pub synced_tx: u64,
	pub written_tx: u64,
	pub written_tail: u64,
}

impl Link {
	fn new(tail: u64) -> Arc<Self> {
		Arc::new(Self {
			state: Mutex::new(State {
				synced_tx: 0,
				written_tx: 0,
				written_tail: tail,
			}),
			written: Condvar::new(),
			synced: Condvar::new(),
		})
	}

	fn write(self: &Arc<Self>, tx: u64, tail: u64) {
		{
			let mut state = self.state.lock().unwrap();
			assert!(state.written_tx < !0, "already closed");
			assert!(state.written_tx < tx, "duplicate transaction ID");

			state.written_tx = tx;
			state.written_tail = tail;
		}

		self.written.notify_one();
	}

	fn close(self: &Arc<Self>) {
		{
			let mut state = self.state.lock().unwrap();
			state.written_tx = !0;
		}

		self.written.notify_one();
	}

	fn wait(self: &Arc<Self>, tx: u64) {
		let state = self.state.lock().unwrap();

		drop(
			self.synced
				.wait_while(state, |state| state.synced_tx < tx)
				.unwrap(),
		);
	}

	fn run(self: &Arc<Self>, mut f: impl FnMut(&State)) {
		let mut state = self.state.lock().unwrap();

		while state.synced_tx < !0 {
			state = self
				.written
				.wait_while(state, |state| state.synced_tx == state.written_tx)
				.unwrap();

			let state_seen = state.clone();
			drop(state);

			f(&state_seen);

			state = self.state.lock().unwrap();
			state.synced_tx = state_seen.written_tx;

			self.synced.notify_all();
		}
	}
}