From 7eb894fa6e63dcc331bebe27909188bc6787ed1b Mon Sep 17 00:00:00 2001 From: Felix Geller Date: Wed, 22 Jun 2016 21:46:14 +1200 Subject: [PATCH] produce: adds -literal to use stdin line as value and key as null #26. --- produce.go | 31 +++++++++++++++++++++++-------- produce_test.go | 18 ++++++++++++++++-- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/produce.go b/produce.go index 6df0da6..9ff413a 100644 --- a/produce.go +++ b/produce.go @@ -21,6 +21,7 @@ type produceConfig struct { batch int timeout time.Duration verbose bool + literal bool partitioner string args struct { topic string @@ -28,6 +29,7 @@ type produceConfig struct { batch int timeout time.Duration verbose bool + literal bool partitioner string } } @@ -70,6 +72,12 @@ func produceFlags() *flag.FlagSet { false, "Verbose output", ) + flags.BoolVar( + &config.produce.args.literal, + "literal", + false, + "Interpret stdin line literally and pass it as value, key as null.", + ) flags.StringVar( &config.produce.args.partitioner, "partitioner", @@ -158,6 +166,7 @@ func produceParseArgs() { config.produce.batch = config.produce.args.batch config.produce.timeout = config.produce.args.timeout config.produce.verbose = config.produce.args.verbose + config.produce.literal = config.produce.args.literal } func mustFindLeaders() map[int32]*sarama.Broker { @@ -274,15 +283,21 @@ func deserializeLines(wg *sync.WaitGroup, in chan string, out chan message, part return } var msg message - if err := json.Unmarshal([]byte(l), &msg); err != nil { - if config.produce.verbose { - fmt.Printf("Failed to unmarshal input [%v], falling back to defaults. err=%v\n", l, err) - } - var v *string = &l - if len(l) == 0 { - v = nil + + switch { + case config.produce.literal: + msg.Value = &l + default: + if err := json.Unmarshal([]byte(l), &msg); err != nil { + if config.produce.verbose { + fmt.Printf("Failed to unmarshal input [%v], falling back to defaults. err=%v\n", l, err) + } + var v *string = &l + if len(l) == 0 { + v = nil + } + msg = message{Key: nil, Value: v} } - msg = message{Key: nil, Value: v} } var p int32 = 0 diff --git a/produce_test.go b/produce_test.go index 908b75d..f82282d 100644 --- a/produce_test.go +++ b/produce_test.go @@ -6,6 +6,8 @@ import ( "sync" "testing" "time" + + "github.com/davecgh/go-spew/spew" ) func TestHashCode(t *testing.T) { @@ -209,26 +211,37 @@ func TestDeserializeLines(t *testing.T) { config.produce.partitioner = "hashCode" data := []struct { in string + literal bool partitionCount int32 expected message }{ { in: "", + literal: false, partitionCount: 1, expected: newMessage("", "", 0), }, { in: `{"key":"hans","value":"123"}`, + literal: false, partitionCount: 4, expected: newMessage("hans", "123", hashCodePartition("hans", 4)), }, { in: `{"key":"hans","value":"123","partition":1}`, + literal: false, partitionCount: 3, expected: newMessage("hans", "123", 1), }, + { + in: `{"other":"json","values":"avail"}`, + literal: true, + partitionCount: 4, + expected: newMessage("", `{"other":"json","values":"avail"}`, 0), + }, { in: `so lange schon`, + literal: false, partitionCount: 3, expected: newMessage("", "so lange schon", 0), }, @@ -238,6 +251,7 @@ func TestDeserializeLines(t *testing.T) { var wg sync.WaitGroup in := make(chan string, 1) out := make(chan message) + config.produce.literal = d.literal go deserializeLines(&wg, in, out, d.partitionCount) in <- d.in @@ -245,8 +259,8 @@ func TestDeserializeLines(t *testing.T) { case <-time.After(50 * time.Millisecond): t.Errorf("did not receive output in time") case actual := <-out: - if !reflect.DeepEqual(d.expected, actual) { - t.Errorf("\nexpected %#v\nactual %#v", d.expected, actual) + if !(reflect.DeepEqual(d.expected, actual)) { + t.Errorf(spew.Sprintf("\nexpected %#v\nactual %#v", d.expected, actual)) } } }