1
//! Authors: Maurice Laveaux, Flip van Spaendonck and Jan Friso Groote
2

            
3
use std::cell::UnsafeCell;
4
use std::error::Error;
5
use std::fmt::Debug;
6
use std::ops::Deref;
7
use std::ops::DerefMut;
8
use std::sync::Arc;
9
use std::sync::Mutex;
10
use std::sync::MutexGuard;
11
use std::sync::atomic::AtomicBool;
12
use std::sync::atomic::Ordering;
13

            
14
use crossbeam_utils::CachePadded;
15

            
16
/// A shared mutex (readers-writer lock) implementation based on the so-called
17
/// busy-forbidden protocol.
18
///
19
/// # Details
20
///
21
/// Compared to a regular [std::sync::Mutex] this struct is Send but not Sync.
22
/// This means that every thread must acquire a clone of the shared mutex and
23
/// the cloned instances of the same shared mutex guarantee shared access
24
/// through the `read` operation and exclusive access for the `write` operation
25
/// of the given object.
26
pub struct BfSharedMutex<T> {
27
    /// The local control bits of each instance.
28
    ///
29
    /// TODO: Maybe use pin to share the control bits among shared mutexes.
30
    control: Arc<CachePadded<SharedMutexControl>>,
31

            
32
    /// Index into the `other` table.
33
    index: usize,
34

            
35
    /// Information shared between all clones.
36
    shared: Arc<CachePadded<SharedData<T>>>,
37
}
38

            
39
// Can be sent to another thread (`Send`), but is deliberately not `Sync`.
40
// NOTE(review): this impl places no bound on `T`. Clones on different threads
// hand out `&T` (via `read`) and `&mut T` (via `write`) to the same object, so
// this presumably needs `T: Send + Sync` to be sound -- confirm against callers
// before tightening the bound.
unsafe impl<T> Send for BfSharedMutex<T> {}
41

            
42
/// The pair of flags each instance contributes to the busy-forbidden protocol.
#[derive(Default)]
struct SharedMutexControl {
    /// Raised by the owning instance while it enters or is inside a reader section.
    busy: AtomicBool,
    /// Raised by a writer on every registered instance for the duration of its exclusive section.
    forbidden: AtomicBool,
}
48

            
49
/// The shared data between all instances of the shared mutex.
50
struct SharedData<T> {
51
    /// The object that is being protected.
52
    object: UnsafeCell<T>,
53

            
54
    /// The list of all the shared mutex instances.
55
    other: Mutex<Vec<Option<Arc<CachePadded<SharedMutexControl>>>>>,
56
}
57

            
58
impl<T> BfSharedMutex<T> {
59
    /// Constructs a new shared mutex for protecting access to the given object.
60
602
    pub fn new(object: T) -> Self {
61
602
        let control = Arc::new(CachePadded::new(SharedMutexControl::default()));
62

            
63
602
        Self {
64
602
            control: control.clone(),
65
602
            shared: Arc::new(CachePadded::new(SharedData {
66
602
                object: UnsafeCell::new(object),
67
602
                other: Mutex::new(vec![Some(control.clone())]),
68
602
            })),
69
602
            index: 0,
70
602
        }
71
602
    }
72
}
73

            
74
impl<T> Clone for BfSharedMutex<T> {
75
645
    fn clone(&self) -> Self {
76
        // Register a new instance in the other list.
77
645
        let control = Arc::new(CachePadded::new(SharedMutexControl::default()));
78

            
79
645
        let mut other = self.shared.other.lock().expect("Failed to lock mutex");
80
645
        other.push(Some(control.clone()));
81

            
82
645
        Self {
83
645
            control,
84
645
            index: other.len() - 1,
85
645
            shared: self.shared.clone(),
86
645
        }
87
645
    }
88
}
89

            
90
impl<T> Drop for BfSharedMutex<T> {
91
654
    fn drop(&mut self) {
92
654
        let mut other = self.shared.other.lock().expect("Failed to lock mutex");
93

            
94
        // Remove ourselves from the table.
95
654
        other[self.index] = None;
96
654
    }
97
}
98

            
99
/// The guard object for exclusive access to the underlying object.
100
#[must_use = "Dropping the guard unlocks the shared mutex immediately"]
101
pub struct BfSharedMutexWriteGuard<'a, T> {
102
    mutex: &'a BfSharedMutex<T>,
