1
//! Authors: Maurice Laveaux, Flip van Spaendonck and Jan Friso Groote
2

            
3
use std::error::Error;
4
use std::fmt::Debug;
5
use std::ops::Deref;
6
use std::ops::DerefMut;
7

            
8
#[cfg(not(loom))]
9
mod inner {
10
    pub use std::cell::UnsafeCell;
11
    pub use std::hint::spin_loop;
12
    pub use std::sync::Arc;
13
    pub use std::sync::Mutex;
14
    pub use std::sync::MutexGuard;
15
    pub use std::sync::atomic::AtomicBool;
16
    pub use std::sync::atomic::Ordering;
17
    pub use std::sync::atomic::fence;
18
}
19

            
20
// We replace the standard implementation by loom's implementation.
21
#[cfg(loom)]
22
mod inner {
23
    pub use std::mem::ManuallyDrop;
24

            
25
    pub use loom::cell::UnsafeCell;
26
    pub use loom::hint::spin_loop;
27
    pub use loom::sync::Arc;
28
    pub use loom::sync::Mutex;
29
    pub use loom::sync::MutexGuard;
30
    pub use loom::sync::atomic::AtomicBool;
31
    pub use loom::sync::atomic::Ordering;
32
    pub use loom::sync::atomic::fence;
33
}
34

            
35
use inner::*;
36

            
37
use crossbeam_utils::CachePadded;
38

            
39
/// A shared mutex (readers-writer lock) implementation based on the so-called
40
/// busy-forbidden protocol.
41
///
42
/// # Details
43
///
44
/// Compared to a regular [std::sync::Mutex] this struct is Send but not Sync.
45
/// This means that every thread must acquire a clone of the shared mutex and
46
/// the cloned instances of the same shared mutex guarantee shared access
47
/// through the `read` operation and exclusive access for the `write` operation
48
/// of the given object.
49
pub struct BfSharedMutex<T> {
50
    /// The local control bits of each instance.
51
    ///
52
    /// TODO: Maybe use pin to share the control bits among shared mutexes.
53
    control: Arc<CachePadded<SharedMutexControl>>,
54

            
55
    /// Index into the `other` table.
56
    index: usize,
57

            
58
    /// Information shared between all clones.
59
    shared: Arc<CachePadded<SharedData<T>>>,
60
}
61

            
62
// SAFETY: Sending a BfSharedMutex to another thread transfers ownership of the
63
// protected T.
64
unsafe impl<T: Send> Send for BfSharedMutex<T> {}
65

            
66
/// The two per-instance flags that drive the busy-forbidden protocol.
#[derive(Default)]
struct SharedMutexControl {
    /// Raised by the owning instance while it enters or is inside a read section.
    busy: AtomicBool,
    /// Raised by a writer on every registered instance to keep readers out.
    forbidden: AtomicBool,
}
72

            
73
/// The shared data between all instances of the shared mutex.
74
struct SharedData<T> {
75
    /// The object that is being protected.
76
    object: UnsafeCell<T>,
77

            
78
    /// The list of all the shared mutex instances.
79
    other: Mutex<Vec<Option<Arc<CachePadded<SharedMutexControl>>>>>,
80
}
81

            
82
impl<T> BfSharedMutex<T> {
83
    /// Constructs a new shared mutex for protecting access to the given object.
84
968
    pub fn new(object: T) -> Self {
85
968
        let control = Arc::new(CachePadded::new(SharedMutexControl::default()));
86

            
87
968
        Self {
88
968
            control: control.clone(),
89
968
            shared: Arc::new(CachePadded::new(SharedData {
90
968
                object: UnsafeCell::new(object),
91
968
                other: Mutex::new(vec![Some(control.clone())]),
92
968
            })),
93
968
            index: 0,
94
968
        }
95
968
    }
96
}
97

            
98
impl<T> Clone for BfSharedMutex<T> {
99
995
    fn clone(&self) -> Self {
100
        // Register a new instance in the other list
101
995
        let control = Arc::new(CachePadded::new(SharedMutexControl::default()));
102

            
103
995
        let mut other = self.shared.other.lock().expect("Failed to lock mutex");
104
1234
        let index = match other.iter().position(|slot| slot.is_none()) {
105
            Some(index) => {
106
                // Reuse an empty slot if available.
107
                other[index] = Some(control.clone());
108
                index
109
            }
110
            None => {
111
995
                other.push(Some(control.clone()));
112
995
                other.len() - 1
113
            }
114
        };
115

            
116
995
        Self {
117
995
            control,
118
995
            index,
119
995
            shared: self.shared.clone(),
120
995
        }
121
995
    }
122
}
123

            
124
impl<T> Drop for BfSharedMutex<T> {
125
1004
    fn drop(&mut self) {
126
1004
        let mut other = self.shared.other.lock().expect("Failed to lock mutex");
127

            
128
        // Remove ourselves from the table.
129
1004
        other[self.index] = None;
130

            
131
        // Trim trailing None slots to keep the vec compact.
132
2008
        while other.last().is_some_and(|slot| slot.is_none()) {
133
1004
            other.pop();
134
1004
        }
135
1004
    }
136
}
137

            
138
/// The guard object for exclusive access to the underlying object.
139
#[must_use = "Dropping the guard unlocks the shared mutex immediately"]
140
pub struct BfSharedMutexWriteGuard<'a, T> {
141
    #[allow(dead_code)]
