Skip to content

Commit

Permalink
Feature/timeout ws (#14)
Browse files Browse the repository at this point in the history
* add timeout to ws requests, sync check api access calls
  • Loading branch information
vanderian authored Dec 12, 2018
1 parent c4548fc commit f2d99c3
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 14 deletions.
4 changes: 4 additions & 0 deletions library/src/main/java/ch/decent/sdk/DCoreApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ class DCoreApi internal constructor(internal val core: DCoreSdk) {
*/
var transactionExpiration = 30

fun setTimeout(seconds: Long) {
core.setTimeout(seconds)
}

val accountApi = AccountApi(this)
val assetApi = AssetApi(this)
val validationApi = ValidationApi(this)
Expand Down
7 changes: 6 additions & 1 deletion library/src/main/java/ch/decent/sdk/DCoreSdk.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import io.reactivex.functions.BiFunction
import okhttp3.OkHttpClient
import org.slf4j.Logger
import org.threeten.bp.LocalDateTime
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

class DCoreSdk private constructor(
private val client: OkHttpClient,
webSocketUrl: String? = null,
restUrl: String? = null,
private val logger: Logger? = null
) {

private val rxWebSocket: RxWebSocket? = webSocketUrl?.let { RxWebSocket(client, it, gsonBuilder.create(), logger) }
private val rpc: RpcService? = restUrl?.let { RpcService(it, client, gsonBuilder.create()) }
private val chainId = GetChainId.toRequest().cache()
Expand Down Expand Up @@ -58,6 +59,10 @@ class DCoreSdk private constructor(
}
}

internal fun setTimeout(seconds: Long) {
rxWebSocket?.timeout = seconds
}

companion object {
@JvmStatic val gsonBuilder = GsonBuilder()
.disableHtmlEscaping()
Expand Down
4 changes: 2 additions & 2 deletions library/src/main/java/ch/decent/sdk/model/AssetFormatter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ interface AssetFormatter {

fun format(value: BigDecimal) = defaultFormatter.format(value) + " $symbol"

fun format(value: BigDecimal, formatter: NumberFormat.() -> NumberFormat = { this }) = formatter(defaultFormatter).format(value) + " $symbol"
fun format(value: BigDecimal, formatter: NumberFormat.() -> NumberFormat) = formatter(defaultFormatter).format(value) + " $symbol"

/**
* format raw value with asset symbol
Expand All @@ -59,7 +59,7 @@ interface AssetFormatter {

fun format(value: BigInteger) = defaultFormatter.format(fromRaw(value)) + " $symbol"

fun format(value: BigInteger, formatter: NumberFormat.() -> NumberFormat = { this }) = formatter(defaultFormatter).format(fromRaw(value)) + " $symbol"
fun format(value: BigInteger, formatter: NumberFormat.() -> NumberFormat) = formatter(defaultFormatter).format(fromRaw(value)) + " $symbol"

fun amount(value: String): AssetAmount = AssetAmount(toRaw(BigDecimal(value)), id)
fun amount(value: Double): AssetAmount = AssetAmount(toRaw(BigDecimal(value)), id)
Expand Down
32 changes: 21 additions & 11 deletions library/src/main/java/ch/decent/sdk/net/ws/RxWebSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import okhttp3.WebSocket
import org.slf4j.Logger
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

internal class RxWebSocket(
private val client: OkHttpClient,
Expand All @@ -39,17 +41,19 @@ internal class RxWebSocket(
) {

private val disposable = CompositeDisposable()
private val apiId = mutableMapOf<ApiGroup, Int>()
private val apiId = mutableMapOf<ApiGroup, Single<Int>>()
internal val events = Flowable.create<WebSocketEvent>({ emitter ->
val webSocket = client.newWebSocket(request, WebSocketEmitter(emitter))
emitter.setCancellable { webSocket.close(1000, null) }
}, BackpressureStrategy.BUFFER)
.publish()

private var webSocketAsync: AsyncProcessor<WebSocket>? = null
internal var callId: Long = 0
private var callId: Long = 0
get() = field++

internal var timeout = TimeUnit.MINUTES.toSeconds(1)

val connected: Boolean
get() = disposable.size() != 0

Expand Down Expand Up @@ -107,12 +111,7 @@ internal class RxWebSocket(
disposable.addAll(
events.log("RxWebSocket")
.onErrorResumeNext(Flowable.empty())
.doOnComplete {
webSocketAsync = null
disposable.clear()
apiId.clear()
callId = 0
}.subscribe(),
.doOnComplete { clearConnection() }.subscribe(),
events.ofType(OnOpen::class.java).firstOrError()
.doOnSuccess { webSocketAsync!!.onNext(it.webSocket); webSocketAsync!!.onComplete() }
.ignoreElement().onErrorComplete()
Expand All @@ -121,6 +120,13 @@ internal class RxWebSocket(
events.connect { it.addTo(disposable) }
}

private fun clearConnection() {
webSocketAsync = null
disposable.clear()
apiId.clear()
callId = 0
}

internal fun webSocket(): Single<WebSocket> {
if (webSocketAsync == null) {
webSocketAsync = AsyncProcessor.create()
Expand All @@ -131,11 +137,13 @@ internal class RxWebSocket(

private fun <T : WebSocket> Single<T>.checkApiAccess(request: BaseRequest<*>): Single<Pair<T, Int>> =
this.flatMap { ws ->
when {
request.apiGroup !in apiId && request.apiGroup == ApiGroup.LOGIN -> apiId[request.apiGroup] = Login.make(callId).map { 1 }.cache()
request.apiGroup !in apiId -> apiId[request.apiGroup] = RequestApiAccess(request.apiGroup).make(callId).cache()
}
when {
request === Login -> Single.just(ws to 1)
request.apiGroup !in apiId && request.apiGroup == ApiGroup.LOGIN -> Login.make(callId).doOnSuccess { apiId[ApiGroup.LOGIN] = 1 }.map { ws to 1 }
request.apiGroup !in apiId -> RequestApiAccess(request.apiGroup).make(callId).doOnSuccess { apiId[request.apiGroup] = it }.map { ws to it }
else -> Single.just(ws to apiId[request.apiGroup]!!)
else -> apiId[request.apiGroup]!!.map { ws to it }
}
}

Expand All @@ -152,6 +160,8 @@ internal class RxWebSocket(
.doOnNext { (_, obj) -> checkObjectNotFound(obj, this) }
.map { (_, obj) -> parseResultElement(returnClass, obj) }
.map { gson.fromJson<T>(it, returnClass) }
.timeout(timeout, TimeUnit.SECONDS)
.doOnError { if (it is TimeoutException) clearConnection() }

private fun <T> BaseRequest<T>.make(callId: Long): Single<T> =
makeStream(callId)
Expand Down
2 changes: 2 additions & 0 deletions library/src/test/java/ch/decent/sdk/api/BlockApiTest.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package ch.decent.sdk.api

import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import org.junit.Test
import org.junit.runner.RunWith
Expand Down

0 comments on commit f2d99c3

Please sign in to comment.