103
    guard: MutexGuard<'a, Vec<Option<Arc<CachePadded<SharedMutexControl>>>>>,
104
}
105

            
106
/// Allow dereferencing the underlying object.
107
impl<T> Deref for BfSharedMutexWriteGuard<'_, T> {
108
    type Target = T;
109

            
110
1898
    fn deref(&self) -> &Self::Target {
111
        // We are the only guard after `write()`, so we can provide immutable access to the underlying object. (No mutable references the guard can exist)
112
1898
        unsafe { &*self.mutex.shared.object.get() }
113
1898
    }
114
}
115

            
116
impl<T> DerefMut for BfSharedMutexWriteGuard<'_, T> {
117
44171
    fn deref_mut(&mut self) -> &mut Self::Target {
118
        // We are the only guard after `write()`, so we can provide mutable access to the underlying object.
119
44171
        unsafe { &mut *self.mutex.shared.object.get() }
120
44171
    }
121
}
122

            
123
impl<T> Drop for BfSharedMutexWriteGuard<'_, T> {
124
44165
    fn drop(&mut self) {
125
        // Allow other threads to acquire access to the shared mutex.
126
224801
        for control in self.guard.iter().flatten() {
127
224801
            control.forbidden.store(false, std::sync::atomic::Ordering::SeqCst);
128
224801
        }
129

            
130
        // The mutex guard is then dropped here.
131
44165
    }
132
}
133

            
134
pub struct BfSharedMutexReadGuard<'a, T> {
135
    mutex: &'a BfSharedMutex<T>,
136
}
137

            
138
/// Allow dereferences the underlying object.
139
impl<T> Deref for BfSharedMutexReadGuard<'_, T> {
140
    type Target = T;
141

            
142
5285270
    fn deref(&self) -> &Self::Target {
143
        // There can only be shared guards, which only provide immutable access to the object.
144
5285270
        unsafe { &*self.mutex.shared.object.get() }
145
5285270
    }
146
}
147

            
148
impl<T> Drop for BfSharedMutexReadGuard<'_, T> {
149
971140383
    fn drop(&mut self) {
150
971140383
        debug_assert!(
151
971140383
            self.mutex.control.busy.load(Ordering::SeqCst),
152
            "Cannot unlock shared lock that was not acquired"
153
        );
154

            
155
971140383
        self.mutex.control.busy.store(false, Ordering::SeqCst);
156
971140383
    }
157
}
158

            
159
impl<T> BfSharedMutex<T> {
160
    /// Provides read access to the underlying object, allowing multiple immutable references to it.
161
    #[inline]
162
971140383
    pub fn read<'a>(&'a self) -> Result<BfSharedMutexReadGuard<'a, T>, Box<dyn Error + 'a>> {
163
971140383
        debug_assert!(
164
971140383
            !self.control.busy.load(Ordering::SeqCst),
165
            "Cannot acquire read access again inside a reader section"
166
        );
167

            
168
971140383
        self.control.busy.store(true, Ordering::SeqCst);
169
971142173
        while self.control.forbidden.load(Ordering::SeqCst) {
170
1790
            self.control.busy.store(false, Ordering::SeqCst);
171

            
172
            // Wait for the mutex of the writer.
173
1790
            let mut _guard = self.shared.other.lock()?;
174

            
175
1790
            self.control.busy.store(true, Ordering::SeqCst);
176
        }
177

            
178
        // We now have immutable access to the object due to the protocol.
179
971140383
        Ok(BfSharedMutexReadGuard { mutex: self })
180
971140383
    }
181

            
182
    /// Creates a new `BfSharedMutexReadGuard` without checking if the lock is held.
183
    ///
184
    /// # Safety
185
    ///
186
    /// This method must only be called if the thread logically holds a read lock.
187
    ///
188
    /// This function does not increment the read count of the lock. Calling this function when a
189
    /// guard has already been produced is undefined behaviour unless the guard was forgotten
190
    /// with `mem::forget`.
191
    #[inline]
192
968045281
    pub unsafe fn create_read_guard_unchecked(&self) -> BfSharedMutexReadGuard<'_, T> {
193
968045281
        BfSharedMutexReadGuard { mutex: self }
194
968045281
    }
195

            
196
    /// Returns a raw pointer to the underlying data.
197
    ///
198
    /// This is useful when combined with `mem::forget` to hold a lock without
199
    /// the need to maintain a `RwLockReadGuard` or `RwLockWriteGuard` object
200
    /// alive, for example when dealing with FFI.
201
    ///
202
    /// # Safety
203
    ///
204
    /// You must ensure that there are no data races when dereferencing the
205
    /// returned pointer, for example if the current thread logically owns a
206
    /// `RwLockReadGuard` or `RwLockWriteGuard` but that guard has been discarded
207
    /// using `mem::forget`.
208
    #[inline]
209
57321063
    pub fn data_ptr(&self) -> *mut T {
210
57321063
        self.shared.object.get()
211
57321063
    }
212

            
213
    /// Provide write access to the underlying object, only a single mutable reference to the object exists.
214
    #[inline]
215
44165
    pub fn write<'a>(&'a self) -> Result<BfSharedMutexWriteGuard<'a, T>, Box<dyn Error + 'a>> {
216
44165
        let other = self.shared.other.lock()?;
217

            
218
44165
        debug_assert!(
219
44165
            !self.control.busy.load(std::sync::atomic::Ordering::SeqCst),
220
            "Can only exclusive lock outside of a shared lock, no upgrading!"
221
        );
222
44165
        debug_assert!(
223
44165
            !self.control.forbidden.load(std::sync::atomic::Ordering::SeqCst),
224
            "Can not acquire exclusive lock inside of exclusive section"
225
        );
226

            
227
        // Make all instances wait due to forbidden access.
228
224801
        for control in other.iter().flatten() {
229
224801
            debug_assert!(
230
224801
                !control.forbidden.load(std::sync::atomic::Ordering::SeqCst),
231
                "Other instance is already forbidden, this cannot happen"
232
            );
233

            
234
224801
            control.forbidden.store(true, std::sync::atomic::Ordering::SeqCst);
235
        }
236

            
237
        // Wait for the instances to exit their busy status.
238
337017
        for (index, option) in other.iter().enumerate() {
239
337017
            if index != self.index
240
292852
                && let Some(object) = option {
241
2427138
                    while object.busy.load(std::sync::atomic::Ordering::SeqCst) {
242
2246502
                        std::hint::spin_loop();
243
2246502
                    }
244
156381
                }
245
        }
246

            
247
        // We now have exclusive access to the object according to the protocol
248
44165
        Ok(BfSharedMutexWriteGuard {
249
44165
            mutex: self,
250
44165
            guard: other,
251
44165
        })
252
44165
    }
253

            
254
    /// Check if the shared mutex is locked shared, meaning no other thread has a read lock.
255
140822
    pub fn is_locked(&self) -> bool {
256
140822
        self.control.busy.load(Ordering::Relaxed)
257
140822
    }
258

            
259
    /// Check if the shared mutex is locked exclusively, meaning no other thread has a lock.
260
    pub fn is_locked_exclusive(&self) -> bool {
261
        self.control.forbidden.load(Ordering::Relaxed)
262
    }
263

            
264
    /// Obtain mutable access to the object without locking, is safe because we have mutable access.
265
    pub fn get_mut(&mut self) -> &mut T {
266
        unsafe { &mut *self.shared.object.get() }
267
    }