142
    mutex: &'a BfSharedMutex<T>,
143

            
144
    guard: MutexGuard<'a, Vec<Option<Arc<CachePadded<SharedMutexControl>>>>>,
145

            
146
    /// When loom is enabled, we store a write reference tracked by Loom.
147
    #[cfg(loom)]
148
    ptr: ManuallyDrop<loom::cell::MutPtr<T>>,
149
}
150

            
151
/// Allow dereferencing the underlying object.
152
impl<T> Deref for BfSharedMutexWriteGuard<'_, T> {
153
    type Target = T;
154

            
155
3006
    fn deref(&self) -> &Self::Target {
156
        // We are the only guard after `write()`, so we can provide immutable access to the underlying object. (No mutable references the guard can exist)
157
        #[cfg(not(loom))]
158
        unsafe {
159
3006
            &*self.mutex.shared.object.get()
160
        }
161

            
162
        #[cfg(loom)]
163
        unsafe {
164
            self.ptr.deref().deref()
165
        }
166
3006
    }
167
}
168

            
169
impl<T> DerefMut for BfSharedMutexWriteGuard<'_, T> {
170
396910
    fn deref_mut(&mut self) -> &mut Self::Target {
171
        // We are the only guard after `write()`, so we can provide mutable access to the underlying object.
172
        #[cfg(not(loom))]
173
396910
        unsafe {
174
396910
            &mut *self.mutex.shared.object.get()
175
396910
        }
176

            
177
        #[cfg(loom)]
178
        unsafe {
179
            self.ptr.deref().deref()
180
        }
181
396910
    }
182
}
183

            
184
impl<T> Drop for BfSharedMutexWriteGuard<'_, T> {
185
396911
    fn drop(&mut self) {
186
        // End Loom's write tracking before releasing the protocol.
187
        #[cfg(loom)]
188
        unsafe {
189
            ManuallyDrop::drop(&mut self.ptr);
190
        }
191

            
192
        // Allow other threads to acquire access to the shared mutex.
193
872011
        for control in self.guard.iter().flatten() {
194
872011
            control.forbidden.store(false, Ordering::Release);
195
872011
        }
196

            
197
        // The mutex guard is then dropped here.
198
396911
    }
199
}
200

            
201
// SAFETY: Sharing &WriteGuard across threads only exposes &T.
202
unsafe impl<T: Sync> Sync for BfSharedMutexWriteGuard<'_, T> {}
203

            
204
#[must_use = "Dropping the guard unlocks the shared mutex immediately"]
205
pub struct BfSharedMutexReadGuard<'a, T> {
206
    mutex: &'a BfSharedMutex<T>,
207

            
208
    /// When loom is enabled, we store a read reference tracked by Loom.
209
    #[cfg(loom)]
210
    ptr: ManuallyDrop<loom::cell::ConstPtr<T>>,
211
}
212

            
213
// SAFETY: Sharing &ReadGuard across threads only exposes &T.
214
unsafe impl<T: Sync> Sync for BfSharedMutexReadGuard<'_, T> {}
215

            
216
/// Allows dereferencing the underlying object.
217
impl<T> Deref for BfSharedMutexReadGuard<'_, T> {
218
    type Target = T;
