Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove intrinsics usage in RibCoroutineWorker #627

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,10 @@
package com.uber.rib.core

import com.uber.autodispose.coroutinesinterop.asScopeProvider
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -122,10 +112,18 @@ public fun CoroutineScope.bind(
worker: RibCoroutineWorker,
context: CoroutineContext = RibDispatchers.Default,
): BindWorkerHandle {
val bindJob: CompletableJob // A job that completes once worker's onStart completes
var bindJob: CompletableJob? = null // A job that completes once worker's onStart completes
val unbindJob =
launch(context, { bindJob = createBindingJob() }) { bindAndAwaitCancellation(worker, bindJob) }
return BindWorkerHandleImpl(bindJob, unbindJob)
launch(context, CoroutineStart.UNDISPATCHED) {
bindJob =
createBindingJob().also {
// launch again -- this time, we will dispatch if installed dispatcher
// tell us to (CoroutineDispatcher.isDispatchNeeded()).
launch { bindAndAwaitCancellation(worker, it) }
}
}
// !! is safe here -- outer coroutine was started undispatched.
return BindWorkerHandleImpl(bindJob!!, unbindJob)
}

/** Binds [workers] in a scope that is a child of the [CoroutineScope] receiver. */
Expand All @@ -139,46 +137,6 @@ public fun CoroutineScope.bind(
}
}

/**
* Guarantees to run synchronous [init] block exactly once in an undispatched manner.
*
* **Exceptions thrown in [init] block will be rethrown at call site.**
*/
@OptIn(ExperimentalContracts::class)
private fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
init: CoroutineScope.() -> Unit = {},
block: suspend CoroutineScope.() -> Unit,
): Job {
contract {
callsInPlace(init, InvocationKind.EXACTLY_ONCE)
callsInPlace(block, InvocationKind.AT_MOST_ONCE)
}
var initError: Throwable? = null
val job =
launch(context, CoroutineStart.UNDISPATCHED) {
runCatching(init).onFailure { initError = it }.getOrThrow()
dispatchIfNeeded()
block()
}
initError?.let { throw it }
return job
}

private suspend inline fun dispatchIfNeeded() {
suspendCoroutineUninterceptedOrReturn sc@{ cont ->
val context = cont.context
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
if (!dispatcher.isDispatchNeeded(context)) return@sc Unit
// Coroutine was not in the right context -- we'll dispatch.
context.ensureActive()
cont.intercepted().resume(Unit)
COROUTINE_SUSPENDED
}
// Don't continue if coroutine was cancelled after returning from dispatch.
coroutineContext.ensureActive()
}

private fun CoroutineScope.createBindingJob(): CompletableJob =
Job(coroutineContext.job).also {
// Cancel `unbindJob` if `bindJob` has cancelled. This is important to abort `onStart` if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class RibCoroutineWorkerTest {
worker.doOnStart { error(onStartErrorMsg) }
worker.doOnStop { error(onStopErrorMsg) }
bind(worker).join()
runCurrent()
assertThat(throwable).isInstanceOf(IllegalStateException::class.java)
assertThat(throwable).hasMessageThat().isEqualTo(onStartErrorMsg)
val suppressed = throwable?.suppressed?.firstOrNull()
Expand Down
Loading