cargo / moka / audit
cargo : moka @ 0.12.15
PE Patrick Elsen signed 2026-05-28 published 2026-05-28

src/cht/map/bucket.rs

917 lines · rust · 1 line annotation

use std::{
    hash::{BuildHasher, Hash},
    mem::{self, MaybeUninit},
    ptr,
    sync::{
        atomic::{self, AtomicUsize, Ordering},
        Arc, Mutex, TryLockError,
    },
};

#[cfg(feature = "unstable-debug-counters")]
use crate::common::concurrent::debug_counters;

use crossbeam_epoch::{Atomic, CompareExchangeError, Guard, Owned, Shared};

/// Number of bucket slots allocated by `BucketArray::default()`.
pub(crate) const BUCKET_ARRAY_DEFAULT_LENGTH: usize = 128;

/// One generation of the hash table: a fixed-length array of atomic bucket
/// slots plus a link to the array that supersedes it after a rehash.
pub(crate) struct BucketArray<K, V> {
    /// The bucket slots. The length is a power of two (asserted in
    /// `with_length`), which lets `probe` mask instead of taking a modulo.
    pub(crate) buckets: Box<[Atomic<Bucket<K, V>>]>,
    /// The successor array. Null until `next_array` installs one.
    pub(crate) next: Atomic<BucketArray<K, V>>,
    /// Generation number. `next_array` creates the successor with `epoch + 1`.
    pub(crate) epoch: usize,
    /// Taken by `rehash` so that only one thread migrates this array at a time.
    pub(crate) rehash_lock: Arc<Mutex<()>>,
    /// Tombstone counter. It is only initialized (to zero) in this file.
    /// NOTE(review): presumably maintained by the callers — confirm.
    pub(crate) tombstone_count: AtomicUsize,
}

impl<K, V> Default for BucketArray<K, V> {
    /// Creates an epoch-0 array with `BUCKET_ARRAY_DEFAULT_LENGTH` slots.
    fn default() -> Self {
        Self::with_length(0, BUCKET_ARRAY_DEFAULT_LENGTH)
    }
}

impl<K, V> BucketArray<K, V> {
    /// Creates an array of `length` empty slots tagged with generation `epoch`.
    ///
    /// # Panics
    ///
    /// Panics if `length` is not a power of two (so zero is rejected as well).
    pub(crate) fn with_length(epoch: usize, length: usize) -> Self {
        assert!(length.is_power_of_two());
        let mut buckets = Vec::with_capacity(length);

        // Zero-fill the reserved storage and then declare it initialized.
        // NOTE(review): this relies on an all-zero `Atomic<_>` being a valid
        // null pointer — confirm against the crossbeam-epoch representation.
        unsafe {
            ptr::write_bytes(buckets.as_mut_ptr(), 0, length);
            buckets.set_len(length);
        }

        let buckets = buckets.into_boxed_slice();

        #[cfg(feature = "unstable-debug-counters")]
        {
            use debug_counters::InternalGlobalDebugCounters as Counters;

            let size = (buckets.len() * std::mem::size_of::<Atomic<Bucket<K, V>>>()) as u64;
            Counters::bucket_array_created(size);
        }

        Self {
            buckets,
            next: Atomic::null(),
            epoch,
            rehash_lock: Arc::new(Mutex::new(())),
            tombstone_count: AtomicUsize::default(),
        }
    }

    /// Returns half the number of slots.
    ///
    /// # Panics
    ///
    /// Panics if the slot count is not a power of two.
    pub(crate) fn capacity(&self) -> usize {
        assert!(self.buckets.len().is_power_of_two());

        self.buckets.len() / 2
    }
}

#[cfg(feature = "unstable-debug-counters")]
impl<K, V> Drop for BucketArray<K, V> {
    /// Reports the byte size of the slot array to the debug counters.
    fn drop(&mut self) {
        use debug_counters::InternalGlobalDebugCounters as Counters;

        let size = (self.buckets.len() * std::mem::size_of::<Atomic<Bucket<K, V>>>()) as u64;
        Counters::bucket_array_dropped(size);
    }
}

impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
    /// Looks up the bucket whose key satisfies `eq`, probing from `hash`.
    ///
    /// Returns `Ok(ptr)` with the live bucket, `Ok(null)` when the key is
    /// absent or its bucket is a tombstone, and `Err(RelocatedError)` when a
    /// sentinel slot is encountered during the probe.
    pub(crate) fn get(
        &self,
        guard: &'g Guard,
        hash: u64,
        mut eq: impl FnMut(&K) -> bool,
    ) -> Result<Shared<'g, Bucket<K, V>>, RelocatedError> {
        for bucket in self.probe(guard, hash) {
            let Ok((_, _, this_bucket_ptr)) = bucket else {
                return Err(RelocatedError);
            };

            let Some(this_bucket_ref) = (unsafe { this_bucket_ptr.as_ref() }) else {
                // Not found.
                return Ok(Shared::null());
            };

            if !eq(&this_bucket_ref.key) {
                // Different key. Try next bucket
                continue;
            }

            if is_tombstone(this_bucket_ptr) {
                // Not found. (It has been removed)
                return Ok(Shared::null());
            } else {
                // Found.
                return Ok(this_bucket_ptr);
            }
        }

        Ok(Shared::null())
    }

    /// Turns the bucket matching `eq` into a tombstone if `condition` holds
    /// for its key and value.
    ///
    /// Returns `Ok(ptr)` with the tombstone-tagged pointer on success,
    /// `Ok(null)` when there is nothing to remove (absent, already a
    /// tombstone, or `condition` returned `false`), and `Err(condition)` —
    /// handing the closure back to the caller — when a sentinel slot is
    /// encountered during the probe.
    pub(crate) fn remove_if<F>(
        &self,
        guard: &'g Guard,
        hash: u64,
        mut eq: impl FnMut(&K) -> bool,
        mut condition: F,
    ) -> Result<Shared<'g, Bucket<K, V>>, F>
    where
        F: FnMut(&K, &V) -> bool,
    {
        let mut probe = self.probe(guard, hash);
        while let Some(bucket) = probe.next() {
            let Ok((_, this_bucket, this_bucket_ptr)) = bucket else {
                return Err(condition);
            };

            let Some(this_bucket_ref) = (unsafe { this_bucket_ptr.as_ref() }) else {
                // Nothing to remove.
                return Ok(Shared::null());
            };

            let this_key = &this_bucket_ref.key;

            if !eq(this_key) {
                // Different key. Try next bucket.
                continue;
            }

            if is_tombstone(this_bucket_ptr) {
                // Already removed.
                return Ok(Shared::null());
            }

            let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() };

            if !condition(this_key, this_value) {
                // Found but the condition is false. Do not remove.
                return Ok(Shared::null());
            }

            // Found and the condition is true. Remove it. (Make it a tombstone)
            // `with_tag` replaces the whole tag, so any other tag bit on the
            // pointer is cleared here.
            let new_bucket_ptr = this_bucket_ptr.with_tag(TOMBSTONE_TAG);

            match this_bucket.compare_exchange_weak(
                this_bucket_ptr,
                new_bucket_ptr,
                Ordering::AcqRel,
                Ordering::Relaxed,
                guard,
            ) {
                // Succeeded. Return the removed value. (can be null)
                Ok(_) => return Ok(new_bucket_ptr),
                // Failed. Reload to retry.
                Err(_) => probe.reload(),
            }
        }

        Ok(Shared::null())
    }

    /// Inserts the entry described by `state` unless a live bucket with the
    /// same key already exists.
    ///
    /// Returns `Ok` with the corresponding `InsertionResult` variant, or
    /// `Err(state)` when the probe ends without a usable slot — either
    /// because a sentinel slot was encountered or because every slot was
    /// visited. The returned `state` keeps any bucket that was already
    /// allocated so the caller can retry with it.
    pub(crate) fn insert_if_not_present<F>(
        &self,
        guard: &'g Guard,
        hash: u64,
        mut state: InsertOrModifyState<K, V, F>,
    ) -> Result<InsertionResult<'g, K, V>, InsertOrModifyState<K, V, F>>
    where
        F: FnOnce() -> V,
    {
        let mut probe = self.probe(guard, hash);
        while let Some(Ok((_, this_bucket, this_bucket_ptr))) = probe.next() {
            if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
                if &this_bucket_ref.key != state.key() {
                    // Different key. Try next bucket.
                    continue;
                }

                if !is_tombstone(this_bucket_ptr) {
                    // Found. Return it.
                    return Ok(InsertionResult::AlreadyPresent(this_bucket_ptr));
                }
            }

            // Not found or found a tombstone. Insert it.
            let new_bucket = state.into_insert_bucket();

            if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange_weak(
                this_bucket_ptr,
                new_bucket,
                Ordering::AcqRel,
                Ordering::Relaxed,
                guard,
            ) {
                // Failed. Keep the allocated bucket and retry the same slot.
                state = InsertOrModifyState::from_bucket_value(new, None);
                probe.reload();
            } else if unsafe { this_bucket_ptr.as_ref() }.is_some() {
                // Inserted by replacing a tombstone.
                return Ok(InsertionResult::ReplacedTombstone(this_bucket_ptr));
            } else {
                // Inserted.
                return Ok(InsertionResult::Inserted);
            }
        }

        Err(state)
    }

    /// Inserts the entry described by `state`, or, when a live bucket with
    /// the same key exists, replaces it with a bucket holding
    /// `modifier(key, current_value)`.
    ///
    /// Returns `Ok(previous)` with the pointer that was in the slot before
    /// the successful compare-exchange (null for a fresh insert), or
    /// `Err((state, modifier))` when a sentinel slot is encountered or every
    /// slot was visited.
    // https://rust-lang.github.io/rust-clippy/master/index.html#type_complexity
    #[allow(clippy::type_complexity)]
    pub(crate) fn insert_or_modify<F, G>(
        &self,
        guard: &'g Guard,
        hash: u64,
        mut state: InsertOrModifyState<K, V, F>,
        mut modifier: G,
    ) -> Result<Shared<'g, Bucket<K, V>>, (InsertOrModifyState<K, V, F>, G)>
    where
        F: FnOnce() -> V,
        G: FnMut(&K, &V) -> V,
    {
        let mut probe = self.probe(guard, hash);

        while let Some(bucket) = probe.next() {
            let Ok((_, this_bucket, this_bucket_ptr)) = bucket else {
                return Err((state, modifier));
            };

            // `maybe_insert_value` is `Some` only on the modify path, where
            // the insertion value (or its initializer) must be preserved in
            // case the compare-exchange fails and the next attempt inserts.
            let (new_bucket, maybe_insert_value) =
                if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
                    let this_key = &this_bucket_ref.key;

                    if this_key != state.key() {
                        // Different key. Try next bucket.
                        continue;
                    }

                    if is_tombstone(this_bucket_ptr) {
                        // Found a tombstone for this key. Replace it.
                        (state.into_insert_bucket(), None)
                    } else {
                        // Found. Modify it.
                        let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() };
                        let new_value = modifier(this_key, this_value);

                        let (new_bucket, insert_value) = state.into_modify_bucket(new_value);

                        (new_bucket, Some(insert_value))
                    }
                } else {
                    // Not found. Insert it.
                    (state.into_insert_bucket(), None)
                };

            if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange_weak(
                this_bucket_ptr,
                new_bucket,
                Ordering::AcqRel,
                Ordering::Relaxed,
                guard,
            ) {
                // Failed. Reload to retry.
                state = InsertOrModifyState::from_bucket_value(new, maybe_insert_value);
                probe.reload();
            } else {
                // Succeeded. Return the previous value. (can be null)
                return Ok(this_bucket_ptr);
            }
        }

        Err((state, modifier))
    }

    /// Places `bucket_ptr` (a borrowed pointer coming from the previous
    /// array) into this array during a rehash.
    ///
    /// Returns `Some(index)` of the slot that now holds `bucket_ptr`, or
    /// `None` when nothing was stored: a sentinel slot was encountered, the
    /// same pointer is already present, the key is already present with a
    /// non-borrowed pointer, `bucket_ptr` is a tombstone and the slot is
    /// empty, or every slot was visited.
    ///
    /// # Panics
    ///
    /// Panics if `bucket_ptr` is null, carries the sentinel tag, or does not
    /// carry the borrowed tag.
    fn insert_for_grow(
        &self,
        guard: &'g Guard,
        hash: u64,
        bucket_ptr: Shared<'g, Bucket<K, V>>,
    ) -> Option<usize> {
        assert!(!bucket_ptr.is_null());
        assert!(!is_sentinel(bucket_ptr));
        assert!(is_borrowed(bucket_ptr));

        let key = &unsafe { bucket_ptr.deref() }.key;

        let mut probe = self.probe(guard, hash);
        while let Some(bucket) = probe.next() {
            let Ok((i, this_bucket, this_bucket_ptr)) = bucket else {
                return None;
            };

            if let Some(Bucket { key: this_key, .. }) = unsafe { this_bucket_ptr.as_ref() } {
                if this_bucket_ptr == bucket_ptr {
                    // Exactly this pointer is already stored.
                    return None;
                } else if this_key != key {
                    // Slot belongs to another key. Try next bucket.
                    continue;
                } else if !is_borrowed(this_bucket_ptr) {
                    // Same key, but the slot holds a non-borrowed pointer;
                    // leave it untouched.
                    return None;
                }
            }

            if this_bucket_ptr.is_null() && is_tombstone(bucket_ptr) {
                // Do not copy a tombstone into an empty slot.
                return None;
            } else if this_bucket
                .compare_exchange_weak(
                    this_bucket_ptr,
                    bucket_ptr,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                    guard,
                )
                .is_ok()
            {
                return Some(i);
            } else {
                probe.reload();
            }
        }

        None
    }

    /// Applies `with_key` to the key of every live (non-tombstone) bucket
    /// and collects the results in slot order.
    ///
    /// Returns `Err(RelocatedError)` as soon as a sentinel slot is seen.
    pub(crate) fn keys<F, T>(
        &self,
        guard: &'g Guard,
        with_key: &mut F,
    ) -> Result<Vec<T>, RelocatedError>
    where
        F: FnMut(&K) -> T,
    {
        let mut keys = Vec::new();

        for bucket in self.buckets.iter() {
            let bucket_ptr = bucket.load_consume(guard);

            if is_sentinel(bucket_ptr) {
                return Err(RelocatedError);
            }

            if let Some(bucket_ref) = unsafe { bucket_ptr.as_ref() } {
                if !is_tombstone(bucket_ptr) {
                    keys.push(with_key(&bucket_ref.key));
                }
            }
        }

        Ok(keys)
    }
}