219

            
220
5284936
    fn deref(&self) -> &Self::Target {
221
        // There can only be shared guards, which only provide immutable access to the object.
222
        #[cfg(not(loom))]
223
        unsafe {
224
5284936
            &*self.mutex.shared.object.get()
225
        }
226

            
227
        #[cfg(loom)]
228
        unsafe {
229
            self.ptr.deref().deref()
230
        }
231
5284936
    }
232
}
233

            
234
impl<T> Drop for BfSharedMutexReadGuard<'_, T> {
235
1574186772
    fn drop(&mut self) {
236
1574186772
        debug_assert!(
237
1574186772
            self.mutex.control.busy.load(Ordering::Relaxed),
238
            "Cannot unlock shared lock that was not acquired"
239
        );
240

            
241
        // End Loom's read tracking before releasing the protocol.
242
        #[cfg(loom)]
243
        unsafe {
244
            ManuallyDrop::drop(&mut self.ptr);
245
        }
246

            
247
        // Release is sufficient, this synchronises the writes of this thread with writer.
248
1574186772
        self.mutex.control.busy.store(false, Ordering::Release);
249
1574186772
    }
250
}
251

            
252
impl<T> BfSharedMutex<T> {
253
    /// Provides read access to the underlying object, allowing multiple immutable references to it.
254
1574186772
    pub fn read<'a>(&'a self) -> Result<BfSharedMutexReadGuard<'a, T>, Box<dyn Error + 'a>> {
255
1574186772
        debug_assert!(
256
1574186772
            !self.control.busy.load(Ordering::Relaxed),
257
            "Cannot acquire read access again inside a reader section"
258
        );
259

            
260
1574186772
        self.control.busy.store(true, Ordering::Relaxed);
261
1574186772
        fence(Ordering::SeqCst);
262
1574188783
        while self.control.forbidden.load(Ordering::Acquire) {
263
            // Signal the writer that this thread is no longer busy, allowing it to make progress.
264
2011
            self.control.busy.store(false, Ordering::Relaxed);
265

            
266
            // For loom with spin locks we must ensure that other threads can make progress for fairness.
267
            #[cfg(loom)]
268
            loom::thread::yield_now();
269

            
270
            // Wait for the mutex of the writer.
271
2011
            let _guard = self.shared.other.lock()?;
272

            
273
            // Allow another thread to acquire the lock.
274
            #[cfg(loom)]
275
            loom::thread::yield_now();
276

            
277
2011
            self.control.busy.store(true, Ordering::Relaxed);
278
        }
279

            
280
        // We now have immutable access to the object due to the protocol.
281
1574186772
        Ok(BfSharedMutexReadGuard {
282
1574186772
            mutex: self,
283
1574186772
            #[cfg(loom)]
284
1574186772
            ptr: ManuallyDrop::new(self.shared.object.get()),
285
1574186772
        })
286
1574186772
    }
287

            
288
    /// Creates a new `BfSharedMutexReadGuard` without checking if the lock is held.
289
    ///
290
    /// # Safety
291
    ///
292
    /// This method must only be called if the thread logically holds a read lock.
293
    ///
294
    /// This function does not increment the read count of the lock. Calling this function when a
295
    /// guard has already been produced is undefined behaviour unless the guard was forgotten
296
    /// with `mem::forget`.
297
1571091781
    pub unsafe fn create_read_guard_unchecked(&self) -> BfSharedMutexReadGuard<'_, T> {
298
1571091781
        BfSharedMutexReadGuard {
299
1571091781
            mutex: self,
300
1571091781
            #[cfg(loom)]
301
1571091781
            ptr: ManuallyDrop::new(self.shared.object.get()),
302
1571091781
        }
303
1571091781
    }
304

            
305
    /// Returns a raw pointer to the underlying data.
306
    ///
307
    /// This is useful when combined with `mem::forget` to hold a lock without
308
    /// the need to maintain a [`BfSharedMutexReadGuard`] or [`BfSharedMutexWriteGuard`] object
309
    /// alive, for example when dealing with FFI.
310
    ///
311
    /// # Safety
312
    ///
