diff --git a/consumer/interfaces.go b/consumer/interfaces.go index ec37006a..4ef8ecb8 100644 --- a/consumer/interfaces.go +++ b/consumer/interfaces.go @@ -300,6 +300,11 @@ var ( "gazette_shard_up", "Indicates the processing status of a shard by this consumer.", []string{"shard", "status"}, nil) + + shardReadHeadDesc = prometheus.NewDesc( + "gazette_shard_read_head", + "Current read head of the consumer (i.e., next journal byte offset to be read).", + []string{"shard", "journal"}, nil) ) var ( @@ -315,10 +320,6 @@ var ( Name: "gazette_shard_read_bytes_total", Help: "Total byte-length of messages processed by completed consumer transactions.", }, []string{"shard"}) - shardReadHeadGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "gazette_shard_read_head", - Help: "Current read head of the consumer (i.e., next journal byte offset to be read).", - }, []string{"shard", "journal"}) shardTxnPhaseSecondsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "gazette_shard_phase_seconds_total", Help: "Cumulative number of seconds processing transactions.", @@ -357,8 +358,4 @@ var ( Name: "gazette_consumer_consumed_bytes_total", Help: "Cumulative number of bytes consumed.", }) - readHeadGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "gazette_consumer_read_head", - Help: "Consumer read head", - }, []string{"journal"}) ) diff --git a/consumer/resolver.go b/consumer/resolver.go index e5401938..1702bb66 100644 --- a/consumer/resolver.go +++ b/consumer/resolver.go @@ -391,17 +391,34 @@ func (r *Resolver) Describe(ch chan<- *prometheus.Desc) { // Collect implements prometheus.Collector func (r *Resolver) Collect(ch chan<- prometheus.Metric) { + for _, m := range r.collectShardMetrics() { + ch <- m + } +} + +func (r *Resolver) collectShardMetrics() []prometheus.Metric { r.state.KS.Mu.RLock() defer r.state.KS.Mu.RUnlock() + metrics := make([]prometheus.Metric, 0) for shardID, shard := range r.shards { - status := shard.resolved.assignment.Decoded.(allocator.Assignment).AssignmentValue.(*pc.ReplicaStatus) - ch <- prometheus.MustNewConstMetric( + shardStatus := shard.resolved.assignment.Decoded.(allocator.Assignment).AssignmentValue.(*pc.ReplicaStatus) + metrics = append(metrics, prometheus.MustNewConstMetric( shardUpDesc, prometheus.GaugeValue, 1, shardID.String(), - status.Code.String()) + shardStatus.Code.String())) + readThrough, _ := shard.Progress() + for j, o := range readThrough { + metrics = append(metrics, prometheus.MustNewConstMetric( + shardReadHeadDesc, + prometheus.GaugeValue, + float64(o), + shardID.String(), + j.String())) + } } + return metrics } // ErrResolverStopped is returned by Resolver if a ShardID resolves to a local diff --git a/consumer/transaction.go b/consumer/transaction.go index a76d4779..910deef3 100644 --- a/consumer/transaction.go +++ b/consumer/transaction.go @@ -214,7 +214,6 @@ func txnRead(s *shard, txn, prev *transaction, env EnvelopeOrError, ok bool) err // DEPRECATED metrics to be removed: bytesConsumedTotal.Add(float64(env.End - env.Begin)) - readHeadGauge.WithLabelValues(env.Journal.Name.String()).Set(float64(env.End)) // End DEPRECATED metrics. // |env| is read-uncommitted. Queue and act on its sequencing outcome. @@ -501,11 +500,6 @@ func recordMetrics(s *shard, txn *transaction) { shardTxnTotal.WithLabelValues(s.FQN()).Inc() shardReadMsgsTotal.WithLabelValues(s.FQN()).Add(float64(txn.consumedCount)) shardReadBytesTotal.WithLabelValues(s.FQN()).Add(float64(txn.consumedBytes)) - for journal, source := range txn.checkpoint.Sources { - shardReadHeadGauge. - WithLabelValues(s.FQN(), journal.String()). - Set(float64(source.ReadThrough)) - } var ( durNotRunning = txn.beganAt.Sub(txn.prevPrepareDoneAt)