Skip to content

Commit 7597171

Browse files
committed
update
1 parent e786b5e commit 7597171

File tree

2 files changed

+18
-28
lines changed

2 files changed

+18
-28
lines changed

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,6 @@ impl HttpQuery {
777777
}
778778

779779
pub async fn start_query(&mut self, sql: String) -> Result<()> {
780-
let span = fastrace::Span::enter_with_local_parent("HttpQuery::start_query");
781780
let (block_sender, query_context) = {
782781
let state = &mut self.executor.lock().state;
783782
let ExecuteState::Starting(state) = state else {
@@ -831,7 +830,9 @@ impl HttpQuery {
831830
block_sender_closer.abort();
832831
}
833832
}
834-
.in_span(span),
833+
.in_span(fastrace::Span::enter_with_local_parent(
834+
"HttpQuery::start_query",
835+
)),
835836
None,
836837
)?;
837838

src/query/service/src/servers/http/v1/query/sized_spsc.rs

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -329,14 +329,7 @@ where S: DataBlockSpill
329329
let page = match tp {
330330
Wait::Async => self.try_take_page().await?,
331331
Wait::Deadline(t) => {
332-
let d = match t.checked_duration_since(Instant::now()) {
333-
Some(d) if !d.is_zero() => d,
334-
_ => {
335-
// timeout() will return Ok if the future completes immediately
336-
return Ok((BlocksSerializer::empty(), self.chan.is_close()));
337-
}
338-
};
339-
match tokio::time::timeout(d, self.chan.recv()).await {
332+
match tokio::time::timeout_at((*t).into(), self.chan.recv()).await {
340333
Ok(true) => self.try_take_page().await?,
341334
Ok(false) => {
342335
info!("[HTTP-QUERY] Reached end of data blocks");
@@ -350,14 +343,11 @@ where S: DataBlockSpill
350343
}
351344
};
352345

353-
let page = match page {
354-
Some(page) => page,
355-
None => BlocksSerializer::empty(),
356-
};
357-
358346
// try to report 'no more data' earlier to client to avoid unnecessary http call
359-
let block_end = self.chan.is_close();
360-
Ok((page, block_end))
347+
Ok((
348+
page.unwrap_or_else(BlocksSerializer::empty),
349+
self.chan.is_close(),
350+
))
361351
}
362352

363353
#[fastrace::trace(name = "SizedChannelReceiver::try_take_page")]
@@ -405,34 +395,33 @@ where S: DataBlockSpill
405395
loop {
406396
let result = self.chan.buffer.lock().unwrap().try_add_block(block);
407397
match result {
398+
Err(SendFail::Closed) => return Ok(false),
408399
Ok(_) => {
409400
self.chan.notify_on_sent.notify_one();
410401
return Ok(true);
411402
}
412-
Err(SendFail::Closed) => {
413-
self.chan.notify_on_sent.notify_one();
414-
return Ok(false);
415-
}
416403
Err(SendFail::Full { page, remain }) => {
404+
self.chan.notify_on_sent.notify_one();
417405
let mut to_add = self.chan.try_spill_page(page).await?;
418406
loop {
419407
let result = self.chan.buffer.lock().unwrap().try_add_page(to_add);
420408
match result {
421-
Ok(_) => break,
422409
Err(SendFail::Closed) => return Ok(false),
410+
Ok(_) => {
411+
self.chan.notify_on_sent.notify_one();
412+
break;
413+
}
423414
Err(SendFail::Full { page, .. }) => {
424415
to_add = Page::Memory(page);
425416
self.chan.notify_on_recv.notified().await;
426417
}
427418
}
428419
}
429420

430-
match remain {
431-
Some(remain) => {
432-
block = remain;
433-
}
434-
None => return Ok(true),
435-
}
421+
let Some(remain) = remain else {
422+
return Ok(true);
423+
};
424+
block = remain;
436425
}
437426
}
438427
}

0 commit comments

Comments
 (0)