/// Linear-probing cursor over a bucket array.
///
/// Yields `Ok((index, slot, pointer))` for each visited slot, or `Err(())`
/// when the loaded pointer carries the sentinel tag.
struct Probe<'b, 'g, K: 'g, V: 'g> {
    /// The slots being probed.
    buckets: &'b [Atomic<Bucket<K, V>>],
    /// Guard used for every slot load.
    guard: &'g Guard,
    /// Index and slot most recently selected.
    this_bucket: (usize, &'b Atomic<Bucket<K, V>>),
    /// Index of the first slot, derived from the hash in `probe`.
    offset: usize,
    /// Number of steps taken past the first slot.
    i: usize,
    /// When set, the next call to `next` re-reads `this_bucket` instead of
    /// advancing. It starts out set so the first call yields the first slot.
    reload: bool,
}

impl<'g, K: 'g, V: 'g> Probe<'_, 'g, K, V> {
    /// Makes the next call to `next` load the current slot again, e.g. after
    /// a failed compare-exchange on it.
    fn reload(&mut self) {
        self.reload = true;
    }
}

impl<'b, 'g, K: 'g, V: 'g> Iterator for Probe<'b, 'g, K, V> {
    type Item = Result<(usize, &'b Atomic<Bucket<K, V>>, Shared<'g, Bucket<K, V>>), ()>;

