diff --git a/.gitignore b/.gitignore index 3fa5ddc..750beeb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /kt /quickfix +.idea/* +.vscode/* \ No newline at end of file diff --git a/go.mod b/go.mod index 023a007..1bc8e61 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.21 require ( github.com/IBM/sarama v1.41.1 github.com/davecgh/go-spew v1.1.1 + github.com/stretchr/testify v1.5.1 + golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 github.com/markusmobius/go-dateparser v1.2.1 github.com/stretchr/testify v1.8.4 golang.org/x/crypto v0.13.0 diff --git a/produce.go b/produce.go index 6ff2809..2e05ebb 100644 --- a/produce.go +++ b/produce.go @@ -55,7 +55,7 @@ func (cmd *produceCmd) read(as []string) produceArgs { flags.BoolVar(&args.literal, "literal", false, "Interpret stdin line literally and pass it as value, key as null.") flags.StringVar(&args.version, "version", "", "Kafka protocol version") flags.StringVar(&args.compression, "compression", "", "Kafka message compression codec [gzip|snappy|lz4] (defaults to none)") - flags.StringVar(&args.partitioner, "partitioner", "", "Optional partitioner to use. Available: hashCode") + flags.StringVar(&args.partitioner, "partitioner", "", "Optional partitioner to use. Available: hashCode, hashCodeByValue") flags.StringVar(&args.decodeKey, "decodekey", "string", "Decode message value as (string|hex|base64), defaults to string.") flags.StringVar(&args.decodeValue, "decodevalue", "string", "Decode message value as (string|hex|base64), defaults to string.") flags.IntVar(&args.bufferSize, "buffersize", 16777216, "Buffer size for scanning stdin, defaults to 16777216=16*1024*1024.") @@ -346,11 +346,16 @@ func (cmd *produceCmd) deserializeLines(in chan string, out chan message, partit } var part int32 = 0 - if msg.Key != nil && cmd.partitioner == "hashCode" { - part = hashCodePartition(*msg.Key, partitionCount) - } - if msg.Partition == nil { + if msg.Value != nil && cmd.partitioner == "hashCodeByValue" { + part = hashCodePartition(*msg.Value, partitionCount) msg.Partition = &part + }else { + if msg.Key != nil && cmd.partitioner == "hashCode" { + part = hashCodePartition(*msg.Key, partitionCount) + } + if msg.Partition == nil { + msg.Partition = &part + } } out <- msg @@ -545,6 +550,12 @@ like the following: In case the input line cannot be interpeted as a JSON object the key and value both default to the input line and partition to 0. +If you don't want to specify key for single message, in other words, it doesn't matter that a message goes +to a random paritition (with equal probability), you can set the flag '-partitioner' with 'hashCodeByValue'. +That will tell kt to take the value of a message to calculate a hashcode deciding which paritition it will go to. +This can be helpful when you just want there are many messages distributed in partitions of a topic, and don't +care about what the content is. + Examples: Send a single message with a specific key: @@ -555,6 +566,11 @@ Send a single message with a specific key: $ kt consume -topic greetings -timeout 1s -offsets 0:3- {"partition":0,"offset":3,"key":"id-23","message":"ola"} +Send a single message without specified key: + $ echo 'no key specified message' | kt produce -topic greetings -partitioner hashCodeByValue + Sent message to a partition decided by your case + + Keep reading input from stdin until interrupted (via ^C). $ kt produce -topic greetings