diff --git a/src/cpucounters.cpp b/src/cpucounters.cpp index 02bb2e56..f683b380 100644 --- a/src/cpucounters.cpp +++ b/src/cpucounters.cpp @@ -1444,29 +1444,19 @@ bool PCM::discoverSystemTopology() } if(num_sockets == 0) { num_sockets = (int32)(std::max)(socketIdMap.size(), (size_t)1); + // std::cerr << " num_sockets = " << num_sockets << "\n"; } socketIdMap_type::iterator s = socketIdMap.begin(); for (uint32 sid = 0; s != socketIdMap.end(); ++s) { s->second = sid++; - // first is apic id, second is logical socket id - systemTopology->addSocket( s->first, s->second ); } - for (int32 cid = 0; cid < num_cores; ++cid) - { - //std::cerr << "Cid: " << cid << "\n"; - systemTopology->addThread( cid, topology[cid] ); - } - - // All threads are here now so we can set the refCore for a socket - for ( auto& socket : systemTopology->sockets() ) - socket->setRefCore(); - // use map to change apic socket id to the logical socket id for (int i = 0; (i < (int)num_cores) && (!socketIdMap.empty()); ++i) { + // std::cerr << "socket_id: " << topology[i].socket_id << ", socketIdMap tells me: " << socketIdMap[topology[i].socket_id] << "\n"; if(isCoreOnline((int32)i)) topology[i].socket_id = socketIdMap[topology[i].socket_id]; } @@ -1483,7 +1473,7 @@ bool PCM::discoverSystemTopology() { for (int i = 0; i < (int)num_cores; ++i) { - if (topology[i].socket_id == topology[0].socket_id && topology[i].core_id == topology[0].core_id) + if (topology[i].isSameCore( topology[0] )) ++threads_per_core; } assert(threads_per_core != 0); @@ -1491,6 +1481,22 @@ bool PCM::discoverSystemTopology() if(num_phys_cores_per_socket == 0 && num_cores == num_online_cores) num_phys_cores_per_socket = num_cores / num_sockets / threads_per_core; if(num_online_cores == 0) num_online_cores = num_cores; + s = socketIdMap.begin(); + for (; s != socketIdMap.end(); ++s) + { + systemTopology->addSocket( s->second ); + } + + for (int32 cid = 0; cid < num_cores; ++cid) + { + //std::cerr << "Cid: " << cid << "\n"; + 
systemTopology->addThread( cid, topology[cid] ); + } + + // All threads are here now so we can set the refCore for a socket + for ( auto& socket : systemTopology->sockets() ) + socket->setRefCore(); + int32 i = 0; socketRefCore.resize(num_sockets, -1); @@ -3421,11 +3427,11 @@ void PCM::destroyMSR() PCM::~PCM() { + deleteAndNullify(systemTopology); if (instance) { destroyMSR(); instance = NULL; - deleteAndNullify(systemTopology); } } diff --git a/src/pcm-sensor-server.cpp b/src/pcm-sensor-server.cpp index 4845c01c..f8c3c3b4 100644 --- a/src/pcm-sensor-server.cpp +++ b/src/pcm-sensor-server.cpp @@ -645,7 +645,7 @@ class PrometheusPrinter : Visitor } virtual void dispatch( Core* c ) override { - addToHierarchy( std::string( "core=\"" ) + std::to_string( c->coreID() ) + "\"" ); + addToHierarchy( std::string( "core=\"" ) + std::to_string( c->dieGroupID()*256 + c->dieID()*64 + c->tileID()*16 + c->moduleID()*4 + c->coreID() ) + "\"" ); auto vec = c->threads(); iterateVectorAndCallAccept( vec ); @@ -801,6 +801,7 @@ class PrometheusPrinter : Visitor } removeFromHierarchy(); } + void printSystemCounterState( SystemCounterState const& before, SystemCounterState const& after ) { addToHierarchy( "source=\"uncore\"" ); PCM* pcm = PCM::getInstance(); @@ -3759,6 +3760,7 @@ int mainThrows(int argc, char * argv[]) { bool useRealtimePriority = false; #endif bool forceRTMAbortMode = false; + bool printTopology = false; unsigned short port = 0; unsigned short debug_level = 0; std::string certificateFile; @@ -3774,7 +3776,12 @@ int mainThrows(int argc, char * argv[]) { MainLoop mainLoop; std::string ev_file_name; - if ( argc > 1 ) { + const char* PPTEnv = std::getenv( "PCMSENSORSERVER_PRINT_TOPOLOGY" ); + if ( PPTEnv ) { + if ( *PPTEnv == '1' ) { + printTopology = true; + } + } else if ( argc > 1 ) { std::string arg_value; for ( int i=1; i < argc; ++i ) { @@ -3901,7 +3908,7 @@ int mainThrows(int argc, char * argv[]) { } } - #ifdef __linux__ +#ifdef __linux__ // check kernel version for 
/**
 * Ordering predicate for topology data strings, suitable for std::sort.
 *
 * Each non-empty string is expected to start with a decimal integer (the
 * first tab-separated field written by HyperThread::topologyDataString());
 * strings are ordered by that leading integer. Empty strings sort before all
 * non-empty strings and are equivalent to each other.
 *
 * @param topology1 left-hand topology string
 * @param topology2 right-hand topology string
 * @return true if topology1 must be ordered strictly before topology2
 */