    fn next(&mut self) -> Option<Self::Item> {
        if !self.reload {
            // Advance to the next slot, wrapping with the power-of-two mask.
            // Stop once `len - 1` steps have been taken, i.e. after every
            // slot has been selected once.
            let max = self.buckets.len() - 1;
            if self.i >= max {
                return None;
            }
            self.i += 1;
            let i = self.i.wrapping_add(self.offset) & max;
            self.this_bucket = (i, &self.buckets[i]);
        }
        self.reload = false;

        let this_bucket_ptr = self.this_bucket.1.load_consume(self.guard);

        if is_sentinel(this_bucket_ptr) {
            return Some(Err(()));
        }

        let val = (self.this_bucket.0, self.this_bucket.1, this_bucket_ptr);
        Some(Ok(val))
    }
}

impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
    /// Creates a probe that starts at slot `hash & (len - 1)`.
    fn probe(&self, guard: &'g Guard, hash: u64) -> Probe<'_, 'g, K, V> {
        let buckets = &self.buckets;
        let offset = hash as usize & (buckets.len() - 1);
        // `len()` is never 0, so this index access will never panic.
        // This invariant is ensured by the `assert!()` at the beginning of
        // `with_length()` because 0 is not a power of two.
        let this_bucket = (offset, &buckets[offset]);
        Probe {
            buckets,
            guard,
            this_bucket,
            offset,
            i: 0,
            reload: true,
        }
    }

    /// Migrates every slot of this array into the successor array and
    /// replaces each migrated slot with a sentinel.
    ///
    /// Returns `Some(next_array)` after the migration. Returns `None`,
    /// without migrating anything, when another thread holds the rehash
    /// lock; in that case this call first blocks until the lock is released
    /// so the caller can re-check whether rehashing is still needed.
    ///
    /// # Panics
    ///
    /// Panics if the rehash lock is poisoned.
    pub(crate) fn rehash<H>(
        &self,
        guard: &'g Guard,
        build_hasher: &H,
        rehash_op: RehashOp,
    ) -> Option<&'g BucketArray<K, V>>
    where
        K: Hash + Eq,
        H: BuildHasher,
    {
        // Ensure that the rehashing is not performed concurrently.
        let lock = match self.rehash_lock.try_lock() {
            Ok(lk) => lk,
            Err(TryLockError::WouldBlock) => {
                // Wait until the lock becomes available.
                std::mem::drop(self.rehash_lock.lock());
                // We need to return here to see if rehashing is still needed.
                return None;
            }
            Err(e @ TryLockError::Poisoned(_)) => panic!("{e:?}"),
        };

        let next_array = self.next_array(guard, rehash_op);

        for this_bucket in self.buckets.iter() {
            // `Some((index, ptr))` once this slot's bucket has been placed at
            // `index` in the next array as `ptr`.
            let mut maybe_state: Option<(usize, Shared<'g, Bucket<K, V>>)> = None;

            loop {
                let this_bucket_ptr = this_bucket.load_consume(guard);

                if is_sentinel(this_bucket_ptr) {
                    break;
                }

                // Same pointer and tags, with the borrowed bit added.
                let to_put_ptr = this_bucket_ptr.with_tag(this_bucket_ptr.tag() | BORROWED_TAG);

                if let Some((index, mut next_bucket_ptr)) = maybe_state {
                    // A previous iteration placed this slot's bucket in the
                    // next array but did not manage to seal the slot. Bring
                    // the next array's slot up to date with the pointer just
                    // loaded, for as long as that slot still holds a
                    // borrowed pointer.
                    assert!(!this_bucket_ptr.is_null());

                    let next_bucket = &next_array.buckets[index];

                    while is_borrowed(next_bucket_ptr)
                        && next_bucket
                            .compare_exchange_weak(
                                next_bucket_ptr,
                                to_put_ptr,
                                Ordering::AcqRel,
                                Ordering::Relaxed,
                                guard,
                            )
                            .is_err()
                    {
                        next_bucket_ptr = next_bucket.load_consume(guard);
                    }
                } else if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
                    let key = &this_bucket_ref.key;
                    let hash = hash(build_hasher, key);

                    if let Some(index) = next_array.insert_for_grow(guard, hash, to_put_ptr) {
                        maybe_state = Some((index, to_put_ptr));
                    }
                }

                // Seal the old slot. On failure the slot changed in the
                // meantime, so loop and process the new pointer.
                if this_bucket
                    .compare_exchange_weak(
                        this_bucket_ptr,
                        Shared::null().with_tag(SENTINEL_TAG),
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                        guard,
                    )
                    .is_ok()
                {
                    // TODO: If else, we may need to count tombstone.
                    // A tombstone that was not placed in the next array is
                    // no longer referenced by either array once the slot is
                    // sealed, so it is scheduled for destruction here.
                    if !this_bucket_ptr.is_null()
                        && is_tombstone(this_bucket_ptr)
                        && maybe_state.is_none()
                    {
                        unsafe { defer_destroy_bucket(guard, this_bucket_ptr) };
                    }

                    break;
                }
            }
        }

        guard.flush();
        std::mem::drop(lock);

        Some(next_array)
    }

    /// Returns the successor array, creating and installing it if needed.
    ///
    /// The new array is allocated at most once per call; the allocation is
    /// carried over across failed compare-exchange attempts. Its length is
    /// `rehash_op.new_len(self.buckets.len())` and its epoch is
    /// `self.epoch + 1`.
    ///
    /// # Panics
    ///
    /// When no successor exists yet, this evaluates `rehash_op.new_len(..)`,
    /// which is `unreachable!()` for `RehashOp::Skip`.
    fn next_array(&self, guard: &'g Guard, rehash_op: RehashOp) -> &'g BucketArray<K, V> {
        let mut maybe_new_next = None;

        loop {
            let next_ptr = self.next.load_consume(guard);

            if let Some(next_ref) = unsafe { next_ptr.as_ref() } {
                return next_ref;
            }

            let new_length = rehash_op.new_len(self.buckets.len());
            let new_next = maybe_new_next.unwrap_or_else(|| {
                Owned::new(BucketArray::with_length(self.epoch + 1, new_length))
            });

            match self.next.compare_exchange_weak(
                Shared::null(),
                new_next,
                Ordering::AcqRel,
                Ordering::Relaxed,
                guard,
            ) {
                Ok(p) => return unsafe { p.deref() },
                Err(CompareExchangeError { new, .. }) => {
                    // Keep the allocation for the next attempt.
                    maybe_new_next = Some(new);
                }
            }
        }
    }
}

