From a45c848715659f52f2f3ec0fb625c207c06db6b4 Mon Sep 17 00:00:00 2001 From: delli Date: Wed, 20 Jan 2021 12:53:25 +0800 Subject: [PATCH 01/25] Update marian-backend --- .../triton-aml/marian_backend/src/marian.cc | 23 +++++++++++++++---- .../triton-aml/marian_backend/src/marian.h | 5 ++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/contrib/triton-aml/marian_backend/src/marian.cc b/contrib/triton-aml/marian_backend/src/marian.cc index a6c078bea..50f51d62e 100644 --- a/contrib/triton-aml/marian_backend/src/marian.cc +++ b/contrib/triton-aml/marian_backend/src/marian.cc @@ -118,7 +118,6 @@ ModelState::SetMarianConfigPath() // Set the Marian config path. std::string config_path("/var/azureml-app/"); config_path.append(std::getenv("AZUREML_MODEL_DIR")); - config_path.append("/nlxseq2seq/triton/nlxseq2seq/1/data/model/"); config_path.append(config_filepath_str); marian_config_path_ = config_path; @@ -199,6 +198,17 @@ ModelInstanceState::ModelInstanceState( extern "C" { +void +handler(int sig) { + void* array[30]; + size_t size; + + size = backtrace(array, 30); + + fprintf(stderr, "Error: signal %d, Exception info:\n", sig); + backtrace_symbols_fd(array, size, STDERR_FILENO); +} + TRITONSERVER_Error* TRITONBACKEND_ModelInitialize(TRITONBACKEND_Model* model) { @@ -209,6 +219,9 @@ TRITONBACKEND_ModelInitialize(TRITONBACKEND_Model* model) TRITONBACKEND_ModelSetState(model, reinterpret_cast(model_state)) ); + signal(SIGSEGV, handler); + signal(SIGABRT, handler); + return nullptr; // success } @@ -308,7 +321,6 @@ TRITONBACKEND_ModelInstanceExecute( std::vector request_input; std::vector request_batch_size; - std::vector inputs; std::string input_strings; // Create a single response object for each request. 
If something @@ -389,14 +401,13 @@ TRITONBACKEND_ModelInstanceExecute( } content_buffer.insert( content_buffer.end(), reinterpret_cast(input_buffer) + 4, - reinterpret_cast(input_buffer) + buffer_byte_size - 4 + reinterpret_cast(input_buffer) + buffer_byte_size ); } std::string s(content_buffer.begin(), content_buffer.end()); int count = std::count(s.begin(), s.end(), '\n'); request_batch_size.push_back(count + 1); - inputs.push_back(s); content_buffer.clear(); if (input_strings.empty()) { @@ -439,6 +450,10 @@ TRITONBACKEND_ModelInstanceExecute( // Move to next output content. if (p != nullptr) { pos = p + 1; + } else { + // Break if there no left output content, even though batch_size > 0, + // '\n' at the end may be processed by Marian. + break; } batch_size--; } diff --git a/contrib/triton-aml/marian_backend/src/marian.h b/contrib/triton-aml/marian_backend/src/marian.h index 0cd389b70..ccd3eeb9c 100644 --- a/contrib/triton-aml/marian_backend/src/marian.h +++ b/contrib/triton-aml/marian_backend/src/marian.h @@ -1,4 +1,9 @@ #pragma once +#include +#include +#include +#include +#include #ifdef _WIN32 #define DLLEXPORT extern "C" __declspec(dllexport) From 67dfe27bb3c6bd1e1d637e1d544fb5417156a9a4 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Wed, 20 Jan 2021 17:41:54 -0800 Subject: [PATCH 02/25] Adds new model task to create tasks with callbacks --- src/models/model_task.h | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/models/model_task.h b/src/models/model_task.h index 96dfadd0c..d65515d2b 100644 --- a/src/models/model_task.h +++ b/src/models/model_task.h @@ -1,7 +1,7 @@ #pragma once #include - +#include namespace marian { struct ModelTask { @@ -9,8 +9,13 @@ struct ModelTask { virtual void run() = 0; }; +struct ModelCallbackTask { + virtual ~ModelCallbackTask() {} + virtual void run(std::function) = 0; +}; + struct ModelServiceTask { virtual ~ModelServiceTask() {} - virtual std::string run(const std::string&) = 0; + virtual 
std::string run(const std::string&, std::function) = 0; }; } // namespace marian From 6a2c1a6661b1f7e2a5c3245ee5c5157826ebc9e7 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Wed, 20 Jan 2021 17:42:40 -0800 Subject: [PATCH 03/25] Adds support for beam search to take in a callback to run on finished sentences during translation --- src/translator/beam_search.cpp | 17 +++++++++++++++-- src/translator/beam_search.h | 8 ++++++-- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/translator/beam_search.cpp b/src/translator/beam_search.cpp index 9335c55bd..ffe6daf15 100755 --- a/src/translator/beam_search.cpp +++ b/src/translator/beam_search.cpp @@ -1,4 +1,5 @@ #include "translator/beam_search.h" +#include #include "data/factored_vocab.h" #include "translator/helpers.h" @@ -248,7 +249,8 @@ Beams BeamSearch::purgeBeams(const Beams& beams, /*in/out=*/std::vector graph, Ptr batch) { +Histories BeamSearch::search(Ptr graph, Ptr batch, std::function callback) { + const bool nbest = options_->get("n-best"); auto factoredVocab = trgVocab_->tryAs(); size_t numFactorGroups = factoredVocab ? factoredVocab->getNumGroups() : 1; if (numFactorGroups == 1) // if no factors then we didn't need this object in the first place @@ -500,7 +502,18 @@ Histories BeamSearch::search(Ptr graph, Ptr if (histories[batchIdx]->size() >= options_->get("max-length-factor") * batch->front()->batchWidth()) maxLengthReached = true; histories[batchIdx]->add(beams[batchIdx], trgEosId, purgedNewBeams[batchIdx].empty() || maxLengthReached); - } + + // If this is the last beam and translation and the function passed in has a callable target, run the callback function on + // the translated sentence. + if (callback != nullptr && (purgedNewBeams[batchIdx].empty() || maxLengthReached)) { + std::stringstream best1; + std::stringstream bestn; + printer_->print(histories[batchIdx], best1, bestn); + std::string result = nbest ? 
bestn.str() : best1.str(); + callback(histories[batchIdx]->getLineNum(), result); + } + + } } if (maxLengthReached) // early exit if max length limit was reached break; diff --git a/src/translator/beam_search.h b/src/translator/beam_search.h index eabf7b5d4..511606086 100644 --- a/src/translator/beam_search.h +++ b/src/translator/beam_search.h @@ -2,7 +2,9 @@ #include "marian.h" #include "translator/history.h" +#include "translator/output_printer.h" #include "translator/scorers.h" +#include namespace marian { @@ -12,13 +14,15 @@ class BeamSearch { std::vector> scorers_; size_t beamSize_; Ptr trgVocab_; + Ptr printer_; const float INVALID_PATH_SCORE = std::numeric_limits::lowest(); // @TODO: observe this closely const bool PURGE_BATCH = true; // @TODO: diagnostic, to-be-removed once confirmed there are no issues. public: BeamSearch(Ptr options, const std::vector>& scorers, const Ptr trgVocab) - : options_(options), scorers_(scorers), beamSize_(options_->get("beam-size")), trgVocab_(trgVocab) + : options_(options), scorers_(scorers), beamSize_(options_->get("beam-size")), trgVocab_(trgVocab), + printer_(New(options_, trgVocab_)) {} // combine new expandedPathScores and previous beams into new set of beams @@ -45,7 +49,7 @@ class BeamSearch { Beams purgeBeams(const Beams& beams, /*in/out=*/std::vector& batchIdxMap); // main decoding function - Histories search(Ptr graph, Ptr batch); + Histories search(Ptr graph, Ptr batch, std::function callback = nullptr); }; } // namespace marian From ee7344630ac33f3cda9b06f8e30f373db2f420e8 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Wed, 20 Jan 2021 17:43:08 -0800 Subject: [PATCH 04/25] Adds support for translators to use callbacks within beamsearch --- src/translator/translator.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/translator/translator.h b/src/translator/translator.h index 15eb98702..3882915e3 100755 --- a/src/translator/translator.h +++ b/src/translator/translator.h @@ -27,7 
+27,7 @@ namespace marian { template -class Translate : public ModelTask { +class Translate : public ModelCallbackTask { private: Ptr options_; std::vector> graphs_; @@ -124,7 +124,7 @@ class Translate : public ModelTask { } } - void run() override { + void run(std::function callback = nullptr) override { data::BatchGenerator bg(corpus_, options_); ThreadPool threadPool(numDevices_, numDevices_); @@ -149,7 +149,7 @@ class Translate : public ModelTask { } auto search = New(options_, scorers, trgVocab_); - auto histories = search->search(graph, batch); + auto histories = search->search(graph, batch, callback); for(auto history : histories) { std::stringstream best1; @@ -246,7 +246,7 @@ class TranslateService : public ModelServiceTask { } } - std::string run(const std::string& input) override { + std::string run(const std::string& input, std::function callback = nullptr) override { // split tab-separated input into fields if necessary auto inputs = options_->get("tsv", false) ? convertTsvToLists(input, options_->get("tsv-fields", 1)) @@ -274,7 +274,7 @@ class TranslateService : public ModelServiceTask { } auto search = New(options_, scorers, trgVocab_); - auto histories = search->search(graph, batch); + auto histories = search->search(graph, batch, callback); for(auto history : histories) { std::stringstream best1; From 0772459eb0db2c3830618d059df1bdee4ddef677 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Wed, 20 Jan 2021 17:43:58 -0800 Subject: [PATCH 05/25] Adds new file containing callbacks for debugging and timing stubs. EOD. Will complete tomorrow. 
--- src/translator/translator_callbacks.h | 51 +++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 src/translator/translator_callbacks.h diff --git a/src/translator/translator_callbacks.h b/src/translator/translator_callbacks.h new file mode 100644 index 000000000..f760f9c79 --- /dev/null +++ b/src/translator/translator_callbacks.h @@ -0,0 +1,51 @@ +#include +#include +#include +#include "common/timer.h" +#include +#include +#include + +struct WriteInBatchOrder { + std::ostream& os_; + + + explicit WriteInBatchOrder(std::ostream& os) : os_(os) {} + + void operator()(const int sentenceId, const std::string& sentence) { + + } + + void writeResult() { + + } +}; + +struct TimeSentenceLatencies { + marian::timer::Timer timer_; + std::mutex mutex_; + std::vector times_; + std::vector sentences_; + bool trackSentences_; + + explicit TimeSentenceLatencies(bool trackSentences) : trackSentences_(trackSentences) {} + + void startTimingBatch() { + timer_.start(); + } + + void operator()(const int sentenceId, const std::string& sentence) { + std::lock_guard lock(mutex_); + sentences_.push_back(sentence); + times_.push_back(timer_.elapsed()); + } + + void getTimeStatistics() { + // Get median, average and some latency percentiles + } + + std::vector getAllTranslatedSentences() { + return sentences_; + } + +}; \ No newline at end of file From f6794db38072fde64611d73e0ef228e4a6a8afe0 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Thu, 21 Jan 2021 12:03:12 -0800 Subject: [PATCH 06/25] Add timing functor --- src/translator/translator_callbacks.h | 62 +++++++++++++++++++-------- 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/src/translator/translator_callbacks.h b/src/translator/translator_callbacks.h index f760f9c79..3903b86bd 100644 --- a/src/translator/translator_callbacks.h +++ b/src/translator/translator_callbacks.h @@ -1,34 +1,26 @@ +#pragma once + +#include #include +#include #include #include #include "common/timer.h" #include 
#include #include - -struct WriteInBatchOrder { - std::ostream& os_; - - - explicit WriteInBatchOrder(std::ostream& os) : os_(os) {} - - void operator()(const int sentenceId, const std::string& sentence) { - - } - - void writeResult() { - - } -}; +#include +#include "common/logging.h" struct TimeSentenceLatencies { marian::timer::Timer timer_; std::mutex mutex_; + std::vector sentenceIds_; std::vector times_; std::vector sentences_; - bool trackSentences_; + std::ostream& os_; - explicit TimeSentenceLatencies(bool trackSentences) : trackSentences_(trackSentences) {} + explicit TimeSentenceLatencies(std:: ostream& os) : os_(os) {} void startTimingBatch() { timer_.start(); @@ -36,16 +28,48 @@ struct TimeSentenceLatencies { void operator()(const int sentenceId, const std::string& sentence) { std::lock_guard lock(mutex_); + sentenceIds_.push_back(sentenceId); sentences_.push_back(sentence); times_.push_back(timer_.elapsed()); } - void getTimeStatistics() { + void getTimeStatistics() const { // Get median, average and some latency percentiles + std::vector sortedTimes(times_); + std::sort(sortedTimes.begin(), sortedTimes.end()); + double sum = std::accumulate(sortedTimes.begin(), sortedTimes.end(), 0.0); + LOG(info, "Average is ", sum / sortedTimes.size()); + LOG(info, "50th percentile ", getPercentile(sortedTimes, 0.5)); + LOG(info, "90th percentile ", getPercentile(sortedTimes, 0.9)); + LOG(info, "95th percentile ", getPercentile(sortedTimes, 0.95)); + LOG(info, "99th percentile ", getPercentile(sortedTimes, 0.99)); + LOG(info, "99.9th percentile ", getPercentile(sortedTimes, 0.999)); } - std::vector getAllTranslatedSentences() { + const std::vector& getAllTranslatedSentences() const { return sentences_; } + void writeInBatchOrder() { + std::vector ids(sentenceIds_); + std::sort(ids.begin(), ids.end()); + for (const auto id : ids) { + os_ << sentences_[id] << "\n"; + } + } + +private: + double getPercentile(const std::vector sortedTimes, double percentile) const { + 
ABORT_IF(sortedTimes.empty(), "No times available"); + const int numTimes = (int) sortedTimes.size(); + const double floatRank = percentile * (numTimes + 1); + const int zeroIndexRank = std::max(0, ((int) floatRank) - 1); + if (std::floor(floatRank) == floatRank) { + return sortedTimes[zeroIndexRank]; + } + + const double frac = floatRank - std::floor(floatRank); + const int nextElementIndex = std::min(zeroIndexRank + 1, numTimes - 1); + return sortedTimes[zeroIndexRank] + frac * (sortedTimes[nextElementIndex] - sortedTimes[zeroIndexRank]); + } }; \ No newline at end of file From b1b775729c2d876cd944ef7b1fccca4f7efbdd51 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Thu, 21 Jan 2021 17:54:44 -0800 Subject: [PATCH 07/25] Timing for individual sentences working with model. Need to check correctness --- src/translator/translator.h | 26 ++++++- src/translator/translator_callbacks.h | 99 +++++++++++++++++++-------- 2 files changed, 94 insertions(+), 31 deletions(-) diff --git a/src/translator/translator.h b/src/translator/translator.h index 3882915e3..092873be0 100755 --- a/src/translator/translator.h +++ b/src/translator/translator.h @@ -12,6 +12,7 @@ #include "translator/history.h" #include "translator/output_collector.h" #include "translator/output_printer.h" +#include "translator/translator_callbacks.h" #include "models/model_task.h" #include "translator/scorers.h" @@ -128,6 +129,8 @@ class Translate : public ModelCallbackTask { data::BatchGenerator bg(corpus_, options_); ThreadPool threadPool(numDevices_, numDevices_); + TimeSentenceLatencies latencyTimer(numDevices_); + std::mutex mutex; size_t batchId = 0; auto collector = New(options_->get("output")); @@ -139,7 +142,14 @@ class Translate : public ModelCallbackTask { bool doNbest = options_->get("n-best"); for(auto batch : bg) { - auto task = [=](size_t id) { + auto task = [=, &latencyTimer, &mutex](size_t id) { + thread_local int tid = -1; + thread_local bool first = true; + if (tid == -1) { + tid = 
latencyTimer.getThreadId(mutex); + } + + latencyTimer.resetThreadTimer(tid); thread_local Ptr graph; thread_local std::vector> scorers; @@ -149,7 +159,15 @@ class Translate : public ModelCallbackTask { } auto search = New(options_, scorers, trgVocab_); - auto histories = search->search(graph, batch, callback); + // warm + if(first) { + search->search(graph, batch, nullptr); + first = false; + latencyTimer.resetThreadTimer(tid); + } + + marian::timer::Timer timer; + auto histories = search->search(graph, batch, latencyTimer); for(auto history : histories) { std::stringstream best1; @@ -161,6 +179,7 @@ class Translate : public ModelCallbackTask { doNbest); } + std::cout << "Batch time " << timer.elapsed() << std::endl; // progress heartbeat for MS-internal Philly compute cluster // otherwise this job may be killed prematurely if no log for 4 hrs @@ -176,6 +195,9 @@ class Translate : public ModelCallbackTask { threadPool.enqueue(task, batchId++); } + + threadPool.join_all(); + latencyTimer.getTimeStatistics(); } }; diff --git a/src/translator/translator_callbacks.h b/src/translator/translator_callbacks.h index 3903b86bd..bd10e75f2 100644 --- a/src/translator/translator_callbacks.h +++ b/src/translator/translator_callbacks.h @@ -11,54 +11,95 @@ #include #include #include "common/logging.h" +#include +#include struct TimeSentenceLatencies { - marian::timer::Timer timer_; - std::mutex mutex_; - std::vector sentenceIds_; - std::vector times_; - std::vector sentences_; std::ostream& os_; + int numThreads_; + volatile int currentIndex_; + int batchSize_; - explicit TimeSentenceLatencies(std:: ostream& os) : os_(os) {} + // Things for thread safety with Marian + std::map threadIndex_; - void startTimingBatch() { - timer_.start(); + + // Data + std::shared_ptr> timers_; + std::shared_ptr>> sentenceIds_; + std::shared_ptr>> times_; + std::shared_ptr>> sentences_; + + TimeSentenceLatencies(std:: ostream& os, int numThreads) : os_(os), numThreads_(numThreads), currentIndex_(0) { 
+ timers_ = std::make_shared>(numThreads); + sentenceIds_ = std::make_shared>>(numThreads); + times_ = std::make_shared>>(numThreads); + sentences_ = std::make_shared>>(numThreads); + } + + explicit TimeSentenceLatencies(int numThreads) : TimeSentenceLatencies(std::cout, numThreads) {} + + int getThreadId(std::mutex& mutex) { + int tid = 0; + std::lock_guard lock(mutex); + if (threadIndex_.count(std::this_thread::get_id()) == 0) { + threadIndex_[std::this_thread::get_id()] = currentIndex_; + tid = currentIndex_; + ++currentIndex_; + } else { + tid = threadIndex_.at(std::this_thread::get_id()); + } + return tid; + } + + void resetThreadTimer(const int tid) { + timers_->at(tid).start(); } void operator()(const int sentenceId, const std::string& sentence) { - std::lock_guard lock(mutex_); - sentenceIds_.push_back(sentenceId); - sentences_.push_back(sentence); - times_.push_back(timer_.elapsed()); + int tid = threadIndex_.at(std::this_thread::get_id()); + + sentenceIds_->data()[tid].push_back(sentenceId); + sentences_->data()[tid].push_back(sentence); + times_->data()[tid].push_back(timers_->data()[tid].elapsed()); } void getTimeStatistics() const { - // Get median, average and some latency percentiles - std::vector sortedTimes(times_); + // Get average and some latency percentiles + std::vector sortedTimes; + for (size_t i = 0; i < times_->size(); ++i ) { + sortedTimes.insert(sortedTimes.end(), times_->at(i).begin(), times_->at(i).end()); + } std::sort(sortedTimes.begin(), sortedTimes.end()); double sum = std::accumulate(sortedTimes.begin(), sortedTimes.end(), 0.0); - LOG(info, "Average is ", sum / sortedTimes.size()); - LOG(info, "50th percentile ", getPercentile(sortedTimes, 0.5)); - LOG(info, "90th percentile ", getPercentile(sortedTimes, 0.9)); - LOG(info, "95th percentile ", getPercentile(sortedTimes, 0.95)); - LOG(info, "99th percentile ", getPercentile(sortedTimes, 0.99)); - LOG(info, "99.9th percentile ", getPercentile(sortedTimes, 0.999)); - } - - const 
std::vector& getAllTranslatedSentences() const { - return sentences_; + std::cout << "Average is " << sum / sortedTimes.size() << std::endl; + std::cout << "50th percentile " << getPercentile(sortedTimes, 0.5) << std::endl; + std::cout << "90th percentile " << getPercentile(sortedTimes, 0.90) << std::endl; + std::cout << "95th percentile " << getPercentile(sortedTimes, 0.95) << std::endl; + std::cout << "99th percentile " << getPercentile(sortedTimes, 0.99) << std::endl; + std::cout << "99.9th percentile " << getPercentile(sortedTimes, 0.999) << std::endl; } void writeInBatchOrder() { - std::vector ids(sentenceIds_); - std::sort(ids.begin(), ids.end()); - for (const auto id : ids) { - os_ << sentences_[id] << "\n"; + // First, flatten the sentence ids and the sentences + std::vector ids; + std::vector sentences; + for (size_t i = 0; i < sentences_->size(); ++i) { + ids.insert(ids.end(), sentenceIds_->at(i).begin(), sentenceIds_->at(i).end()); + sentences.insert(sentences.end(), sentences_->at(i).begin(), sentences_->at(i).end()); + } + + // Get a vector of indices sorted by sentence ids. 
+ std::vector indices(ids.size()); + std::iota(indices.begin(), indices.end(), 0); + std::sort(indices.begin(), indices.end(), [&](const int a, const int b) -> bool {return ids[a] < ids[b];}); + + // Use the sorted vector to write out the sentences in order + for (const auto& idx : indices) { + os_ << sentences[idx] << "\n"; } } -private: double getPercentile(const std::vector sortedTimes, double percentile) const { ABORT_IF(sortedTimes.empty(), "No times available"); const int numTimes = (int) sortedTimes.size(); From 18760041d1d5fccb09d2fa1c409e6a43337a8bd3 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Thu, 21 Jan 2021 21:12:35 -0800 Subject: [PATCH 08/25] Adds temporary timing code --- src/translator/translator.h | 3 +++ src/translator/translator_callbacks.h | 9 +++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/translator/translator.h b/src/translator/translator.h index 092873be0..86a43137a 100755 --- a/src/translator/translator.h +++ b/src/translator/translator.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include "data/batch_generator.h" @@ -198,6 +199,8 @@ class Translate : public ModelCallbackTask { threadPool.join_all(); latencyTimer.getTimeStatistics(); + std::ofstream os("callback.txt"); + latencyTimer.writeInBatchOrder(os); } }; diff --git a/src/translator/translator_callbacks.h b/src/translator/translator_callbacks.h index bd10e75f2..90c78a8ba 100644 --- a/src/translator/translator_callbacks.h +++ b/src/translator/translator_callbacks.h @@ -15,7 +15,6 @@ #include struct TimeSentenceLatencies { - std::ostream& os_; int numThreads_; volatile int currentIndex_; int batchSize_; @@ -30,15 +29,13 @@ struct TimeSentenceLatencies { std::shared_ptr>> times_; std::shared_ptr>> sentences_; - TimeSentenceLatencies(std:: ostream& os, int numThreads) : os_(os), numThreads_(numThreads), currentIndex_(0) { + explicit TimeSentenceLatencies(int numThreads) : numThreads_(numThreads), currentIndex_(0) { timers_ = 
std::make_shared>(numThreads); sentenceIds_ = std::make_shared>>(numThreads); times_ = std::make_shared>>(numThreads); sentences_ = std::make_shared>>(numThreads); } - explicit TimeSentenceLatencies(int numThreads) : TimeSentenceLatencies(std::cout, numThreads) {} - int getThreadId(std::mutex& mutex) { int tid = 0; std::lock_guard lock(mutex); @@ -80,7 +77,7 @@ struct TimeSentenceLatencies { std::cout << "99.9th percentile " << getPercentile(sortedTimes, 0.999) << std::endl; } - void writeInBatchOrder() { + void writeInBatchOrder(std::ostream& os) { // First, flatten the sentence ids and the sentences std::vector ids; std::vector sentences; @@ -96,7 +93,7 @@ struct TimeSentenceLatencies { // Use the sorted vector to write out the sentences in order for (const auto& idx : indices) { - os_ << sentences[idx] << "\n"; + os << sentences[idx] << "\n"; } } From 451721590fa97de09cbb0a1a5851f6b6203dc907 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Mon, 19 Apr 2021 12:24:18 -0700 Subject: [PATCH 09/25] Adds sync support into beam search and marian triton backend. The triton backend is not tested/compiled as of this commit. 
--- .../triton-aml/marian_backend/src/marian.cc | 374 +++++++++++++++++- .../triton-aml/marian_backend/src/marian.h | 7 + contrib/triton-aml/src/cmarian.cpp | 30 +- src/models/model_task.h | 3 +- src/translator/beam_search.cpp | 8 +- src/translator/beam_search.h | 6 +- src/translator/translator.h | 136 +++++++ 7 files changed, 556 insertions(+), 8 deletions(-) diff --git a/contrib/triton-aml/marian_backend/src/marian.cc b/contrib/triton-aml/marian_backend/src/marian.cc index 6cafd0401..73c74c394 100644 --- a/contrib/triton-aml/marian_backend/src/marian.cc +++ b/contrib/triton-aml/marian_backend/src/marian.cc @@ -283,8 +283,7 @@ TRITONBACKEND_ModelInstanceFinalize(TRITONBACKEND_ModelInstance* instance) return nullptr; // success } -TRITONSERVER_Error* -TRITONBACKEND_ModelInstanceExecute( +TRITONSERVER_Error* serveRequestsSync( TRITONBACKEND_ModelInstance* instance, TRITONBACKEND_Request** requests, const uint32_t request_count) { @@ -579,6 +578,377 @@ TRITONBACKEND_ModelInstanceExecute( return nullptr; // success } +// Again, this is gross but the exposed API is a C API. These states are needed +// to correctly process sentences asynchronously. They are updated when the async +// execute function is called. + +struct CallbackState { + // A vector of vectors containing requests that are partially completed. This vector + // is of length request_count. Each vector within has size request_batch_size. A + // request is complete when all the vectors for that request are not empty. + std::vector> partially_completed_requests; + + // A vector mapping each request to its orig batch. That is marianBatch_to_tritonRequest_map[b] gives + // the request that element b in the marian batch originated from. This handles the fact + // that some requests may be split into several sentences to Marian. + std::vector marianBatch_to_tritonRequest_map; + + // A vector mapping the marian batch index to the index of the request. 
+ std::vector marianBatchIdx_to_requestBatchIdx_map; + + // 'responses' is initialized with the response objects below and + // if/when an error response is sent the corresponding entry in + // 'responses' is set to nullptr to indicate that that response has + // already been sent. + std::vector responses; + + // State to collect statistics about the sentence in the given batch. + uint64_t exec_start_ns; + + // Request inputs + std::vector request_input; + + // Requests to Triton + TRITONBACKEND_Request** requests; +}; + + + +void sendResponse(int bn, const char* result, void* userData) +{ + CallbackState* state = (CallbackState*) userData; + + // Use at to get bound checking when accessing the vector + int requestNumber = state->marianBatch_to_tritonRequest_map.at(bn); + int requestBatchIdx = state->marianBatchIdx_to_requestBatchIdx_map.at(bn); + size_t requestBatchSize = state->partially_completed_requests.at(requestNumber).size(); + + // For uniformity, I always assign the translated sentence to the partially completed requests array. + const std::vector& requestStaging = state->partially_completed_requests.at(requestNumber); + + if (!requestStaging.at(requestBatchIdx).empty()) { + GUARDED_RESPOND_IF_ERROR( + state->responses, requestNumber, + TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_UNSUPPORTED, + "Staging this request will overwrite an existing sentence." + ) + ); + + LOG_MESSAGE( + TRITONSERVER_LOG_ERROR, + (std::string("request ") + std::to_string(requestNumber) + + ": failed to stage request as a sentence seems to exist in the staging area.") + .c_str() + ); + return; + } + requestStaging.at(requestBatchIdx) = result; + + // Now we check if any sentence in the batch of requests still remains to be processed. If so, + // return immediately since we have already stored the translated sentence in the staging area above. 
+ for (const auto& sentence : requestStaging) { + if (sentence.empty()) { + return; + } + } + + // If here, we need to concat all the sentences in the staging area for the given request and immediately + // send a response to the user. + std::string concatedSentences; + for (int sen = 0; sen < (int)requestStaging.size(); ++sen) { + concatedSentences += sentence; + if (sen + 1 != (int)requestStaging.size()) { + concatedSentences += "\n"; + } + } + + std::cout << bn << " " << concatedSentences << std::endl; + + TRITONBACKEND_Input* input = state->request_input[r]; + const char* input_name; + TRITONSERVER_DataType input_datatype; + const int64_t* input_shape; + uint32_t input_dims_count; + uint64_t input_byte_size; + uint32_t input_buffer_count; + + GUARDED_RESPOND_IF_ERROR( + state->responses, requestNumber, + TRITONBACKEND_InputProperties( + input, &input_name, &input_datatype, &input_shape, + &input_dims_count, &input_byte_size, &input_buffer_count + ) + ); + + if (state->responses[requestNumber] == nullptr) { + LOG_MESSAGE( + TRITONSERVER_LOG_ERROR, + (std::string("request ") + std::to_string(requestNumber) + + ": failed to read input properties, error response sent") + .c_str() + ); + return; + } + + TRITONBACKEND_Request* request = state->requests[requestNumber]; + const char* requested_output_name = nullptr; + GUARDED_RESPOND_IF_ERROR( + state->responses, requestNumber, + TRITONBACKEND_RequestOutputName( + request, 0 /* index */, &requested_output_name + ) + ); + + // Create an output tensor in the response, + // input and output have same datatype and shape... + TRITONBACKEND_Response* response = state->responses[requestNumber]; + TRITONBACKEND_Output* output; + GUARDED_RESPOND_IF_ERROR( + state->responses, requestNumber, + TRITONBACKEND_ResponseOutput( + response, &output, requested_output_name, input_datatype, + input_shape, input_dims_count + ) + ); + + // Get the output buffer. We request a buffer in CPU memory + // but we have to handle any returned type. 
If we get back + // a buffer in GPU memory we just fail the request. + void* output_buffer; + int c_str_size = (int)concatedSentences.size() + 1; + TRITONSERVER_MemoryType output_memory_type = TRITONSERVER_MEMORY_CPU; + int64_t output_memory_type_id = 0; + GUARDED_RESPOND_IF_ERROR( + state->responses, requestNumber, + TRITONBACKEND_OutputBuffer( + output, &output_buffer, c_str_size + 4, + &output_memory_type, &output_memory_type_id + ) + ); + + if ((responses[requestNumber] == nullptr) || + (output_memory_type == TRITONSERVER_MEMORY_GPU)) { + GUARDED_RESPOND_IF_ERROR( + state->responses, requestNumber, + TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_UNSUPPORTED, + "failed to create output buffer in CPU memory" + ) + ); + LOG_MESSAGE( + TRITONSERVER_LOG_ERROR, + (std::string("request ") + std::to_string(requestNumber) + + ": failed to create output buffer in CPU memory, error request sent") + .c_str() + ); + return; + } + + // Copy Marian result -> output. + memcpy(output_buffer, reinterpret_cast(&c_str_size), 4); + memcpy(reinterpret_cast(output_buffer) + 4, concatedSentences.c_str(), c_str_size); + + // Send the response. + LOG_IF_ERROR( + TRITONBACKEND_ResponseSend( + responses[requestNumber], TRITONSERVER_RESPONSE_COMPLETE_FINAL, + nullptr /* success */), + "failed sending response" + ); + + // Report statistics for the successful request. + uint64_t request_exec_end_ns = 0; + SET_TIMESTAMP(request_exec_end_ns); + LOG_IF_ERROR( + TRITONBACKEND_ModelInstanceReportStatistics( + instance_state->TritonModelInstance(), request, true /* success */, + state->exec_start_ns, state->exec_start_ns, request_exec_end_ns, request_exec_end_ns), + "failed reporting request statistics" + ); + + // Release each request as soon as we sent the corresponding response. 
+ LOG_IF_ERROR( + TRITONBACKEND_RequestRelease(request, TRITONSERVER_REQUEST_RELEASE_ALL), + "failed releasing request" + ); +} + +TRITONSERVER_Error* serveRequestsAsync( + TRITONBACKEND_ModelInstance* instance, TRITONBACKEND_Request** requests, + const uint32_t request_count) +{ + LOG_MESSAGE( + TRITONSERVER_LOG_INFO, + ("Marian model instance executing " + std::to_string(request_count) + + " requests").c_str() + ); + + CallbackState state; + state.requests = requests; + state.responses.reserve(request_count); + + // Create a single response object for each request. If something + // goes wrong when attempting to create the response objects just + // fail all of the requests by returning an error. + for (uint32_t r = 0; r < request_count; ++r) { + TRITONBACKEND_Request* request = requests[r]; + + TRITONBACKEND_Response* response; + RETURN_IF_ERROR(TRITONBACKEND_ResponseNew(&response, request)); + state.responses.push_back(response); + } + + uint64_t total_batch_size = 0; + + // We will execute all the requests at the same time, and so there + // will be a single compute-start / compute-end time-range. + state.exec_start_ns = 0; + SET_TIMESTAMP(state.exec_start_ns); + + // It is assumed that this is always of size request count + state.partially_completed_requests.resize(request_count); + + std::string input_strings; + // Create a single response object for each request. If something + // goes wrong when attempting to create the response objects just + // fail all of the requests by returning an error. 
+ for (uint32_t r = 0; r < request_count; ++r) { + TRITONBACKEND_Request* request = requests[r]; + + const char* input_name; + GUARDED_RESPOND_IF_ERROR( + state.responses, r, + TRITONBACKEND_RequestInputName(request, 0 /* index */, &input_name) + ); + + TRITONBACKEND_Input* input = nullptr; + GUARDED_RESPOND_IF_ERROR( + state.responses, r, + TRITONBACKEND_RequestInput(request, input_name, &input) + ); + state.request_input.push_back(input); + + // If an error response was sent while getting the input name + // or input then display an error message and move on + // to next request. + if (state.responses[r] == nullptr) { + LOG_MESSAGE( + TRITONSERVER_LOG_ERROR, + (std::string("request ") + std::to_string(r) + + ": failed to read input or requested output name, error response sent") + .c_str() + ); + continue; + } + + // Get input buffer count. + uint32_t input_buffer_count; + GUARDED_RESPOND_IF_ERROR( + state.responses, r, + TRITONBACKEND_InputProperties( + input, nullptr /* input_name */, nullptr, nullptr, + nullptr, nullptr, &input_buffer_count + ) + ); + if (state.responses[r] == nullptr) { + LOG_MESSAGE( + TRITONSERVER_LOG_ERROR, + (std::string("request ") + std::to_string(r) + + ": failed to read input properties, error response sent") + .c_str() + ); + continue; + } + + // Compose all the requests input to make a batch request, + // record the sentences count of each request for further process. 
+ std::vector content_buffer; + for (uint32_t b = 0; b < input_buffer_count; ++b) { + const void* input_buffer = nullptr; + uint64_t buffer_byte_size = 0; + TRITONSERVER_MemoryType input_memory_type = TRITONSERVER_MEMORY_CPU; + int64_t input_memory_type_id = 0; + GUARDED_RESPOND_IF_ERROR( + state.responses, r, + TRITONBACKEND_InputBuffer( + input, b, &input_buffer, &buffer_byte_size, + &input_memory_type, &input_memory_type_id + ) + ); + if ((state.responses[r] == nullptr) || + (input_memory_type == TRITONSERVER_MEMORY_GPU)) { + GUARDED_RESPOND_IF_ERROR( + state.responses, r, + TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_UNSUPPORTED, + "failed to get input buffer in CPU memory" + ) + ); + } + content_buffer.insert( + content_buffer.end(), reinterpret_cast(input_buffer) + 4, + reinterpret_cast(input_buffer) + buffer_byte_size + ); + } + + std::string s(content_buffer.begin(), content_buffer.end()); + int count = std::count(s.begin(), s.end(), '\n'); + content_buffer.clear(); + + // Ensure each request vector has enough space for its batch + state.partially_completed_requests[r].resize(count + 1); + + // Since a request may have multiple sentences, update the map with request_batch_size + // duplicates of the request number. This gives us a fast way to find the request number + // given the batch offset. + // Additionally, we map the marian batch offset to the request batch offset. + for (int request_batch_size = 0; request_batch_size < (count + 1); ++request_batch_size) { + state.marianBatch_to_tritonRequest_map.push_back(r); + state.marianBatchIdx_to_requestBatchIdx_map.push_back(request_batch_size); + } + + if (input_strings.empty()) { + input_strings = s; + } else { + input_strings.append("\n"); + input_strings.append(s); + } + + total_batch_size += (count + 1); + } + + // Operate on the entire batch of requests for improved performance. 
+ void* vstate; + RETURN_IF_ERROR(TRITONBACKEND_ModelInstanceState(instance, &vstate)); + ModelInstanceState* instance_state = + reinterpret_cast(vstate); + void* marian = instance_state->Marian(); + + translate_async(marian, const_cast(input_strings.c_str()), sendResponse, (void*)&state); + + // Report statistics for the entire batch of requests. + uint64_t exec_end_ns = 0; + SET_TIMESTAMP(exec_end_ns); + LOG_IF_ERROR( + TRITONBACKEND_ModelInstanceReportBatchStatistics( + instance_state->TritonModelInstance(), total_batch_size, + state.exec_start_ns, state.exec_start_ns, exec_end_ns, exec_end_ns), + "failed reporting batch request statistics" + ); + + return nullptr; // success +} + +TRITONSERVER_Error* +TRITONBACKEND_ModelInstanceExecute( + TRITONBACKEND_ModelInstance* instance, TRITONBACKEND_Request** requests, + const uint32_t request_count) +{ + return serveRequestsAsync(instance, requests, request_count) + // return serveRequestsSync(instance, requests, request_count); +} + } // extern "C" }}} // namespace triton::backend::marian diff --git a/contrib/triton-aml/marian_backend/src/marian.h b/contrib/triton-aml/marian_backend/src/marian.h index ccd3eeb9c..96277fa61 100644 --- a/contrib/triton-aml/marian_backend/src/marian.h +++ b/contrib/triton-aml/marian_backend/src/marian.h @@ -11,6 +11,13 @@ #define DLLEXPORT extern "C" #endif +// This is gross but necessary since exporting a C interface. The callback function +// takes an integer which corresponds to the sentence id in the batch along with +// a char* which is the translated version of the sentence at the corresponding batch id. +// The callback function is free to do whatever it pleases with this, including immediately +// calling send response. 
+DLLEXPORT void translate_async(void* marian, char* sent, void(*callback)(int, const char*, void*), void* userData); + DLLEXPORT void* init(char* path, int device_num); DLLEXPORT char* translate(void* marian, char* sent); DLLEXPORT void free_result(char* to_free); diff --git a/contrib/triton-aml/src/cmarian.cpp b/contrib/triton-aml/src/cmarian.cpp index edec5f8a3..bbee07c3e 100644 --- a/contrib/triton-aml/src/cmarian.cpp +++ b/contrib/triton-aml/src/cmarian.cpp @@ -15,12 +15,14 @@ #endif using namespace marian; +typedef void (*callbackFunc)(int, const char*, void*); class CMarian { private: Ptr options_; char* configPath_; Ptr> task_; + Ptr> taskAsync_; public: CMarian(char* configPath, int device_num) : configPath_(configPath) { @@ -37,8 +39,6 @@ class CMarian { strcpy(argv[4], std::to_string(device_num).c_str()); options_ = marian::parseOptions(argc, argv, cli::mode::translation, true); - task_ = New>(options_); - delete[] argv[0]; delete[] argv[1]; delete[] argv[3]; @@ -51,12 +51,33 @@ class CMarian { * @return A string delimited by ||| with newlines separating beams. */ char* translate(char* sent) { + // Lazy initialize so we can use either the sync or async version with this repo + if(!task_) { + task_ = New>(options_); + } std::string strSent(sent); auto outputText = task_->run(strSent); char* ret = (char*) malloc(outputText.length() + 1); snprintf(ret, outputText.length() + 1, "%s", outputText.c_str()); return ret; } + + /** + * @brief Exposes Marian translation capabilities based on the loaded YAML config associated with this class. + * @param sent The sentence to run inference on. + * @param callback the function to run on the translated sentence. It takes the batchID of the source sentence + * along with the translated sentence and processes those. 
+ * @param userData A pointer to any state the user needs to use in their callback + */ + void translate_async(char* sent, callbackFunc callback, void* userData) { + // Lazy initialize so we can use either the sync or async version with this repo + if (!taskAsync_) { + taskAsync_ = New>(options_); + } + std::string strSent(sent); + taskAsync_->registerCallback(callback, userData); + taskAsync_->run(strSent); + } }; DLLEXPORT void* init(char* path, int device_num) { @@ -69,6 +90,11 @@ DLLEXPORT char* translate(void* marian, char* sent) { return m->translate(sent); } +DLLEXPORT void translate_async(void* marian, char* sent, callbackFunc callback, void* userData) { + CMarian* m = static_cast(marian); + m->translate_async(sent, callback, userData); +} + DLLEXPORT void free_result(char* to_free) { free(to_free); } diff --git a/src/models/model_task.h b/src/models/model_task.h index d65515d2b..fc9314d7b 100644 --- a/src/models/model_task.h +++ b/src/models/model_task.h @@ -11,7 +11,8 @@ struct ModelTask { struct ModelCallbackTask { virtual ~ModelCallbackTask() {} - virtual void run(std::function) = 0; + virtual void registerCallback(void (*)(int, const char*, void*), void*) = 0; + virtual void run(const std::string&) = 0; }; struct ModelServiceTask { diff --git a/src/translator/beam_search.cpp b/src/translator/beam_search.cpp index c138bc065..ce2720c4f 100644 --- a/src/translator/beam_search.cpp +++ b/src/translator/beam_search.cpp @@ -1,4 +1,5 @@ #include "translator/beam_search.h" +#include "tensors/tensor_allocator.h" #include #include "data/factored_vocab.h" @@ -249,7 +250,10 @@ Beams BeamSearch::purgeBeams(const Beams& beams, /*in/out=*/std::vector graph, Ptr batch, std::function callback) { +Histories BeamSearch::search(Ptr graph, Ptr batch, + void (*callback)(int, const char*, void*), + void* userData) { + const bool nbest = options_->get("n-best"); auto factoredVocab = trgVocab_->tryAs(); size_t numFactorGroups = factoredVocab ? 
factoredVocab->getNumGroups() : 1; @@ -516,7 +520,7 @@ Histories BeamSearch::search(Ptr graph, Ptr std::stringstream bestn; printer_->print(histories[batchIdx], best1, bestn); std::string result = nbest ? bestn.str() : best1.str(); - callback(histories[batchIdx]->getLineNum(), result); + callback(histories[batchIdx]->getLineNum(), result.c_str(), userData); } } diff --git a/src/translator/beam_search.h b/src/translator/beam_search.h index d05078fec..8d6c3637a 100644 --- a/src/translator/beam_search.h +++ b/src/translator/beam_search.h @@ -14,6 +14,7 @@ class BeamSearch { std::vector> scorers_; size_t beamSize_; Ptr trgVocab_; + Ptr allocator_; Ptr printer_; const float INVALID_PATH_SCORE; @@ -55,7 +56,10 @@ class BeamSearch { Beams purgeBeams(const Beams& beams, /*in/out=*/std::vector& batchIdxMap); // main decoding function - Histories search(Ptr graph, Ptr batch, std::function callback = nullptr); + Histories search(Ptr graph, + Ptr batch, + void (*callback)(int, const char*, void*) = nullptr, + void* userData = nullptr); }; } // namespace marian diff --git a/src/translator/translator.h b/src/translator/translator.h index fe01065b6..aadddf7fe 100644 --- a/src/translator/translator.h +++ b/src/translator/translator.h @@ -367,4 +367,140 @@ class TranslateService : public ModelServiceTask { return outputFields; } }; + +template +class TranslateServiceAsync : public ModelCallbackTask { +private: + Ptr options_; + std::vector> graphs_; + std::vector>> scorers_; + + std::vector> srcVocabs_; + Ptr trgVocab_; + Ptr shortlistGenerator_; + + size_t numDevices_; + + void (*callback_)(int, const char*, void*) = nullptr; + void* userData_ = nullptr; + +public: + virtual ~TranslateServiceAsync() {} + + TranslateServiceAsync(Ptr options) + : options_(New(options->clone())) { + // initialize vocabs + options_->set("inference", true); + options_->set("shuffle", "none"); + + auto vocabPaths = options_->get>("vocabs"); + std::vector maxVocabs = options_->get>("dim-vocabs"); + + 
for(size_t i = 0; i < vocabPaths.size() - 1; ++i) { + Ptr vocab = New(options_, i); + vocab->load(vocabPaths[i], maxVocabs[i]); + srcVocabs_.emplace_back(vocab); + } + + trgVocab_ = New(options_, vocabPaths.size() - 1); + trgVocab_->load(vocabPaths.back()); + + // load lexical shortlist + if(options_->hasAndNotEmpty("shortlist")) + shortlistGenerator_ = New( + options_, srcVocabs_.front(), trgVocab_, 0, 1, vocabPaths.front() == vocabPaths.back()); + + // get device IDs + auto devices = Config::getDevices(options_); + numDevices_ = devices.size(); + + // initialize scorers + for(auto device : devices) { + auto graph = New(true); + + auto precison = options_->get>("precision", {"float32"}); + graph->setDefaultElementType(typeFromString(precison[0])); // only use first type, used for parameter type in graph + graph->setDevice(device); + graph->reserveWorkspaceMB(options_->get("workspace")); + graphs_.push_back(graph); + + auto scorers = createScorers(options_); + for(auto scorer : scorers) { + scorer->init(graph); + if(shortlistGenerator_) + scorer->setShortlistGenerator(shortlistGenerator_); + } + scorers_.push_back(scorers); + } + } + + void registerCallback(void (*callback)(int, const char*, void*), void* userData) override { + callback_ = callback; + userData_ = userData; + } + + void run(const std::string& input) override { + // split tab-separated input into fields if necessary + auto inputs = options_->get("tsv", false) + ? 
convertTsvToLists(input, options_->get("tsv-fields", 1)) + : std::vector({input}); + auto corpus_ = New(inputs, srcVocabs_, options_); + data::BatchGenerator batchGenerator(corpus_, options_); + + auto collector = New(options_->get("quiet-translation", false)); + auto printer = New(options_, trgVocab_); + size_t batchId = 0; + + batchGenerator.prepare(); + + { + ThreadPool threadPool_(numDevices_, numDevices_); + + for(auto batch : batchGenerator) { + auto task = [=](size_t id) { + thread_local Ptr graph; + thread_local std::vector> scorers; + + if(!graph) { + graph = graphs_[id % numDevices_]; + scorers = scorers_[id % numDevices_]; + } + + auto search = New(options_, scorers, trgVocab_); + search->search(graph, batch, callback_); + }; + + threadPool_.enqueue(task, batchId); + batchId++; + } + // ThreadPool destructor called at end of scope and the destructor joins all threads. + } + } + +private: + // Converts a multi-line input with tab-separated source(s) and target sentences into separate lists + // of sentences from source(s) and target sides, e.g. 
+ // "src1 \t trg1 \n src2 \t trg2" -> ["src1 \n src2", "trg1 \n trg2"] + std::vector convertTsvToLists(const std::string& inputText, size_t numFields) { + std::vector outputFields(numFields); + + std::string line; + std::vector lineFields(numFields); + std::istringstream inputStream(inputText); + bool first = true; + while(std::getline(inputStream, line)) { + utils::splitTsv(line, lineFields, numFields); + for(size_t i = 0; i < numFields; ++i) { + if(!first) + outputFields[i] += "\n"; // join sentences with a new line sign + outputFields[i] += lineFields[i]; + } + if(first) + first = false; + } + + return outputFields; + } +}; + } // namespace marian From 9ca8666d8efe59a2291543eb1fb6165d3433fc25 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Thu, 11 Mar 2021 15:45:48 -0800 Subject: [PATCH 10/25] Updates docker to pull from async branch --- contrib/triton-aml/Dockerfile | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/contrib/triton-aml/Dockerfile b/contrib/triton-aml/Dockerfile index f2d29a5f6..b36abb272 100644 --- a/contrib/triton-aml/Dockerfile +++ b/contrib/triton-aml/Dockerfile @@ -42,10 +42,9 @@ RUN ./b2 install --prefix=/usr --with-system --with-thread --with-date_time --wi # Marian install WORKDIR / -RUN git clone --no-checkout https://github.com/marian-nmt/marian-dev +RUN git clone --no-checkout https://github.com/rhenry-nv/marian-dev.git WORKDIR marian-dev -RUN git checkout youki/quantize-embedding -RUN git checkout dad48865fd3b7f1d7b891de81040f7651e824510 +RUN git checkout async_return RUN mkdir src/static RUN mkdir build COPY src/cmarian.cpp /marian-dev/src/static From 6ec9abd8aefa0cea1d63361435756445d8caebc0 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Thu, 11 Mar 2021 18:20:08 -0800 Subject: [PATCH 11/25] Triton backend with async backend now compiles. 
Need to test --- .../triton-aml/marian_backend/src/marian.cc | 31 ++++++++-------- contrib/triton-aml/src/CMakeLists.txt | 36 +++++++++++++------ 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/contrib/triton-aml/marian_backend/src/marian.cc b/contrib/triton-aml/marian_backend/src/marian.cc index 73c74c394..029e5e5e9 100644 --- a/contrib/triton-aml/marian_backend/src/marian.cc +++ b/contrib/triton-aml/marian_backend/src/marian.cc @@ -610,6 +610,9 @@ struct CallbackState { // Requests to Triton TRITONBACKEND_Request** requests; + + // Model instance state + ModelInstanceState* instance_state; }; @@ -621,10 +624,9 @@ void sendResponse(int bn, const char* result, void* userData) // Use at to get bound checking when accessing the vector int requestNumber = state->marianBatch_to_tritonRequest_map.at(bn); int requestBatchIdx = state->marianBatchIdx_to_requestBatchIdx_map.at(bn); - size_t requestBatchSize = state->partially_completed_requests.at(requestNumber).size(); // For uniformity, I always assign the translated sentence to the partially completed requests array. - const std::vector& requestStaging = state->partially_completed_requests.at(requestNumber); + std::vector& requestStaging = state->partially_completed_requests.at(requestNumber); if (!requestStaging.at(requestBatchIdx).empty()) { GUARDED_RESPOND_IF_ERROR( @@ -643,7 +645,9 @@ void sendResponse(int bn, const char* result, void* userData) ); return; } + requestStaging.at(requestBatchIdx) = result; + const int requestBatchSize = (int)requestStaging.size(); // Now we check if any sentence in the batch of requests still remains to be processed. If so, // return immediately since we have already stored the translated sentence in the staging area above. @@ -656,16 +660,16 @@ void sendResponse(int bn, const char* result, void* userData) // If here, we need to concat all the sentences in the staging area for the given request and immediately // send a response to the user. 
std::string concatedSentences; - for (int sen = 0; sen < (int)requestStaging.size(); ++sen) { - concatedSentences += sentence; - if (sen + 1 != (int)requestStaging.size()) { + for (int sen = 0; sen < requestBatchSize; ++sen) { + concatedSentences += requestStaging[sen]; + if (sen + 1 != requestBatchSize) { concatedSentences += "\n"; } } std::cout << bn << " " << concatedSentences << std::endl; - TRITONBACKEND_Input* input = state->request_input[r]; + TRITONBACKEND_Input* input = state->request_input[requestNumber]; const char* input_name; TRITONSERVER_DataType input_datatype; const int64_t* input_shape; @@ -727,7 +731,7 @@ void sendResponse(int bn, const char* result, void* userData) ) ); - if ((responses[requestNumber] == nullptr) || + if ((state->responses[requestNumber] == nullptr) || (output_memory_type == TRITONSERVER_MEMORY_GPU)) { GUARDED_RESPOND_IF_ERROR( state->responses, requestNumber, @@ -752,7 +756,7 @@ void sendResponse(int bn, const char* result, void* userData) // Send the response. LOG_IF_ERROR( TRITONBACKEND_ResponseSend( - responses[requestNumber], TRITONSERVER_RESPONSE_COMPLETE_FINAL, + state->responses[requestNumber], TRITONSERVER_RESPONSE_COMPLETE_FINAL, nullptr /* success */), "failed sending response" ); @@ -762,7 +766,7 @@ void sendResponse(int bn, const char* result, void* userData) SET_TIMESTAMP(request_exec_end_ns); LOG_IF_ERROR( TRITONBACKEND_ModelInstanceReportStatistics( - instance_state->TritonModelInstance(), request, true /* success */, + state->instance_state->TritonModelInstance(), request, true /* success */, state->exec_start_ns, state->exec_start_ns, request_exec_end_ns, request_exec_end_ns), "failed reporting request statistics" ); @@ -921,9 +925,8 @@ TRITONSERVER_Error* serveRequestsAsync( // Operate on the entire batch of requests for improved performance. 
void* vstate; RETURN_IF_ERROR(TRITONBACKEND_ModelInstanceState(instance, &vstate)); - ModelInstanceState* instance_state = - reinterpret_cast(vstate); - void* marian = instance_state->Marian(); + state.instance_state = reinterpret_cast(vstate); + void* marian = state.instance_state->Marian(); translate_async(marian, const_cast(input_strings.c_str()), sendResponse, (void*)&state); @@ -932,7 +935,7 @@ TRITONSERVER_Error* serveRequestsAsync( SET_TIMESTAMP(exec_end_ns); LOG_IF_ERROR( TRITONBACKEND_ModelInstanceReportBatchStatistics( - instance_state->TritonModelInstance(), total_batch_size, + state.instance_state->TritonModelInstance(), total_batch_size, state.exec_start_ns, state.exec_start_ns, exec_end_ns, exec_end_ns), "failed reporting batch request statistics" ); @@ -945,7 +948,7 @@ TRITONBACKEND_ModelInstanceExecute( TRITONBACKEND_ModelInstance* instance, TRITONBACKEND_Request** requests, const uint32_t request_count) { - return serveRequestsAsync(instance, requests, request_count) + return serveRequestsAsync(instance, requests, request_count); // return serveRequestsSync(instance, requests, request_count); } diff --git a/contrib/triton-aml/src/CMakeLists.txt b/contrib/triton-aml/src/CMakeLists.txt index 2e8e37716..ff0ff9b79 100644 --- a/contrib/triton-aml/src/CMakeLists.txt +++ b/contrib/triton-aml/src/CMakeLists.txt @@ -1,3 +1,5 @@ +add_definitions(-DCUB_IGNORE_DEPRECATED_CPP_DIALECT=1) +add_definitions(-DTHRUST_IGNORE_DEPRECATED_CPP_DIALECT=1) add_subdirectory(3rd_party) include_directories(.) @@ -5,10 +7,11 @@ include_directories(3rd_party) include_directories(3rd_party/SQLiteCpp/include) include_directories(3rd_party/sentencepiece) include_directories(3rd_party/fbgemm/include) +include_directories(3rd_party/intgemm) +include_directories(${CMAKE_BINARY_DIR}/src/3rd_party/intgemm) # running cmake on the intgemm submodule triggers config file generation in this directory. 
include_directories(${CMAKE_BINARY_DIR}/local/include) add_library(marian STATIC - static/cmarian.cpp common/aliases.cpp common/fastopt.cpp common/version.cpp @@ -21,9 +24,12 @@ add_library(marian STATIC common/config_validator.cpp common/options.cpp common/binary.cpp + ${CMAKE_CURRENT_BINARY_DIR}/common/build_info.cpp common/io.cpp common/filesystem.cpp common/file_stream.cpp + common/file_utils.cpp + common/signal_handling.cpp common/types.cpp data/alignment.cpp @@ -40,6 +46,8 @@ add_library(marian STATIC 3rd_party/cnpy/cnpy.cpp 3rd_party/ExceptionWithCallStack.cpp + 3rd_party/onnx/protobuf/onnx-ml.pb-wrapper.cpp + 3rd_party/phf/phf.cc tensors/backend.cpp @@ -47,11 +55,9 @@ add_library(marian STATIC tensors/tensor.cpp tensors/cpu/device.cpp tensors/cpu/prod.cpp + tensors/cpu/topk.cpp tensors/cpu/tensor_operators.cpp - - tensors/cpu/sharp/int_gemm.cpp - tensors/cpu/sharp/avx_gemm.cpp - tensors/cpu/sharp/sse_gemm.cpp + tensors/cpu/integer_common.cpp tensors/cpu/fbgemm/packed_gemm.cpp graph/expression_graph.cpp @@ -60,23 +66,31 @@ add_library(marian STATIC graph/node_operators.cpp graph/node_initializers.cpp + onnx/expression_graph_onnx_exporter.cpp + onnx/expression_graph_onnx_serialization.cpp + layers/convolution.cpp layers/generic.cpp layers/loss.cpp layers/weight.cpp + layers/lsh.cpp rnn/cells.cpp rnn/attention.cpp + optimizers/quantizer.cpp optimizers/clippers.cpp optimizers/optimizers.cpp + optimizers/exponential_smoothing.cpp models/model_factory.cpp models/encoder_decoder.cpp models/transformer_stub.cpp rescorer/score_collector.cpp + embedder/vector_collector.cpp + translator/beam_search.cpp translator/history.cpp translator/output_collector.cpp translator/output_printer.cpp @@ -85,22 +99,21 @@ add_library(marian STATIC translator/scorers.cpp training/graph_group_async.cpp - training/graph_group_async_drop.cpp training/graph_group_sync.cpp + training/graph_group.cpp training/graph_group_singleton.cpp - training/graph_group_multinode.cpp - 
training/graph_group_multinode_sync.cpp training/validator.cpp training/communicator.cpp - training/scheduler.cpp # this is only compiled to catch build errors, but not linked microsoft/quicksand.cpp + microsoft/cosmos.cpp $ $ $ $ + $ ) target_compile_options(marian PUBLIC ${ALL_WARNINGS}) @@ -138,6 +151,9 @@ cuda_add_library(marian_cuda tensors/gpu/device.cu tensors/gpu/algorithm.cu tensors/gpu/prod.cpp + tensors/gpu/prod.cu + tensors/gpu/prod_sparse.cpp + tensors/gpu/topk.cu tensors/gpu/element.cu tensors/gpu/add.cu tensors/gpu/add_all.cu @@ -145,8 +161,6 @@ cuda_add_library(marian_cuda tensors/gpu/cudnn_wrappers.cu translator/nth_element.cu translator/helpers.cu - training/gradient_dropping/gpu/dropper.cu - training/gradient_dropping/gpu/sparse_algorithm.cu STATIC) target_compile_options(marian_cuda PUBLIC ${ALL_WARNINGS}) From dbe7a2a049e93ff55617e0db63f2fbd86e397a9a Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Wed, 24 Mar 2021 17:27:51 -0700 Subject: [PATCH 12/25] Attempt 1 at setting async mode in config.pbtxt --- .../triton-aml/marian_backend/src/marian.cc | 66 ++++++++++++++++--- 1 file changed, 57 insertions(+), 9 deletions(-) diff --git a/contrib/triton-aml/marian_backend/src/marian.cc b/contrib/triton-aml/marian_backend/src/marian.cc index 029e5e5e9..650baa63e 100644 --- a/contrib/triton-aml/marian_backend/src/marian.cc +++ b/contrib/triton-aml/marian_backend/src/marian.cc @@ -34,6 +34,10 @@ class ModelState { TRITONSERVER_Error* SetMarianConfigPath(); + // Checks the config.pbtxt file and returns requests asynchronously if parameters["async"] + // is "true". Batches will be served synchronously if this flag is false + TRITONSERVER_Error* SetAsyncMode(); + // Get the handle to the TRITONBACKEND model. TRITONBACKEND_Model* TritonModel() { return triton_model_; } @@ -43,6 +47,8 @@ class ModelState { // Get the Marian config path of the model. 
const std::string& MarianConfigPath() const { return marian_config_path_; } + const unsigned& asyncMode() const { return async_mode_; } + private: ModelState( TRITONBACKEND_Model* triton_model, const char* name, @@ -52,6 +58,7 @@ class ModelState { const std::string name_; common::TritonJson::Value model_config_; std::string marian_config_path_; + unsigned async_mode_; }; TRITONSERVER_Error* @@ -124,6 +131,34 @@ ModelState::SetMarianConfigPath() return nullptr; // success } +TRITONSERVER_Error* +ModelState::SetAsyncMode() +{ + common::TritonJson::WriteBuffer buffer; + RETURN_IF_ERROR(model_config_.PrettyWrite(&buffer)); + LOG_MESSAGE( + TRITONSERVER_LOG_INFO, "Setting Async Mode"); + + unsigned return_async = 0; + common::TritonJson::Value parameters; + if (model_config_.Find("parameters", ¶meters)) { + common::TritonJson::Value config_filepath; + if (parameters.Find("async", &config_filepath)) { + RETURN_IF_ERROR(config_filepath.MemberAsUInt( + "mode", &return_async) + ); + LOG_MESSAGE( + TRITONSERVER_LOG_INFO, + (std::string("Async mode set to : ") + return_async != 0) + .c_str() + ); + } + } + + async_mode_ = return_async; + return nullptr; // success +} + // // ModelInstanceState // @@ -134,7 +169,7 @@ class ModelInstanceState { public: static TRITONSERVER_Error* Create( TRITONBACKEND_ModelInstance* triton_model_instance, - void* marian, ModelInstanceState **state); + void* marian, unsigned async, ModelInstanceState **state); // Get the handle to the TRITONBACKEND model instance. 
TRITONBACKEND_ModelInstance* TritonModelInstance() @@ -147,24 +182,27 @@ class ModelInstanceState { TRITONSERVER_InstanceGroupKind Kind() const { return kind_; } int32_t DeviceId() const { return device_id_; } void* Marian() const { return marian_; } + unsigned Async() const { return async_; } private: ModelInstanceState( TRITONBACKEND_ModelInstance* triton_model_instance, void* marian, const char* name, - const TRITONSERVER_InstanceGroupKind kind, const int32_t device_id); + const TRITONSERVER_InstanceGroupKind kind, const int32_t device_id, + const unsigned async); TRITONBACKEND_ModelInstance* triton_model_instance_; void* marian_; const std::string name_; const TRITONSERVER_InstanceGroupKind kind_; const int32_t device_id_; + const unsigned async_; }; TRITONSERVER_Error* ModelInstanceState::Create( TRITONBACKEND_ModelInstance* triton_model_instance, - void* marian, ModelInstanceState** state) + void* marian, unsigned async, ModelInstanceState** state) { const char* instance_name; RETURN_IF_ERROR( @@ -180,7 +218,7 @@ ModelInstanceState::Create( *state = new ModelInstanceState( triton_model_instance, marian, instance_name, - instance_kind, instance_id); + instance_kind, instance_id, async); return nullptr; // success } @@ -188,9 +226,10 @@ ModelInstanceState::Create( ModelInstanceState::ModelInstanceState( TRITONBACKEND_ModelInstance* triton_model_instance, void* marian, const char* name, - const TRITONSERVER_InstanceGroupKind kind, const int32_t device_id) + const TRITONSERVER_InstanceGroupKind kind, const int32_t device_id, + const unsigned async) : triton_model_instance_(triton_model_instance), marian_(marian), - name_(name), kind_(kind), device_id_(device_id) + name_(name), kind_(kind), device_id_(device_id), async_(async) { } @@ -214,6 +253,7 @@ TRITONBACKEND_ModelInitialize(TRITONBACKEND_Model* model) ModelState* model_state; RETURN_IF_ERROR(ModelState::Create(model, &model_state)); RETURN_IF_ERROR(model_state->SetMarianConfigPath()); + 
RETURN_IF_ERROR(model_state->SetAsyncMode()); RETURN_IF_ERROR( TRITONBACKEND_ModelSetState(model, reinterpret_cast(model_state)) ); @@ -250,6 +290,7 @@ TRITONBACKEND_ModelInstanceInitialize(TRITONBACKEND_ModelInstance* instance) ModelState* model_state = reinterpret_cast(vmodelstate); std::string marian_config_path = model_state->MarianConfigPath(); + unsigned async = model_state->asyncMode(); int32_t device; RETURN_IF_ERROR( @@ -259,7 +300,7 @@ TRITONBACKEND_ModelInstanceInitialize(TRITONBACKEND_ModelInstance* instance) ModelInstanceState* instance_state; RETURN_IF_ERROR( - ModelInstanceState::Create(instance, marian_instance, &instance_state)); + ModelInstanceState::Create(instance, marian_instance, async, &instance_state)); RETURN_IF_ERROR(TRITONBACKEND_ModelInstanceSetState( instance, reinterpret_cast(instance_state))); @@ -948,8 +989,15 @@ TRITONBACKEND_ModelInstanceExecute( TRITONBACKEND_ModelInstance* instance, TRITONBACKEND_Request** requests, const uint32_t request_count) { - return serveRequestsAsync(instance, requests, request_count); - // return serveRequestsSync(instance, requests, request_count); + void* vstate; + RETURN_IF_ERROR(TRITONBACKEND_ModelInstanceState(instance, &vstate)); + ModelInstanceState* state = reinterpret_cast(vstate); + + if state->Async() { + return serveRequestsAsync(instance, requests, request_count); + } else { + return serveRequestsSync(instance, requests, request_count); + } } } // extern "C" From 207fa0cc7ad193fc1b53a0633f3aea62a9b389a3 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Wed, 24 Mar 2021 18:17:00 -0700 Subject: [PATCH 13/25] Fix async build --- .../triton-aml/marian_backend/src/marian.cc | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/contrib/triton-aml/marian_backend/src/marian.cc b/contrib/triton-aml/marian_backend/src/marian.cc index 650baa63e..75f88b9ca 100644 --- a/contrib/triton-aml/marian_backend/src/marian.cc +++ b/contrib/triton-aml/marian_backend/src/marian.cc @@ 
-1,4 +1,5 @@ #include +#include #include "marian.h" #include "triton/backend/backend_common.h" @@ -47,7 +48,7 @@ class ModelState { // Get the Marian config path of the model. const std::string& MarianConfigPath() const { return marian_config_path_; } - const unsigned& asyncMode() const { return async_mode_; } + const bool& asyncMode() const { return async_mode_; } private: ModelState( @@ -58,7 +59,7 @@ class ModelState { const std::string name_; common::TritonJson::Value model_config_; std::string marian_config_path_; - unsigned async_mode_; + bool async_mode_; }; TRITONSERVER_Error* @@ -139,23 +140,24 @@ ModelState::SetAsyncMode() LOG_MESSAGE( TRITONSERVER_LOG_INFO, "Setting Async Mode"); - unsigned return_async = 0; + std::string return_async_mode; common::TritonJson::Value parameters; if (model_config_.Find("parameters", ¶meters)) { - common::TritonJson::Value config_filepath; - if (parameters.Find("async", &config_filepath)) { - RETURN_IF_ERROR(config_filepath.MemberAsUInt( - "mode", &return_async) + common::TritonJson::Value async_value; + if (parameters.Find("async", &async_value)) { + RETURN_IF_ERROR(async_value.MemberAsString( + "string_value", &return_async_mode) ); LOG_MESSAGE( TRITONSERVER_LOG_INFO, - (std::string("Async mode set to : ") + return_async != 0) + (std::string("Async mode set to : ") + return_async_mode) .c_str() ); } } - async_mode_ = return_async; + std::transform(return_async_mode.begin(), return_async_mode.end(), return_async_mode.begin(), ::tolower); + async_mode_ = return_async_mode == "true"; return nullptr; // success } @@ -169,7 +171,7 @@ class ModelInstanceState { public: static TRITONSERVER_Error* Create( TRITONBACKEND_ModelInstance* triton_model_instance, - void* marian, unsigned async, ModelInstanceState **state); + void* marian, bool async, ModelInstanceState **state); // Get the handle to the TRITONBACKEND model instance. 
TRITONBACKEND_ModelInstance* TritonModelInstance() @@ -182,27 +184,27 @@ class ModelInstanceState { TRITONSERVER_InstanceGroupKind Kind() const { return kind_; } int32_t DeviceId() const { return device_id_; } void* Marian() const { return marian_; } - unsigned Async() const { return async_; } + bool Async() const { return async_; } private: ModelInstanceState( TRITONBACKEND_ModelInstance* triton_model_instance, void* marian, const char* name, const TRITONSERVER_InstanceGroupKind kind, const int32_t device_id, - const unsigned async); + const bool async); TRITONBACKEND_ModelInstance* triton_model_instance_; void* marian_; const std::string name_; const TRITONSERVER_InstanceGroupKind kind_; const int32_t device_id_; - const unsigned async_; + const bool async_; }; TRITONSERVER_Error* ModelInstanceState::Create( TRITONBACKEND_ModelInstance* triton_model_instance, - void* marian, unsigned async, ModelInstanceState** state) + void* marian, bool async, ModelInstanceState** state) { const char* instance_name; RETURN_IF_ERROR( @@ -227,7 +229,7 @@ ModelInstanceState::ModelInstanceState( TRITONBACKEND_ModelInstance* triton_model_instance, void* marian, const char* name, const TRITONSERVER_InstanceGroupKind kind, const int32_t device_id, - const unsigned async) + const bool async) : triton_model_instance_(triton_model_instance), marian_(marian), name_(name), kind_(kind), device_id_(device_id), async_(async) { @@ -290,7 +292,7 @@ TRITONBACKEND_ModelInstanceInitialize(TRITONBACKEND_ModelInstance* instance) ModelState* model_state = reinterpret_cast(vmodelstate); std::string marian_config_path = model_state->MarianConfigPath(); - unsigned async = model_state->asyncMode(); + bool async = model_state->asyncMode(); int32_t device; RETURN_IF_ERROR( @@ -993,7 +995,7 @@ TRITONBACKEND_ModelInstanceExecute( RETURN_IF_ERROR(TRITONBACKEND_ModelInstanceState(instance, &vstate)); ModelInstanceState* state = reinterpret_cast(vstate); - if state->Async() { + if (state->Async()) { return 
serveRequestsAsync(instance, requests, request_count); } else { return serveRequestsSync(instance, requests, request_count); From b1541e0e15153ee0fb426b3c95876f8091a88d4f Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Wed, 24 Mar 2021 19:05:03 -0700 Subject: [PATCH 14/25] adds cmarian.cpp to static build --- contrib/triton-aml/src/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/contrib/triton-aml/src/CMakeLists.txt b/contrib/triton-aml/src/CMakeLists.txt index ff0ff9b79..b7e63d0a1 100644 --- a/contrib/triton-aml/src/CMakeLists.txt +++ b/contrib/triton-aml/src/CMakeLists.txt @@ -12,6 +12,7 @@ include_directories(${CMAKE_BINARY_DIR}/src/3rd_party/intgemm) # running cmake o include_directories(${CMAKE_BINARY_DIR}/local/include) add_library(marian STATIC + static/cmarian.cpp common/aliases.cpp common/fastopt.cpp common/version.cpp From 46acae1f62031efd4378aecfe174537e770d2582 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Thu, 25 Mar 2021 17:03:51 -0700 Subject: [PATCH 15/25] Fixes linking issue in build with new version of Marian. Pass callback and userdata directly to beam search invocation --- contrib/triton-aml/Dockerfile | 6 +++++- contrib/triton-aml/marian_backend/CMakeLists.txt | 1 + contrib/triton-aml/src/cmarian.cpp | 3 +-- src/models/model_task.h | 3 +-- src/translator/translator.h | 10 ++-------- 5 files changed, 10 insertions(+), 13 deletions(-) diff --git a/contrib/triton-aml/Dockerfile b/contrib/triton-aml/Dockerfile index b36abb272..1fd5b9764 100644 --- a/contrib/triton-aml/Dockerfile +++ b/contrib/triton-aml/Dockerfile @@ -53,7 +53,10 @@ RUN rm src/CMakeLists.txt COPY src/CMakeLists.txt /marian-dev/src WORKDIR /marian-dev/build -RUN cmake .. -DCOMPILE_CPU=on -DCOMPILE_CUDA=on -DUSE_SENTENCEPIECE=on -DUSE_STATIC_LIBS=off -DCOMPILE_SERVER=off -DUSE_FBGEMM=on -DCUDA_cublas_device_LIBRARY=/usr/lib/x86_64-linux-gnu/libcublas.so +RUN cmake .. 
-DCOMPILE_CPU=on -DCOMPILE_CUDA=on -DUSE_SENTENCEPIECE=on -DUSE_STATIC_LIBS=off -DCOMPILE_SERVER=off -DUSE_FBGEMM=on \ + -DCOMPILE_CUDA_SM35=off -DCOMPILE_CUDA_SM50=off -DCOMPILE_CUDA_SM60=off -DCOMPILE_CUDA_SM70=on -DCOMPILE_CUDA_SM75=off \ + -DCUDA_cublas_device_LIBRARY=/usr/lib/x86_64-linux-gnu/libcublas.so + RUN make -j $(grep -c ^processor /proc/cpuinfo) # build cmarian static library @@ -65,6 +68,7 @@ COPY --from=BUILDER /marian-dev/build/src/3rd_party/fbgemm/libfbgemm.a /usr/lib COPY --from=BUILDER /marian-dev/build/src/3rd_party/fbgemm/asmjit/libasmjit.a /usr/lib COPY --from=BUILDER /marian-dev/build/src/3rd_party/sentencepiece/src/libsentencepiece_train.a /usr/lib COPY --from=BUILDER /marian-dev/build/src/3rd_party/sentencepiece/src/libsentencepiece.a /usr/lib +COPY --from=BUILDER /marian-dev/build/src/3rd_party/intgemm/libintgemm.a /usr/lib COPY --from=BUILDER /marian-dev/build/libmarian.a /usr/lib/libcmarian.a COPY --from=BUILDER /marian-dev/build/src/libmarian_cuda.a /usr/lib/libcmarian_cuda.a diff --git a/contrib/triton-aml/marian_backend/CMakeLists.txt b/contrib/triton-aml/marian_backend/CMakeLists.txt index 87ee85995..0afe953fb 100644 --- a/contrib/triton-aml/marian_backend/CMakeLists.txt +++ b/contrib/triton-aml/marian_backend/CMakeLists.txt @@ -94,6 +94,7 @@ target_link_libraries( fbgemm asmjit protobuf + intgemm ) diff --git a/contrib/triton-aml/src/cmarian.cpp b/contrib/triton-aml/src/cmarian.cpp index bbee07c3e..ecdec8a5c 100644 --- a/contrib/triton-aml/src/cmarian.cpp +++ b/contrib/triton-aml/src/cmarian.cpp @@ -75,8 +75,7 @@ class CMarian { taskAsync_ = New>(options_); } std::string strSent(sent); - taskAsync_->registerCallback(callback, userData); - taskAsync_->run(strSent); + taskAsync_->run(strSent, callback, userData); } }; diff --git a/src/models/model_task.h b/src/models/model_task.h index fc9314d7b..5a54b280e 100644 --- a/src/models/model_task.h +++ b/src/models/model_task.h @@ -11,8 +11,7 @@ struct ModelTask { struct ModelCallbackTask 
{ virtual ~ModelCallbackTask() {} - virtual void registerCallback(void (*)(int, const char*, void*), void*) = 0; - virtual void run(const std::string&) = 0; + virtual void run(const std::string&, void (*)(int, const char*, void*), void*) = 0; }; struct ModelServiceTask { diff --git a/src/translator/translator.h b/src/translator/translator.h index aadddf7fe..78a68f024 100644 --- a/src/translator/translator.h +++ b/src/translator/translator.h @@ -382,7 +382,6 @@ class TranslateServiceAsync : public ModelCallbackTask { size_t numDevices_; void (*callback_)(int, const char*, void*) = nullptr; - void* userData_ = nullptr; public: virtual ~TranslateServiceAsync() {} @@ -434,12 +433,7 @@ class TranslateServiceAsync : public ModelCallbackTask { } } - void registerCallback(void (*callback)(int, const char*, void*), void* userData) override { - callback_ = callback; - userData_ = userData; - } - - void run(const std::string& input) override { + void run(const std::string& input, void (*callback)(int, const char*, void*), void* userData) override { // split tab-separated input into fields if necessary auto inputs = options_->get("tsv", false) ? 
convertTsvToLists(input, options_->get("tsv-fields", 1)) @@ -467,7 +461,7 @@ class TranslateServiceAsync : public ModelCallbackTask { } auto search = New(options_, scorers, trgVocab_); - search->search(graph, batch, callback_); + search->search(graph, batch, callback, userData); }; threadPool_.enqueue(task, batchId); From 41a392584c0044c3c414cb75826fb89ab0e238ad Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Fri, 26 Mar 2021 10:14:33 -0700 Subject: [PATCH 16/25] Remove print --- contrib/triton-aml/Dockerfile | 2 +- contrib/triton-aml/marian_backend/src/marian.cc | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/contrib/triton-aml/Dockerfile b/contrib/triton-aml/Dockerfile index 1fd5b9764..a04f64f78 100644 --- a/contrib/triton-aml/Dockerfile +++ b/contrib/triton-aml/Dockerfile @@ -42,7 +42,7 @@ RUN ./b2 install --prefix=/usr --with-system --with-thread --with-date_time --wi # Marian install WORKDIR / -RUN git clone --no-checkout https://github.com/rhenry-nv/marian-dev.git +RUN git clone --no-checkout https://github.com/rhenry-nv/marian-dev.git WORKDIR marian-dev RUN git checkout async_return RUN mkdir src/static diff --git a/contrib/triton-aml/marian_backend/src/marian.cc b/contrib/triton-aml/marian_backend/src/marian.cc index 75f88b9ca..67e930a53 100644 --- a/contrib/triton-aml/marian_backend/src/marian.cc +++ b/contrib/triton-aml/marian_backend/src/marian.cc @@ -710,8 +710,6 @@ void sendResponse(int bn, const char* result, void* userData) } } - std::cout << bn << " " << concatedSentences << std::endl; - TRITONBACKEND_Input* input = state->request_input[requestNumber]; const char* input_name; TRITONSERVER_DataType input_datatype; From f52bca3aaeaa8ca9713dce4c3300bbacb307df68 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Fri, 26 Mar 2021 15:09:45 -0700 Subject: [PATCH 17/25] Builds backend against CUDA 11 --- contrib/triton-aml/Dockerfile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/contrib/triton-aml/Dockerfile 
b/contrib/triton-aml/Dockerfile index a04f64f78..b0d4d938d 100644 --- a/contrib/triton-aml/Dockerfile +++ b/contrib/triton-aml/Dockerfile @@ -1,5 +1,5 @@ # It is recommended to use a machine which supports CUDA to build this image. -FROM nvidia/cuda:10.2-cudnn7-devel-ubuntu18.04 AS BUILDER +FROM nvidia/cuda:11.1-cudnn8-devel-ubuntu18.04 AS BUILDER RUN apt-get update --fix-missing RUN apt-get install -y curl git autoconf automake libtool curl make g++ unzip cmake build-essential cpio RUN apt-get -y clean && \ @@ -42,7 +42,7 @@ RUN ./b2 install --prefix=/usr --with-system --with-thread --with-date_time --wi # Marian install WORKDIR / -RUN git clone --no-checkout https://github.com/rhenry-nv/marian-dev.git +RUN git clone --no-checkout https://github.com/rhenry-nv/marian-dev.git WORKDIR marian-dev RUN git checkout async_return RUN mkdir src/static @@ -55,7 +55,7 @@ COPY src/CMakeLists.txt /marian-dev/src WORKDIR /marian-dev/build RUN cmake .. -DCOMPILE_CPU=on -DCOMPILE_CUDA=on -DUSE_SENTENCEPIECE=on -DUSE_STATIC_LIBS=off -DCOMPILE_SERVER=off -DUSE_FBGEMM=on \ -DCOMPILE_CUDA_SM35=off -DCOMPILE_CUDA_SM50=off -DCOMPILE_CUDA_SM60=off -DCOMPILE_CUDA_SM70=on -DCOMPILE_CUDA_SM75=off \ - -DCUDA_cublas_device_LIBRARY=/usr/lib/x86_64-linux-gnu/libcublas.so + -DCUDA_cublas_device_LIBRARY=/usr/local/cuda/lib64/libcublas.so RUN make -j $(grep -c ^processor /proc/cpuinfo) From a52f1fefb8c979ebf58c9dc217b7cd9adfc4cbc5 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Tue, 30 Mar 2021 16:34:03 -0700 Subject: [PATCH 18/25] Build for Volta and Turing by default --- contrib/triton-aml/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/triton-aml/Dockerfile b/contrib/triton-aml/Dockerfile index b0d4d938d..8d7e3f60f 100644 --- a/contrib/triton-aml/Dockerfile +++ b/contrib/triton-aml/Dockerfile @@ -54,7 +54,7 @@ COPY src/CMakeLists.txt /marian-dev/src WORKDIR /marian-dev/build RUN cmake .. 
-DCOMPILE_CPU=on -DCOMPILE_CUDA=on -DUSE_SENTENCEPIECE=on -DUSE_STATIC_LIBS=off -DCOMPILE_SERVER=off -DUSE_FBGEMM=on \ - -DCOMPILE_CUDA_SM35=off -DCOMPILE_CUDA_SM50=off -DCOMPILE_CUDA_SM60=off -DCOMPILE_CUDA_SM70=on -DCOMPILE_CUDA_SM75=off \ + -DCOMPILE_CUDA_SM35=off -DCOMPILE_CUDA_SM50=off -DCOMPILE_CUDA_SM60=off -DCOMPILE_CUDA_SM70=on -DCOMPILE_CUDA_SM75=on \ -DCUDA_cublas_device_LIBRARY=/usr/local/cuda/lib64/libcublas.so RUN make -j $(grep -c ^processor /proc/cpuinfo) From 5de8a5e99e174a3a7cdb6f5d3f8e90af8bb36c9d Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Wed, 7 Apr 2021 15:19:22 -0700 Subject: [PATCH 19/25] Install tcmalloc --- contrib/triton-aml/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/triton-aml/Dockerfile b/contrib/triton-aml/Dockerfile index 8d7e3f60f..a2b2f063b 100644 --- a/contrib/triton-aml/Dockerfile +++ b/contrib/triton-aml/Dockerfile @@ -1,7 +1,7 @@ # It is recommended to use a machine which supports CUDA to build this image. 
FROM nvidia/cuda:11.1-cudnn8-devel-ubuntu18.04 AS BUILDER RUN apt-get update --fix-missing -RUN apt-get install -y curl git autoconf automake libtool curl make g++ unzip cmake build-essential cpio +RUN apt-get install -y curl git autoconf automake libtool curl make g++ unzip cmake build-essential cpio libgoogle-perftools-dev RUN apt-get -y clean && \ rm -rf /var/lib/apt/lists/* From 22d15b00cdc3961921e3f87075f05130524a44a7 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Mon, 19 Apr 2021 12:52:39 -0700 Subject: [PATCH 20/25] Updates contrib readme --- contrib/triton-aml/Dockerfile | 4 ++-- contrib/triton-aml/README.md | 11 +++++++++- .../triton-aml/marian_backend/src/marian.cc | 20 +++++++++++++------ 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/contrib/triton-aml/Dockerfile b/contrib/triton-aml/Dockerfile index a2b2f063b..61c144b8f 100644 --- a/contrib/triton-aml/Dockerfile +++ b/contrib/triton-aml/Dockerfile @@ -42,9 +42,9 @@ RUN ./b2 install --prefix=/usr --with-system --with-thread --with-date_time --wi # Marian install WORKDIR / -RUN git clone --no-checkout https://github.com/rhenry-nv/marian-dev.git +RUN git clone --no-checkout https://github.com/rhenry-nv/marian-dev.git WORKDIR marian-dev -RUN git checkout async_return +RUN git checkout async RUN mkdir src/static RUN mkdir build COPY src/cmarian.cpp /marian-dev/src/static diff --git a/contrib/triton-aml/README.md b/contrib/triton-aml/README.md index b115d7674..ddf47c0c9 100644 --- a/contrib/triton-aml/README.md +++ b/contrib/triton-aml/README.md @@ -27,9 +27,18 @@ For the AzureML Inference team members, you can put it into the following place Where is by default /opt/tritonserver/backends. +This backend will return sentences as soon as they are done with translation by default. To only return when the +entire batch is finished translating, set the async_mode to false by adding the following to your config.pbtxt file.
+ +parameters [ + { + key: "async" + value: { string_value : "false" } + } +] ## Make changes -If you want to compile with another version of Marian, you need to replace `RUN git checkout youki/quantize-embedding` in the Dockerfile, then copy the new CMakeLists.txt replace the old one, add src/cmarian.cpp into CMakeLists.txt and make some changes to make sure it will build a static library of Marian. +If you want to compile with another version of Marian, you need to replace `RUN git checkout async` in the Dockerfile, then copy the new CMakeLists.txt to replace the old one, add src/cmarian.cpp into CMakeLists.txt and make some changes to make sure it will build a static library of Marian. ## Limitation diff --git a/contrib/triton-aml/marian_backend/src/marian.cc b/contrib/triton-aml/marian_backend/src/marian.cc index 67e930a53..1b42e6ac8 100644 --- a/contrib/triton-aml/marian_backend/src/marian.cc +++ b/contrib/triton-aml/marian_backend/src/marian.cc @@ -148,16 +148,24 @@ ModelState::SetAsyncMode() RETURN_IF_ERROR(async_value.MemberAsString( "string_value", &return_async_mode) ); - LOG_MESSAGE( - TRITONSERVER_LOG_INFO, - (std::string("Async mode set to : ") + return_async_mode) - .c_str() - ); } } std::transform(return_async_mode.begin(), return_async_mode.end(), return_async_mode.begin(), ::tolower); - async_mode_ = return_async_mode == "true"; + + if (!return_async_mode.empty() && return_async_mode != "true" && return_async_mode != "false") + { + return TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_UNSUPPORTED, "Async mode must be empty, true or false"); + } + + async_mode_ = return_async_mode == "true" || return_async_mode.empty(); + + LOG_MESSAGE( + TRITONSERVER_LOG_INFO, + (std::string("Async mode set to : ") + async_mode_) + .c_str() + ); + return nullptr; // success } From cfde9945492d54aa24605551fd2488d480417f04 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Mon, 19 Apr 2021 14:42:00 -0700 Subject: [PATCH 21/25] Removes artifacts from using std::function for
callback --- src/models/model_task.h | 3 +-- src/translator/beam_search.h | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/models/model_task.h b/src/models/model_task.h index 5a54b280e..7b6ba7c73 100644 --- a/src/models/model_task.h +++ b/src/models/model_task.h @@ -1,7 +1,6 @@ #pragma once #include -#include namespace marian { struct ModelTask { @@ -16,6 +15,6 @@ struct ModelCallbackTask { struct ModelServiceTask { virtual ~ModelServiceTask() {} - virtual std::string run(const std::string&, std::function) = 0; + virtual std::string run(const std::string&) = 0; }; } // namespace marian diff --git a/src/translator/beam_search.h b/src/translator/beam_search.h index 8d6c3637a..4c2e1da8c 100644 --- a/src/translator/beam_search.h +++ b/src/translator/beam_search.h @@ -4,7 +4,6 @@ #include "translator/history.h" #include "translator/output_printer.h" #include "translator/scorers.h" -#include namespace marian { From 9e1aac31164cccd0ec7a2064fa4a4bfee2143289 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Mon, 19 Apr 2021 17:43:59 -0700 Subject: [PATCH 22/25] Fixes compile issues --- contrib/triton-aml/Dockerfile | 2 +- contrib/triton-aml/marian_backend/src/marian.cc | 2 +- contrib/triton-aml/src/CMakeLists.txt | 10 ++++++++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/contrib/triton-aml/Dockerfile b/contrib/triton-aml/Dockerfile index 61c144b8f..7873d102c 100644 --- a/contrib/triton-aml/Dockerfile +++ b/contrib/triton-aml/Dockerfile @@ -42,7 +42,7 @@ RUN ./b2 install --prefix=/usr --with-system --with-thread --with-date_time --wi # Marian install WORKDIR / -RUN git clone --no-checkout https://github.com/rhenry-nv/marian-dev.git +RUN git clone --no-checkout https://github.com/rhenry-nv/marian-dev.git WORKDIR marian-dev RUN git checkout async RUN mkdir src/static diff --git a/contrib/triton-aml/marian_backend/src/marian.cc b/contrib/triton-aml/marian_backend/src/marian.cc index 1b42e6ac8..997f52264 100644 --- 
a/contrib/triton-aml/marian_backend/src/marian.cc +++ b/contrib/triton-aml/marian_backend/src/marian.cc @@ -162,7 +162,7 @@ ModelState::SetAsyncMode() LOG_MESSAGE( TRITONSERVER_LOG_INFO, - (std::string("Async mode set to : ") + async_mode_) + (std::string("Async mode set to : ") + std::to_string(async_mode_)) .c_str() ); diff --git a/contrib/triton-aml/src/CMakeLists.txt b/contrib/triton-aml/src/CMakeLists.txt index b7e63d0a1..87a06e81a 100644 --- a/contrib/triton-aml/src/CMakeLists.txt +++ b/contrib/triton-aml/src/CMakeLists.txt @@ -43,6 +43,7 @@ add_library(marian STATIC data/corpus_sqlite.cpp data/corpus_nbest.cpp data/text_input.cpp + data/shortlist.cpp 3rd_party/cnpy/cnpy.cpp 3rd_party/ExceptionWithCallStack.cpp @@ -75,6 +76,9 @@ add_library(marian STATIC layers/loss.cpp layers/weight.cpp layers/lsh.cpp + layers/embedding.cpp + layers/output.cpp + layers/logits.cpp rnn/cells.cpp rnn/attention.cpp @@ -87,6 +91,7 @@ add_library(marian STATIC models/model_factory.cpp models/encoder_decoder.cpp models/transformer_stub.cpp + models/costs.cpp rescorer/score_collector.cpp embedder/vector_collector.cpp @@ -110,6 +115,11 @@ add_library(marian STATIC microsoft/quicksand.cpp microsoft/cosmos.cpp + # copied from quicksand to be able to read binary shortlist + microsoft/shortlist/utils/Converter.cpp + microsoft/shortlist/utils/StringUtils.cpp + microsoft/shortlist/utils/ParameterTree.cpp + $ $ $ From c911653e40157c58024c28031640570b923a3813 Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Mon, 19 Apr 2021 18:17:52 -0700 Subject: [PATCH 23/25] Update change log --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8aed7e16..21e21ef58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] ### Added +- Add support for returning sentences as soon as translation is done in beam search. 
- Support for RMSNorm as drop-in replace for LayerNorm from `Biao Zhang; Rico Sennrich (2019). Root Mean Square Layer Normalization`. Enabled in Transformer model via `--transformer-postprocess dar` instead of `dan`. - Extend suppression of unwanted output symbols, specifically "\n" from default vocabulary if generated by SentencePiece with byte-fallback. Deactivates with --allow-special - Allow for fine-grained CPU intrinsics overrides when BUILD_ARCH != native e.g. -DBUILD_ARCH=x86-64 -DCOMPILE_AVX512=off From bf12cbc856836fd7fdae8b4affb71c251b85bbca Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Mon, 19 Apr 2021 18:30:43 -0700 Subject: [PATCH 24/25] Fix windows compile error --- src/translator/beam_search.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/translator/beam_search.cpp b/src/translator/beam_search.cpp index ce2720c4f..756730a2f 100644 --- a/src/translator/beam_search.cpp +++ b/src/translator/beam_search.cpp @@ -520,7 +520,7 @@ Histories BeamSearch::search(Ptr graph, Ptr std::stringstream bestn; printer_->print(histories[batchIdx], best1, bestn); std::string result = nbest ? 
bestn.str() : best1.str(); - callback(histories[batchIdx]->getLineNum(), result.c_str(), userData); + callback((int) histories[batchIdx]->getLineNum(), result.c_str(), userData); } } From 8c82968d42d855759fb1f25bb3ba97585473669b Mon Sep 17 00:00:00 2001 From: Rawn Henry Date: Mon, 19 Apr 2021 18:35:09 -0700 Subject: [PATCH 25/25] Modified Dockerfile to pull from master on marian repo --- contrib/triton-aml/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/contrib/triton-aml/Dockerfile b/contrib/triton-aml/Dockerfile index 7873d102c..ee5e7923e 100644 --- a/contrib/triton-aml/Dockerfile +++ b/contrib/triton-aml/Dockerfile @@ -42,9 +42,9 @@ RUN ./b2 install --prefix=/usr --with-system --with-thread --with-date_time --wi # Marian install WORKDIR / -RUN git clone --no-checkout https://github.com/rhenry-nv/marian-dev.git +RUN git clone --no-checkout https://github.com/marian-nmt/marian-dev.git WORKDIR marian-dev -RUN git checkout async +RUN git checkout master RUN mkdir src/static RUN mkdir build COPY src/cmarian.cpp /marian-dev/src/static