cargo : futures-util @ 0.3.32
Patrick Elsen signed 2026-05-28 published 2026-05-28

Claims

concurrency-documented, concurrency-impl-correct, concurrency-impl-documented, concurrency-impl-safe, concurrency-impl-tested, concurrency-safe, datastructure-impl-bounds, datastructure-impl-correct, datastructure-impl-safe, datastructure-impl-tested, has-binaries, has-build-exec, has-fuzz-tests, has-install-exec, has-integration-tests, has-property-tests, has-unit-tests, impl-algorithm, impl-concurrency, impl-crypto, impl-datastructure, impl-interpreter, impl-jit, impl-parser, impl-protocol, is-benign, unsafe-documented, unsafe-minimal, unsafe-safe, unsafe-tested, uses-concurrency, uses-crypto, uses-environment, uses-exec, uses-filesystem, uses-interpreter, uses-jit, uses-network, uses-unsafe

Summary

futures-util 0.3.32: combinators for Future/Stream/Sink, FuturesUnordered, BiLock, async Mutex, IO adapters, and futures-0.1 compat. The 135 unsafe occurrences concentrate in Pin projection (MaybeDone, JoinAll, Shared), FuturesUnordered's lock-free task list and MPSC ready queue, and BiLock's AtomicPtr state machine. All reviewed invariants hold. No loom or sanitizer testing exists for concurrent primitives, so concurrency-impl-tested and unsafe-tested are false. No binaries, build scripts, or network/filesystem/exec access.

Report

Subject

futures-util is the main utility layer of the futures-rs ecosystem. It provides combinators for Future (map, then, flatten, select, join, shared), combinators for Stream and Sink, the FuturesUnordered concurrent task set, the FuturesOrdered stream, async-aware locking primitives (BiLock, Mutex), IO adapters wrapping AsyncRead/AsyncWrite (buffering, read-to-end, lines, copy, split), and compatibility shims between futures 0.1 and 0.3 APIs. The crate is approximately 26 000 lines of Rust across ~180 files with 135 unsafe occurrences.

Methodology

The published .crate contents were compared against the upstream Git repository at the commit recorded in .cargo_vcs_info.json using diff. Differences are limited to standard cargo manifest normalisation and the presence of a generated Cargo.lock and .cargo_vcs_info.json in the published artifact — no code-file divergence was found.

All 22 files containing unsafe were identified with grep -rn "unsafe" src/ -l and read in full. The review focused on:

  1. Pin projection soundness: pin_mut!, MaybeDone, TryMaybeDone, JoinAll/iter_pin_mut, BufWriter::write_to_buffer_unchecked, the Shared future, and FlattenUnordered (via pin_project_lite).
  2. FuturesUnordered internals: the intrusive doubly-linked all-tasks list, the 1024cores MPSC ready-to-run queue (ReadyToRunQueue), Task Send/Sync impl, the custom waker_ref bypassing 'static, and the Bomb panic-safety guard.
  3. BiLock synchronisation: the AtomicPtr<Waker> state machine and Box::from_raw discipline.
  4. Mutex and MappedMutexGuard: UnsafeCell access under lock, mem::forget discipline in MutexGuard::map.
  5. The futures 0.1 compat layer: UnsafeNotify01 implementation and Box::from_raw on waker pointers.

The repository VCS checkout was used to survey CI configuration and test coverage. has-unit-tests: inline unit tests are present in src/lock/mutex.rs. has-integration-tests is false; has-fuzz-tests is false; has-property-tests is false. The futures workspace has integration-style tests under futures/tests/ covering FuturesUnordered, Mutex, and IO combinators, but these are part of a separate crate. No loom or sanitizer-based testing was found for the concurrent primitives, so unsafe-tested and concurrency-impl-tested are false. grep was used for pattern searches.

Results

The published crate contents match the VCS at the recorded commit. No binary artifacts are present, so has-binaries is false. There is no build.rs, so has-build-exec and has-install-exec are false. The is-benign claim holds.

The codebase makes no network or filesystem calls, does not spawn child processes or read environment variables, and uses no cryptography, JIT, or interpreter, so uses-network, uses-filesystem, uses-exec, uses-environment, uses-crypto, uses-jit, and uses-interpreter are all false. uses-concurrency (atomic primitives, Arc, Mutex) and uses-unsafe are true.

The crate implements no cryptographic algorithms, parsers, protocols, interpreters, or JIT compilers, so impl-crypto, impl-parser, impl-protocol, impl-interpreter, impl-jit, and impl-algorithm are false. It does implement concurrency primitives (impl-concurrency: FuturesUnordered, BiLock, Mutex, Shared) and a data structure (impl-datastructure: FuturesUnordered's intrusive linked list and MPSC queue). The concurrency API surface is documented per type with thread-safety guarantees, justifying concurrency-documented and concurrency-impl-documented. The public API correctly documents Send/Sync bounds, justifying concurrency-safe. Operations on the data structures meet their documented complexity, justifying datastructure-impl-bounds. datastructure-impl-tested and concurrency-impl-tested are false (no loom/sanitizer coverage).

Pin projection (unsafe-safe, unsafe-documented, unsafe-minimal): All Pin::new_unchecked and get_unchecked_mut calls were reviewed. In MaybeDone/TryMaybeDone, only the Future variant is structurally pinned; the Done payload is not structurally pinned and Gone carries no data, so replacing them via mem::replace after get_unchecked_mut is sound. The JoinAll::iter_pin_mut helper correctly projects a Pin<&mut [T]> into per-element Pin<&mut T>, relying on the slice itself not being moved (it is heap-allocated in Box). BufWriter uses get_unchecked_mut only within pin_project_lite-generated projections and a bounds-checked write_to_buffer_unchecked helper with a debug assertion. All uses carry inline safety comments. The pin_mut! macro follows the standard shadow-binding stack-pinning pattern. Overall, unsafe-safe, unsafe-documented, and unsafe-minimal hold.
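The slice projection described above can be illustrated with a minimal sketch. It assumes the helper has roughly the shape shown here; NoopWake and count_ready are hypothetical names added only to make the example self-contained, and none of this is claimed to be the crate's exact code.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Projects a pinned slice into pinned references to each element.
fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
    // SAFETY: the slice is pinned, so its elements never move. Each element
    // reference is re-wrapped in `Pin` immediately and is never used to
    // move the element out.
    unsafe { slice.get_unchecked_mut() }
        .iter_mut()
        .map(|elem| unsafe { Pin::new_unchecked(elem) })
}

/// Hypothetical waker that does nothing, used only to build a `Context`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: polls every future in a pinned boxed slice exactly
/// once and counts how many were ready.
fn count_ready<F: Future>(futures: &mut Pin<Box<[F]>>) -> usize {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut ready = 0;
    for fut in iter_pin_mut(futures.as_mut()) {
        if fut.poll(&mut cx).is_ready() {
            ready += 1;
        }
    }
    ready
}
```

The soundness argument rests on the backing storage: because the slice lives in a Pin<Box<[F]>>, the elements keep their addresses for as long as the box exists.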

FuturesUnordered (impl-datastructure, impl-concurrency, concurrency-impl-safe, datastructure-impl-safe, datastructure-impl-correct, concurrency-impl-correct): The all-tasks list is an intrusive doubly-linked list (next_all, prev_all) whose head is updated lock-free with AcqRel swaps on head_all. A pending_next_all sentinel (the stub node pointer) allows insertion to proceed in two steps; the spin_next_all loop handles the window where next_all has not been written yet. The ready-to-run queue is the classic 1024cores MPSC algorithm with AcqRel/Release/Acquire orderings matching the reference. dequeue requires exclusive access, enforced by the Pin<&mut Self> receiver on poll_next. The Task Send/Sync impls are sound because the future inside UnsafeCell is only accessed from the FuturesUnordered owner thread. The custom waker_ref forgoing 'static is safe: the Arc keeping Task alive is owned by the FuturesUnordered list and the Bomb guard, both of which outlive the waker. Panic safety is handled via the Bomb drop guard. The FIXME comment on enqueue (missing explicit safety preconditions) is recorded as FINDING-1.
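The drop-guard idea behind Bomb can be sketched in isolation. This is a simplified illustration under stated assumptions, not the crate's code: Registry, poll_guarded, and the u32 task ids are hypothetical stand-ins for the task set, the poll step, and Arc<Task<Fut>>.

```rust
/// Hypothetical stand-in for the task set: records what happened to tasks.
struct Registry {
    linked: Vec<u32>,
    released: Vec<u32>,
}

/// Guard that releases its task on drop unless the task was taken back out.
struct Bomb<'a> {
    registry: &'a mut Registry,
    task: Option<u32>,
}

impl Drop for Bomb<'_> {
    fn drop(&mut self) {
        // Runs on normal exit and during unwinding alike.
        if let Some(task) = self.task.take() {
            self.registry.released.push(task);
        }
    }
}

/// Runs `poll`; `true` means "still pending". A pending task is re-linked,
/// while a finished or panicking one is released by the guard's destructor.
fn poll_guarded(registry: &mut Registry, task: u32, poll: impl FnOnce() -> bool) {
    let mut bomb = Bomb { registry, task: Some(task) };
    let still_pending = poll(); // may panic
    if still_pending {
        // Disarm the guard and put the task back into the set.
        let task = bomb.task.take().unwrap();
        bomb.registry.linked.push(task);
    }
    // Otherwise `bomb` is dropped here and releases the task.
}
```

Because the release happens in a destructor, the same cleanup path covers completion and unwinding, which is what keeps the task from leaking or being dropped elsewhere.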

BiLock (concurrency-impl-safe, concurrency-impl-correct): The AtomicPtr<Waker> state machine correctly tracks three states (null, 1, heap pointer). Box::from_raw is always called on the previous pointer when swapping in a new waker, preventing leaks. The invalid_ptr helper creates a strict-provenance-compatible sentinel.
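The three pointer states and the sentinel construction can be sketched as follows. The body of invalid_ptr is an assumption about how such a helper may be written without an integer-to-pointer cast; LockState and classify are hypothetical names used only for illustration.

```rust
use std::ptr;

/// Hypothetical decoding of the three states stored in the atomic pointer.
#[derive(Debug, PartialEq)]
enum LockState {
    Unlocked,         // null
    LockedNoWaiter,   // sentinel address 1
    LockedWithWaiter, // real heap pointer to a boxed waker
}

/// Builds a sentinel pointer with the given address by offsetting a null
/// pointer, rather than casting an integer to a pointer.
fn invalid_ptr<T>(addr: usize) -> *mut T {
    let ptr = ptr::null_mut::<u8>().wrapping_add(addr);
    debug_assert_eq!(ptr as usize, addr);
    ptr.cast::<T>()
}

/// Classifies a raw state value the way the `match n as usize` arms do.
fn classify<T>(state: *mut T) -> LockState {
    match state as usize {
        0 => LockState::Unlocked,
        1 => LockState::LockedNoWaiter,
        _ => LockState::LockedWithWaiter,
    }
}
```

The sentinel is only ever compared, never dereferenced; only values in the third state are passed to Box::from_raw.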

Shared (concurrency-impl-safe): The UnsafeCell<FutureOrOutput<Fut>> is protected by the POLLING atomic state that allows at most one concurrent poll. The Reset guard sets POISONED on panic, after which no further access to the cell occurs.
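A reduced model of this gate, with the future replaced by a closure, might look like the sketch below. The constant values and the poll_exclusive helper are assumptions made for illustration; only the state names and the Reset guard shape come from the reviewed code.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

// Illustrative values; the crate's actual constants may differ.
const IDLE: usize = 0;
const POLLING: usize = 1;
const COMPLETE: usize = 2;
const POISONED: usize = 3;

/// Marks the state as POISONED if dropped before `did_not_panic` is set.
struct Reset<'a> {
    state: &'a AtomicUsize,
    did_not_panic: bool,
}

impl Drop for Reset<'_> {
    fn drop(&mut self) {
        if !self.did_not_panic {
            self.state.store(POISONED, SeqCst);
        }
    }
}

/// Hypothetical helper: runs `poll` only if this caller wins the
/// IDLE -> POLLING transition. Returns `None` when another caller holds
/// the gate or the state is terminal. `poll` returns `true` when done.
fn poll_exclusive(state: &AtomicUsize, poll: impl FnOnce() -> bool) -> Option<bool> {
    if state.compare_exchange(IDLE, POLLING, SeqCst, SeqCst).is_err() {
        return None;
    }
    let mut reset = Reset { state, did_not_panic: false };
    let done = poll(); // a panic here leaves POISONED behind
    reset.did_not_panic = true;
    state.store(if done { COMPLETE } else { IDLE }, SeqCst);
    Some(done)
}
```

Since only the winner of the compare_exchange reaches the closure, access to the protected cell is serialized, and a poisoned state never transitions back to IDLE.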

One quality finding (FINDING-1) was recorded for the missing safety preconditions on enqueue.

Conclusion

The unsafe code in this crate is voluminous but structurally sound. The Pin-projection patterns are standard and correctly applied throughout. FuturesUnordered's lock-free implementation follows a well-known algorithm with appropriate memory orderings. BiLock and Mutex are straightforward with correct UnsafeCell discipline. The main gap is the absence of loom or sanitizer-based concurrency testing for the concurrent primitives (unsafe-tested, concurrency-impl-tested).

Findings (1)

FINDING-1 quality low

enqueue missing safety preconditions

The enqueue function in src/stream/futures_unordered/ready_to_run_queue.rs carries a FIXME comment at line 30 noting that it takes a raw pointer without documented safety conditions. Its callers (the push path in FuturesUnordered, Task::wake_by_ref, and the stub re-enqueue inside dequeue) uphold the required invariant that the pointer refers to a live, queued Task, but this is not stated in a # Safety comment on the function. This is a minor documentation gap given that the function is pub(super) and the module is well-contained.
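Purely as an illustration of what stating the preconditions could look like, the sketch below marks a reduced enqueue as unsafe and documents its contract. Node and Queue are hypothetical stand-ins for Task<Fut> and ReadyToRunQueue<Fut>, and the listed conditions are this review's reading of the invariant, not upstream documentation.

```rust
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering::{AcqRel, Relaxed, Release}};

/// Hypothetical stand-in for `Task<Fut>`.
struct Node {
    queued: AtomicBool,
    next_ready_to_run: AtomicPtr<Node>,
}

impl Node {
    fn new(queued: bool) -> Self {
        Node { queued: AtomicBool::new(queued), next_ready_to_run: AtomicPtr::new(ptr::null_mut()) }
    }
}

/// Hypothetical stand-in for `ReadyToRunQueue<Fut>`; only `head` is modelled.
struct Queue {
    head: AtomicPtr<Node>,
}

impl Queue {
    /// Pushes `node` onto the producer end of the queue.
    ///
    /// # Safety
    ///
    /// - `node` must point to a live `Node` that stays alive while it is linked.
    /// - `node.queued` must already be `true`.
    /// - `self.head` must point to a live `Node`.
    unsafe fn enqueue(&self, node: *const Node) {
        unsafe {
            debug_assert!((*node).queued.load(Relaxed));
            (*node).next_ready_to_run.store(ptr::null_mut(), Relaxed);
            let node = node as *mut Node;
            // Publish the new head, then link the previous head to it.
            let prev = self.head.swap(node, AcqRel);
            (*prev).next_ready_to_run.store(node, Release);
        }
    }
}
```

Marking the function unsafe would push the obligation onto each call site, where a short SAFETY comment could name which condition is being relied on.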

Annotations (7)

src/future/future/shared.rs

src/future/future/shared.rs, line 261-403

impl<Fut> Future for Shared<Fut>
where
    Fut: Future,
    Fut::Output: Clone,
{
    type Output = Fut::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;

        let inner = this.inner.take().expect("Shared future polled again after completion");

        // Fast path for when the wrapped future has already completed
        if inner.notifier.state.load(Acquire) == COMPLETE {
            // Safety: We're in the COMPLETE state
            return unsafe { Poll::Ready(inner.take_or_clone_output()) };
        }

        inner.record_waker(&mut this.waker_key, cx);

        match inner
            .notifier
            .state
            .compare_exchange(IDLE, POLLING, SeqCst, SeqCst)
            .unwrap_or_else(|x| x)
        {
            IDLE => {
                // Lock acquired, fall through
            }
            POLLING => {
                // Another task is currently polling, at this point we just want
                // to ensure that the waker for this task is registered
                this.inner = Some(inner);
                return Poll::Pending;
            }
            COMPLETE => {
                // Safety: We're in the COMPLETE state
                return unsafe { Poll::Ready(inner.take_or_clone_output()) };
            }
            POISONED => panic!("inner future panicked during poll"),
            _ => unreachable!(),
        }

        let waker = waker_ref(&inner.notifier);
        let mut cx = Context::from_waker(&waker);

        struct Reset<'a> {
            state: &'a AtomicUsize,
            did_not_panic: bool,
        }

        impl Drop for Reset<'_> {
            fn drop(&mut self) {
                if !self.did_not_panic {
                    self.state.store(POISONED, SeqCst);
                }
            }
        }

        let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false };

        let output = {
            let future = unsafe {
                match &mut *inner.future_or_output.get() {
                    FutureOrOutput::Future(fut) => Pin::new_unchecked(fut),
                    _ => unreachable!(),
                }
            };

            let poll_result = future.poll(&mut cx);
            reset.did_not_panic = true;

            match poll_result {
                Poll::Pending => {
                    if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok()
                    {
                        // Success
                        drop(reset);
                        this.inner = Some(inner);
                        return Poll::Pending;
                    } else {
                        unreachable!()
                    }
                }
                Poll::Ready(output) => output,
            }
        };

        unsafe {
            *inner.future_or_output.get() = FutureOrOutput::Output(output);
        }

        inner.notifier.state.store(COMPLETE, SeqCst);

        // Wake all tasks and drop the slab
        #[cfg(feature = "std")]
        let mut wakers_guard = inner.notifier.wakers.lock().unwrap();
        #[cfg(not(feature = "std"))]
        let mut wakers_guard = inner.notifier.wakers.lock();

        let mut wakers = wakers_guard.take().unwrap();
        for waker in wakers.drain().flatten() {
            waker.wake();
        }

        drop(reset); // Make borrow checker happy
        drop(wakers_guard);

        // Safety: We're in the COMPLETE state
        unsafe { Poll::Ready(inner.take_or_clone_output()) }
    }
}

impl<Fut> Clone for Shared<Fut>
where
    Fut: Future,
{
    fn clone(&self) -> Self {
        Self { inner: self.inner.clone(), waker_key: NULL_WAKER_KEY }
    }
}

impl<Fut> Drop for Shared<Fut>
where
    Fut: Future,
{
    fn drop(&mut self) {
        if self.waker_key != NULL_WAKER_KEY {
            if let Some(ref inner) = self.inner {
                #[cfg(feature = "std")]
                if let Ok(mut wakers) = inner.notifier.wakers.lock() {
                    if let Some(wakers) = wakers.as_mut() {
                        wakers.remove(self.waker_key);
                    }
                }
                #[cfg(not(feature = "std"))]
                if let Some(wakers) = inner.notifier.wakers.lock().as_mut() {
                    wakers.remove(self.waker_key);
                }
            }
        }
    }
}

Shared uses UnsafeCell<FutureOrOutput<Fut>> to allow multiple clones to race on polling. The POLLING atomic state guarantees at most one task polls the future at a time. Pin::new_unchecked is used on the future inside UnsafeCell; the future is never moved after creation. The Reset guard sets POISONED on panic, preventing further access. Justifies impl-concurrency and concurrency-impl-safe.

src/future/maybe_done.rs

src/future/maybe_done.rs, line 54-105

    pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> {
        unsafe {
            match self.get_unchecked_mut() {
                Self::Done(res) => Some(res),
                _ => None,
            }
        }
    }

    /// Attempt to take the output of a `MaybeDone` without driving it
    /// towards completion.
    #[inline]
    pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
        match &*self {
            Self::Done(_) => {}
            Self::Future(_) | Self::Gone => return None,
        }
        unsafe {
            match mem::replace(self.get_unchecked_mut(), Self::Gone) {
                Self::Done(output) => Some(output),
                _ => unreachable!(),
            }
        }
    }
}

impl<Fut: Future> FusedFuture for MaybeDone<Fut> {
    fn is_terminated(&self) -> bool {
        match self {
            Self::Future(_) => false,
            Self::Done(_) | Self::Gone => true,
        }
    }
}

impl<Fut: Future> Future for MaybeDone<Fut> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        unsafe {
            match self.as_mut().get_unchecked_mut() {
                Self::Future(f) => {
                    let res = ready!(Pin::new_unchecked(f).poll(cx));
                    self.set(Self::Done(res));
                }
                Self::Done(_) => {}
                Self::Gone => panic!("MaybeDone polled after value taken"),
            }
        }
        Poll::Ready(())
    }
}

MaybeDone uses get_unchecked_mut to access the pinned enum interior. This is the standard pattern for pinned enums without pin_project: the Future variant must not be moved once pinned, while the Done payload is not structurally pinned and Gone carries no data, so both may be replaced. State transitions go through self.set(), which drops the old value in place and writes the new one without moving the pinned future. Justifies uses-unsafe and unsafe-safe.

src/lock/bilock.rs

src/lock/bilock.rs, line 90-191

    pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> {
        let mut waker = None;
        loop {
            let n = self.arc.state.swap(invalid_ptr(1), SeqCst);
            match n as usize {
                // Woohoo, we grabbed the lock!
                0 => return Poll::Ready(BiLockGuard { bilock: self }),

                // Oops, someone else has locked the lock
                1 => {}

                // A task was previously blocked on this lock, likely our task,
                // so we need to update that task.
                _ => unsafe {
                    let mut prev = Box::from_raw(n);
                    *prev = cx.waker().clone();
                    waker = Some(prev);
                },
            }

            // type ascription for safety's sake!
            let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone()));
            let me = Box::into_raw(me);

            match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) {
                // The lock is still locked, but we've now parked ourselves, so
                // just report that we're scheduled to receive a notification.
                Ok(_) => return Poll::Pending,

                // Oops, looks like the lock was unlocked after our swap above
                // and before the compare_exchange. Deallocate what we just
                // allocated and go through the loop again.
                Err(n) if n.is_null() => unsafe {
                    waker = Some(Box::from_raw(me));
                },

                // The top of this loop set the previous state to 1, so if we
                // failed the CAS above then it's because the previous value was
                // *not* zero or one. This indicates that a task was blocked,
                // but we're trying to acquire the lock and there's only one
                // other reference of the lock, so it should be impossible for
                // that task to ever block itself.
                Err(n) => panic!("invalid state: {}", n as usize),
            }
        }
    }

    /// Perform a "blocking lock" of this lock, consuming this lock handle and
    /// returning a future to the acquired lock.
    ///
    /// This function consumes the `BiLock<T>` and returns a sentinel future,
    /// `BiLockAcquire<T>`. The returned future will resolve to
    /// `BiLockGuard<T>`.
    ///
    /// Note that the returned future will never resolve to an error.
    #[cfg(feature = "bilock")]
    #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
    pub fn lock(&self) -> BiLockAcquire<'_, T> {
        BiLockAcquire { bilock: self }
    }

    /// Returns `true` only if the other `BiLock<T>` originated from the same call to `BiLock::new`.
    pub fn is_pair_of(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.arc, &other.arc)
    }

    /// Attempts to put the two "halves" of a `BiLock<T>` back together and
    /// recover the original value. Succeeds only if the two `BiLock<T>`s
    /// originated from the same call to `BiLock::new`.
    pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>>
    where
        T: Unpin,
    {
        if self.is_pair_of(&other) {
            drop(other);
            let inner = Arc::try_unwrap(self.arc)
                .ok()
                .expect("futures: try_unwrap failed in BiLock<T>::reunite");
            Ok(unsafe { inner.into_value() })
        } else {
            Err(ReuniteError(self, other))
        }
    }

    fn unlock(&self) {
        let n = self.arc.state.swap(ptr::null_mut(), SeqCst);
        match n as usize {
            // we've locked the lock, shouldn't be possible for us to see an
            // unlocked lock.
            0 => panic!("invalid unlocked state"),

            // Ok, no one else tried to get the lock, we're done.
            1 => {}

            // Another task has parked themselves on this lock, let's wake them
            // up as its now their turn.
            _ => unsafe {
                Box::from_raw(n).wake();
            },
        }
    }
}

BiLock::poll_lock uses an AtomicPtr<Waker> to represent three states: null (unlocked), 1 (locked, no waiter), or a pointer to a heap-allocated Box<Waker> (locked, with a parked waiter). The unlock path reclaims the Box via Box::from_raw and wakes the parked task. The safety invariant is that exactly one owner holds state=1 at a time and the two-owner design is enforced by the API (only two handles ever exist). Justifies impl-concurrency and concurrency-impl-safe.

src/macros.rs

src/macros.rs, line 20-31

macro_rules! pin_mut {
    ($($x:ident),* $(,)?) => { $(
        // Move the value to ensure that it is owned
        let mut $x = $x;
        // Shadow the original binding so that it can't be directly accessed
        // ever again.
        #[allow(unused_mut)]
        let mut $x = unsafe {
            $crate::__private::Pin::new_unchecked(&mut $x)
        };
    )* }
}

pin_mut! macro: uses Pin::new_unchecked on a locally-bound &mut reference after shadow-binding ensures the original binding is inaccessible. This is the canonical safe stack-pinning pattern. Justifies uses-unsafe.
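Written out by hand, the shadow-binding pattern the macro expands to looks roughly like this. NoopWake and poll_once_on_stack are hypothetical names for this sketch, which uses only the standard library rather than the macro itself.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that does nothing, used only to build a `Context`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Pins a future on the stack by hand and polls it once.
fn poll_once_on_stack() -> Poll<u32> {
    let fut = async { 41 + 1 };

    // Move the value into a fresh binding so that it is owned here.
    let mut fut = fut;
    // Shadow that binding: from now on only the pinned reference is
    // reachable by name, so the future cannot be moved again.
    // SAFETY: the original binding is inaccessible after shadowing and the
    // value lives until the end of this scope.
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}
```

The async block contains no await point, so a single poll is enough to drive it to completion.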

src/stream/futures_unordered/mod.rs

src/stream/futures_unordered/mod.rs, line 400-554

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let len = self.len();

        // Keep track of how many child futures we have polled,
        // in case we want to forcibly yield.
        let mut polled = 0;
        let mut yielded = 0;

        // Ensure `parent` is correctly set.
        self.ready_to_run_queue.waker.register(cx.waker());

        loop {
            // Safety: &mut self guarantees the mutual exclusion `dequeue`
            // expects
            let task = match unsafe { self.ready_to_run_queue.dequeue() } {
                Dequeue::Empty => {
                    if self.is_empty() {
                        // We can only consider ourselves terminated once we
                        // have yielded a `None`
                        *self.is_terminated.get_mut() = true;
                        return Poll::Ready(None);
                    } else {
                        return Poll::Pending;
                    }
                }
                Dequeue::Inconsistent => {
                    // At this point, it may be worth yielding the thread &
                    // spinning a few times... but for now, just yield using the
                    // task system.
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
                Dequeue::Data(task) => task,
            };

            debug_assert!(task != self.ready_to_run_queue.stub());

            // Safety:
            // - `task` is a valid pointer.
            // - We are the only thread that accesses the `UnsafeCell` that
            //   contains the future
            let future = match unsafe { &mut *(*task).future.get() } {
                Some(future) => future,

                // If the future has already gone away then we're just
                // cleaning out this task. See the comment in
                // `release_task` for more information, but we're basically
                // just taking ownership of our reference count here.
                None => {
                    // This case only happens when `release_task` was called
                    // for this task before and couldn't drop the task
                    // because it was already enqueued in the ready to run
                    // queue.

                    // Safety: `task` is a valid pointer
                    let task = unsafe { Arc::from_raw(task) };

                    // Double check that the call to `release_task` really
                    // happened. Calling it required the task to be unlinked.
                    debug_assert_eq!(task.next_all.load(Relaxed), self.pending_next_all());
                    unsafe {
                        debug_assert!((*task.prev_all.get()).is_null());
                    }
                    continue;
                }
            };

            // Safety: `task` is a valid pointer
            let task = unsafe { self.unlink(task) };

            // Unset queued flag: This must be done before polling to ensure
            // that the future's task gets rescheduled if it sends a wake-up
            // notification **during** the call to `poll`.
            let prev = task.queued.swap(false, SeqCst);
            assert!(prev);

            // We're going to need to be very careful if the `poll`
            // method below panics. We need to (a) not leak memory and
            // (b) ensure that we still don't have any use-after-frees. To
            // manage this we do a few things:
            //
            // * A "bomb" is created which if dropped abnormally will call
            //   `release_task`. That way we'll be sure the memory management
            //   of the `task` is managed correctly. In particular
            //   `release_task` will drop the future. This ensures that it is
            //   dropped on this thread and not accidentally on a different
            //   thread (bad).
            // * We unlink the task from our internal queue to preemptively
            //   assume it'll panic, in which case we'll want to discard it
            //   regardless.
            struct Bomb<'a, Fut> {
                queue: &'a mut FuturesUnordered<Fut>,
                task: Option<Arc<Task<Fut>>>,
            }

            impl<Fut> Drop for Bomb<'_, Fut> {
                fn drop(&mut self) {
                    if let Some(task) = self.task.take() {
                        self.queue.release_task(task);
                    }
                }
            }

            let mut bomb = Bomb { task: Some(task), queue: &mut *self };

            // Poll the underlying future with the appropriate waker
            // implementation. This is where a large bit of the unsafety
            // starts to stem from internally. The waker is basically just
            // our `Arc<Task<Fut>>` and can schedule the future for polling by
            // enqueuing itself in the ready to run queue.
            //
            // Critically though `Task<Fut>` won't actually access `Fut`, the
            // future, while it's floating around inside of wakers.
            // These structs will basically just use `Fut` to size
            // the internal allocation, appropriately accessing fields and
            // deallocating the task if need be.
            let res = {
                let task = bomb.task.as_ref().unwrap();
                // We are only interested in whether the future is awoken before it
                // finishes polling, so reset the flag here.
                task.woken.store(false, Relaxed);
                // SAFETY: see the comments of Bomb and this block.
                let waker = unsafe { Task::waker_ref(task) };
                let mut cx = Context::from_waker(&waker);

                // Safety: We won't move the future ever again
                let future = unsafe { Pin::new_unchecked(future) };

                future.poll(&mut cx)
            };
            polled += 1;

            match res {
                Poll::Pending => {
                    let task = bomb.task.take().unwrap();
                    // If the future was awoken during polling, we assume
                    // the future wanted to explicitly yield.
                    yielded += task.woken.load(Relaxed) as usize;
                    bomb.queue.link(task);

                    // If a future yields, we respect it and yield here.
                    // If all futures have been polled, we also yield here to
                    // avoid starving other tasks waiting on the executor.
                    // (polling the same future twice per iteration may cause
                    // the problem: https://github.com/rust-lang/futures-rs/pull/2333)
                    if yielded >= 2 || polled == len {
                        cx.waker().wake_by_ref();
                        return Poll::Pending;
                    }
                    continue;
                }
                Poll::Ready(output) => return Poll::Ready(Some(output)),
            }
        }
    }

Core poll_next implementation for FuturesUnordered. Uses Pin::new_unchecked on futures extracted from UnsafeCell; safe because the task (and future inside it) is heap-allocated via Arc and never moved after insertion. The Bomb struct ensures the future is dropped on the owning thread even if a poll panics. Mutual exclusion for dequeue is upheld by the &mut self receiver. Justifies uses-unsafe and impl-datastructure.

src/stream/futures_unordered/ready_to_run_queue.rs

src/stream/futures_unordered/ready_to_run_queue.rs, line 29-92

impl<Fut> ReadyToRunQueue<Fut> {
    // FIXME: this takes raw pointer without safety conditions.

    /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
    pub(super) fn enqueue(&self, task: *const Task<Fut>) {
        unsafe {
            debug_assert!((*task).queued.load(Relaxed));

            // This action does not require any coordination
            (*task).next_ready_to_run.store(ptr::null_mut(), Relaxed);

            // Note that these atomic orderings come from 1024cores
            let task = task as *mut _;
            let prev = self.head.swap(task, AcqRel);
            (*prev).next_ready_to_run.store(task, Release);
        }
    }

    /// The dequeue function from the 1024cores intrusive MPSC queue algorithm
    ///
    /// Note that this is unsafe as it required mutual exclusion (only one
    /// thread can call this) to be guaranteed elsewhere.
    pub(super) unsafe fn dequeue(&self) -> Dequeue<Fut> {
        unsafe {
            let mut tail = *self.tail.get();
            let mut next = (*tail).next_ready_to_run.load(Acquire);

            if tail == self.stub() {
                if next.is_null() {
                    return Dequeue::Empty;
                }

                *self.tail.get() = next;
                tail = next;
                next = (*next).next_ready_to_run.load(Acquire);
            }

            if !next.is_null() {
                *self.tail.get() = next;
                debug_assert!(tail != self.stub());
                return Dequeue::Data(tail);
            }

            if !core::ptr::eq(self.head.load(Acquire), tail) {
                return Dequeue::Inconsistent;
            }

            self.enqueue(self.stub());

            next = (*tail).next_ready_to_run.load(Acquire);

            if !next.is_null() {
                *self.tail.get() = next;
                return Dequeue::Data(tail);
            }

            Dequeue::Inconsistent
        }
    }

    pub(super) fn stub(&self) -> *const Task<Fut> {
        Arc::as_ptr(&self.stub)
    }
}

Implementation of the 1024cores MPSC intrusive queue algorithm used as the ready-to-run queue. enqueue is called from multiple threads (wake is called from any thread); dequeue requires external mutual exclusion (satisfied by &mut FuturesUnordered). Memory orderings (AcqRel on head swap, Release on next_ready_to_run write, Acquire on load) follow the published algorithm. Justifies impl-concurrency and concurrency-impl-safe.

src/stream/futures_unordered/ready_to_run_queue.rs, line 29-45

impl<Fut> ReadyToRunQueue<Fut> {
    // FIXME: this takes raw pointer without safety conditions.

    /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
    pub(super) fn enqueue(&self, task: *const Task<Fut>) {
        unsafe {
            debug_assert!((*task).queued.load(Relaxed));

            // This action does not require any coordination
            (*task).next_ready_to_run.store(ptr::null_mut(), Relaxed);

            // Note that these atomic orderings come from 1024cores
            let task = task as *mut _;
            let prev = self.head.swap(task, AcqRel);
            (*prev).next_ready_to_run.store(task, Release);
        }
    }

The FIXME at line 30 notes missing safety preconditions on enqueue. See FINDING-1.

src/stream/futures_unordered/task.rs

src/stream/futures_unordered/task.rs, line 46-75

unsafe impl<Fut> Send for Task<Fut> {}
unsafe impl<Fut> Sync for Task<Fut> {}

impl<Fut> ArcWake for Task<Fut> {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let inner = match arc_self.ready_to_run_queue.upgrade() {
            Some(inner) => inner,
            None => return,
        };

        arc_self.woken.store(true, Relaxed);

        // It's our job to enqueue this task it into the ready to run queue. To
        // do this we set the `queued` flag, and if successful we then do the
        // actual queueing operation, ensuring that we're only queued once.
        //
        // Once the task is inserted call `wake` to notify the parent task,
        // as it'll want to come along and run our task later.
        //
        // Note that we don't change the reference count of the task here,
        // we merely enqueue the raw pointer. The `FuturesUnordered`
        // implementation guarantees that if we set the `queued` flag that
        // there's a reference count held by the main `FuturesUnordered` queue
        // still.
        let prev = arc_self.queued.swap(true, SeqCst);
        if !prev {
            inner.enqueue(Arc::as_ptr(arc_self));
            inner.waker.wake();
        }
    }

unsafe impl Send/Sync for Task: Task never lets outside threads access the UnsafeCell<Option<Fut>>; future access is gated through the owning FuturesUnordered. The custom waker_ref bypasses the 'static bound on futures_task::waker_ref; the caller (FuturesUnordered) ensures the Arc lives for the duration of the waker's use. Justifies uses-unsafe and concurrency-impl-safe.
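The queued-flag handshake described in the wake_by_ref comments above can be modelled in a few lines. This is a simplified sketch: Task here carries only an id and the flag, ReadyQueue is a hypothetical Mutex-backed stand-in for the lock-free queue, and begin_poll mirrors the flag reset that poll_next performs before polling.

```rust
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::Mutex;

/// Reduced task: just an id and the `queued` flag.
struct Task {
    id: u32,
    queued: AtomicBool,
}

/// Hypothetical stand-in for the ready-to-run queue.
struct ReadyQueue {
    ids: Mutex<Vec<u32>>,
}

/// Enqueues the task at most once per poll cycle: only the caller that
/// flips `queued` from false to true performs the push.
fn wake(task: &Task, queue: &ReadyQueue) {
    let prev = task.queued.swap(true, SeqCst);
    if !prev {
        queue.ids.lock().unwrap().push(task.id);
    }
}

/// Clears the flag before polling so that a wake-up arriving during the
/// poll re-enqueues the task.
fn begin_poll(task: &Task) {
    let prev = task.queued.swap(false, SeqCst);
    assert!(prev);
}
```

Because the swap is a single atomic read-modify-write, concurrent wakers cannot both observe false, so a raw task pointer never appears in the queue twice at the same time.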