diff --git a/src/cereggii/atomic_dict/accessor_storage.c b/src/cereggii/atomic_dict/accessor_storage.c
index b26db270..b965880e 100644
--- a/src/cereggii/atomic_dict/accessor_storage.c
+++ b/src/cereggii/atomic_dict/accessor_storage.c
@@ -7,10 +7,10 @@
 AtomicDict_AccessorStorage *
-AtomicDict_GetAccessorStorage(AtomicDict *self)
+AtomicDict_GetOrCreateAccessorStorage(AtomicDict *self)
 {
-    assert(self->tss_key != NULL);
-    AtomicDict_AccessorStorage *storage = PyThread_tss_get(self->tss_key);
+    assert(self->accessor_key != NULL);
+    AtomicDict_AccessorStorage *storage = PyThread_tss_get(self->accessor_key);
 
     if (storage == NULL) {
         storage = PyObject_New(AtomicDict_AccessorStorage, &AtomicDictAccessorStorage_Type);
@@ -19,12 +19,13 @@ AtomicDict_GetAccessorStorage(AtomicDict *self)
 
         storage->self_mutex.v = 0;
         storage->local_len = 0;
+        storage->participant_in_migration = 0;
         storage->reservation_buffer.head = 0;
         storage->reservation_buffer.tail = 0;
         storage->reservation_buffer.count = 0;
         memset(storage->reservation_buffer.reservations, 0, sizeof(AtomicDict_EntryLoc) * RESERVATION_BUFFER_SIZE);
 
-        int set = PyThread_tss_set(self->tss_key, storage);
+        int set = PyThread_tss_set(self->accessor_key, storage);
 
         if (set != 0)
             goto fail;
@@ -45,6 +46,15 @@ AtomicDict_GetAccessorStorage(AtomicDict *self)
     return NULL;
 }
 
+AtomicDict_AccessorStorage *
+AtomicDict_GetAccessorStorage(Py_tss_t *accessor_key)
+{
+    assert(accessor_key != NULL);
+    AtomicDict_AccessorStorage *storage = PyThread_tss_get(accessor_key);
+    assert(storage != NULL);
+    return storage;
+}
+
 void
 AtomicDict_BeginSynchronousOperation(AtomicDict *self)
 {
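The renamed AtomicDict_GetOrCreateAccessorStorage above follows CPython's thread-specific storage (TSS) pattern: look up the calling thread's slot, and lazily allocate it on first use. Below is a minimal sketch of that pattern using only the public Py_tss API; the ThreadState struct and the helper name are illustrative, not cereggii's.

    #include <Python.h>
    #include <assert.h>

    typedef struct { int local_len; } ThreadState;  /* illustrative stand-in */

    static ThreadState *
    get_or_create_state(Py_tss_t *key)
    {
        assert(PyThread_tss_is_created(key));
        ThreadState *state = PyThread_tss_get(key);  /* NULL on a thread's first call */

        if (state == NULL) {
            state = PyMem_RawMalloc(sizeof(ThreadState));
            if (state == NULL)
                return NULL;
            state->local_len = 0;

            if (PyThread_tss_set(key, state) != 0) {  /* 0 means success */
                PyMem_RawFree(state);
                return NULL;
            }
        }
        return state;
    }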
diff --git a/src/cereggii/atomic_dict/atomic_dict.c b/src/cereggii/atomic_dict/atomic_dict.c
index b0cf41ec..ca6294f5 100644
--- a/src/cereggii/atomic_dict/atomic_dict.c
+++ b/src/cereggii/atomic_dict/atomic_dict.c
@@ -30,7 +30,7 @@ AtomicDict_new(PyTypeObject *type, PyObject *Py_UNUSED(args), PyObject *Py_UNUSE
     if (PyThread_tss_create(tss_key))
         goto fail;
     assert(PyThread_tss_is_created(tss_key) != 0);
-    self->tss_key = tss_key;
+    self->accessor_key = tss_key;
 
     self->accessors = NULL;
     self->accessors = Py_BuildValue("[]");
@@ -216,7 +216,7 @@ AtomicDict_init(AtomicDict *self, PyObject *args, PyObject *kwargs)
     meta->inserting_block = self->len >> ATOMIC_DICT_LOG_ENTRIES_IN_BLOCK;
 
     if (self->len > 0) {
-        storage = AtomicDict_GetAccessorStorage(self);
+        storage = AtomicDict_GetOrCreateAccessorStorage(self);
 
         if (storage == NULL)
             goto fail;
@@ -246,7 +246,7 @@ AtomicDict_init(AtomicDict *self, PyObject *args, PyObject *kwargs)
     self->accessors_lock.v = 0;
 
     // https://github.com/colesbury/nogil/blob/043f29ab2afab9cef5edd07875816d3354cb9d2c/Objects/dictobject.c#L334
     if (!(AtomicDict_GetEntryAt(0, meta)->flags & ENTRY_FLAGS_RESERVED)) {
-        storage = AtomicDict_GetAccessorStorage(self);
+        storage = AtomicDict_GetOrCreateAccessorStorage(self);
 
         if (storage == NULL)
             goto fail;
@@ -303,8 +303,8 @@ AtomicDict_dealloc(AtomicDict *self)
     Py_CLEAR(self->accessors);
     // this should be enough to deallocate the reservation buffers themselves as well:
     // the list should be the only reference to them anyway
-    PyThread_tss_delete(self->tss_key);
-    PyThread_tss_free(self->tss_key);
+    PyThread_tss_delete(self->accessor_key);
+    PyThread_tss_free(self->accessor_key);
 
     Py_TYPE(self)->tp_free((PyObject *) self);
 }
@@ -490,16 +490,13 @@ AtomicDict_ApproxLen(AtomicDict *self)
 }
 
 Py_ssize_t
-AtomicDict_Len(AtomicDict *self)
+AtomicDict_Len_impl(AtomicDict *self)
 {
     PyObject *len = NULL, *local_len = NULL;
     Py_ssize_t int_len;
 
-    AtomicDict_BeginSynchronousOperation(self);
-
     if (!self->len_dirty) {
         int_len = self->len;
-        AtomicDict_EndSynchronousOperation(self);
         return int_len;
     }
@@ -528,16 +525,24 @@
     self->len = int_len;
     self->len_dirty = 0;
 
-    AtomicDict_EndSynchronousOperation(self);
-
     Py_DECREF(len);
     return int_len;
 
     fail:
     Py_XDECREF(len);
-    AtomicDict_EndSynchronousOperation(self);
     return -1;
 }
 
+Py_ssize_t
+AtomicDict_Len(AtomicDict *self)
+{
+    Py_ssize_t len;
+    AtomicDict_BeginSynchronousOperation(self);
+    len = AtomicDict_Len_impl(self);
+    AtomicDict_EndSynchronousOperation(self);
+
+    return len;
+}
+
 PyObject *
 AtomicDict_Debug(AtomicDict *self)
 {
diff --git a/src/cereggii/atomic_dict/delete.c b/src/cereggii/atomic_dict/delete.c
index 971d412a..a171765c 100644
--- a/src/cereggii/atomic_dict/delete.c
+++ b/src/cereggii/atomic_dict/delete.c
@@ -184,7 +184,7 @@ AtomicDict_DelItem(AtomicDict *self, PyObject *key)
         goto fail;
 
     AtomicDict_AccessorStorage *storage = NULL;
-    storage = AtomicDict_GetAccessorStorage(self);
+    storage = AtomicDict_GetOrCreateAccessorStorage(self);
    if (storage == NULL)
        goto fail;
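AtomicDict_Len is split above into a locked wrapper and an unlocked _impl, so the length computation can also be reused from contexts that already hold the synchronous-operation lock. A minimal sketch of the wrapper pattern under that assumption; the pthread mutex and all names here are illustrative, not cereggii's.

    #include <pthread.h>

    static pthread_mutex_t sync_lock = PTHREAD_MUTEX_INITIALIZER;
    static long cached_len = 0;

    /* caller must already hold sync_lock */
    static long
    len_impl(void)
    {
        return cached_len;  /* stands in for the recompute-if-dirty logic */
    }

    long
    len(void)
    {
        pthread_mutex_lock(&sync_lock);
        long result = len_impl();
        pthread_mutex_unlock(&sync_lock);
        return result;
    }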
diff --git a/src/cereggii/atomic_dict/insert.c b/src/cereggii/atomic_dict/insert.c
index 4dd41b5b..41a47f81 100644
--- a/src/cereggii/atomic_dict/insert.c
+++ b/src/cereggii/atomic_dict/insert.c
@@ -80,27 +80,27 @@ AtomicDict_ExpectedUpdateEntry(AtomicDict_Meta *meta, uint64_t entry_ix,
 }
 
 int
-AtomicDict_ExpectedInsertOrUpdateCloseToDistance0(AtomicDict_Meta *meta,
-                                                  PyObject *key, Py_hash_t hash,
+AtomicDict_ExpectedInsertOrUpdateCloseToDistance0(AtomicDict_Meta *meta, PyObject *key, Py_hash_t hash,
                                                   PyObject *expected, PyObject *desired, PyObject **current,
-                                                  AtomicDict_EntryLoc *entry_loc,
-                                                  int *must_grow, int *done, int *expectation,
-                                                  AtomicDict_BufferedNodeReader *reader,
-                                                  AtomicDict_Node *to_insert, uint64_t distance_0, int skip_entry_check)
+                                                  AtomicDict_EntryLoc *entry_loc, int *must_grow, int *done,
+                                                  int *expectation, AtomicDict_Node *to_insert, uint64_t distance_0,
+                                                  int skip_entry_check)
 {
     assert(entry_loc != NULL);
 
+    AtomicDict_BufferedNodeReader reader;
     AtomicDict_Node temp[16];
     int begin_write, end_write;
 
     beginning:
-    AtomicDict_ReadNodesFromZoneStartIntoBuffer(distance_0, reader, meta);
+    reader.zone = -1;
+    AtomicDict_ReadNodesFromZoneStartIntoBuffer(distance_0, &reader, meta);
 
     for (int i = (int) (distance_0 % meta->nodes_in_zone); i < meta->nodes_in_zone; i++) {
-        if (reader->buffer[i].node == 0)
+        if (reader.buffer[i].node == 0)
             goto empty_slot;
 
         if (!skip_entry_check) {
-            int updated = AtomicDict_ExpectedUpdateEntry(meta, reader->buffer[i].index, key, hash, expected, desired,
+            int updated = AtomicDict_ExpectedUpdateEntry(meta, reader.buffer[i].index, key, hash, expected, desired,
                                                          current, done, expectation);
             if (updated < 0)
                 goto fail;
@@ -118,7 +118,7 @@ AtomicDict_ExpectedInsertOrUpdateCloseToDistance0(AtomicDict_Meta *meta,
         return 0;
     }
 
-    AtomicDict_CopyNodeBuffers(reader->buffer, temp);
+    AtomicDict_CopyNodeBuffers(reader.buffer, temp);
 
     to_insert->index = entry_loc->location;
     to_insert->distance = 0;
@@ -137,15 +137,15 @@ AtomicDict_ExpectedInsertOrUpdateCloseToDistance0(AtomicDict_Meta *meta,
     }
     assert(rhr == ok);
 
-    AtomicDict_ComputeBeginEndWrite(meta, reader->buffer, temp, &begin_write, &end_write);
+    AtomicDict_ComputeBeginEndWrite(meta, reader.buffer, temp, &begin_write, &end_write);
 
     *done = AtomicDict_AtomicWriteNodesAt(
         distance_0 - (distance_0 % meta->nodes_in_zone) + begin_write, end_write - begin_write,
-        &reader->buffer[begin_write], &temp[begin_write], meta
+        &reader.buffer[begin_write], &temp[begin_write], meta
     );
 
     if (!*done) {
-        reader->zone = -1;
+        reader.zone = -1;
         goto beginning;
     }
@@ -179,27 +179,42 @@ AtomicDict_ExpectedInsertOrUpdate(AtomicDict_Meta *meta, PyObject *key, Py_hash_
     int done, expectation;
     *must_grow = 0;
 
+    uint64_t distance_0 = AtomicDict_Distance0Of(hash, meta);
+    AtomicDict_Node node;
+
+    if (expected == NOT_FOUND || expected == ANY) {
+        // insert fast-path
+        if (AtomicDict_ReadRawNodeAt(distance_0, meta) == 0) {
+            assert(entry_loc != NULL);
+
+            node.index = entry_loc->location;
+            node.distance = 0;
+            node.tag = hash;
+
+            if (AtomicDict_AtomicWriteNodesAt(distance_0, 1, &meta->zero, &node, meta)) {
+                return NOT_FOUND;
+            }
+        }
+    }
+
     beginning:
     done = 0;
     expectation = 1;
-    uint64_t distance_0 = AtomicDict_Distance0Of(hash, meta);
-//    uint64_t distance = distance_0 % meta->nodes_in_zone; // shorter distances handled by the fast-path
     uint64_t distance = 0;
-    AtomicDict_BufferedNodeReader reader;
-    reader.zone = -1;
     PyObject *current = NULL;
     uint8_t is_compact = meta->is_compact;
     AtomicDict_Node to_insert;
 
     if (AtomicDict_ExpectedInsertOrUpdateCloseToDistance0(meta, key, hash, expected, desired, &current, entry_loc,
-                                                          must_grow, &done, &expectation, &reader, &to_insert,
+                                                          must_grow, &done, &expectation, &to_insert,
                                                           distance_0, skip_entry_check) < 0)
         goto fail;
 
     while (!done) {
-        AtomicDict_ReadNodesFromZoneStartIntoBuffer(distance_0 + distance, &reader, meta);
+        uint64_t ix = (distance_0 + distance) & (meta->size - 1);
+        AtomicDict_ReadNodeAt(ix, &node, meta);
 
-        if (reader.node.node == 0) {
+        if (node.node == 0) {
             if (expected != NOT_FOUND && expected != ANY) {
                 expectation = 0;
                 break;
@@ -215,27 +230,23 @@ AtomicDict_ExpectedInsertOrUpdate(AtomicDict_Meta *meta, PyObject *key, Py_hash_
             to_insert.distance = meta->max_distance;
             to_insert.tag = hash;
 
-            done = AtomicDict_AtomicWriteNodesAt(distance_0 + distance, 1,
-                                                 &reader.buffer[reader.idx_in_buffer], &to_insert,
-                                                 meta);
+            done = AtomicDict_AtomicWriteNodesAt(ix, 1, &node, &to_insert, meta);
 
-            if (!done) {
-                reader.zone = -1;
+            if (!done)
                 continue; // don't increase distance
-            }
-        } else if (reader.node.node == meta->tombstone.node) {
+        } else if (node.node == meta->tombstone.node) {
             // pass
-        } else if (is_compact && !AtomicDict_NodeIsReservation(&reader.node, meta) && (
-            (distance_0 + distance - reader.node.distance > distance_0)
+        } else if (is_compact && !AtomicDict_NodeIsReservation(&node, meta) && (
+            (ix - node.distance > distance_0)
         )) {
             if (expected != NOT_FOUND && expected != ANY) {
                 expectation = 0;
                 break;
             }
-        } else if (reader.node.tag != (hash & meta->tag_mask)) {
+        } else if (node.tag != (hash & meta->tag_mask)) {
             // pass
         } else if (!skip_entry_check) {
-            int updated = AtomicDict_ExpectedUpdateEntry(meta, reader.node.index, key, hash, expected, desired,
+            int updated = AtomicDict_ExpectedUpdateEntry(meta, node.index, key, hash, expected, desired,
                                                          &current, &done, &expectation);
             if (updated < 0)
                 goto fail;
@@ -324,7 +335,7 @@ AtomicDict_CompareAndSet(AtomicDict *self, PyObject *key, PyObject *expected, Py
         goto fail;
 
     AtomicDict_AccessorStorage *storage = NULL;
-    storage = AtomicDict_GetAccessorStorage(self);
+    storage = AtomicDict_GetOrCreateAccessorStorage(self);
     if (storage == NULL)
         goto fail;
diff --git a/src/cereggii/atomic_dict/lookup.c b/src/cereggii/atomic_dict/lookup.c
index 06559a92..1399b90c 100644
--- a/src/cereggii/atomic_dict/lookup.c
+++ b/src/cereggii/atomic_dict/lookup.c
@@ -93,6 +93,8 @@
 void
 AtomicDict_LookupEntry(AtomicDict_Meta *meta, uint64_t entry_ix, Py_hash_t hash, AtomicDict_SearchResult *result)
 {
+    // index-only search
+
     uint64_t ix = AtomicDict_Distance0Of(hash, meta);
     uint8_t is_compact;
     uint64_t probe, reservations;
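The new fast path added to AtomicDict_ExpectedInsertOrUpdate above attempts a single compare-and-swap into the hash's home (distance-0) slot before entering the general probing loop. A sketch of that idea with C11 atomics; the slot layout is simplified and the names are illustrative.

    #include <stdatomic.h>
    #include <stdint.h>

    /* Returns 1 if the packed node was published at its distance-0 slot,
     * 0 if the slot was occupied and the caller must fall back to probing. */
    int
    fast_path_insert(_Atomic uint64_t *index, uint64_t home, uint64_t packed_node)
    {
        uint64_t empty = 0;
        return atomic_compare_exchange_strong(&index[home], &empty, packed_node);
    }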
diff --git a/src/cereggii/atomic_dict/meta.c b/src/cereggii/atomic_dict/meta.c
index 29b0f284..46593e7a 100644
--- a/src/cereggii/atomic_dict/meta.c
+++ b/src/cereggii/atomic_dict/meta.c
@@ -95,7 +95,9 @@ AtomicDictMeta_New(uint8_t log_size)
     meta->new_gen_metadata = NULL;
     meta->migration_leader = 0;
-    meta->copy_nodes_locks = NULL;
+    meta->node_to_migrate = 0;
+    meta->accessor_key = NULL;
+    meta->accessors = NULL;
 
     meta->new_metadata_ready = (AtomicEvent *) PyObject_CallObject((PyObject *) &AtomicEvent_Type, NULL);
     if (meta->new_metadata_ready == NULL)
@@ -249,12 +251,6 @@ AtomicDictMeta_dealloc(AtomicDict_Meta *self)
         PyMem_RawFree(self->blocks);
     }
 
-    uint8_t *copy_nodes_locks = self->copy_nodes_locks;
-    if (copy_nodes_locks != NULL) {
-        self->copy_nodes_locks = NULL;
-        PyMem_RawFree(copy_nodes_locks);
-    }
-
     Py_CLEAR(self->generation);
     Py_CLEAR(self->new_gen_metadata);
     Py_CLEAR(self->new_metadata_ready);
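The migration code that follows elects a leader by compare-and-swapping current_meta->migration_leader from 0 to the caller's thread id; every other thread takes the follower path and waits on the migration events. The same election in a C11 sketch, with illustrative names standing in for CereggiiAtomic_CompareExchangeUIntPtr:

    #include <stdatomic.h>
    #include <stdint.h>

    /* exactly one thread wins the election; everyone else should follow and wait */
    int
    try_become_leader(_Atomic uintptr_t *leader, uintptr_t my_id)
    {
        uintptr_t expected = 0;
        return atomic_compare_exchange_strong(leader, &expected, my_id);
    }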
diff --git a/src/cereggii/atomic_dict/migrate.c b/src/cereggii/atomic_dict/migrate.c
index adabaff5..523737dd 100644
--- a/src/cereggii/atomic_dict/migrate.c
+++ b/src/cereggii/atomic_dict/migrate.c
@@ -125,6 +125,7 @@ AtomicDict_Migrate(AtomicDict *self, AtomicDict_Meta *current_meta /* borrowed *
 {
     assert(to_log_size <= from_log_size + 1);
     assert(to_log_size >= from_log_size - 1);
+
     if (current_meta->migration_leader == 0) {
         int i_am_leader = CereggiiAtomic_CompareExchangeUIntPtr(
             &current_meta->migration_leader,
@@ -143,7 +144,9 @@
 int
 AtomicDict_LeaderMigrate(AtomicDict *self, AtomicDict_Meta *current_meta /* borrowed */, uint8_t from_log_size, uint8_t to_log_size)
 {
+    int holding_sync_lock = 0;
     AtomicDict_Meta *new_meta;
+
     beginning:
     new_meta = NULL;
     new_meta = AtomicDictMeta_New(to_log_size);
@@ -164,30 +167,39 @@ AtomicDict_LeaderMigrate(AtomicDict *self, AtomicDict_Meta *current_meta /* borr
 
     // blocks
     AtomicDict_BeginSynchronousOperation(self);
+    holding_sync_lock = 1;
 
     if (from_log_size < to_log_size) {
         int ok = AtomicDictMeta_CopyBlocks(current_meta, new_meta);
 
-        if (ok < 0) {
-            AtomicDict_EndSynchronousOperation(self);
+        if (ok < 0)
             goto fail;
-        }
     } else {
         int ok = AtomicDictMeta_InitBlocks(new_meta);
 
-        if (ok < 0) {
-            AtomicDict_EndSynchronousOperation(self);
+        if (ok < 0)
             goto fail;
-        }
 
         AtomicDictMeta_ShrinkBlocks(self, current_meta, new_meta);
     }
 
-    AtomicDict_EndSynchronousOperation(self);
 
     for (uint64_t block_i = 0; block_i <= new_meta->greatest_allocated_block; ++block_i) {
         Py_INCREF(new_meta->blocks[block_i]);
     }
 
-    AtomicDictMeta_ClearIndex(new_meta);
+    if (from_log_size >= to_log_size) {
+        AtomicDictMeta_ClearIndex(new_meta);
+        AtomicDict_EndSynchronousOperation(self);
+        holding_sync_lock = 0;
+    } else {
+        current_meta->accessor_key = self->accessor_key;
+        current_meta->accessors = self->accessors;
+
+        for (int64_t block = 0; block <= new_meta->greatest_allocated_block; ++block) {
+            new_meta->blocks[block]->generation = new_meta->generation;
+        }
+
+        // AtomicDictMeta_ClearIndex(new_meta);
+    }
 
     // 👀
     Py_INCREF(new_meta);
@@ -200,16 +212,30 @@ AtomicDict_LeaderMigrate(AtomicDict *self, AtomicDict_Meta *current_meta /* borr
     // 🎉
     int set = AtomicRef_CompareAndSet(self->metadata, (PyObject *) current_meta, (PyObject *) new_meta);
     assert(set);
+
+    if (holding_sync_lock) {
+        for (int i = 0; i < PyList_Size(self->accessors); ++i) {
+            AtomicDict_AccessorStorage *accessor = (AtomicDict_AccessorStorage *) PyList_GetItem(self->accessors, i);
+            accessor->participant_in_migration = 0;
+        }
+    }
+
     AtomicEvent_Set(current_meta->migration_done);
-    Py_DECREF(new_meta); // this may seem strange: why decref'ing the new meta?
+    Py_DECREF(new_meta); // this may seem strange: why decref the new meta?
     // the reason is that AtomicRef_CompareAndSet also increases new_meta's refcount,
     // which is exactly what we want. but the reference count was already 1, as it
     // was set during the initialization of new_meta. that's what we're decref'ing
     // for in here.
 
+    if (holding_sync_lock) {
+        AtomicDict_EndSynchronousOperation(self);
+    }
+
     return 1;
 
     fail:
+    if (holding_sync_lock) {
+        AtomicDict_EndSynchronousOperation(self);
+    }
+
     // don't block other threads indefinitely
     AtomicEvent_Set(current_meta->migration_done);
     AtomicEvent_Set(current_meta->node_migration_done);
@@ -230,6 +256,16 @@ AtomicDict_FollowerMigrate(AtomicDict_Meta *current_meta)
 
 void
 AtomicDict_CommonMigrate(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta)
+{
+    if (current_meta->log_size < new_meta->log_size) {
+        AtomicDict_QuickMigrate(current_meta, new_meta);
+    } else {
+        AtomicDict_SlowMigrate(current_meta, new_meta);
+    }
+}
+
+void
+AtomicDict_SlowMigrate(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta)
 {
     if (!AtomicEvent_IsSet(current_meta->node_migration_done)) {
         int node_migration_done = AtomicDict_MigrateReInsertAll(current_meta, new_meta);
@@ -241,17 +277,35 @@ AtomicDict_CommonMigrate(AtomicDict_Meta *current_meta, AtomicDict_Met
     }
 }
 
+void
+AtomicDict_QuickMigrate(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta)
+{
+    if (!AtomicEvent_IsSet(current_meta->node_migration_done)) {
+        // assert(self->accessors_lock is held by leader);
+        AtomicDict_AccessorStorage *storage = AtomicDict_GetAccessorStorage(current_meta->accessor_key);
+        CereggiiAtomic_StoreInt(&storage->participant_in_migration, 1);
+
+        int node_migration_done = AtomicDict_MigrateNodes(current_meta, new_meta);
+
+        if (node_migration_done) {
+            AtomicEvent_Set(current_meta->node_migration_done);
+        }
+        AtomicEvent_Wait(current_meta->node_migration_done);
+    }
+}
+
 int
 AtomicDict_MigrateReInsertAll(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta)
 {
-    uint64_t tid = _Py_ThreadId();
+    uint64_t thread_id = _Py_ThreadId();
 
     int64_t copy_lock;
 
     for (copy_lock = 0; copy_lock <= new_meta->greatest_allocated_block; ++copy_lock) {
-        uint64_t lock = (copy_lock + tid) % (new_meta->greatest_allocated_block + 1);
+        uint64_t lock = (copy_lock + thread_id) % (new_meta->greatest_allocated_block + 1);
 
-        int locked = CereggiiAtomic_CompareExchangePtr((void **) &new_meta->blocks[lock]->generation, current_meta->generation, NULL);
+        int locked = CereggiiAtomic_CompareExchangePtr((void **) &new_meta->blocks[lock]->generation,
+                                                       current_meta->generation, NULL);
 
         if (!locked)
             continue;
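The quick-migration path above coordinates accessors through participant_in_migration: 0 means not participating, 1 means currently migrating nodes, 2 means finished with its share. AtomicDict_NodesMigrationDone (below) reports completion only once no accessor is still in state 1. A sketch of that check with C11 atomics; the state array and names are illustrative.

    #include <stdatomic.h>

    enum { NOT_PARTICIPATING = 0, MIGRATING = 1, DONE_MIGRATING = 2 };

    int
    all_participants_done(_Atomic int *states, int n_accessors)
    {
        for (int i = 0; i < n_accessors; i++) {
            if (atomic_load(&states[i]) == MIGRATING)
                return 0;  /* someone is still moving nodes */
        }
        return 1;
    }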
@@ -299,3 +353,175 @@ AtomicDict_MigrateReInsertAll(AtomicDict_Meta *current_meta, AtomicDict_Meta *ne
 
     return done;
 }
+
+__attribute__((unused)) int
+AtomicDict_IndexNotFound(uint64_t index, AtomicDict_Meta *meta)
+{
+    AtomicDict_Node node;
+
+    for (uint64_t i = 0; i < meta->size; ++i) {
+        AtomicDict_ReadNodeAt(i, &node, meta);
+
+        if (node.index == index) {
+            return 0;
+        }
+    }
+
+    return 1;
+}
+
+__attribute__((unused)) uint64_t
+AtomicDict_DebugLen(AtomicDict_Meta *meta)
+{
+    uint64_t len = 0;
+    AtomicDict_Node node;
+
+    for (uint64_t i = 0; i < meta->size; ++i) {
+        AtomicDict_ReadNodeAt(i, &node, meta);
+
+        if (node.node != 0 && !AtomicDict_NodeIsTombstone(&node, meta))
+            len++;
+    }
+
+    return len;
+}
+
+inline void
+AtomicDict_MigrateNode(AtomicDict_Node *node, uint64_t *distance_0, uint64_t *distance, AtomicDict_Meta *new_meta,
+                       uint64_t size_mask)
+{
+    // assert(AtomicDict_IndexNotFound(node->index, new_meta));
+    Py_hash_t hash = AtomicDict_GetEntryAt(node->index, new_meta)->hash;
+    uint64_t ix;
+    uint64_t d0 = AtomicDict_Distance0Of(hash, new_meta);
+
+    if (d0 != *distance_0) {
+        *distance_0 = d0;
+        *distance = 0;
+    }
+
+    node->tag = hash;
+
+    if (!new_meta->is_compact) {
+        do {
+            ix = (d0 + *distance) & size_mask;
+            (*distance)++;
+        } while (AtomicDict_ReadRawNodeAt(ix, new_meta) != 0);
+
+        node->distance = new_meta->max_distance;
+
+        assert(AtomicDict_ReadRawNodeAt(ix, new_meta) == 0);
+        AtomicDict_WriteNodeAt(ix, node, new_meta);
+
+        return;
+    }
+
+    AtomicDict_RobinHoodResult inserted = AtomicDict_RobinHoodInsertRaw(new_meta, node, (int64_t) d0);
+
+    assert(inserted != failed);
+
+    if (inserted == grow) {
+        new_meta->is_compact = 0;
+    }
+
+    (*distance)++;
+}
+
+#define ATOMIC_DICT_BLOCKWISE_MIGRATE_SIZE 4096
+//#define ATOMIC_DICT_BLOCKWISE_MIGRATE_SIZE 64
+
+void
+AtomicDict_BlockWiseMigrate(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta, int64_t start_of_block)
+{
+    uint64_t current_size = current_meta->size;
+    uint64_t current_size_mask = current_size - 1;
+    uint64_t i = start_of_block;
+
+    uint64_t end_of_block = start_of_block + ATOMIC_DICT_BLOCKWISE_MIGRATE_SIZE;
+    if (end_of_block > current_size) {
+        end_of_block = current_size;
+    }
+    assert(end_of_block > i);
+
+    AtomicDict_Node node;
+
+    // find first empty slot
+    while (i < end_of_block) {
+        if (AtomicDict_ReadRawNodeAt(i, current_meta) == 0)
+            break;
+
+        i++;
+    }
+
+    if (i >= end_of_block)
+        return;
+
+    // initialize slots in range [i, end_of_block]
+    memset(AtomicDict_IndexAddressOf(i * 2, new_meta), 0, (end_of_block - i) * 2 * new_meta->node_size / 8);
+
+    uint64_t new_size_mask = new_meta->size - 1;
+    uint64_t distance = 0;
+    uint64_t distance_0 = ULONG_LONG_MAX;
+
+    for (; i < end_of_block; i++) {
+        AtomicDict_ReadNodeAt(i, &node, current_meta);
+
+        if (node.node == 0 || AtomicDict_NodeIsTombstone(&node, current_meta))
+            continue;
+
+        AtomicDict_MigrateNode(&node, &distance_0, &distance, new_meta, new_size_mask);
+    }
+
+    assert(i == end_of_block);
+
+    while (node.node != 0 || i == end_of_block) {
+        memset(
+            AtomicDict_IndexAddressOf((i * 2) & new_size_mask, new_meta), 0,
+            ((i + 1) * 2 - (i * 2)) * new_meta->node_size / 8
+        );
+        AtomicDict_ReadNodeAt(i & current_size_mask, &node, current_meta);
+
+        if (node.node != 0 && !AtomicDict_NodeIsTombstone(&node, current_meta)) {
+            AtomicDict_MigrateNode(&node, &distance_0, &distance, new_meta, new_size_mask);
+        }
+
+        i++;
+    }
+}
+
+int
+AtomicDict_MigrateNodes(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta)
+{
+    uint64_t current_size = current_meta->size;
+
+    int64_t node_to_migrate = CereggiiAtomic_AddInt64(&current_meta->node_to_migrate,
+                                                      ATOMIC_DICT_BLOCKWISE_MIGRATE_SIZE);
+
+    while (node_to_migrate < current_size) {
+        AtomicDict_BlockWiseMigrate(current_meta, new_meta, node_to_migrate);
+        node_to_migrate = CereggiiAtomic_AddInt64(&current_meta->node_to_migrate, ATOMIC_DICT_BLOCKWISE_MIGRATE_SIZE);
+    }
+
+    AtomicDict_AccessorStorage *storage = AtomicDict_GetAccessorStorage(current_meta->accessor_key);
+    CereggiiAtomic_StoreInt(&storage->participant_in_migration, 2);
+
+    return AtomicDict_NodesMigrationDone(current_meta);
+}
+
+int
+AtomicDict_NodesMigrationDone(const AtomicDict_Meta *current_meta)
+{
+    int done = 1;
+
+    for (Py_ssize_t migrator = 0; migrator < PyList_Size(current_meta->accessors); ++migrator) {
+        AtomicDict_AccessorStorage *storage =
+            (AtomicDict_AccessorStorage *) PyList_GetItem(current_meta->accessors, migrator);
+
+        if (storage->participant_in_migration == 1) {
+            done = 0;
+            break;
+        }
+    }
+
+    return done;
+}
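AtomicDict_MigrateNodes above parallelizes the migration by letting every participating thread claim 4096-slot chunks of the old index with an atomic fetch-add on node_to_migrate. A self-contained sketch of that work-distribution scheme, with C11 atomics standing in for CereggiiAtomic_AddInt64:

    #include <stdatomic.h>
    #include <stdint.h>

    #define CHUNK 4096

    void
    migrate_all(_Atomic int64_t *next_chunk, int64_t table_size,
                void (*migrate_chunk)(int64_t start))
    {
        /* fetch_add returns the previous value, so each caller gets a
         * disjoint [start, start + CHUNK) range of the old index */
        for (int64_t start = atomic_fetch_add(next_chunk, CHUNK);
             start < table_size;
             start = atomic_fetch_add(next_chunk, CHUNK)) {
            migrate_chunk(start);
        }
    }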
diff --git a/src/cereggii/atomic_dict/node_ops.c b/src/cereggii/atomic_dict/node_ops.c
index b65a0108..c48a6d79 100644
--- a/src/cereggii/atomic_dict/node_ops.c
+++ b/src/cereggii/atomic_dict/node_ops.c
@@ -46,8 +46,6 @@ AtomicDict_ZoneOf(uint64_t ix, AtomicDict_Meta *meta)
     return AtomicDict_RegionOf(ix, meta) & (ULONG_MAX - 1UL);
 }
 
-#define ABS(x) (((x) ^ ((x) >> (SIZEOF_PY_HASH_T * CHAR_BIT - 1))) - ((x) >> (SIZEOF_PY_HASH_T * CHAR_BIT - 1)))
-
 #define UPPER_SEED 12923598712359872066ull
 #define LOWER_SEED 7467732452331123588ull
 #define REHASH(x) (uint64_t) (__builtin_ia32_crc32di((x), LOWER_SEED) | (__builtin_ia32_crc32di((x), UPPER_SEED) << 32))
@@ -106,6 +104,15 @@ AtomicDict_ParseNodeFromRegion(uint64_t ix, uint64_t region, AtomicDict_Node *no
     AtomicDict_ParseNodeFromRaw(node_raw, node, meta);
 }
 
+inline uint64_t
+AtomicDict_ParseRawNodeFromRegion(uint64_t ix, uint64_t region, AtomicDict_Meta *meta)
+{
+    uint64_t shift = AtomicDict_ShiftInRegionOf(ix, meta);
+    uint64_t node_raw =
+        (region & (meta->node_mask << (shift * meta->node_size))) >> (shift * meta->node_size);
+    return node_raw;
+}
+
 inline void
 AtomicDict_CopyNodeBuffers(AtomicDict_Node *from_buffer, AtomicDict_Node *to_buffer)
 {
@@ -114,7 +121,7 @@ AtomicDict_CopyNodeBuffers(AtomicDict_Node *from_buffer, AtomicDict_Node *to_buf
     }
 }
 
-void
+inline void
 AtomicDict_ComputeBeginEndWrite(AtomicDict_Meta *meta, AtomicDict_Node *read_buffer, AtomicDict_Node *temp,
                                 int *begin_write, int *end_write)
 {
@@ -168,13 +175,20 @@ AtomicDict_ComputeBeginEndWrite(AtomicDict_Meta *meta, AtomicDict_Node *read_buf
     }
 }
 
-void
+inline void
 AtomicDict_ReadNodeAt(uint64_t ix, AtomicDict_Node *node, AtomicDict_Meta *meta)
 {
     uint64_t node_region = meta->index[AtomicDict_RegionOf(ix, meta)];
     AtomicDict_ParseNodeFromRegion(ix, node_region, node, meta);
 }
 
+inline int64_t
+AtomicDict_ReadRawNodeAt(uint64_t ix, AtomicDict_Meta *meta)
+{
+    uint64_t node_region = meta->index[AtomicDict_RegionOf(ix, meta)];
+    return (int64_t) AtomicDict_ParseRawNodeFromRegion(ix, node_region, meta);
+}
+
 /**
  * the following functions read more than one node at a time.
  * NB: they expect all the nodes to be in two successive words of memory!
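AtomicDict_ParseRawNodeFromRegion and AtomicDict_ReadRawNodeAt above pull one packed node out of a 64-bit region word with a mask and a shift. The same extraction in isolation, for a hypothetical 16-bit node size:

    #include <stdint.h>
    #include <stdio.h>

    static uint64_t
    parse_raw_node(uint64_t region, uint64_t shift, uint64_t node_size, uint64_t node_mask)
    {
        /* select the shift-th node_size-bit lane of the region word */
        return (region & (node_mask << (shift * node_size))) >> (shift * node_size);
    }

    int
    main(void)
    {
        uint64_t region = 0xddddccccbbbbaaaaULL;  /* four 16-bit lanes */
        printf("%llx\n", (unsigned long long) parse_raw_node(region, 2, 16, 0xffff));
        /* prints "cccc" */
        return 0;
    }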
@@ -334,12 +348,31 @@ AtomicDict_WriteNodeAt(uint64_t ix, AtomicDict_Node *node, AtomicDict_Meta *meta
 
 inline void
 AtomicDict_WriteRawNodeAt(uint64_t ix, uint64_t raw_node, AtomicDict_Meta *meta)
 {
-    uint64_t shift = ix & meta->shift_mask;
+    assert(ix >= 0);
+    assert(ix < meta->size);
+
+    uint64_t shift = AtomicDict_ShiftInRegionOf(ix, meta);
+    assert(shift < meta->nodes_in_region);
     uint64_t region = AtomicDict_RegionOf(ix, meta);
-    uint64_t node_raw = meta->index[region];
-    node_raw &= ~(meta->node_mask << (shift * meta->node_size));
-    node_raw |= raw_node << (shift * meta->node_size);
-    meta->index[region] = node_raw;
+    assert(region < meta->size / meta->nodes_in_region);
+    uint64_t *region_address = &meta->index[region];
+
+    switch (meta->node_size) {
+        case 8:
+            *((uint8_t *) region_address + shift) = raw_node;
+            break;
+        case 16:
+            *((uint16_t *) region_address + shift) = raw_node;
+            break;
+        case 32:
+            *((uint32_t *) region_address + shift) = raw_node;
+            break;
+        case 64:
+            *((uint64_t *) region_address + shift) = raw_node;
+            break;
+        default:
+            assert(0);
+    }
 }
 
 inline int
@@ -362,10 +395,13 @@ AtomicDict_MustWriteBytes(int n, AtomicDict_Meta *meta)
     return 16;
 }
 
-int
+inline int
 AtomicDict_AtomicWriteNodesAt(uint64_t ix, int n, AtomicDict_Node *expected, AtomicDict_Node *desired,
                               AtomicDict_Meta *meta)
 {
+    assert(ix >= 0);
+    assert(ix < meta->size);
+    assert(ix <= meta->size - n); // XXX implement index circular behavior
     assert(n > 0);
     assert(n <= meta->nodes_in_zone);
@@ -391,12 +427,7 @@ AtomicDict_AtomicWriteNodesAt(uint64_t ix, int n, AtomicDict_Node *expected, Ato
     int must_write = AtomicDict_MustWriteBytes(n, meta);
     int must_write_nodes = must_write / (meta->node_size / 8);
 
-    for (; i < must_write_nodes; ++i) {
-        node = expected[i].node;
-        node <<= meta->node_size * i;
-        expected_raw |= node;
-        desired_raw |= node;
-    }
+    assert(i == must_write_nodes);
 
     uint8_t *index_address = AtomicDict_IndexAddressOf(ix, meta);
     switch (must_write) {
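The rewritten AtomicDict_WriteRawNodeAt above replaces the read-modify-write of a whole region word with a direct store through a pointer of the node's width. That is equivalent only when lane order matches the shift arithmetic, i.e. on little-endian targets — an assumption the sketch below makes explicit for the 16-bit case:

    #include <stdint.h>

    static void
    write_raw_node_16(uint64_t *region_address, uint64_t shift, uint16_t raw_node)
    {
        /* on little-endian targets, lane 0 is the lowest-order 16 bits,
         * matching (raw_node << (shift * 16)) in the old masked write */
        ((uint16_t *) region_address)[shift] = raw_node;
    }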
diff --git a/src/cereggii/atomic_dict/robin_hood.c b/src/cereggii/atomic_dict/robin_hood.c
index 3cc1bba7..03d909cc 100644
--- a/src/cereggii/atomic_dict/robin_hood.c
+++ b/src/cereggii/atomic_dict/robin_hood.c
@@ -62,6 +62,58 @@ AtomicDict_RobinHoodInsert(AtomicDict_Meta *meta, AtomicDict_Node *nodes, Atomic
     return failed;
 }
 
+AtomicDict_RobinHoodResult
+AtomicDict_RobinHoodInsertRaw(AtomicDict_Meta *meta, AtomicDict_Node *to_insert, int64_t distance_0_ix)
+{
+    AtomicDict_Node current = *to_insert;
+    current.distance = 0;
+    AtomicDict_Node temp;
+    int64_t probe = 0;
+    int64_t cursor;
+
+    beginning:
+    for (; probe < meta->size; probe++) {
+        cursor = (distance_0_ix + probe) & ((int64_t) meta->size - 1);
+
+        if (probe >= meta->max_distance) {
+            while (AtomicDict_ReadRawNodeAt(cursor, meta) != 0) {
+                probe++;
+                cursor = (distance_0_ix + probe) & ((int64_t) meta->size - 1);
+            }
+
+            current.distance = meta->max_distance;
+            assert(AtomicDict_ReadRawNodeAt(cursor, meta) == 0);
+            AtomicDict_WriteNodeAt(cursor, &current, meta);
+
+            return grow;
+        }
+
+        if (AtomicDict_ReadRawNodeAt(cursor, meta) == 0) {
+            assert(!AtomicDict_NodeIsReservation(&current, meta));
+            current.distance = probe;
+
+            assert(AtomicDict_ReadRawNodeAt(cursor, meta) == 0);
+            AtomicDict_WriteNodeAt(cursor, &current, meta);
+            return ok;
+        }
+
+        AtomicDict_ReadNodeAt(cursor, &temp, meta);
+
+        if (temp.distance < probe) {
+            current.distance = probe;
+            assert(!AtomicDict_NodeIsReservation(&current, meta));
+
+            AtomicDict_WriteNodeAt(cursor, &current, meta);
+
+            current = temp;
+            distance_0_ix = cursor - temp.distance;
+            probe = temp.distance;
+            goto beginning;
+        }
+    }
+
+    return failed;
+}
+
 AtomicDict_RobinHoodResult
 AtomicDict_RobinHoodDelete(AtomicDict_Meta *meta, AtomicDict_Node *nodes, int to_delete)
 {
diff --git a/src/include/atomic_dict.h b/src/include/atomic_dict.h
index a026a128..910b41b6 100644
--- a/src/include/atomic_dict.h
+++ b/src/include/atomic_dict.h
@@ -25,7 +25,7 @@ typedef struct AtomicDict {
     Py_ssize_t len;
     uint8_t len_dirty;
 
-    Py_tss_t *tss_key;
+    Py_tss_t *accessor_key;
     PyMutex accessors_lock;
     PyObject *accessors; // PyListObject
 } AtomicDict;
@@ -58,6 +58,8 @@ PyObject *AtomicDict_ApproxLen(AtomicDict *self);
 
 Py_ssize_t AtomicDict_Len(AtomicDict *self);
 
+Py_ssize_t AtomicDict_Len_impl(AtomicDict *self);
+
 PyObject *AtomicDict_FastIter(AtomicDict *self, PyObject *args, PyObject *kwargs);
 
 PyObject *AtomicDict_BatchGetItem(AtomicDict *self, PyObject *args, PyObject *kwargs);
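AtomicDict_RobinHoodInsertRaw above implements the classic Robin Hood rule: probe forward, and when the resident node sits closer to its home slot than the candidate does, swap them and keep walking with the displaced node. A single-threaded sketch of that rule over a simplified slot type; it assumes the table always keeps at least one empty slot, and the names are illustrative.

    #include <stdint.h>

    typedef struct { uint64_t key; int64_t distance; int used; } Slot;  /* illustrative */

    void
    robin_hood_insert(Slot *table, int64_t size_mask, uint64_t key, int64_t home)
    {
        Slot cur = { key, 0, 1 };

        for (int64_t probe = 0; ; probe++) {
            int64_t ix = (home + probe) & size_mask;

            if (!table[ix].used) {            /* empty slot: place and stop */
                cur.distance = probe;
                table[ix] = cur;
                return;
            }
            if (table[ix].distance < probe) { /* resident is "richer": swap */
                Slot displaced = table[ix];
                cur.distance = probe;
                table[ix] = cur;
                cur = displaced;              /* keep probing for the displaced node */
                home = ix - cur.distance;
                probe = cur.distance;
            }
        }
    }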
diff --git a/src/include/atomic_dict_internal.h b/src/include/atomic_dict_internal.h
index 1e70e5a8..f3a092b0 100644
--- a/src/include/atomic_dict_internal.h
+++ b/src/include/atomic_dict_internal.h
@@ -116,7 +116,9 @@ struct AtomicDict_Meta {
     // migration
     AtomicDict_Meta *new_gen_metadata;
     uintptr_t migration_leader;
-    uint8_t *copy_nodes_locks;
+    int64_t node_to_migrate;
+    Py_tss_t *accessor_key;
+    PyObject *accessors;
     AtomicEvent *new_metadata_ready;
     AtomicEvent *node_migration_done;
     AtomicEvent *migration_done;
@@ -158,6 +160,8 @@ void AtomicDict_ParseNodeFromRaw(uint64_t node_raw, AtomicDict_Node *node,
 void AtomicDict_ParseNodeFromRegion(uint64_t ix, uint64_t region, AtomicDict_Node *node,
                                     AtomicDict_Meta *meta);
 
+uint64_t AtomicDict_ParseRawNodeFromRegion(uint64_t ix, uint64_t region, AtomicDict_Meta *meta);
+
 uint64_t AtomicDict_RegionOf(uint64_t ix, AtomicDict_Meta *meta);
 
 uint64_t AtomicDict_ZoneOf(uint64_t ix, AtomicDict_Meta *meta);
@@ -172,6 +176,8 @@ int AtomicDict_IndexAddressIsAligned(uint64_t ix, int alignment, AtomicDict_Meta
 
 void AtomicDict_ReadNodeAt(uint64_t ix, AtomicDict_Node *node, AtomicDict_Meta *meta);
 
+int64_t AtomicDict_ReadRawNodeAt(uint64_t ix, AtomicDict_Meta *meta);
+
 void AtomicDict_Read1NodeAt(uint64_t ix, AtomicDict_Node *nodes, AtomicDict_Meta *meta);
 
 void AtomicDict_Read2NodesAt(uint64_t ix, AtomicDict_Node *nodes, AtomicDict_Meta *meta);
@@ -250,8 +256,27 @@ void AtomicDict_FollowerMigrate(AtomicDict_Meta *current_meta);
 
 void AtomicDict_CommonMigrate(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta);
 
+void AtomicDict_QuickMigrate(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta);
+
+void AtomicDict_SlowMigrate(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta);
+
 int AtomicDict_MigrateReInsertAll(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta);
 
+int AtomicDict_PrepareHashArray(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta);
+
+void
+AtomicDict_MigrateNode(AtomicDict_Node *node, uint64_t *distance_0, uint64_t *distance, AtomicDict_Meta *new_meta,
+                       uint64_t size_mask);
+
+int AtomicDict_MigrateNodes(AtomicDict_Meta *current_meta, AtomicDict_Meta *new_meta);
+
+void
+AtomicDict_SeekToProbeStart(AtomicDict_Meta *meta, uint64_t *pos, uint64_t displacement, uint64_t current_size_mask);
+
+void AtomicDict_SeekToProbeEnd(AtomicDict_Meta *meta, uint64_t *pos, uint64_t displacement, uint64_t current_size_mask);
+
+int AtomicDict_NodesMigrationDone(const AtomicDict_Meta *current_meta);
+
 /// reservation buffer (see ./reservation_buffer.c)
 #define RESERVATION_BUFFER_SIZE 64
 
@@ -280,12 +305,16 @@ typedef struct AtomicDict_AccessorStorage {
 
     int32_t local_len;
 
+    int participant_in_migration;
+
     AtomicDict_ReservationBuffer reservation_buffer;
 } AtomicDict_AccessorStorage;
 
 extern PyTypeObject AtomicDictAccessorStorage_Type;
 
-AtomicDict_AccessorStorage *AtomicDict_GetAccessorStorage(AtomicDict *self);
+AtomicDict_AccessorStorage *AtomicDict_GetOrCreateAccessorStorage(AtomicDict *self);
+
+AtomicDict_AccessorStorage *AtomicDict_GetAccessorStorage(Py_tss_t *accessor_key);
 
 void AtomicDict_BeginSynchronousOperation(AtomicDict *self);
 
@@ -304,6 +333,9 @@ typedef enum AtomicDict_RobinHoodResult {
 AtomicDict_RobinHoodResult AtomicDict_RobinHoodInsert(AtomicDict_Meta *meta, AtomicDict_Node *nodes,
                                                       AtomicDict_Node *to_insert, int distance_0_ix);
 
+AtomicDict_RobinHoodResult AtomicDict_RobinHoodInsertRaw(AtomicDict_Meta *meta, AtomicDict_Node *to_insert,
+                                                         int64_t distance_0_ix);
+
 AtomicDict_RobinHoodResult AtomicDict_RobinHoodDelete(AtomicDict_Meta *meta, AtomicDict_Node *nodes, int to_delete);
diff --git a/tests/test_atomic_dict.py b/tests/test_atomic_dict.py
index fa0ec5ca..a9322384 100644
--- a/tests/test_atomic_dict.py
+++ b/tests/test_atomic_dict.py
@@ -526,8 +526,7 @@ def test_compact():
     d.compact()
     for _ in range(20):
         assert d[keys[0][_]] is None
-    # assert d.debug()["meta"]["log_size"] == 9
-    assert d.debug()["meta"]["log_size"] == 11
+    assert d.debug()["meta"]["log_size"] == 9
 
     d = AtomicDict({}, min_size=2**16)
     assert len(d.debug()["index"]) == 2**16