Skip to content

Commit

Permalink
WIP: compression
Browse files Browse the repository at this point in the history
  • Loading branch information
deniszh committed Sep 26, 2024
1 parent 8f50d3c commit e9c5e35
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/facebookgo/grace v0.0.0-20180706040059-75cf19382434
github.com/facebookgo/pidfile v0.0.0-20150612191647-f242e2999868
github.com/google/go-cmp v0.6.0
github.com/klauspost/compress v1.17.9
github.com/libp2p/go-reuseport v0.4.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.2
Expand Down Expand Up @@ -68,7 +69,6 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/conf/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Host struct {
GRPC bool `toml:"grpc"`
// Optional, number of TCP connections to open to the host.
MTCP int `toml:"mtcp"`
// Optional, compressor to use, default - no compression
Compression string `toml:"compression"`
}

// ReadClustersConfig reads clusters set from a reader.
Expand Down
36 changes: 27 additions & 9 deletions pkg/target/hostMTCP.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package target
import (
"bufio"
"fmt"
"github.com/klauspost/compress/gzip"

Check failure on line 6 in pkg/target/hostMTCP.go

View workflow job for this annotation

GitHub Actions / test (1.21.x)

cannot find module providing package github.com/klauspost/compress/gzip: import lookup disabled by -mod=vendor
"github.com/klauspost/compress/s2"

Check failure on line 7 in pkg/target/hostMTCP.go

View workflow job for this annotation

GitHub Actions / test (1.21.x)

cannot find module providing package github.com/klauspost/compress/s2: import lookup disabled by -mod=vendor
"github.com/klauspost/compress/snappy"

Check failure on line 8 in pkg/target/hostMTCP.go

View workflow job for this annotation

GitHub Actions / test (1.21.x)

cannot find module providing package github.com/klauspost/compress/snappy: import lookup disabled by -mod=vendor
"math/rand"
"net"
"strconv"
Expand All @@ -20,12 +23,13 @@ import (

// HostMTCP represents a single target hosts to send records in multimple TCP connections to.
type HostMTCP struct {
Name string
Port uint16
Ch chan *rec.RecBytes
Available atomic.Bool
NumMTCP int
MTCPs []MultiConnection
Name string
Port uint16
Ch chan *rec.RecBytes
Available atomic.Bool
NumMTCP int
MTCPs []MultiConnection
Compression string

stop chan int

Expand Down Expand Up @@ -58,12 +62,25 @@ type MultiConnection struct {
Available atomic.Bool
}

func newCompressedWriter(c net.Conn, bufSize int, typ string) *bufio.Writer {
switch typ {
case "s2":
return bufio.NewWriterSize(s2.NewWriter(c), bufSize)
case "snappy":
return bufio.NewWriterSize(snappy.NewBufferedWriter(c), bufSize)
case "gzip":
return bufio.NewWriterSize(gzip.NewWriter(c), bufSize)
default:
return bufio.NewWriterSize(c, bufSize)
}
}

// New or updated target connection from existing net.Conn
// Requires Connection.Mutex lock
func (c *MultiConnection) New(n net.Conn, bufSize int) {
func (c *MultiConnection) New(n net.Conn, bufSize int, compressor string) {
c.Conn = n
c.LastConnUse = time.Now()
c.W = bufio.NewWriterSize(n, bufSize)
c.W = newCompressedWriter(n, bufSize, compressor)
}

// Close the connection while mainaining correct internal state.
Expand Down Expand Up @@ -123,6 +140,7 @@ func ConstructHostMTCP(clusterName string, mainCfg conf.Main, hostCfg conf.Host,
}
h.NumMTCP = hostCfg.MTCP
h.MTCPs = make([]MultiConnection, h.NumMTCP)
h.Compression = hostCfg.Compression

h.rnd = rand.New(rand.NewSource(time.Now().UnixNano()))
h.Ms = ms
Expand Down Expand Up @@ -338,7 +356,7 @@ func (h *HostMTCP) connect(c *MultiConnection, attemptCount int) {
}
return
}
c.New(conn, h.conf.TCPOutBufSize)
c.New(conn, h.conf.TCPOutBufSize, h.Compression)
c.Available.Store(true)
h.setHostAvailability()
}
Expand Down
5 changes: 5 additions & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,15 @@ github.com/json-iterator/go
# github.com/klauspost/compress v1.17.9
## explicit; go 1.20
github.com/klauspost/compress
github.com/klauspost/compress/flate
github.com/klauspost/compress/fse
github.com/klauspost/compress/gzip
github.com/klauspost/compress/huff0
github.com/klauspost/compress/internal/cpuinfo
github.com/klauspost/compress/internal/race
github.com/klauspost/compress/internal/snapref
github.com/klauspost/compress/s2
github.com/klauspost/compress/snappy
github.com/klauspost/compress/zstd
github.com/klauspost/compress/zstd/internal/xxhash
# github.com/kylelemons/godebug v1.1.0
Expand Down

0 comments on commit e9c5e35

Please sign in to comment.