diff --git a/consumer/shard.go b/consumer/shard.go index ab79790d..710b9768 100644 --- a/consumer/shard.go +++ b/consumer/shard.go @@ -2,6 +2,7 @@ package consumer import ( "context" + "reflect" "sync" "time" @@ -244,7 +245,7 @@ func servePrimary(s *shard) (err error) { func waitAndTearDown(s *shard, done func()) { s.wg.Wait() - if s.store != nil { + if s.store != nil && !reflect.ValueOf(s.store).IsNil() { s.store.Destroy() } done() diff --git a/consumer/shard_test.go b/consumer/shard_test.go index 8dc15678..06f5f85c 100644 --- a/consumer/shard_test.go +++ b/consumer/shard_test.go @@ -156,6 +156,19 @@ func TestShardAppNewStoreError(t *testing.T) { tf.allocateShard(makeShard(shardA)) // Cleanup. } +func TestShardAppNilJSONStore(t *testing.T) { + var tf, cleanup = newTestFixture(t) + defer cleanup() + + tf.app.nilStore = true + tf.allocateShard(makeShard(shardA), localID) + + assert.Equal(t, "completeRecovery: app.NewStore: state must not be nil", + expectStatusCode(t, tf.state, pc.ReplicaStatus_FAILED).Errors[0]) + + tf.allocateShard(makeShard(shardA)) // Cleanup. +} + func TestShardAppNewMessageFails(t *testing.T) { var tf, cleanup = newTestFixture(t) defer cleanup() diff --git a/consumer/store_json_file.go b/consumer/store_json_file.go index 003b12a7..76619a9e 100644 --- a/consumer/store_json_file.go +++ b/consumer/store_json_file.go @@ -31,8 +31,11 @@ var _ Store = &JSONFileStore{} // JSONFileStore is-a Store. // NewJSONFileStore returns a new JSONFileStore. |state| is the runtime instance // of the Store's state, which is decoded into, encoded from, and retained -// as JSONFileState.State. +// as JSONFileState.State. |state| must not be nil. func NewJSONFileStore(rec *recoverylog.Recorder, state interface{}) (*JSONFileStore, error) { + if state == nil { + return nil, errors.New("state must not be nil") + } var store = &JSONFileStore{ State: state, fs: recoverylog.RecordedAferoFS{Recorder: rec, Fs: afero.NewOsFs()}, diff --git a/consumer/test_support_test.go b/consumer/test_support_test.go index 46a72c5e..88151476 100644 --- a/consumer/test_support_test.go +++ b/consumer/test_support_test.go @@ -83,6 +83,7 @@ type testApplication struct { finalizeErr error // Error returned by Application.FinalizeTxn(). startCommitErr error // Error returned by Store.StartCommit(). restoreCheckpointErr error // Error returned by Store.RestoreCheckpoint(). + nilStore bool // Whether the application initializes its store in Application.NewStore(). finishedCh chan OpFuture // Signaled on FinishedTxn(). db *sql.DB // "Remote" sqlite database. } @@ -114,6 +115,8 @@ func (a *testApplication) NewStore(shard Shard, rec *recoverylog.Recorder) (Stor return &errStore{app: a}, nil } else if rec == nil { return NewSQLStore(a.db), nil + } else if a.nilStore { + return NewJSONFileStore(rec, nil) } else { var state = make(map[string]string) return NewJSONFileStore(rec, &state)