/// A key together with a value that may or may not be initialized.
///
/// The 8-byte alignment leaves the low three pointer bits free for the
/// `SENTINEL_TAG`, `TOMBSTONE_TAG` and `BORROWED_TAG` flags.
#[repr(align(8))]
#[derive(Debug)]
pub(crate) struct Bucket<K, V> {
    pub(crate) key: K,
    /// The value. Because it is wrapped in `MaybeUninit`, dropping a
    /// `Bucket` does not drop the value; that is done explicitly by the
    /// `defer_destroy_*` functions.
    pub(crate) maybe_value: MaybeUninit<V>,
}

impl<K, V> Bucket<K, V> {
    /// Creates a bucket holding an initialized `value`.
    pub(crate) fn new(key: K, value: V) -> Bucket<K, V> {
        #[cfg(feature = "unstable-debug-counters")]
        debug_counters::InternalGlobalDebugCounters::bucket_created();

        Self {
            key,
            maybe_value: MaybeUninit::new(value),
        }
    }
}

#[cfg(feature = "unstable-debug-counters")]
impl<K, V> Drop for Bucket<K, V> {
    /// Reports the drop to the debug counters.
    fn drop(&mut self) {
        debug_counters::InternalGlobalDebugCounters::bucket_dropped();
    }
}

/// Returned when an operation runs into a sentinel slot, i.e. a slot that
/// has already been migrated out of the array being accessed.
#[derive(Debug, Eq, PartialEq)]
pub(crate) struct RelocatedError;

/// Progress of an insert / insert-or-modify operation, carried across
/// retries so that an already allocated bucket is reused.
pub(crate) enum InsertOrModifyState<K, V, F: FnOnce() -> V> {
    /// Nothing allocated yet: the key and the initializer of the value to
    /// insert.
    New(K, F),
    /// A bucket holding the value to insert has been allocated.
    AttemptedInsertion(Owned<Bucket<K, V>>),
    /// A bucket holding a modified value has been allocated; the second
    /// field preserves the value to insert (or its initializer).
    AttemptedModification(Owned<Bucket<K, V>>, ValueOrFunction<V, F>),
}

