//! Authors: Maurice Laveaux, Flip van Spaendonck and Jan Friso Groote

use std::alloc;
use std::alloc::Layout;
use std::cmp::max;
use std::ops::Index;
use std::ptr;
use std::ptr::NonNull;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use crate::BfSharedMutex;

/// An implementation of [Vec] based on the [BfSharedMutex] implementation
/// that can be safely sent between threads. Elements in the vector can be written
/// concurrently iff type T is [Sync].
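///
/// # Example
///
/// A minimal usage sketch (marked `ignore` since it assumes this crate's
/// [BfSharedMutex] is available):
///
/// ```ignore
/// let vector = BfVec::<u32>::new();
/// vector.push(42);
/// assert_eq!(vector.len(), 1);
/// assert_eq!(vector[0], 42);
/// ```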
pub struct BfVec<T> {
    shared: BfSharedMutex<BfVecShared<T>>,
}

/// The internal shared data of the [BfVec].
pub struct BfVecShared<T> {
    buffer: Option<NonNull<T>>,
    capacity: usize,
    len: AtomicUsize,
}

impl<T> BfVec<T> {
    /// Create a new vector with zero capacity.
    pub fn new() -> BfVec<T> {
        BfVec {
            shared: BfSharedMutex::new(BfVecShared::<T> {
                buffer: None,
                capacity: 0,
                len: AtomicUsize::new(0),
            }),
        }
    }

    /// Append a new element to the vector.
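    ///
    /// Each call reserves an index with an atomic `fetch_add` while holding only
    /// the shared (read) lock; the exclusive (write) lock is taken just for
    /// resizing. A sketch of concurrent pushes (marked `ignore`; it uses scoped
    /// threads from the standard library):
    ///
    /// ```ignore
    /// use std::thread;
    ///
    /// let vector = BfVec::<u32>::new();
    /// thread::scope(|s| {
    ///     for _ in 0..4 {
    ///         let view = vector.share();
    ///         s.spawn(move || view.push(1));
    ///     }
    /// });
    /// assert_eq!(vector.len(), 4);
    /// ```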
    pub fn push(&self, value: T) {
        let mut read = self.shared.read().unwrap();

        // Reserve an index for the new element.
        let mut last_index = read.len.fetch_add(1, Ordering::Relaxed);

        while last_index + 1 >= read.capacity {
            // The vector needs to be resized.
            let new_capacity = max(read.capacity * 2, 8);

            // Make the length consistent again and reserve the new capacity. The
            // read lock must be released before `reserve` takes the write lock.
            read.len.fetch_sub(1, Ordering::Relaxed);
            drop(read);
            self.reserve(new_capacity);

            // Acquire read access again and try a new position.
            read = self.shared.read().unwrap();
            last_index = read.len.fetch_add(1, Ordering::Relaxed);
        }

        // Write the element at the reserved index.
        unsafe {
            // We know that the buffer has been allocated by this point.
            let end = read.buffer.unwrap().as_ptr().add(last_index);
            ptr::write(end, value);
        }
    }

    /// Obtain another view on the vector to share among threads.
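    ///
    /// # Example
    ///
    /// A sketch of handing a view to another thread (marked `ignore`):
    ///
    /// ```ignore
    /// let vector = BfVec::<u32>::new();
    /// let view = vector.share();
    /// std::thread::spawn(move || view.push(1)).join().unwrap();
    /// ```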
    pub fn share(&self) -> BfVec<T> {
        BfVec {
            shared: self.shared.clone(),
        }
    }

    /// Obtain the number of elements in the vector.
    pub fn len(&self) -> usize {
        self.shared.read().unwrap().len.load(Ordering::Relaxed)
    }

    /// Returns true iff the vector is empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Drops the elements in the Vec, but keeps the capacity.
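    ///
    /// # Example
    ///
    /// A sketch (marked `ignore`; note that the kept capacity is not observable
    /// through the public API):
    ///
    /// ```ignore
    /// let vector = BfVec::<u32>::new();
    /// vector.push(1);
    /// vector.clear();
    /// assert!(vector.is_empty());
    /// ```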
    pub fn clear(&self) {
        let mut write = self.shared.write().unwrap();
        write.clear();
    }

    /// Reserve the given capacity.
    fn reserve(&self, capacity: usize) {
        let mut write = self.shared.write().unwrap();

        // Another reserve could have happened in the meantime, which makes this call obsolete.
        if capacity <= write.capacity {
            return;
        }

        let old_layout = Layout::array::<T>(write.capacity).unwrap();
        let layout = Layout::array::<T>(capacity).unwrap();

        unsafe {
            let new_buffer = alloc::alloc(layout) as *mut T;
            if new_buffer.is_null() {
                alloc::handle_alloc_error(layout);
            }

            if let Some(old_buffer) = write.buffer {
                debug_assert!(
                    write.len.load(Ordering::Relaxed) <= write.capacity,
                    "Length {} should be at most capacity {}",
                    write.len.load(Ordering::Relaxed),
                    write.capacity
                );

                // Move the initialised elements into the new buffer.
                ptr::copy_nonoverlapping(
                    old_buffer.as_ptr(),
                    new_buffer,
                    write.len.load(Ordering::Relaxed),
                );

                // Clean up the old buffer.
                alloc::dealloc(old_buffer.as_ptr() as *mut u8, old_layout);
            }

            write.capacity = capacity;
            write.buffer = NonNull::new(new_buffer);
        }
    }

    /// Get access to the underlying data storage.
    fn data(&self) -> *const T {
        self.shared.read().unwrap().buffer.unwrap().as_ptr()
    }
}

impl<T> Default for BfVec<T> {
    fn default() -> Self {
        Self::new()
    }
}
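
/// Indexing reads directly from the underlying buffer.
///
/// Note that the returned reference points into the current buffer, so it is
/// only valid as long as no concurrent `push` triggers a reallocation. A
/// sketch (marked `ignore`):
///
/// ```ignore
/// let vector = BfVec::<u32>::new();
/// vector.push(7);
/// assert_eq!(vector[0], 7);
/// ```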
impl<T> Index<usize> for BfVec<T> {
    type Output = T;

    fn index(&self, index: usize) -> &Self::Output {
        debug_assert!(index < self.len());

        unsafe { &*self.data().add(index) }
    }
}

impl<T> BfVecShared<T> {
    /// Clears the vector by dropping all elements.
    pub fn clear(&mut self) {
        // Only drop items within the 0..len range since the other values are not initialised.
        for i in 0..self.len.load(Ordering::Relaxed) {
            unsafe {
                // We have exclusive access so dropping is safe.
                let ptr = self.buffer.unwrap().as_ptr().add(i);

                ptr::drop_in_place(ptr);
            }
        }

        // Reset the length so the dropped elements cannot be dropped again.
        self.len.store(0, Ordering::Relaxed);
    }
}

impl<T> Drop for BfVecShared<T> {
    fn drop(&mut self) {
        self.clear();

        unsafe {
            // Deallocate the underlying storage.
            let layout = Layout::array::<T>(self.capacity).unwrap();
            if let Some(buffer) = self.buffer {
                alloc::dealloc(buffer.as_ptr() as *mut u8, layout);
            }
        }
    }
}

// SAFETY: access to the shared state is synchronised by the BfSharedMutex.
unsafe impl<T> Send for BfVec<T> {}

#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    // These are just simple tests.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_push() {
        let mut threads = vec![];

        let shared_vector = BfVec::<u32>::new();
        let num_threads = 10;
        let num_iterations = 100000;

        for t in 0..num_threads {
            let shared_vector = shared_vector.share();
            threads.push(thread::spawn(move || {
                for _ in 0..num_iterations {
                    shared_vector.push(t);
                }
            }));
        }

        // Check whether the threads have completed successfully.
        for thread in threads {
            thread.join().unwrap();
        }

        // Check the vector for consistency: the elements must sum to the expected total.
        let mut total = 0;
        for i in 0..shared_vector.len() {
            total += shared_vector[i];
        }

        assert_eq!(total, num_threads * (num_threads - 1) * num_iterations / 2);
        assert_eq!(shared_vector.len(), (num_threads * num_iterations) as usize);
    }
}