use super::{Queue, SyncQueue};
use time;
use std::{mem, ptr, ops, usize, u32};
use std::sync::{Arc, Mutex, MutexGuard, Condvar};
use std::sync::atomic::{self, AtomicUsize, Ordering};

/// A FIFO queue in which values are stored in a linked list.
///
/// The current implementation is based on two mutexes (one for each end of
/// the queue) and two condition variables. It is mostly a placeholder until a
/// lock-free version is implemented, so it has not been tuned for
/// performance.
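///
/// # Examples
///
/// A minimal usage sketch; the `syncbox::LinkedQueue` import path is an
/// assumption about how this type is re-exported and may need adjusting.
///
/// ```ignore
/// use syncbox::LinkedQueue;
///
/// // A bounded queue holding at most two elements at a time.
/// let queue = LinkedQueue::with_capacity(2);
///
/// assert_eq!(queue.offer(1), Ok(()));
/// assert_eq!(queue.offer(2), Ok(()));
/// // The queue is full, so the rejected element is handed back.
/// assert_eq!(queue.offer(3), Err(3));
///
/// // Elements come back out in FIFO order.
/// assert_eq!(queue.poll(), Some(1));
/// assert_eq!(queue.poll(), Some(2));
/// assert_eq!(queue.poll(), None);
/// ```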
pub struct LinkedQueue<T: Send> {
    inner: Arc<QueueInner<T>>,
}

impl<T: Send> LinkedQueue<T> {
    /// Constructs a new, empty `LinkedQueue<T>` with capacity `usize::MAX`.
    pub fn new() -> LinkedQueue<T> {
        LinkedQueue::with_capacity(usize::MAX)
    }

    /// Constructs a new, empty `LinkedQueue<T>` with the specified capacity.
    pub fn with_capacity(capacity: usize) -> LinkedQueue<T> {
        LinkedQueue {
            inner: Arc::new(QueueInner::new(capacity))
        }
    }

    /// Returns the number of elements in the queue.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` if the queue contains no elements.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Adds the element `e` to the queue if possible.
    ///
    /// # Errors
    ///
    /// A call to `offer` will fail if the queue is full; the provided element
    /// `e` is returned in the `Err` variant.
    pub fn offer(&self, e: T) -> Result<(), T> {
        self.inner.offer(e)
    }

    /// Adds the element `e` to the queue, blocking for up to `ms` milliseconds.
    ///
    /// # Errors
    ///
    /// A call to `offer_ms` will fail if the element cannot be added within the
    /// timeout; the provided element `e` is returned in the `Err` variant.
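    ///
    /// # Examples
    ///
    /// A sketch of the timeout behavior (import path assumed as in the
    /// type-level docs):
    ///
    /// ```ignore
    /// use syncbox::LinkedQueue;
    ///
    /// let queue = LinkedQueue::with_capacity(1);
    /// queue.put("first");
    ///
    /// // The queue is full, so this waits roughly 50 milliseconds and then
    /// // hands the element back.
    /// assert_eq!(queue.offer_ms("second", 50), Err("second"));
    /// ```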
    pub fn offer_ms(&self, e: T, ms: u32) -> Result<(), T> {
        self.inner.offer_ms(e, ms)
    }

    /// Adds the element `e` to the queue, blocking until it can be added.
    // TODO(soon): decide on documenting panic?
    pub fn put(&self, e: T) {
        self.inner.put(e);
    }

    /// Takes from the queue if there is an element available.
    pub fn poll(&self) -> Option<T> {
        self.inner.poll()
    }

    /// Takes from the queue, blocking for up to `ms` milliseconds.
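    ///
    /// Returns `None` if no element becomes available within the timeout. A
    /// sketch (import path assumed as in the type-level docs):
    ///
    /// ```ignore
    /// use syncbox::LinkedQueue;
    ///
    /// let queue: LinkedQueue<u32> = LinkedQueue::new();
    ///
    /// // Nothing is ever inserted, so this waits roughly 50 milliseconds
    /// // and then gives up.
    /// assert_eq!(queue.poll_ms(50), None);
    /// ```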
    pub fn poll_ms(&self, ms: u32) -> Option<T> {
        self.inner.poll_ms(ms)
    }

    /// Takes from the queue, blocking until there is an element available.
    // TODO(soon): decide on documenting panic?
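    ///
    /// # Examples
    ///
    /// A sketch of the blocking hand-off between two threads (import path
    /// assumed as in the type-level docs):
    ///
    /// ```ignore
    /// use syncbox::LinkedQueue;
    /// use std::thread;
    ///
    /// let queue = LinkedQueue::new();
    /// let producer = queue.clone();
    ///
    /// thread::spawn(move || {
    ///     producer.put("hello");
    /// });
    ///
    /// // Blocks until the spawned thread has inserted a value.
    /// assert_eq!(queue.take(), "hello");
    /// ```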
    pub fn take(&self) -> T {
        self.inner.take()
    }
}

impl<T: Send> Queue<T> for LinkedQueue<T> {
    fn poll(&self) -> Option<T> {
        LinkedQueue::poll(self)
    }

    fn is_empty(&self) -> bool {
        LinkedQueue::is_empty(self)
    }

    fn offer(&self, e: T) -> Result<(), T> {
        LinkedQueue::offer(self, e)
    }
}

impl<T: Send> SyncQueue<T> for LinkedQueue<T> {
    fn take(&self) -> T {
        LinkedQueue::take(self)
    }

    fn put(&self, e: T) {
        LinkedQueue::put(self, e)
    }
}

impl<T: Send> Clone for LinkedQueue<T> {
    fn clone(&self) -> LinkedQueue<T> {
        LinkedQueue { inner: self.inner.clone() }
    }
}

//  A variant of the "two lock queue" algorithm, adapted from the approach
//  used by java.util.concurrent.LinkedBlockingQueue. The `last` mutex gates
//  entry to put (and offer) and has an associated condition, `not_full`, for
//  waiting puts. Similarly, the `head` mutex gates take (and poll) and has
//  an associated condition, `not_empty`, for waiting takes. The `count`
//  field that both sides rely on is maintained as an atomic so that most
//  operations only need to acquire one of the two locks.
//
//  To minimize the need for puts to acquire the take-side lock and
//  vice-versa, cascading notifies are used: after inserting, a put wakes
//  another waiting put if spare capacity remains, and then briefly acquires
//  the `head` lock to wake a waiting take. Symmetrically, after removing, a
//  take wakes another waiting take if more items remain, and briefly
//  acquires the `last` lock to wake a waiting put if the queue was full
//  before the removal.
//
//  Visibility between writers and readers is provided as follows:
//
//  Whenever an element is enqueued, the `last` lock is held and `count` is
//  incremented with `Release` ordering. A reader that subsequently observes
//  the incremented count while holding the `head` lock issues an `Acquire`
//  fence before dequeuing, which guarantees visibility of the enqueued
//  nodes.
//
//  The list always contains a dummy head node whose `item` is `None`.
//  Dequeuing advances `head` to `head.next`, takes the item out of the new
//  head (turning it into the next dummy), and frees the old dummy
//  immediately. Unlike the garbage-collected Java implementation, no
//  self-linking trick is needed to keep dequeued nodes collectable.
struct QueueInner<T: Send> {
    // Maximum number of elements the queue can contain at one time
    capacity: usize,

    // Current number of elements
    count: AtomicUsize,

    // Lock held by take, poll, etc
    head: Mutex<NodePtr<T>>,

    // Lock held by put, offer, etc
    last: Mutex<NodePtr<T>>,

    // Wait queue for waiting takes
    not_empty: Condvar,

    // Wait queue for waiting puts
    not_full: Condvar,
}

impl<T: Send> QueueInner<T> {
    fn new(capacity: usize) -> QueueInner<T> {
        let head = NodePtr::new(Node::empty());

        QueueInner {
            capacity: capacity,
            count: AtomicUsize::new(0),
            head: Mutex::new(head),
            last: Mutex::new(head),
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
        }
    }

    fn len(&self) -> usize {
        self.count.load(Ordering::Relaxed)
    }

    fn put(&self, e: T) {
        // Block for up to u32::MAX ms (~49 days); a timeout here is not
        // expected in practice
        self.offer_ms(e, u32::MAX)
            .ok().expect("put timed out");
    }

    fn offer(&self, e: T) -> Result<(), T> {
        // Fast path check when the queue is full
        if self.len() == self.capacity {
            return Err(e);
        }

        self.offer_ms(e, 0)
    }

    fn offer_ms(&self, e: T, mut dur: u32) -> Result<(), T> {
        // Acquire the write lock
        let mut last = self.last.lock()
            .ok().expect("lock poisoned");

        if self.len() == self.capacity {
            let mut now = time::precise_time_ns();

            loop {
                if dur == 0 {
                    return Err(e);
                }

                last = self.not_full.wait_timeout_ms(last, dur)
                    .ok().expect("lock poisoned").0;

                if self.len() != self.capacity {
                    break;
                }

                let n = time::precise_time_ns();
                let d = (n - now) / 1_000_000;

                if d >= dur as u64 {
                    dur = 0;
                } else {
                    dur -= d as u32;
                    now = n;
                }
            }
        }

        // Enqueue the node
        enqueue(Node::new(e), &mut last);

        // Increment the count; the Release ordering publishes the newly
        // enqueued node to readers (paired with the Acquire fence in poll_ms)
        let cnt = self.count.fetch_add(1, Ordering::Release);

        // Cascading notify: if there is still spare capacity, wake another
        // waiting put
        if cnt + 1 < self.capacity {
            self.not_full.notify_one();
        }

        // Release the put-side lock before notify_not_empty acquires the
        // take-side lock, so that both locks are never held at once
        drop(last);

        self.notify_not_empty();

        Ok(())
    }

    fn take(&self) -> T {
        // Block for up to u32::MAX ms (~49 days); a timeout here is not
        // expected in practice
        self.poll_ms(u32::MAX)
            .expect("take timed out")
    }

    fn poll(&self) -> Option<T> {
        if self.len() == 0 {
            // Fast path check
            return None;
        }

        self.poll_ms(0)
    }

    fn poll_ms(&self, mut dur: u32) -> Option<T> {
        // Acquire the read lock
        let mut head = self.head.lock()
            .ok().expect("lock poisoned");

        if self.len() == 0 {
            let mut now = time::precise_time_ns();

            loop {
                if dur == 0 {
                    return None;
                }

                head = self.not_empty.wait_timeout_ms(head, dur)
                    .ok().expect("lock poisoned").0;

                if self.len() != 0 {
                    break;
                }

                let n = time::precise_time_ns();
                let d = (n - now) / 1_000_000;

                if d >= dur as u64 {
                    dur = 0;
                } else {
                    dur -= d as u32;
                    now = n;
                }
            }
        }

        // Acquire memory published by the write side (paired with the
        // Release increment in offer_ms)
        atomic::fence(Ordering::Acquire);

        // At this point, we are guaranteed to be able to dequeue a value
        let val = dequeue(&mut head);
        let cnt = self.count.fetch_sub(1, Ordering::Relaxed);

        // Cascading notify: if more items remain, wake another waiting take
        if cnt > 1 {
            self.not_empty.notify_one();
        }

        // Release the take-side lock before notify_not_full acquires the
        // put-side lock, so that both locks are never held at once
        drop(head);

        // If the queue was full before this removal, wake a waiting put
        if cnt == self.capacity {
            self.notify_not_full();
        }

        Some(val)
    }

    // Signals a waiting put. Called only from take / poll
    fn notify_not_full(&self) {
        let _l = self.last.lock()
            .ok().expect("lock poisoned");

        self.not_full.notify_one();
    }

    // Signals a waiting take. Called only from put / offer
    fn notify_not_empty(&self) {
        let _l = self.head.lock()
            .ok().expect("lock poisoned");

        self.not_empty.notify_one();
    }
}

impl<T: Send> Drop for QueueInner<T> {
    fn drop(&mut self) {
        // Drain any remaining elements so that their destructors run
        while let Some(_) = self.poll() {
        }

        // Free the sentinel node that `head` still points to
        if let Ok(head) = self.head.lock() {
            (*head).free();
        }
    }
}

// Unlinks and returns the first element. The caller must hold the `head`
// lock and have already verified that the queue is non-empty.
fn dequeue<T: Send>(head: &mut MutexGuard<NodePtr<T>>) -> T {
    let h = **head;
    let mut first = h.next;

    // The node that held the value becomes the new dummy head; the old
    // dummy is freed immediately
    **head = first;
    h.free();

    first.item.take().expect("item already consumed")
}

// Boxes `node`, links it after the current last node, and advances the
// `last` pointer. The caller must hold the `last` lock.
fn enqueue<T: Send>(node: Node<T>, last: &mut MutexGuard<NodePtr<T>>) {
    let ptr = NodePtr::new(node);

    last.next = ptr;
    **last = ptr;
}

struct Node<T: Send> {
    next: NodePtr<T>,
    item: Option<T>,
}

impl<T: Send> Node<T> {
    fn new(val: T) -> Node<T> {
        Node {
            next: NodePtr::null(),
            item: Some(val),
        }
    }

    fn empty() -> Node<T> {
        Node {
            next: NodePtr::null(),
            item: None,
        }
    }
}

// A raw, copyable pointer to a heap-allocated `Node`. Ownership is managed
// manually: `new` leaks a `Box` into a raw pointer and `free` reclaims it.
struct NodePtr<T: Send> {
    ptr: *mut Node<T>,
}

impl<T: Send> NodePtr<T> {
    fn new(node: Node<T>) -> NodePtr<T> {
        // Move the node to the heap and leak the Box as a raw pointer; it is
        // reclaimed later by `free`
        NodePtr { ptr: unsafe { mem::transmute(Box::new(node)) } }
    }

    fn null() -> NodePtr<T> {
        NodePtr { ptr: ptr::null_mut() }
    }

    fn free(self) {
        // Rebuild the Box so that the node is dropped and its memory
        // reclaimed
        let NodePtr { ptr } = self;
        let _: Box<Node<T>> = unsafe { mem::transmute(ptr) };
    }
}

impl<T: Send> ops::Deref for NodePtr<T> {
    type Target = Node<T>;

    fn deref(&self) -> &Node<T> {
        unsafe { &*self.ptr }
    }
}

impl<T: Send> ops::DerefMut for NodePtr<T> {
    fn deref_mut(&mut self) -> &mut Node<T> {
        unsafe { &mut *self.ptr }
    }
}

impl<T: Send> Clone for NodePtr<T> {
    fn clone(&self) -> NodePtr<T> {
        NodePtr { ptr: self.ptr }
    }
}

impl<T: Send> Copy for NodePtr<T> {}
unsafe impl<T: Send> Send for NodePtr<T> {}