1
//! Authors: Maurice Laveaux, Flip van Spaendonck and Jan Friso Groote
2

            
3
use std::cell::UnsafeCell;
4
use std::error::Error;
5
use std::fmt::Debug;
6
use std::ops::Deref;
7
use std::ops::DerefMut;
8
use std::sync::Arc;
9
use std::sync::Mutex;
10
use std::sync::MutexGuard;
11
use std::sync::atomic::AtomicBool;
12
use std::sync::atomic::Ordering;
13

            
14
use crossbeam_utils::CachePadded;
15

            
16
/// A shared mutex (readers-writer lock) implementation based on the so-called
17
/// busy-forbidden protocol.
18
///
19
/// # Details
20
///
21
/// Compared to a regular [std::sync::Mutex] this struct is Send but not Sync.
22
/// This means that every thread must acquire a clone of the shared mutex and
23
/// the cloned instances of the same shared mutex guarantee shared access
24
/// through the `read` operation and exclusive access for the `write` operation
25
/// of the given object.
26
pub struct BfSharedMutex<T> {
27
    /// The local control bits of each instance.
28
    ///
29
    /// TODO: Maybe use pin to share the control bits among shared mutexes.
30
    control: Arc<CachePadded<SharedMutexControl>>,
31

            
32
    /// Index into the `other` table.
33
    index: usize,
34

            
35
    /// Information shared between all clones.
36
    shared: Arc<CachePadded<SharedData<T>>>,
37
}
38

            
39
// Can only be sent to another thread (`Send`), but is not `Sync`.
40
// NOTE(review): this impl places no bound on `T`, yet clones living on different
// threads hand out `&T` (via `read`) and `&mut T` (via `write`) to the same object.
// Presumably `T: Send + Sync` is what would make that sound (compare the bounds on
// `std::sync::RwLock`) — confirm against callers before tightening.
unsafe impl<T> Send for BfSharedMutex<T> {}
41

            
42
/// Per-instance flags of the busy-forbidden protocol.
#[derive(Default)]
struct SharedMutexControl {
    /// Raised by the owning instance while it is inside a read section.
    busy: AtomicBool,
    /// Raised by a writer to keep the owning instance out of its read section.
    forbidden: AtomicBool,
}
48

            
49
/// The shared data between all instances of the shared mutex.
50
struct SharedData<T> {
51
    /// The object that is being protected.
52
    object: UnsafeCell<T>,
53

            
54
    /// The list of all the shared mutex instances.
55
    other: Mutex<Vec<Option<Arc<CachePadded<SharedMutexControl>>>>>,
56
}
57

            
58
impl<T> BfSharedMutex<T> {
59
    /// Constructs a new shared mutex for protecting access to the given object.
60
542
    pub fn new(object: T) -> Self {
61
542
        let control = Arc::new(CachePadded::new(SharedMutexControl::default()));
62

            
63
542
        Self {
64
542
            control: control.clone(),
65
542
            shared: Arc::new(CachePadded::new(SharedData {
66
542
                object: UnsafeCell::new(object),
67
542
                other: Mutex::new(vec![Some(control.clone())]),
68
542
            })),
69
542
            index: 0,
70
542
        }
71
542
    }
72
}
73

            
74
impl<T> Clone for BfSharedMutex<T> {
75
585
    fn clone(&self) -> Self {
76
        // Register a new instance in the other list.
77
585
        let control = Arc::new(CachePadded::new(SharedMutexControl::default()));
78

            
79
585
        let mut other = self.shared.other.lock().expect("Failed to lock mutex");
80
585
        other.push(Some(control.clone()));
81

            
82
585
        Self {
83
585
            control,
84
585
            index: other.len() - 1,
85
585
            shared: self.shared.clone(),
86
585
        }
87
585
    }
88
}
89

            
90
impl<T> Drop for BfSharedMutex<T> {
91
594
    fn drop(&mut self) {
92
594
        let mut other = self.shared.other.lock().expect("Failed to lock mutex");
93

            
94
        // Remove ourselves from the table.
95
594
        other[self.index] = None;
96
594
    }
97
}
98

            
99
/// The guard object for exclusive access to the underlying object.
100
#[must_use = "Dropping the guard unlocks the shared mutex immediately"]
101
pub struct BfSharedMutexWriteGuard<'a, T> {
102
    mutex: &'a BfSharedMutex<T>,
