diff --git a/default_dial_option_server_option_test.go b/default_dial_option_server_option_test.go index 67ea9e80da8c..444a8ddeed13 100644 --- a/default_dial_option_server_option_test.go +++ b/default_dial_option_server_option_test.go @@ -146,6 +146,10 @@ func (s) TestJoinDialOption(t *testing.T) { if cc.dopts.copts.InitialWindowSize != initialWindowSize { t.Fatalf("Unexpected cc.dopts.copts.InitialWindowSize: %d != %d", cc.dopts.copts.InitialWindowSize, initialWindowSize) } + // Make sure static window size is not enabled when using WithInitialWindowSize. + if cc.dopts.copts.StaticWindowSize { + t.Fatalf("Unexpected cc.dopts.copts.StaticWindowSize: %t", cc.dopts.copts.StaticWindowSize) + } } // TestJoinServerOption tests the join server option. It configures a joined @@ -162,6 +166,10 @@ func (s) TestJoinServerOption(t *testing.T) { if s.opts.initialWindowSize != initialWindowSize { t.Fatalf("Unexpected s.opts.initialWindowSize: %d != %d", s.opts.initialWindowSize, initialWindowSize) } + // Make sure static window size is not enabled when using InitialWindowSize. + if s.opts.staticWindowSize { + t.Fatalf("Unexpected s.opts.staticWindowSize: %t", s.opts.staticWindowSize) + } } // funcTestHeaderListSizeDialOptionServerOption tests diff --git a/dialoptions.go b/dialoptions.go index 7a5ac2e7c494..8ac56ee7f1ab 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -210,10 +210,13 @@ func WithReadBufferSize(s int) DialOption { // WithInitialWindowSize returns a DialOption which sets the value for initial // window size on a stream. The lower bound for window size is 64K and any value // smaller than that will be ignored. +// +// Deprecated: use WithInitialStreamWindowSize to set a stream window size without disabling +// dynamic flow control. +// Will be supported throughout 1.x. func WithInitialWindowSize(s int32) DialOption { return newFuncDialOption(func(o *dialOptions) { o.copts.InitialWindowSize = s - o.copts.StaticWindowSize = true }) } @@ -223,10 +226,17 @@ func WithInitialWindowSize(s int32) DialOption { func WithInitialConnWindowSize(s int32) DialOption { return newFuncDialOption(func(o *dialOptions) { o.copts.InitialConnWindowSize = s - o.copts.StaticWindowSize = true }) } +// WithInitialStreamWindowSize returns a DialOption which sets the value for +// a initial window size on a stream. The lower bound for window size is 64K +// and any value smaller than that will be ignored. Importantly, this does +// not disable dynamic flow control. +func WithInitialStreamWindowSize(s int32) DialOption { + return WithInitialWindowSize(s) +} + // WithStaticStreamWindowSize returns a DialOption which sets the initial // stream window size to the value provided and disables dynamic flow control. func WithStaticStreamWindowSize(s int32) DialOption { diff --git a/server.go b/server.go index ddd377341191..8411ffc58127 100644 --- a/server.go +++ b/server.go @@ -278,10 +278,13 @@ func ReadBufferSize(s int) ServerOption { // InitialWindowSize returns a ServerOption that sets window size for stream. // The lower bound for window size is 64K and any value smaller than that will be ignored. +// +// Deprecated: use InitialStreamWindowSize to set a stream window size without disabling +// dynamic flow control. +// Will be supported throughout 1.x. func InitialWindowSize(s int32) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.initialWindowSize = s - o.staticWindowSize = true }) } @@ -290,10 +293,16 @@ func InitialWindowSize(s int32) ServerOption { func InitialConnWindowSize(s int32) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.initialConnWindowSize = s - o.staticWindowSize = true }) } +// InitialStreamWindowSize returns a ServerOption that sets the window size for a stream. +// THe lower bound for a window size is 64K, and any value smaller than that will be ignored. +// Importantly, this does not disable dynamic flow control. +func InitialStreamWindowSize(s int32) ServerOption { + return InitialWindowSize(s) +} + // StaticStreamWindowSize returns a ServerOption to set the initial stream // window size to the value provided and disables dynamic flow control. // The lower bound for window size is 64K and any value smaller than that diff --git a/test/channelz_test.go b/test/channelz_test.go index 93847126dbe4..8de0be6e69f1 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -1061,6 +1061,10 @@ func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) { func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) { e := tcpClearRREnv te := newTest(t, e) + // Before behavior change in PR #8665, large window sizes set + // using InitialWindowSize disabled BDP by default. Post the + // behavior change, we have to explicitly disable BDP. + te.serverStaticWindowSize = true te.serverInitialWindowSize = 65536 // Avoid overflowing connection level flow control window, which will lead to // transport being closed. @@ -1145,7 +1149,9 @@ func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *t func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { e := tcpClearRREnv te := newTest(t, e) - // disable BDP + // disable BDP explicitly. + te.serverStaticWindowSize = true + te.clientStaticWindowSize = true te.serverInitialWindowSize = 65536 te.serverInitialConnWindowSize = 65536 te.clientInitialWindowSize = 65536 diff --git a/test/end2end_test.go b/test/end2end_test.go index 9157c525c094..1cd68a1a463b 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -490,6 +490,7 @@ type test struct { unaryServerInt grpc.UnaryServerInterceptor streamServerInt grpc.StreamServerInterceptor serverInitialWindowSize int32 + serverStaticWindowSize bool serverInitialConnWindowSize int32 customServerOptions []grpc.ServerOption @@ -509,6 +510,7 @@ type test struct { unaryClientInt grpc.UnaryClientInterceptor streamClientInt grpc.StreamClientInterceptor clientInitialWindowSize int32 + clientStaticWindowSize bool clientInitialConnWindowSize int32 perRPCCreds credentials.PerRPCCredentials customDialOptions []grpc.DialOption @@ -605,10 +607,15 @@ func (te *test) listenAndServe(ts testgrpc.TestServiceServer, listen func(networ if te.unknownHandler != nil { sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler)) } - if te.serverInitialWindowSize > 0 { - sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize)) + if te.serverStaticWindowSize && te.serverInitialWindowSize > 0 { + sopts = append(sopts, grpc.StaticStreamWindowSize(te.serverInitialWindowSize)) + } else if te.serverInitialWindowSize > 0 { + sopts = append(sopts, grpc.InitialStreamWindowSize(te.serverInitialWindowSize)) } - if te.serverInitialConnWindowSize > 0 { + + if te.serverStaticWindowSize && te.serverInitialConnWindowSize > 0 { + sopts = append(sopts, grpc.StaticConnWindowSize(te.serverInitialConnWindowSize)) + } else if te.serverInitialConnWindowSize > 0 { sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize)) } la := ":0" @@ -816,10 +823,14 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) if te.e.balancer != "" { opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, te.e.balancer))) } - if te.clientInitialWindowSize > 0 { - opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize)) + if te.clientStaticWindowSize && te.clientInitialWindowSize > 0 { + opts = append(opts, grpc.WithStaticStreamWindowSize(te.clientInitialWindowSize)) + } else if te.clientInitialWindowSize > 0 { + opts = append(opts, grpc.WithInitialStreamWindowSize(te.clientInitialWindowSize)) } - if te.clientInitialConnWindowSize > 0 { + if te.clientStaticWindowSize && te.clientInitialConnWindowSize > 0 { + opts = append(opts, grpc.WithStaticConnWindowSize(te.clientInitialConnWindowSize)) + } else if te.clientInitialConnWindowSize > 0 { opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize)) } if te.perRPCCreds != nil { @@ -5374,18 +5385,25 @@ func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) { } type windowSizeConfig struct { - serverStream int32 - serverConn int32 - clientStream int32 - clientConn int32 + serverStaticWindowSize bool + clientStaticWindowSize bool + serverStream int32 + serverConn int32 + clientStream int32 + clientConn int32 } func (s) TestConfigurableWindowSizeWithLargeWindow(t *testing.T) { + // Before behavior change in PR #8665, large window sizes set + // using WithInitialWindowSize disabled BDP by default. Post the + // behavior change, we have to explicitly disable BDP. wc := windowSizeConfig{ - serverStream: 8 * 1024 * 1024, - serverConn: 12 * 1024 * 1024, - clientStream: 6 * 1024 * 1024, - clientConn: 8 * 1024 * 1024, + serverStaticWindowSize: true, + clientStaticWindowSize: true, + serverStream: 8 * 1024 * 1024, + serverConn: 12 * 1024 * 1024, + clientStream: 6 * 1024 * 1024, + clientConn: 8 * 1024 * 1024, } for _, e := range listTestEnv() { testConfigurableWindowSize(t, e, wc) @@ -5406,6 +5424,8 @@ func (s) TestConfigurableWindowSizeWithSmallWindow(t *testing.T) { func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) { te := newTest(t, e) + te.serverStaticWindowSize = wc.serverStaticWindowSize + te.clientStaticWindowSize = wc.clientStaticWindowSize te.serverInitialWindowSize = wc.serverStream te.serverInitialConnWindowSize = wc.serverConn te.clientInitialWindowSize = wc.clientStream diff --git a/test/stream_cleanup_test.go b/test/stream_cleanup_test.go index 600fb1fadc21..0facf149c6bf 100644 --- a/test/stream_cleanup_test.go +++ b/test/stream_cleanup_test.go @@ -48,7 +48,9 @@ func (s) TestStreamCleanup(t *testing.T) { return &testpb.Empty{}, nil }, } - if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil { + + // Use a static flow control window. + if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithStaticStreamWindowSize(int32(initialWindowSize))); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop() @@ -81,7 +83,8 @@ func (s) TestStreamCleanupAfterSendStatus(t *testing.T) { }) }, } - if err := ss.Start(nil, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil { + // Use a static flow control window. + if err := ss.Start(nil, grpc.WithStaticStreamWindowSize(int32(initialWindowSize))); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop()