Skip to content

Commit 76a634e

Browse files
[BugFix] Fix read listener not support ssl (backport #65291) (#65357)
Signed-off-by: stdpain <[email protected]> Co-authored-by: stdpain <[email protected]>
1 parent 833283b commit 76a634e

File tree

5 files changed

+130
-3
lines changed

5 files changed

+130
-3
lines changed

fe/fe-core/src/main/java/com/starrocks/mysql/MysqlChannel.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.starrocks.common.util.NetUtils;
3838
import com.starrocks.mysql.nio.MySQLReadListener;
3939
import com.starrocks.mysql.ssl.SSLChannel;
40+
import com.starrocks.mysql.ssl.SSLDecoder;
4041
import com.starrocks.qe.ConnectContext;
4142
import com.starrocks.qe.ConnectProcessor;
4243
import org.apache.logging.log4j.LogManager;
@@ -143,6 +144,13 @@ public void setSSLChannel(SSLChannel sslChannel) {
143144
this.sslChannel = sslChannel;
144145
}
145146

147+
public SSLDecoder getSSLDecoder() {
148+
if (this.sslChannel == null) {
149+
return null;
150+
}
151+
return this.sslChannel.createDecoder();
152+
}
153+
146154
protected int readAll(ByteBuffer dstBuf) throws IOException {
147155
if (sslChannel != null) {
148156
return sslChannel.readAll(dstBuf);

fe/fe-core/src/main/java/com/starrocks/mysql/nio/MySQLReadListener.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.starrocks.common.Config;
1818
import com.starrocks.mysql.MysqlPackageDecoder;
1919
import com.starrocks.mysql.RequestPackage;
20+
import com.starrocks.mysql.ssl.SSLDecoder;
2021
import com.starrocks.qe.ConnectContext;
2122
import com.starrocks.qe.ConnectProcessor;
2223
import com.starrocks.rpc.RpcException;
@@ -31,14 +32,16 @@ public class MySQLReadListener implements ChannelListener<ConduitStreamSourceCha
3132
private static final Logger LOG = LogManager.getLogger(MySQLReadListener.class);
3233
private final ConnectContext ctx;
3334
private final ConnectProcessor connectProcessor;
34-
private final MysqlPackageDecoder decoder = new MysqlPackageDecoder();
35+
private final MysqlPackageDecoder packageDecoder = new MysqlPackageDecoder();
3536

3637
protected static final int DEFAULT_BUFFER_SIZE = 16 * 1024;
3738
private final ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
39+
private final SSLDecoder sslDecoder;
3840

3941
public MySQLReadListener(ConnectContext connectContext, ConnectProcessor connectProcessor) {
4042
this.ctx = connectContext;
4143
this.connectProcessor = connectProcessor;
44+
this.sslDecoder = this.ctx.getMysqlChannel().getSSLDecoder();
4245
}
4346

4447
@Override
@@ -66,11 +69,18 @@ public void handleEvent(ConduitStreamSourceChannel channel) {
6669
}
6770

6871
readBuffer.flip();
69-
decoder.consume(readBuffer);
72+
73+
if (sslDecoder != null) {
74+
sslDecoder.feed(readBuffer);
75+
packageDecoder.consume(sslDecoder.decode());
76+
} else {
77+
packageDecoder.consume(readBuffer);
78+
}
79+
7080
readBuffer.compact();
7181

7282
RequestPackage pkg;
73-
while ((pkg = decoder.poll()) != null) {
83+
while ((pkg = packageDecoder.poll()) != null) {
7484
final RequestPackage req = pkg;
7585
channel.getWorker().execute(() -> {
7686
handleRequest(req);

fe/fe-core/src/main/java/com/starrocks/mysql/ssl/SSLChannel.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@ public interface SSLChannel {
2424
int readAll(ByteBuffer buffer) throws IOException;
2525

2626
void write(ByteBuffer buffer) throws IOException;
27+
28+
SSLDecoder createDecoder();
2729
}

fe/fe-core/src/main/java/com/starrocks/mysql/ssl/SSLChannelImp.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,4 +433,8 @@ protected ByteBuffer handleBufferOverflow(SSLEngine engine, ByteBuffer buffer) {
433433
replaceBuffer.put(buffer);
434434
return replaceBuffer;
435435
}
436+
437+
public SSLDecoder createDecoder() {
438+
return new SSLDecoder(sslEngine, peerNetData, peerAppData);
439+
}
436440
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright 2021-present StarRocks, Inc. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.starrocks.mysql.ssl;
16+
17+
import org.apache.logging.log4j.LogManager;
18+
import org.apache.logging.log4j.Logger;
19+
20+
import java.io.IOException;
21+
import java.nio.ByteBuffer;
22+
import javax.net.ssl.SSLEngine;
23+
import javax.net.ssl.SSLEngineResult;
24+
25+
public class SSLDecoder {
26+
private static final Logger LOG = LogManager.getLogger(SSLDecoder.class);
27+
28+
private final SSLEngine sslEngine;
29+
30+
// Encrypted data buffer (network)
31+
private ByteBuffer netBuffer;
32+
// Decrypted plaintext buffer
33+
private ByteBuffer appBuffer;
34+
35+
public SSLDecoder(SSLEngine sslEngine, ByteBuffer netBuffer, ByteBuffer appBuffer) {
36+
this.sslEngine = sslEngine;
37+
this.netBuffer = netBuffer;
38+
this.appBuffer = appBuffer;
39+
this.netBuffer.compact();
40+
}
41+
42+
public void feed(ByteBuffer encryptedInput) throws IOException {
43+
// Check if netBuffer has enough remaining capacity
44+
if (encryptedInput.remaining() > netBuffer.remaining()) {
45+
// Expand netBuffer if needed
46+
ByteBuffer newBuf = ByteBuffer.allocate(
47+
Math.max(netBuffer.capacity() * 2, netBuffer.position() + encryptedInput.remaining()));
48+
netBuffer.flip();
49+
newBuf.put(netBuffer);
50+
netBuffer = newBuf;
51+
}
52+
netBuffer.put(encryptedInput);
53+
}
54+
55+
public ByteBuffer decode() throws IOException {
56+
if (!appBuffer.hasRemaining()) {
57+
appBuffer.clear();
58+
}
59+
netBuffer.flip();
60+
61+
while (netBuffer.hasRemaining()) {
62+
SSLEngineResult result = sslEngine.unwrap(netBuffer, appBuffer);
63+
switch (result.getStatus()) {
64+
case OK:
65+
// No progress (no bytes consumed or produced), break the loop to avoid busy-wait
66+
if (result.bytesConsumed() == 0 && result.bytesProduced() == 0) {
67+
netBuffer.compact();
68+
appBuffer.flip();
69+
return appBuffer;
70+
}
71+
break;
72+
case BUFFER_OVERFLOW:
73+
// appBuffer is too small, expand it and retry
74+
expandAppBuffer();
75+
continue;
76+
case BUFFER_UNDERFLOW:
77+
// Not enough encrypted data, preserve remaining data in netBuffer
78+
// and exit. More data will be fed in the next read event.
79+
netBuffer.compact();
80+
appBuffer.flip();
81+
return appBuffer;
82+
case CLOSED:
83+
throw new IOException("SSL engine closed");
84+
default:
85+
throw new IllegalStateException("Unexpected SSL status: " + result.getStatus());
86+
}
87+
}
88+
89+
// All encrypted data consumed, flip appBuffer for reading
90+
netBuffer.compact();
91+
appBuffer.flip();
92+
return appBuffer;
93+
}
94+
95+
private void expandAppBuffer() {
96+
int newCapacity = appBuffer.capacity() * 2;
97+
ByteBuffer newBuf = ByteBuffer.allocateDirect(newCapacity);
98+
appBuffer.flip();
99+
newBuf.put(appBuffer);
100+
appBuffer = newBuf;
101+
}
102+
103+
}

0 commit comments

Comments
 (0)