Skip to content

SSE connection closed prematurely #14090

@xuchenCN

Description

@xuchenCN

Environment:

NGINX Ingress controller
  Release:       v1.13.3
  Build:         93851f05e61d99eea49140c9be73499a3cb92ccc
  Repository:    https://github.com/kubernetes/ingress-nginx
  nginx version: nginx/1.27.1

What happened:
The server emits the current time as an SSE event every second for 30 minutes.

The client is a Python script that issues a streaming POST request.

Test results:

Client -> SVC -> POD: works as expected.

Client -> Ingress -> SVC -> POD: the stream closes prematurely and the client logs ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))

The client ran for 813.37 s before the error (the duration differs between runs of the same code).

Multiple clients started at different times were all disconnected at the same moment.

Server code

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

// main registers the SSE test endpoint on the default mux and serves it on
// :8080 until ListenAndServe fails, at which point the process exits.
func main() {
	http.HandleFunc("/service-large-test-2/v1/chat/completions", completionsHandler)

	port := ":8080"
	// ReadHeaderTimeout bounds how long a client may take to send request
	// headers (slow-header / Slowloris mitigation). ReadTimeout and
	// WriteTimeout are deliberately left unset: a WriteTimeout would abort
	// the 30-minute event stream mid-response. A nil Handler means
	// http.DefaultServeMux, where HandleFunc registered the route above.
	srv := &http.Server{
		Addr:              port,
		ReadHeaderTimeout: 10 * time.Second,
	}
	log.Printf("Server starting on port %s", port)
	log.Fatal(srv.ListenAndServe())
}

// completionsHandler serves a Server-Sent Events stream used to reproduce
// long-lived stream disconnects behind a proxy.
//
// Every response carries permissive CORS headers. OPTIONS (preflight) gets an
// empty 200. Any method other than POST is rejected with 405. A POST receives
// a "text/event-stream" response with one frame per second of the form
// "data: #<n>, time: HH:MM:SS\n\n" for 30 minutes, terminated by
// "data: [DONE]\n\n". The stream stops early when the request context is
// cancelled (client went away) or a write to the connection fails.
func completionsHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS")
	w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")

	if r.Method == http.MethodOptions {
		w.WriteHeader(http.StatusOK)
		return
	}

	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Resolve the flusher before committing a 200. Without it the events
	// cannot be delivered incrementally, and at this point we can still
	// report a proper error status instead of a half-written 200.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "error to get flusher", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	// Asks nginx-based proxies (including ingress-nginx) not to buffer this
	// response.
	w.Header().Set("X-Accel-Buffering", "no")
	log.Print(w.Header())

	w.WriteHeader(http.StatusOK)
	// Push the status line and headers out immediately so the client sees
	// the stream open before the first one-second tick.
	flusher.Flush()

	const duration = 30 * time.Minute
	endTime := time.Now().Add(duration)

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	counter := 0
	for {
		select {
		case <-ticker.C:
			counter++
			// Constant format string: the event payload is data and must never
			// be interpreted as a format.
			if _, err := fmt.Fprintf(w, "data: #%d, time: %s\n\n",
				counter, time.Now().Format("15:04:05")); err != nil {
				log.Printf("write failed after %d events: %v", counter-1, err)
				return
			}
			flusher.Flush()

			if time.Now().After(endTime) {
				log.Printf("end stream: %d", counter)
				fmt.Fprint(w, "data: [DONE]\n\n")
				flusher.Flush()
				return
			}

		case <-r.Context().Done():
			log.Print("client interrupted the connection")
			return
		}
	}
}

Client code

import requests
import json
import sys
import time
from urllib3.exceptions import InvalidChunkLength
from requests.exceptions import ChunkedEncodingError, ConnectionError


# Reproduction client for the SSE test server: POSTs one streaming
# chat-completion request, prints every line received, and reports how long
# the stream stayed open.

# monotonic() is immune to wall-clock adjustments, so the reported duration is
# reliable over a long-running stream.
start_time = time.monotonic()


# verify=False below is used against the ingress certificate; silence the
# resulting insecure-request warnings.
requests.packages.urllib3.disable_warnings()

url = "https://10.110.158.100/service-large-test/v1/chat/completions"

headers = {
    "Content-Type": "application/json"
}

data = {
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Sixteen chairs are arranged in a row. Eight people each select a chair in which to sit so that no person sits next to two other people. Let $N$ be the number of subsets of the 16 chairs that could be selected. Find the remainder when $N$ is divided by $1000$."}
    ],
    "temperature": 0.2,
    "stream": True
}

try:
    # timeout=(connect, read): bound connection setup, but leave the read
    # timeout unset so the client itself never cuts off a slow-but-alive stream.
    with requests.post(
        url,
        headers=headers,
        data=json.dumps(data),
        verify=False,
        stream=True,
        timeout=(10, None),
    ) as response:
        # Surface 4xx/5xx (e.g. a routing mismatch at the ingress) instead of
        # silently printing an error page as if it were the event stream.
        response.raise_for_status()

        try:
            # The iter_lines() generator itself raises when the connection
            # breaks mid-stream (requests re-raises urllib3's
            # ProtocolError(InvalidChunkLength) as ChunkedEncodingError), so
            # the handler must wrap the loop, not the loop body.
            for line in response.iter_lines():
                if not line:
                    print("empty line")
                    continue

                raw_str = line.decode('utf-8', errors='ignore')
                print(raw_str)

        except (ChunkedEncodingError, ConnectionError, InvalidChunkLength) as e:
            print(f"connect error: {e}, stream ended early", file=sys.stderr)

        else:
            print("closed exit")

except Exception as e:
    print(f"init error: {e}", file=sys.stderr)

finally:

    duration = time.monotonic() - start_time
    print(f"\nduration: {duration:.2f} s")

Server ingress

# Ingress routing /service-large-test/ to the llm-test-app Service.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    # Disable nginx buffering of the upstream response so SSE frames reach
    # the client as they are produced.
    nginx.ingress.kubernetes.io/proxy-buffering: "off"
  # creationTimestamp and generation are server-populated fields copied from
  # `kubectl get -o yaml`; they are ignored on apply.
  creationTimestamp: "2025-10-29T02:42:44Z"
  generation: 1
  name: service-large-test-ingress
  namespace: xxxxxxx
spec:
  ingressClassName: nginx
  rules:
  - http:
      paths:
      - backend:
          service:
            name: llm-test-app
            port:
              name: http
        # NOTE(review): the Go test server registers
        # /service-large-test-2/v1/chat/completions, while this path (and the
        # client URL) use /service-large-test/ with no rewrite — confirm the
        # deployed server's route matches.
        path: /service-large-test/
        pathType: ImplementationSpecific

Ingress logs: please see the attachment.

ngx-31.log

Metadata

Metadata

Assignees

No one assigned

    Labels

    kind/bug (Categorizes issue or PR as related to a bug.) · needs-priority · triage/accepted (Indicates an issue or PR is ready to be actively worked on.)

    Type

    No type

    Projects

    Status

    Done

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions