From 69322e5cf47090dc6d235b2b12191cff252cc003 Mon Sep 17 00:00:00 2001 From: wayerr Date: Mon, 20 Mar 2017 22:18:38 +0300 Subject: [PATCH] partially implement tty proxy for #23 --- .../configuration/WebSocketConfiguration.java | 28 +++++- .../dm/cluman/ui/tty/TtyProxy.java | 96 ++++++++++++++++++ .../dm/cluman/ui/tty/WsTtyHandler.java | 98 +++++++++++++++++++ 3 files changed, 219 insertions(+), 3 deletions(-) create mode 100644 cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/tty/TtyProxy.java create mode 100644 cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/tty/WsTtyHandler.java diff --git a/cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/configuration/WebSocketConfiguration.java b/cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/configuration/WebSocketConfiguration.java index b28f1c27..555952b2 100644 --- a/cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/configuration/WebSocketConfiguration.java +++ b/cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/configuration/WebSocketConfiguration.java @@ -17,7 +17,9 @@ package com.codeabovelab.dm.cluman.ui.configuration; import com.codeabovelab.dm.cluman.security.TempAuth; +import com.codeabovelab.dm.cluman.ui.tty.WsTtyHandler; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -28,9 +30,9 @@ import org.springframework.messaging.support.ExecutorChannelInterceptor; import org.springframework.security.core.Authentication; import org.springframework.stereotype.Component; -import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer; -import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; -import org.springframework.web.socket.config.annotation.StompEndpointRegistry; +import org.springframework.web.socket.client.WebSocketClient; +import org.springframework.web.socket.client.standard.StandardWebSocketClient; +import org.springframework.web.socket.config.annotation.*; import java.security.Principal; @@ -123,4 +125,24 @@ public void afterReceiveCompletion(Message message, MessageChannel channel, E } } + @Configuration + static class PreWsConfig { + @Bean + WebSocketClient webSocketClient() { + return new StandardWebSocketClient(); + } + } + + @EnableWebSocket + @Configuration + static class WsConfig implements WebSocketConfigurer { + + @Autowired + private WsTtyHandler wsTtyHandler; + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(wsTtyHandler, "/ui/tty"); + } + } } \ No newline at end of file diff --git a/cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/tty/TtyProxy.java b/cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/tty/TtyProxy.java new file mode 100644 index 00000000..cdb54625 --- /dev/null +++ b/cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/tty/TtyProxy.java @@ -0,0 +1,96 @@ +/* + * Copyright 2017 Code Above Lab LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.codeabovelab.dm.cluman.ui.tty; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; + +/** + */ +@Slf4j +class TtyProxy implements WebSocketHandler { + + private static final String KEY = TtyProxy.class.getName(); + private final String containerId; + private final WebSocketSession frontend; + private volatile WebSocketSession backend; + + + TtyProxy(String containerId, WebSocketSession frontend) { + this.containerId = containerId; + this.frontend = frontend; + } + + static void set(WebSocketSession session, TtyProxy ttyProxy) { + session.getAttributes().put(KEY, ttyProxy); + } + + static TtyProxy get(WebSocketSession session) { + return (TtyProxy) session.getAttributes().get(KEY); + } + + static void close(WebSocketSession session) throws Exception { + TtyProxy tty = (TtyProxy) session.getAttributes().get(KEY); + tty.close(); + } + + private synchronized void close() throws Exception { + frontend.getAttributes().remove(KEY, this); + WebSocketSession localBackend = this.backend; + if(localBackend != null) { + localBackend.close(); + } + } + + synchronized void toBackend(WebSocketMessage message) throws Exception { + WebSocketSession localBackend = this.backend; + if(localBackend != null) { + localBackend.sendMessage(message); + } + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + synchronized (this) { + this.backend = backend; + } + log.info("Success connect to backed with sessions: front={}, back={}", frontend, backend); + } + + @Override + public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { + frontend.sendMessage(message); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + log.error("Backend transport error:", exception); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { + close(); + } + + @Override + public boolean supportsPartialMessages() { + return false; + } +} diff --git a/cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/tty/WsTtyHandler.java b/cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/tty/WsTtyHandler.java new file mode 100644 index 00000000..84cb1b85 --- /dev/null +++ b/cluster-manager/src/main/java/com/codeabovelab/dm/cluman/ui/tty/WsTtyHandler.java @@ -0,0 +1,98 @@ +/* + * Copyright 2017 Code Above Lab LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.codeabovelab.dm.cluman.ui.tty; + +import com.codeabovelab.dm.cluman.ds.container.ContainerRegistration; +import com.codeabovelab.dm.cluman.ds.container.ContainerStorage; +import com.codeabovelab.dm.cluman.ds.nodes.NodeRegistration; +import com.codeabovelab.dm.cluman.ds.nodes.NodeStorage; +import com.codeabovelab.dm.cluman.validate.ExtendedAssert; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.MultiValueMap; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.client.WebSocketClient; +import org.springframework.web.util.UriComponents; +import org.springframework.web.util.UriComponentsBuilder; + +/** + */ +@Slf4j +@Component +public class WsTtyHandler implements WebSocketHandler { + + @Autowired + private ContainerStorage containerStorage; + + @Autowired + private NodeStorage nodeStorage; + + @Autowired + private WebSocketClient webSocketClient; + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + UriComponents uc = UriComponentsBuilder.fromUri(session.getUri()).build(); + MultiValueMap params = uc.getQueryParams(); + + String containerId = params.getFirst("container"); + ContainerRegistration containerReg = containerStorage.getContainer(containerId); + ExtendedAssert.notFound(containerReg, "Can not find container: " + containerId); + NodeRegistration nodeReg = nodeStorage.getNodeRegistration(containerReg.getNode()); + TtyProxy tty = new TtyProxy(containerReg.getId(), session); + TtyProxy.set(session, tty); + ListenableFuture future = webSocketClient.doHandshake(tty, getContainerUri(containerReg.getId(), nodeReg)); + future.addCallback((r) -> {}, (e) -> { + log.error("failure to open backend connection to '{}' of cluster '{}' due to error: ", containerId, nodeReg.getCluster(), e); + }); + } + + private String getContainerUri(String containerId, NodeRegistration nr) { + String addr = nr.getNodeInfo().getAddress(); + return "ws://" + addr + ":2375/containers/" + containerId + + "/attach/ws?stream=true&stdin=true&stdout=true&stderr=true"; + } + + @Override + public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { + TtyProxy tty = TtyProxy.get(session); + if(tty == null) { + return; + } + tty.toBackend(message); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + log.error("Frontend transport error: ", exception); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { + TtyProxy.close(session); + } + + @Override + public boolean supportsPartialMessages() { + return false; + } +}