impl<K, V, F: FnOnce() -> V> InsertOrModifyState<K, V, F> {
    /// Rebuilds the state from a bucket handed back by a failed
    /// compare-exchange: `AttemptedModification` when `value_or_function`
    /// is `Some`, `AttemptedInsertion` otherwise.
    fn from_bucket_value(
        bucket: Owned<Bucket<K, V>>,
        value_or_function: Option<ValueOrFunction<V, F>>,
    ) -> Self {
        if let Some(value_or_function) = value_or_function {
            Self::AttemptedModification(bucket, value_or_function)
        } else {
            Self::AttemptedInsertion(bucket)
        }
    }

    /// Returns the key of the entry being inserted or modified.
    fn key(&self) -> &K {
        match self {
            InsertOrModifyState::New(k, _) => k,
            InsertOrModifyState::AttemptedInsertion(b)
            | InsertOrModifyState::AttemptedModification(b, _) => &b.key,
        }
    }

    /// Produces a bucket that holds the value to insert.
    ///
    /// For `New` this runs the initializer and allocates. For
    /// `AttemptedModification` the modified value stored in the bucket is
    /// dropped and replaced by the preserved insertion value.
    fn into_insert_bucket(self) -> Owned<Bucket<K, V>> {
        match self {
            InsertOrModifyState::New(k, f) => Owned::new(Bucket::new(k, f())),
            InsertOrModifyState::AttemptedInsertion(b) => b,
            InsertOrModifyState::AttemptedModification(mut b, v_or_f) => {
                unsafe {
                    mem::drop(
                        mem::replace(&mut b.maybe_value, MaybeUninit::new(v_or_f.into_value()))
                            .assume_init(),
                    );
                };

                b
            }
        }
    }

    /// Produces a bucket that holds `value` (the modified value) together
    /// with the preserved insertion value or initializer.
    ///
    /// For `AttemptedInsertion` the insertion value is moved out of the
    /// bucket and returned. For `AttemptedModification` the previously
    /// stored modified value is dropped.
    fn into_modify_bucket(self, value: V) -> (Owned<Bucket<K, V>>, ValueOrFunction<V, F>) {
        match self {
            InsertOrModifyState::New(k, f) => (
                Owned::new(Bucket::new(k, value)),
                ValueOrFunction::Function(f),
            ),
            InsertOrModifyState::AttemptedInsertion(mut b) => {
                let insert_value = unsafe {
                    mem::replace(&mut b.maybe_value, MaybeUninit::new(value)).assume_init()
                };

                (b, ValueOrFunction::Value(insert_value))
            }
            InsertOrModifyState::AttemptedModification(mut b, v_or_f) => {
                unsafe {
                    mem::drop(
                        mem::replace(&mut b.maybe_value, MaybeUninit::new(value)).assume_init(),
                    );
                }

                (b, v_or_f)
            }
        }
    }
}

/// Either an already computed value or the function that computes it.
pub(crate) enum ValueOrFunction<V, F: FnOnce() -> V> {
    Value(V),
    Function(F),
}

impl<V, F: FnOnce() -> V> ValueOrFunction<V, F> {
    /// Returns the value, calling the function if necessary.
    fn into_value(self) -> V {
        match self {
            ValueOrFunction::Value(v) => v,
            ValueOrFunction::Function(f) => f(),
        }
    }
}

/// Hashes `key` with a hasher built by `build_hasher`.
pub(crate) fn hash<K, H>(build_hasher: &H, key: &K) -> u64
where
    K: ?Sized + Hash,
    H: BuildHasher,
{
    build_hasher.hash_one(key)
}

/// Outcome of `BucketArray::insert_if_not_present`.
pub(crate) enum InsertionResult<'g, K, V> {
    /// A live bucket with the same key was found; nothing was inserted.
    AlreadyPresent(Shared<'g, Bucket<K, V>>),
    /// The bucket was stored in an empty slot.
    Inserted,
    /// The bucket replaced the given tombstone.
    ReplacedTombstone(Shared<'g, Bucket<K, V>>),
}

/// Schedules destruction of the bucket behind `ptr`.
///
/// The deferred closure drops the value in place unless `ptr` carries the
/// tombstone tag, and then drops the owned bucket allocation.
///
/// # Safety
///
/// The body relies on `Guard::defer_unchecked`, `Shared::deref_mut` and
/// `Shared::into_owned`. NOTE(review): the caller presumably must guarantee
/// that `ptr` is no longer reachable from the table and that no other
/// destruction of it has been scheduled — confirm against the call sites.
///
/// # Panics
///
/// Panics if `ptr` is null.
pub(crate) unsafe fn defer_destroy_bucket<'g, K, V>(
    guard: &'g Guard,
    mut ptr: Shared<'g, Bucket<K, V>>,
) {
    assert!(!ptr.is_null());

    guard.defer_unchecked(move || {
        atomic::fence(Ordering::Acquire);

        if !is_tombstone(ptr) {
            ptr::drop_in_place(ptr.deref_mut().maybe_value.as_mut_ptr());
        }

        mem::drop(ptr.into_owned());
    });
}

