Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions default_dial_option_server_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (s) TestJoinDialOption(t *testing.T) {
if cc.dopts.copts.InitialWindowSize != initialWindowSize {
t.Fatalf("Unexpected cc.dopts.copts.InitialWindowSize: %d != %d", cc.dopts.copts.InitialWindowSize, initialWindowSize)
}
// Make sure static window size is not enabled when using WithInitialWindowSize.
if cc.dopts.copts.StaticWindowSize {
t.Fatalf("Unexpected cc.dopts.copts.StaticWindowSize: %t", cc.dopts.copts.StaticWindowSize)
}
}

// TestJoinServerOption tests the join server option. It configures a joined
Expand All @@ -162,6 +166,10 @@ func (s) TestJoinServerOption(t *testing.T) {
if s.opts.initialWindowSize != initialWindowSize {
t.Fatalf("Unexpected s.opts.initialWindowSize: %d != %d", s.opts.initialWindowSize, initialWindowSize)
}
// Make sure static window size is not enabled when using InitialWindowSize.
if s.opts.staticWindowSize {
t.Fatalf("Unexpected s.opts.staticWindowSize: %t", s.opts.staticWindowSize)
}
}

// funcTestHeaderListSizeDialOptionServerOption tests
Expand Down
14 changes: 12 additions & 2 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,13 @@ func WithReadBufferSize(s int) DialOption {
// WithInitialWindowSize returns a DialOption which sets the value for initial
// window size on a stream. The lower bound for window size is 64K and any value
// smaller than that will be ignored.
//
// Deprecated: use WithInitialStreamWindowSize to set a stream window size without disabling
// dynamic flow control.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please make sure the comments are restricted to 80 cols?

// Will be supported throughout 1.x.
func WithInitialWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialWindowSize = s
o.copts.StaticWindowSize = true
})
}

Expand All @@ -223,10 +226,17 @@ func WithInitialWindowSize(s int32) DialOption {
func WithInitialConnWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
o.copts.StaticWindowSize = true
})
}

// WithInitialStreamWindowSize returns a DialOption which sets the value for
// a initial window size on a stream. The lower bound for window size is 64K
// and any value smaller than that will be ignored. Importantly, this does
// not disable dynamic flow control.
Comment on lines +232 to +235
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// WithInitialStreamWindowSize returns a DialOption which sets the value for
// a initial window size on a stream. The lower bound for window size is 64K
// and any value smaller than that will be ignored. Importantly, this does
// not disable dynamic flow control.
// WithInitialStreamWindowSize returns a DialOption which sets the value for
// a initial window size on a stream without disabling dynamic flow control.
// The lower bound for window size is 64K and any value smaller than that
// will be ignored.

func WithInitialStreamWindowSize(s int32) DialOption {
return WithInitialWindowSize(s)
}

// WithStaticStreamWindowSize returns a DialOption which sets the initial
// stream window size to the value provided and disables dynamic flow control.
func WithStaticStreamWindowSize(s int32) DialOption {
Expand Down
13 changes: 11 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,13 @@ func ReadBufferSize(s int) ServerOption {

// InitialWindowSize returns a ServerOption that sets window size for stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
//
// Deprecated: use InitialStreamWindowSize to set a stream window size without disabling
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ensure comments are restricted to 80 cols.

// dynamic flow control.
// Will be supported throughout 1.x.
func InitialWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialWindowSize = s
o.staticWindowSize = true
})
}

Expand All @@ -290,10 +293,16 @@ func InitialWindowSize(s int32) ServerOption {
func InitialConnWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialConnWindowSize = s
o.staticWindowSize = true
})
}

// InitialStreamWindowSize returns a ServerOption that sets the window size for a stream.
// THe lower bound for a window size is 64K, and any value smaller than that will be ignored.
// Importantly, this does not disable dynamic flow control.
Comment on lines +299 to +301
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// InitialStreamWindowSize returns a ServerOption that sets the window size for a stream.
// THe lower bound for a window size is 64K, and any value smaller than that will be ignored.
// Importantly, this does not disable dynamic flow control.
// InitialStreamWindowSize returns a ServerOption that sets the window size for
// a stream without disabling dynamic flow control. The lower bound for a window
// size is 64K, and any value smaller than that will be ignored.

func InitialStreamWindowSize(s int32) ServerOption {
return InitialWindowSize(s)
}

