-
Notifications
You must be signed in to change notification settings - Fork 16
feat(db): Implement importer and listener for permanode #631
base: develop
Are you sure you want to change the base?
Conversation
b95e2fd
to
1271e47
Compare
The listener subscribes newly confirmed transactions from IRI and adds inserting tasks into the task queue of thread pool. The importer reads confirmed transactions from historical transaction files dumped from IRI and adds inserting tasks into the task queue of thread pool. Each worker in the thread pool establishes a session connected to ScyllaDB cluster and takes inserting tasks.
1271e47
to
7475196
Compare
|
||
static status_t init_importer_data(db_importer_thread_t* thread_data, db_permanode_pool_t* pool, char* file_list) { | ||
status_t ret = SC_OK; | ||
pthread_mutex_init(&thread_data->thread_mutex, NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need error checking here?
status_t ret = SC_OK; | ||
pthread_mutex_init(&thread_data->thread_mutex, NULL); | ||
thread_data->pool = pool; | ||
thread_data->file_path = strdup(file_list); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
||
status_t ret = SC_OK; | ||
db_importer_thread_t* thread_data = (db_importer_thread_t*)data; | ||
pthread_mutex_lock(&thread_data->thread_mutex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
ta_log_error("Failed to open file %s\n", file_name_buffer); | ||
goto exit; | ||
} | ||
ta_log_info("%s %s\n", "starting to import file : ", file_name_buffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You will get an extra space after the colon
|
||
exit: | ||
if (ret == SC_OK) { | ||
ta_log_info("%s %s\n", "Successfully import file : ", thread_data->file_path); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
worker_threads = malloc(thread_num * sizeof(pthread_t)); | ||
worker_data = malloc(thread_num * sizeof(db_worker_thread_t)); | ||
|
||
db_permanode_thpool_init(&pool); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
db_permanode_thpool_init_worker(worker_data + i, &pool, db_host); | ||
pthread_create(&worker_threads[i], NULL, db_permanode_worker_handler, (void*)&worker_data[i]); | ||
} | ||
init_importer_data(&importer_data, &pool, file_path); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
init_importer_data(&importer_data, &pool, file_path); | ||
pthread_create(&importer_thread, NULL, (void*)importer_handler, (void*)&importer_data); | ||
|
||
pthread_join(importer_thread, NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
* | ||
* @param[in] data pointer to db_permanode_pool_t | ||
* | ||
* @return NULL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no returning NULL here
#include "scylladb_identity.h" | ||
#include "scylladb_permanode.h" | ||
|
||
#include "scylladb_permanode_thpool.h" | ||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a new line between line 15 and 16
The listener subscribes newly confirmed transactions from IRI and adds inserting tasks into the task queue of thread pool.
The importer reads confirmed transactions from historical transaction files dumped from IRI and adds inserting tasks into the task queue of thread pool.
Each worker in the thread pool establishes a session connected to ScyllaDB cluster and takes inserting tasks.