/// Schedules destruction of the value stored in the tombstone behind `ptr`.
///
/// Only the value is dropped; the bucket allocation itself is not freed by
/// this function.
///
/// # Safety
///
/// The value is moved out with `ptr::read`, so it must be initialized and
/// must not be read or dropped through the bucket afterwards.
/// NOTE(review): the exact caller contract is not visible here — confirm.
///
/// # Panics
///
/// Panics if `ptr` is null or does not carry the tombstone tag.
pub(crate) unsafe fn defer_destroy_tombstone<'g, K, V>(
    guard: &'g Guard,
    mut ptr: Shared<'g, Bucket<K, V>>,
) {
    assert!(!ptr.is_null());
    assert!(is_tombstone(ptr));

    atomic::fence(Ordering::Acquire);
    // read the value now, but defer its destruction for later
    let value = ptr::read(ptr.deref_mut().maybe_value.as_ptr());

    // to be entirely honest, i don't know what order deferred functions are
    // called in crossbeam-epoch. in the case that the deferred functions are
    // called out of order, this prevents that from being an issue.
    guard.defer_unchecked(move || mem::drop(value));
}

/// Schedules `ptr` to be converted back into an owned allocation and
/// dropped, after an acquire fence.
///
/// # Safety
///
/// The body relies on `Guard::defer_unchecked` and `Shared::into_owned`.
/// NOTE(review): the caller presumably must guarantee exclusive ownership of
/// the allocation by the time the deferred closure runs — confirm.
///
/// # Panics
///
/// Panics if `ptr` is null.
pub(crate) unsafe fn defer_acquire_destroy<'g, T>(guard: &'g Guard, ptr: Shared<'g, T>) {
    assert!(!ptr.is_null());

    guard.defer_unchecked(move || {
        atomic::fence(Ordering::Acquire);
        mem::drop(ptr.into_owned());
    });
}
Line 1–714

Lock-free concurrent hash table using crossbeam_epoch for safe deferred reclamation. Buckets are tagged with three bit-flags (SENTINEL=0x1, TOMBSTONE=0x2, BORROWED=0x4) encoded in the low bits of the Shared<Bucket> pointer, which is safe because Bucket<K,V> carries #[repr(align(8))]. All compare-exchange operations use AcqRel/Relaxed success/failure ordering. defer_destroy_bucket issues atomic::fence(Acquire) inside the deferred closure, drops the value in place unless the pointer is tagged as a tombstone, and then drops the box; the fence ensures visibility of all writes made before the CAS that unlinked the bucket. Rehashing acquires a per-array Mutex to prevent concurrent rehash attempts while allowing concurrent reads. Justifies uses-unsafe, unsafe-safe, uses-concurrency, concurrency-safe.

