SSE connection may leak when fetch init.redirect = 'error' #4627

@liheng314

Bug Description

Aborting a fetch may not take effect when init.redirect = 'error', which leaves the SSE connection open and causes a connection leak.

Reproducible By

SSE Server:

const express = require('express');
const app = express();
const PORT = 3000;

class SSEServer {
  constructor() {
    this.clients = new Set();
  }

  // Add a client
  addClient(res) {
    this.clients.add(res);
    
    // Set the SSE headers
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'Access-Control-Allow-Origin': '*'
    });

    // Send a welcome message
    this.sendToClient(res, 'connected', { message: 'Welcome to SSE server' });

    // Clean up when the client disconnects
    res.on('close', () => {
      this.clients.delete(res);
      console.log('Client disconnected. Total clients:', this.clients.size);
    });
  }

  // Send a message to a single client
  sendToClient(res, event, data) {
    res.write(`event: ${event}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  }

  // Broadcast a message to all clients
  broadcast(event, data) {
    this.clients.forEach(client => {
      this.sendToClient(client, event, data);
    });
  }

  // Get the number of connected clients
  getClientCount() {
    return this.clients.size;
  }
}

// Create the SSE server instance
const sseServer = new SSEServer();

// Middleware
app.use(express.json());
app.use(express.static('public'));

// SSE endpoint
app.get('/sse', (req, res) => {
  sseServer.addClient(res);
  console.log('New client connected. Total clients:', sseServer.getClientCount());
});

// Endpoint for manually triggering an event (optional)
app.post('/send-message', (req, res) => {
  const { message } = req.body;
  sseServer.broadcast('custom-message', {
    message: message || 'Manual broadcast',
    timestamp: new Date().toISOString()
  });
  res.json({ success: true, clients: sseServer.getClientCount() });
});

// Start the periodic broadcast
let eventCount = 1;
setInterval(() => {
  const eventData = {
    id: eventCount,
    timestamp: new Date().toISOString(),
    message: `Automatic event #${eventCount}`,
    random: Math.random().toFixed(4),
    clients: sseServer.getClientCount()
  };
  
  sseServer.broadcast('message', eventData);
  console.log(`Broadcasted event ${eventCount} to ${sseServer.getClientCount()} clients`);
  eventCount++;
}, 2000);

app.listen(PORT, () => {
  console.log(`Advanced SSE server running at http://localhost:${PORT}`);
});
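For reference, the wire format that sendToClient writes can be sketched as a pure function. This is only an illustration: formatSseFrame is a hypothetical helper, not part of the server above, and it assumes the same two-field frame (event, then data) that the two res.write calls produce.

```javascript
// Hypothetical helper (not part of the original server): builds one SSE frame
// in the same shape that sendToClient produces with its two res.write calls.
function formatSseFrame(event, data) {
  // "event:" names the event type; "data:" carries the JSON payload.
  // The trailing blank line terminates the frame.
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

// Example: the welcome frame sent by addClient.
const frame = formatSseFrame('connected', { message: 'Welcome to SSE server' });
console.log(JSON.stringify(frame));
```

Only frames whose event type is message are delivered to an onmessage handler; the connected and custom-message events would need addEventListener on the client.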

Test client:

import { EventSource } from 'eventsource'
import { fetch } from 'undici'
import { randomUUID } from 'crypto';
setInterval(() => {
    const es = new EventSource('http://127.0.0.1:3000/sse',
        {
            fetch: (url, init) => {
                init.redirect = 'error';
                return fetch(url, init);
            },
        }
    )
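    // Tag each connection so that log lines can be told apart
    es._id = randomUUID()
    es.onmessage = (event) => {
        // readyState 2 is CLOSED: a message arriving now means the connection was not torn down
        if (es.readyState === 2) {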
            console.log(es._id, es.readyState, 'receive data after closed!!!!!!!!!!!!!!!')
        }
    };

    setTimeout(() => {
        es.close();
    }, 5000);
}, 1000);
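To compare behavior across redirect modes (for example 'error' versus 'follow') with the same client, the custom fetch could be built by a small factory. This is a sketch under assumptions: withRedirectMode and fakeFetch are hypothetical names, and the stand-in fetch only records its arguments so the wrapper can be checked without a network. It changes only how the option is set and is not known to affect the leak.

```javascript
// Hypothetical helper: wraps a fetch implementation so that every request uses
// the given redirect mode, without mutating the init object from the caller.
function withRedirectMode(fetchImpl, redirect) {
  return (url, init) => fetchImpl(url, { ...init, redirect });
}

// Stand-in fetch that records its arguments instead of opening a connection.
const calls = [];
const fakeFetch = (url, init) => {
  calls.push({ url, init });
  return Promise.resolve('fake response');
};

const originalInit = { signal: new AbortController().signal };
withRedirectMode(fakeFetch, 'error')('http://127.0.0.1:3000/sse', originalInit);

// prints: error false
console.log(calls[0].init.redirect, 'redirect' in originalInit);
```

In the test client, this would be passed as `fetch: withRedirectMode(fetch, 'error')`, and switching the second argument to 'follow' gives the comparison run.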

Expected Behavior

The SSE connection should be closed properly after the fetch is aborted.

Logs & Screenshots

I edited the undici source code to add console.log('requestFinalizer execute'). The result suggests that the cause may be that the AbortController ac is garbage collected too early.
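The hypothesis can be illustrated with a simplified model. This is not undici's actual code: it assumes an abort listener that reaches its controller only through a weak reference, and makeAbortForwarder and collectedRef are hypothetical names. If the controller is collected before the caller aborts, the forwarded abort silently does nothing, which would leave the connection open.

```javascript
// Simplified model, NOT undici's implementation: forward an abort to a
// controller that is held only through a WeakRef-like object with deref().
function makeAbortForwarder(controllerRef) {
  return function forwardAbort(reason) {
    const controller = controllerRef.deref();
    if (controller === undefined) {
      // The controller was already collected: nothing is left to abort.
      return false;
    }
    controller.abort(reason);
    return true;
  };
}

// Live controller: the abort is forwarded.
const liveController = new AbortController();
const forwardToLive = makeAbortForwarder(new WeakRef(liveController));
const forwardedToLive = forwardToLive(new Error('closed by caller'));

// Stand-in for a WeakRef whose target has been garbage collected.
const collectedRef = { deref: () => undefined };
const forwardedToCollected = makeAbortForwarder(collectedRef)(new Error('closed by caller'));

// prints: true true false
console.log(forwardedToLive, liveController.signal.aborted, forwardedToCollected);
```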

Why is a copy or clone used when redirect is not 'error'?

Environment

Node v20.19.2
undici 7.16.0

Additional context


Labels: bug
