Skip to content

Commit

Permalink
dispose web socket on also on error
Browse files Browse the repository at this point in the history
  • Loading branch information
vanderian committed Nov 29, 2018
1 parent 1140468 commit a746872
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 8 deletions.
13 changes: 8 additions & 5 deletions library/src/main/java/ch/decent/sdk/net/ws/RxWebSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import ch.decent.sdk.net.ws.model.OnMessageText
import ch.decent.sdk.net.ws.model.OnOpen
import ch.decent.sdk.net.ws.model.WebSocketClosedException
import ch.decent.sdk.net.ws.model.WebSocketEvent
import com.google.gson.*
import com.google.gson.Gson
import com.google.gson.JsonNull
import com.google.gson.JsonObject
import com.google.gson.JsonParser
import com.google.gson.annotations.SerializedName
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.Maybe
import io.reactivex.Single
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.processors.AsyncProcessor
Expand All @@ -25,10 +27,8 @@ import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.WebSocket
import org.slf4j.Logger
import org.threeten.bp.LocalDateTime
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import javax.xml.stream.events.EndElement

internal class RxWebSocket(
private val client: OkHttpClient,
Expand Down Expand Up @@ -106,12 +106,15 @@ internal class RxWebSocket(
private fun connect() {
disposable.addAll(
events.log("RxWebSocket")
.doOnComplete {
.doOnTerminate {
webSocketAsync = null
disposable.clear()
apiId.clear()
callId = 0
}.subscribe(),
events.ofType(OnOpen::class.java).firstOrError()
.doOnSuccess { webSocketAsync!!.onNext(it.webSocket); webSocketAsync!!.onComplete() }
.ignoreElement().onErrorComplete()
.subscribe()
)
events.connect { it.addTo(disposable) }
Expand Down
34 changes: 33 additions & 1 deletion library/src/test/java/ch/decent/sdk/net/ws/ConnectionTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ConnectionTest : TimeOutTest() {
private lateinit var mockWebServer: CustomWebSocketService

@Before fun init() {
mockWebServer = CustomWebSocketService().apply { start() }
mockWebServer = CustomWebSocketService().apply { start(port) }
val logger = LoggerFactory.getLogger("RxWebSocket")
socket = RxWebSocket(
client(logger),
Expand Down Expand Up @@ -95,4 +95,36 @@ class ConnectionTest : TimeOutTest() {
test.assertComplete()
.assertNoErrors()
}

@Test fun `should fail, disconnect, connect and make request`() {
mockWebServer
.enqueue("keep alive", "")
.enqueue("""{"method":"call","params":[1,"login",["",""]],"id":3}""", """{"id":3,"result":true}""")

val websocket = socket.webSocket().test()

websocket.awaitTerminalEvent()
websocket.assertComplete()
.assertValueCount(1)

websocket.values().single().send("fail")
val fail = socket.events.test()
fail.awaitTerminalEvent()

mockWebServer.shutdown()
mockWebServer = CustomWebSocketService().apply { start(port) }
.enqueue("keep alive", "")
.enqueue("""{"method":"call","params":[1,"login",["",""]],"id":0}""", """{"id":0,"result":true}""")

val login = socket.request(Login).test()

login.awaitTerminalEvent()
login.assertComplete()
.assertNoErrors()
.assertValue(true)
}

companion object {
private const val port = 1111
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ch.decent.sdk.net.ws

import ch.decent.sdk.net.ws.model.WebSocketClosedException
import com.google.gson.Gson
import okhttp3.Response
import okhttp3.WebSocket
Expand All @@ -8,6 +9,7 @@ import okhttp3.mockwebserver.MockResponse
import okhttp3.mockwebserver.MockWebServer
import okio.ByteString
import org.slf4j.LoggerFactory
import java.lang.RuntimeException

class CustomWebSocketService {

Expand All @@ -21,8 +23,8 @@ class CustomWebSocketService {
mockWebServer.enqueue(response)
}

fun start() {
mockWebServer.start()
fun start(port: Int = 0) {
mockWebServer.start(port)
}

fun shutdown() {
Expand Down Expand Up @@ -59,6 +61,7 @@ class CustomWebSocketService {

override fun onMessage(webSocket: WebSocket?, text: String?) {
logger.info("message")
if (text == "fail") throw RuntimeException("fail")
responses[text!!]?.let {
webSocket?.send(it)
responses.remove(text)
Expand Down

0 comments on commit a746872

Please sign in to comment.