bool TopologyStringCompare( const std::string& topology1, const std::string& topology2 ) {
    const bool firstEmpty  = topology1.empty();
    const bool secondEmpty = topology2.empty();
    // std::sort requires a strict weak ordering: comp(a, a) must be false.
    // An empty string is therefore only "less" than a non-empty one, never
    // less than another empty string.
    if ( firstEmpty || secondEmpty )
        return firstEmpty && !secondEmpty;

    // Zero-initialised so that the comparison stays well defined even when a
    // string does not start with a number.
    int topo1asint = 0;
    int topo2asint = 0;
    std::stringstream ss1( topology1 );
    std::stringstream ss2( topology2 );
    ss1 >> topo1asint;
    ss2 >> topo2asint;
    return topo1asint < topo2asint;
}
SystemObject { public: - HyperThread( PCM* m, int32 threadID, int32 osID, enum Status status ) : pcm_(m), threadID_(threadID), osID_(osID), status_(status) {} + HyperThread( PCM* m, int32 osID, TopologyEntry te, enum Status status ) : pcm_(m), osID_(osID), te_(te), status_(status) {} virtual ~HyperThread() { pcm_ = nullptr; } virtual void accept( Visitor& v ) override { @@ -77,16 +77,50 @@ class HyperThread : public SystemObject return ccs; } + std::string topologyDataString() const { + std::stringstream ss; + ss << osID_ << "\t" << te_.socket_id << "\t" << te_.die_grp_id << "\t" << te_.die_id << "\t" << te_.tile_id << "\t" << te_.core_id << "\t" << te_.thread_id << "\t"; + return ss.str(); + } + + TopologyEntry topologyEntry() const { + return te_; + } + void addMSRHandle( std::shared_ptr handle ) { msrHandle_ = handle; } + int32 osID() const { + return osID_; + } + int32 threadID() const { - return threadID_; + return te_.thread_id; } - int32 osID() const { - return osID_; + int32 coreID() const { + return te_.core_id; + } + + int32 moduleID() const { + return te_.module_id; + } + + int32 tileID() const { + return te_.tile_id; + } + + int32 dieID() const { + return te_.die_id; + } + + int32 dieGroupID() const { + return te_.die_grp_id; + } + + int32 socketID() const { + return te_.socket_id; } // We simply pass by value, this way the refcounting works best and as expected @@ -99,23 +133,22 @@ class HyperThread : public SystemObject } private: - PCM* pcm_; + PCM* pcm_; std::shared_ptr msrHandle_; - int32 threadID_; - int32 osID_; - enum Status status_; + // osID is the expected osID, offlined cores have te.os_id == -1 + int32 osID_; + TopologyEntry te_; + enum Status status_; }; class Core : public SystemObject { - constexpr static int32 MAX_THREADS_PER_CORE = 4; - public: - Core( PCM* m, int32 coreID, int32 tileID, int32 socketID ) { - pcm_ = m; - coreID_ = coreID; - tileID_ = tileID; - socketID_ = socketID; + Core( PCM* m ) : pcm_(m) { + // PCM* m is not 0, 
we're being called from the PCM constructor + // Just before this Core object is constructed, the value for + // threads_per_core is determined + MAX_THREADS_PER_CORE = pcm_->getThreadsPerCore(); } virtual ~Core() { pcm_ = nullptr; @@ -136,10 +169,10 @@ class Core : public SystemObject return ccs; } - void addHyperThreadInfo( int32 threadID, int32 osID ) { - if ( threadID >= MAX_THREADS_PER_CORE ) { + void addHyperThreadInfo( int32 osID, TopologyEntry te ) { + if ( te.thread_id >= MAX_THREADS_PER_CORE ) { std::stringstream ss; - ss << "ERROR: Core: threadID cannot be larger than " << MAX_THREADS_PER_CORE << ".\n"; + ss << "ERROR: Core: thread_id cannot be larger than " << MAX_THREADS_PER_CORE << ".\n"; throw std::runtime_error( ss.str() ); } if ( threads_.size() == 0 || @@ -149,8 +182,8 @@ class Core : public SystemObject } ) == threads_.end() ) { - // std::cerr << "Core::addHyperThreadInfo: " << threadID << ", " << osID << "\n"; - threads_.push_back( new HyperThread( pcm_, threadID, osID, Status::Online ) ); + // std::cerr << "Core::addHyperThreadInfo: " << te.thread_id << ", " << te.os_id << "\n"; + threads_.push_back( new HyperThread( pcm_, osID, te, Status::Online ) ); } } @@ -179,15 +212,39 @@ class Core : public SystemObject } int32 coreID() const { - return coreID_; + if ( 0 == threads_.size() ) + throw std::runtime_error("BUG: No threads yet but asking for a coreID!"); + return threads_.front()->coreID(); + } + + int32 moduleID() const { + if ( 0 == threads_.size() ) + throw std::runtime_error("BUG: No threads yet but asking for a moduleID!"); + return threads_.front()->moduleID(); } int32 tileID() const { - return tileID_; + if ( 0 == threads_.size() ) + throw std::runtime_error("BUG: No threads yet but asking for a tileID!"); + return threads_.front()->tileID(); + } + + int32 dieID() const { + if ( 0 == threads_.size() ) + throw std::runtime_error("BUG: No threads yet but asking for a tileID!"); + return threads_.front()->dieID(); + } + + int32 
dieGroupID() const { + if ( 0 == threads_.size() ) + throw std::runtime_error("BUG: No threads yet but asking for a tileID!"); + return threads_.front()->dieGroupID(); } int32 socketID() const { - return socketID_; + if ( 0 == threads_.size() ) + throw std::runtime_error("BUG: No threads yet but asking for a socketID!"); + return threads_.front()->socketID(); } bool isOnline() const { @@ -200,9 +257,7 @@ class Core : public SystemObject private: PCM* pcm_; std::vector threads_; - int32 coreID_; - int32 tileID_; - int32 socketID_; + int32 MAX_THREADS_PER_CORE; }; class Uncore : public SystemObject @@ -268,12 +323,12 @@ class Socket : public SystemObject { Socket(const Socket &) = delete; Socket & operator = (const Socket &) = delete; public: - Socket( PCM* m, int32 apicID, int32 logicalID ); + Socket( PCM* m, int32 logicalID ); virtual ~Socket() { pcm_ = nullptr; + refCore_ = nullptr; // cores_ is owner, set it to null before deleting it one below for ( auto& core : cores_ ) deleteAndNullify(core); - refCore_ = nullptr; // cores_ is owner so it is already deleted by here deleteAndNullify(uncore_); } @@ -305,11 +360,10 @@ class Socket : public SystemObject { SocketCounterState socketCounterState( void ) const; - Core* findCoreByTileID( int32 tileID ) { - for ( auto& core : cores_ ) { - if ( core->tileID() == tileID ) + Core* findCoreByTopologyEntry( TopologyEntry te ) { + for ( auto& core : cores_ ) + if ( core->hyperThread( 0 )->topologyEntry().isSameCore( te ) ) return core; - } return nullptr; } @@ -321,10 +375,6 @@ class Socket : public SystemObject { return uncore_; } - int32 apicId() const { - return apicID_; - } - int32 socketID() const { return logicalID_; } @@ -338,7 +388,6 @@ class Socket : public SystemObject { PCM* pcm_; Core* refCore_; Uncore* uncore_; - int32 apicID_; int32 logicalID_; }; @@ -361,29 +410,29 @@ class SystemRoot : public SystemObject { v.dispatch( *this ); } - void addSocket( int32 apic_id, int32 logical_id ) { - Socket* s = new Socket( 
pcm_, apic_id, logical_id ); + void addSocket( int32 logical_id ) { + Socket* s = new Socket( pcm_, logical_id ); sockets_.push_back( s ); } // osID is the expected os_id, this is used in case te.os_id = -1 (offlined core) void addThread( int32 osID, TopologyEntry& te ) { + // std::cerr << "SystemRoot::addThread: coreid: " << te.core_id << ", module_id: " << te.module_id << ", tile_id: " << te.tile_id << ", die_id: " << te.die_id << ", die_grp_id: " << te.die_grp_id << ", socket_id: " << te.socket_id << ", os_id: " << osID << "\n"; // quick check during development to see if expected osId == te.os_id for onlined cores // assert( te.os_id != -1 && osID == te.os_id ); bool entryAdded = false; for ( auto& socket : sockets_ ) { - if ( socket->apicId() == te.socket_id ) { + if ( socket->socketID() == te.socket_id ) { Core* core = nullptr; - if ( (core = socket->findCoreByTileID( te.tile_id )) == nullptr ) { - // std::cerr << "SystemRoot::addThread: " << te.tile_id << ", " << osID << "\n"; - core = new Core( pcm_, te.core_id, te.tile_id, te.socket_id ); + if ( (core = socket->findCoreByTopologyEntry( te )) == nullptr ) { + core = new Core( pcm_ ); // std::cerr << "new Core ThreadID: " << te.thread_id << "\n"; - core->addHyperThreadInfo( te.thread_id, osID ); + core->addHyperThreadInfo( osID, te ); socket->addCore( core ); // std::cerr << "Added core " << te.core_id << " with os_id " << osID << ", threadid " << te.thread_id << " and tileid " << te.tile_id << " to socket " << te.socket_id << ".\n"; } else { // std::cerr << "existing Core ThreadID: " << te.thread_id << "\n"; - core->addHyperThreadInfo( te.thread_id, osID ); + core->addHyperThreadInfo( osID, te ); // std::cerr << "Augmented core " << te.core_id << " with osID " << osID << " and threadid " << te.thread_id << " for the hyperthread to socket " << te.socket_id << ".\n"; } entryAdded = true; @@ -393,7 +442,7 @@ class SystemRoot : public SystemObject { if ( !entryAdded ) { // if ( te.os_id == -1 ) // std::cerr << 
"TE not added because os_id == -1, core is offline\n"; - offlinedThreadsAtStart_.push_back( new HyperThread( pcm_, -1, osID, Status::Offline ) ); + offlinedThreadsAtStart_.push_back( new HyperThread( pcm_, osID, te, Status::Offline ) ); } } @@ -446,7 +495,7 @@ class SystemRoot : public SystemObject { /* Method used here: while walking the tree and iterating the vector * elements, collect the counters. Once all elements have been walked - * the vectors are filled with the aggregates. + * the vectors contain the aggregates. */ class Aggregator : Visitor { @@ -547,4 +596,97 @@ class Aggregator : Visitor std::chrono::steady_clock::time_point dispatchedAt_{}; }; +/* Method used here: while walking the cores in the tree and iterating the + * vector elements, print the core related ids into a large string. Once all + * cores have been walked the vector of strings contains all ids. + */ +class TopologyPrinter : Visitor +{ +public: + TopologyPrinter() : wq_( WorkQueue::getInstance() ) + { + PCM* const pcm = PCM::getInstance(); + // Resize user provided vectors to the right size + threadIDsVector_.resize( pcm->getNumCores() ); + // Internal use only, need to be the same size as the user provided vectors + threadIDsFutures_.resize( pcm->getNumCores() ); + } + + virtual ~TopologyPrinter() { + wq_ = nullptr; + } + +public: + virtual void dispatch( SystemRoot const& syp ) override { + // std::cerr << "TopologyPrinter::dispatch( SystemRoot )\n"; + for ( auto* socket : syp.sockets() ) + socket->accept( *this ); + + auto tidFuturesIter = threadIDsFutures_.begin(); + auto tidIter = threadIDsVector_.begin(); + // int i; + // i = 0; + for ( ; tidFuturesIter != threadIDsFutures_.end() && tidIter != threadIDsVector_.end(); ++tidFuturesIter, ++tidIter ) { + // std::cerr << "Works tidFuture: " << ++i << "\n"; + (*tidIter) = (*tidFuturesIter).get(); + } + } + + virtual void dispatch( Socket* sop ) override { + // std::cerr << "TopologyPrinter::dispatch( Socket )\n"; + // Fetch Topology 
Data + for ( auto* core : sop->cores() ) + core->accept( *this ); + } + + virtual void dispatch( Core* cop ) override { + // std::cerr << "TopologyPrinter::dispatch( Core )\n"; + // Loop each HyperThread + for ( auto* thread : cop->threads() ) { + // Fetch the Topology Data + thread->accept( *this ); + } + } + + virtual void dispatch( HyperThread* htp ) override { + // std::cerr << "TopologyPrinter::dispatch( HyperThread )\n"; + // std::cerr << "Dispatch htp with osID=" << htp->osID() << "\n"; + auto job = new LambdaJob( + []( HyperThread* h ) -> std::string { + DBG( 5, "Lambda fetching Topology Data async" ); + std::string s; + if ( !h->isOnline() ) + return s; + return h->topologyDataString(); + }, htp + ); + threadIDsFutures_[ htp->osID() ] = job->getFuture(); + wq_->addWork( job ); + } + + virtual void dispatch( ServerUncore* /*sup*/ ) override { + // std::cerr << "TopologyPrinter::dispatch( ServerUncore )\n"; + } + + virtual void dispatch( ClientUncore* /*cup*/ ) override { + // std::cerr << "TopologyPrinter::dispatch( ClientUncore )\n"; + } + + std::vector & topologyDataStrings( void ) { + return threadIDsVector_; + } + + std::chrono::steady_clock::time_point dispatchedAt( void ) const { + return dispatchedAt_; + } + +private: + WorkQueue* wq_; + std::vector threadIDsVector_; + std::vector> threadIDsFutures_; + std::chrono::steady_clock::time_point dispatchedAt_{}; +}; + +bool TopologyStringCompare( const std::string& topology1, const std::string& topology2 ); + } // namespace pcm diff --git a/src/topologyentry.h b/src/topologyentry.h index d86f9f9b..43608107 100644 --- a/src/topologyentry.h +++ b/src/topologyentry.h @@ -67,6 +67,24 @@ struct PCM_API TopologyEntry // describes a core } return "unknown"; } + bool isSameSocket( TopologyEntry& te ) { + return this->socket_id == te.socket_id; + } + bool isSameDieGroup( TopologyEntry& te ) { + return this->die_grp_id == te.die_grp_id && isSameSocket(te); + } + bool isSameDie( TopologyEntry& te ) { + return 
this->die_id == te.die_id && isSameDieGroup(te); + } + bool isSameTile( TopologyEntry& te ) { + return this->tile_id == te.tile_id && isSameDie(te); + } + bool isSameModule( TopologyEntry& te ) { + return this->module_id == te.module_id && isSameTile (te); + } + bool isSameCore( TopologyEntry& te ) { + return this->core_id == te.core_id && isSameModule(te); + } }; inline void fillEntry(TopologyEntry & entry, const uint32 & smtMaskWidth, const uint32 & coreMaskWidth, const uint32 & l2CacheMaskShift, const int apic_id)