diff --git a/library/src/main/java/ch/decent/sdk/net/ws/RxWebSocket.kt b/library/src/main/java/ch/decent/sdk/net/ws/RxWebSocket.kt index 43dfc2da..1b2839fd 100644 --- a/library/src/main/java/ch/decent/sdk/net/ws/RxWebSocket.kt +++ b/library/src/main/java/ch/decent/sdk/net/ws/RxWebSocket.kt @@ -12,11 +12,13 @@ import ch.decent.sdk.net.ws.model.OnMessageText import ch.decent.sdk.net.ws.model.OnOpen import ch.decent.sdk.net.ws.model.WebSocketClosedException import ch.decent.sdk.net.ws.model.WebSocketEvent -import com.google.gson.* +import com.google.gson.Gson +import com.google.gson.JsonNull +import com.google.gson.JsonObject +import com.google.gson.JsonParser import com.google.gson.annotations.SerializedName import io.reactivex.BackpressureStrategy import io.reactivex.Flowable -import io.reactivex.Maybe import io.reactivex.Single import io.reactivex.disposables.CompositeDisposable import io.reactivex.processors.AsyncProcessor @@ -25,10 +27,8 @@ import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.WebSocket import org.slf4j.Logger -import org.threeten.bp.LocalDateTime import java.lang.reflect.ParameterizedType import java.lang.reflect.Type -import javax.xml.stream.events.EndElement internal class RxWebSocket( private val client: OkHttpClient, @@ -106,12 +106,15 @@ internal class RxWebSocket( private fun connect() { disposable.addAll( events.log("RxWebSocket") - .doOnComplete { + .doOnTerminate { webSocketAsync = null disposable.clear() + apiId.clear() + callId = 0 }.subscribe(), events.ofType(OnOpen::class.java).firstOrError() .doOnSuccess { webSocketAsync!!.onNext(it.webSocket); webSocketAsync!!.onComplete() } + .ignoreElement().onErrorComplete() .subscribe() ) events.connect { it.addTo(disposable) } diff --git a/library/src/test/java/ch/decent/sdk/net/ws/ConnectionTest.kt b/library/src/test/java/ch/decent/sdk/net/ws/ConnectionTest.kt index 799bd6aa..8561274a 100644 --- a/library/src/test/java/ch/decent/sdk/net/ws/ConnectionTest.kt +++ b/library/src/test/java/ch/decent/sdk/net/ws/ConnectionTest.kt @@ -16,7 +16,7 @@ class ConnectionTest : TimeOutTest() { private lateinit var mockWebServer: CustomWebSocketService @Before fun init() { - mockWebServer = CustomWebSocketService().apply { start() } + mockWebServer = CustomWebSocketService().apply { start(port) } val logger = LoggerFactory.getLogger("RxWebSocket") socket = RxWebSocket( client(logger), @@ -95,4 +95,36 @@ class ConnectionTest : TimeOutTest() { test.assertComplete() .assertNoErrors() } + + @Test fun `should fail, disconnect, connect and make request`() { + mockWebServer + .enqueue("keep alive", "") + .enqueue("""{"method":"call","params":[1,"login",["",""]],"id":3}""", """{"id":3,"result":true}""") + + val websocket = socket.webSocket().test() + + websocket.awaitTerminalEvent() + websocket.assertComplete() + .assertValueCount(1) + + websocket.values().single().send("fail") + val fail = socket.events.test() + fail.awaitTerminalEvent() + + mockWebServer.shutdown() + mockWebServer = CustomWebSocketService().apply { start(port) } + .enqueue("keep alive", "") + .enqueue("""{"method":"call","params":[1,"login",["",""]],"id":0}""", """{"id":0,"result":true}""") + + val login = socket.request(Login).test() + + login.awaitTerminalEvent() + login.assertComplete() + .assertNoErrors() + .assertValue(true) + } + + companion object { + private const val port = 1111 + } } \ No newline at end of file diff --git a/library/src/test/java/ch/decent/sdk/net/ws/CustomWebSocketService.kt b/library/src/test/java/ch/decent/sdk/net/ws/CustomWebSocketService.kt index 52cf5311..be7ce378 100644 --- a/library/src/test/java/ch/decent/sdk/net/ws/CustomWebSocketService.kt +++ b/library/src/test/java/ch/decent/sdk/net/ws/CustomWebSocketService.kt @@ -1,5 +1,6 @@ package ch.decent.sdk.net.ws +import ch.decent.sdk.net.ws.model.WebSocketClosedException import com.google.gson.Gson import okhttp3.Response import okhttp3.WebSocket @@ -8,6 +9,7 @@ import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer import okio.ByteString import org.slf4j.LoggerFactory +import java.lang.RuntimeException class CustomWebSocketService { @@ -21,8 +23,8 @@ class CustomWebSocketService { mockWebServer.enqueue(response) } - fun start() { - mockWebServer.start() + fun start(port: Int = 0) { + mockWebServer.start(port) } fun shutdown() { @@ -59,6 +61,7 @@ class CustomWebSocketService { override fun onMessage(webSocket: WebSocket?, text: String?) { logger.info("message") + if (text == "fail") throw RuntimeException("fail") responses[text!!]?.let { webSocket?.send(it) responses.remove(text)