Skip to content

Commit

Permalink
Merge pull request #49 from DECENTfoundation/feature/callid-concurrent
Browse files Browse the repository at this point in the history
DSDK-566 use atomic long for callid
  • Loading branch information
lukas1 authored May 6, 2019
2 parents 4a08889 + cc543e8 commit c316212
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 51 deletions.
8 changes: 5 additions & 3 deletions library/src/main/java/ch/decent/sdk/net/ws/RxWebSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicLong

internal sealed class MessageEvent
internal data class Message(val id: Long, val obj: JsonObject) : MessageEvent()
Expand All @@ -50,8 +51,9 @@ internal class RxWebSocket(
.publish()

private var webSocketAsync: AsyncProcessor<WebSocket>? = null
private var callId: Long = 0
get() = field++
private val backingId = AtomicLong(0)
val callId: Long
get() = backingId.getAndIncrement()

internal var timeout = TimeUnit.MINUTES.toSeconds(1)

Expand Down Expand Up @@ -125,7 +127,7 @@ internal class RxWebSocket(
private fun clearConnection() {
webSocketAsync = null
disposable.clear()
callId = 0
backingId.set(0)
}

internal fun webSocket(): Single<WebSocket> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,34 @@ import ch.decent.sdk.TimeOutTest
import ch.decent.sdk.net.model.request.GetChainId
import ch.decent.sdk.net.model.request.Login
import ch.decent.sdk.net.ws.model.WebSocketClosedException
import ch.decent.sdk.print
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import org.amshove.kluent.`should be equal to`
import org.junit.After
import org.junit.Before
import org.junit.Test
import org.slf4j.LoggerFactory

class ConnectionTest : TimeOutTest() {
class RxWebSocketTest : TimeOutTest() {

private lateinit var socket: RxWebSocket
private lateinit var mockWebServer: CustomWebSocketService

@Before fun init() {
mockWebServer = CustomWebSocketService().apply { start(port) }
val logger = LoggerFactory.getLogger("RxWebSocket")
socket = RxWebSocket(
Helpers.client(logger),
mockWebServer.getUrl(),
// url,
Helpers.wsUrl,
logger = logger,
gson = DCoreSdk.gsonBuilder.create()
)
}

@After fun close() {
mockWebServer.shutdown()
}

@Test fun `should connect, disconnect and connect`() {
mockWebServer
.enqueue("keep alive", "")

var websocket = socket.webSocket().test()

websocket.awaitTerminalEvent()
Expand All @@ -56,18 +54,14 @@ class ConnectionTest : TimeOutTest() {
}

@Test fun `should connect, disconnect, fail request and reconnect with success`() {
mockWebServer
.enqueue("keep alive", "")
.enqueue("""{"method":"call","params":[1,"login",["",""]],"id":0}""", """{"id":0,"result":true}""")

val websocket = socket.webSocket().test()

websocket.awaitTerminalEvent()
websocket.assertComplete()
.assertValueCount(1)

val fail = socket.request(GetChainId).test()
socket.disconnect()
val fail = socket.request(GetChainId).test()

fail.awaitTerminalEvent()
fail.assertError(WebSocketClosedException::class.java)
Expand All @@ -81,53 +75,28 @@ class ConnectionTest : TimeOutTest() {
}

@Test fun `should connect, disconnect, fail request and retry with success`() {
mockWebServer
.enqueue("keep alive", "")
.enqueue("""{"method":"call","params":[0,"get_chain_id",[]],"id":0}""", """{"id":0,"result":"17401602b201b3c45a3ad98afc6fb458f91f519bd30d1058adf6f2bed66376bc"}""")

val websocket = socket.webSocket().test()

websocket.awaitTerminalEvent()
websocket.assertComplete()
.assertValueCount(1)

val test = socket.request(GetChainId).retry(1).test()
socket.disconnect()
val test = socket.request(GetChainId).retry(1).test()

test.awaitTerminalEvent()
test.assertComplete()
.assertNoErrors()
}

@Test fun `should fail, disconnect, connect and make request`() {
mockWebServer
.enqueue("keep alive", "")
.enqueue("""{"method":"call","params":[1,"login",["",""]],"id":0}""", """{"id":0,"result":true}""")

val websocket = socket.webSocket().test()

websocket.awaitTerminalEvent()
websocket.assertComplete()
.assertValueCount(1)
@Test fun `should get unique call id`() {
val len = 9999
val test = Single.merge((1..len).map { Single.just(it).subscribeOn(Schedulers.newThread()).map { socket.callId } })
.toList()
.map { it.toSet().size }
.test()

websocket.values().single().send("fail")
val fail = socket.events.test()
fail.awaitTerminalEvent()

mockWebServer.shutdown()
mockWebServer = CustomWebSocketService().apply { start(port) }
.enqueue("keep alive", "")
.enqueue("""{"method":"call","params":[1,"login",["",""]],"id":0}""", """{"id":0,"result":true}""")

val login = socket.request(Login).test()

login.awaitTerminalEvent()
login.assertComplete()
.assertNoErrors()
.assertValue(true)
}

companion object {
private const val port = 1111
test.awaitTerminalEvent()
test.values().single() `should be equal to` len
}
}

0 comments on commit c316212

Please sign in to comment.