diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index ccd6f1de..e940b017 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -321,8 +321,13 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition s } g := storage.TransGlobalStore{} dtmimp.MustUnmarshal(v, &g) + if !((condition.Status == "" || g.Status == condition.Status) && + (condition.TransType == "" || g.TransType == condition.TransType) && + (condition.CreateTimeStart.IsZero() || g.CreateTime.After(condition.CreateTimeStart)) && + (condition.CreateTimeEnd.IsZero() || g.CreateTime.Before(condition.CreateTimeEnd))) { + continue + } globals = append(globals, g) - // todo condition if len(globals) == int(limit) { break } diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 1a356200..9b80d2dc 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -80,8 +80,12 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition s for _, v := range values { global := storage.TransGlobalStore{} dtmimp.MustUnmarshalString(v.(string), &global) - // todo condition - globals = append(globals, global) + if (condition.Status == "" || global.Status == condition.Status) && + (condition.TransType == "" || global.TransType == condition.TransType) && + (condition.CreateTimeStart.IsZero() || global.CreateTime.After(condition.CreateTimeStart)) && + (condition.CreateTimeEnd.IsZero() || global.CreateTime.Before(condition.CreateTimeEnd)) { + globals = append(globals, global) + } if len(globals) == int(limit) { break } diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index bcbbbb1b..3a390cb9 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -58,8 +58,22 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64, condition s if *position != "" { lid = dtmimp.MustAtoi(*position) } - // todo condition - dbr := dbGet().Must().Where("id < ?", lid).Order("id desc").Limit(int(limit)).Find(&globals) + query := dbGet().Must().Where("id < ?", lid) + if condition.Status != "" { + query = query.Where("status = ?", condition.Status) + } + if condition.TransType != "" { + query = query.Where("trans_type = ?", condition.TransType) + } + if !condition.CreateTimeStart.IsZero() { + query = query.Where("create_time >= ?", condition.CreateTimeStart.Format("2006-01-02 15:04:05")) + } + if !condition.CreateTimeEnd.IsZero() { + query = query.Where("create_time <= ?", condition.CreateTimeEnd.Format("2006-01-02 15:04:05")) + } + + dbr := query.Order("id desc").Limit(int(limit)).Find(&globals) + if dbr.RowsAffected < limit { *position = "" } else { diff --git a/test/api_test.go b/test/api_test.go index 5755e0d4..16702b73 100644 --- a/test/api_test.go +++ b/test/api_test.go @@ -11,6 +11,7 @@ import ( "net/http" "strconv" "testing" + "time" "github.com/dtm-labs/dtm/client/dtmcli" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -51,12 +52,15 @@ func TestAPIQuery(t *testing.T) { } func TestAPIAll(t *testing.T) { + startTime := time.Now() for i := 0; i < 3; i++ { // add three gid := dtmimp.GetFuncName() + fmt.Sprintf("%d", i) err := genMsg(gid).Submit() assert.Nil(t, err) waitTransProcessed(gid) } + endTime := time.Now() + resp, err := dtmcli.GetRestyClient().R().SetQueryParam("limit", "1").Get(dtmutil.DefaultHTTPServer + "/all") assert.Nil(t, err) m := map[string]interface{}{} @@ -92,6 +96,40 @@ func TestAPIAll(t *testing.T) { assert.Equal(t, "", nextPos3) // assert.Equal(t, 2, len(m["transactions"].([]interface{}))) // the left 2. + // filter test + resp, err = dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{ + "limit": "10", + "status": "succeed", + "transType": "msg", + "createTimeStart": strconv.Itoa(int(startTime.Add(time.Minute*-1).Unix() * 1000)), + "createTimeEnd": strconv.Itoa(int(endTime.Add(time.Minute*1).Unix() * 1000)), + }).Get(dtmutil.DefaultHTTPServer + "/all") + assert.Nil(t, err) + dtmimp.MustUnmarshalString(resp.String(), &m) + nextPos1 := m["next_position"].(string) + // assert.Equal(t, 3, len(m["transactions"].([]interface{}))) + assert.GreaterOrEqual(t, len(m["transactions"].([]interface{})), 3) // Be disturbed by something else test case, so use >=3 instead of =3. + assert.Empty(t, nextPos1) // is over + for _, item := range m["transactions"].([]interface{}) { + g := item.(map[string]interface{}) + assert.Equal(t, "msg", g["trans_type"]) + assert.Equal(t, "succeed", g["status"]) + } + + // filter, five minutes ago + resp, err = dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{ + "limit": "10", + "status": "succeed", + "transType": "msg", + "createTimeStart": strconv.Itoa(int(startTime.Add(time.Minute*-10).Unix() * 1000)), + "createTimeEnd": strconv.Itoa(int(endTime.Add(time.Minute*-5).Unix() * 1000)), + }).Get(dtmutil.DefaultHTTPServer + "/all") + assert.Nil(t, err) + dtmimp.MustUnmarshalString(resp.String(), &m) + nextPos1 = m["next_position"].(string) + assert.Equal(t, 0, len(m["transactions"].([]interface{}))) + assert.Empty(t, nextPos1) // is over + //fmt.Printf("pos1:%s,pos2:%s,pos3:%s", nextPos, nextPos2, nextPos3) }