Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
const (
defaultQueueSize = 1000
defaultMaxPacketSize = 65000
defaultBufferSize = 4 * 1024
defaultServerWorkers = 10

jaegerModel Model = "jaeger"
Expand Down Expand Up @@ -90,6 +91,7 @@ type ProcessorConfiguration struct {
type ServerConfiguration struct {
QueueSize int `yaml:"queueSize"`
MaxPacketSize int `yaml:"maxPacketSize"`
BufferSize int `yaml:"bufferSize"`
HostPort string `yaml:"hostPort" validate:"nonzero"`
}

Expand Down Expand Up @@ -188,6 +190,7 @@ func (c *ProcessorConfiguration) applyDefaults() {
func (c *ServerConfiguration) applyDefaults() {
c.QueueSize = defaultInt(c.QueueSize, defaultQueueSize)
c.MaxPacketSize = defaultInt(c.MaxPacketSize, defaultMaxPacketSize)
c.BufferSize = defaultInt(c.BufferSize, defaultBufferSize)
}

// getUDPServer gets a TBufferedServer backed server using the server configuration
Expand All @@ -197,7 +200,7 @@ func (c *ServerConfiguration) getUDPServer(mFactory metrics.Factory) (servers.Se
if c.HostPort == "" {
return nil, fmt.Errorf("no host:port provided for udp server: %+v", *c)
}
transport, err := thriftudp.NewTUDPServerTransport(c.HostPort)
transport, err := thriftudp.NewTUDPServerTransport(c.HostPort, c.BufferSize)
if err != nil {
return nil, fmt.Errorf("cannot create UDPServerTransport: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/agent/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
suffixWorkers = "workers"
suffixServerQueueSize = "server-queue-size"
suffixServerMaxPacketSize = "server-max-packet-size"
suffixServerBufferSize = "server-buffer-size"
suffixServerHostPort = "server-host-port"
httpServerHostPort = "http-server.host-port"
)
Expand All @@ -50,6 +51,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.Int(prefix+suffixWorkers, defaultServerWorkers, "how many workers the processor should run")
flags.Int(prefix+suffixServerQueueSize, defaultQueueSize, "length of the queue for the UDP server")
flags.Int(prefix+suffixServerMaxPacketSize, defaultMaxPacketSize, "max packet size for the UDP server")
flags.Int(prefix+suffixServerBufferSize, defaultBufferSize, "buffer size for UDP packets")
flags.String(prefix+suffixServerHostPort, ":"+strconv.Itoa(p.port), "host:port for the UDP server")
}
flags.String(
Expand All @@ -66,6 +68,7 @@ func (b *Builder) InitFromViper(v *viper.Viper) *Builder {
p.Workers = v.GetInt(prefix + suffixWorkers)
p.Server.QueueSize = v.GetInt(prefix + suffixServerQueueSize)
p.Server.MaxPacketSize = v.GetInt(prefix + suffixServerMaxPacketSize)
p.Server.BufferSize = v.GetInt(prefix + suffixServerBufferSize)
p.Server.HostPort = portNumToHostPort(v.GetString(prefix + suffixServerHostPort))
b.Processors = append(b.Processors, *p)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/processors/thrift_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var (
)

func createProcessor(t *testing.T, mFactory metrics.Factory, tFactory thrift.TProtocolFactory, handler AgentProcessor) (string, Processor) {
transport, err := thriftudp.NewTUDPServerTransport("127.0.0.1:0")
transport, err := thriftudp.NewTUDPServerTransport("127.0.0.1:0", thriftudp.BufferSize)
require.NoError(t, err)

queueSize := 10
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/servers/tbuffered_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestTBufferedServer(t *testing.T) {
func testTBufferedServer(t *testing.T, queueSize int, testDroppedPackets bool) {
metricsFactory := metricstest.NewFactory(0)

transport, err := thriftudp.NewTUDPServerTransport("127.0.0.1:0")
transport, err := thriftudp.NewTUDPServerTransport("127.0.0.1:0", thriftudp.BufferSize)
require.NoError(t, err)

maxPacketSize := 65000
Expand Down
17 changes: 17 additions & 0 deletions cmd/agent/app/servers/thriftudp/socket_buffer_linux_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// +build linux darwin

package thriftudp

import (
"net"
"syscall"
)

func setSocketBuffer(conn *net.UDPConn, bufferSize int) error {
file, err := conn.File()
if err != nil {
return err
}

return syscall.SetsockoptInt(int(file.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, bufferSize)
}
12 changes: 12 additions & 0 deletions cmd/agent/app/servers/thriftudp/socket_buffer_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// +build windows

package thriftudp

import (
"net"
)

// Not supported on windows, so windows version just returns nil
func setSocketBuffer(_ *net.UDPConn, _ int) error {
return nil
}
12 changes: 10 additions & 2 deletions cmd/agent/app/servers/thriftudp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
)

//MaxLength of UDP packet
const MaxLength = 65000
const (
MaxLength = 65000
BufferSize = 4 * 1024
)

var errConnAlreadyClosed = errors.New("connection already closed")

Expand Down Expand Up @@ -71,7 +74,7 @@ func createClient(destAddr, locAddr *net.UDPAddr) (*TUDPTransport, error) {
// It will listen for incoming udp packets on the specified host/port
// Example:
// trans, err := thriftudp.NewTUDPClientTransport("localhost:9001")
func NewTUDPServerTransport(hostPort string) (*TUDPTransport, error) {
func NewTUDPServerTransport(hostPort string, bufferSize int) (*TUDPTransport, error) {
addr, err := net.ResolveUDPAddr("udp", hostPort)
if err != nil {
return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error())
Expand All @@ -80,6 +83,11 @@ func NewTUDPServerTransport(hostPort string) (*TUDPTransport, error) {
if err != nil {
return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error())
}

if err = setSocketBuffer(conn, bufferSize); err != nil {
return nil, err
}

return &TUDPTransport{addr: conn.LocalAddr(), conn: conn}, nil
}

Expand Down
16 changes: 8 additions & 8 deletions cmd/agent/app/servers/thriftudp/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ func TestNewTUDPClientTransport(t *testing.T) {
}

func TestNewTUDPServerTransport(t *testing.T) {
_, err := NewTUDPServerTransport("fakeAddressAndPort")
_, err := NewTUDPServerTransport("fakeAddressAndPort", BufferSize)
require.NotNil(t, err)

trans, err := NewTUDPServerTransport(localListenAddr.String())
trans, err := NewTUDPServerTransport(localListenAddr.String(), BufferSize)
require.Nil(t, err)
require.True(t, trans.IsOpen())
require.Equal(t, ^uint64(0), trans.RemainingBytes())

//Ensure a second server can't be created on the same address
trans2, err := NewTUDPServerTransport(trans.Addr().String())
trans2, err := NewTUDPServerTransport(trans.Addr().String(), BufferSize)
if trans2 != nil {
//close the second server if one got created
trans2.Close()
Expand All @@ -77,10 +77,10 @@ func TestNewTUDPServerTransport(t *testing.T) {
}

func TestTUDPServerTransportIsOpen(t *testing.T) {
_, err := NewTUDPServerTransport("fakeAddressAndPort")
_, err := NewTUDPServerTransport("fakeAddressAndPort", BufferSize)
require.NotNil(t, err)

trans, err := NewTUDPServerTransport(localListenAddr.String())
trans, err := NewTUDPServerTransport(localListenAddr.String(), BufferSize)
require.Nil(t, err)
require.True(t, trans.IsOpen())
require.Equal(t, ^uint64(0), trans.RemainingBytes())
Expand All @@ -107,7 +107,7 @@ func TestTUDPServerTransportIsOpen(t *testing.T) {
}

func TestWriteRead(t *testing.T) {
server, err := NewTUDPServerTransport(localListenAddr.String())
server, err := NewTUDPServerTransport(localListenAddr.String(), BufferSize)
require.Nil(t, err)
defer server.Close()

Expand All @@ -133,7 +133,7 @@ func TestWriteRead(t *testing.T) {
}

func TestDoubleCloseError(t *testing.T) {
trans, err := NewTUDPServerTransport(localListenAddr.String())
trans, err := NewTUDPServerTransport(localListenAddr.String(), BufferSize)
require.Nil(t, err)
require.True(t, trans.IsOpen())

Expand All @@ -149,7 +149,7 @@ func TestDoubleCloseError(t *testing.T) {
}

func TestConnClosedReadWrite(t *testing.T) {
trans, err := NewTUDPServerTransport(localListenAddr.String())
trans, err := NewTUDPServerTransport(localListenAddr.String(), BufferSize)
require.Nil(t, err)
require.True(t, trans.IsOpen())
require.NoError(t, trans.Close())
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWe
github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ=
github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0=
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
github.com/gocql/gocql v0.0.0-20200226121155-e5c8c1f505c5 h1:FDQYpzoJWwYzJ0pOMU+RqUFqT3N4BfCBGey9rP5708c=
github.com/gocql/gocql v0.0.0-20200226121155-e5c8c1f505c5/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gocql/gocql v0.0.0-20200228163523-cd4b606dd2fb h1:H3tisfjQwq9FTyWqlKsZpgoYrsvn2pmTWvAiDHa5pho=
github.com/gocql/gocql v0.0.0-20200228163523-cd4b606dd2fb/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gogo/googleapis v1.0.1-0.20180501115203-b23578765ee5 h1:l3BMcdrtdBYa5PH99FBrPEWJGRODZFOjxHPnb2I7/98=
Expand Down