diff --git a/pkg/window/counter.go b/pkg/window/counter.go new file mode 100644 index 00000000000..8eaf164b7c0 --- /dev/null +++ b/pkg/window/counter.go @@ -0,0 +1,111 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "time" +) + +// Metric is a sample interface. +// Implementations of Metrics in metric package are Counter, Gauge, +// PointGauge, RollingCounter and RollingGauge. +type Metric interface { + // Add adds the given value to the counter. + Add(int64) + // Value gets the current value. + // If the metric's type is PointGauge, RollingCounter, RollingGauge, + // it returns the sum value within the window. + Value() int64 +} + +// Aggregation contains some common aggregation function. +// Each aggregation can compute summary statistics of window. +type Aggregation interface { + // Min finds the min value within the window. + Min() float64 + // Max finds the max value within the window. + Max() float64 + // Avg computes average value within the window. + Avg() float64 + // Sum computes sum value within the window. + Sum() float64 +} + +// RollingCounter represents a ring window based on time duration. +// e.g. [[1], [3], [5]] +type RollingCounter interface { + Metric + Aggregation + + Timespan() int + // Reduce applies the reduction function to all buckets within the window. + Reduce(func(Iterator) float64) float64 +} + +// RollingCounterOpts contains the arguments for creating RollingCounter. +type RollingCounterOpts struct { + Size int + BucketDuration time.Duration +} + +type rollingCounter struct { + policy *RollingPolicy +} + +// NewRollingCounter creates a new RollingCounter bases on RollingCounterOpts. +func NewRollingCounter(opts RollingCounterOpts) RollingCounter { + window := NewWindow(Options{Size: opts.Size}) + policy := NewRollingPolicy(window, RollingPolicyOpts{BucketDuration: opts.BucketDuration}) + return &rollingCounter{ + policy: policy, + } +} + +func (r *rollingCounter) Add(val int64) { + r.policy.Add(float64(val)) +} + +func (r *rollingCounter) Reduce(f func(Iterator) float64) float64 { + return r.policy.Reduce(f) +} + +func (r *rollingCounter) Avg() float64 { + return r.policy.Reduce(Avg) +} + +func (r *rollingCounter) Min() float64 { + return r.policy.Reduce(Min) +} + +func (r *rollingCounter) Max() float64 { + return r.policy.Reduce(Max) +} + +func (r *rollingCounter) Sum() float64 { + return r.policy.Reduce(Sum) +} + +func (r *rollingCounter) Value() int64 { + return int64(r.Sum()) +} + +func (r *rollingCounter) Timespan() int { + r.policy.mu.RLock() + defer r.policy.mu.RUnlock() + return r.policy.timespan() +} diff --git a/pkg/window/counter_test.go b/pkg/window/counter_test.go new file mode 100644 index 00000000000..ce604acc27e --- /dev/null +++ b/pkg/window/counter_test.go @@ -0,0 +1,182 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestRollingCounterAdd(t *testing.T) { + re := require.New(t) + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + listBuckets := func() [][]float64 { + buckets := make([][]float64, 0) + r.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + buckets = append(buckets, bucket.Points) + } + return 0.0 + }) + return buckets + } + re.Equal([][]float64{{}, {}, {}}, listBuckets()) + r.Add(1) + re.Equal([][]float64{{}, {}, {1}}, listBuckets()) + time.Sleep(time.Second) + r.Add(2) + r.Add(3) + re.Equal([][]float64{{}, {1}, {5}}, listBuckets()) + time.Sleep(time.Second) + r.Add(4) + r.Add(5) + r.Add(6) + re.Equal([][]float64{{1}, {5}, {15}}, listBuckets()) + time.Sleep(time.Second) + r.Add(7) + re.Equal([][]float64{{5}, {15}, {7}}, listBuckets()) + + // test the given reduce methods. + re.Less(math.Abs((r.Sum() - 27.)), 1e-7) + re.Less(math.Abs((r.Max() - 15.)), 1e-7) + re.Less(math.Abs((r.Min() - 5.)), 1e-7) + re.Less(math.Abs((r.Avg() - 9.)), 1e-7) +} + +func TestRollingCounterReduce(t *testing.T) { + re := require.New(t) + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + for x := 0; x < size; x++ { + for i := 0; i <= x; i++ { + r.Add(1) + } + if x < size-1 { + time.Sleep(bucketDuration) + } + } + var result = r.Reduce(func(iterator Iterator) float64 { + var result float64 + for iterator.Next() { + bucket := iterator.Bucket() + result += bucket.Points[0] + } + return result + }) + re.Less(math.Abs(result-6.), 1e-7) + re.Less(math.Abs((r.Sum() - 6.)), 1e-7) + re.Less(math.Abs(float64(r.Value())-6), 1e-7) +} + +func TestRollingCounterDataRace(t *testing.T) { + size := 3 + bucketDuration := time.Millisecond * 10 + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + var stop = make(chan bool) + go func() { + for { + select { + case <-stop: + return + default: + r.Add(1) + time.Sleep(time.Millisecond * 5) + } + } + }() + go func() { + for { + select { + case <-stop: + return + default: + _ = r.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + for range bucket.Points { + continue + } + } + return 0 + }) + } + } + }() + time.Sleep(time.Second * 3) + close(stop) +} + +func BenchmarkRollingCounterIncr(b *testing.B) { + size := 3 + bucketDuration := time.Millisecond * 100 + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + b.ResetTimer() + for i := 0; i <= b.N; i++ { + r.Add(1) + } +} + +func BenchmarkRollingCounterReduce(b *testing.B) { + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + for i := 0; i <= 10; i++ { + r.Add(1) + time.Sleep(time.Millisecond * 500) + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + var _ = r.Reduce(func(i Iterator) float64 { + var result float64 + for i.Next() { + bucket := i.Bucket() + if len(bucket.Points) != 0 { + result += bucket.Points[0] + } + } + return result + }) + } +} diff --git a/pkg/window/policy.go b/pkg/window/policy.go new file mode 100644 index 00000000000..d67a8aa6e59 --- /dev/null +++ b/pkg/window/policy.go @@ -0,0 +1,109 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "sync" + "time" +) + +// RollingPolicy is a policy for ring window based on time duration. +// RollingPolicy moves bucket offset with time duration. +// e.g. If the last point is appended one bucket duration ago, +// RollingPolicy will increment current offset. +type RollingPolicy struct { + mu sync.RWMutex + size int + window *Window + offset int + + bucketDuration time.Duration + lastAppendTime time.Time +} + +// RollingPolicyOpts contains the arguments for creating RollingPolicy. +type RollingPolicyOpts struct { + BucketDuration time.Duration +} + +// NewRollingPolicy creates a new RollingPolicy based on the given window and RollingPolicyOpts. +func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy { + return &RollingPolicy{ + window: window, + size: window.Size(), + offset: 0, + + bucketDuration: opts.BucketDuration, + lastAppendTime: time.Now(), + } +} + +// timespan returns passed bucket number since lastAppendTime, +// if it is one bucket duration earlier than the last recorded +// time, it will return the size. +func (r *RollingPolicy) timespan() int { + v := int(time.Since(r.lastAppendTime) / r.bucketDuration) + if v > -1 { // maybe time backwards + return v + } + return r.size +} + +// apply applies function f with value val on +// current offset bucket, expired bucket will be reset +func (r *RollingPolicy) apply(f func(offset int, val float64), val float64) { + r.mu.Lock() + defer r.mu.Unlock() + + // calculate current offset + timespan := r.timespan() + oriTimespan := timespan + if timespan > 0 { + start := (r.offset + 1) % r.size + end := (r.offset + timespan) % r.size + if timespan > r.size { + timespan = r.size + } + // reset the expired buckets + r.window.ResetBuckets(start, timespan) + r.offset = end + r.lastAppendTime = r.lastAppendTime.Add(time.Duration(oriTimespan * int(r.bucketDuration))) + } + f(r.offset, val) +} + +// Add adds the given value to the latest point within bucket. +func (r *RollingPolicy) Add(val float64) { + r.apply(r.window.Add, val) +} + +// Reduce applies the reduction function to all buckets within the window. +func (r *RollingPolicy) Reduce(f func(Iterator) float64) (val float64) { + r.mu.RLock() + defer r.mu.RUnlock() + + timespan := r.timespan() + if count := r.size - timespan; count > 0 { + offset := r.offset + timespan + 1 + if offset >= r.size { + offset -= r.size + } + val = f(r.window.Iterator(offset, count)) + } + return val +} diff --git a/pkg/window/policy_test.go b/pkg/window/policy_test.go new file mode 100644 index 00000000000..14b3b326192 --- /dev/null +++ b/pkg/window/policy_test.go @@ -0,0 +1,129 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "fmt" + "math" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func GetRollingPolicy() *RollingPolicy { + w := NewWindow(Options{Size: 3}) + return NewRollingPolicy(w, RollingPolicyOpts{BucketDuration: 100 * time.Millisecond}) +} + +func TestRollingPolicy_Add(t *testing.T) { + re := require.New(t) + // test func timespan return real span + tests := []struct { + timeSleep []int + offset []int + points []float64 + }{ + { + timeSleep: []int{150, 51}, + offset: []int{1, 2}, + points: []float64{1, 1}, + }, + { + timeSleep: []int{94, 250}, + offset: []int{0, 0}, + points: []float64{1, 1}, + }, + { + timeSleep: []int{150, 300, 600}, + offset: []int{1, 1, 1}, + points: []float64{1, 1, 1}, + }, + } + + for _, test := range tests { + t.Run("test policy add", func(t *testing.T) { + var totalTS, lastOffset int + timeSleep := test.timeSleep + policy := GetRollingPolicy() + for i, n := range timeSleep { + totalTS += n + time.Sleep(time.Duration(n) * time.Millisecond) + offset, point := test.offset[i], test.points[i] + policy.Add(point) + + re.Less(math.Abs(point-policy.window.buckets[offset].Points[0]), 1e-6, + fmt.Sprintf("error, time since last append: %vms, last offset: %v", totalTS, lastOffset)) + lastOffset = offset + } + }) + } +} + +func TestRollingPolicy_AddWithTimespan(t *testing.T) { + re := require.New(t) + t.Run("timespan < bucket number", func(t *testing.T) { + policy := GetRollingPolicy() + // bucket 0 + policy.Add(0) + // bucket 1 + time.Sleep(101 * time.Millisecond) + policy.Add(1) + re.Equal(1, int(policy.window.buckets[1].Points[0])) + // bucket 2 + time.Sleep(101 * time.Millisecond) + policy.Add(2) + // bucket 1 + time.Sleep(201 * time.Millisecond) + policy.Add(4) + + for _, bkt := range policy.window.buckets { + t.Logf("%+v", bkt) + } + + re.Equal(0, len(policy.window.buckets[0].Points)) + re.Equal(4, int(policy.window.buckets[1].Points[0])) + re.Equal(2, int(policy.window.buckets[2].Points[0])) + }) + + t.Run("timespan > bucket number", func(t *testing.T) { + policy := GetRollingPolicy() + + // bucket 0 + policy.Add(0) + // bucket 1 + time.Sleep(101 * time.Millisecond) + policy.Add(1) + policy.Add(1) + re.Equal(2, int(policy.window.buckets[1].Points[0])) + // bucket 2 + time.Sleep(101 * time.Millisecond) + policy.Add(2) + // bucket 1 + time.Sleep(501 * time.Millisecond) + policy.Add(4) + + for _, bkt := range policy.window.buckets { + t.Logf("%+v", bkt) + } + + re.Equal(0, len(policy.window.buckets[0].Points)) + re.Equal(4, int(policy.window.buckets[1].Points[0])) + re.Equal(0, len(policy.window.buckets[2].Points)) + }) +} diff --git a/pkg/window/reduce.go b/pkg/window/reduce.go new file mode 100644 index 00000000000..23fa87177f2 --- /dev/null +++ b/pkg/window/reduce.go @@ -0,0 +1,94 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +// Sum the values within the window. +func Sum(iterator Iterator) float64 { + var result = 0.0 + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + result += p + } + } + return result +} + +// Avg the values within the window. +func Avg(iterator Iterator) float64 { + var result = 0.0 + var count = 0.0 + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + result += p + count++ + } + } + return result / count +} + +// Min the values within the window. +func Min(iterator Iterator) float64 { + var result = 0.0 + var started = false + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + if !started { + result = p + started = true + continue + } + if p < result { + result = p + } + } + } + return result +} + +// Max the values within the window. +func Max(iterator Iterator) float64 { + var result = 0.0 + var started = false + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + if !started { + result = p + started = true + continue + } + if p > result { + result = p + } + } + } + return result +} + +// Count sums the count value within the window. +func Count(iterator Iterator) float64 { + var result int64 + for iterator.Next() { + bucket := iterator.Bucket() + result += bucket.Count + } + return float64(result) +} diff --git a/pkg/window/window.go b/pkg/window/window.go new file mode 100644 index 00000000000..a5c4b0dfe3c --- /dev/null +++ b/pkg/window/window.go @@ -0,0 +1,150 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import "fmt" + +// Bucket contains multiple float64 points. +type Bucket struct { + Points []float64 + Count int64 + next *Bucket +} + +// Append appends the given value to the bucket. +func (b *Bucket) Append(val float64) { + b.Points = append(b.Points, val) + b.Count++ +} + +// Add adds the given value to the point. +func (b *Bucket) Add(offset int, val float64) { + b.Points[offset] += val + b.Count++ +} + +// Reset empties the bucket. +func (b *Bucket) Reset() { + b.Points = b.Points[:0] + b.Count = 0 +} + +// Next returns the next bucket. +func (b *Bucket) Next() *Bucket { + return b.next +} + +// Iterator iterates the buckets within the window. +type Iterator struct { + count int + iteratedCount int + cur *Bucket +} + +// Next returns true util all of the buckets has been iterated. +func (i *Iterator) Next() bool { + return i.count != i.iteratedCount +} + +// Bucket gets current bucket. +func (i *Iterator) Bucket() Bucket { + if !(i.Next()) { + panic(fmt.Errorf("stat/metric: iteration out of range iteratedCount: %d count: %d", i.iteratedCount, i.count)) + } + bucket := *i.cur + i.iteratedCount++ + i.cur = i.cur.Next() + return bucket +} + +// Window contains multiple buckets. +type Window struct { + buckets []Bucket + size int +} + +// Options contains the arguments for creating Window. +type Options struct { + Size int +} + +// NewWindow creates a new Window based on WindowOpts. +func NewWindow(opts Options) *Window { + buckets := make([]Bucket, opts.Size) + for offset := range buckets { + buckets[offset].Points = make([]float64, 0) + nextOffset := offset + 1 + if nextOffset == opts.Size { + nextOffset = 0 + } + buckets[offset].next = &buckets[nextOffset] + } + return &Window{buckets: buckets, size: opts.Size} +} + +// ResetWindow empties all buckets within the window. +func (w *Window) ResetWindow() { + for offset := range w.buckets { + w.ResetBucket(offset) + } +} + +// ResetBucket empties the bucket based on the given offset. +func (w *Window) ResetBucket(offset int) { + w.buckets[offset%w.size].Reset() +} + +// ResetBuckets empties the buckets based on the given offsets. +func (w *Window) ResetBuckets(offset int, count int) { + for i := 0; i < count; i++ { + w.ResetBucket(offset + i) + } +} + +// Append appends the given value to the bucket where index equals the given offset. +func (w *Window) Append(offset int, val float64) { + w.buckets[offset%w.size].Append(val) +} + +// Add adds the given value to the latest point within bucket where index equals the given offset. +func (w *Window) Add(offset int, val float64) { + offset %= w.size + if w.buckets[offset].Count == 0 { + w.buckets[offset].Append(val) + return + } + w.buckets[offset].Add(0, val) +} + +// Bucket returns the bucket where index equals the given offset. +func (w *Window) Bucket(offset int) Bucket { + return w.buckets[offset%w.size] +} + +// Size returns the size of the window. +func (w *Window) Size() int { + return w.size +} + +// Iterator returns the count number buckets iterator from offset. +func (w *Window) Iterator(offset int, count int) Iterator { + return Iterator{ + count: count, + cur: &w.buckets[offset%w.size], + } +} diff --git a/pkg/window/window_test.go b/pkg/window/window_test.go new file mode 100644 index 00000000000..0205aae47a3 --- /dev/null +++ b/pkg/window/window_test.go @@ -0,0 +1,101 @@ +// The MIT License (MIT) +// Copyright (c) 2022 go-kratos Project Authors. +// +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWindowResetWindow(t *testing.T) { + re := require.New(t) + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetWindow() + for i := 0; i < opts.Size; i++ { + re.Equal(len(window.Bucket(i).Points), 0) + } +} + +func TestWindowResetBucket(t *testing.T) { + re := require.New(t) + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetBucket(1) + re.Equal(len(window.Bucket(1).Points), 0) + re.Equal(window.Bucket(0).Points[0], float64(1.0)) + re.Equal(window.Bucket(2).Points[0], float64(1.0)) +} + +func TestWindowResetBuckets(t *testing.T) { + re := require.New(t) + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetBuckets(0, 3) + for i := 0; i < opts.Size; i++ { + re.Equal(len(window.Bucket(i).Points), 0) + } +} + +func TestWindowAppend(t *testing.T) { + re := require.New(t) + opts := Options{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + for i := 1; i < opts.Size; i++ { + window.Append(i, 2.0) + } + for i := 0; i < opts.Size; i++ { + re.Equal(window.Bucket(i).Points[0], float64(1.0)) + } + for i := 1; i < opts.Size; i++ { + re.Equal(window.Bucket(i).Points[1], float64(2.0)) + } +} + +func TestWindowAdd(t *testing.T) { + opts := Options{Size: 3} + window := NewWindow(opts) + window.Append(0, 1.0) + window.Add(0, 1.0) + assert.Equal(t, window.Bucket(0).Points[0], float64(2.0)) + + window = NewWindow(opts) + window.Add(0, 1.0) + window.Add(0, 1.0) + assert.Equal(t, window.Bucket(0).Points[0], float64(2.0)) +} + +func TestWindowSize(t *testing.T) { + opts := Options{Size: 3} + window := NewWindow(opts) + assert.Equal(t, window.Size(), 3) +}