avoid atomics in mi_free_try_collect_mt

Daan Leijen 2025-02-04 14:58:08 -08:00
parent 59eeeadc34
commit db7930f961
4 changed files with 67 additions and 38 deletions
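
The gist of the change: a multi-threaded free pushes its block onto `page->xthread_free` with a CAS, and when that CAS claims an abandoned page the freeing thread already holds a pointer to the block it just pushed (`mt_free`). The new `_mi_page_free_collect_partly` can therefore walk the list from that block and move everything behind it to the local free list without another atomic exchange; only the head block stays on the atomic list. Below is a minimal standalone sketch of that idea, assuming simplified `block_t`/`page_t` types and helper names of my own; it models the technique, not mimalloc's actual structures.

// Minimal standalone sketch (hypothetical block_t/page_t types and helper
// names, not mimalloc's own code): a freeing thread pushes its block onto the
// atomic thread-free list; because it still holds a pointer to that block,
// every block behind it can be spliced onto the owner-only local free list
// with a plain list walk, with no further atomic operations.
#include <stdatomic.h>
#include <stddef.h>
#include <stdio.h>

typedef struct block_s { struct block_s* next; } block_t;

typedef struct page_s {
  _Atomic(block_t*) thread_free;   // blocks freed by other threads (atomic)
  block_t*          local_free;    // owner-only free list (no atomics needed)
} page_t;

// push `b` onto the atomic thread-free list (any thread may do this)
static void page_thread_push(page_t* page, block_t* b) {
  block_t* old = atomic_load_explicit(&page->thread_free, memory_order_relaxed);
  do {
    b->next = old;
  } while (!atomic_compare_exchange_weak_explicit(&page->thread_free, &old, b,
             memory_order_acq_rel, memory_order_relaxed));
}

// owner-only "partly" collect: `mt_free` is the block this thread just pushed.
// The atomic head may still point at `mt_free`, so it is skipped; the rest of
// the chain is stable (later frees are pushed in front of it) and can be moved
// to `local_free` without touching the atomic again.
static void page_collect_partly(page_t* page, block_t* mt_free) {
  block_t* tail = mt_free->next;
  if (tail == NULL) return;
  block_t* last = tail;
  while (last->next != NULL) { last = last->next; }   // find the end of the chain
  last->next = page->local_free;                      // splice in front of local_free
  page->local_free = tail;
  mt_free->next = NULL;   // in this model: detach so a later full collect only sees `mt_free`
}

int main(void) {
  page_t  page = { NULL, NULL };
  block_t b1 = { NULL }, b2 = { NULL }, b3 = { NULL };
  page_thread_push(&page, &b1);
  page_thread_push(&page, &b2);
  page_thread_push(&page, &b3);      // thread-free list: b3 -> b2 -> b1
  page_collect_partly(&page, &b3);   // moves b2 and b1 to local_free, no atomics
  printf("local_free starts at b2: %d\n", page.local_free == &b2);
  return 0;
}

The real code additionally bounds the walk at `page->capacity` to detect a corrupted (possibly cyclic) thread-free list; see `mi_page_thread_collect_to_local` in the page.c hunk below.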

View file

@@ -199,7 +199,8 @@ void _mi_heap_collect_retired(mi_heap_t* heap, bool force);
 size_t _mi_page_queue_append(mi_heap_t* heap, mi_page_queue_t* pq, mi_page_queue_t* append);
 void _mi_deferred_free(mi_heap_t* heap, bool force);
-void _mi_page_free_collect(mi_page_t* page,bool force);
+void _mi_page_free_collect(mi_page_t* page, bool force);
+void _mi_page_free_collect_partly(mi_page_t* page, mi_block_t* head);
 void _mi_page_init(mi_heap_t* heap, mi_page_t* page);
 size_t _mi_bin_size(uint8_t bin); // for stats

View file

@@ -48,7 +48,7 @@ static inline void mi_free_block_local(mi_page_t* page, mi_block_t* block, bool
 }
 // Forward declaration for multi-threaded collect
-static void mi_decl_noinline mi_free_try_collect_mt(mi_page_t* page) mi_attr_noexcept;
+static void mi_decl_noinline mi_free_try_collect_mt(mi_page_t* page, mi_block_t* mt_free) mi_attr_noexcept;
 // Free a block multi-threaded
 static inline void mi_free_block_mt(mi_page_t* page, mi_block_t* block) mi_attr_noexcept
@@ -69,14 +69,14 @@ static inline void mi_free_block_mt(mi_page_t* page, mi_block_t* block) mi_attr_
   mi_thread_free_t tf_old = mi_atomic_load_relaxed(&page->xthread_free);
   do {
     mi_block_set_next(page, block, mi_tf_block(tf_old));
-    tf_new = mi_tf_create(block, true /* always owned: try to claim it if abandoned */);
+    tf_new = mi_tf_create(block, true /* always use owned: try to claim it if the page is abandoned */);
   } while (!mi_atomic_cas_weak_acq_rel(&page->xthread_free, &tf_old, tf_new)); // todo: release is enough?
   // and atomically try to collect the page if it was abandoned
   const bool is_owned_now = !mi_tf_is_owned(tf_old);
   if (is_owned_now) {
     mi_assert_internal(mi_page_is_abandoned(page));
-    mi_free_try_collect_mt(page);
+    mi_free_try_collect_mt(page,block);
   }
 }
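
The CAS loop above claims ownership and pushes the block in one step: the freeing thread always installs an owned value, so if the previous `xthread_free` value was un-owned the page was abandoned and this thread just became its owner (`is_owned_now`). A small sketch of one possible encoding, assuming a hypothetical tag-bit layout (not necessarily the exact `mi_thread_free_t` representation):

// Hypothetical encoding for illustration only: the low bit of one word tags
// "owned", the remaining bits hold the head block pointer, so a single CAS can
// both push a block and claim an abandoned page.
#include <stdint.h>
#include <stdio.h>

typedef uintptr_t tf_t;

static inline tf_t  tf_create(void* block, int owned) { return (uintptr_t)block | (owned ? 1u : 0u); }
static inline int   tf_is_owned(tf_t tf) { return (int)(tf & 1); }
static inline void* tf_block(tf_t tf)    { return (void*)(tf & ~(uintptr_t)1); }

int main(void) {
  int x;                                               // stand-in for a freed block
  tf_t tf_old = tf_create(&x, 0);                      // previous head: page was abandoned (un-owned)
  tf_t tf_new = tf_create(&x, 1 /* always owned */);   // the free always installs an owned value
  int became_owner = !tf_is_owned(tf_old);             // un-owned before => this thread claimed the page
  printf("claimed=%d head=%p new_owned=%d\n", became_owner, tf_block(tf_old), tf_is_owned(tf_new));
  return 0;
}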
@@ -194,18 +194,20 @@ void mi_free(void* p) mi_attr_noexcept
 // ------------------------------------------------------
-static void mi_decl_noinline mi_free_try_collect_mt(mi_page_t* page) mi_attr_noexcept {
+static void mi_decl_noinline mi_free_try_collect_mt(mi_page_t* page, mi_block_t* mt_free) mi_attr_noexcept {
   mi_assert_internal(mi_page_is_owned(page));
   mi_assert_internal(mi_page_is_abandoned(page));
   // we own the page now..
   // safe to collect the thread atomic free list
-  _mi_page_free_collect(page, false); // update `used` count
+  // use the `_partly` version to avoid atomic operations since we already have the `mt_free` pointing into the thread free list
+  _mi_page_free_collect_partly(page, mt_free);
   #if MI_DEBUG > 1
   if (mi_page_is_singleton(page)) { mi_assert_internal(mi_page_all_free(page)); }
   #endif
-  // 1. free if the page is free now
+  // 1. free if the page is free now (this is updated by `_mi_page_free_collect_partly`)
   if (mi_page_all_free(page))
   {
     // first remove it from the abandoned pages in the arena (if mapped, this waits for any readers to finish)

View file

@@ -115,14 +115,14 @@ static void mi_heap_collect_ex(mi_heap_t* heap, mi_collect_t collect)
   // collect retired pages
   _mi_heap_collect_retired(heap, force);
   // if (_mi_is_main_thread()) { mi_debug_show_arenas(true, false, false); }
   // collect all pages owned by this thread
   mi_heap_visit_pages(heap, &mi_heap_page_collect, &collect, NULL);
-  // collect arenas (this is program wide so don't force purges on abandonment of threads)
-  //mi_atomic_storei64_release(&heap->tld->subproc->purge_expire, 1);
+  // collect arenas (this is program wide so don't force purges on abandonment of threads)
+  //mi_atomic_storei64_release(&heap->tld->subproc->purge_expire, 1);
   _mi_arenas_collect(collect == MI_FORCE /* force purge? */, collect >= MI_FORCE /* visit all? */, heap->tld);
 }

View file

@@ -137,9 +137,39 @@ bool _mi_page_is_valid(mi_page_t* page) {
   Page collect the `local_free` and `thread_free` lists
 ----------------------------------------------------------- */
-// Collect the local `thread_free` list using an atomic exchange.
-static void _mi_page_thread_free_collect(mi_page_t* page)
+static void mi_page_thread_collect_to_local(mi_page_t* page, mi_block_t* head)
 {
+  if (head == NULL) return;
+  // find the last block in the list -- also to get a proper use count (without data races)
+  size_t max_count = page->capacity; // cannot collect more than capacity
+  size_t count = 1;
+  mi_block_t* last = head;
+  mi_block_t* next;
+  while ((next = mi_block_next(page, last)) != NULL && count <= max_count) {
+    count++;
+    last = next;
+  }
+  // if `count > max_count` there was a memory corruption (possibly infinite list due to double multi-threaded free)
+  if (count > max_count) {
+    _mi_error_message(EFAULT, "corrupted thread-free list\n");
+    return; // the thread-free items cannot be freed
+  }
+  // and append the current local free list
+  mi_block_set_next(page, last, page->local_free);
+  page->local_free = head;
+  // update counts now
+  mi_assert_internal(count <= UINT16_MAX);
+  page->used = page->used - (uint16_t)count;
+}
+
+// Collect the local `thread_free` list using an atomic exchange.
+static void mi_page_thread_free_collect(mi_page_t* page)
+{
+  // atomically capture the thread free list
   mi_block_t* head;
   mi_thread_free_t tfreex;
   mi_thread_free_t tfree = mi_atomic_load_relaxed(&page->xthread_free);
@@ -150,35 +180,15 @@ static void _mi_page_thread_free_collect(mi_page_t* page)
   } while (!mi_atomic_cas_weak_acq_rel(&page->xthread_free, &tfree, tfreex)); // release is enough?
   mi_assert_internal(head != NULL);
-  // find the tail -- also to get a proper count (without data races)
-  size_t max_count = page->capacity; // cannot collect more than capacity
-  size_t count = 1;
-  mi_block_t* tail = head;
-  mi_block_t* next;
-  while( (next = mi_block_next(page,tail)) != NULL && count <= max_count) {
-    count++;
-    tail = next;
-  }
-  // if `count > max_count` there was a memory corruption (possibly infinite list due to double multi-threaded free)
-  if (count > max_count) {
-    _mi_error_message(EFAULT, "corrupted thread-free list\n");
-    return; // the thread-free items cannot be freed
-  }
-  // and append the current local free list
-  mi_block_set_next(page,tail, page->local_free);
-  page->local_free = head;
-  // update counts now
-  page->used -= (uint16_t)count;
+  // and move it to the local list
+  mi_page_thread_collect_to_local(page, head);
 }
 void _mi_page_free_collect(mi_page_t* page, bool force) {
   mi_assert_internal(page!=NULL);
   // collect the thread free list
-  _mi_page_thread_free_collect(page);
+  mi_page_thread_free_collect(page);
   // and the local free list
   if (page->local_free != NULL) {
@@ -205,6 +215,23 @@ void _mi_page_free_collect(mi_page_t* page, bool force) {
   mi_assert_internal(!force || page->local_free == NULL);
 }
+// collect elements in the thread-free list starting at `head`.
+void _mi_page_free_collect_partly(mi_page_t* page, mi_block_t* head) {
+  if (head == NULL) return;
+  mi_block_t* next = mi_block_next(page,head); // we cannot collect the head element itself as `page->thread_free` may point at it (and we want to avoid atomic ops)
+  if (next != NULL) {
+    mi_page_thread_collect_to_local(page, next);
+    if (page->local_free != NULL && page->free == NULL) {
+      page->free = page->local_free;
+      page->local_free = NULL;
+      page->free_is_zero = false;
+    }
+  }
+  if (page->used == 1) {
+    // all elements are free'd since we skipped the `head` element itself
+    _mi_page_free_collect(page, false); // collect the final element
+  }
+}
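
Since the partly collect must leave the `head` block on the atomic list, a `used` count of 1 afterwards means that head is the only block still accounted for, so the follow-up full (atomic) collect frees the final element and leaves the page all-free. A tiny hedged walk-through of the accounting, with plain integers standing in for the page fields:

// Hypothetical walk-through (plain integers, not mimalloc's structures):
// capacity 3, all three blocks freed by other threads, head = the block just pushed.
#include <assert.h>
int main(void) {
  unsigned used  = 3;   // blocks still accounted as in-use by the page
  unsigned moved = 2;   // partly-collect moved every thread-freed block except the head
  used -= moved;        // mi_page_thread_collect_to_local subtracts the moved count
  assert(used == 1);    // only the head remains: a full (atomic) collect now frees it
  return 0;
}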
/* -----------------------------------------------------------
@@ -333,9 +360,8 @@ static void mi_page_to_full(mi_page_t* page, mi_page_queue_t* pq) {
     // abandon full pages
     _mi_page_abandon(page, pq);
   }
-  else {
+  else if (!mi_page_is_in_full(page)) {
     // put full pages in a heap local queue
-    if (mi_page_is_in_full(page)) return;
     mi_page_queue_enqueue_from(&mi_page_heap(page)->pages[MI_BIN_FULL], pq, page);
     _mi_page_free_collect(page, false); // try to collect right away in case another thread freed just before MI_USE_DELAYED_FREE was set
   }