103
    guard: MutexGuard<'a, Vec<Option<Arc<CachePadded<SharedMutexControl>>>>>,
104
}
105

            
106
/// Allow dereferencing the underlying object.
107
impl<T> Deref for BfSharedMutexWriteGuard<'_, T> {
108
    type Target = T;
109

            
110
1719
    fn deref(&self) -> &Self::Target {
111
        // We are the only guard after `write()`, so we can provide immutable access to the underlying object. (No mutable references the guard can exist)
112
1719
        unsafe { &*self.mutex.shared.object.get() }
113
1719
    }
114
}
115

            
116
impl<T> DerefMut for BfSharedMutexWriteGuard<'_, T> {
117
16206
    fn deref_mut(&mut self) -> &mut Self::Target {
118
        // We are the only guard after `write()`, so we can provide mutable access to the underlying object.
119
16206
        unsafe { &mut *self.mutex.shared.object.get() }
120
16206
    }
121
}
122

            
123
impl<T> Drop for BfSharedMutexWriteGuard<'_, T> {
124
16201
    fn drop(&mut self) {
125
        // Allow other threads to acquire access to the shared mutex.
126
223309
        for control in self.guard.iter().flatten() {
127
223309
            control.forbidden.store(false, std::sync::atomic::Ordering::SeqCst);
128
223309
        }
129

            
130
        // The mutex guard is then dropped here.
131
16201
    }
132
}
133

            
134
pub struct BfSharedMutexReadGuard<'a, T> {
135
    mutex: &'a BfSharedMutex<T>,
136
}
137

            
138
/// Allow dereferences the underlying object.
139
impl<T> Deref for BfSharedMutexReadGuard<'_, T> {
140
    type Target = T;
141

            
142
5284803
    fn deref(&self) -> &Self::Target {
143
        // There can only be shared guards, which only provide immutable access to the object.
144
5284803
        unsafe { &*self.mutex.shared.object.get() }
145
5284803
    }
146
}
147

            
148
impl<T> Drop for BfSharedMutexReadGuard<'_, T> {
149
969739483
    fn drop(&mut self) {
150
969739483
        debug_assert!(
151
969739483
            self.mutex.control.busy.load(Ordering::SeqCst),
152
            "Cannot unlock shared lock that was not acquired"
153
        );
154

            
155
969739483
        self.mutex.control.busy.store(false, Ordering::SeqCst);
156
969739483
    }
157
}
158

            
159
impl<T> BfSharedMutex<T> {
160
    /// Provides read access to the underlying object, allowing multiple immutable references to it.
161
    #[inline]
162
969739483
    pub fn read<'a>(&'a self) -> Result<BfSharedMutexReadGuard<'a, T>, Box<dyn Error + 'a>> {
163
969739483
        debug_assert!(
164
969739483
            !self.control.busy.load(Ordering::SeqCst),
165
            "Cannot acquire read access again inside a reader section"
166
        );
167

            
168
969739483
        self.control.busy.store(true, Ordering::SeqCst);
169
969741849
        while self.control.forbidden.load(Ordering::SeqCst) {
170
2366
            self.control.busy.store(false, Ordering::SeqCst);
171

            
172
            // Wait for the mutex of the writer.
173
2366
            let mut _guard = self.shared.other.lock()?;
174

            
175
2366
            self.control.busy.store(true, Ordering::SeqCst);
176
        }
177

            
178
        // We now have immutable access to the object due to the protocol.
179
969739483
        Ok(BfSharedMutexReadGuard { mutex: self })
180
969739483
    }
181

            
182
    /// Creates a new `BfSharedMutexReadGuard` without checking if the lock is held.
183
    ///
184
    /// # Safety
185
    ///
186
    /// This method must only be called if the thread logically holds a read lock.
