From 1188f6a18df349d658d0a6d714cc295c63352a65 Mon Sep 17 00:00:00 2001 From: red Date: Thu, 21 Sep 2023 01:48:26 +0200 Subject: [PATCH 1/2] fix multiproc bug Signed-off-by: red --- prometheus_client/metrics.py | 468 ++++++++++++++++++++++------------- 1 file changed, 291 insertions(+), 177 deletions(-) diff --git a/prometheus_client/metrics.py b/prometheus_client/metrics.py index 66f51fa6..cdf77bb7 100644 --- a/prometheus_client/metrics.py +++ b/prometheus_client/metrics.py @@ -3,45 +3,59 @@ import time import types from typing import ( - Any, Callable, Dict, Iterable, List, Literal, Optional, Sequence, Tuple, - Type, TypeVar, Union, + Any, + Callable, + Dict, + Iterable, + List, + Literal, + Optional, + Sequence, + Tuple, + Type, + TypeVar, + Union, ) from . import values # retain this import style for testability from .context_managers import ExceptionCounter, InprogressTracker, Timer from .metrics_core import ( - Metric, METRIC_LABEL_NAME_RE, METRIC_NAME_RE, + Metric, + METRIC_LABEL_NAME_RE, + METRIC_NAME_RE, RESERVED_METRIC_LABEL_NAME_RE, ) from .registry import Collector, CollectorRegistry, REGISTRY from .samples import Exemplar, Sample from .utils import floatToGoString, INF -T = TypeVar('T', bound='MetricWrapperBase') +T = TypeVar("T", bound="MetricWrapperBase") F = TypeVar("F", bound=Callable[..., Any]) def _build_full_name(metric_type, name, namespace, subsystem, unit): - full_name = '' + full_name = "" if namespace: - full_name += namespace + '_' + full_name += namespace + "_" if subsystem: - full_name += subsystem + '_' + full_name += subsystem + "_" full_name += name - if metric_type == 'counter' and full_name.endswith('_total'): + if metric_type == "counter" and full_name.endswith("_total"): full_name = full_name[:-6] # Munge to OpenMetrics. if unit and not full_name.endswith("_" + unit): full_name += "_" + unit - if unit and metric_type in ('info', 'stateset'): - raise ValueError('Metric name is of a type that cannot have a unit: ' + full_name) + if unit and metric_type in ("info", "stateset"): + raise ValueError( + "Metric name is of a type that cannot have a unit: " + full_name + ) return full_name def _validate_labelname(l): if not METRIC_LABEL_NAME_RE.match(l): - raise ValueError('Invalid label metric name: ' + l) + raise ValueError("Invalid label metric name: " + l) if RESERVED_METRIC_LABEL_NAME_RE.match(l): - raise ValueError('Reserved label metric name: ' + l) + raise ValueError("Reserved label metric name: " + l) def _validate_labelnames(cls, labelnames): @@ -49,7 +63,7 @@ def _validate_labelnames(cls, labelnames): for l in labelnames: _validate_labelname(l) if l in cls._reserved_labelnames: - raise ValueError('Reserved label metric name: ' + l) + raise ValueError("Reserved label metric name: " + l) return labelnames @@ -60,11 +74,17 @@ def _validate_exemplar(exemplar): runes += len(k) runes += len(v) if runes > 128: - raise ValueError('Exemplar labels have %d UTF-8 characters, exceeding the limit of 128') + raise ValueError( + "Exemplar labels have %d UTF-8 characters, exceeding the limit of 128" + ) def _get_use_created() -> bool: - return os.environ.get("PROMETHEUS_DISABLE_CREATED_SERIES", 'False').lower() not in ('true', '1', 't') + return os.environ.get("PROMETHEUS_DISABLE_CREATED_SERIES", "False").lower() not in ( + "true", + "1", + "t", + ) _use_created = _get_use_created() @@ -85,7 +105,7 @@ def _raise_if_not_observable(self): # a counter, will fail if the metric is not observable, because only if a # metric is observable will the value be initialized. if not self._is_observable(): - raise ValueError('%s metric is missing label values' % str(self._type)) + raise ValueError("%s metric is missing label values" % str(self._type)) def _is_parent(self): return self._labelnames and not self._labelvalues @@ -109,16 +129,17 @@ def __repr__(self) -> str: metric_type = type(self) return f"{metric_type.__module__}.{metric_type.__name__}({self._name})" - def __init__(self: T, - name: str, - documentation: str, - labelnames: Iterable[str] = (), - namespace: str = '', - subsystem: str = '', - unit: str = '', - registry: Optional[CollectorRegistry] = REGISTRY, - _labelvalues: Optional[Sequence[str]] = None, - ) -> None: + def __init__( + self: T, + name: str, + documentation: str, + labelnames: Iterable[str] = (), + namespace: str = "", + subsystem: str = "", + unit: str = "", + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + ) -> None: self._name = _build_full_name(self._type, name, namespace, subsystem, unit) self._labelnames = _validate_labelnames(self, labelnames) self._labelvalues = tuple(_labelvalues or ()) @@ -127,7 +148,7 @@ def __init__(self: T, self._unit = unit if not METRIC_NAME_RE.match(self._name): - raise ValueError('Invalid metric name: ' + self._name) + raise ValueError("Invalid metric name: " + self._name) if self._is_parent(): # Prepare the fields needed for child metrics. @@ -139,7 +160,10 @@ def __init__(self: T, if not self._labelvalues: # Register the multi-wrapper parent metric, or if a label-less metric, the whole shebang. - if registry: + if registry and not ( + "prometheus_multiproc_dir" in os.environ + or "PROMETHEUS_MULTIPROC_DIR" in os.environ + ): registry.register(self) def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T: @@ -166,24 +190,25 @@ def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T: and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels). """ if not self._labelnames: - raise ValueError('No label names were set when constructing %s' % self) + raise ValueError("No label names were set when constructing %s" % self) if self._labelvalues: - raise ValueError('{} already has labels set ({}); can not chain calls to .labels()'.format( - self, - dict(zip(self._labelnames, self._labelvalues)) - )) + raise ValueError( + "{} already has labels set ({}); can not chain calls to .labels()".format( + self, dict(zip(self._labelnames, self._labelvalues)) + ) + ) if labelvalues and labelkwargs: raise ValueError("Can't pass both *args and **kwargs") if labelkwargs: if sorted(labelkwargs) != sorted(self._labelnames): - raise ValueError('Incorrect label names') + raise ValueError("Incorrect label names") labelvalues = tuple(str(labelkwargs[l]) for l in self._labelnames) else: if len(labelvalues) != len(self._labelnames): - raise ValueError('Incorrect label count') + raise ValueError("Incorrect label count") labelvalues = tuple(str(l) for l in labelvalues) with self._lock: if labelvalues not in self._metrics: @@ -193,17 +218,20 @@ def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T: labelnames=self._labelnames, unit=self._unit, _labelvalues=labelvalues, - **self._kwargs + **self._kwargs, ) return self._metrics[labelvalues] def remove(self, *labelvalues: Any) -> None: if not self._labelnames: - raise ValueError('No label names were set when constructing %s' % self) + raise ValueError("No label names were set when constructing %s" % self) """Remove the given labelset from the metric.""" if len(labelvalues) != len(self._labelnames): - raise ValueError('Incorrect label count (expected %d, got %s)' % (len(self._labelnames), labelvalues)) + raise ValueError( + "Incorrect label count (expected %d, got %s)" + % (len(self._labelnames), labelvalues) + ) labelvalues = tuple(str(l) for l in labelvalues) with self._lock: del self._metrics[labelvalues] @@ -225,10 +253,16 @@ def _multi_samples(self) -> Iterable[Sample]: for labels, metric in metrics.items(): series_labels = list(zip(self._labelnames, labels)) for suffix, sample_labels, value, timestamp, exemplar in metric._samples(): - yield Sample(suffix, dict(series_labels + list(sample_labels.items())), value, timestamp, exemplar) + yield Sample( + suffix, + dict(series_labels + list(sample_labels.items())), + value, + timestamp, + exemplar, + ) def _child_samples(self) -> Iterable[Sample]: # pragma: no cover - raise NotImplementedError('_child_samples() must be implemented by %r' % self) + raise NotImplementedError("_child_samples() must be implemented by %r" % self) def _metric_init(self): # pragma: no cover """ @@ -236,7 +270,7 @@ def _metric_init(self): # pragma: no cover This is factored as a separate function to allow for deferred initialization. """ - raise NotImplementedError('_metric_init() must be implemented by %r' % self) + raise NotImplementedError("_metric_init() must be implemented by %r" % self) class Counter(MetricWrapperBase): @@ -271,24 +305,38 @@ def f(): with c.count_exceptions(ValueError): pass """ - _type = 'counter' + + _type = "counter" def _metric_init(self) -> None: - self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames, - self._labelvalues, self._documentation) + self._value = values.ValueClass( + self._type, + self._name, + self._name + "_total", + self._labelnames, + self._labelvalues, + self._documentation, + ) self._created = time.time() def inc(self, amount: float = 1, exemplar: Optional[Dict[str, str]] = None) -> None: """Increment counter by the given amount.""" self._raise_if_not_observable() if amount < 0: - raise ValueError('Counters can only be incremented by non-negative amounts.') + raise ValueError( + "Counters can only be incremented by non-negative amounts." + ) self._value.inc(amount) if exemplar: _validate_exemplar(exemplar) self._value.set_exemplar(Exemplar(exemplar, amount, time.time())) - def count_exceptions(self, exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception) -> ExceptionCounter: + def count_exceptions( + self, + exception: Union[ + Type[BaseException], Tuple[Type[BaseException], ...] + ] = Exception, + ) -> ExceptionCounter: """Count exceptions in a block of code or function. Can be used as a function decorator or context manager. @@ -299,69 +347,74 @@ def count_exceptions(self, exception: Union[Type[BaseException], Tuple[Type[Base return ExceptionCounter(self, exception) def _child_samples(self) -> Iterable[Sample]: - sample = Sample('_total', {}, self._value.get(), None, self._value.get_exemplar()) + sample = Sample( + "_total", {}, self._value.get(), None, self._value.get_exemplar() + ) if _use_created: - return ( - sample, - Sample('_created', {}, self._created, None, None) - ) + return (sample, Sample("_created", {}, self._created, None, None)) return (sample,) class Gauge(MetricWrapperBase): """Gauge metric, to report instantaneous values. - Examples of Gauges include: - - Inprogress requests - - Number of items in a queue - - Free memory - - Total memory - - Temperature + Examples of Gauges include: + - Inprogress requests + - Number of items in a queue + - Free memory + - Total memory + - Temperature - Gauges can go both up and down. + Gauges can go both up and down. - from prometheus_client import Gauge + from prometheus_client import Gauge - g = Gauge('my_inprogress_requests', 'Description of gauge') - g.inc() # Increment by 1 - g.dec(10) # Decrement by given value - g.set(4.2) # Set to a given value + g = Gauge('my_inprogress_requests', 'Description of gauge') + g.inc() # Increment by 1 + g.dec(10) # Decrement by given value + g.set(4.2) # Set to a given value - There are utilities for common use cases: + There are utilities for common use cases: - g.set_to_current_time() # Set to current unixtime + g.set_to_current_time() # Set to current unixtime - # Increment when entered, decrement when exited. - @g.track_inprogress() - def f(): - pass + # Increment when entered, decrement when exited. + @g.track_inprogress() + def f(): + pass - with g.track_inprogress(): - pass + with g.track_inprogress(): + pass - A Gauge can also take its value from a callback: + A Gauge can also take its value from a callback: - d = Gauge('data_objects', 'Number of objects') - my_dict = {} - d.set_function(lambda: len(my_dict)) + d = Gauge('data_objects', 'Number of objects') + my_dict = {} + d.set_function(lambda: len(my_dict)) """ - _type = 'gauge' - _MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum')) - - def __init__(self, - name: str, - documentation: str, - labelnames: Iterable[str] = (), - namespace: str = '', - subsystem: str = '', - unit: str = '', - registry: Optional[CollectorRegistry] = REGISTRY, - _labelvalues: Optional[Sequence[str]] = None, - multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'] = 'all', - ): + + _type = "gauge" + _MULTIPROC_MODES = frozenset( + ("all", "liveall", "min", "livemin", "max", "livemax", "sum", "livesum") + ) + + def __init__( + self, + name: str, + documentation: str, + labelnames: Iterable[str] = (), + namespace: str = "", + subsystem: str = "", + unit: str = "", + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + multiprocess_mode: Literal[ + "all", "liveall", "min", "livemin", "max", "livemax", "sum", "livesum" + ] = "all", + ): self._multiprocess_mode = multiprocess_mode if multiprocess_mode not in self._MULTIPROC_MODES: - raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode) + raise ValueError("Invalid multiprocess mode: " + multiprocess_mode) super().__init__( name=name, documentation=documentation, @@ -372,12 +425,17 @@ def __init__(self, registry=registry, _labelvalues=_labelvalues, ) - self._kwargs['multiprocess_mode'] = self._multiprocess_mode + self._kwargs["multiprocess_mode"] = self._multiprocess_mode def _metric_init(self) -> None: self._value = values.ValueClass( - self._type, self._name, self._name, self._labelnames, self._labelvalues, - self._documentation, multiprocess_mode=self._multiprocess_mode + self._type, + self._name, + self._name, + self._labelnames, + self._labelvalues, + self._documentation, + multiprocess_mode=self._multiprocess_mode, ) def inc(self, amount: float = 1) -> None: @@ -414,7 +472,7 @@ def time(self) -> Timer: Can be used as a function decorator or context manager. """ - return Timer(self, 'set') + return Timer(self, "set") def set_function(self, f: Callable[[], float]) -> None: """Call the provided function to return the Gauge value. @@ -426,12 +484,12 @@ def set_function(self, f: Callable[[], float]) -> None: self._raise_if_not_observable() def samples(_: Gauge) -> Iterable[Sample]: - return (Sample('', {}, float(f()), None, None),) + return (Sample("", {}, float(f()), None, None),) self._child_samples = types.MethodType(samples, self) # type: ignore def _child_samples(self) -> Iterable[Sample]: - return (Sample('', {}, self._value.get(), None, None),) + return (Sample("", {}, self._value.get(), None, None),) class Summary(MetricWrapperBase): @@ -464,13 +522,27 @@ def create_response(request): with REQUEST_TIME.time(): pass # Logic to be timed """ - _type = 'summary' - _reserved_labelnames = ['quantile'] + + _type = "summary" + _reserved_labelnames = ["quantile"] def _metric_init(self) -> None: - self._count = values.ValueClass(self._type, self._name, self._name + '_count', self._labelnames, - self._labelvalues, self._documentation) - self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation) + self._count = values.ValueClass( + self._type, + self._name, + self._name + "_count", + self._labelnames, + self._labelvalues, + self._documentation, + ) + self._sum = values.ValueClass( + self._type, + self._name, + self._name + "_sum", + self._labelnames, + self._labelvalues, + self._documentation, + ) self._created = time.time() def observe(self, amount: float) -> None: @@ -492,15 +564,15 @@ def time(self) -> Timer: Can be used as a function decorator or context manager. """ - return Timer(self, 'observe') + return Timer(self, "observe") def _child_samples(self) -> Iterable[Sample]: samples = [ - Sample('_count', {}, self._count.get(), None, None), - Sample('_sum', {}, self._sum.get(), None, None), + Sample("_count", {}, self._count.get(), None, None), + Sample("_sum", {}, self._sum.get(), None, None), ] if _use_created: - samples.append(Sample('_created', {}, self._created, None, None)) + samples.append(Sample("_created", {}, self._created, None, None)) return tuple(samples) @@ -539,21 +611,39 @@ def create_response(request): The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds. They can be overridden by passing `buckets` keyword argument to `Histogram`. """ - _type = 'histogram' - _reserved_labelnames = ['le'] - DEFAULT_BUCKETS = (.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, INF) - - def __init__(self, - name: str, - documentation: str, - labelnames: Iterable[str] = (), - namespace: str = '', - subsystem: str = '', - unit: str = '', - registry: Optional[CollectorRegistry] = REGISTRY, - _labelvalues: Optional[Sequence[str]] = None, - buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS, - ): + + _type = "histogram" + _reserved_labelnames = ["le"] + DEFAULT_BUCKETS = ( + 0.005, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.25, + 0.5, + 0.75, + 1.0, + 2.5, + 5.0, + 7.5, + 10.0, + INF, + ) + + def __init__( + self, + name: str, + documentation: str, + labelnames: Iterable[str] = (), + namespace: str = "", + subsystem: str = "", + unit: str = "", + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS, + ): self._prepare_buckets(buckets) super().__init__( name=name, @@ -565,33 +655,42 @@ def __init__(self, registry=registry, _labelvalues=_labelvalues, ) - self._kwargs['buckets'] = buckets + self._kwargs["buckets"] = buckets def _prepare_buckets(self, source_buckets: Sequence[Union[float, str]]) -> None: buckets = [float(b) for b in source_buckets] if buckets != sorted(buckets): # This is probably an error on the part of the user, # so raise rather than sorting for them. - raise ValueError('Buckets not in sorted order') + raise ValueError("Buckets not in sorted order") if buckets and buckets[-1] != INF: buckets.append(INF) if len(buckets) < 2: - raise ValueError('Must have at least two buckets') + raise ValueError("Must have at least two buckets") self._upper_bounds = buckets def _metric_init(self) -> None: self._buckets: List[values.ValueClass] = [] self._created = time.time() - bucket_labelnames = self._labelnames + ('le',) - self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation) + bucket_labelnames = self._labelnames + ("le",) + self._sum = values.ValueClass( + self._type, + self._name, + self._name + "_sum", + self._labelnames, + self._labelvalues, + self._documentation, + ) for b in self._upper_bounds: - self._buckets.append(values.ValueClass( - self._type, - self._name, - self._name + '_bucket', - bucket_labelnames, - self._labelvalues + (floatToGoString(b),), - self._documentation) + self._buckets.append( + values.ValueClass( + self._type, + self._name, + self._name + "_bucket", + bucket_labelnames, + self._labelvalues + (floatToGoString(b),), + self._documentation, + ) ) def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> None: @@ -611,7 +710,9 @@ def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> N self._buckets[i].inc(1) if exemplar: _validate_exemplar(exemplar) - self._buckets[i].set_exemplar(Exemplar(exemplar, amount, time.time())) + self._buckets[i].set_exemplar( + Exemplar(exemplar, amount, time.time()) + ) break def time(self) -> Timer: @@ -619,39 +720,48 @@ def time(self) -> Timer: Can be used as a function decorator or context manager. """ - return Timer(self, 'observe') + return Timer(self, "observe") def _child_samples(self) -> Iterable[Sample]: samples = [] acc = 0.0 for i, bound in enumerate(self._upper_bounds): acc += self._buckets[i].get() - samples.append(Sample('_bucket', {'le': floatToGoString(bound)}, acc, None, self._buckets[i].get_exemplar())) - samples.append(Sample('_count', {}, acc, None, None)) + samples.append( + Sample( + "_bucket", + {"le": floatToGoString(bound)}, + acc, + None, + self._buckets[i].get_exemplar(), + ) + ) + samples.append(Sample("_count", {}, acc, None, None)) if self._upper_bounds[0] >= 0: - samples.append(Sample('_sum', {}, self._sum.get(), None, None)) + samples.append(Sample("_sum", {}, self._sum.get(), None, None)) if _use_created: - samples.append(Sample('_created', {}, self._created, None, None)) + samples.append(Sample("_created", {}, self._created, None, None)) return tuple(samples) class Info(MetricWrapperBase): """Info metric, key-value pairs. - Examples of Info include: - - Build information - - Version information - - Potential target metadata + Examples of Info include: + - Build information + - Version information + - Potential target metadata - Example usage: - from prometheus_client import Info + Example usage: + from prometheus_client import Info - i = Info('my_build', 'Description of info') - i.info({'version': '1.2.3', 'buildhost': 'foo@bar'}) + i = Info('my_build', 'Description of info') + i.info({'version': '1.2.3', 'buildhost': 'foo@bar'}) - Info metrics do not work in multiprocess mode. + Info metrics do not work in multiprocess mode. """ - _type = 'info' + + _type = "info" def _metric_init(self): self._labelname_set = set(self._labelnames) @@ -661,42 +771,47 @@ def _metric_init(self): def info(self, val: Dict[str, str]) -> None: """Set info metric.""" if self._labelname_set.intersection(val.keys()): - raise ValueError('Overlapping labels for Info metric, metric: {} child: {}'.format( - self._labelnames, val)) + raise ValueError( + "Overlapping labels for Info metric, metric: {} child: {}".format( + self._labelnames, val + ) + ) with self._lock: self._value = dict(val) def _child_samples(self) -> Iterable[Sample]: with self._lock: - return (Sample('_info', self._value, 1.0, None, None),) + return (Sample("_info", self._value, 1.0, None, None),) class Enum(MetricWrapperBase): """Enum metric, which of a set of states is true. - Example usage: - from prometheus_client import Enum + Example usage: + from prometheus_client import Enum - e = Enum('task_state', 'Description of enum', - states=['starting', 'running', 'stopped']) - e.state('running') + e = Enum('task_state', 'Description of enum', + states=['starting', 'running', 'stopped']) + e.state('running') - The first listed state will be the default. - Enum metrics do not work in multiprocess mode. + The first listed state will be the default. + Enum metrics do not work in multiprocess mode. """ - _type = 'stateset' - - def __init__(self, - name: str, - documentation: str, - labelnames: Sequence[str] = (), - namespace: str = '', - subsystem: str = '', - unit: str = '', - registry: Optional[CollectorRegistry] = REGISTRY, - _labelvalues: Optional[Sequence[str]] = None, - states: Optional[Sequence[str]] = None, - ): + + _type = "stateset" + + def __init__( + self, + name: str, + documentation: str, + labelnames: Sequence[str] = (), + namespace: str = "", + subsystem: str = "", + unit: str = "", + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + states: Optional[Sequence[str]] = None, + ): super().__init__( name=name, documentation=documentation, @@ -708,10 +823,10 @@ def __init__(self, _labelvalues=_labelvalues, ) if name in labelnames: - raise ValueError(f'Overlapping labels for Enum metric: {name}') + raise ValueError(f"Overlapping labels for Enum metric: {name}") if not states: - raise ValueError(f'No states provided for Enum metric: {name}') - self._kwargs['states'] = self._states = states + raise ValueError(f"No states provided for Enum metric: {name}") + self._kwargs["states"] = self._states = states def _metric_init(self) -> None: self._value = 0 @@ -726,7 +841,6 @@ def state(self, state: str) -> None: def _child_samples(self) -> Iterable[Sample]: with self._lock: return [ - Sample('', {self._name: s}, 1 if i == self._value else 0, None, None) - for i, s - in enumerate(self._states) + Sample("", {self._name: s}, 1 if i == self._value else 0, None, None) + for i, s in enumerate(self._states) ] From e59f58eeba5ae7320e45d9538ad2527b0aebf569 Mon Sep 17 00:00:00 2001 From: red Date: Thu, 21 Sep 2023 01:52:12 +0200 Subject: [PATCH 2/2] unformat for PR purposes Signed-off-by: red --- prometheus_client/metrics.py | 467 +++++++++++++---------------------- 1 file changed, 178 insertions(+), 289 deletions(-) diff --git a/prometheus_client/metrics.py b/prometheus_client/metrics.py index cdf77bb7..3361b2e8 100644 --- a/prometheus_client/metrics.py +++ b/prometheus_client/metrics.py @@ -3,59 +3,45 @@ import time import types from typing import ( - Any, - Callable, - Dict, - Iterable, - List, - Literal, - Optional, - Sequence, - Tuple, - Type, - TypeVar, - Union, + Any, Callable, Dict, Iterable, List, Literal, Optional, Sequence, Tuple, + Type, TypeVar, Union, ) from . import values # retain this import style for testability from .context_managers import ExceptionCounter, InprogressTracker, Timer from .metrics_core import ( - Metric, - METRIC_LABEL_NAME_RE, - METRIC_NAME_RE, + Metric, METRIC_LABEL_NAME_RE, METRIC_NAME_RE, RESERVED_METRIC_LABEL_NAME_RE, ) from .registry import Collector, CollectorRegistry, REGISTRY from .samples import Exemplar, Sample from .utils import floatToGoString, INF -T = TypeVar("T", bound="MetricWrapperBase") +T = TypeVar('T', bound='MetricWrapperBase') F = TypeVar("F", bound=Callable[..., Any]) def _build_full_name(metric_type, name, namespace, subsystem, unit): - full_name = "" + full_name = '' if namespace: - full_name += namespace + "_" + full_name += namespace + '_' if subsystem: - full_name += subsystem + "_" + full_name += subsystem + '_' full_name += name - if metric_type == "counter" and full_name.endswith("_total"): + if metric_type == 'counter' and full_name.endswith('_total'): full_name = full_name[:-6] # Munge to OpenMetrics. if unit and not full_name.endswith("_" + unit): full_name += "_" + unit - if unit and metric_type in ("info", "stateset"): - raise ValueError( - "Metric name is of a type that cannot have a unit: " + full_name - ) + if unit and metric_type in ('info', 'stateset'): + raise ValueError('Metric name is of a type that cannot have a unit: ' + full_name) return full_name def _validate_labelname(l): if not METRIC_LABEL_NAME_RE.match(l): - raise ValueError("Invalid label metric name: " + l) + raise ValueError('Invalid label metric name: ' + l) if RESERVED_METRIC_LABEL_NAME_RE.match(l): - raise ValueError("Reserved label metric name: " + l) + raise ValueError('Reserved label metric name: ' + l) def _validate_labelnames(cls, labelnames): @@ -63,7 +49,7 @@ def _validate_labelnames(cls, labelnames): for l in labelnames: _validate_labelname(l) if l in cls._reserved_labelnames: - raise ValueError("Reserved label metric name: " + l) + raise ValueError('Reserved label metric name: ' + l) return labelnames @@ -74,17 +60,11 @@ def _validate_exemplar(exemplar): runes += len(k) runes += len(v) if runes > 128: - raise ValueError( - "Exemplar labels have %d UTF-8 characters, exceeding the limit of 128" - ) + raise ValueError('Exemplar labels have %d UTF-8 characters, exceeding the limit of 128') def _get_use_created() -> bool: - return os.environ.get("PROMETHEUS_DISABLE_CREATED_SERIES", "False").lower() not in ( - "true", - "1", - "t", - ) + return os.environ.get("PROMETHEUS_DISABLE_CREATED_SERIES", 'False').lower() not in ('true', '1', 't') _use_created = _get_use_created() @@ -105,7 +85,7 @@ def _raise_if_not_observable(self): # a counter, will fail if the metric is not observable, because only if a # metric is observable will the value be initialized. if not self._is_observable(): - raise ValueError("%s metric is missing label values" % str(self._type)) + raise ValueError('%s metric is missing label values' % str(self._type)) def _is_parent(self): return self._labelnames and not self._labelvalues @@ -129,17 +109,16 @@ def __repr__(self) -> str: metric_type = type(self) return f"{metric_type.__module__}.{metric_type.__name__}({self._name})" - def __init__( - self: T, - name: str, - documentation: str, - labelnames: Iterable[str] = (), - namespace: str = "", - subsystem: str = "", - unit: str = "", - registry: Optional[CollectorRegistry] = REGISTRY, - _labelvalues: Optional[Sequence[str]] = None, - ) -> None: + def __init__(self: T, + name: str, + documentation: str, + labelnames: Iterable[str] = (), + namespace: str = '', + subsystem: str = '', + unit: str = '', + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + ) -> None: self._name = _build_full_name(self._type, name, namespace, subsystem, unit) self._labelnames = _validate_labelnames(self, labelnames) self._labelvalues = tuple(_labelvalues or ()) @@ -148,7 +127,7 @@ def __init__( self._unit = unit if not METRIC_NAME_RE.match(self._name): - raise ValueError("Invalid metric name: " + self._name) + raise ValueError('Invalid metric name: ' + self._name) if self._is_parent(): # Prepare the fields needed for child metrics. @@ -161,8 +140,8 @@ def __init__( if not self._labelvalues: # Register the multi-wrapper parent metric, or if a label-less metric, the whole shebang. if registry and not ( - "prometheus_multiproc_dir" in os.environ - or "PROMETHEUS_MULTIPROC_DIR" in os.environ + 'prometheus_multiproc_dir' in os.environ + or 'PROMETHEUS_MULTIPROC_DIR' in os.environ ): registry.register(self) @@ -190,25 +169,24 @@ def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T: and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels). """ if not self._labelnames: - raise ValueError("No label names were set when constructing %s" % self) + raise ValueError('No label names were set when constructing %s' % self) if self._labelvalues: - raise ValueError( - "{} already has labels set ({}); can not chain calls to .labels()".format( - self, dict(zip(self._labelnames, self._labelvalues)) - ) - ) + raise ValueError('{} already has labels set ({}); can not chain calls to .labels()'.format( + self, + dict(zip(self._labelnames, self._labelvalues)) + )) if labelvalues and labelkwargs: raise ValueError("Can't pass both *args and **kwargs") if labelkwargs: if sorted(labelkwargs) != sorted(self._labelnames): - raise ValueError("Incorrect label names") + raise ValueError('Incorrect label names') labelvalues = tuple(str(labelkwargs[l]) for l in self._labelnames) else: if len(labelvalues) != len(self._labelnames): - raise ValueError("Incorrect label count") + raise ValueError('Incorrect label count') labelvalues = tuple(str(l) for l in labelvalues) with self._lock: if labelvalues not in self._metrics: @@ -218,20 +196,17 @@ def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T: labelnames=self._labelnames, unit=self._unit, _labelvalues=labelvalues, - **self._kwargs, + **self._kwargs ) return self._metrics[labelvalues] def remove(self, *labelvalues: Any) -> None: if not self._labelnames: - raise ValueError("No label names were set when constructing %s" % self) + raise ValueError('No label names were set when constructing %s' % self) """Remove the given labelset from the metric.""" if len(labelvalues) != len(self._labelnames): - raise ValueError( - "Incorrect label count (expected %d, got %s)" - % (len(self._labelnames), labelvalues) - ) + raise ValueError('Incorrect label count (expected %d, got %s)' % (len(self._labelnames), labelvalues)) labelvalues = tuple(str(l) for l in labelvalues) with self._lock: del self._metrics[labelvalues] @@ -253,16 +228,10 @@ def _multi_samples(self) -> Iterable[Sample]: for labels, metric in metrics.items(): series_labels = list(zip(self._labelnames, labels)) for suffix, sample_labels, value, timestamp, exemplar in metric._samples(): - yield Sample( - suffix, - dict(series_labels + list(sample_labels.items())), - value, - timestamp, - exemplar, - ) + yield Sample(suffix, dict(series_labels + list(sample_labels.items())), value, timestamp, exemplar) def _child_samples(self) -> Iterable[Sample]: # pragma: no cover - raise NotImplementedError("_child_samples() must be implemented by %r" % self) + raise NotImplementedError('_child_samples() must be implemented by %r' % self) def _metric_init(self): # pragma: no cover """ @@ -270,7 +239,7 @@ def _metric_init(self): # pragma: no cover This is factored as a separate function to allow for deferred initialization. """ - raise NotImplementedError("_metric_init() must be implemented by %r" % self) + raise NotImplementedError('_metric_init() must be implemented by %r' % self) class Counter(MetricWrapperBase): @@ -305,38 +274,24 @@ def f(): with c.count_exceptions(ValueError): pass """ - - _type = "counter" + _type = 'counter' def _metric_init(self) -> None: - self._value = values.ValueClass( - self._type, - self._name, - self._name + "_total", - self._labelnames, - self._labelvalues, - self._documentation, - ) + self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames, + self._labelvalues, self._documentation) self._created = time.time() def inc(self, amount: float = 1, exemplar: Optional[Dict[str, str]] = None) -> None: """Increment counter by the given amount.""" self._raise_if_not_observable() if amount < 0: - raise ValueError( - "Counters can only be incremented by non-negative amounts." - ) + raise ValueError('Counters can only be incremented by non-negative amounts.') self._value.inc(amount) if exemplar: _validate_exemplar(exemplar) self._value.set_exemplar(Exemplar(exemplar, amount, time.time())) - def count_exceptions( - self, - exception: Union[ - Type[BaseException], Tuple[Type[BaseException], ...] - ] = Exception, - ) -> ExceptionCounter: + def count_exceptions(self, exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception) -> ExceptionCounter: """Count exceptions in a block of code or function. Can be used as a function decorator or context manager. @@ -347,74 +302,69 @@ def count_exceptions( return ExceptionCounter(self, exception) def _child_samples(self) -> Iterable[Sample]: - sample = Sample( - "_total", {}, self._value.get(), None, self._value.get_exemplar() - ) + sample = Sample('_total', {}, self._value.get(), None, self._value.get_exemplar()) if _use_created: - return (sample, Sample("_created", {}, self._created, None, None)) + return ( + sample, + Sample('_created', {}, self._created, None, None) + ) return (sample,) class Gauge(MetricWrapperBase): """Gauge metric, to report instantaneous values. - Examples of Gauges include: - - Inprogress requests - - Number of items in a queue - - Free memory - - Total memory - - Temperature + Examples of Gauges include: + - Inprogress requests + - Number of items in a queue + - Free memory + - Total memory + - Temperature - Gauges can go both up and down. + Gauges can go both up and down. - from prometheus_client import Gauge + from prometheus_client import Gauge - g = Gauge('my_inprogress_requests', 'Description of gauge') - g.inc() # Increment by 1 - g.dec(10) # Decrement by given value - g.set(4.2) # Set to a given value + g = Gauge('my_inprogress_requests', 'Description of gauge') + g.inc() # Increment by 1 + g.dec(10) # Decrement by given value + g.set(4.2) # Set to a given value - There are utilities for common use cases: + There are utilities for common use cases: - g.set_to_current_time() # Set to current unixtime + g.set_to_current_time() # Set to current unixtime - # Increment when entered, decrement when exited. - @g.track_inprogress() - def f(): - pass + # Increment when entered, decrement when exited. + @g.track_inprogress() + def f(): + pass - with g.track_inprogress(): - pass + with g.track_inprogress(): + pass - A Gauge can also take its value from a callback: + A Gauge can also take its value from a callback: - d = Gauge('data_objects', 'Number of objects') - my_dict = {} - d.set_function(lambda: len(my_dict)) + d = Gauge('data_objects', 'Number of objects') + my_dict = {} + d.set_function(lambda: len(my_dict)) """ - - _type = "gauge" - _MULTIPROC_MODES = frozenset( - ("all", "liveall", "min", "livemin", "max", "livemax", "sum", "livesum") - ) - - def __init__( - self, - name: str, - documentation: str, - labelnames: Iterable[str] = (), - namespace: str = "", - subsystem: str = "", - unit: str = "", - registry: Optional[CollectorRegistry] = REGISTRY, - _labelvalues: Optional[Sequence[str]] = None, - multiprocess_mode: Literal[ - "all", "liveall", "min", "livemin", "max", "livemax", "sum", "livesum" - ] = "all", - ): + _type = 'gauge' + _MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum')) + + def __init__(self, + name: str, + documentation: str, + labelnames: Iterable[str] = (), + namespace: str = '', + subsystem: str = '', + unit: str = '', + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'] = 'all', + ): self._multiprocess_mode = multiprocess_mode if multiprocess_mode not in self._MULTIPROC_MODES: - raise ValueError("Invalid multiprocess mode: " + multiprocess_mode) + raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode) super().__init__( name=name, documentation=documentation, @@ -425,17 +375,12 @@ def __init__( registry=registry, _labelvalues=_labelvalues, ) - self._kwargs["multiprocess_mode"] = self._multiprocess_mode + self._kwargs['multiprocess_mode'] = self._multiprocess_mode def _metric_init(self) -> None: self._value = values.ValueClass( - self._type, - self._name, - self._name, - self._labelnames, - self._labelvalues, - self._documentation, - multiprocess_mode=self._multiprocess_mode, + self._type, self._name, self._name, self._labelnames, self._labelvalues, + self._documentation, multiprocess_mode=self._multiprocess_mode ) def inc(self, amount: float = 1) -> None: @@ -472,7 +417,7 @@ def time(self) -> Timer: Can be used as a function decorator or context manager. """ - return Timer(self, "set") + return Timer(self, 'set') def set_function(self, f: Callable[[], float]) -> None: """Call the provided function to return the Gauge value. @@ -484,12 +429,12 @@ def set_function(self, f: Callable[[], float]) -> None: self._raise_if_not_observable() def samples(_: Gauge) -> Iterable[Sample]: - return (Sample("", {}, float(f()), None, None),) + return (Sample('', {}, float(f()), None, None),) self._child_samples = types.MethodType(samples, self) # type: ignore def _child_samples(self) -> Iterable[Sample]: - return (Sample("", {}, self._value.get(), None, None),) + return (Sample('', {}, self._value.get(), None, None),) class Summary(MetricWrapperBase): @@ -522,27 +467,13 @@ def create_response(request): with REQUEST_TIME.time(): pass # Logic to be timed """ - - _type = "summary" - _reserved_labelnames = ["quantile"] + _type = 'summary' + _reserved_labelnames = ['quantile'] def _metric_init(self) -> None: - self._count = values.ValueClass( - self._type, - self._name, - self._name + "_count", - self._labelnames, - self._labelvalues, - self._documentation, - ) - self._sum = values.ValueClass( - self._type, - self._name, - self._name + "_sum", - self._labelnames, - self._labelvalues, - self._documentation, - ) + self._count = values.ValueClass(self._type, self._name, self._name + '_count', self._labelnames, + self._labelvalues, self._documentation) + self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation) self._created = time.time() def observe(self, amount: float) -> None: @@ -564,15 +495,15 @@ def time(self) -> Timer: Can be used as a function decorator or context manager. """ - return Timer(self, "observe") + return Timer(self, 'observe') def _child_samples(self) -> Iterable[Sample]: samples = [ - Sample("_count", {}, self._count.get(), None, None), - Sample("_sum", {}, self._sum.get(), None, None), + Sample('_count', {}, self._count.get(), None, None), + Sample('_sum', {}, self._sum.get(), None, None), ] if _use_created: - samples.append(Sample("_created", {}, self._created, None, None)) + samples.append(Sample('_created', {}, self._created, None, None)) return tuple(samples) @@ -611,39 +542,21 @@ def create_response(request): The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds. They can be overridden by passing `buckets` keyword argument to `Histogram`. """ - - _type = "histogram" - _reserved_labelnames = ["le"] - DEFAULT_BUCKETS = ( - 0.005, - 0.01, - 0.025, - 0.05, - 0.075, - 0.1, - 0.25, - 0.5, - 0.75, - 1.0, - 2.5, - 5.0, - 7.5, - 10.0, - INF, - ) - - def __init__( - self, - name: str, - documentation: str, - labelnames: Iterable[str] = (), - namespace: str = "", - subsystem: str = "", - unit: str = "", - registry: Optional[CollectorRegistry] = REGISTRY, - _labelvalues: Optional[Sequence[str]] = None, - buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS, - ): + _type = 'histogram' + _reserved_labelnames = ['le'] + DEFAULT_BUCKETS = (.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, INF) + + def __init__(self, + name: str, + documentation: str, + labelnames: Iterable[str] = (), + namespace: str = '', + subsystem: str = '', + unit: str = '', + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS, + ): self._prepare_buckets(buckets) super().__init__( name=name, @@ -655,42 +568,33 @@ def __init__( registry=registry, _labelvalues=_labelvalues, ) - self._kwargs["buckets"] = buckets + self._kwargs['buckets'] = buckets def _prepare_buckets(self, source_buckets: Sequence[Union[float, str]]) -> None: buckets = [float(b) for b in source_buckets] if buckets != sorted(buckets): # This is probably an error on the part of the user, # so raise rather than sorting for them. - raise ValueError("Buckets not in sorted order") + raise ValueError('Buckets not in sorted order') if buckets and buckets[-1] != INF: buckets.append(INF) if len(buckets) < 2: - raise ValueError("Must have at least two buckets") + raise ValueError('Must have at least two buckets') self._upper_bounds = buckets def _metric_init(self) -> None: self._buckets: List[values.ValueClass] = [] self._created = time.time() - bucket_labelnames = self._labelnames + ("le",) - self._sum = values.ValueClass( - self._type, - self._name, - self._name + "_sum", - self._labelnames, - self._labelvalues, - self._documentation, - ) + bucket_labelnames = self._labelnames + ('le',) + self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation) for b in self._upper_bounds: - self._buckets.append( - values.ValueClass( - self._type, - self._name, - self._name + "_bucket", - bucket_labelnames, - self._labelvalues + (floatToGoString(b),), - self._documentation, - ) + self._buckets.append(values.ValueClass( + self._type, + self._name, + self._name + '_bucket', + bucket_labelnames, + self._labelvalues + (floatToGoString(b),), + self._documentation) ) def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> None: @@ -710,9 +614,7 @@ def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> N self._buckets[i].inc(1) if exemplar: _validate_exemplar(exemplar) - self._buckets[i].set_exemplar( - Exemplar(exemplar, amount, time.time()) - ) + self._buckets[i].set_exemplar(Exemplar(exemplar, amount, time.time())) break def time(self) -> Timer: @@ -720,48 +622,39 @@ def time(self) -> Timer: Can be used as a function decorator or context manager. """ - return Timer(self, "observe") + return Timer(self, 'observe') def _child_samples(self) -> Iterable[Sample]: samples = [] acc = 0.0 for i, bound in enumerate(self._upper_bounds): acc += self._buckets[i].get() - samples.append( - Sample( - "_bucket", - {"le": floatToGoString(bound)}, - acc, - None, - self._buckets[i].get_exemplar(), - ) - ) - samples.append(Sample("_count", {}, acc, None, None)) + samples.append(Sample('_bucket', {'le': floatToGoString(bound)}, acc, None, self._buckets[i].get_exemplar())) + samples.append(Sample('_count', {}, acc, None, None)) if self._upper_bounds[0] >= 0: - samples.append(Sample("_sum", {}, self._sum.get(), None, None)) + samples.append(Sample('_sum', {}, self._sum.get(), None, None)) if _use_created: - samples.append(Sample("_created", {}, self._created, None, None)) + samples.append(Sample('_created', {}, self._created, None, None)) return tuple(samples) class Info(MetricWrapperBase): """Info metric, key-value pairs. - Examples of Info include: - - Build information - - Version information - - Potential target metadata + Examples of Info include: + - Build information + - Version information + - Potential target metadata - Example usage: - from prometheus_client import Info + Example usage: + from prometheus_client import Info - i = Info('my_build', 'Description of info') - i.info({'version': '1.2.3', 'buildhost': 'foo@bar'}) + i = Info('my_build', 'Description of info') + i.info({'version': '1.2.3', 'buildhost': 'foo@bar'}) - Info metrics do not work in multiprocess mode. + Info metrics do not work in multiprocess mode. """ - - _type = "info" + _type = 'info' def _metric_init(self): self._labelname_set = set(self._labelnames) @@ -771,47 +664,42 @@ def _metric_init(self): def info(self, val: Dict[str, str]) -> None: """Set info metric.""" if self._labelname_set.intersection(val.keys()): - raise ValueError( - "Overlapping labels for Info metric, metric: {} child: {}".format( - self._labelnames, val - ) - ) + raise ValueError('Overlapping labels for Info metric, metric: {} child: {}'.format( + self._labelnames, val)) with self._lock: self._value = dict(val) def _child_samples(self) -> Iterable[Sample]: with self._lock: - return (Sample("_info", self._value, 1.0, None, None),) + return (Sample('_info', self._value, 1.0, None, None),) class Enum(MetricWrapperBase): """Enum metric, which of a set of states is true. - Example usage: - from prometheus_client import Enum + Example usage: + from prometheus_client import Enum - e = Enum('task_state', 'Description of enum', - states=['starting', 'running', 'stopped']) - e.state('running') + e = Enum('task_state', 'Description of enum', + states=['starting', 'running', 'stopped']) + e.state('running') - The first listed state will be the default. - Enum metrics do not work in multiprocess mode. + The first listed state will be the default. + Enum metrics do not work in multiprocess mode. """ - - _type = "stateset" - - def __init__( - self, - name: str, - documentation: str, - labelnames: Sequence[str] = (), - namespace: str = "", - subsystem: str = "", - unit: str = "", - registry: Optional[CollectorRegistry] = REGISTRY, - _labelvalues: Optional[Sequence[str]] = None, - states: Optional[Sequence[str]] = None, - ): + _type = 'stateset' + + def __init__(self, + name: str, + documentation: str, + labelnames: Sequence[str] = (), + namespace: str = '', + subsystem: str = '', + unit: str = '', + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + states: Optional[Sequence[str]] = None, + ): super().__init__( name=name, documentation=documentation, @@ -823,10 +711,10 @@ def __init__( _labelvalues=_labelvalues, ) if name in labelnames: - raise ValueError(f"Overlapping labels for Enum metric: {name}") + raise ValueError(f'Overlapping labels for Enum metric: {name}') if not states: - raise ValueError(f"No states provided for Enum metric: {name}") - self._kwargs["states"] = self._states = states + raise ValueError(f'No states provided for Enum metric: {name}') + self._kwargs['states'] = self._states = states def _metric_init(self) -> None: self._value = 0 @@ -841,6 +729,7 @@ def state(self, state: str) -> None: def _child_samples(self) -> Iterable[Sample]: with self._lock: return [ - Sample("", {self._name: s}, 1 if i == self._value else 0, None, None) - for i, s in enumerate(self._states) + Sample('', {self._name: s}, 1 if i == self._value else 0, None, None) + for i, s + in enumerate(self._states) ]