From d2948664be4776401582eed7de14282c3d722ea4 Mon Sep 17 00:00:00 2001 From: kannibalox Date: Fri, 7 Sep 2018 21:32:11 +0000 Subject: [PATCH 1/6] torque: add prometheus exporter --- docs/advanced-queue.rst | 13 ++ pavement.py | 1 + src/pyrocore/daemon/exporter.py | 180 ++++++++++++++++++++++++++++ src/pyrocore/data/config/torque.ini | 18 +++ 4 files changed, 212 insertions(+) create mode 100644 src/pyrocore/daemon/exporter.py diff --git a/docs/advanced-queue.rst b/docs/advanced-queue.rst index b7f2e588..70251de6 100644 --- a/docs/advanced-queue.rst +++ b/docs/advanced-queue.rst @@ -354,3 +354,16 @@ connection to rTorrent, and logs some statistical information. You can change it to run only hourly by adding this to the configuration: ``job.connstats.schedule = hour=*`` + +**RtorrentExporter** + +``pyrocore.daemon.exporter:RtorrentExporter`` exports statistics in +a format suitable for scraping by Prometheus_. It supports scraping +metrics related to trackers, system stats, and arbitrary item +attributes. Be aware that scraping item attributes can introduce high +levels of cardinality into your Prometheus instance. + +Note that stats are updated every ``job.«NAME».schedule`` run, not +when the scrape is actually performed by Prometheus. + +.. 
_Prometheus: https://prometheus.io/ diff --git a/pavement.py b/pavement.py index 62323325..69732211 100644 --- a/pavement.py +++ b/pavement.py @@ -122,6 +122,7 @@ "templating": ["Tempita>=0.5.1"], "pyrotorque": ["APScheduler>=2.0.2,<3"], "pyrotorque.httpd": ["waitress>=0.8.2", "WebOb>=1.2.3", "psutil>=0.6.1"], + "pyrotorque.exporter": ["prometheus_client>=0.3.1"], "FlexGet": ["flexget>=1.0"], "http": ['requests'], "https": ['requests[security]'], diff --git a/src/pyrocore/daemon/exporter.py b/src/pyrocore/daemon/exporter.py new file mode 100644 index 00000000..50b75204 --- /dev/null +++ b/src/pyrocore/daemon/exporter.py @@ -0,0 +1,180 @@ +import threading +from time import sleep +from collections import Counter + +from pyrocore import config as config_ini +from pyrocore.util import pymagic +from pyrobase.parts import Bunch + +from prometheus_client import start_http_server, Gauge + +class ClientServer(threading.Thread): + def __init__(self, port): + super(ClientServer, self).__init__() + self.port = int(port) + + def run(self): + start_http_server(self.port) + +class RtorrentExporter(object): + """ Expose rTorrent and host statistics for scraping by a Prometheus instance. + """ + + def __init__(self, config=None): + """ Set up RtorrentExporter. 
+ """ + self.config = config or Bunch() + self.LOG = pymagic.get_class_logger(self) + if 'log_level' in self.config: + self.LOG.setLevel(config.log_level) + self.LOG.debug("RtorrentExporter created with config %r" % self.config) + self.prefix = self.config.get('prefix', 'rtorrent_') + self.proxy = config_ini.engine.open() + self.jobs = [] + jobs_init = { + 'tracker': self._init_tracker_stats, + 'system': self._init_system_stats, + 'item': self._init_item_stats + } + for j in self.config.get('jobs', 'system').split(','): + if j in ['tracker', 'system', 'item']: + self.jobs.append(j) + jobs_init[j]() + else: + self.LOG.error("Unknown job '{}' requested, not initializing it".format(j)) + if not self.jobs: + raise RuntimeError("Job configuration '{}' contained no valid jobs".format(self.config.get('jobs'))) + # Start the server right off the bat + self.prom_thread = ClientServer(self.config.get('port', '8000')) + self.prom_thread.start() + + + def run(self): + """Update any defined metrics + """ + # Update requested stats + jobs = { + 'tracker': self._fetch_tracker_stats, + 'system': self._fetch_system_stats, + 'item': self._fetch_item_stats + } + for j in self.jobs: + jobs[j]() + + def _init_item_stats(self): + available_methods = set(self.proxy.system.listMethods()) + if 'item_stats' in self.config: + item_stat_methods = self.config['item_stats'].split(',') + item_stat_methods = set(item_stat_methods) & available_methods + else: + item_stat_methods = ("down.total", "up.total") + if 'item_labels' in self.config: + item_labels = self.config['item_labels'].split(',') + self.item_labels = list(set(item_labels) & available_methods) + else: + self.item_labels = ["hash", "name"] + self.item_stats = {} + for m in item_stat_methods: + self.item_stats[m] = Gauge(self.prefix + "item_" + m.replace('.', '_'), m, self.item_labels) + + def _fetch_item_stats(self): + """Use d.multicall2 to + """ + calls = ["d."+m+"=" for m in list(self.item_stats.keys()) + self.item_labels] + result 
= self.proxy.d.multicall2('', "main", *calls) + for i in result: + info = dict(list(zip(list(self.item_stats.keys()) + self.item_labels, i))) + for stat, gauge in self.item_stats.items(): + gauge.labels(*[info[l] for l in self.item_labels]).set(info[stat]) + + def _init_tracker_stats(self): + """Initialize the tracker gauges + """ + self.tracker_gauge = Gauge(self.prefix + 'tracker_amount', 'Number of torrents belonging to a specific tracker', ['alias']) + self.tracker_error_gauge = Gauge(self.prefix + 'tracker_errors', + 'Number of torrents with tracker errors belonging to a specific tracker', ['alias']) + + + def _fetch_tracker_stats(self): + """Scrape tracker metrics from item information + """ + item_fields = ["d.tracker_domain=", "d.message="] + + result = self.proxy.d.multicall("main", *item_fields) + + trackers = Counter([config_ini.map_announce2alias(d[0]) for d in result]) + tracker_errors = Counter([config_ini.map_announce2alias(d[0]) for d in result if d[1]]) + + for k, v in trackers.items(): + self.tracker_gauge.labels(k).set(v) + for k in trackers.keys(): # Use the tracker keys to make sure all active trackers get a value + self.tracker_error_gauge.labels(k).set(tracker_errors[k]) + + def _init_system_stats(self): + """Initialize the system gauges + """ + stat_methods = [ + "throttle.global_up.rate", "throttle.global_up.max_rate", "throttle.global_up.total", + "throttle.global_down.rate", "throttle.global_down.max_rate", "throttle.global_down.total", + "pieces.stats_not_preloaded", "pieces.stats_preloaded", + "system.files.opened_counter", "system.files.failed_counter", "system.files.closed_counter", + "pieces.memory.block_count", "pieces.memory.current", "pieces.memory.max", + "network.open_sockets", "pieces.sync.queue_size", + "pieces.stats.total_size", "pieces.preload.type", + "pieces.preload.min_size", "pieces.preload.min_rate", + "pieces.memory.sync_queue", "network.max_open_files", + "network.max_open_sockets", "network.http.max_open", + 
"throttle.max_downloads.global", "throttle.max_uploads.global", + "startup_time", "network.http.current_open" + ] + + info_methods = ['system.client_version', 'system.library_version'] + + self.system_stats = {} + for m in set(stat_methods) & set(self.proxy.system.listMethods()): # Strip out any methods that aren't available on the system + self.system_stats[m] = Gauge(self.prefix + m.replace('.', '_'), m) + self.system_info = Gauge(self.prefix + "info", "rTorrent platform information", [m.replace('.','_') for m in info_methods]) + self.system_view_size = Gauge(self.prefix + "view_size", "Size of rtorrent views", ["view"]) + + def _fetch_system_stats(self): + """Scrape system and view statistics + """ + info_methods = ['system.client_version', 'system.library_version'] + views = self.proxy.view.list() + + # Get data via multicall + # Sort the system stats because we can't trust py2 keys() to be deterministic + calls = [dict(methodName=method, params=[]) for method in sorted(self.system_stats.keys())] \ + + [dict(methodName=method, params=[]) for method in info_methods] \ + + [dict(methodName="view.size", params=['', view]) for view in views] + + result = self.proxy.system.multicall(calls, flatten=True) + + # Get numeric metrics + for m in sorted(self.system_stats.keys()): + self.system_stats[m].set(result[0]) + del result[0] + + # Get text-like information + info_methods = [m.replace('.', '_') for m in info_methods] + self.system_info.labels(*result[0:len(result)-len(views)]).set(1) + result = result[-len(views):] + + # Get view information + for v in views: + self.system_view_size.labels(v).set(result[0]) + del result[0] + + +def module_test(): + from pyrocore import connect + engine = connect() + + i = RtorrentExporter(Bunch(jobs="system,tracker,item",port="8100")) + i.proxy = engine.open() + while True: + i.run() + sleep(5) + +if __name__ == '__main__': + module_test() diff --git a/src/pyrocore/data/config/torque.ini b/src/pyrocore/data/config/torque.ini index 
91f8d73e..0ecda217 100644 --- a/src/pyrocore/data/config/torque.ini +++ b/src/pyrocore/data/config/torque.ini @@ -122,3 +122,21 @@ job.treewatch.load_mode = normal ; Queue mode means "start" items keep their normal prio ; (it's NOT set to "off", but they're also not immediately started) job.treewatch.queued = False + +# Metric exporter +job.exporter.handler = pyrocore.daemon.exporter:RtorrentExporter +job.exporter.schedule = second=*/15 +job.exporter.active = False +job.exporter.dry_run = False +job.exporter.quiet = False +;job.exporter.dry_run = DEBUG +; Comma-separated jobs to run, valid options are: system, tracker, item +job.exporter.jobs = system +; The port to listen on +job.exporter.port = 8000 +; The prefix to add to any metric names +job.exporter.prefix = rtorrent_ +; Comma-separated list of attributes fetched from items +job.exporter.item_stats = down.total,up.total +; Comma-separated list of attributes applied to items' labels +job.exporter.item_labels = hash,name From 88ed6cc4f3f4b8e5bd0b22d24134cb18ae86187c Mon Sep 17 00:00:00 2001 From: kannibalox Date: Sat, 10 Nov 2018 00:48:43 +0000 Subject: [PATCH 2/6] Use the python prometheus client appropriately Previously the exporter assumed it was rtorrent's main process and would continue to serve the same stats --- src/pyrocore/daemon/exporter.py | 195 ++++++++++++++++---------------- 1 file changed, 97 insertions(+), 98 deletions(-) diff --git a/src/pyrocore/daemon/exporter.py b/src/pyrocore/daemon/exporter.py index 50b75204..d0f8ad92 100644 --- a/src/pyrocore/daemon/exporter.py +++ b/src/pyrocore/daemon/exporter.py @@ -7,6 +7,7 @@ from pyrobase.parts import Bunch from prometheus_client import start_http_server, Gauge +from prometheus_client.core import GaugeMetricFamily, REGISTRY class ClientServer(threading.Thread): def __init__(self, port): @@ -16,103 +17,72 @@ def __init__(self, port): def run(self): start_http_server(self.port) -class RtorrentExporter(object): - """ Expose rTorrent and host 
statistics for scraping by a Prometheus instance. - """ - - def __init__(self, config=None): - """ Set up RtorrentExporter. - """ - self.config = config or Bunch() - self.LOG = pymagic.get_class_logger(self) - if 'log_level' in self.config: - self.LOG.setLevel(config.log_level) - self.LOG.debug("RtorrentExporter created with config %r" % self.config) +class RtorrentCollector(object): + def __init__(self, proxy, config): + self.proxy = proxy + self.config = config self.prefix = self.config.get('prefix', 'rtorrent_') - self.proxy = config_ini.engine.open() - self.jobs = [] - jobs_init = { - 'tracker': self._init_tracker_stats, - 'system': self._init_system_stats, - 'item': self._init_item_stats - } - for j in self.config.get('jobs', 'system').split(','): - if j in ['tracker', 'system', 'item']: - self.jobs.append(j) - jobs_init[j]() - else: - self.LOG.error("Unknown job '{}' requested, not initializing it".format(j)) - if not self.jobs: - raise RuntimeError("Job configuration '{}' contained no valid jobs".format(self.config.get('jobs'))) - # Start the server right off the bat - self.prom_thread = ClientServer(self.config.get('port', '8000')) - self.prom_thread.start() - def run(self): - """Update any defined metrics - """ - # Update requested stats - jobs = { - 'tracker': self._fetch_tracker_stats, - 'system': self._fetch_system_stats, - 'item': self._fetch_item_stats - } - for j in self.jobs: - jobs[j]() + def collect(self): + raise NotImplementedError + +class RtorrentItemCollector(RtorrentCollector): + def __init__(self, proxy, config): + super(RtorrentItemCollector, self).__init__(proxy, config) - def _init_item_stats(self): available_methods = set(self.proxy.system.listMethods()) - if 'item_stats' in self.config: - item_stat_methods = self.config['item_stats'].split(',') - item_stat_methods = set(item_stat_methods) & available_methods + if 'item-stats' in self.config: + self.item_stat_methods = set(self.config['item-stats'].split(',')) & available_methods else: 
- item_stat_methods = ("down.total", "up.total") - if 'item_labels' in self.config: - item_labels = self.config['item_labels'].split(',') - self.item_labels = list(set(item_labels) & available_methods) + self.item_stat_methods = ("down.total", "up.total") + if 'item-labels' in self.config: + self.item_labels = list(set(self.config['item-labels'].split(',')) & available_methods) else: self.item_labels = ["hash", "name"] - self.item_stats = {} - for m in item_stat_methods: - self.item_stats[m] = Gauge(self.prefix + "item_" + m.replace('.', '_'), m, self.item_labels) - def _fetch_item_stats(self): - """Use d.multicall2 to - """ - calls = ["d."+m+"=" for m in list(self.item_stats.keys()) + self.item_labels] - result = self.proxy.d.multicall2('', "main", *calls) + + def collect(self): + calls = ["d."+m+"=" for m in list(self.item_stat_methods) + self.item_labels] + result = self.proxy.d.multicall("main", *calls) + item_stats = {} + for stat in self.item_stat_methods: + item_stats[stat] = GaugeMetricFamily(self.prefix + stat.replace('.', '_'), stat, labels=self.item_labels) for i in result: - info = dict(list(zip(list(self.item_stats.keys()) + self.item_labels, i))) - for stat, gauge in self.item_stats.items(): - gauge.labels(*[info[l] for l in self.item_labels]).set(info[stat]) + info = dict(list(zip(list(self.item_stat_methods) + self.item_labels, i))) + for stat, gauge in item_stats.items(): + gauge.add_metric([info[l] for l in self.item_labels], info[stat]) + for stat, guage in item_stats.items(): + yield guage - def _init_tracker_stats(self): - """Initialize the tracker gauges - """ - self.tracker_gauge = Gauge(self.prefix + 'tracker_amount', 'Number of torrents belonging to a specific tracker', ['alias']) - self.tracker_error_gauge = Gauge(self.prefix + 'tracker_errors', - 'Number of torrents with tracker errors belonging to a specific tracker', ['alias']) +class RtorrentTrackerCollector(RtorrentCollector): + def __init__(self, proxy, config): + 
super(RtorrentTrackerCollector, self).__init__(proxy, config) - def _fetch_tracker_stats(self): - """Scrape tracker metrics from item information - """ - item_fields = ["d.tracker_domain=", "d.message="] + def collect(self): + tracker_gauge = GaugeMetricFamily(self.prefix + 'tracker_amount', + 'Number of torrents belonging to a specific tracker', labels=['alias']) + tracker_error_gauge = GaugeMetricFamily(self.prefix + 'tracker_errors', + 'Number of torrents with tracker errors belonging to a specific tracker', labels=['alias']) + item_fields = ["d.tracker_domain=", "d.message="] result = self.proxy.d.multicall("main", *item_fields) trackers = Counter([config_ini.map_announce2alias(d[0]) for d in result]) tracker_errors = Counter([config_ini.map_announce2alias(d[0]) for d in result if d[1]]) for k, v in trackers.items(): - self.tracker_gauge.labels(k).set(v) + tracker_gauge.add_metric([k], v) for k in trackers.keys(): # Use the tracker keys to make sure all active trackers get a value - self.tracker_error_gauge.labels(k).set(tracker_errors[k]) + tracker_error_gauge.add_metric([k], tracker_errors[k]) - def _init_system_stats(self): - """Initialize the system gauges - """ + yield tracker_gauge + yield tracker_error_gauge + +class RtorrentSystemCollector(RtorrentCollector): + def __init__(self, proxy, config): + super(RtorrentSystemCollector, self).__init__(proxy, config) stat_methods = [ "throttle.global_up.rate", "throttle.global_up.max_rate", "throttle.global_up.total", "throttle.global_down.rate", "throttle.global_down.max_rate", "throttle.global_down.total", @@ -128,53 +98,82 @@ def _init_system_stats(self): "startup_time", "network.http.current_open" ] - info_methods = ['system.client_version', 'system.library_version'] + self.info_methods = ['system.client_version', 'system.library_version'] - self.system_stats = {} - for m in set(stat_methods) & set(self.proxy.system.listMethods()): # Strip out any methods that aren't available on the system - 
self.system_stats[m] = Gauge(self.prefix + m.replace('.', '_'), m) - self.system_info = Gauge(self.prefix + "info", "rTorrent platform information", [m.replace('.','_') for m in info_methods]) - self.system_view_size = Gauge(self.prefix + "view_size", "Size of rtorrent views", ["view"]) + # Strip out unavailable methods + self.system_stats = set(stat_methods) & set(self.proxy.system.listMethods()) - def _fetch_system_stats(self): - """Scrape system and view statistics - """ - info_methods = ['system.client_version', 'system.library_version'] + def collect(self): + system_info = GaugeMetricFamily(self.prefix + "info", "rTorrent platform information", labels=[m.replace('.','_') for m in self.info_methods]) + system_view_size = GaugeMetricFamily(self.prefix + "view_size", "Size of rtorrent views", labels=["view"]) views = self.proxy.view.list() # Get data via multicall - # Sort the system stats because we can't trust py2 keys() to be deterministic - calls = [dict(methodName=method, params=[]) for method in sorted(self.system_stats.keys())] \ - + [dict(methodName=method, params=[]) for method in info_methods] \ + calls = [dict(methodName=method, params=[]) for method in sorted(self.system_stats)] \ + + [dict(methodName=method, params=[]) for method in self.info_methods] \ + [dict(methodName="view.size", params=['', view]) for view in views] result = self.proxy.system.multicall(calls, flatten=True) # Get numeric metrics - for m in sorted(self.system_stats.keys()): - self.system_stats[m].set(result[0]) + for m in sorted(self.system_stats): + yield GaugeMetricFamily(self.prefix + m.replace('.', '_'), m, value=result[0]) del result[0] # Get text-like information - info_methods = [m.replace('.', '_') for m in info_methods] - self.system_info.labels(*result[0:len(result)-len(views)]).set(1) + info_methods = [m.replace('.', '_') for m in self.info_methods] + system_info.add_metric(result[0:len(result)-len(views)], 1) + yield system_info result = result[-len(views):] # Get 
view information for v in views: - self.system_view_size.labels(v).set(result[0]) + system_view_size.add_metric([v], result[0]) del result[0] -def module_test(): +class RtorrentExporter(object): + """ Expose rTorrent and host statistics for scraping by a Prometheus instance. + """ + + def __init__(self, config=None): + """ Set up RtorrentExporter. + """ + self.config = config or Bunch() + self.LOG = pymagic.get_class_logger(self) + if 'log_level' in self.config: + self.LOG.setLevel(config.log_level) + self.LOG.debug("RtorrentExporter created with config %r" % self.config) + self.prefix = self.config.get('prefix', 'rtorrent_') + self.proxy = config_ini.engine.open() + self.system_stats_initialized = False + jobs = { + "item": RtorrentItemCollector, + "tracker": RtorrentTrackerCollector, + "system": RtorrentSystemCollector + } + for j in self.config.get('jobs', 'system').split(','): + j = j.strip() + if j not in jobs: + self.LOG.error("Job {} not found, skipping".format(j)) + else: + REGISTRY.register(jobs[j](self.proxy, self.config)) + + # Start the server right off the bat + self.prom_thread = ClientServer(self.config.get('port', '8000')) + self.prom_thread.start() + + + def run(self): + # NOOP, stats are generated at scrape time + pass + +if __name__ == '__main__': from pyrocore import connect engine = connect() - i = RtorrentExporter(Bunch(jobs="system,tracker,item",port="8100")) + i = RtorrentExporter(Bunch(jobs="system,tracker,item", port=8005)) i.proxy = engine.open() while True: i.run() sleep(5) - -if __name__ == '__main__': - module_test() From 457d83da0667ae6a0b0f3c90a4d61f61a9ed3316 Mon Sep 17 00:00:00 2001 From: kannibalox Date: Sat, 10 Nov 2018 00:52:58 +0000 Subject: [PATCH 3/6] Update documentation to match reality --- src/pyrocore/data/config/torque.ini | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pyrocore/data/config/torque.ini b/src/pyrocore/data/config/torque.ini index 0ecda217..d9a2ab05 100644 --- 
a/src/pyrocore/data/config/torque.ini +++ b/src/pyrocore/data/config/torque.ini @@ -125,7 +125,8 @@ job.treewatch.queued = False # Metric exporter job.exporter.handler = pyrocore.daemon.exporter:RtorrentExporter -job.exporter.schedule = second=*/15 +; The schedule doesn't matters, metrics are update at scrape time +job.exporter.schedule = hour=* job.exporter.active = False job.exporter.dry_run = False job.exporter.quiet = False From c3d7575ac3da6276b68f387e02aa1688975e2d8e Mon Sep 17 00:00:00 2001 From: kannibalox Date: Thu, 17 Jan 2019 16:28:34 +0000 Subject: [PATCH 4/6] Spelling fixes --- src/pyrocore/data/config/torque.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pyrocore/data/config/torque.ini b/src/pyrocore/data/config/torque.ini index d9a2ab05..e91b1585 100644 --- a/src/pyrocore/data/config/torque.ini +++ b/src/pyrocore/data/config/torque.ini @@ -125,7 +125,7 @@ job.treewatch.queued = False # Metric exporter job.exporter.handler = pyrocore.daemon.exporter:RtorrentExporter -; The schedule doesn't matters, metrics are update at scrape time +; The schedule doesn't matter, metrics are updated at scrape time job.exporter.schedule = hour=* job.exporter.active = False job.exporter.dry_run = False From 8d8d8458469d63d458fe676b91a96784aeaf63f8 Mon Sep 17 00:00:00 2001 From: kannibalox Date: Thu, 17 Jan 2019 16:39:48 +0000 Subject: [PATCH 5/6] Further update documentation to match reality --- docs/advanced-queue.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/advanced-queue.rst b/docs/advanced-queue.rst index 70251de6..48f255f5 100644 --- a/docs/advanced-queue.rst +++ b/docs/advanced-queue.rst @@ -363,7 +363,7 @@ metrics related to trackers, system stats, and arbitrary item attributes. Be aware that scraping item attributes can introduce high levels of cardinality into your Prometheus instance. 
-Note that stats are updated every ``job.«NAME».schedule`` run, not -when the scrape is actually performed by Prometheus. +Note that stats are updated whenever the scrape is performed, i.e. +``job.«NAME».schedule`` has no bearing on when the metrics are updated. .. _Prometheus: https://prometheus.io/ From 2344723332e4970b4fcad994a88a734f342557cf Mon Sep 17 00:00:00 2001 From: kannibalox Date: Thu, 17 Jan 2019 16:40:07 +0000 Subject: [PATCH 6/6] Make sure view sizes are being exported Also remove unused line --- src/pyrocore/daemon/exporter.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/pyrocore/daemon/exporter.py b/src/pyrocore/daemon/exporter.py index d0f8ad92..af394ba7 100644 --- a/src/pyrocore/daemon/exporter.py +++ b/src/pyrocore/daemon/exporter.py @@ -121,7 +121,6 @@ def collect(self): del result[0] # Get text-like information - info_methods = [m.replace('.', '_') for m in self.info_methods] system_info.add_metric(result[0:len(result)-len(views)], 1) yield system_info result = result[-len(views):] @@ -130,7 +129,7 @@ def collect(self): for v in views: system_view_size.add_metric([v], result[0]) del result[0] - + yield system_view_size class RtorrentExporter(object): """ Expose rTorrent and host statistics for scraping by a Prometheus instance.