Skip to content
This repository has been archived by the owner on May 3, 2022. It is now read-only.

Commit

Permalink
Chart repo instances implement a new method Start()
Browse files Browse the repository at this point in the history
This change propagates stop channel to repo catalog and to all repos
instantiated by it. The main goal is to make sure the refresh background
job refreshing chart repo index is in full control of the main
goroutine.

Chart repo refreshIndex got a protection mechanism against 0-entries
response returned from chart repo. This is a fairly impossible situation
which was observed in a real life runtime. The strategy Shipper takes in
this case is to return an error if a former index fetch was non-empty.

Minor change: chart repo FetchVersions doesn't fetch chart tarballs
anymore. This was a loosy strategy which looked like an extra bit of
confidence (motivation was like: if we can't fetch a resolved version,
there is probably a data corruption so we fall back to the highest
resolvable observed version). In reality, it makes the process to be
non-deterministic and frankly it shouldn't be there at first.

Some chart repo fetch tests became obsolete due to the fact tarballs are
no longer downloaded, therefore dropped.

Signed-off-by: Oleg Sidorov <[email protected]>
  • Loading branch information
Oleg Sidorov committed Aug 19, 2019
1 parent ff75c31 commit 2f3cc10
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 65 deletions.
1 change: 1 addition & 0 deletions cmd/shipper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func main() {
repoCatalog := repo.NewCatalog(
repo.DefaultFileCacheFactory(*chartCacheDir),
repo.DefaultRemoteFetcher,
stopCh,
)

cfg := &cfg{
Expand Down
5 changes: 4 additions & 1 deletion pkg/chart/repo/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ type Catalog struct {
factory CacheFactory
repos map[string]*Repo
fetcher RemoteFetcher
stopCh <-chan struct{}
sync.Mutex
}

func NewCatalog(factory CacheFactory, fetcher RemoteFetcher) *Catalog {
func NewCatalog(factory CacheFactory, fetcher RemoteFetcher, stopCh <-chan struct{}) *Catalog {
return &Catalog{
factory: factory,
repos: make(map[string]*Repo),
fetcher: fetcher,
stopCh: stopCh,
}
}

Expand All @@ -77,6 +79,7 @@ func (c *Catalog) CreateRepoIfNotExist(repoURL string) (*Repo, error) {
return nil, err
}
c.repos[name] = repo
go repo.Start(c.stopCh)
}

return repo, nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/chart/repo/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ func TestCreateRepoIfNotExist(t *testing.T) {

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
c := NewCatalog(testCase.factory, func(_ string) ([]byte, error) {
return []byte{}, nil
})
}, stopCh)
_, err := c.CreateRepoIfNotExist(testCase.url)
if (err == nil && testCase.err != nil) ||
(err != nil && testCase.err == nil) ||
Expand Down
53 changes: 20 additions & 33 deletions pkg/chart/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,23 @@ func NewRepo(repoURL string, cache Cache, fetcher RemoteFetcher) (*Repo, error)
parsed.Path = path.Join(parsed.Path, "index.yaml")
indexURL := parsed.String()

repo := &Repo{
r := &Repo{
repoURL: repoURL,
indexURL: indexURL,
cache: cache,
fetcher: fetcher,
indexResolved: make(chan struct{}),
}

// runs repo.refreshIndex forever
go wait.Forever(func() {
if err := repo.refreshIndex(); err != nil {
glog.Errorf("failed to refresh repo %q index: %s", repo.repoURL, err)
}
}, RepoIndexRefreshPeriod)
return r, nil
}

return repo, nil
func (r *Repo) Start(stopCh <-chan struct{}) {
wait.Until(func() {
if err := r.refreshIndex(); err != nil {
glog.Errorf("failed to refresh repo %q index: %s", r.repoURL, err)
}
}, RepoIndexRefreshPeriod, stopCh)
}

func (r *Repo) refreshIndex() error {
Expand All @@ -90,6 +91,15 @@ func (r *Repo) refreshIndex() error {
)
}

oldindex, ok := r.index.Load().(*repo.IndexFile)
if ok && oldindex != nil {
if len(oldindex.Entries) != 0 && len(index.Entries) == 0 {
return shippererrors.NewChartRepoIndexError(
fmt.Errorf("the new index contains no entries whereas the previous fetch returned a non-empty result"),
)
}
}

r.index.Store(index)

// close indexResolved once
Expand All @@ -113,30 +123,7 @@ func (r *Repo) ResolveVersion(chartspec *shipper.Chart) (*repo.ChartVersion, err
return nil, repo.ErrNoChartVersion
}

var highestver *repo.ChartVersion
var lasterr error

for _, ver := range versions {
if _, lasterr = r.LoadCached(ver); lasterr == nil {
highestver = ver
break
}
if _, lasterr = r.FetchRemote(ver); lasterr == nil {
highestver = ver
break
}
}
if highestver == nil {
if lasterr == nil {
lasterr = repo.ErrNoChartVersion
}
return nil, shippererrors.NewChartVersionResolveError(
chartspec,
lasterr,
)
}

return highestver, nil
return versions[0], nil
}

