Skip to content

Commit

Permalink
produce messages partitioned by value
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuan Xie committed Sep 15, 2020
1 parent bd68672 commit 6460f61
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (cmd *produceCmd) read(as []string) produceArgs {
flags.BoolVar(&args.literal, "literal", false, "Interpret stdin line literally and pass it as value, key as null.")
flags.StringVar(&args.version, "version", "", "Kafka protocol version")
flags.StringVar(&args.compression, "compression", "", "Kafka message compression codec [gzip|snappy|lz4] (defaults to none)")
flags.StringVar(&args.partitioner, "partitioner", "", "Optional partitioner to use. Available: hashCode")
flags.StringVar(&args.partitioner, "partitioner", "", "Optional partitioner to use. Available: hashCode, hashCodeByValue")
flags.StringVar(&args.decodeKey, "decodekey", "string", "Decode message value as (string|hex|base64), defaults to string.")
flags.StringVar(&args.decodeValue, "decodevalue", "string", "Decode message value as (string|hex|base64), defaults to string.")
flags.IntVar(&args.bufferSize, "buffersize", 16777216, "Buffer size for scanning stdin, defaults to 16777216=16*1024*1024.")
Expand Down Expand Up @@ -320,11 +320,16 @@ func (cmd *produceCmd) deserializeLines(in chan string, out chan message, partit
}

var part int32 = 0
if msg.Key != nil && cmd.partitioner == "hashCode" {
part = hashCodePartition(*msg.Key, partitionCount)
}
if msg.Partition == nil {
if msg.Value != nil && cmd.partitioner == "hashCodeByValue" {
part = hashCodePartition(*msg.Value, partitionCount)
msg.Partition = &part
}else {
if msg.Key != nil && cmd.partitioner == "hashCode" {
part = hashCodePartition(*msg.Key, partitionCount)
}
if msg.Partition == nil {
msg.Partition = &part
}
}

out <- msg
Expand Down

0 comments on commit 6460f61

Please sign in to comment.