187
    ///
188
    /// This function does not increment the read count of the lock. Calling this function when a
189
    /// guard has already been produced is undefined behaviour unless the guard was forgotten
190
    /// with `mem::forget`.
191
    #[inline]
192
966644555
    pub unsafe fn create_read_guard_unchecked(&self) -> BfSharedMutexReadGuard<'_, T> {
193
966644555
        BfSharedMutexReadGuard { mutex: self }
194
966644555
    }
195

            
196
    /// Returns a raw pointer to the underlying data.
197
    ///
198
    /// This is useful when combined with `mem::forget` to hold a lock without
199
    /// the need to maintain a `RwLockReadGuard` or `RwLockWriteGuard` object
200
    /// alive, for example when dealing with FFI.
201
    ///
202
    /// # Safety
203
    ///
204
    /// You must ensure that there are no data races when dereferencing the
205
    /// returned pointer, for example if the current thread logically owns a
206
    /// `RwLockReadGuard` or `RwLockWriteGuard` but that guard has been discarded
207
    /// using `mem::forget`.
208
    #[inline]
209
57130083
    pub fn data_ptr(&self) -> *mut T {
210
57130083
        self.shared.object.get()
211
57130083
    }
212

            
213
    /// Provide write access to the underlying object, only a single mutable reference to the object exists.
214
    #[inline]
215
16201
    pub fn write<'a>(&'a self) -> Result<BfSharedMutexWriteGuard<'a, T>, Box<dyn Error + 'a>> {
216
16201
        let other = self.shared.other.lock()?;
217

            
218
16201
        debug_assert!(
219
16201
            !self.control.busy.load(std::sync::atomic::Ordering::SeqCst),
220
            "Can only exclusive lock outside of a shared lock, no upgrading!"
221
        );
222
16201
        debug_assert!(
223
16201
            !self.control.forbidden.load(std::sync::atomic::Ordering::SeqCst),
224
            "Can not acquire exclusive lock inside of exclusive section"
225
        );
226

            
227
        // Make all instances wait due to forbidden access.
228
223309
        for control in other.iter().flatten() {
229
223309
            debug_assert!(
230
223309
                !control.forbidden.load(std::sync::atomic::Ordering::SeqCst),
231
                "Other instance is already forbidden, this cannot happen"
232
            );
233

            
234
223309
            control.forbidden.store(true, std::sync::atomic::Ordering::SeqCst);
235
        }
236

            
237
        // Wait for the instances to exit their busy status.
238
316821
        for (index, option) in other.iter().enumerate() {
239
316821
            if index != self.index {
240
300620
                if let Some(object) = option {
241
2462679
                    while object.busy.load(std::sync::atomic::Ordering::SeqCst) {
242
2255571
                        std::hint::spin_loop();
243
2255571
                    }
244
93512
                }
245
16201
            }
246
        }
247

            
248
        // We now have exclusive access to the object according to the protocol
249
16201
        Ok(BfSharedMutexWriteGuard {
250
16201
            mutex: self,
251
16201
            guard: other,
252
16201
        })
253
16201
    }
254

            
255
    /// Check if the shared mutex is locked shared, meaning no other thread has a read lock.
256
395766
    pub fn is_locked(&self) -> bool {
257
395766
        self.control.busy.load(Ordering::Relaxed)
258
395766
    }
259

            
260
    /// Check if the shared mutex is locked exclusively, meaning no other thread has a lock.
261
    pub fn is_locked_exclusive(&self) -> bool {
262
        self.control.forbidden.load(Ordering::Relaxed)
263
    }
264

            
265
    /// Obtain mutable access to the object without locking, is safe because we have mutable access.
266
    pub fn get_mut(&mut self) -> &mut T {
267
        unsafe { &mut *self.shared.object.get() }
268
    }