func (r *Repo) FetchChartVersions(chartspec *shipper.Chart) (repo.ChartVersions, error) {
Expand Down Expand Up @@ -290,7 +277,7 @@ func loadIndexData(data []byte) (*repo.IndexFile, error) {
i.SortEntries()
if i.APIVersion == "" {
// do not support pre-v2.0.0
return i, repo.ErrNoAPIVersion
return nil, repo.ErrNoAPIVersion
}

return i, nil
Expand Down
58 changes: 28 additions & 30 deletions pkg/chart/repo/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ entries:
- https://charts.example.com/non-existing-0.0.2.tgz
version: 0.0.2
generated: 2016-10-06T16:23:20.499029981-06:00
`

IndexYamlRespNoCharts = `
---
apiVersion: v1
entries:
generated: 2016-10-06T16:23:20.499029981-06:00
`

repoURL = "https://registry.example.com/charts"
Expand Down Expand Up @@ -122,6 +129,22 @@ func TestRefreshIndex(t *testing.T) {
expectedFetchURL: repoURL + "/index.yaml",
expectedErr: nil,
},
{
name: "Empty response",
fetchBody: "",
fetchErr: nil,
repoURL: repoURL,
expectedFetchURL: repoURL + "/index.yaml",
expectedErr: fmt.Errorf("failed to get chart repo index: failed to load index file: no index content"),
},
{
name: "no charts in valid response",
fetchBody: IndexYamlRespNoCharts,
fetchErr: nil,
repoURL: repoURL,
expectedFetchURL: repoURL + "/index.yaml",
expectedErr: nil,
},
}

for _, testCase := range tests {
Expand Down Expand Up @@ -206,14 +229,14 @@ func TestResolveVersion(t *testing.T) {
"Existing dual version >= function applied",
"nginx",
">=0.0.1",
"0.0.2",
"0.0.3",
nil,
},
{
"Existing dual version > function applied",
"nginx",
">0.0.1",
"0.0.2",
"0.0.3",
nil,
},
{
Expand All @@ -230,34 +253,6 @@ func TestResolveVersion(t *testing.T) {
"0.0.1",
nil,
},
{
"Failed fetch version downgrade to a lower one",
"nginx",
"<=0.0.3",
"0.0.2",
nil,
},
{
"Failed fetch version exact match",
"nginx",
"0.0.3",
"",
fmt.Errorf("failed to resolve chart version [name: \"nginx\", version: \"0.0.3\", repo: \"https://charts.example.com\"]: failed to read file \"nginx-0.0.3.tgz\": open testdata/nginx-0.0.3.tgz: no such file or directory"),
},
{
"Faield fetch version a lower one is in the cache",
"non-existing",
"<=0.0.2",
"0.0.1",
nil,
},
{
"Failed fetch version exact match with lower version available in cache",
"non-existing",
"0.0.2",
"",
fmt.Errorf("failed to resolve chart version [name: \"non-existing\", version: \"0.0.2\", repo: \"https://charts.example.com\"]: failed to read file \"non-existing-0.0.2.tgz\": open testdata/non-existing-0.0.2.tgz: no such file or directory"),
},
}

t.Parallel()
Expand Down Expand Up @@ -421,6 +416,9 @@ func TestConcurrentFetchChartVersionsRefreshesIndexOnce(t *testing.T) {
if err != nil {
t.Fatalf("failed to initialize repo: %s", err)
}
stopCh := make(chan struct{})
defer close(stopCh)
go repo.Start(stopCh)

// Chart contents doesn't really matter
chartspec := &shipper.Chart{
Expand Down

0 comments on commit 2f3cc10

Please sign in to comment.