268
}
269

            
270
impl<T: Debug> Debug for BfSharedMutex<T> {
271
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
272
        f.debug_map()
273
            .entry(&"busy", &self.control.busy.load(Ordering::SeqCst))
274
            .entry(&"forbidden", &self.control.forbidden.load(Ordering::SeqCst))
275
            .entry(&"index", &self.index)
276
            .entry(&"len(other)", &self.shared.other.lock().unwrap().len())
277
            .finish()?;
278

            
279
        writeln!(f)?;
280
        writeln!(f, "other values: [")?;
281
        for control in self.shared.other.lock().unwrap().iter().flatten() {
282
            f.debug_map()
283
                .entry(&"busy", &control.busy.load(Ordering::SeqCst))
284
                .entry(&"forbidden", &control.forbidden.load(Ordering::SeqCst))
285
                .finish()?;
286
            writeln!(f)?;
287
        }
288

            
289
        writeln!(f, "]")
290
    }
291
}
292

            
293
/// A global shared mutex that can be used to protect global data.
294
///
295
/// # Details
296
///
297
/// This is a wrapper around `BfSharedMutex` that provides a global instance
298
/// that can be used to protect global data. Must be cloned to obtain mutable
299
/// access.
300
pub struct GlobalBfSharedMutex<T> {
301
    /// The shared mutex that is used to protect the global data.
302
    pub shared_mutex: BfSharedMutex<T>,
303
}
304

            
305
impl<T> GlobalBfSharedMutex<T> {
306
    /// Constructs a new global shared mutex for protecting access to the given object.
307
593
    pub fn new(object: T) -> Self {
308
593
        Self {
309
593
            shared_mutex: BfSharedMutex::new(object),
310
593
        }
311
593
    }
312

            
313
    /// Returns a clone of the global shared mutex, which allows writing and reading.
314
595
    pub fn share(&self) -> BfSharedMutex<T> {
315
595
        self.shared_mutex.clone()
316
595
    }
317
}
318

            
319
// Can be Send and Sync, because it cannot be mutated anyway.
320
// NOTE(review): this requires `T: Send`, whereas the `Send` impl of the wrapped
// `BfSharedMutex<T>` has no bound on `T` -- confirm which bound is intended.
unsafe impl<T: Send> Send for GlobalBfSharedMutex<T> {}
321
// NOTE(review): `share()` hands out clones whose `read()` gives several threads
// `&T` to the same object at once, which presumably also needs `T: Sync`.
// The `shared_mutex` field is public as well, so a shared `&GlobalBfSharedMutex`
// lets several threads call `read`/`write` on the same (non-`Sync`) instance --
// confirm that callers only ever go through `share()`.
unsafe impl<T: Send> Sync for GlobalBfSharedMutex<T> {}
322

            
323
#[cfg(test)]
mod tests {
    use crate::bf_sharedmutex::BfSharedMutex;
    use rand::prelude::*;
    use std::hint::black_box;

    use merc_utilities::random_test_threads;
    use merc_utilities::test_threads;

    // These are just simple tests.

    /// Every thread adds 5 under the exclusive lock; no increment may get lost.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_random_shared_mutex_exclusive() {
        let shared_number = BfSharedMutex::new(5);
        let num_iterations = 500;
        let num_threads = 20;

        test_threads(
            num_threads,
            || shared_number.clone(),
            move |number| {
                for _ in 0..num_iterations {
                    let mut value = number.write().unwrap();
                    *value += 5;
                }
            },
        );

        // The initial 5 plus one increment of 5 per iteration per thread.
        let expected = num_threads * num_iterations * 5 + 5;
        assert_eq!(*shared_number.write().unwrap(), expected);
    }

    /// Mixes mostly reads with occasional writes on a shared vector of fives.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_random_shared() {
        let shared_vector = BfSharedMutex::new(vec![]);

        let num_threads = 20;
        let num_iterations = 5000;

        random_test_threads(
            num_iterations,
            num_threads,
            || shared_vector.clone(),
            |rng, shared_vector| {
                if rng.random_bool(0.95) {
                    // Read a random index.
                    let read = shared_vector.read().unwrap();
                    if !read.is_empty() {
                        let index = rng.random_range(0..read.len());
                        black_box(assert_eq!(read[index], 5));
                    }
                } else {
                    // Add a new vector element.
                    shared_vector.write().unwrap().push(5);
                }
            },
        );
    }
}