From 9a4f263a21a61f93d4242e4be6f99626f834d922 Mon Sep 17 00:00:00 2001 From: Nishchay Date: Thu, 11 May 2023 17:55:16 +0530 Subject: [PATCH 1/4] Add comprehensive test and support for timestamp function --- engine/engine_test.go | 7 +++++++ execution/function/functions.go | 12 ++++++++++++ 2 files changed, 19 insertions(+) diff --git a/engine/engine_test.go b/engine/engine_test.go index 6aab9936..46a4c2fd 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -317,6 +317,13 @@ func TestQueriesAgainstOldEngine(t *testing.T) { http_requests_total{pod="nginx-2"} 1`, query: "atanh(http_requests_total)", }, + { + name: "timestamp", + load: `load 30s + http_requests_total{pod="nginx-1"} 0 + http_requests_total{pod="nginx-2"} 1`, + query: "timestamp(http_requests_total)", + }, { name: "rad", load: `load 30s diff --git a/execution/function/functions.go b/execution/function/functions.go index e59aab56..0a36182b 100644 --- a/execution/function/functions.go +++ b/execution/function/functions.go @@ -120,6 +120,18 @@ var Funcs = map[string]FunctionCall{ F: math.Pi, } }, + "timestamp": func(f FunctionArgs) promql.Sample { + if len(f.Samples) == 0 { + return InvalidSample + } + result := promql.Sample{} + for _, sample := range f.Samples { + if sample.H == nil { // only consider float samples + result.Points = append(result.Points, promql.Point{T: sample.T, V: float64(sample.T) / 1000}) + } + } + return result + }, "sum_over_time": func(f FunctionArgs) promql.Sample { if len(f.Samples) == 0 { return InvalidSample From 6ed9338e01ad7d862173aab298e327851dfc82a3 Mon Sep 17 00:00:00 2001 From: Nishchay Date: Tue, 16 May 2023 03:33:19 +0530 Subject: [PATCH 2/4] pushed timestamp func into selectPoint func inside vector_selector --- engine/engine_test.go | 7 ---- execution/function/functions.go | 12 ------ execution/scan/vector_selector.go | 65 +++++++++++++++++++++---------- 3 files changed, 45 insertions(+), 39 deletions(-) diff --git a/engine/engine_test.go b/engine/engine_test.go index 46a4c2fd..6aab9936 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -317,13 +317,6 @@ func TestQueriesAgainstOldEngine(t *testing.T) { http_requests_total{pod="nginx-2"} 1`, query: "atanh(http_requests_total)", }, - { - name: "timestamp", - load: `load 30s - http_requests_total{pod="nginx-1"} 0 - http_requests_total{pod="nginx-2"} 1`, - query: "timestamp(http_requests_total)", - }, { name: "rad", load: `load 30s diff --git a/execution/function/functions.go b/execution/function/functions.go index 0a36182b..e59aab56 100644 --- a/execution/function/functions.go +++ b/execution/function/functions.go @@ -120,18 +120,6 @@ var Funcs = map[string]FunctionCall{ F: math.Pi, } }, - "timestamp": func(f FunctionArgs) promql.Sample { - if len(f.Samples) == 0 { - return InvalidSample - } - result := promql.Sample{} - for _, sample := range f.Samples { - if sample.H == nil { // only consider float samples - result.Points = append(result.Points, promql.Point{T: sample.T, V: float64(sample.T) / 1000}) - } - } - return result - }, "sum_over_time": func(f FunctionArgs) promql.Sample { if len(f.Samples) == 0 { return InvalidSample diff --git a/execution/scan/vector_selector.go b/execution/scan/vector_selector.go index 501d5bdf..69ccb75f 100644 --- a/execution/scan/vector_selector.go +++ b/execution/scan/vector_selector.go @@ -49,6 +49,12 @@ type vectorSelector struct { numShards int } +type point struct { + t int64 + v float64 + fh *histogram.FloatHistogram +} + // NewVectorSelector creates operator which selects vector of series. func NewVectorSelector( pool *model.VectorPool, @@ -106,35 +112,51 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { vectors := o.vectorPool.GetVectorBatch() ts := o.currentStep + for i := 0; i < len(o.scanners); i++ { var ( - series = o.scanners[i] - seriesTs = ts + series = o.scanners[i] + seriesTs = ts + lastSampleTs int64 // Added variable to store timestamp of the last sample in the lookback period. ) for currStep := 0; currStep < o.numSteps && seriesTs <= o.maxt; currStep++ { if len(vectors) <= currStep { vectors = append(vectors, o.vectorPool.GetStepVector(seriesTs)) } - _, v, h, ok, err := selectPoint(series.samples, seriesTs, o.lookbackDelta, o.offset) + + // Modify selectPoint call to retrieve timestamp of the last sample in the lookback period. + p, ok, err := selectPoint(series.samples, seriesTs, o.lookbackDelta, o.offset) if err != nil { return nil, err } + if ok { - if h != nil { - vectors[currStep].AppendHistogram(o.vectorPool, series.signature, h) + if p.fh != nil { + vectors[currStep].AppendHistogram(o.vectorPool, series.signature, p.fh) } else { - vectors[currStep].AppendSample(o.vectorPool, series.signature, v) + vectors[currStep].AppendSample(o.vectorPool, series.signature, p.v) } + + // Save the timestamp of the last sample in the lookback period. + lastSampleTs = p.t } + seriesTs += o.step } + + // Use the saved timestamp to compute timestamp of last sample in lookback period. + if lastSampleTs > 0 { + // vectors[len(vectors)-1].SetTimestamp(lastSampleTs) + } } + // For instant queries, set the step to a positive value // so that the operator can terminate. if o.step == 0 { o.step = 1 } + o.currentStep += o.step * int64(o.numSteps) return vectors, nil @@ -165,35 +187,38 @@ func (o *vectorSelector) loadSeries(ctx context.Context) error { } // TODO(fpetkovski): Add max samples limit. -func selectPoint(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset int64) (int64, float64, *histogram.FloatHistogram, bool, error) { +// To push down the timestamp function into this file and store the timestamp in the value for each series, you can modify the selectPoint function. +func selectPoint(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset int64) (point, bool, error) { refTime := ts - offset - var t int64 - var v float64 - var fh *histogram.FloatHistogram + var p point valueType := it.Seek(refTime) switch valueType { case chunkenc.ValNone: if it.Err() != nil { - return 0, 0, nil, false, it.Err() + return p, false, it.Err() } case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: - t, fh = it.AtFloatHistogram() + t, fh := it.AtFloatHistogram() + p = point{t: t, fh: fh} case chunkenc.ValFloat: - t, v = it.At() + t, v := it.At() + p = point{t: t, v: v} default: panic(errors.Newf("unknown value type %v", valueType)) } - if valueType == chunkenc.ValNone || t > refTime { + + if valueType == chunkenc.ValNone || p.t > refTime { var ok bool - t, v, _, fh, ok = it.PeekPrev() - if !ok || t < refTime-lookbackDelta { - return 0, 0, nil, false, nil + p.t, p.v, _, p.fh, ok = it.PeekPrev() + if !ok || p.t < refTime-lookbackDelta { + return p, false, nil } } - if value.IsStaleNaN(v) || (fh != nil && value.IsStaleNaN(fh.Sum)) { - return 0, 0, nil, false, nil + + if value.IsStaleNaN(p.v) || (p.fh != nil && value.IsStaleNaN(p.fh.Sum)) { + return p, false, nil } - return t, v, fh, true, nil + return p, true, nil } From 80a1a1f7c6a87efd876d57bdb1c3a8071afd0766 Mon Sep 17 00:00:00 2001 From: Nishchay Date: Tue, 16 May 2023 04:05:21 +0530 Subject: [PATCH 3/4] use the save timestamp to compute that of last --- execution/scan/vector_selector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/execution/scan/vector_selector.go b/execution/scan/vector_selector.go index 69ccb75f..1de5bf7f 100644 --- a/execution/scan/vector_selector.go +++ b/execution/scan/vector_selector.go @@ -147,7 +147,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { // Use the saved timestamp to compute timestamp of last sample in lookback period. if lastSampleTs > 0 { - // vectors[len(vectors)-1].SetTimestamp(lastSampleTs) + vectors[len(vectors)-1].T = lastSampleTs } } From 564cb693c1b56ba141ebafb40b3490866358a2be Mon Sep 17 00:00:00 2001 From: Nishchay Date: Thu, 18 May 2023 21:08:02 +0530 Subject: [PATCH 4/4] return timestamp as value in case chunkenc.ValFloat --- execution/scan/vector_selector.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/execution/scan/vector_selector.go b/execution/scan/vector_selector.go index 1de5bf7f..6aab243c 100644 --- a/execution/scan/vector_selector.go +++ b/execution/scan/vector_selector.go @@ -192,6 +192,8 @@ func selectPoint(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset i refTime := ts - offset var p point + returnTimestampAsValue := true // bool flag to indicate whether to return the timestamp as the point value. + valueType := it.Seek(refTime) switch valueType { case chunkenc.ValNone: @@ -203,7 +205,13 @@ func selectPoint(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset i p = point{t: t, fh: fh} case chunkenc.ValFloat: t, v := it.At() - p = point{t: t, v: v} + if returnTimestampAsValue { + p = point{t: t, v: float64(t)} + + } else { + p = point{t: t, v: v} + } + default: panic(errors.Newf("unknown value type %v", valueType)) }