/* * This file is part of the AzerothCore Project. See AUTHORS file for Copyright information * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU Affero General Public License as published by the * Free Software Foundation; either version 3 of the License, or (at your * option) any later version. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for * more details. * * You should have received a copy of the GNU General Public License along * with this program. If not, see . */ #include "Metric.h" #include "Config.h" #include "Log.h" #include "SteadyTimer.h" #include "Strand.h" #include "Tokenize.h" #include #include Metric::Metric() { } Metric::~Metric() { } Metric* Metric::instance() { static Metric instance; return &instance; } void Metric::Initialize(std::string const& realmName, Acore::Asio::IoContext& ioContext, std::function overallStatusLogger) { _dataStream = std::make_unique(); _realmName = FormatInfluxDBTagValue(realmName); _batchTimer = std::make_unique(ioContext); _overallStatusTimer = std::make_unique(ioContext); _overallStatusLogger = overallStatusLogger; LoadFromConfigs(); } bool Metric::Connect() { auto& stream = static_cast(GetDataStream()); stream.connect(_hostname, _port); auto error = stream.error(); if (error) { LOG_ERROR("metric", "Error connecting to '{}:{}', disabling Metric. Error message: {}", _hostname, _port, error.message()); _enabled = false; return false; } stream.clear(); return true; } void Metric::LoadFromConfigs() { bool previousValue = _enabled; _enabled = sConfigMgr->GetOption("Metric.Enable", false); _updateInterval = sConfigMgr->GetOption("Metric.Interval", 1); if (_updateInterval < 1) { LOG_ERROR("metric", "'Metric.Interval' config set to {}, overriding to 1.", _updateInterval); _updateInterval = 1; } _overallStatusTimerInterval = sConfigMgr->GetOption("Metric.OverallStatusInterval", 1); if (_overallStatusTimerInterval < 1) { LOG_ERROR("metric", "'Metric.OverallStatusInterval' config set to {}, overriding to 1.", _overallStatusTimerInterval); _overallStatusTimerInterval = 1; } _thresholds.clear(); std::vector thresholdSettings = sConfigMgr->GetKeysByString("Metric.Threshold."); for (std::string const& thresholdSetting : thresholdSettings) { int64 thresholdValue = sConfigMgr->GetOption(thresholdSetting, 0); std::string thresholdName = thresholdSetting.substr(strlen("Metric.Threshold.")); _thresholds[thresholdName] = thresholdValue; } // Schedule a send at this point only if the config changed from Disabled to Enabled. // Cancel any scheduled operation if the config changed from Enabled to Disabled. if (_enabled && !previousValue) { std::string connectionInfo = sConfigMgr->GetOption("Metric.InfluxDB.Connection", ""); if (connectionInfo.empty()) { LOG_ERROR("metric", "Metric.InfluxDB.Connection not specified in configuration file."); return; } std::vector tokens = Acore::Tokenize(connectionInfo, ';', true); _useV2 = sConfigMgr->GetOption("Metric.InfluxDB.v2", false); if (_useV2) { if (tokens.size() != 2) { LOG_ERROR("metric", "Metric.InfluxDB.Connection specified with wrong format in configuration file. (hostname;port)"); return; } _hostname.assign(tokens[0]); _port.assign(tokens[1]); _org = sConfigMgr->GetOption("Metric.InfluxDB.Org", ""); _bucket = sConfigMgr->GetOption("Metric.InfluxDB.Bucket", ""); _token = sConfigMgr->GetOption("Metric.InfluxDB.Token", ""); if (_org.empty() || _bucket.empty() || _token.empty()) { LOG_ERROR("metric", "InfluxDB v2 parameters missing: org, bucket, or token."); return; } } else { if (tokens.size() != 3) { LOG_ERROR("metric", "Metric.InfluxDB.Connection specified with wrong format in configuration file. (hostname;port;database)"); return; } _hostname.assign(tokens[0]); _port.assign(tokens[1]); _databaseName.assign(tokens[2]); } Connect(); ScheduleSend(); ScheduleOverallStatusLog(); } } void Metric::Update() { if (_overallStatusTimerTriggered) { _overallStatusTimerTriggered = false; _overallStatusLogger(); } } bool Metric::ShouldLog(std::string const& category, int64 value) const { auto threshold = _thresholds.find(category); if (threshold == _thresholds.end()) { return false; } return value >= threshold->second; } void Metric::LogEvent(std::string const& category, std::string const& title, std::string const& description) { using namespace std::chrono; MetricData* data = new MetricData; data->Category = category; data->Timestamp = system_clock::now(); data->Type = METRIC_DATA_EVENT; data->Title = title; data->Text = description; _queuedData.Enqueue(data); } void Metric::SendBatch() { using namespace std::chrono; std::stringstream batchedData; MetricData* data; bool firstLoop = true; while (_queuedData.Dequeue(data)) { if (!firstLoop) batchedData << "\n"; batchedData << data->Category; if (!_realmName.empty()) batchedData << ",realm=" << _realmName; for (MetricTag const& tag : data->Tags) batchedData << "," << tag.first << "=" << FormatInfluxDBTagValue(tag.second); batchedData << " "; switch (data->Type) { case METRIC_DATA_VALUE: batchedData << "value=" << data->Value; break; case METRIC_DATA_EVENT: batchedData << "title=\"" << data->Title << "\",text=\"" << data->Text << "\""; break; } batchedData << " " << std::to_string(duration_cast(data->Timestamp.time_since_epoch()).count()); firstLoop = false; delete data; } // Check if there's any data to send if (batchedData.tellp() == std::streampos(0)) { ScheduleSend(); return; } if (!GetDataStream().good() && !Connect()) return; if (_useV2) { GetDataStream() << "POST " << "/api/v2/write?bucket=" << _bucket << "&org=" << _org << "&precision=ns HTTP/1.1\r\n"; GetDataStream() << "Host: " << _hostname << ":" << _port << "\r\n"; GetDataStream() << "Authorization: Token " << _token << "\r\n"; } else { GetDataStream() << "POST " << "/write?db=" << _databaseName << " HTTP/1.1\r\n"; GetDataStream() << "Host: " << _hostname << ":" << _port << "\r\n"; } GetDataStream() << "Accept: */*\r\n"; GetDataStream() << "Content-Type: application/octet-stream\r\n"; GetDataStream() << "Content-Transfer-Encoding: binary\r\n"; GetDataStream() << "Content-Length: " << std::to_string(batchedData.tellp()) << "\r\n\r\n"; GetDataStream() << batchedData.rdbuf(); std::string http_version; GetDataStream() >> http_version; unsigned int status_code = 0; GetDataStream() >> status_code; if (status_code != 204) { LOG_ERROR("metric", "Error sending data, returned HTTP code: {}", status_code); } // Read and ignore the status description std::string status_description; std::getline(GetDataStream(), status_description); // Read headers std::string header; while (std::getline(GetDataStream(), header) && header != "\r") { if (header == "Connection: close\r") { static_cast(GetDataStream()).close(); } } ScheduleSend(); } void Metric::ScheduleSend() { if (_enabled) { _batchTimer->expires_at(Acore::Asio::SteadyTimer::GetExpirationTime(_updateInterval)); _batchTimer->async_wait(std::bind(&Metric::SendBatch, this)); } else { static_cast(GetDataStream()).close(); MetricData* data; // Clear the queue while (_queuedData.Dequeue(data)) { delete data; } } } void Metric::Unload() { // Send what's queued only if IoContext is stopped (so only on shutdown) if (_enabled && Acore::Asio::get_io_context(*_batchTimer).stopped()) { _enabled = false; SendBatch(); } _batchTimer->cancel(); _overallStatusTimer->cancel(); } void Metric::ScheduleOverallStatusLog() { if (_enabled) { _overallStatusTimer->expires_at(Acore::Asio::SteadyTimer::GetExpirationTime(_overallStatusTimerInterval)); _overallStatusTimer->async_wait([this](const boost::system::error_code&) { _overallStatusTimerTriggered = true; ScheduleOverallStatusLog(); }); } } std::string Metric::FormatInfluxDBValue(bool value) { return value ? "t" : "f"; } template std::string Metric::FormatInfluxDBValue(T value) { return std::to_string(value) + 'i'; } std::string Metric::FormatInfluxDBValue(std::string const& value) { return '"' + boost::replace_all_copy(value, "\"", "\\\"") + '"'; } std::string Metric::FormatInfluxDBValue(char const* value) { return FormatInfluxDBValue(std::string(value)); } std::string Metric::FormatInfluxDBValue(double value) { return std::to_string(value); } std::string Metric::FormatInfluxDBValue(float value) { return FormatInfluxDBValue(double(value)); } std::string Metric::FormatInfluxDBTagValue(std::string const& value) { /// @todo: should handle '=' and ',' characters too return boost::replace_all_copy(value, " ", "\\ "); } std::string Metric::FormatInfluxDBValue(std::chrono::nanoseconds value) { return FormatInfluxDBValue(std::chrono::duration_cast(value).count()); } template AC_COMMON_API std::string Metric::FormatInfluxDBValue(int8); template AC_COMMON_API std::string Metric::FormatInfluxDBValue(uint8); template AC_COMMON_API std::string Metric::FormatInfluxDBValue(int16); template AC_COMMON_API std::string Metric::FormatInfluxDBValue(uint16); template AC_COMMON_API std::string Metric::FormatInfluxDBValue(int32); template AC_COMMON_API std::string Metric::FormatInfluxDBValue(uint32); template AC_COMMON_API std::string Metric::FormatInfluxDBValue(int64); template AC_COMMON_API std::string Metric::FormatInfluxDBValue(uint64);