Skip to content

Commit

Permalink
produce: adds -literal to use stdin line as value and key as null #26.
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed Jun 22, 2016
1 parent 5562909 commit 7eb894f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
31 changes: 23 additions & 8 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ type produceConfig struct {
batch int
timeout time.Duration
verbose bool
literal bool
partitioner string
args struct {
topic string
brokers string
batch int
timeout time.Duration
verbose bool
literal bool
partitioner string
}
}
Expand Down Expand Up @@ -70,6 +72,12 @@ func produceFlags() *flag.FlagSet {
false,
"Verbose output",
)
flags.BoolVar(
&config.produce.args.literal,
"literal",
false,
"Interpret stdin line literally and pass it as value, key as null.",
)
flags.StringVar(
&config.produce.args.partitioner,
"partitioner",
Expand Down Expand Up @@ -158,6 +166,7 @@ func produceParseArgs() {
config.produce.batch = config.produce.args.batch
config.produce.timeout = config.produce.args.timeout
config.produce.verbose = config.produce.args.verbose
config.produce.literal = config.produce.args.literal
}

func mustFindLeaders() map[int32]*sarama.Broker {
Expand Down Expand Up @@ -274,15 +283,21 @@ func deserializeLines(wg *sync.WaitGroup, in chan string, out chan message, part
return
}
var msg message
if err := json.Unmarshal([]byte(l), &msg); err != nil {
if config.produce.verbose {
fmt.Printf("Failed to unmarshal input [%v], falling back to defaults. err=%v\n", l, err)
}
var v *string = &l
if len(l) == 0 {
v = nil

switch {
case config.produce.literal:
msg.Value = &l
default:
if err := json.Unmarshal([]byte(l), &msg); err != nil {
if config.produce.verbose {
fmt.Printf("Failed to unmarshal input [%v], falling back to defaults. err=%v\n", l, err)
}
var v *string = &l
if len(l) == 0 {
v = nil
}
msg = message{Key: nil, Value: v}
}
msg = message{Key: nil, Value: v}
}

var p int32 = 0
Expand Down
18 changes: 16 additions & 2 deletions produce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"testing"
"time"

"github.com/davecgh/go-spew/spew"
)

func TestHashCode(t *testing.T) {
Expand Down Expand Up @@ -209,26 +211,37 @@ func TestDeserializeLines(t *testing.T) {
config.produce.partitioner = "hashCode"
data := []struct {
in string
literal bool
partitionCount int32
expected message
}{
{
in: "",
literal: false,
partitionCount: 1,
expected: newMessage("", "", 0),
},
{
in: `{"key":"hans","value":"123"}`,
literal: false,
partitionCount: 4,
expected: newMessage("hans", "123", hashCodePartition("hans", 4)),
},
{
in: `{"key":"hans","value":"123","partition":1}`,
literal: false,
partitionCount: 3,
expected: newMessage("hans", "123", 1),
},
{
in: `{"other":"json","values":"avail"}`,
literal: true,
partitionCount: 4,
expected: newMessage("", `{"other":"json","values":"avail"}`, 0),
},
{
in: `so lange schon`,
literal: false,
partitionCount: 3,
expected: newMessage("", "so lange schon", 0),
},
Expand All @@ -238,15 +251,16 @@ func TestDeserializeLines(t *testing.T) {
var wg sync.WaitGroup
in := make(chan string, 1)
out := make(chan message)
config.produce.literal = d.literal
go deserializeLines(&wg, in, out, d.partitionCount)
in <- d.in

select {
case <-time.After(50 * time.Millisecond):
t.Errorf("did not receive output in time")
case actual := <-out:
if !reflect.DeepEqual(d.expected, actual) {
t.Errorf("\nexpected %#v\nactual %#v", d.expected, actual)
if !(reflect.DeepEqual(d.expected, actual)) {
t.Errorf(spew.Sprintf("\nexpected %#v\nactual %#v", d.expected, actual))
}
}
}
Expand Down

0 comments on commit 7eb894f

Please sign in to comment.