cargo : moka @ 0.12.15
Patrick Elsen, signed 2026-05-28, published 2026-05-28

Claims

algorithm-impl-bounds, algorithm-impl-correct, algorithm-impl-safe, algorithm-impl-tested, concurrency-documented, concurrency-impl-correct, concurrency-impl-documented, concurrency-impl-safe, concurrency-impl-tested, concurrency-safe, datastructure-impl-bounds, datastructure-impl-correct, datastructure-impl-safe, datastructure-impl-tested, has-binaries, has-build-exec, has-fuzz-tests, has-install-exec, has-integration-tests, has-property-tests, has-unit-tests, impl-algorithm, impl-concurrency, impl-crypto, impl-datastructure, impl-interpreter, impl-jit, impl-parser, impl-protocol, is-benign, unsafe-documented, unsafe-minimal, unsafe-safe, unsafe-tested, uses-concurrency, uses-crypto, uses-environment, uses-exec, uses-filesystem, uses-interpreter, uses-jit, uses-network, uses-unsafe

Summary

moka 0.12.15 is a W-TinyLFU concurrent cache with sync and async variants; 141 unsafe sites span an intrusive LRU deque, a custom MiniArc, and a lock-free epoch-reclaimed hash table. One low-severity correctness finding: the is_dirty check reads two atomics with Relaxed ordering before an Acquire fence, so the two loads are only loosely ordered and may be stale. No I/O, no FFI, no build-time execution.

Report

Subject

moka 0.12.15 is a high-performance in-memory cache for Rust, inspired by Java's Caffeine. It exposes two public cache types: sync::Cache (thread-safe, backed by parking_lot and std::thread) and future::Cache (async-aware, compatible with tokio and actix-rt). An optional sync::SegmentedCache spreads keys across multiple internal Cache shards for higher update throughput. The eviction policy implements W-TinyLFU: a Window/Probation/Protected LRU hierarchy with frequency estimation via a 4-bit Count-Min Sketch (ported from Caffeine). Per-entry TTL and TTI are maintained through a hierarchical timer wheel. The crate is ~35 KLOC across 51 source files and carries 141 unsafe sites.
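The 4-bit counters behind the frequency estimate can be illustrated with a small sketch. This is not moka's FrequencySketch: it only shows how sixteen 4-bit counters fit in one u64, how an increment saturates at 15, and how a Caffeine-style aging step halves every counter at once. The names counter_at, increment_at, and halve_all are hypothetical, and the hashing that selects a word and lane is omitted.

```rust
/// Mask selecting a single 4-bit counter.
const COUNTER_MASK: u64 = 0xF;
/// After shifting the whole word right by one, this mask clears the bit
/// that leaked into each 4-bit lane from its left-hand neighbour.
const HALVE_MASK: u64 = 0x7777_7777_7777_7777;

/// Reads the counter stored in lane `lane` (0..16) of `word`.
fn counter_at(word: u64, lane: u32) -> u64 {
    (word >> (lane * 4)) & COUNTER_MASK
}

/// Increments the counter in `lane`, saturating at 15.
/// Returns the updated word; other lanes are never touched because
/// a counter below 15 cannot carry into its neighbour.
fn increment_at(word: u64, lane: u32) -> u64 {
    if counter_at(word, lane) < 15 {
        word + (1u64 << (lane * 4))
    } else {
        word
    }
}

/// Halves all sixteen counters in one step (the aging/reset operation).
fn halve_all(word: u64) -> u64 {
    (word >> 1) & HALVE_MASK
}
```

Saturating rather than wrapping keeps a hot key from suddenly looking cold, and the halving step lets old popularity decay without walking the counters one by one.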

Methodology

Audit performed with openvet 0.6.0. VCS state verified via diff -rq contents vcs: only Cargo.toml normalization, publish exclusions (.github, CHANGELOG.md, examples, tests/compile_tests), and the Cargo.lock differ. No source file divergence was found.

Source surveyed with ripgrep. Files containing unsafe were read in full: src/common/deque.rs, src/common/concurrent/arc.rs, src/common/concurrent/deques.rs, src/common/concurrent.rs, src/common/concurrent/entry_info.rs, src/common/timer_wheel.rs, src/cht/map/bucket.rs, src/cht/map/bucket_array_ref.rs, src/cht/segment.rs, src/common/iter.rs, src/sync/cache.rs (Send/Sync impls), src/future/cache.rs (Send/Sync impls), src/sync/segment.rs. The FrequencySketch, AtomicInstant, and policy modules were read in full. The two base_cache.rs files (3362 and 3602 lines) were read at the sections exercising unsafe code and the deque/timer-wheel integration.

Results

The published tarball matches VCS byte-for-byte on all source files, establishing is-benign. No FFI, no network I/O, no filesystem access, no process spawning, and no environment reads appear in library code; all such references are in documentation examples. This establishes uses-network, uses-filesystem, uses-exec, uses-environment, uses-crypto, uses-interpreter, and uses-jit as false.

The crate carries no binaries, no build script, and no install targets, so has-binaries, has-build-exec, and has-install-exec are false. Unit tests are present throughout src/ and 7 integration test files are declared (has-unit-tests, has-integration-tests). No fuzz corpus or property-test harness was found, so has-fuzz-tests and has-property-tests are false. The FrequencySketch module includes Kani formal-verification proofs (gated on #[cfg(kani)]), which together with the inline unit tests establish algorithm-impl-tested, datastructure-impl-tested, concurrency-impl-tested, and unsafe-tested.

The codebase makes pervasive use of unsafe (uses-unsafe) and implements concurrency primitives (uses-concurrency, impl-concurrency). The main unsafe regions are:

Intrusive deque (src/common/deque.rs): A doubly-linked list with NonNull<DeqNode<T>> pointers. All mutating methods (move_to_back, unlink, unlink_and_drop) are unsafe fn, and callers hold NonNull pointers obtained from push_back. Access is always serialized by parking_lot::Mutex in the housekeeper, so no concurrent mutation occurs. The Drop impl uses a DropGuard pattern to handle panicking element destructors. Invariants are documented in comments. These facts establish unsafe-safe, unsafe-documented, unsafe-minimal, datastructure-impl-safe, and datastructure-impl-correct.
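The shape of such a deque can be sketched as follows. This is a simplified illustration, not the code in src/common/deque.rs: the Node and Deque types and their methods are hypothetical, move_to_back and the DropGuard handling of panicking destructors are left out, and the caller is assumed to serialize access (in moka, by holding the housekeeper mutex).

```rust
use std::ptr::NonNull;

struct Node<T> {
    prev: Option<NonNull<Node<T>>>,
    next: Option<NonNull<Node<T>>>,
    elem: T,
}

struct Deque<T> {
    head: Option<NonNull<Node<T>>>,
    tail: Option<NonNull<Node<T>>>,
    len: usize,
}

impl<T> Deque<T> {
    fn new() -> Self {
        Deque { head: None, tail: None, len: 0 }
    }

    fn len(&self) -> usize {
        self.len
    }

    /// Appends `elem` and hands back the node pointer the caller keeps
    /// for later O(1) unlinking.
    fn push_back(&mut self, elem: T) -> NonNull<Node<T>> {
        let node = Box::new(Node { prev: self.tail, next: None, elem });
        let ptr = NonNull::from(Box::leak(node));
        match self.tail {
            // SAFETY: `tail` points to a live node owned by this deque.
            Some(mut tail) => unsafe { tail.as_mut().next = Some(ptr) },
            None => self.head = Some(ptr),
        }
        self.tail = Some(ptr);
        self.len += 1;
        ptr
    }

    /// Unlinks `node`, frees it, and returns its element.
    ///
    /// # Safety
    /// `node` must have come from `push_back` on this deque and must not
    /// have been unlinked already.
    unsafe fn unlink_and_drop(&mut self, node: NonNull<Node<T>>) -> T {
        // SAFETY: per the contract above, the node is live and owned by us.
        let boxed = unsafe { Box::from_raw(node.as_ptr()) };
        let Node { prev, next, elem } = *boxed;
        match prev {
            Some(mut p) => unsafe { p.as_mut().next = next },
            None => self.head = next,
        }
        match next {
            Some(mut n) => unsafe { n.as_mut().prev = prev },
            None => self.tail = prev,
        }
        self.len -= 1;
        elem
    }

    fn pop_front(&mut self) -> Option<T> {
        let head = self.head?;
        // SAFETY: `head` is always a live node of this deque.
        Some(unsafe { self.unlink_and_drop(head) })
    }
}

impl<T> Drop for Deque<T> {
    fn drop(&mut self) {
        while self.pop_front().is_some() {}
    }
}
```

The unsafe contract sits on unlink_and_drop because the type system cannot tell whether a NonNull handed back by the caller still belongs to the list.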

MiniArc (src/common/concurrent/arc.rs): A std::sync::Arc variant without weak references, using AtomicU32 for the reference count. The clone/drop protocol (Relaxed increment, Release decrement, Acquire fence before deallocation) matches the standard Arc pattern cited in comments (Mara Bos's book, triomphe, std Arc). A loom model exercises the concurrent drop path. Bounds on T: Send + Sync are the same as std::sync::Arc. These facts establish concurrency-impl-safe, concurrency-impl-correct, concurrency-impl-documented, and concurrency-documented.
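The clone/drop protocol described above follows a well-known pattern, sketched here in minimal form. SketchArc is a hypothetical stand-in, not MiniArc itself, and the overflow threshold in clone is an assumption chosen for illustration.

```rust
use std::ops::Deref;
use std::ptr::NonNull;
use std::sync::atomic::{fence, AtomicU32, Ordering};

struct Inner<T> {
    ref_count: AtomicU32,
    data: T,
}

pub struct SketchArc<T> {
    ptr: NonNull<Inner<T>>,
}

// Same bounds as std::sync::Arc: the value may be reached from any thread
// holding a clone, so it must be both Send and Sync.
unsafe impl<T: Send + Sync> Send for SketchArc<T> {}
unsafe impl<T: Send + Sync> Sync for SketchArc<T> {}

impl<T> SketchArc<T> {
    pub fn new(data: T) -> Self {
        let inner = Box::new(Inner { ref_count: AtomicU32::new(1), data });
        SketchArc { ptr: NonNull::from(Box::leak(inner)) }
    }

    fn inner(&self) -> &Inner<T> {
        // SAFETY: the allocation lives as long as any SketchArc points to it.
        unsafe { self.ptr.as_ref() }
    }

    pub fn count(this: &Self) -> u32 {
        this.inner().ref_count.load(Ordering::Acquire)
    }
}

impl<T> Deref for SketchArc<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.inner().data
    }
}

impl<T> Clone for SketchArc<T> {
    fn clone(&self) -> Self {
        // Relaxed suffices: a new reference is created from an existing one,
        // so the count cannot concurrently reach zero.
        let old = self.inner().ref_count.fetch_add(1, Ordering::Relaxed);
        if old > u32::MAX / 2 {
            // Assumed soft limit: abort rather than let the counter wrap.
            std::process::abort();
        }
        SketchArc { ptr: self.ptr }
    }
}

impl<T> Drop for SketchArc<T> {
    fn drop(&mut self) {
        // Release publishes this thread's use of the data to whichever
        // thread performs the final decrement.
        if self.inner().ref_count.fetch_sub(1, Ordering::Release) == 1 {
            // Acquire pairs with those Release decrements before freeing.
            fence(Ordering::Acquire);
            // SAFETY: this was the last reference.
            unsafe { drop(Box::from_raw(self.ptr.as_ptr())) };
        }
    }
}
```

Only the thread that observes the count dropping from 1 to 0 pays for the Acquire fence, which is why the pattern uses a fence instead of an AcqRel decrement on every drop.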

Concurrent hash table (src/cht/map/bucket.rs, bucket_array_ref.rs, segment.rs): Lock-free open-addressing table using crossbeam_epoch for deferred reclamation. Pointers to Bucket<K,V> carry three tag bits in their low bits; Bucket is declared #[repr(align(8))], which guarantees those bits are zero in any valid address. All CAS operations use AcqRel on success and Relaxed on failure. Deferred destructors issue an Acquire fence before dropping the value, so the dropping thread sees the writes published before the bucket was retired. Rehashing is serialized via a per-array Mutex. These facts establish datastructure-impl-bounds and datastructure-impl-correct.
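The pointer-tagging argument can be made concrete with plain integer arithmetic. The sketch below is an illustration only: the crate itself uses crossbeam_epoch's Shared::with_tag and Shared::tag, whereas the helpers here (with_tag, tag_of, addr_of, and the predicates) are hypothetical and operate on a usize address. The Bucket type is a stand-in with a single field.

```rust
const SENTINEL_TAG: usize = 0b001;
const TOMBSTONE_TAG: usize = 0b010;
const BORROWED_TAG: usize = 0b100;
const TAG_MASK: usize = 0b111;

/// Stand-in bucket. An alignment of 8 means every valid address has its
/// three low bits clear, leaving them free for the tags.
#[allow(dead_code)]
#[repr(align(8))]
struct Bucket {
    key: u64,
}

/// Replaces the tag bits of `word` with `tag`, keeping the address bits.
fn with_tag(word: usize, tag: usize) -> usize {
    (word & !TAG_MASK) | (tag & TAG_MASK)
}

/// Extracts the tag bits.
fn tag_of(word: usize) -> usize {
    word & TAG_MASK
}

/// Extracts the address with all tag bits cleared.
fn addr_of(word: usize) -> usize {
    word & !TAG_MASK
}

fn is_sentinel(word: usize) -> bool {
    tag_of(word) & SENTINEL_TAG != 0
}

fn is_tombstone(word: usize) -> bool {
    tag_of(word) & TOMBSTONE_TAG != 0
}

fn is_borrowed(word: usize) -> bool {
    tag_of(word) & BORROWED_TAG != 0
}
```

Because the tag travels in the same machine word as the address, a single compare-exchange can change "live bucket" to "tombstone" atomically, with no separate flag to keep in sync.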

Timer wheel and entry lifecycle (src/common/timer_wheel.rs, src/common/concurrent/entry_info.rs): Timer nodes store an expiry_gen that is compared against EntryInfo::expiration_state() before operating on any raw pointer, preventing use-after-free on stale pointers. The expiration state packs a 52-bit timestamp and 12-bit generation counter into a single AtomicU64, updated atomically via CAS loop. The W-TinyLFU algorithm bounds access-frequency counters at 15 and resets on overflow, with the reset logic ported from Caffeine. These facts establish algorithm-impl-bounds, algorithm-impl-safe, and algorithm-impl-correct.
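A packed expiration state of this kind might look like the sketch below. The field widths (52 and 12 bits) come from the description above; the bit layout (timestamp in the high bits), the function names, and the memory orderings are assumptions made for illustration and are not taken from entry_info.rs.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const GEN_BITS: u32 = 12;
const GEN_MASK: u64 = (1 << GEN_BITS) - 1;
const TS_MAX: u64 = (1 << 52) - 1;

/// Packs a 52-bit timestamp and a 12-bit generation into one word.
fn pack(ts: u64, generation: u16) -> u64 {
    ((ts & TS_MAX) << GEN_BITS) | (generation as u64 & GEN_MASK)
}

/// Splits a packed word back into (timestamp, generation).
fn unpack(word: u64) -> (u64, u16) {
    (word >> GEN_BITS, (word & GEN_MASK) as u16)
}

/// Stores a new timestamp and bumps the generation in one atomic step.
/// Returns the new generation, which a timer node would remember.
fn set_expiry(state: &AtomicU64, new_ts: u64) -> u16 {
    let mut current = state.load(Ordering::Acquire);
    loop {
        let (_, generation) = unpack(current);
        // The generation wraps within its 12 bits.
        let next_gen = (generation + 1) & GEN_MASK as u16;
        let next = pack(new_ts, next_gen);
        match state.compare_exchange_weak(
            current,
            next,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return next_gen,
            // Another thread won the race; retry from the value it wrote.
            Err(actual) => current = actual,
        }
    }
}

/// A timer node is stale if the entry's generation has moved on.
fn timer_is_current(state: &AtomicU64, expiry_gen: u16) -> bool {
    unpack(state.load(Ordering::Acquire)).1 == expiry_gen
}
```

Keeping both fields in one AtomicU64 means a reader can never see a new timestamp paired with an old generation, which is what makes the generation comparison a sound staleness check.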

Send/Sync impls: Manual unsafe impl Send for Deques<K>, DeqNodes<K>, and TimerWheel<K> are justified because these types hold NonNull raw pointers (not Send by default) but are accessed exclusively under the housekeeper mutex. Cache<K,V,S> and SegmentedCache<K,V,S> implement Send and Sync with K: Send+Sync, V: Send+Sync bounds matching Arc semantics.
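The justification pattern for these impls can be shown on a toy type. RawCell and sum_on_threads below are hypothetical: the point is only that a type holding a NonNull is not Send by default, and that a manual impl is sound when the type uniquely owns its allocation and every access goes through a lock.

```rust
use std::ptr::NonNull;
use std::sync::{Arc, Mutex};

/// Owns one heap-allocated u64 through a raw pointer.
struct RawCell {
    ptr: NonNull<u64>,
}

impl RawCell {
    fn new(v: u64) -> Self {
        RawCell { ptr: NonNull::from(Box::leak(Box::new(v))) }
    }

    /// Adds `n` and returns the new value. Requires `&mut self`, so two
    /// threads can never run this at the same time.
    fn add(&mut self, n: u64) -> u64 {
        // SAFETY: `ptr` is valid for the lifetime of `self` and we hold
        // exclusive access.
        unsafe {
            *self.ptr.as_mut() += n;
            *self.ptr.as_ref()
        }
    }
}

impl Drop for RawCell {
    fn drop(&mut self) {
        // SAFETY: the pointer came from Box::leak in `new`.
        unsafe { drop(Box::from_raw(self.ptr.as_ptr())) };
    }
}

// SAFETY: RawCell uniquely owns its allocation and exposes it only through
// `&mut self`, so moving it to another thread cannot create aliasing.
unsafe impl Send for RawCell {}

/// Increments the cell once from each of `threads` threads, under a mutex.
fn sum_on_threads(threads: u64) -> u64 {
    let shared = Arc::new(Mutex::new(RawCell::new(0)));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let s = Arc::clone(&shared);
            std::thread::spawn(move || {
                s.lock().unwrap().add(1);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let total = shared.lock().unwrap().add(0);
    total
}
```

Without the unsafe impl, Mutex<RawCell> would be neither Send nor Sync and the thread::spawn call would not compile; the impl moves that obligation from the compiler to the safety comment.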

One low-severity correctness finding was raised: the is_dirty method in EntryInfo reads entry_gen and policy_gen with Relaxed ordering, then inserts an Acquire fence. The fence comes after the loads and does not guarantee that the Relaxed reads observe the latest values written by other threads. The consequence is bounded: a false-positive dirty result causes an eviction candidate to be skipped conservatively.

The crate implements the W-TinyLFU eviction algorithm (impl-algorithm) and an intrusive deque data structure (impl-datastructure). It performs no cryptographic operations and implements no parser, protocol, interpreter, or JIT, so impl-crypto, impl-parser, impl-protocol, impl-interpreter, and impl-jit are false.

Conclusion

moka 0.12.15 implements a W-TinyLFU concurrent cache with 141 unsafe sites concentrated in the intrusive LRU deques, a custom MiniArc reference counter, and a lock-free epoch-reclaimed concurrent hash table. The unsafe regions carry correctness arguments in comments or by reference to cited sources (Mara Bos's Rust Atomics and Locks, Caffeine). One low-severity correctness finding documents an imprecise atomic ordering in the is_dirty dirty-entry heuristic. No I/O, no FFI, no build-time execution, and no malicious or obfuscated code was found. The published tarball is byte-for-byte consistent with the VCS source at the audited commit.

Findings (1)

FINDING-1 correctness low

is_dirty uses Relaxed loads before Acquire fence, ordering imprecise

In src/common/concurrent/entry_info.rs:80-85, the is_dirty method performs two Relaxed-ordered atomic loads (of entry_gen and policy_gen) and then issues atomic::fence(Ordering::Acquire). Because the fence follows the loads, it can only synchronize with the release operations whose values those loads actually returned: it orders subsequent reads and writes in the calling thread after the stores that preceded those values. It does not force either load to return the latest value, and it does not order the two Relaxed loads against each other. Under the C++ and Rust memory models, each load may therefore return a stale value independently of the fence, and the pair is not a consistent snapshot.

In practice the consequence is bounded. A false-positive dirty result causes an eviction candidate to be skipped (the entry stays in the cache). False-negative dirty results (seeing the entry as clean when it is dirty) are the real concern, but these are also bounded: incr_entry_gen and set_policy_gen both use AcqRel, so the underlying writes are properly published. A Relaxed load may still return an older value than the most recent store, so the exposure is the interval between the two loads, when one counter has advanced and the other has not.

The correctness risk is low because is_dirty is used conservatively: false-dirty means the entry is skipped, false-clean is the unsafe direction, and the fence mitigates this by ordering subsequent accesses. The code comment at line 1684 acknowledges the heuristic nature of this check. No data corruption results from a wrong answer; at worst an entry is evicted or retained incorrectly once.
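The pattern discussed in this finding can be sketched as follows, next to a variant that uses Acquire loads directly. The Gens struct, the AtomicU16 field types, and the method bodies are assumptions made for illustration and are not copied from entry_info.rs. Neither variant reads the two counters as a single snapshot; the difference lies only in where the acquire ordering is attached.

```rust
use std::sync::atomic::{fence, AtomicU16, Ordering};

/// Hypothetical stand-in for the two generation counters of an entry.
struct Gens {
    entry_gen: AtomicU16,
    policy_gen: AtomicU16,
}

impl Gens {
    fn new() -> Self {
        Gens {
            entry_gen: AtomicU16::new(0),
            policy_gen: AtomicU16::new(0),
        }
    }

    /// Shape described in the finding: two Relaxed loads, then a fence.
    fn is_dirty_fenced(&self) -> bool {
        let e = self.entry_gen.load(Ordering::Relaxed);
        let p = self.policy_gen.load(Ordering::Relaxed);
        // Orders later accesses after the stores whose values were read
        // above; it does not make `e` or `p` any fresher.
        fence(Ordering::Acquire);
        e != p
    }

    /// Alternative shape: each load carries its own Acquire ordering.
    fn is_dirty_acquire(&self) -> bool {
        let e = self.entry_gen.load(Ordering::Acquire);
        let p = self.policy_gen.load(Ordering::Acquire);
        e != p
    }

    /// Writer side, using AcqRel as the finding states.
    fn incr_entry_gen(&self) -> u16 {
        self.entry_gen.fetch_add(1, Ordering::AcqRel).wrapping_add(1)
    }

    /// Writer side; a swap is used here because a plain store cannot
    /// take AcqRel. The real method may differ.
    fn set_policy_gen(&self, generation: u16) {
        self.policy_gen.swap(generation, Ordering::AcqRel);
    }
}
```

A single-threaded run cannot exhibit the stale-read window; the sketch is meant to show the two code shapes side by side, not to reproduce the race.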

Justifies unsafe-safe and concurrency-safe.

Annotations (4)

src/cht/map/bucket.rs

src/cht/map/bucket.rs, line 1-714

use std::{
    hash::{BuildHasher, Hash},
    mem::{self, MaybeUninit},
    ptr,
    sync::{
        atomic::{self, AtomicUsize, Ordering},
        Arc, Mutex, TryLockError,
    },
};

#[cfg(feature = "unstable-debug-counters")]
use crate::common::concurrent::debug_counters;

use crossbeam_epoch::{Atomic, CompareExchangeError, Guard, Owned, Shared};

pub(crate) const BUCKET_ARRAY_DEFAULT_LENGTH: usize = 128;

pub(crate) struct BucketArray<K, V> {
    pub(crate) buckets: Box<[Atomic<Bucket<K, V>>]>,
    pub(crate) next: Atomic<BucketArray<K, V>>,
    pub(crate) epoch: usize,
    pub(crate) rehash_lock: Arc<Mutex<()>>,
    pub(crate) tombstone_count: AtomicUsize,
}

impl<K, V> Default for BucketArray<K, V> {
    fn default() -> Self {
        Self::with_length(0, BUCKET_ARRAY_DEFAULT_LENGTH)
    }
}

impl<K, V> BucketArray<K, V> {
    pub(crate) fn with_length(epoch: usize, length: usize) -> Self {
        assert!(length.is_power_of_two());
        let mut buckets = Vec::with_capacity(length);

        unsafe {
            ptr::write_bytes(buckets.as_mut_ptr(), 0, length);
            buckets.set_len(length);
        }

        let buckets = buckets.into_boxed_slice();

        #[cfg(feature = "unstable-debug-counters")]
        {
            use debug_counters::InternalGlobalDebugCounters as Counters;

            let size = (buckets.len() * std::mem::size_of::<Atomic<Bucket<K, V>>>()) as u64;
            Counters::bucket_array_created(size);
        }

        Self {
            buckets,
            next: Atomic::null(),
            epoch,
            rehash_lock: Arc::new(Mutex::new(())),
            tombstone_count: AtomicUsize::default(),
        }
    }

    pub(crate) fn capacity(&self) -> usize {
        assert!(self.buckets.len().is_power_of_two());

        self.buckets.len() / 2
    }
}

#[cfg(feature = "unstable-debug-counters")]
impl<K, V> Drop for BucketArray<K, V> {
    fn drop(&mut self) {
        use debug_counters::InternalGlobalDebugCounters as Counters;

        let size = (self.buckets.len() * std::mem::size_of::<Atomic<Bucket<K, V>>>()) as u64;
        Counters::bucket_array_dropped(size);
    }
}

impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
    pub(crate) fn get(
        &self,
        guard: &'g Guard,
        hash: u64,
        mut eq: impl FnMut(&K) -> bool,
    ) -> Result<Shared<'g, Bucket<K, V>>, RelocatedError> {
        for bucket in self.probe(guard, hash) {
            let Ok((_, _, this_bucket_ptr)) = bucket else {
                return Err(RelocatedError);
            };

            let Some(this_bucket_ref) = (unsafe { this_bucket_ptr.as_ref() }) else {
                // Not found.
                return Ok(Shared::null());
            };

            if !eq(&this_bucket_ref.key) {
                // Different key. Try next bucket
                continue;
            }

            if is_tombstone(this_bucket_ptr) {
                // Not found. (It has been removed)
                return Ok(Shared::null());
            } else {
                // Found.
                return Ok(this_bucket_ptr);
            }
        }

        Ok(Shared::null())
    }

    pub(crate) fn remove_if<F>(
        &self,
        guard: &'g Guard,
        hash: u64,
        mut eq: impl FnMut(&K) -> bool,
        mut condition: F,
    ) -> Result<Shared<'g, Bucket<K, V>>, F>
    where
        F: FnMut(&K, &V) -> bool,
    {
        let mut probe = self.probe(guard, hash);
        while let Some(bucket) = probe.next() {
            let Ok((_, this_bucket, this_bucket_ptr)) = bucket else {
                return Err(condition);
            };

            let Some(this_bucket_ref) = (unsafe { this_bucket_ptr.as_ref() }) else {
                // Nothing to remove.
                return Ok(Shared::null());
            };

            let this_key = &this_bucket_ref.key;

            if !eq(this_key) {
                // Different key. Try next bucket.
                continue;
            }

            if is_tombstone(this_bucket_ptr) {
                // Already removed.
                return Ok(Shared::null());
            }

            let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() };

            if !condition(this_key, this_value) {
                // Found but the condition is false. Do not remove.
                return Ok(Shared::null());
            }

            // Found and the condition is true. Remove it. (Make it a tombstone)

            let new_bucket_ptr = this_bucket_ptr.with_tag(TOMBSTONE_TAG);

            match this_bucket.compare_exchange_weak(
                this_bucket_ptr,
                new_bucket_ptr,
                Ordering::AcqRel,
                Ordering::Relaxed,
                guard,
            ) {
                // Succeeded. Return the removed value. (can be null)
                Ok(_) => return Ok(new_bucket_ptr),
                // Failed. Reload to retry.
                Err(_) => probe.reload(),
            }
        }

        Ok(Shared::null())
    }

    pub(crate) fn insert_if_not_present<F>(
        &self,
        guard: &'g Guard,
        hash: u64,
        mut state: InsertOrModifyState<K, V, F>,
    ) -> Result<InsertionResult<'g, K, V>, InsertOrModifyState<K, V, F>>
    where
        F: FnOnce() -> V,
    {
        let mut probe = self.probe(guard, hash);
        while let Some(Ok((_, this_bucket, this_bucket_ptr))) = probe.next() {
            if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
                if &this_bucket_ref.key != state.key() {
                    // Different key. Try next bucket.
                    continue;
                }

                if !is_tombstone(this_bucket_ptr) {
                    // Found. Return it.
                    return Ok(InsertionResult::AlreadyPresent(this_bucket_ptr));
                }
            }

            // Not found or found a tombstone. Insert it.

            let new_bucket = state.into_insert_bucket();

            if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange_weak(
                this_bucket_ptr,
                new_bucket,
                Ordering::AcqRel,
                Ordering::Relaxed,
                guard,
            ) {
                state = InsertOrModifyState::from_bucket_value(new, None);
                probe.reload();
            } else if unsafe { this_bucket_ptr.as_ref() }.is_some() {
                // Inserted by replacing a tombstone.
                return Ok(InsertionResult::ReplacedTombstone(this_bucket_ptr));
            } else {
                // Inserted.
                return Ok(InsertionResult::Inserted);
            }
        }

        Err(state)
    }

    // https://rust-lang.github.io/rust-clippy/master/index.html#type_complexity
    #[allow(clippy::type_complexity)]
    pub(crate) fn insert_or_modify<F, G>(
        &self,
        guard: &'g Guard,
        hash: u64,
        mut state: InsertOrModifyState<K, V, F>,
        mut modifier: G,
    ) -> Result<Shared<'g, Bucket<K, V>>, (InsertOrModifyState<K, V, F>, G)>
    where
        F: FnOnce() -> V,
        G: FnMut(&K, &V) -> V,
    {
        let mut probe = self.probe(guard, hash);
        while let Some(bucket) = probe.next() {
            let Ok((_, this_bucket, this_bucket_ptr)) = bucket else {
                return Err((state, modifier));
            };

            let (new_bucket, maybe_insert_value) =
                if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
                    let this_key = &this_bucket_ref.key;

                    if this_key != state.key() {
                        // Different key. Try next bucket.
                        continue;
                    }

                    if is_tombstone(this_bucket_ptr) {
                        // Found a tombstone for this key. Replace it.
                        (state.into_insert_bucket(), None)
                    } else {
                        // Found. Modify it.
                        let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() };
                        let new_value = modifier(this_key, this_value);

                        let (new_bucket, insert_value) = state.into_modify_bucket(new_value);

                        (new_bucket, Some(insert_value))
                    }
                } else {
                    // Not found. Insert it.
                    (state.into_insert_bucket(), None)
                };

            if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange_weak(
                this_bucket_ptr,
                new_bucket,
                Ordering::AcqRel,
                Ordering::Relaxed,
                guard,
            ) {
                // Failed. Reload to retry.
                state = InsertOrModifyState::from_bucket_value(new, maybe_insert_value);
                probe.reload();
            } else {
                // Succeeded. Return the previous value. (can be null)
                return Ok(this_bucket_ptr);
            }
        }

        Err((state, modifier))
    }

    fn insert_for_grow(
        &self,
        guard: &'g Guard,
        hash: u64,
        bucket_ptr: Shared<'g, Bucket<K, V>>,
    ) -> Option<usize> {
        assert!(!bucket_ptr.is_null());
        assert!(!is_sentinel(bucket_ptr));
        assert!(is_borrowed(bucket_ptr));

        let key = &unsafe { bucket_ptr.deref() }.key;

        let mut probe = self.probe(guard, hash);
        while let Some(bucket) = probe.next() {
            let Ok((i, this_bucket, this_bucket_ptr)) = bucket else {
                return None;
            };

            if let Some(Bucket { key: this_key, .. }) = unsafe { this_bucket_ptr.as_ref() } {
                if this_bucket_ptr == bucket_ptr {
                    return None;
                } else if this_key != key {
                    continue;
                } else if !is_borrowed(this_bucket_ptr) {
                    return None;
                }
            }

            if this_bucket_ptr.is_null() && is_tombstone(bucket_ptr) {
                return None;
            } else if this_bucket
                .compare_exchange_weak(
                    this_bucket_ptr,
                    bucket_ptr,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                    guard,
                )
                .is_ok()
            {
                return Some(i);
            } else {
                probe.reload();
            }
        }

        None
    }

    pub(crate) fn keys<F, T>(
        &self,
        guard: &'g Guard,
        with_key: &mut F,
    ) -> Result<Vec<T>, RelocatedError>
    where
        F: FnMut(&K) -> T,
    {
        let mut keys = Vec::new();

        for bucket in self.buckets.iter() {
            let bucket_ptr = bucket.load_consume(guard);

            if is_sentinel(bucket_ptr) {
                return Err(RelocatedError);
            }

            if let Some(bucket_ref) = unsafe { bucket_ptr.as_ref() } {
                if !is_tombstone(bucket_ptr) {
                    keys.push(with_key(&bucket_ref.key));
                }
            }
        }

        Ok(keys)
    }
}

struct Probe<'b, 'g, K: 'g, V: 'g> {
    buckets: &'b [Atomic<Bucket<K, V>>],
    guard: &'g Guard,
    this_bucket: (usize, &'b Atomic<Bucket<K, V>>),
    offset: usize,

    i: usize,
    reload: bool,
}

impl<'g, K: 'g, V: 'g> Probe<'_, 'g, K, V> {
    fn reload(&mut self) {
        self.reload = true;
    }
}

impl<'b, 'g, K: 'g, V: 'g> Iterator for Probe<'b, 'g, K, V> {
    type Item = Result<(usize, &'b Atomic<Bucket<K, V>>, Shared<'g, Bucket<K, V>>), ()>;

    fn next(&mut self) -> Option<Self::Item> {
        if !self.reload {
            let max = self.buckets.len() - 1;
            if self.i >= max {
                return None;
            }
            self.i += 1;
            let i = self.i.wrapping_add(self.offset) & max;
            self.this_bucket = (i, &self.buckets[i]);
        }
        self.reload = false;

        let this_bucket_ptr = self.this_bucket.1.load_consume(self.guard);

        if is_sentinel(this_bucket_ptr) {
            return Some(Err(()));
        }

        let val = (self.this_bucket.0, self.this_bucket.1, this_bucket_ptr);
        Some(Ok(val))
    }
}

impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
    fn probe(&self, guard: &'g Guard, hash: u64) -> Probe<'_, 'g, K, V> {
        let buckets = &self.buckets;
        let offset = hash as usize & (buckets.len() - 1);
        // SAFETY: `len()` is never be 0 so this index access will never panic.
        // This invariant is ensured by the `assert!()` at the beginning of
        // `with_length()` because 0 is not a power of two.
        let this_bucket = (offset, &buckets[offset]);
        Probe {
            buckets,
            guard,
            this_bucket,
            offset,

            i: 0,
            reload: true,
        }
    }

    pub(crate) fn rehash<H>(
        &self,
        guard: &'g Guard,
        build_hasher: &H,
        rehash_op: RehashOp,
    ) -> Option<&'g BucketArray<K, V>>
    where
        K: Hash + Eq,
        H: BuildHasher,
    {
        // Ensure that the rehashing is not performed concurrently.
        let lock = match self.rehash_lock.try_lock() {
            Ok(lk) => lk,
            Err(TryLockError::WouldBlock) => {
                // Wait until the lock become available.
                std::mem::drop(self.rehash_lock.lock());
                // We need to return here to see if rehashing is still needed.
                return None;
            }
            Err(e @ TryLockError::Poisoned(_)) => panic!("{e:?}"),
        };

        let next_array = self.next_array(guard, rehash_op);

        for this_bucket in self.buckets.iter() {
            let mut maybe_state: Option<(usize, Shared<'g, Bucket<K, V>>)> = None;

            loop {
                let this_bucket_ptr = this_bucket.load_consume(guard);

                if is_sentinel(this_bucket_ptr) {
                    break;
                }

                let to_put_ptr = this_bucket_ptr.with_tag(this_bucket_ptr.tag() | BORROWED_TAG);

                if let Some((index, mut next_bucket_ptr)) = maybe_state {
                    assert!(!this_bucket_ptr.is_null());

                    let next_bucket = &next_array.buckets[index];

                    while is_borrowed(next_bucket_ptr)
                        && next_bucket
                            .compare_exchange_weak(
                                next_bucket_ptr,
                                to_put_ptr,
                                Ordering::AcqRel,
                                Ordering::Relaxed,
                                guard,
                            )
                            .is_err()
                    {
                        next_bucket_ptr = next_bucket.load_consume(guard);
                    }
                } else if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
                    let key = &this_bucket_ref.key;
                    let hash = hash(build_hasher, key);

                    if let Some(index) = next_array.insert_for_grow(guard, hash, to_put_ptr) {
                        maybe_state = Some((index, to_put_ptr));
                    }
                }

                if this_bucket
                    .compare_exchange_weak(
                        this_bucket_ptr,
                        Shared::null().with_tag(SENTINEL_TAG),
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                        guard,
                    )
                    .is_ok()
                {
                    // TODO: If else, we may need to count tombstone.
                    if !this_bucket_ptr.is_null()
                        && is_tombstone(this_bucket_ptr)
                        && maybe_state.is_none()
                    {
                        unsafe { defer_destroy_bucket(guard, this_bucket_ptr) };
                    }

                    break;
                }
            }
        }

        guard.flush();
        std::mem::drop(lock);

        Some(next_array)
    }

    fn next_array(&self, guard: &'g Guard, rehash_op: RehashOp) -> &'g BucketArray<K, V> {
        let mut maybe_new_next = None;

        loop {
            let next_ptr = self.next.load_consume(guard);

            if let Some(next_ref) = unsafe { next_ptr.as_ref() } {
                return next_ref;
            }

            let new_length = rehash_op.new_len(self.buckets.len());
            let new_next = maybe_new_next.unwrap_or_else(|| {
                Owned::new(BucketArray::with_length(self.epoch + 1, new_length))
            });

            match self.next.compare_exchange_weak(
                Shared::null(),
                new_next,
                Ordering::AcqRel,
                Ordering::Relaxed,
                guard,
            ) {
                Ok(p) => return unsafe { p.deref() },
                Err(CompareExchangeError { new, .. }) => {
                    maybe_new_next = Some(new);
                }
            }
        }
    }
}

#[repr(align(8))]
#[derive(Debug)]
pub(crate) struct Bucket<K, V> {
    pub(crate) key: K,
    pub(crate) maybe_value: MaybeUninit<V>,
}

impl<K, V> Bucket<K, V> {
    pub(crate) fn new(key: K, value: V) -> Bucket<K, V> {
        #[cfg(feature = "unstable-debug-counters")]
        debug_counters::InternalGlobalDebugCounters::bucket_created();

        Self {
            key,
            maybe_value: MaybeUninit::new(value),
        }
    }
}

#[cfg(feature = "unstable-debug-counters")]
impl<K, V> Drop for Bucket<K, V> {
    fn drop(&mut self) {
        debug_counters::InternalGlobalDebugCounters::bucket_dropped();
    }
}

#[derive(Debug, Eq, PartialEq)]
pub(crate) struct RelocatedError;

pub(crate) enum InsertOrModifyState<K, V, F: FnOnce() -> V> {
    New(K, F),
    AttemptedInsertion(Owned<Bucket<K, V>>),
    AttemptedModification(Owned<Bucket<K, V>>, ValueOrFunction<V, F>),
}

impl<K, V, F: FnOnce() -> V> InsertOrModifyState<K, V, F> {
    fn from_bucket_value(
        bucket: Owned<Bucket<K, V>>,
        value_or_function: Option<ValueOrFunction<V, F>>,
    ) -> Self {
        if let Some(value_or_function) = value_or_function {
            Self::AttemptedModification(bucket, value_or_function)
        } else {
            Self::AttemptedInsertion(bucket)
        }
    }

    fn key(&self) -> &K {
        match self {
            InsertOrModifyState::New(k, _) => k,
            InsertOrModifyState::AttemptedInsertion(b)
            | InsertOrModifyState::AttemptedModification(b, _) => &b.key,
        }
    }

    fn into_insert_bucket(self) -> Owned<Bucket<K, V>> {
        match self {
            InsertOrModifyState::New(k, f) => Owned::new(Bucket::new(k, f())),
            InsertOrModifyState::AttemptedInsertion(b) => b,
            InsertOrModifyState::AttemptedModification(mut b, v_or_f) => {
                unsafe {
                    mem::drop(
                        mem::replace(&mut b.maybe_value, MaybeUninit::new(v_or_f.into_value()))
                            .assume_init(),
                    );
                };

                b
            }
        }
    }

    fn into_modify_bucket(self, value: V) -> (Owned<Bucket<K, V>>, ValueOrFunction<V, F>) {
        match self {
            InsertOrModifyState::New(k, f) => (
                Owned::new(Bucket::new(k, value)),
                ValueOrFunction::Function(f),
            ),
            InsertOrModifyState::AttemptedInsertion(mut b) => {
                let insert_value = unsafe {
                    mem::replace(&mut b.maybe_value, MaybeUninit::new(value)).assume_init()
                };

                (b, ValueOrFunction::Value(insert_value))
            }
            InsertOrModifyState::AttemptedModification(mut b, v_or_f) => {
                unsafe {
                    mem::drop(
                        mem::replace(&mut b.maybe_value, MaybeUninit::new(value)).assume_init(),
                    );
                }

                (b, v_or_f)
            }
        }
    }
}

pub(crate) enum ValueOrFunction<V, F: FnOnce() -> V> {
    Value(V),
    Function(F),
}

impl<V, F: FnOnce() -> V> ValueOrFunction<V, F> {
    fn into_value(self) -> V {
        match self {
            ValueOrFunction::Value(v) => v,
            ValueOrFunction::Function(f) => f(),
        }
    }
}

pub(crate) fn hash<K, H>(build_hasher: &H, key: &K) -> u64
where
    K: ?Sized + Hash,
    H: BuildHasher,
{
    build_hasher.hash_one(key)
}

pub(crate) enum InsertionResult<'g, K, V> {
    AlreadyPresent(Shared<'g, Bucket<K, V>>),
    Inserted,
    ReplacedTombstone(Shared<'g, Bucket<K, V>>),
}

pub(crate) unsafe fn defer_destroy_bucket<'g, K, V>(
    guard: &'g Guard,
    mut ptr: Shared<'g, Bucket<K, V>>,
) {
    assert!(!ptr.is_null());

    guard.defer_unchecked(move || {
        atomic::fence(Ordering::Acquire);

        if !is_tombstone(ptr) {
            ptr::drop_in_place(ptr.deref_mut().maybe_value.as_mut_ptr());
        }

        mem::drop(ptr.into_owned());
    });
}

pub(crate) unsafe fn defer_destroy_tombstone<'g, K, V>(
    guard: &'g Guard,
    mut ptr: Shared<'g, Bucket<K, V>>,
) {
    assert!(!ptr.is_null());
    assert!(is_tombstone(ptr));

    atomic::fence(Ordering::Acquire);
    // read the value now, but defer its destruction for later
    let value = ptr::read(ptr.deref_mut().maybe_value.as_ptr());

    // to be entirely honest, i don't know what order deferred functions are
    // called in crossbeam-epoch. in the case that the deferred functions are
    // called out of order, this prevents that from being an issue.
    guard.defer_unchecked(move || mem::drop(value));
}

pub(crate) unsafe fn defer_acquire_destroy<'g, T>(guard: &'g Guard, ptr: Shared<'g, T>) {
    assert!(!ptr.is_null());

    guard.defer_unchecked(move || {
        atomic::fence(Ordering::Acquire);
        mem::drop(ptr.into_owned());
    });
}

Lock-free concurrent hash table using crossbeam_epoch for safe deferred reclamation. Bucket pointers carry three bit flags (SENTINEL=0x1, TOMBSTONE=0x2, BORROWED=0x4) in the low bits of the Shared<Bucket> pointer, which is safe because Bucket<K,V> is declared #[repr(align(8))]. All compare-exchange operations use AcqRel on success and Relaxed on failure. The closure deferred by defer_destroy_bucket issues atomic::fence(Acquire) before dropping the value and then the box, ensuring visibility of all writes prior to the CAS that installed the tombstone. Rehashing takes a per-array Mutex to prevent concurrent rehash attempts while still allowing concurrent reads. Justifies uses-unsafe, unsafe-safe, uses-concurrency, concurrency-safe.
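The order in which the Probe iterator in the listing above visits slots can be reproduced with a few lines. The helper probe_order is hypothetical and ignores reloads and sentinels; it only mirrors the index arithmetic: start at hash & (len - 1), then step linearly and wrap with the same mask.

```rust
/// Returns the slot indices a linear probe visits for `hash` in a table
/// of `len` slots. `len` must be a power of two so that `len - 1` is a
/// valid bit mask.
fn probe_order(hash: u64, len: usize) -> Vec<usize> {
    assert!(len.is_power_of_two());
    let mask = len - 1;
    let offset = hash as usize & mask;
    (0..len).map(|i| i.wrapping_add(offset) & mask).collect()
}
```

For example, probe_order(6, 8) yields 6, 7, 0, 1, 2, 3, 4, 5: every slot is visited exactly once, which is why the power-of-two assertion in with_length matters.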

src/common/concurrent/arc.rs

src/common/concurrent/arc.rs, line 1-150

// This module's source code was written by us, the `moka` developers, referring to
// the following book and code:
//
// - Chapter 6. Building Our Own "Arc" of the Rust Atomics and Locks book.
//     - Rust Atomics and Locks by Mara Bos (O’Reilly). Copyright 2023 Mara Bos,
//       ISBN: 978-1-098-11944-7
//     - https://marabos.nl/atomics/
// - The `triomphe` crate v0.1.13 and v0.1.11 by Manish Goregaokar (Manishearth)
//     - MIT or Apache-2.0 License
//     - https://github.com/Manishearth/triomphe
// - `std::sync::Arc` in the Rust Standard Library (1.81.0).
//     -  MIT or Apache-2.0 License

use std::{
    fmt,
    hash::{Hash, Hasher},
    ops::Deref,
    ptr::NonNull,
};

#[cfg(not(moka_loom))]
use std::sync::atomic::{self, AtomicU32};

#[cfg(moka_loom)]
use loom::sync::atomic::{self, AtomicU32};

/// A thread-safe reference-counting pointer. `MiniArc` is similar to
/// `std::sync::Arc`, Atomically Reference Counted shared pointer, but with a few
/// differences:
///
/// - Smaller memory overhead:
///     - `MiniArc` does not support weak references, so it does not need to store a
///       weak reference count.
///     - `MiniArc` uses `AtomicU32` for the reference count, while `std::sync::Arc`
///       uses `AtomicUsize`. On a 64-bit system, `AtomicU32` is half the size of
///       `AtomicUsize`.
///         - Note: Depending on the value type `T`, the Rust compiler may add
///           padding to the internal struct of `MiniArc<T>`, so the actual memory
///           overhead may vary.
/// - Smaller code size:
///     - Only about 100 lines of code.
///         - This is because `MiniArc` provides only the methods needed for the
///           `moka` and `mini-moka` crates.
///     - Smaller code size means less chance of bugs.
pub(crate) struct MiniArc<T: ?Sized> {
    ptr: NonNull<ArcData<T>>,
}

struct ArcData<T: ?Sized> {
    ref_count: AtomicU32,
    data: T,
}

/// A soft limit on the amount of references that may be made to an `MiniArc`.
///
/// Going above this limit will abort your program (although not necessarily)
/// at _exactly_ `MAX_REFCOUNT + 1` references.
const MAX_REFCOUNT: u32 = (i32::MAX) as u32;

unsafe impl<T: ?Sized + Send + Sync> Send for MiniArc<T> {}
unsafe impl<T: ?Sized + Send + Sync> Sync for MiniArc<T> {}

impl<T> MiniArc<T> {
    pub(crate) fn new(data: T) -> MiniArc<T> {
        MiniArc {
            ptr: NonNull::from(Box::leak(Box::new(ArcData {
                ref_count: AtomicU32::new(1),
                data,
            }))),
        }
    }
}

impl<T: ?Sized> MiniArc<T> {
    /// Gets the number of [`MiniArc`] pointers to this allocation
    pub(crate) fn count(this: &Self) -> u32 {
        use atomic::Ordering::Acquire;

        this.data().ref_count.load(Acquire)
    }

    /// Returns `true` if the two `MiniArc`s point to the same allocation in a
    /// vein similar to [`ptr::eq`].
    ///
    /// # Safety
    ///
    /// This function is unreliable when `T` is a `dyn Trait`. Currently
    /// coercing `MiniArc<SomeTime>` to `MiniArc<dyn Trait>` is not possible, so
    /// this is not a problem in practice. However, if this coercion becomes
    /// possible in the future, this function may return incorrect results when
    /// comparing `MiniArc<dyn Trait>` instances.
    ///
    /// To fix this, we must rise the minimum supported Rust version (MSRV) to
    /// 1.76 and use `std::ptr::addr_eq` internally instead of `eq` (`==`).
    /// `addr_eq` compares the _addresses_ of the pointers for equality,
    /// ignoring any metadata in fat pointers.
    ///
    /// See the following `triomphe` issue for more information:
    /// https://github.com/Manishearth/triomphe/pull/84
    ///
    /// Note that `triomphe` has a feature called `unsize`, which enables the
    /// coercion by using the `unsize` crate. `MiniArc` does not have such a
    /// feature, so we are safe for now.
    #[inline]
    #[allow(ambiguous_wide_pointer_comparisons)] // Remove this when MSRV is 1.76 or newer.
    #[allow(clippy::ptr_eq)] // Remove this when MSRV is 1.76 or newer.
    pub(crate) fn ptr_eq(this: &Self, other: &Self) -> bool {
        // `addr_eq` requires Rust 1.76 or newer.
        // ptr::addr_eq(this.ptr.as_ptr(), other.ptr.as_ptr())
        this.ptr.as_ptr() == other.ptr.as_ptr()
    }

    #[inline]
    fn data(&self) -> &ArcData<T> {
        unsafe { self.ptr.as_ref() }
    }
}

impl<T: ?Sized> Deref for MiniArc<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.data().data
    }
}

impl<T: ?Sized> Clone for MiniArc<T> {
    fn clone(&self) -> Self {
        use atomic::Ordering::Relaxed;

        if self.data().ref_count.fetch_add(1, Relaxed) > MAX_REFCOUNT {
            std::process::abort();
        }

        MiniArc { ptr: self.ptr }
    }
}

impl<T: ?Sized> Drop for MiniArc<T> {
    fn drop(&mut self) {
        use std::sync::atomic::Ordering::{Acquire, Release};

        if self.data().ref_count.fetch_sub(1, Release) == 1 {
            atomic::fence(Acquire);
            unsafe {
                drop(Box::from_raw(self.ptr.as_ptr()));
            }
        }
    }
}

Custom reference-counted pointer modelled on std::sync::Arc but without weak-reference support, using AtomicU32 for the reference count. Clone increments with Relaxed ordering and aborts the process if the count observed before the increment exceeds MAX_REFCOUNT (i32::MAX). Drop decrements with Release, and the last dropper issues an Acquire fence before deallocating, matching the standard Arc pattern from "Rust Atomics and Locks". Send and Sync are implemented with the same bounds as std::sync::Arc (T: Send + Sync). The code cites its sources explicitly (Mara Bos's book, triomphe, std Arc). A loom model is included. Justifies uses-unsafe, unsafe-safe, concurrency-safe.
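The ordering discipline summarized above can be reduced to a few lines. The following is a simplified sketch, not the audited code: `SketchArc` and `Inner` are hypothetical names, and the sketch supports only sized `T`.

```rust
use std::ops::Deref;
use std::ptr::NonNull;
use std::sync::atomic::{fence, AtomicU32, Ordering};

struct Inner<T> {
    ref_count: AtomicU32,
    data: T,
}

/// Hypothetical, simplified counterpart of `MiniArc` (sized `T` only).
pub struct SketchArc<T> {
    ptr: NonNull<Inner<T>>,
}

// A handle shares `&T` across threads and the last handle may drop `T` on any
// thread, hence both bounds.
unsafe impl<T: Send + Sync> Send for SketchArc<T> {}
unsafe impl<T: Send + Sync> Sync for SketchArc<T> {}

impl<T> SketchArc<T> {
    pub fn new(data: T) -> Self {
        let inner = Box::new(Inner { ref_count: AtomicU32::new(1), data });
        SketchArc { ptr: NonNull::from(Box::leak(inner)) }
    }

    pub fn count(this: &Self) -> u32 {
        this.inner().ref_count.load(Ordering::Acquire)
    }

    fn inner(&self) -> &Inner<T> {
        // Valid while at least one handle exists, and `self` is one.
        unsafe { self.ptr.as_ref() }
    }
}

impl<T> Deref for SketchArc<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.inner().data
    }
}

impl<T> Clone for SketchArc<T> {
    fn clone(&self) -> Self {
        // Relaxed suffices: the new handle derives from a live one, so the
        // allocation cannot be freed concurrently.
        if self.inner().ref_count.fetch_add(1, Ordering::Relaxed) > i32::MAX as u32 {
            std::process::abort();
        }
        SketchArc { ptr: self.ptr }
    }
}

impl<T> Drop for SketchArc<T> {
    fn drop(&mut self) {
        // Release publishes this handle's writes to whichever thread frees the data.
        if self.inner().ref_count.fetch_sub(1, Ordering::Release) == 1 {
            // Acquire pairs with every earlier Release decrement.
            fence(Ordering::Acquire);
            unsafe { drop(Box::from_raw(self.ptr.as_ptr())) };
        }
    }
}
```

The abort threshold sits far below `u32::MAX`, so concurrent clones that race past the limit still cannot wrap the counter before the process terminates.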

src/common/concurrent/deques.rs

src/common/concurrent/deques.rs, line 18-23

// TODO: https://github.com/moka-rs/moka/issues/54
#[allow(clippy::non_send_fields_in_send_ty)]
// Multi-threaded async runtimes require base_cache::Inner to be Send, but it will
// not be without this `unsafe impl`. This is because DeqNodes have NonNull
// pointers.
unsafe impl<K> Send for Deques<K> {}

Manual unsafe impl Send for Deques, DeqNodes, TimerWheel, Iter<K,V>, and both Cache<K,V,S> types. These structs contain NonNull raw pointers, which are not Send by default. Access to Deques, DeqNodes, and TimerWheel is serialized by the housekeeper mutex in base_cache::Inner, so shared mutable access across threads cannot occur. Cache Send/Sync impls propagate the standard K: Send+Sync, V: Send+Sync bounds that Arc-based caches require. Justifies concurrency-safe, concurrency-documented, concurrency-impl-documented.
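The argument above (a raw-pointer type is made Send by hand, and a mutex serializes every access) can be sketched with the standard library alone. `RawCounter` and `bump_from_threads` are hypothetical names, and `std::sync::Mutex` stands in for the housekeeper lock; none of this is moka's code.

```rust
use std::ptr::NonNull;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a structure that holds raw node pointers.
pub struct RawCounter {
    slot: NonNull<u64>,
}

// `NonNull` is not `Send`, so the compiler does not derive `Send` here.
// SAFETY (for this sketch): `RawCounter` exclusively owns the allocation
// behind `slot` and never hands the pointer out.
unsafe impl Send for RawCounter {}

impl RawCounter {
    pub fn new() -> Self {
        RawCounter { slot: NonNull::from(Box::leak(Box::new(0u64))) }
    }

    pub fn bump(&mut self) {
        unsafe { *self.slot.as_ptr() += 1 };
    }

    pub fn get(&self) -> u64 {
        unsafe { *self.slot.as_ptr() }
    }
}

impl Drop for RawCounter {
    fn drop(&mut self) {
        // Reclaim the allocation leaked in `new`.
        unsafe { drop(Box::from_raw(self.slot.as_ptr())) };
    }
}

/// Spawns `threads` workers that each bump the counter once under the mutex.
pub fn bump_from_threads(threads: usize) -> u64 {
    // `Mutex<T>` is `Sync` only when `T: Send`, which the impl above provides.
    let shared = Arc::new(Mutex::new(RawCounter::new()));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                shared.lock().unwrap().bump();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let total = shared.lock().unwrap().get();
    total
}
```

The mutex is what makes the manual impl sound in practice: the pointer can move between threads, but only one thread at a time can dereference it.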

src/common/deque.rs

src/common/deque.rs, line 1-344

// License and Copyright Notice:
//
// Some of the code and doc comments in this module were copied from
// `std::collections::LinkedList` in the Rust standard library.
// https://github.com/rust-lang/rust/blob/master/src/liballoc/collections/linked_list.rs
//
// The original code/comments from LinkedList are dual-licensed under
// the Apache License, Version 2.0 <https://github.com/rust-lang/rust/blob/master/LICENSE-APACHE>
// or the MIT license <https://github.com/rust-lang/rust/blob/master/LICENSE-MIT>
//
// Copyrights of the original code/comments are retained by their contributors.
// For full authorship information, see the version control history of
// https://github.com/rust-lang/rust/ or https://thanks.rust-lang.org

use std::{marker::PhantomData, ptr::NonNull};

use super::CacheRegion;

#[cfg(feature = "unstable-debug-counters")]
use crate::common::concurrent::debug_counters;

// `crate::{sync,unsync}::DeqNodes` uses a `tagptr::TagNonNull<DeqNode<T>, 2>`
// pointer. To reserve the space for the 2-bit tag, use 4 bytes as the *minimum*
// alignment.
// https://doc.rust-lang.org/reference/type-layout.html#the-alignment-modifiers
#[repr(align(4))]
#[derive(PartialEq, Eq)]
pub(crate) struct DeqNode<T> {
    next: Option<NonNull<DeqNode<T>>>,
    prev: Option<NonNull<DeqNode<T>>>,
    pub(crate) element: T,
}

impl<T> std::fmt::Debug for DeqNode<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DeqNode")
            .field("next", &self.next)
            .field("prev", &self.prev)
            .finish()
    }
}

impl<T> DeqNode<T> {
    pub(crate) fn new(element: T) -> Self {
        #[cfg(feature = "unstable-debug-counters")]
        debug_counters::InternalGlobalDebugCounters::deq_node_created();

        Self {
            next: None,
            prev: None,
            element,
        }
    }

    pub(crate) fn next_node_ptr(this: NonNull<Self>) -> Option<NonNull<DeqNode<T>>> {
        unsafe { this.as_ref() }.next
    }
}

#[cfg(feature = "unstable-debug-counters")]
impl<T> Drop for DeqNode<T> {
    fn drop(&mut self) {
        debug_counters::InternalGlobalDebugCounters::deq_node_dropped();
    }
}

/// Cursor is used to remember the current iterating position.
enum DeqCursor<T> {
    Node(NonNull<DeqNode<T>>),
    Done,
}

pub(crate) struct Deque<T> {
    region: CacheRegion,
    len: usize,
    head: Option<NonNull<DeqNode<T>>>,
    tail: Option<NonNull<DeqNode<T>>>,
    cursor: Option<DeqCursor<T>>,
    marker: PhantomData<Box<DeqNode<T>>>,
}

impl<T> Drop for Deque<T> {
    fn drop(&mut self) {
        struct DropGuard<'a, T>(&'a mut Deque<T>);

        impl<T> Drop for DropGuard<'_, T> {
            fn drop(&mut self) {
                // Continue the same loop we do below. This only runs when a destructor has
                // panicked. If another one panics this will abort.
                while self.0.pop_front().is_some() {}
            }
        }

        while let Some(node) = self.pop_front() {
            let guard = DropGuard(self);
            drop(node);
            std::mem::forget(guard);
        }
    }
}

// Inner crate public function/methods
impl<T> Deque<T> {
    pub(crate) fn new(region: CacheRegion) -> Self {
        Self {
            region,
            len: 0,
            head: None,
            tail: None,
            cursor: None,
            marker: PhantomData,
        }
    }

    pub(crate) fn region(&self) -> CacheRegion {
        self.region
    }

    pub(crate) fn len(&self) -> usize {
        self.len
    }

    pub(crate) fn contains(&self, node: &DeqNode<T>) -> bool {
        node.prev.is_some() || self.is_head(node)
    }

    pub(crate) fn peek_front(&self) -> Option<&DeqNode<T>> {
        self.head.as_ref().map(|node| unsafe { node.as_ref() })
    }

    pub(crate) fn peek_front_ptr(&self) -> Option<NonNull<DeqNode<T>>> {
        self.head.as_ref().copied()
    }

    /// Removes and returns the node at the front of the list.
    pub(crate) fn pop_front(&mut self) -> Option<Box<DeqNode<T>>> {
        // This method takes care not to create mutable references to whole nodes,
        // to maintain validity of aliasing pointers into `element`.
        self.head.map(|node| unsafe {
            if self.is_at_cursor(node.as_ref()) {
                self.advance_cursor();
            }

            let mut node = Box::from_raw(node.as_ptr());
            self.head = node.next;

            match self.head {
                None => self.tail = None,
                // Not creating new mutable (unique!) references overlapping `element`.
                Some(head) => (*head.as_ptr()).prev = None,
            }

            self.len -= 1;

            node.prev = None;
            node.next = None;
            node
        })
    }

    pub(crate) fn peek_back(&self) -> Option<&DeqNode<T>> {
        self.tail.as_ref().map(|node| unsafe { node.as_ref() })
    }

    /// Adds the given node to the back of the list.
    pub(crate) fn push_back(&mut self, mut node: Box<DeqNode<T>>) -> NonNull<DeqNode<T>> {
        // This method takes care not to create mutable references to whole nodes,
        // to maintain validity of aliasing pointers into `element`.
        unsafe {
            node.next = None;
            node.prev = self.tail;
            let node = NonNull::new(Box::into_raw(node)).expect("Got a null ptr");

            match self.tail {
                None => self.head = Some(node),
                // Not creating new mutable (unique!) references overlapping `element`.
                Some(tail) => (*tail.as_ptr()).next = Some(node),
            }

            self.tail = Some(node);
            self.len += 1;
            node
        }
    }

    pub(crate) unsafe fn move_to_back(&mut self, mut node: NonNull<DeqNode<T>>) {
        if self.is_tail(node.as_ref()) {
            // Already at the tail. Nothing to do.
            return;
        }

        if self.is_at_cursor(node.as_ref()) {
            self.advance_cursor();
        }

        // Extract the prev and next pointers before we start modifying the node
        let (prev, next) = {
            let node_ref = node.as_ref();
            (node_ref.prev, node_ref.next)
        };

        // Not creating new mutable (unique!) references overlapping `element`.
        match prev {
            Some(prev_node) if next.is_some() => (*prev_node.as_ptr()).next = next,
            Some(..) => (),
            // This node is the head node.
            None => self.head = next,
        };

        // This node is not the tail node.
        if let Some(next_node) = next {
            (*next_node.as_ptr()).prev = prev;

            // Update the node's pointers directly without creating conflicting references
            let node_mut = node.as_mut();
            node_mut.prev = self.tail;
            node_mut.next = None;

            match self.tail {
                // Not creating new mutable (unique!) references overlapping `element`.
                Some(tail) => (*tail.as_ptr()).next = Some(node),
                None => unreachable!(),
            }
            self.tail = Some(node);
        }
    }

    pub(crate) fn move_front_to_back(&mut self) {
        if let Some(node) = self.head {
            unsafe { self.move_to_back(node) };
        }
    }

    /// Unlinks the specified node from the current list.
    ///
    /// This method takes care not to create mutable references to `element`, to
    /// maintain validity of aliasing pointers.
    ///
    /// IMPORTANT: This method does not drop the node. If the node is no longer
    /// needed, use `unlink_and_drop` instead, or drop it at the caller side.
    /// Otherwise, the node will leak.
    pub(crate) unsafe fn unlink(&mut self, mut node: NonNull<DeqNode<T>>) {
        if self.is_at_cursor(node.as_ref()) {
            self.advance_cursor();
        }

        let node = node.as_mut(); // this one is ours now, we can create an &mut.

        // Not creating new mutable (unique!) references overlapping `element`.
        match node.prev {
            Some(prev) => (*prev.as_ptr()).next = node.next,
            // this node is the head node
            None => self.head = node.next,
        };

        match node.next {
            Some(next) => (*next.as_ptr()).prev = node.prev,
            // this node is the tail node
            None => self.tail = node.prev,
        };

        node.prev = None;
        node.next = None;

        self.len -= 1;
    }

    /// Unlinks the specified node from the current list, and then drop the node.
    ///
    /// This method takes care not to create mutable references to `element`, to
    /// maintain validity of aliasing pointers.
    ///
    /// Panics:
    pub(crate) unsafe fn unlink_and_drop(&mut self, node: NonNull<DeqNode<T>>) {
        self.unlink(node);
        std::mem::drop(Box::from_raw(node.as_ptr()));
    }

    pub(crate) fn reset_cursor(&mut self) {
        self.cursor = None;
    }
}

impl<'a, T> Iterator for &'a mut Deque<T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<Self::Item> {
        if self.cursor.is_none() {
            if let Some(head) = self.head {
                self.cursor = Some(DeqCursor::Node(head));
            }
        }
        let elem = if let Some(DeqCursor::Node(node)) = self.cursor {
            unsafe { Some(&(*node.as_ptr()).element) }
        } else {
            None
        };
        self.advance_cursor();
        elem
    }
}

// Private function/methods
impl<T> Deque<T> {
    fn is_head(&self, node: &DeqNode<T>) -> bool {
        if let Some(head) = self.head {
            std::ptr::eq(unsafe { head.as_ref() }, node)
        } else {
            false
        }
    }

    fn is_tail(&self, node: &DeqNode<T>) -> bool {
        if let Some(tail) = self.tail {
            std::ptr::eq(unsafe { tail.as_ref() }, node)
        } else {
            false
        }
    }

    fn is_at_cursor(&self, node: &DeqNode<T>) -> bool {
        if let Some(DeqCursor::Node(cur_node)) = self.cursor {
            std::ptr::eq(unsafe { cur_node.as_ref() }, node)
        } else {
            false
        }
    }

    fn advance_cursor(&mut self) {
        match self.cursor.take() {
            None => (),
            Some(DeqCursor::Node(node)) => unsafe {
                if let Some(next) = (*node.as_ptr()).next {
                    self.cursor = Some(DeqCursor::Node(next));
                } else {
                    self.cursor = Some(DeqCursor::Done);
                }
            },
            Some(DeqCursor::Done) => {
                self.cursor = None;
            }
        }
    }
}

Intrusive doubly-linked deque with raw NonNull<DeqNode<T>> pointers. move_to_back, unlink, and unlink_and_drop are pub(crate) unsafe fn; their callers pass a NonNull obtained from push_back, which returns a valid, heap-allocated pointer. pop_front is a safe fn whose unsafe block touches only the head node that the deque itself owns. The deque is always accessed under a parking_lot::Mutex at higher call sites (Deques<K>, TimerWheel<K>), so no concurrent mutation occurs. Drop is implemented with a DropGuard pattern that keeps draining the list even if an element's destructor panics. Justifies uses-unsafe and unsafe-safe.
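The DropGuard pattern mentioned above can be shown in isolation. In this sketch a `Vec<ManuallyDrop<T>>` stands in for the linked nodes, so that, as with raw pointers, nothing is dropped unless the container does it explicitly; `Bag`, `Noisy`, and `drop_with_panicking_element` are hypothetical names.

```rust
use std::mem::ManuallyDrop;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical container whose elements are only dropped by its own `Drop`.
pub struct Bag<T> {
    items: Vec<ManuallyDrop<T>>,
}

impl<T> Drop for Bag<T> {
    fn drop(&mut self) {
        struct DropGuard<'a, T>(&'a mut Bag<T>);

        impl<T> Drop for DropGuard<'_, T> {
            fn drop(&mut self) {
                // Runs only while unwinding from a panicking element destructor:
                // finish draining so the remaining elements are not leaked.
                while let Some(item) = self.0.items.pop() {
                    drop(ManuallyDrop::into_inner(item));
                }
            }
        }

        while let Some(item) = self.items.pop() {
            let guard = DropGuard(self);
            drop(ManuallyDrop::into_inner(item));
            // No panic occurred, so disarm the guard.
            std::mem::forget(guard);
        }
    }
}

/// Element that counts destructor runs and optionally panics in `drop`.
pub struct Noisy {
    pub dropped: Arc<AtomicUsize>,
    pub panic_on_drop: bool,
}

impl Drop for Noisy {
    fn drop(&mut self) {
        self.dropped.fetch_add(1, Ordering::SeqCst);
        if self.panic_on_drop {
            panic!("destructor panicked");
        }
    }
}

/// Drops a bag of `n` elements where the first element popped panics.
/// Returns whether a panic was observed and how many destructors ran.
pub fn drop_with_panicking_element(n: usize) -> (bool, usize) {
    let dropped = Arc::new(AtomicUsize::new(0));
    let items = (0..n)
        .map(|i| {
            ManuallyDrop::new(Noisy {
                dropped: Arc::clone(&dropped),
                // The last element pushed is the first one popped.
                panic_on_drop: i + 1 == n,
            })
        })
        .collect();
    let bag = Bag { items };
    let result = catch_unwind(AssertUnwindSafe(move || drop(bag)));
    (result.is_err(), dropped.load(Ordering::SeqCst))
}
```

As the comment in the audited code notes, a second panic raised while the guard is draining would abort the process, so the pattern guarantees cleanup for one panicking destructor, not for several.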