#[derive(Clone, Copy)]pub(crate) enum RehashOp {    Expand,    Shrink,    GcOnly,    Skip,}impl RehashOp {    pub(crate) fn new(cap: usize, tombstone_count: &AtomicUsize, len: &AtomicUsize) -> Self {        let real_cap = cap as f64 * 2.0;        let quarter_cap = real_cap / 4.0;        let tbc = tombstone_count.load(Ordering::Relaxed) as f64;        let len = len.load(Ordering::Relaxed) as f64;        if tbc >= 25_000.0 || tbc / real_cap >= 0.1 {            if len - tbc < quarter_cap && quarter_cap as usize >= BUCKET_ARRAY_DEFAULT_LENGTH {                return Self::Shrink;            } else {                return Self::GcOnly;            }        }        if len > real_cap * 0.7 {            return Self::Expand;        }        Self::Skip    }    pub(crate) fn is_skip(self) -> bool {        matches!(self, Self::Skip)    }    fn new_len(self, current_len: usize) -> usize {        match self {            Self::Expand => current_len * 2,            Self::Shrink => current_len / 2,            Self::GcOnly => current_len,            Self::Skip => unreachable!(),        }    }}pub(crate) const SENTINEL_TAG: usize = 0b001; // set on old table buckets when copied into a new tablepub(crate) const TOMBSTONE_TAG: usize = 0b010; // set when the value has been destroyedpub(crate) const BORROWED_TAG: usize = 0b100; // set on new table buckets when copied from an old table#[inline]pub(crate) fn is_sentinel<K, V>(bucket_ptr: Shared<'_, Bucket<K, V>>) -> bool {    bucket_ptr.tag() & SENTINEL_TAG != 0}#[inline]pub(crate) fn is_tombstone<K, V>(bucket_ptr: Shared<'_, Bucket<K, V>>) -> bool {    bucket_ptr.tag() & TOMBSTONE_TAG != 0}#[inline]pub(crate) fn is_borrowed<K, V>(bucket_ptr: Shared<'_, Bucket<K, V>>) -> bool {    bucket_ptr.tag() & BORROWED_TAG != 0}#[cfg(test)]mod tests {    use super::{        defer_destroy_bucket, defer_destroy_tombstone, hash, is_tombstone, Bucket, BucketArray,        InsertOrModifyState, InsertionResult, RelocatedError,    };    use 
crossbeam_epoch::{Guard, Shared};    use std::{collections::hash_map::RandomState, sync::atomic::Ordering};    #[test]    fn get_insert_remove() {        let build_hasher = RandomState::new();        let buckets = BucketArray::with_length(0, 16);        let guard = unsafe { crossbeam_epoch::unprotected() };        let k1 = "foo";        let h1 = hash(&build_hasher, k1);        let v1 = 5;        let k2 = "bar";        let h2 = hash(&build_hasher, k2);        let v2 = 10;        let k3 = "baz";        let h3 = hash(&build_hasher, k3);        let v3 = 15;        assert_eq!(buckets.get(guard, h1, |&k| k == k1), Ok(Shared::null()));        assert_eq!(buckets.get(guard, h2, |&k| k == k2), Ok(Shared::null()));        assert_eq!(buckets.get(guard, h3, |&k| k == k3), Ok(Shared::null()));        assert!(matches!(            insert(&buckets, guard, k1, h1, || v1),            Ok(InsertionResult::Inserted)        ));        assert_eq!(            into_value(buckets.get(guard, h1, |&k| k == k1)),            Ok(Some(v1))        );        assert_eq!(buckets.get(guard, h2, |&k| k == k2), Ok(Shared::null()));        assert_eq!(buckets.get(guard, h3, |&k| k == k3), Ok(Shared::null()));        assert!(matches!(            insert(&buckets, guard, k2, h2, || v2),            Ok(InsertionResult::Inserted)        ));        assert_eq!(            into_value(buckets.get(guard, h1, |&k| k == k1)),            Ok(Some(v1))        );        assert_eq!(            into_value(buckets.get(guard, h2, |&k| k == k2)),            Ok(Some(v2))        );        assert_eq!(buckets.get(guard, h3, |&k| k == k3), Ok(Shared::null()));        assert!(matches!(            insert(&buckets, guard, k3, h3, || v3),            Ok(InsertionResult::Inserted)        ));        assert_eq!(            into_value(buckets.get(guard, h1, |&k| k == k1)),            Ok(Some(v1))        );        assert_eq!(            into_value(buckets.get(guard, h2, |&k| k == k2)),            Ok(Some(v2))        );        assert_eq!(      
      into_value(buckets.get(guard, h3, |&k| k == k3)),            Ok(Some(v3))        );        let b1 = buckets            .remove_if(guard, h1, |&k| k == k1, |_, _| true)            .ok()            .unwrap();        assert!(is_tombstone(b1));        unsafe { defer_destroy_tombstone(guard, b1) };        let b2 = buckets            .remove_if(guard, h2, |&k| k == k2, |_, _| true)            .ok()            .unwrap();        assert!(is_tombstone(b2));        unsafe { defer_destroy_tombstone(guard, b2) };        let b3 = buckets            .remove_if(guard, h3, |&k| k == k3, |_, _| true)            .ok()            .unwrap();        assert!(is_tombstone(b3));        unsafe { defer_destroy_tombstone(guard, b3) };        assert_eq!(buckets.get(guard, h1, |&k| k == k1), Ok(Shared::null()));        assert_eq!(buckets.get(guard, h2, |&k| k == k2), Ok(Shared::null()));        assert_eq!(buckets.get(guard, h3, |&k| k == k3), Ok(Shared::null()));        for this_bucket in buckets.buckets.iter() {            let this_bucket_ptr = this_bucket.swap(Shared::null(), Ordering::Relaxed, guard);            if this_bucket_ptr.is_null() {                continue;            }            unsafe {                defer_destroy_bucket(guard, this_bucket_ptr);            }        }    }    fn insert<'g, K, V, F>(        buckets: &BucketArray<K, V>,        guard: &'g Guard,        key: K,        hash: u64,        value_init: F,    ) -> Result<InsertionResult<'g, K, V>, InsertOrModifyState<K, V, F>>    where        K: Eq,        F: FnOnce() -> V,    {        let state = InsertOrModifyState::New(key, value_init);        buckets.insert_if_not_present(guard, hash, state)    }    fn into_value<K, V>(        maybe_bucket_ptr: Result<Shared<'_, Bucket<K, V>>, RelocatedError>,    ) -> Result<Option<V>, RelocatedError>    where        V: Clone,    {        maybe_bucket_ptr            .map(|p| unsafe { p.as_ref() }.map(|b| unsafe { &*b.maybe_value.as_ptr() }.clone()))    }}