Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XHTTP XMUX: Add hMaxRequestTimes and hKeepAlivePeriod #4163

Merged
merged 2 commits into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions infra/conf/transport_internet.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,22 +228,23 @@ type SplitHTTPConfig struct {
NoSSEHeader bool `json:"noSSEHeader"`
ScMaxEachPostBytes Int32Range `json:"scMaxEachPostBytes"`
ScMinPostsIntervalMs Int32Range `json:"scMinPostsIntervalMs"`
ScMaxBufferedPosts int64 `json:"scMaxConcurrentPosts"`
KeepAlivePeriod int64 `json:"keepAlivePeriod"`
Xmux Xmux `json:"xmux"`
ScMaxBufferedPosts int64 `json:"scMaxBufferedPosts"`
Xmux XmuxConfig `json:"xmux"`
DownloadSettings *StreamConfig `json:"downloadSettings"`
Extra json.RawMessage `json:"extra"`
}

type Xmux struct {
MaxConcurrency Int32Range `json:"maxConcurrency"`
MaxConnections Int32Range `json:"maxConnections"`
CMaxReuseTimes Int32Range `json:"cMaxReuseTimes"`
CMaxLifetimeMs Int32Range `json:"cMaxLifetimeMs"`
type XmuxConfig struct {
MaxConcurrency Int32Range `json:"maxConcurrency"`
MaxConnections Int32Range `json:"maxConnections"`
CMaxReuseTimes Int32Range `json:"cMaxReuseTimes"`
CMaxLifetimeMs Int32Range `json:"cMaxLifetimeMs"`
HMaxRequestTimes Int32Range `json:"hMaxRequestTimes"`
HKeepAlivePeriod int64 `json:"hKeepAlivePeriod"`
}

func splithttpNewRandRangeConfig(input Int32Range) *splithttp.RandRangeConfig {
return &splithttp.RandRangeConfig{
func newRangeConfig(input Int32Range) *splithttp.RangeConfig {
return &splithttp.RangeConfig{
From: input.From,
To: input.To,
}
Expand Down Expand Up @@ -281,33 +282,33 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
if c.Xmux.MaxConnections.To > 0 && c.Xmux.MaxConcurrency.To > 0 {
return nil, errors.New("maxConnections cannot be specified together with maxConcurrency")
}
if c.Xmux.MaxConcurrency.To == 0 &&
c.Xmux.MaxConnections.To == 0 &&
c.Xmux.CMaxReuseTimes.To == 0 &&
c.Xmux.CMaxLifetimeMs.To == 0 {
if c.Xmux == (XmuxConfig{}) {
c.Xmux.MaxConcurrency.From = 16
c.Xmux.MaxConcurrency.To = 32
c.Xmux.CMaxReuseTimes.From = 64
c.Xmux.CMaxReuseTimes.To = 128
c.Xmux.HMaxRequestTimes.From = 800
c.Xmux.HMaxRequestTimes.To = 900
}

config := &splithttp.Config{
Host: c.Host,
Path: c.Path,
Mode: c.Mode,
Headers: c.Headers,
XPaddingBytes: splithttpNewRandRangeConfig(c.XPaddingBytes),
XPaddingBytes: newRangeConfig(c.XPaddingBytes),
NoGRPCHeader: c.NoGRPCHeader,
NoSSEHeader: c.NoSSEHeader,
ScMaxEachPostBytes: splithttpNewRandRangeConfig(c.ScMaxEachPostBytes),
ScMinPostsIntervalMs: splithttpNewRandRangeConfig(c.ScMinPostsIntervalMs),
ScMaxEachPostBytes: newRangeConfig(c.ScMaxEachPostBytes),
ScMinPostsIntervalMs: newRangeConfig(c.ScMinPostsIntervalMs),
ScMaxBufferedPosts: c.ScMaxBufferedPosts,
KeepAlivePeriod: c.KeepAlivePeriod,
Xmux: &splithttp.Multiplexing{
MaxConcurrency: splithttpNewRandRangeConfig(c.Xmux.MaxConcurrency),
MaxConnections: splithttpNewRandRangeConfig(c.Xmux.MaxConnections),
CMaxReuseTimes: splithttpNewRandRangeConfig(c.Xmux.CMaxReuseTimes),
CMaxLifetimeMs: splithttpNewRandRangeConfig(c.Xmux.CMaxLifetimeMs),
Xmux: &splithttp.XmuxConfig{
MaxConcurrency: newRangeConfig(c.Xmux.MaxConcurrency),
MaxConnections: newRangeConfig(c.Xmux.MaxConnections),
CMaxReuseTimes: newRangeConfig(c.Xmux.CMaxReuseTimes),
CMaxLifetimeMs: newRangeConfig(c.Xmux.CMaxLifetimeMs),
HMaxRequestTimes: newRangeConfig(c.Xmux.HMaxRequestTimes),
HKeepAlivePeriod: c.Xmux.HKeepAlivePeriod,
},
}

Expand Down
4 changes: 4 additions & 0 deletions transport/internet/splithttp/browser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
// has no fields because everything is global state :O)
type BrowserDialerClient struct{}

func (c *BrowserDialerClient) IsClosed() bool {
panic("not implemented yet")
}

func (c *BrowserDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
panic("not implemented yet")
}
Expand Down
22 changes: 21 additions & 1 deletion transport/internet/splithttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

// interface to abstract between use of browser dialer, vs net/http
type DialerClient interface {
IsClosed() bool

// (ctx, baseURL, payload) -> err
// baseURL already contains sessionId and seq
SendUploadRequest(context.Context, string, io.ReadWriteCloser, int64) error
Expand All @@ -39,12 +41,17 @@ type DialerClient interface {
type DefaultDialerClient struct {
transportConfig *Config
client *http.Client
closed bool
httpVersion string
// pool of net.Conn, created using dialUploadConn
uploadRawPool *sync.Pool
dialUploadConn func(ctxInner context.Context) (net.Conn, error)
}

func (c *DefaultDialerClient) IsClosed() bool {
return c.closed
}

func (c *DefaultDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
reader, writer := io.Pipe()
req, _ := http.NewRequestWithContext(ctx, "POST", pureURL, reader)
Expand All @@ -59,6 +66,8 @@ func (c *DefaultDialerClient) Open(ctx context.Context, pureURL string) (io.Writ
if err != nil {
errors.LogInfoInner(ctx, err, "failed to open ", pureURL)
} else {
// c.closed = true
response.Body.Close()
errors.LogInfo(ctx, "unexpected status ", response.StatusCode)
}
wrc.Close()
Expand All @@ -76,7 +85,14 @@ func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io
if !c.transportConfig.NoGRPCHeader {
req.Header.Set("Content-Type", "application/grpc")
}
go c.client.Do(req)
go func() {
if resp, err := c.client.Do(req); err == nil {
if resp.StatusCode != 200 {
// c.closed = true
}
resp.Body.Close()
}
}()
return writer
}

Expand Down Expand Up @@ -130,6 +146,7 @@ func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string)
}

if response.StatusCode != 200 {
// c.closed = true
response.Body.Close()
errors.LogInfo(ctx, "invalid status code on download:", response.Status)
gotDownResponse.Close()
Expand Down Expand Up @@ -180,6 +197,7 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string,
defer resp.Body.Close()

if resp.StatusCode != 200 {
// c.closed = true
return errors.New("bad status code:", resp.Status)
}
} else {
Expand Down Expand Up @@ -214,6 +232,8 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string,
return fmt.Errorf("error while reading response: %s", err.Error())
}
if resp.StatusCode != 200 {
// c.closed = true
// resp.Body.Close() // I'm not sure
return fmt.Errorf("got non-200 error response code: %d", resp.StatusCode)
}
}
Expand Down
47 changes: 29 additions & 18 deletions transport/internet/splithttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *Config) GetNormalizedQuery() string {
query += "&"
}

paddingLen := c.GetNormalizedXPaddingBytes().roll()
paddingLen := c.GetNormalizedXPaddingBytes().rand()
if paddingLen > 0 {
query += "x_padding=" + strings.Repeat("0", int(paddingLen))
}
Expand All @@ -58,7 +58,7 @@ func (c *Config) WriteResponseHeader(writer http.ResponseWriter) {
// CORS headers for the browser dialer
writer.Header().Set("Access-Control-Allow-Origin", "*")
writer.Header().Set("Access-Control-Allow-Methods", "GET, POST")
paddingLen := c.GetNormalizedXPaddingBytes().roll()
paddingLen := c.GetNormalizedXPaddingBytes().rand()
if paddingLen > 0 {
writer.Header().Set("X-Padding", strings.Repeat("0", int(paddingLen)))
}
Expand All @@ -72,9 +72,9 @@ func (c *Config) GetNormalizedScMaxBufferedPosts() int {
return int(c.ScMaxBufferedPosts)
}

func (c *Config) GetNormalizedScMaxEachPostBytes() RandRangeConfig {
func (c *Config) GetNormalizedScMaxEachPostBytes() RangeConfig {
if c.ScMaxEachPostBytes == nil || c.ScMaxEachPostBytes.To == 0 {
return RandRangeConfig{
return RangeConfig{
From: 1000000,
To: 1000000,
}
Expand All @@ -83,9 +83,9 @@ func (c *Config) GetNormalizedScMaxEachPostBytes() RandRangeConfig {
return *c.ScMaxEachPostBytes
}

func (c *Config) GetNormalizedScMinPostsIntervalMs() RandRangeConfig {
func (c *Config) GetNormalizedScMinPostsIntervalMs() RangeConfig {
if c.ScMinPostsIntervalMs == nil || c.ScMinPostsIntervalMs.To == 0 {
return RandRangeConfig{
return RangeConfig{
From: 30,
To: 30,
}
Expand All @@ -94,9 +94,9 @@ func (c *Config) GetNormalizedScMinPostsIntervalMs() RandRangeConfig {
return *c.ScMinPostsIntervalMs
}

func (c *Config) GetNormalizedXPaddingBytes() RandRangeConfig {
func (c *Config) GetNormalizedXPaddingBytes() RangeConfig {
if c.XPaddingBytes == nil || c.XPaddingBytes.To == 0 {
return RandRangeConfig{
return RangeConfig{
From: 100,
To: 1000,
}
Expand All @@ -105,9 +105,20 @@ func (c *Config) GetNormalizedXPaddingBytes() RandRangeConfig {
return *c.XPaddingBytes
}

func (m *Multiplexing) GetNormalizedCMaxReuseTimes() RandRangeConfig {
func (m *XmuxConfig) GetNormalizedCMaxRequestTimes() RangeConfig {
if m.HMaxRequestTimes == nil {
return RangeConfig{
From: 0,
To: 0,
}
}

return *m.HMaxRequestTimes
}

func (m *XmuxConfig) GetNormalizedCMaxReuseTimes() RangeConfig {
if m.CMaxReuseTimes == nil {
return RandRangeConfig{
return RangeConfig{
From: 0,
To: 0,
}
Expand All @@ -116,19 +127,19 @@ func (m *Multiplexing) GetNormalizedCMaxReuseTimes() RandRangeConfig {
return *m.CMaxReuseTimes
}

func (m *Multiplexing) GetNormalizedCMaxLifetimeMs() RandRangeConfig {
if m.CMaxLifetimeMs == nil || m.CMaxLifetimeMs.To == 0 {
return RandRangeConfig{
func (m *XmuxConfig) GetNormalizedCMaxLifetimeMs() RangeConfig {
if m.CMaxLifetimeMs == nil {
return RangeConfig{
From: 0,
To: 0,
}
}
return *m.CMaxLifetimeMs
}

func (m *Multiplexing) GetNormalizedMaxConnections() RandRangeConfig {
func (m *XmuxConfig) GetNormalizedMaxConnections() RangeConfig {
if m.MaxConnections == nil {
return RandRangeConfig{
return RangeConfig{
From: 0,
To: 0,
}
Expand All @@ -137,9 +148,9 @@ func (m *Multiplexing) GetNormalizedMaxConnections() RandRangeConfig {
return *m.MaxConnections
}

func (m *Multiplexing) GetNormalizedMaxConcurrency() RandRangeConfig {
func (m *XmuxConfig) GetNormalizedMaxConcurrency() RangeConfig {
if m.MaxConcurrency == nil {
return RandRangeConfig{
return RangeConfig{
From: 0,
To: 0,
}
Expand All @@ -154,7 +165,7 @@ func init() {
}))
}

func (c RandRangeConfig) roll() int32 {
func (c RangeConfig) rand() int32 {
if c.From == c.To {
return c.From
}
Expand Down
Loading
Loading