313
    /// You must ensure that there are no data races when dereferencing the
314
    /// returned pointer, for example if the current thread logically owns a
315
    /// [`BfSharedMutexReadGuard`] or [`BfSharedMutexWriteGuard`] but that guard has been discarded
316
    /// using `mem::forget`.
317
    #[cfg(not(loom))]
318
91759284
    pub fn data_ptr(&self) -> *mut T {
319
91759284
        self.shared.object.get()
320
91759284
    }
321

            
322
    #[cfg(loom)]
323
    pub fn data_ptr(&self) -> loom::cell::ConstPtr<T> {
324
        self.shared.object.get()
325
    }
326

            
327
    /// Provide write access to the underlying object, only a single mutable reference to the object exists.
328
396911
    pub fn write<'a>(&'a self) -> Result<BfSharedMutexWriteGuard<'a, T>, Box<dyn Error + 'a>> {
329
396911
        let other = self.shared.other.lock()?;
330

            
331
396911
        debug_assert!(
332
396911
            !self.control.busy.load(Ordering::Relaxed),
333
            "Can only exclusive lock outside of a shared lock, no upgrading!"
334
        );
335
396911
        debug_assert!(
336
396911
            !self.control.forbidden.load(Ordering::Relaxed),
337
            "Can not acquire exclusive lock inside of exclusive section"
338
        );
339

            
340
        // Make all instances wait due to forbidden access.
341
872011
        for control in other.iter().flatten() {
342
872011
            debug_assert!(
343
872011
                !control.forbidden.load(Ordering::Relaxed),
344
                "Other instance is already forbidden, this cannot happen"
345
            );
346

            
347
872011
            control.forbidden.store(true, Ordering::Relaxed);
348
        }
349

            
350
396911
        fence(Ordering::SeqCst);
351

            
352
        // Wait for the instances to exit their busy status.
353
888408
        for (index, option) in other.iter().enumerate() {
354
888408
            if index != self.index
355
491497
                && let Some(object) = option
356
            {
357
                // We just synchronize with the busy store of the other instances.
358
2429095
                while object.busy.load(Ordering::Acquire) {
359
1953995
                    spin_loop();
360
1953995
                }
361
413308
            }
362
        }
363

            
364
        // We now have exclusive access to the object according to the protocol
365
396911
        Ok(BfSharedMutexWriteGuard {
366
396911
            mutex: self,
367
396911
            guard: other,
368
396911
            #[cfg(loom)]
369
396911
            ptr: ManuallyDrop::new(self.shared.object.get_mut()),
370
396911
        })
371
396911
    }
372

            
373
    /// Check if this instance's read lock is currently held (i.e., this instance's `busy` flag is set).
374
    ///
375
    /// Note: this only reflects the state of **this** clone of the shared mutex. Other clones
376
    /// may independently hold their own read locks.
377
917941
    pub fn is_locked(&self) -> bool {
378
917941
        self.control.busy.load(Ordering::Relaxed)
379
917941
    }
380

            
381
    /// Check if this instance has been forbidden from acquiring a read lock, which indicates
382
    /// that another clone is holding or acquiring a write lock.
383
    pub fn is_locked_exclusive(&self) -> bool {
384
        self.control.forbidden.load(Ordering::Relaxed)
385
    }
386

            
387
    /// Obtain mutable access to the object without locking, is safe because we have mutable access.
388
    pub fn get_mut(&mut self) -> &mut T {
389
        #[cfg(not(loom))]
390
        unsafe {
391
            &mut *self.shared.object.get()
392
        }
393
        #[cfg(loom)]
394
        unsafe {
395
            self.shared.object.get_mut().with(|ptr| &mut *ptr)
396
        }
397
    }
398
}
399

            
400
impl<T: Debug> Debug for BfSharedMutex<T> {
401
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
402
        let other = self.shared.other.lock().unwrap();
403

            
404
        f.debug_map()
405
            .entry(&"busy", &self.control.busy.load(Ordering::Relaxed))
406
            .entry(&"forbidden", &self.control.forbidden.load(Ordering::Relaxed))
407
            .entry(&"index", &self.index)
408
            .entry(&"len(other)", &other.len())
409
            .finish()?;
410

            
411
        writeln!(f)?;
412
        writeln!(f, "other values: [")?;
413
        for control in other.iter().flatten() {
414
            f.debug_map()
415
                .entry(&"busy", &control.busy.load(Ordering::Relaxed))
416
                .entry(&"forbidden", &control.forbidden.load(Ordering::Relaxed))
417
                .finish()?;
418
            writeln!(f)?;
419
        }
420

            
421
        writeln!(f, "]")
422
    }
423
}
424

            
425
/// A global shared mutex that can be used to protect global data.
426
///
427
/// # Details
428
///
429
/// This is a wrapper around `BfSharedMutex` that provides a global instance
430
/// that can be used to protect global data. Must be cloned to obtain mutable
431
/// access.
432
pub struct GlobalBfSharedMutex<T> {
433
    /// The shared mutex that is used to protect the global data.
434
    pub shared_mutex: BfSharedMutex<T>,
435
}
436

            
437
impl<T> GlobalBfSharedMutex<T> {
438
    /// Constructs a new global shared mutex for protecting access to the given object.
439
959
    pub fn new(object: T) -> Self {
440
959
        Self {
441
959
            shared_mutex: BfSharedMutex::new(object),
442
959
        }
443
959
    }
444

            
445
    /// Returns a clone of the global shared mutex, which allows writing and reading.
446
962
    pub fn share(&self) -> BfSharedMutex<T> {
447
962
        self.shared_mutex.clone()
448
962
    }
449
}
450

            
451
// SAFETY: Sending a GlobalBfSharedMutex<T> to another thread requires T: Send, same as RwLock<T>.
452
unsafe impl<T: Send> Send for GlobalBfSharedMutex<T> {}
453

            
454
// SAFETY: Multiple threads holding &GlobalBfSharedMutex<T> can call share() concurrently
455
unsafe impl<T: Send + Sync> Sync for GlobalBfSharedMutex<T> {}
456

            
457
#[cfg(test)]
mod tests {
    use std::hint::black_box;

    use rand::RngExt;

    use merc_utilities::random_test_threads;
    use merc_utilities::test_threads;

    use super::BfSharedMutex;

    /// Several threads repeatedly add 5 under the write lock; the final sum must
    /// account for every single increment.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_random_bf_shared_mutex_exclusive() {
        let shared_number = BfSharedMutex::new(5);
        let num_iterations = 500;
        let num_threads = 3;

        test_threads(
            num_threads,
            || shared_number.clone(),
            move |number| {
                for _ in 0..num_iterations {
                    let mut value = number.write().unwrap();
                    *value += 5;
                }
            },
        );

        let expected = num_threads * num_iterations * 5 + 5;
        assert_eq!(*shared_number.write().unwrap(), expected);
    }

    /// Mixes many reads with occasional writes on a shared vector; every element
    /// that a reader observes must be the value the writers pushed.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_random_bf_shared_mutex() {
        let shared_vector = BfSharedMutex::new(vec![]);

        let num_threads = 20;
        let num_iterations = 5000;

        random_test_threads(
            num_iterations,
            num_threads,
            || shared_vector.clone(),
            |rng, shared_vector| {
                let wants_read = rng.random_bool(0.95);
                if !wants_read {
                    // Add a new vector element.
                    shared_vector.write().unwrap().push(5);
                    return;
                }

                // Read a random index.
                let read = shared_vector.read().unwrap();
                if !read.is_empty() {
                    let index = rng.random_range(0..read.len());
                    black_box(assert_eq!(read[index], 5));
                }
            },
        );
    }

    /// Lets loom explore interleavings of three threads that each read and then
    /// write the shared value.
    #[test]
    #[cfg(loom)]
    fn test_loom_bf_shared_mutex() {
        let mut builder = loom::model::Builder::new();
        builder.preemption_bound = Some(1);

        builder.check(|| {
            let shared_mutex = BfSharedMutex::new(false);

            let mut threads = Vec::new();
            for _ in 0..3 {
                let shared_mutex = shared_mutex.clone();
                threads.push(loom::thread::spawn(move || {
                    // Just perform some operations on the shared mutex.
                    let result = *shared_mutex.read().unwrap();

                    *shared_mutex.write().unwrap() = !result;
                }));
            }

            threads.into_iter().for_each(|th| th.join().unwrap());
        });
    }
}