diff --git a/common/ta_errors.h b/common/ta_errors.h index d9e4001d..b94a4b1b 100644 --- a/common/ta_errors.h +++ b/common/ta_errors.h @@ -244,7 +244,13 @@ typedef enum { SC_STORAGE_INVALID_INPUT = 0x03 | SC_MODULE_STORAGE | SC_SEVERITY_MAJOR, /**< Invalid input parameter, e.g., null pointer */ SC_STORAGE_CASSANDRA_QUERY_FAIL = 0x04 | SC_MODULE_STORAGE | SC_SEVERITY_MAJOR, - /**< Failed to execute Cassandra query */ + /**< Failed to execute Cassandra query */ + SC_STORAGE_SYNC_ERROR = 0x05 | SC_MODULE_STORAGE | SC_SEVERITY_MAJOR, + /**< Failed to synchronize lastest confirmed transactions from IRI */ + SC_STORAGE_THPOOL_ADD_REQUEST_FAIL = 0x06 | SC_MODULE_STORAGE | SC_SEVERITY_MAJOR, + /**< Failed to add requests to permanode thread pool. (request queue full) */ + SC_STORAGE_PTHREAD_ERROR = 0x07 | SC_MODULE_STORAGE | SC_SEVERITY_MAJOR, + /**< Failed when calling pthread library */ // Core module SC_CORE_OOM = 0x01 | SC_MODULE_CORE | SC_SEVERITY_FATAL, diff --git a/docs/permanode.md b/docs/permanode.md index 85cb37a3..421dc5da 100644 --- a/docs/permanode.md +++ b/docs/permanode.md @@ -11,4 +11,34 @@ The ScyllaDB backend in `tangle-accelerator` supports the following APIs: - find_transactions_approvees - get_inclusion_status -See [docs/build.md] for more information about enabling the external storage. +Read [docs/build.md] for more information about enabling the external storage. + +## Listener + +The listener subscribes newly confirmed transactions from IRI and adds inserting tasks into the task queue of thread pool. + +Here are configurations and CLI options you need to specify: + +* `--iri_host`: Listening IRI host for ZMQ events and quering trytes. +* `--db_host`: Connecting ScyllaDB host name. +* `--thread_num`: Workers number in the thread pool to handle receiving transactions. + +Build command: + +`bazel build //storage:scylladb_listener` + +## Importer + +The importer reads confirmed transactions from historical transaction files dumped from IRI and adds inserting tasks into the task queue of thread pool. + +The historical transaction files must consist of lines with the format: `TRANSACTION_HASH,TRYTES,SNAPSHOT_INDEX` + +Here are configurations and CLI options you need to specify: + +* `--db_host`: Connecting ScyllaDB host name. +* `--file`: A file consist of historical transactions file pathes. +* `--thread_num`: Worker's number in the thread pool to handle receiving transactions. + +Build command: + +`bazel build //storage:scylladb_importer` diff --git a/storage/BUILD b/storage/BUILD index afaf7890..7a84a7a3 100644 --- a/storage/BUILD +++ b/storage/BUILD @@ -5,6 +5,28 @@ cc_library( deps = [ ":scylladb_identity", ":scylladb_permanode", + ":scylladb_permanode_thpool", + ], +) + +cc_binary( + name = "scylladb_importer", + srcs = ["scylladb_importer.c"], + deps = [ + ":storage", + ], +) + +cc_binary( + name = "scylladb_listener", + srcs = ["scylladb_listener.c"], + linkopts = [ + "-lzmq", + ], + deps = [ + ":storage", + "//accelerator:ta_config", + "@entangled//cclient/api", ], ) @@ -33,6 +55,16 @@ cc_library( ], ) +cc_library( + name = "scylladb_permanode_thpool", + srcs = ["scylladb_permanode_thpool.c"], + hdrs = ["scylladb_permanode_thpool.h"], + linkopts = ["-lpthread"], + deps = [ + ":scylladb_permanode", + ], +) + cc_library( name = "scylladb_permanode", srcs = ["scylladb_permanode.c"], diff --git a/storage/scylladb_client.h b/storage/scylladb_client.h index 2d34ffb7..fe6d85d6 100644 --- a/storage/scylladb_client.h +++ b/storage/scylladb_client.h @@ -37,10 +37,10 @@ typedef struct { } db_client_service_t; /** - * @brief init ScyllaDB client serivce and connect to specific cluster + * @brief init ScyllaDB client service and connect to specific cluster * * @param[out] service ScyllaDB client service - * @param[in] usage specfic usage for db client serivce + * @param[in] usage specfic usage for db client service * @return * - SC_OK on success * - non-zero on error @@ -48,7 +48,7 @@ typedef struct { status_t db_client_service_init(db_client_service_t* service, db_client_usage_t usage); /** - * @brief free ScyllaDB client serivce + * @brief free ScyllaDB client service * * @param[in] service ScyllaDB client service * @return diff --git a/storage/scylladb_importer.c b/storage/scylladb_importer.c new file mode 100644 index 00000000..5705232b --- /dev/null +++ b/storage/scylladb_importer.c @@ -0,0 +1,165 @@ +/* + * Copyright (C) 2020 BiiLabs Co., Ltd. and Contributors + * All Rights Reserved. + * This is free software; you can redistribute it and/or modify it under the + * terms of the MIT license. A copy of the license can be found in the file + * "LICENSE" at the root of this distribution. + */ + +#include +#include "ta_storage.h" + +#define logger_id scylladb_logger_id + +typedef struct { + pthread_mutex_t thread_mutex; + db_permanode_pool_t* pool; + char* file_path; +} db_importer_thread_t; + +static status_t init_importer_data(db_importer_thread_t* thread_data, db_permanode_pool_t* pool, char* file_list) { + status_t ret = SC_OK; + pthread_mutex_init(&thread_data->thread_mutex, NULL); + thread_data->pool = pool; + thread_data->file_path = strdup(file_list); + return ret; +} + +static void* importer_handler(void* data) { +#define TRANSACTION_BUFFER_SIZE \ + (NUM_FLEX_TRITS_HASH + 1 + NUM_TRYTES_SERIALIZED_TRANSACTION + 12) // 12 is for snapshot_index +#define MAX_FILE_PATH 256 + + status_t ret = SC_OK; + db_importer_thread_t* thread_data = (db_importer_thread_t*)data; + pthread_mutex_lock(&thread_data->thread_mutex); + FILE* list_file = NULL; + char file_name_buffer[MAX_FILE_PATH]; + + if ((list_file = fopen(thread_data->file_path, "r")) == NULL) { + /* The specified configuration file does not exist */ + ret = SC_CONF_FOPEN_ERROR; + ta_log_error("Failed to open file %s\n", thread_data->file_path); + goto exit; + } + + while (fgets(file_name_buffer, MAX_FILE_PATH, list_file) != NULL) { + char input_buffer[TRANSACTION_BUFFER_SIZE]; + FILE* file = NULL; + + int name_len = strlen(file_name_buffer); + if (name_len > 0) { + file_name_buffer[name_len - 1] = 0; + } else { + ta_log_warning("Empty file name\n"); + continue; + } + + if ((file = fopen(file_name_buffer, "r")) == NULL) { + /* The specified configuration file does not exist */ + ret = SC_CONF_FOPEN_ERROR; + ta_log_error("Failed to open file %s\n", file_name_buffer); + goto exit; + } + ta_log_info("%s %s\n", "starting to import file : ", file_name_buffer); + int cnt = 1; + int cnt_base1000 = 0; + while (fgets(input_buffer, TRANSACTION_BUFFER_SIZE, file) != NULL) { + if (cnt % 1000 == 0) { + ta_log_info("Import %d K transactions\n", ++cnt_base1000); + cnt = 0; + } + if (input_buffer[strlen(input_buffer) - 1] != '\n') { + ret = SC_STORAGE_INVALID_INPUT; + ta_log_error("%s\n", "Historical dump file format error"); + continue; + } + + do { + ret = db_permanode_thpool_add((tryte_t*)input_buffer, (tryte_t*)input_buffer + NUM_FLEX_TRITS_HASH + 1, + thread_data->pool); + if (ret != SC_OK) { + pthread_cond_wait(&thread_data->pool->finish_request, &thread_data->thread_mutex); + } + } while (ret != SC_OK); + + cnt++; + } + + ta_log_info("Successfully import file : %s\n", file_name_buffer); + } + +exit: + if (ret == SC_OK) { + ta_log_info("%s %s\n", "Successfully import file : ", thread_data->file_path); + } else { + ta_log_error("Failed to import file : %s\n", thread_data->file_path); + } + return NULL; +} + +int main(int argc, char* argv[]) { + int thread_num = 1; + pthread_t* worker_threads; /* thread's structures */ + pthread_t importer_thread; + db_worker_thread_t* worker_data; + db_importer_thread_t importer_data; + db_permanode_pool_t pool; + + char* db_host = "localhost"; + char* file_path = NULL; + const struct option longOpt[] = {{"db_host", required_argument, NULL, 's'}, + {"file", required_argument, NULL, 'f'}, + {"thread_num", required_argument, NULL, 't'}, + {NULL, 0, NULL, 0}}; + /* Parse the command line options */ + while (1) { + int cmdOpt; + int optIdx; + cmdOpt = getopt_long(argc, argv, "sft:", longOpt, &optIdx); + if (cmdOpt == -1) break; + + /* Invalid option */ + if (cmdOpt == '?') break; + + if (cmdOpt == 's') { + db_host = optarg; + } + if (cmdOpt == 'f') { + file_path = optarg; + } + if (cmdOpt == 't') { + thread_num = atoi(optarg); + } + } + if (file_path == NULL) { + ta_log_error("No specified import file list\n"); + return EXIT_FAILURE; + } + if (ta_logger_init() != SC_OK) { + ta_log_error("Failed to init logger\n"); + return EXIT_FAILURE; + } + scylladb_logger_init(); + worker_threads = malloc(thread_num * sizeof(pthread_t)); + worker_data = malloc(thread_num * sizeof(db_worker_thread_t)); + + db_permanode_thpool_init(&pool); + /* create the request-handling threads */ + for (int i = 0; i < thread_num; i++) { + db_permanode_thpool_init_worker(worker_data + i, &pool, db_host); + pthread_create(&worker_threads[i], NULL, db_permanode_worker_handler, (void*)&worker_data[i]); + } + init_importer_data(&importer_data, &pool, file_path); + pthread_create(&importer_thread, NULL, (void*)importer_handler, (void*)&importer_data); + + pthread_join(importer_thread, NULL); + + db_permanode_tpool_wait(&pool); + free(worker_data); + free(worker_threads); + + scylladb_logger_release(); + + return 0; +} diff --git a/storage/scylladb_listener.c b/storage/scylladb_listener.c new file mode 100644 index 00000000..27a2b6a0 --- /dev/null +++ b/storage/scylladb_listener.c @@ -0,0 +1,210 @@ +/* + * Copyright (C) 2020 BiiLabs Co., Ltd. and Contributors + * All Rights Reserved. + * This is free software; you can redistribute it and/or modify it under the + * terms of the MIT license. A copy of the license can be found in the file + * "LICENSE" at the root of this distribution. + */ +#include +#include +#include "accelerator/config.h" +#include "cclient/api/extended/extended_api.h" +#include "ta_storage.h" + +#define logger_id scylladb_logger_id + +// Receive ZMQ string from socket and convert into C string +// Caller must free returned string. Returns NULL if the context +// is being terminated. +static char* receive_zmq_string_to_heap(void* socket) { +#define BUFFER_LEN 450 + char buffer[BUFFER_LEN]; + int size = zmq_recv(socket, buffer, BUFFER_LEN - 1, 0); + if (size == -1) { + return NULL; + } + if (size < BUFFER_LEN) { + buffer[size] = '\0'; + } + + return strndup(buffer, sizeof(buffer) - 1); +} + +static void init_iota_client_service(iota_client_service_t* const serv, char const* const host, uint16_t const port) { + serv->http.path = "/"; + serv->http.content_type = "application/json"; + serv->http.accept = "application/json"; + serv->http.host = host; + serv->http.port = port; + serv->http.api_version = 1; + serv->http.ca_pem = NULL; + serv->serializer_type = SR_JSON; + iota_client_core_init(serv); +} + +typedef struct { + pthread_mutex_t thread_mutex; + db_permanode_pool_t* pool; + iota_client_service_t* service; +} db_listener_data_t; + +static status_t db_init_listener_data(db_listener_data_t* data, db_permanode_pool_t* pool, + iota_client_service_t* service) { + status_t ret = SC_OK; + pthread_mutex_init(&data->thread_mutex, NULL); + data->pool = pool; + data->service = service; + return ret; +} + +static void* listener_handler(void* in) { + db_listener_data_t* data = (db_listener_data_t*)in; + pthread_mutex_lock(&data->thread_mutex); + char* zmq_server = malloc(strlen("tcp://") + strlen(data->service->http.host) + strlen(":5556") + 1); + strncpy(zmq_server, "tcp://", strlen("tcp://")); + strcat(zmq_server, data->service->http.host); + strcat(zmq_server, ":5556"); + get_trytes_res_t* tx_res = get_trytes_res_new(); + get_trytes_req_t* tx_req = get_trytes_req_new(); + char tx_hash[NUM_FLEX_TRITS_HASH + 1]; + ta_log_info("Start listening from ZMQ server : %s\n", zmq_server); + /* setting ZeroMQ */ + void* context = zmq_ctx_new(); + void* subscriber = zmq_socket(context, ZMQ_SUB); + if (subscriber == NULL) { + ta_log_error("Failed to establish subscriber\n"); + goto exit; + } + if (zmq_connect(subscriber, zmq_server) != 0) { + ta_log_error("Failed to connect to zmq server : %s\n", zmq_server); + goto exit; + } +#ifdef IRI_SUPPORT_SN_TRYTES + if (zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "sn_trytes", strlen("sn_trytes")) != 0) { + goto exit; + } +#else + if (zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "sn", strlen("sn")) != 0) { + ta_log_error("Failed to set setsockopt : sn\n"); + goto exit; + } +#endif + + /* receiving IRI publication */ + while (1) { + char* zmq_receive_string = receive_zmq_string_to_heap(subscriber); +#ifdef IRI_SUPPORT_SN_TRYTES + flex_trit_t tx_trytes[NUM_FLEX_TRITS_SERIALIZED_TRANSACTION + 1]; + sscanf(zmq_receive_string, "sn_trytes %s %s", ta_trytes, tx_hash); + hash8019_queue_push(&tx_res, ta_trytes); + hash243_queue_push(&ta_req->hashes, (flex_trit_t const* const)tx_hash); +#else + + int milestone_idx; + sscanf(zmq_receive_string, "sn %d %s", &milestone_idx, tx_hash); + ta_log_debug("Get hash %s\n", tx_hash); + if (hash243_queue_push(&tx_req->hashes, (flex_trit_t const* const)tx_hash) != RC_OK) { + ta_log_error("%s\n", "SC_STORAGE_OOM"); + goto loop_end; + } + if (iota_client_get_trytes(data->service, tx_req, tx_res) != RC_OK) { + ta_log_error("Failed to get trytes from IRI\n"); + } + +#endif + while (hash8019_queue_empty(tx_res->trytes) == false) { + while (db_permanode_thpool_add((tryte_t*)tx_req->hashes, (tryte_t*)tx_res->trytes->hash, data->pool) != SC_OK) { + /** The request queue is full or unavailable */ + pthread_cond_wait(&data->pool->finish_request, &data->thread_mutex); + } + hash8019_queue_pop(&tx_res->trytes); + hash243_queue_pop(&tx_req->hashes); + } + + loop_end: + free(zmq_receive_string); + } + +exit: + ta_log_info("Exit db listener handler\n"); + free(zmq_server); + zmq_close(subscriber); + zmq_ctx_destroy(context); + get_trytes_req_free(&tx_req); + get_trytes_res_free(&tx_res); + return NULL; +} + +int main(int argc, char* argv[]) { + iota_client_service_t iota_service; + char* db_host = "localhost"; + char* iri_host = "localhost"; + uint16_t iri_port = IRI_PORT; + int thread_num = 1; + pthread_t* worker_threads; /* thread's structures */ + pthread_t listener_thread; + db_worker_thread_t* worker_data; + db_listener_data_t listener_data; + db_permanode_pool_t pool; + + const struct option longOpt[] = {{"db_host", required_argument, NULL, 's'}, + {"iri_host", required_argument, NULL, 'i'}, + {"iri_port", required_argument, NULL, 'p'}, + {"thread_num", required_argument, NULL, 't'}, + {NULL, 0, NULL, 0}}; + /* Parse the command line options */ + while (1) { + int cmdOpt; + int optIdx; + cmdOpt = getopt_long(argc, argv, "sfpt:", longOpt, &optIdx); + if (cmdOpt == -1) break; + + /* Invalid option */ + if (cmdOpt == '?') break; + + if (cmdOpt == 's') { + db_host = optarg; + } + if (cmdOpt == 'i') { + iri_host = optarg; + } + if (cmdOpt == 'p') { + iri_port = atoi(optarg); + } + if (cmdOpt == 't') { + thread_num = atoi(optarg); + } + } + if (ta_logger_init() != SC_OK) { + ta_log_error("Failed to init logger\n"); + return EXIT_FAILURE; + } + scylladb_logger_init(); + + ta_log_info("Connect db_host = %s, iri_host = %s\n", db_host, iri_host); + init_iota_client_service(&iota_service, iri_host, iri_port); + worker_threads = malloc(thread_num * sizeof(pthread_t)); + worker_data = malloc(thread_num * sizeof(db_worker_thread_t)); + + if (db_permanode_thpool_init(&pool) != SC_OK) { + ta_log_error("Failed to init permanode threadpool\n"); + return EXIT_FAILURE; + } + /* create the request-handling threads */ + for (int i = 0; i < thread_num; i++) { + if (db_permanode_thpool_init_worker(worker_data + i, &pool, db_host) != SC_OK) { + ta_log_error("Failed to init permanode thpool worker\n"); + return EXIT_FAILURE; + } + pthread_create(&worker_threads[i], NULL, (void*)db_permanode_worker_handler, (void*)&worker_data[i]); + } + db_init_listener_data(&listener_data, &pool, &iota_service); + pthread_create(&listener_thread, NULL, (void*)listener_handler, (void*)&listener_data); + pthread_join(listener_thread, NULL); + free(worker_data); + free(worker_threads); + + scylladb_logger_release(); + + return 0; +} diff --git a/storage/scylladb_permanode_thpool.c b/storage/scylladb_permanode_thpool.c new file mode 100644 index 00000000..460de6ad --- /dev/null +++ b/storage/scylladb_permanode_thpool.c @@ -0,0 +1,164 @@ +/* + * Copyright (C) 2020 BiiLabs Co., Ltd. and Contributors + * All Rights Reserved. + * This is free software; you can redistribute it and/or modify it under the + * terms of the MIT license. A copy of the license can be found in the file + * "LICENSE" at the root of this distribution. + */ + +#include "scylladb_permanode_thpool.h" + +#define logger_id scylladb_logger_id + +status_t db_permanode_thpool_init(db_permanode_pool_t* pool) { + if (pthread_mutex_init(&pool->request_mutex, NULL)) { + ta_log_error("Failed to init pthread mutex\n"); + return SC_STORAGE_PTHREAD_ERROR; + } + pthread_cond_init(&pool->got_request, NULL); + pthread_cond_init(&pool->finish_request, NULL); + pool->working_thread_num = 0; + pool->num_requests = 0; + pool->requests = NULL; + pool->last_request = NULL; + return SC_OK; +} + +status_t db_permanode_thpool_init_worker(db_worker_thread_t* thread_data, db_permanode_pool_t* pool, char* host) { + status_t ret; + pthread_mutex_init(&thread_data->thread_mutex, NULL); + thread_data->pool = pool; + thread_data->service.host = strdup(host); + ret = db_client_service_init(&thread_data->service, DB_USAGE_PERMANODE); + return ret; +} + +status_t db_permanode_thpool_add(const tryte_t* hash, const tryte_t* trytes, db_permanode_pool_t* pool) { +#define MAX_REQUEST_QUEUE_SIZE 100 + status_t ret = SC_OK; + struct request* a_request; /* pointer to newly added request. */ + pthread_mutex_lock(&pool->request_mutex); + if (pool->num_requests >= MAX_REQUEST_QUEUE_SIZE) { + pthread_mutex_unlock(&pool->request_mutex); + return SC_STORAGE_THPOOL_ADD_REQUEST_FAIL; + } + pthread_mutex_unlock(&pool->request_mutex); + + /* create structure with new request */ + a_request = (struct request*)malloc(sizeof(struct request)); + if (!a_request) { + ta_log_error("%s\n", ta_error_to_string(SC_TA_OOM)); + return SC_TA_OOM; + } + memcpy(a_request->hash, hash, sizeof(a_request->hash)); + memcpy(a_request->trytes, trytes, sizeof(a_request->trytes)); + + pthread_mutex_lock(&pool->request_mutex); + + if (pool->num_requests == 0) { /* special case - list is empty */ + pool->requests = a_request; + pool->last_request = a_request; + } else { + pool->last_request->next = a_request; + pool->last_request = a_request; + } + + pool->num_requests++; + pthread_mutex_unlock(&pool->request_mutex); + + /* signal the condition variable - there's a new request to handle */ + pthread_cond_broadcast(&pool->got_request); + + return ret; +} + +static struct request* get_request(db_permanode_pool_t* pool) { + struct request* a_request; /* pointer to request. */ + + pthread_mutex_lock(&pool->request_mutex); + + if (pool->num_requests > 0) { + a_request = pool->requests; + pool->requests = a_request->next; + if (pool->requests == NULL) { /* this was the last request on the list */ + pool->last_request = NULL; + } + /* decrease the total number of pending requests */ + pool->num_requests--; + pool->working_thread_num++; + } else { /* requests list is empty */ + a_request = NULL; + } + + pthread_mutex_unlock(&pool->request_mutex); + + /* return the request to the caller. */ + return a_request; +} + +static void handle_request(struct request* a_request, db_client_service_t* service) { +#define MAX_RETRY_CNT 10 + if (a_request) { + status_t ret; + int retry_cnt = 0; + do { + ret = db_permanode_insert_transaction(service, a_request->hash, a_request->trytes); + if (ret != SC_OK) { + ta_log_error("Failed to insert transaction %s\n", a_request->hash); + retry_cnt++; + + if (retry_cnt >= MAX_RETRY_CNT) { + do { + ta_log_warning("Failed %d times, try to reconnect\n", MAX_RETRY_CNT); + char* host = strdup(service->host); + db_client_service_free(service); + service->host = host; + + } while (db_client_service_init(service, DB_USAGE_PERMANODE) != SC_OK); + + ta_log_info("Init db service successed\n"); + retry_cnt = 0; + } + } + } while (ret != SC_OK); + } +} + +void* db_permanode_worker_handler(void* data) { + struct request* a_request; + db_worker_thread_t* thread_data = (db_worker_thread_t*)data; + pthread_mutex_lock(&thread_data->thread_mutex); + + while (1) { + a_request = get_request(thread_data->pool); + if (a_request) { /* got a request - handle it and free it */ + + handle_request(a_request, &thread_data->service); + free(a_request); + pthread_mutex_lock(&thread_data->pool->request_mutex); + thread_data->pool->working_thread_num--; + pthread_mutex_unlock(&thread_data->pool->request_mutex); + + pthread_cond_signal(&thread_data->pool->finish_request); + } else { + pthread_cond_wait(&thread_data->pool->got_request, &thread_data->thread_mutex); + } + } + return NULL; +} + +void db_permanode_tpool_wait(db_permanode_pool_t* pool) { + if (pool == NULL) { + ta_log_error("%s\n", ta_error_to_string(SC_TA_NULL)); + return; + } + pthread_mutex_lock(&(pool->request_mutex)); + while (1) { + if (pool->num_requests != 0 || pool->working_thread_num != 0) { + pthread_cond_wait(&(pool->finish_request), &(pool->request_mutex)); + } else { + break; + } + } + pthread_mutex_unlock(&(pool->request_mutex)); +} \ No newline at end of file diff --git a/storage/scylladb_permanode_thpool.h b/storage/scylladb_permanode_thpool.h new file mode 100644 index 00000000..98adb48d --- /dev/null +++ b/storage/scylladb_permanode_thpool.h @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2020 BiiLabs Co., Ltd. and Contributors + * All Rights Reserved. + * This is free software; you can redistribute it and/or modify it under the + * terms of the MIT license. A copy of the license can be found in the file + * "LICENSE" at the root of this distribution. + */ +#ifndef STORAGE_SCYLLADB_PERMANODE_THPOOL_H_ +#define STORAGE_SCYLLADB_PERMANODE_THPOOL_H_ +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include "scylladb_permanode.h" + +/** + * @file storage/scylladb_permanode_thpool.h + * @brief Thread pool implematation for inserting data into permanode + */ + +struct request { + tryte_t hash[NUM_TRYTES_HASH]; /* number of the request */ + tryte_t trytes[NUM_TRYTES_SERIALIZED_TRANSACTION]; + struct request* next; /* pointer to next request, NULL if none. */ +}; + +typedef struct { + pthread_mutex_t request_mutex; + pthread_cond_t got_request; + pthread_cond_t finish_request; + int num_requests; /* number of pending requests, initially none */ + int working_thread_num; + struct request* requests; /* head of linked list of requests. */ + struct request* last_request; /* pointer to last request. */ + +} db_permanode_pool_t; + +typedef struct { + pthread_mutex_t thread_mutex; + db_permanode_pool_t* pool; + db_client_service_t service; +} db_worker_thread_t; + +/** + * @brief Initialize request, pthread mutex, pthread cond in threadpool struct + * + * @param[in] poll pointer to db_permanode_pool_t + * + * - SC_OK on success + * - non-zero on error + */ +status_t db_permanode_thpool_init(db_permanode_pool_t* pool); + +/** + * @brief Initialize work thread data + * + * @param[in] thread_data target worker thread struct + * @param[in] pool connected thread pool + * @param[in] host ScyllaDB host name + * + * - SC_OK on success + * - non-zero on error + */ +status_t db_permanode_thpool_init_worker(db_worker_thread_t* thread_data, db_permanode_pool_t* pool, char* host); + +/** + * @brief Add request into permanode request threadpoll + * + * @param[in] hash input transaction hash + * @param[in] tryte input transaction trytes + * @param[in] pool connected thread pool + * + * - SC_OK on success + * - non-zero on error + */ +status_t db_permanode_thpool_add(const tryte_t* hash, const tryte_t* trytes, db_permanode_pool_t* pool); + +/** + * @brief infinite loop of requests handling + * + * Loop forever, if there are requests to handle, take the first and handle it. + * Then wait on the given condition variable, and when it is signaled, re-do the loop. + * + * @param[in] data pointer to db_permanode_pool_t + * + * @return NULL + */ +void* db_permanode_worker_handler(void* data); + +/** + * @brief Wait until the pool is empty and all workers are idle + * + * @param[in] data pointer to db_permanode_pool_t + * + * @return NULL + */ +void db_permanode_tpool_wait(db_permanode_pool_t* pool); + +#ifdef __cplusplus +} +#endif + +#endif // STORAGE_SCYLLADB_PERMANODE_THPOOL_H_ diff --git a/storage/ta_storage.h b/storage/ta_storage.h index 93254bdc..4687aa7e 100644 --- a/storage/ta_storage.h +++ b/storage/ta_storage.h @@ -10,10 +10,9 @@ #ifdef __cplusplus extern "C" { #endif - #include "scylladb_identity.h" #include "scylladb_permanode.h" - +#include "scylladb_permanode_thpool.h" /** * @file storage/ta_storage.h * @brief The high level header for TA storage driver.