Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.cloudbeaver.server.servlets.CBImageServlet;
import io.cloudbeaver.server.servlets.CBStaticServlet;
import io.cloudbeaver.server.servlets.WebStatusServlet;
import io.cloudbeaver.server.websockets.CBEventsLongPollingServlet;
import io.cloudbeaver.server.websockets.CBEventsWebSocket;
import io.cloudbeaver.server.websockets.CBWebSocketServerConfigurator;
import io.cloudbeaver.service.DBWServiceBindingServlet;
Expand All @@ -52,6 +53,7 @@
public class CBJettyServer {

private static final Log log = Log.getLog(CBJettyServer.class);

static {
// Set Jetty log level to WARN
System.setProperty("org.eclipse.jetty.util.log.class", "org.eclipse.jetty.util.log.StdErrLog");
Expand Down Expand Up @@ -118,6 +120,9 @@ public void runServer() {

servletContextHandler.addServlet(new ServletHolder("status", new WebStatusServlet()), "/status");

ServletHolder eventsServletHolder = new ServletHolder("events", new CBEventsLongPollingServlet());
servletContextHandler.addServlet(eventsServletHolder, serverConfiguration.getServicesURI() + "events/*");

GraphQLEndpoint endpoint = new GraphQLEndpoint(new ServerConfigurationTimeLimitFilter(application));
application.addApplicationContextValue(GraphQL.class.getName(), endpoint.getGraphQL());
String gqlServletPath = serverConfiguration.getServicesURI() + "gql/*";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* DBeaver - Universal Database Manager
* Copyright (C) 2010-2024 DBeaver Corp and others
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudbeaver.server.websockets;

import io.cloudbeaver.DBWebException;
import io.cloudbeaver.model.session.BaseWebSession;
import io.cloudbeaver.model.session.WebSession;
import org.jkiss.code.NotNull;
import org.jkiss.code.Nullable;
import org.jkiss.dbeaver.Log;
import org.jkiss.dbeaver.model.websocket.WSUtils;
import org.jkiss.dbeaver.model.websocket.event.WSClientEvent;
import org.jkiss.dbeaver.model.websocket.event.client.*;
import org.jkiss.utils.CommonUtils;

public class CBClientEventProcessor {

private static final Log log = Log.getLog(CBClientEventProcessor.class);

final BaseWebSession webSession;

public CBClientEventProcessor(@NotNull BaseWebSession webSession) {
this.webSession = webSession;
}

public void process(@Nullable String message) {
if (CommonUtils.isEmpty(message)) {
return;
}
WSClientEvent clientEvent;
try {
clientEvent = WSUtils.clientGson.fromJson(message, WSClientEvent.class);
} catch (Exception e) {
log.error("Error parsing event: " + e.getMessage(), e);
webSession.addSessionError(new DBWebException("Invalid event: " + e.getMessage()));
return;
}

switch (clientEvent.getId()) {
case WSSubscribeOnTopicClientEvent.ID: {
webSession.getEventsFilter().subscribeOnEventTopic(clientEvent.getTopicId());
break;
}
case WSUnsubscribeFromTopicClientEvent.ID: {
webSession.getEventsFilter().unsubscribeFromEventTopic(clientEvent.getTopicId());
break;
}
case WSUpdateActiveProjectsClientEvent.ID: {
var projectEvent = (WSUpdateActiveProjectsClientEvent) clientEvent;
webSession.getEventsFilter().setSubscribedProjects(projectEvent.getProjectIds());
break;
}
case WSSessionPingClientEvent.ID: {
if (webSession instanceof WebSession session) {
session.updateInfo(true);
}
break;
}
case WSSessionTaskConfirmationEvent.ID: {
if (webSession instanceof WebSession session) {
var taskConfirmationEvent = (WSSessionTaskConfirmationEvent) clientEvent;
session.handleTaskConfirmation(
taskConfirmationEvent.getTaskId(),
taskConfirmationEvent.isConfirmed(),
taskConfirmationEvent.isSkipConfirmations()
);
}
break;
}
default:
var e = new DBWebException("Unknown client event: " + clientEvent.getId());
log.error(e.getMessage(), e);
webSession.addSessionError(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* DBeaver - Universal Database Manager
* Copyright (C) 2010-2024 DBeaver Corp and others
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudbeaver.server.websockets;

import io.cloudbeaver.model.session.BaseWebSession;
import io.cloudbeaver.websocket.CBWebSessionEventHandler;
import org.jkiss.code.NotNull;
import org.jkiss.code.Nullable;
import org.jkiss.dbeaver.Log;
import org.jkiss.dbeaver.model.websocket.event.WSEvent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CBEventsLongPolling implements CBWebSessionEventHandler {

private static final Log log = Log.getLog(CBEventsLongPolling.class);

private static final int QUEUE_CAPACITY = 1000;

private final BaseWebSession webSession;
private final BlockingQueue<WSEvent> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
private final CBClientEventProcessor processor;
private volatile long lastPoll;

public CBEventsLongPolling(@NotNull BaseWebSession webSession) {
this.webSession = webSession;
this.lastPoll = System.currentTimeMillis();
this.webSession.addEventHandler(this);
this.processor = new CBClientEventProcessor(this.webSession);
}

@NotNull
public BaseWebSession webSession() {
return webSession;
}

public long lastPoll() {
return lastPoll;
}

public void touch() {
lastPoll = System.currentTimeMillis();
}

@NotNull
public List<WSEvent> pollEvents(long timeoutSec) throws InterruptedException {
WSEvent first = queue.poll(timeoutSec, TimeUnit.SECONDS);
if (first == null) {
return List.of();
}

List<WSEvent> result = new ArrayList<>();
result.add(first);
queue.drainTo(result);
return result;
}

@Override
public void handleWebSessionEvent(@NotNull WSEvent event) {
if (!queue.offer(event)) {
log.warn("Event queue overflow: sid=" + webSession.getSessionId() +
", eventId=" + event.getId());
}
}

public void onMessage(@Nullable String message) {
processor.process(message);
}

@Override
public void close() {
webSession.removeEventHandler(this);
queue.clear();
}

@Override
public String toString() {
return "CBEventsLongPolling{" +
"sid=" + webSession.getSessionId() +
", size=" + queue.size() +
", lastPoll=" + lastPoll +
'}';
}

}
Loading
Loading