Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quick migrate & insert fast-path #21

Merged
merged 11 commits into from
Apr 24, 2024
18 changes: 14 additions & 4 deletions src/cereggii/atomic_dict/accessor_storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@


AtomicDict_AccessorStorage *
AtomicDict_GetAccessorStorage(AtomicDict *self)
AtomicDict_GetOrCreateAccessorStorage(AtomicDict *self)
{
assert(self->tss_key != NULL);
AtomicDict_AccessorStorage *storage = PyThread_tss_get(self->tss_key);
assert(self->accessor_key != NULL);
AtomicDict_AccessorStorage *storage = PyThread_tss_get(self->accessor_key);

if (storage == NULL) {
storage = PyObject_New(AtomicDict_AccessorStorage, &AtomicDictAccessorStorage_Type);
Expand All @@ -19,12 +19,13 @@ AtomicDict_GetAccessorStorage(AtomicDict *self)

storage->self_mutex.v = 0;
storage->local_len = 0;
storage->participant_in_migration = 0;
storage->reservation_buffer.head = 0;
storage->reservation_buffer.tail = 0;
storage->reservation_buffer.count = 0;
memset(storage->reservation_buffer.reservations, 0, sizeof(AtomicDict_EntryLoc) * RESERVATION_BUFFER_SIZE);

int set = PyThread_tss_set(self->tss_key, storage);
int set = PyThread_tss_set(self->accessor_key, storage);
if (set != 0)
goto fail;

Expand All @@ -45,6 +46,15 @@ AtomicDict_GetAccessorStorage(AtomicDict *self)
return NULL;
}

AtomicDict_AccessorStorage *
AtomicDict_GetAccessorStorage(Py_tss_t *accessor_key)
{
assert(accessor_key != NULL);
AtomicDict_AccessorStorage *storage = PyThread_tss_get(accessor_key);
assert(storage != NULL);
return storage;
}

void
AtomicDict_BeginSynchronousOperation(AtomicDict *self)
{
Expand Down
29 changes: 17 additions & 12 deletions src/cereggii/atomic_dict/atomic_dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ AtomicDict_new(PyTypeObject *type, PyObject *Py_UNUSED(args), PyObject *Py_UNUSE
if (PyThread_tss_create(tss_key))
goto fail;
assert(PyThread_tss_is_created(tss_key) != 0);
self->tss_key = tss_key;
self->accessor_key = tss_key;

self->accessors = NULL;
self->accessors = Py_BuildValue("[]");
Expand Down Expand Up @@ -216,7 +216,7 @@ AtomicDict_init(AtomicDict *self, PyObject *args, PyObject *kwargs)
meta->inserting_block = self->len >> ATOMIC_DICT_LOG_ENTRIES_IN_BLOCK;

if (self->len > 0) {
storage = AtomicDict_GetAccessorStorage(self);
storage = AtomicDict_GetOrCreateAccessorStorage(self);
if (storage == NULL)
goto fail;

Expand Down Expand Up @@ -246,7 +246,7 @@ AtomicDict_init(AtomicDict *self, PyObject *args, PyObject *kwargs)
self->accessors_lock.v = 0; // https://github.com/colesbury/nogil/blob/043f29ab2afab9cef5edd07875816d3354cb9d2c/Objects/dictobject.c#L334

if (!(AtomicDict_GetEntryAt(0, meta)->flags & ENTRY_FLAGS_RESERVED)) {
storage = AtomicDict_GetAccessorStorage(self);
storage = AtomicDict_GetOrCreateAccessorStorage(self);
if (storage == NULL)
goto fail;

Expand Down Expand Up @@ -303,8 +303,8 @@ AtomicDict_dealloc(AtomicDict *self)
Py_CLEAR(self->accessors);
// this should be enough to deallocate the reservation buffers themselves as well:
// the list should be the only reference to them anyway
PyThread_tss_delete(self->tss_key);
PyThread_tss_free(self->tss_key);
PyThread_tss_delete(self->accessor_key);
PyThread_tss_free(self->accessor_key);

Py_TYPE(self)->tp_free((PyObject *) self);
}
Expand Down Expand Up @@ -490,16 +490,13 @@ AtomicDict_ApproxLen(AtomicDict *self)
}

Py_ssize_t
AtomicDict_Len(AtomicDict *self)
AtomicDict_Len_impl(AtomicDict *self)
{
PyObject *len = NULL, *local_len = NULL;
Py_ssize_t int_len;

AtomicDict_BeginSynchronousOperation(self);

if (!self->len_dirty) {
int_len = self->len;
AtomicDict_EndSynchronousOperation(self);
return int_len;
}

Expand Down Expand Up @@ -528,16 +525,24 @@ AtomicDict_Len(AtomicDict *self)
self->len = int_len;
self->len_dirty = 0;

AtomicDict_EndSynchronousOperation(self);

Py_DECREF(len);
return int_len;
fail:
Py_XDECREF(len);
AtomicDict_EndSynchronousOperation(self);
return -1;
}

Py_ssize_t
AtomicDict_Len(AtomicDict *self)
{
Py_ssize_t len;
AtomicDict_BeginSynchronousOperation(self);
len = AtomicDict_Len_impl(self);
AtomicDict_EndSynchronousOperation(self);

return len;
}

PyObject *
AtomicDict_Debug(AtomicDict *self)
{
Expand Down
2 changes: 1 addition & 1 deletion src/cereggii/atomic_dict/delete.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ AtomicDict_DelItem(AtomicDict *self, PyObject *key)
goto fail;

AtomicDict_AccessorStorage *storage = NULL;
storage = AtomicDict_GetAccessorStorage(self);
storage = AtomicDict_GetOrCreateAccessorStorage(self);

if (storage == NULL)
goto fail;
Expand Down
75 changes: 43 additions & 32 deletions src/cereggii/atomic_dict/insert.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,27 +80,27 @@ AtomicDict_ExpectedUpdateEntry(AtomicDict_Meta *meta, uint64_t entry_ix,
}

int
AtomicDict_ExpectedInsertOrUpdateCloseToDistance0(AtomicDict_Meta *meta,
PyObject *key, Py_hash_t hash,
AtomicDict_ExpectedInsertOrUpdateCloseToDistance0(AtomicDict_Meta *meta, PyObject *key, Py_hash_t hash,
PyObject *expected, PyObject *desired, PyObject **current,
AtomicDict_EntryLoc *entry_loc,
int *must_grow, int *done, int *expectation,
AtomicDict_BufferedNodeReader *reader,
AtomicDict_Node *to_insert, uint64_t distance_0, int skip_entry_check)
AtomicDict_EntryLoc *entry_loc, int *must_grow, int *done,
int *expectation, AtomicDict_Node *to_insert, uint64_t distance_0,
int skip_entry_check)
{
assert(entry_loc != NULL);
AtomicDict_BufferedNodeReader reader;
AtomicDict_Node temp[16];
int begin_write, end_write;

beginning:
AtomicDict_ReadNodesFromZoneStartIntoBuffer(distance_0, reader, meta);
reader.zone = -1;
AtomicDict_ReadNodesFromZoneStartIntoBuffer(distance_0, &reader, meta);

for (int i = (int) (distance_0 % meta->nodes_in_zone); i < meta->nodes_in_zone; i++) {
if (reader->buffer[i].node == 0)
if (reader.buffer[i].node == 0)
goto empty_slot;

if (!skip_entry_check) {
int updated = AtomicDict_ExpectedUpdateEntry(meta, reader->buffer[i].index, key, hash, expected, desired,
int updated = AtomicDict_ExpectedUpdateEntry(meta, reader.buffer[i].index, key, hash, expected, desired,
current, done, expectation);
if (updated < 0)
goto fail;
Expand All @@ -118,7 +118,7 @@ AtomicDict_ExpectedInsertOrUpdateCloseToDistance0(AtomicDict_Meta *meta,
return 0;
}

AtomicDict_CopyNodeBuffers(reader->buffer, temp);
AtomicDict_CopyNodeBuffers(reader.buffer, temp);

to_insert->index = entry_loc->location;
to_insert->distance = 0;
Expand All @@ -137,15 +137,15 @@ AtomicDict_ExpectedInsertOrUpdateCloseToDistance0(AtomicDict_Meta *meta,
}
assert(rhr == ok);

AtomicDict_ComputeBeginEndWrite(meta, reader->buffer, temp, &begin_write, &end_write);
AtomicDict_ComputeBeginEndWrite(meta, reader.buffer, temp, &begin_write, &end_write);

*done = AtomicDict_AtomicWriteNodesAt(
distance_0 - (distance_0 % meta->nodes_in_zone) + begin_write, end_write - begin_write,
&reader->buffer[begin_write], &temp[begin_write], meta
&reader.buffer[begin_write], &temp[begin_write], meta
);

if (!*done) {
reader->zone = -1;
reader.zone = -1;
goto beginning;
}

Expand Down Expand Up @@ -179,27 +179,42 @@ AtomicDict_ExpectedInsertOrUpdate(AtomicDict_Meta *meta, PyObject *key, Py_hash_
int done, expectation;
*must_grow = 0;

uint64_t distance_0 = AtomicDict_Distance0Of(hash, meta);
AtomicDict_Node node;

if (expected == NOT_FOUND || expected == ANY) {
// insert fast-path
if (AtomicDict_ReadRawNodeAt(distance_0, meta) == 0) {
assert(entry_loc != NULL);

node.index = entry_loc->location;
node.distance = 0;
node.tag = hash;

if (AtomicDict_AtomicWriteNodesAt(distance_0, 1, &meta->zero, &node, meta)) {
return NOT_FOUND;
}
}
}

beginning:
done = 0;
expectation = 1;
uint64_t distance_0 = AtomicDict_Distance0Of(hash, meta);
// uint64_t distance = distance_0 % meta->nodes_in_zone; // shorter distances handled by the fast-path
uint64_t distance = 0;
AtomicDict_BufferedNodeReader reader;
reader.zone = -1;
PyObject *current = NULL;
uint8_t is_compact = meta->is_compact;
AtomicDict_Node to_insert;

if (AtomicDict_ExpectedInsertOrUpdateCloseToDistance0(meta, key, hash, expected, desired, &current, entry_loc,
must_grow, &done, &expectation, &reader, &to_insert,
must_grow, &done, &expectation, &to_insert,
distance_0, skip_entry_check) < 0)
goto fail;

while (!done) {
AtomicDict_ReadNodesFromZoneStartIntoBuffer(distance_0 + distance, &reader, meta);
uint64_t ix = (distance_0 + distance) & (meta->size - 1);
AtomicDict_ReadNodeAt(ix, &node, meta);

if (reader.node.node == 0) {
if (node.node == 0) {
if (expected != NOT_FOUND && expected != ANY) {
expectation = 0;
break;
Expand All @@ -215,27 +230,23 @@ AtomicDict_ExpectedInsertOrUpdate(AtomicDict_Meta *meta, PyObject *key, Py_hash_
to_insert.distance = meta->max_distance;
to_insert.tag = hash;

done = AtomicDict_AtomicWriteNodesAt(distance_0 + distance, 1,
&reader.buffer[reader.idx_in_buffer], &to_insert,
meta);
done = AtomicDict_AtomicWriteNodesAt(ix, 1, &node, &to_insert, meta);

if (!done) {
reader.zone = -1;
if (!done)
continue; // don't increase distance
}
} else if (reader.node.node == meta->tombstone.node) {
} else if (node.node == meta->tombstone.node) {
// pass
} else if (is_compact && !AtomicDict_NodeIsReservation(&reader.node, meta) && (
(distance_0 + distance - reader.node.distance > distance_0)
} else if (is_compact && !AtomicDict_NodeIsReservation(&node, meta) && (
(ix - node.distance > distance_0)
)) {
if (expected != NOT_FOUND && expected != ANY) {
expectation = 0;
break;
}
} else if (reader.node.tag != (hash & meta->tag_mask)) {
} else if (node.tag != (hash & meta->tag_mask)) {
// pass
} else if (!skip_entry_check) {
int updated = AtomicDict_ExpectedUpdateEntry(meta, reader.node.index, key, hash, expected, desired,
int updated = AtomicDict_ExpectedUpdateEntry(meta, node.index, key, hash, expected, desired,
&current, &done, &expectation);
if (updated < 0)
goto fail;
Expand Down Expand Up @@ -324,7 +335,7 @@ AtomicDict_CompareAndSet(AtomicDict *self, PyObject *key, PyObject *expected, Py
goto fail;

AtomicDict_AccessorStorage *storage = NULL;
storage = AtomicDict_GetAccessorStorage(self);
storage = AtomicDict_GetOrCreateAccessorStorage(self);
if (storage == NULL)
goto fail;

Expand Down
2 changes: 2 additions & 0 deletions src/cereggii/atomic_dict/lookup.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ void
AtomicDict_LookupEntry(AtomicDict_Meta *meta, uint64_t entry_ix, Py_hash_t hash,
AtomicDict_SearchResult *result)
{
// index-only search

uint64_t ix = AtomicDict_Distance0Of(hash, meta);
uint8_t is_compact;
uint64_t probe, reservations;
Expand Down
10 changes: 3 additions & 7 deletions src/cereggii/atomic_dict/meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ AtomicDictMeta_New(uint8_t log_size)

meta->new_gen_metadata = NULL;
meta->migration_leader = 0;
meta->copy_nodes_locks = NULL;
meta->node_to_migrate = 0;
meta->accessor_key = NULL;
meta->accessors = NULL;

meta->new_metadata_ready = (AtomicEvent *) PyObject_CallObject((PyObject *) &AtomicEvent_Type, NULL);
if (meta->new_metadata_ready == NULL)
Expand Down Expand Up @@ -249,12 +251,6 @@ AtomicDictMeta_dealloc(AtomicDict_Meta *self)
PyMem_RawFree(self->blocks);
}

uint8_t *copy_nodes_locks = self->copy_nodes_locks;
if (copy_nodes_locks != NULL) {
self->copy_nodes_locks = NULL;
PyMem_RawFree(copy_nodes_locks);
}

Py_CLEAR(self->generation);
Py_CLEAR(self->new_gen_metadata);
Py_CLEAR(self->new_metadata_ready);
Expand Down
Loading
Loading