269
}
270

            
271
impl<T: Debug> Debug for BfSharedMutex<T> {
272
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273
        f.debug_map()
274
            .entry(&"busy", &self.control.busy.load(Ordering::SeqCst))
275
            .entry(&"forbidden", &self.control.forbidden.load(Ordering::SeqCst))
276
            .entry(&"index", &self.index)
277
            .entry(&"len(other)", &self.shared.other.lock().unwrap().len())
278
            .finish()?;
279

            
280
        writeln!(f)?;
281
        writeln!(f, "other values: [")?;
282
        for control in self.shared.other.lock().unwrap().iter().flatten() {
283
            f.debug_map()
284
                .entry(&"busy", &control.busy.load(Ordering::SeqCst))
285
                .entry(&"forbidden", &control.forbidden.load(Ordering::SeqCst))
286
                .finish()?;
287
            writeln!(f)?;
288
        }
289

            
290
        writeln!(f, "]")
291
    }
292
}
293

            
294
/// A global shared mutex that can be used to protect global data.
295
///
296
/// # Details
297
///
298
/// This is a wrapper around `BfSharedMutex` that provides a global instance
299
/// that can be used to protect global data. Must be cloned to obtain mutable
300
/// access.
301
pub struct GlobalBfSharedMutex<T> {
302
    /// The shared mutex that is used to protect the global data.
303
    pub shared_mutex: BfSharedMutex<T>,
304
}
305

            
306
impl<T> GlobalBfSharedMutex<T> {
307
    /// Constructs a new global shared mutex for protecting access to the given object.
308
533
    pub fn new(object: T) -> Self {
309
533
        Self {
310
533
            shared_mutex: BfSharedMutex::new(object),
311
533
        }
312
533
    }
313

            
314
    /// Returns a clone of the global shared mutex, which allows writing and reading.
315
535
    pub fn share(&self) -> BfSharedMutex<T> {
316
535
        self.shared_mutex.clone()
317
535
    }
318
}
319

            
320
// Can be Send and Sync, because it cannot be mutated anyway.
321
// Moving the wrapper to another thread moves the wrapped `BfSharedMutex<T>`
// along with it; `T: Send` is required for that.
unsafe impl<T: Send> Send for GlobalBfSharedMutex<T> {}
322
// NOTE(review): through `share(&self)` several threads can each obtain a clone
// and then hold `&T` to the same object concurrently via `read`. That presumably
// requires `T: Sync` in addition to `T: Send` (compare the bounds on
// `std::sync::RwLock`) — confirm against callers before tightening.
unsafe impl<T: Send> Sync for GlobalBfSharedMutex<T> {}
323

            
324
#[cfg(test)]
mod tests {
    use crate::bf_sharedmutex::BfSharedMutex;
    use rand::prelude::*;
    use std::hint::black_box;

    use merc_utilities::random_test_threads;
    use merc_utilities::test_threads;

    // These are just simple tests.

    /// Every thread adds 5 to the shared number a fixed number of times under
    /// the write lock; afterwards no increment may have been lost.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_random_shared_mutex_exclusive() {
        let num_threads = 20;
        let num_iterations = 500;
        let shared_number = BfSharedMutex::new(5);

        test_threads(
            num_threads,
            || shared_number.clone(),
            move |number| {
                for _round in 0..num_iterations {
                    let mut exclusive = number.write().unwrap();
                    *exclusive += 5;
                }
            },
        );

        let expected = num_threads * num_iterations * 5 + 5;
        assert_eq!(*shared_number.write().unwrap(), expected);
    }

    /// Mixes mostly reads with occasional writes on a shared vector and checks
    /// that readers only ever observe the value that writers push.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_random_shared() {
        let num_iterations = 5000;
        let num_threads = 20;
        let shared_vector = BfSharedMutex::new(vec![]);

        random_test_threads(
            num_iterations,
            num_threads,
            || shared_vector.clone(),
            |rng, shared_vector| {
                if rng.random_bool(0.95) {
                    // Inspect a random element, provided there is one already.
                    let read = shared_vector.read().unwrap();
                    if !read.is_empty() {
                        let index = rng.random_range(0..read.len());
                        assert_eq!(black_box(read[index]), 5);
                    }
                } else {
                    // Append a new element under the write lock.
                    shared_vector.write().unwrap().push(5);
                }
            },
        );
    }
}