// StaticStreamWindowSize returns a ServerOption to set the initial stream
// window size to the value provided and disables dynamic flow control.
// The lower bound for window size is 64K and any value smaller than that
Expand Down
8 changes: 7 additions & 1 deletion test/channelz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,10 @@ func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) {
e := tcpClearRREnv
te := newTest(t, e)
// Before behavior change in PR #8665, large window sizes set
// using InitialWindowSize disabled BDP by default. Post the
// behavior change, we have to explicitly disable BDP.
te.serverStaticWindowSize = true
te.serverInitialWindowSize = 65536
// Avoid overflowing connection level flow control window, which will lead to
// transport being closed.
Expand Down Expand Up @@ -1145,7 +1149,9 @@ func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *t
func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
e := tcpClearRREnv
te := newTest(t, e)
// disable BDP
// disable BDP explicitly.
te.serverStaticWindowSize = true
te.clientStaticWindowSize = true
te.serverInitialWindowSize = 65536
te.serverInitialConnWindowSize = 65536
te.clientInitialWindowSize = 65536
Expand Down
48 changes: 34 additions & 14 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ type test struct {
unaryServerInt grpc.UnaryServerInterceptor
streamServerInt grpc.StreamServerInterceptor
serverInitialWindowSize int32
serverStaticWindowSize bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable name seems misleading. serverStaticWindowSize indicates that it should take size for static window as input whereas it takes a boolean. We could either change the variable name to indicate it is boolean, maybe something like isServerWindowStatic or staticServerWindow or something better. OR we could keep the same name and change the logic to actually take the window size and make sure if it is set we use the grpc.StaticStreamWindowSize option. Personally , the second option seems better to me.

serverInitialConnWindowSize int32
customServerOptions []grpc.ServerOption

Expand All @@ -509,6 +510,7 @@ type test struct {
unaryClientInt grpc.UnaryClientInterceptor
streamClientInt grpc.StreamClientInterceptor
clientInitialWindowSize int32
clientStaticWindowSize bool
clientInitialConnWindowSize int32
perRPCCreds credentials.PerRPCCredentials
customDialOptions []grpc.DialOption
Expand Down Expand Up @@ -605,10 +607,15 @@ func (te *test) listenAndServe(ts testgrpc.TestServiceServer, listen func(networ
if te.unknownHandler != nil {
sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
}
if te.serverInitialWindowSize > 0 {
sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
if te.serverStaticWindowSize && te.serverInitialWindowSize > 0 {
sopts = append(sopts, grpc.StaticStreamWindowSize(te.serverInitialWindowSize))
} else if te.serverInitialWindowSize > 0 {
sopts = append(sopts, grpc.InitialStreamWindowSize(te.serverInitialWindowSize))
}
if te.serverInitialConnWindowSize > 0 {

if te.serverStaticWindowSize && te.serverInitialConnWindowSize > 0 {
sopts = append(sopts, grpc.StaticConnWindowSize(te.serverInitialConnWindowSize))
} else if te.serverInitialConnWindowSize > 0 {
sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
}
la := ":0"
Expand Down Expand Up @@ -816,10 +823,14 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string)
if te.e.balancer != "" {
opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, te.e.balancer)))
}
if te.clientInitialWindowSize > 0 {
opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
if te.clientStaticWindowSize && te.clientInitialWindowSize > 0 {
opts = append(opts, grpc.WithStaticStreamWindowSize(te.clientInitialWindowSize))
} else if te.clientInitialWindowSize > 0 {
opts = append(opts, grpc.WithInitialStreamWindowSize(te.clientInitialWindowSize))
}
if te.clientInitialConnWindowSize > 0 {
if te.clientStaticWindowSize && te.clientInitialConnWindowSize > 0 {
opts = append(opts, grpc.WithStaticConnWindowSize(te.clientInitialConnWindowSize))
} else if te.clientInitialConnWindowSize > 0 {
opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
}
if te.perRPCCreds != nil {
Expand Down Expand Up @@ -5374,18 +5385,25 @@ func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) {
}

type windowSizeConfig struct {
serverStream int32
serverConn int32
clientStream int32
clientConn int32
serverStaticWindowSize bool
clientStaticWindowSize bool
Comment on lines +5388 to +5389
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar concern with the variable names here.

serverStream int32
serverConn int32
clientStream int32
clientConn int32
}

func (s) TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could also add tests to verify the new behavior i.e. not disable BDP estimation?

// Before behavior change in PR #8665, large window sizes set
// using WithInitialWindowSize disabled BDP by default. Post the
// behavior change, we have to explicitly disable BDP.
wc := windowSizeConfig{
serverStream: 8 * 1024 * 1024,
serverConn: 12 * 1024 * 1024,
clientStream: 6 * 1024 * 1024,
clientConn: 8 * 1024 * 1024,
serverStaticWindowSize: true,
clientStaticWindowSize: true,
serverStream: 8 * 1024 * 1024,
serverConn: 12 * 1024 * 1024,
clientStream: 6 * 1024 * 1024,
clientConn: 8 * 1024 * 1024,
}
for _, e := range listTestEnv() {
testConfigurableWindowSize(t, e, wc)
Expand All @@ -5406,6 +5424,8 @@ func (s) TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {

func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
te := newTest(t, e)
te.serverStaticWindowSize = wc.serverStaticWindowSize
te.clientStaticWindowSize = wc.clientStaticWindowSize
te.serverInitialWindowSize = wc.serverStream
te.serverInitialConnWindowSize = wc.serverConn
te.clientInitialWindowSize = wc.clientStream
Expand Down
7 changes: 5 additions & 2 deletions test/stream_cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func (s) TestStreamCleanup(t *testing.T) {
return &testpb.Empty{}, nil
},
}
if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {

// Use a static flow control window.
if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithStaticStreamWindowSize(int32(initialWindowSize))); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
Expand Down Expand Up @@ -81,7 +83,8 @@ func (s) TestStreamCleanupAfterSendStatus(t *testing.T) {
})
},
}
if err := ss.Start(nil, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
// Use a static flow control window.
if err := ss.Start(nil, grpc.WithStaticStreamWindowSize(int32(initialWindowSize))); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
Expand Down