Skip to content

Commit

Permalink
Add optional name parameter to .limitedParallelism (#4106)
Browse files Browse the repository at this point in the history
Fixes #4023
  • Loading branch information
qwwdfsad authored Apr 29, 2024
1 parent a9bb5d3 commit dfbd4a8
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 32 deletions.
6 changes: 4 additions & 2 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public synthetic fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher;
public static synthetic fun limitedParallelism$default (Lkotlinx/coroutines/CoroutineDispatcher;ILjava/lang/String;ILjava/lang/Object;)Lkotlinx/coroutines/CoroutineDispatcher;
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher;
public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V
Expand Down Expand Up @@ -502,7 +504,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher {
public fun <init> ()V
public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher;
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher;
public fun toString ()Ljava/lang/String;
protected final fun toStringInternalImpl ()Ljava/lang/String;
}
Expand Down
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,15 @@ abstract class kotlinx.coroutines/CoroutineDispatcher : kotlin.coroutines/Abstra
open fun dispatchYield(kotlin.coroutines/CoroutineContext, kotlinx.coroutines/Runnable) // kotlinx.coroutines/CoroutineDispatcher.dispatchYield|dispatchYield(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.Runnable){}[0]
open fun isDispatchNeeded(kotlin.coroutines/CoroutineContext): kotlin/Boolean // kotlinx.coroutines/CoroutineDispatcher.isDispatchNeeded|isDispatchNeeded(kotlin.coroutines.CoroutineContext){}[0]
open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0]
open fun limitedParallelism(kotlin/Int, kotlin/String? =...): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0]
open fun toString(): kotlin/String // kotlinx.coroutines/CoroutineDispatcher.toString|toString(){}[0]
}
abstract class kotlinx.coroutines/MainCoroutineDispatcher : kotlinx.coroutines/CoroutineDispatcher { // kotlinx.coroutines/MainCoroutineDispatcher|null[0]
abstract val immediate // kotlinx.coroutines/MainCoroutineDispatcher.immediate|{}immediate[0]
abstract fun <get-immediate>(): kotlinx.coroutines/MainCoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.immediate.<get-immediate>|<get-immediate>(){}[0]
constructor <init>() // kotlinx.coroutines/MainCoroutineDispatcher.<init>|<init>(){}[0]
final fun toStringInternalImpl(): kotlin/String? // kotlinx.coroutines/MainCoroutineDispatcher.toStringInternalImpl|toStringInternalImpl(){}[0]
open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0]
open fun limitedParallelism(kotlin/Int, kotlin/String?): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0]
open fun toString(): kotlin/String // kotlinx.coroutines/MainCoroutineDispatcher.toString|toString(){}[0]
}
abstract fun interface <#A: in kotlin/Any?> kotlinx.coroutines.flow/FlowCollector { // kotlinx.coroutines.flow/FlowCollector|null[0]
Expand Down
21 changes: 15 additions & 6 deletions kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ public abstract class CoroutineDispatcher :
* // Background dispatcher for the application
* val dispatcher = newFixedThreadPoolContext(4, "App Background")
* // At most 2 threads will be processing images as it is really slow and CPU-intensive
* val imageProcessingDispatcher = dispatcher.limitedParallelism(2)
* val imageProcessingDispatcher = dispatcher.limitedParallelism(2, "Image processor")
* // At most 3 threads will be processing JSON to avoid image processing starvation
* val jsonProcessingDispatcher = dispatcher.limitedParallelism(3)
* val jsonProcessingDispatcher = dispatcher.limitedParallelism(3, "Json processor")
* // At most 1 thread will be doing IO
* val fileWriterDispatcher = dispatcher.limitedParallelism(1)
* val fileWriterDispatcher = dispatcher.limitedParallelism(1, "File writer")
* ```
* Note how in this example the application has an executor with 4 threads, but the total sum of all limits
* is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism,
Expand All @@ -105,7 +105,7 @@ public abstract class CoroutineDispatcher :
* is established between them:
*
* ```
* val confined = Dispatchers.Default.limitedParallelism(1)
* val confined = Dispatchers.Default.limitedParallelism(1, "incrementDispatcher")
* var counter = 0
*
* // Invoked from arbitrary coroutines
Expand Down Expand Up @@ -135,15 +135,24 @@ public abstract class CoroutineDispatcher :
* Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement.
* For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded.
*
* @param name optional name for the resulting dispatcher string representation if a new dispatcher was created.
* Implementations are free to ignore this parameter.
* @throws IllegalArgumentException if the given [parallelism] is non-positive
* @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views
*/
@ExperimentalCoroutinesApi
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
public open fun limitedParallelism(parallelism: Int, name: String? = null): CoroutineDispatcher {
parallelism.checkParallelism()
return LimitedDispatcher(this, parallelism)
return LimitedDispatcher(this, parallelism, name)
}

// Was experimental since 1.6.0, deprecated since 1.8.x
@Deprecated("Deprecated for good. Override 'limitedParallelism(parallelism: Int, name: String?)' instead",
level = DeprecationLevel.HIDDEN,
replaceWith = ReplaceWith("limitedParallelism(parallelism, null)")
)
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher = limitedParallelism(parallelism, null)

/**
* Requests execution of a runnable [block].
* The dispatcher guarantees that [block] will eventually execute, typically by dispatching it to a thread pool,
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ internal abstract class EventLoop : CoroutineDispatcher() {
}
}

final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
final override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
return this
return namedOrThis(name) // Single-threaded, short-circuit
}

open fun shutdown() {}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
*/
override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress"

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
// MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it
return this
return namedOrThis(name)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Unconfined.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import kotlin.jvm.*
internal object Unconfined : CoroutineDispatcher() {

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined")
}

Expand Down
16 changes: 11 additions & 5 deletions kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import kotlin.coroutines.*
*/
internal class LimitedDispatcher(
private val dispatcher: CoroutineDispatcher,
private val parallelism: Int
private val parallelism: Int,
private val name: String?
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {

// Atomic is necessary here for the sake of K/N memory ordering,
Expand All @@ -34,10 +35,10 @@ internal class LimitedDispatcher(
private val workerAllocationLock = SynchronizedObject()

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= this.parallelism) return this
return super.limitedParallelism(parallelism)
if (parallelism >= this.parallelism) return namedOrThis(name)
return super.limitedParallelism(parallelism, name)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down Expand Up @@ -95,7 +96,7 @@ internal class LimitedDispatcher(
}
}

override fun toString() = "$dispatcher.limitedParallelism($parallelism)"
override fun toString() = name ?: "$dispatcher.limitedParallelism($parallelism)"

/**
* A worker that polls the queue and runs tasks until there are no more of them.
Expand Down Expand Up @@ -128,3 +129,8 @@ internal class LimitedDispatcher(
}

internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" }

internal fun CoroutineDispatcher.namedOrThis(name: String?): CoroutineDispatcher {
if (name != null) return NamedDispatcher(this, name)
return this
}
25 changes: 25 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/NamedDispatcher.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package kotlinx.coroutines.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.DefaultDelay
import kotlin.coroutines.*

/**
* Wrapping dispatcher that has a nice user-supplied `toString()` representation
*/
internal class NamedDispatcher(
private val dispatcher: CoroutineDispatcher,
private val name: String
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {

override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)

override fun dispatch(context: CoroutineContext, block: Runnable) = dispatcher.dispatch(context, block)

@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatcher.dispatchYield(context, block)

override fun toString(): String {
return name
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay

abstract fun scheduleQueueProcessing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
return this
return namedOrThis(name)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private class MissingMainCoroutineDispatcher(
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
missing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher =
missing()

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
Expand Down
20 changes: 12 additions & 8 deletions kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ internal object DefaultScheduler : SchedulerCoroutineDispatcher(
) {

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= CORE_POOL_SIZE) return this
return super.limitedParallelism(parallelism)
if (parallelism >= CORE_POOL_SIZE) {
return namedOrThis(name)
}
return super.limitedParallelism(parallelism, name)
}

// Shuts down the dispatcher, used only by Dispatchers.shutdown()
Expand Down Expand Up @@ -44,10 +46,12 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() {
}

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= MAX_POOL_SIZE) return this
return super.limitedParallelism(parallelism)
if (parallelism >= MAX_POOL_SIZE) {
return namedOrThis(name)
}
return super.limitedParallelism(parallelism, name)
}

// This name only leaks to user code as part of .limitedParallelism machinery
Expand All @@ -72,9 +76,9 @@ internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command)

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
// See documentation to Dispatchers.IO for the rationale
return UnlimitedIoScheduler.limitedParallelism(parallelism)
return UnlimitedIoScheduler.limitedParallelism(parallelism, name)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down
24 changes: 24 additions & 0 deletions kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
@file:OptIn(ExperimentalStdlibApi::class)

package kotlinx.coroutines

import kotlin.test.*
Expand All @@ -18,5 +20,27 @@ class DispatchersToStringTest {
assertEquals("Dispatchers.Default.limitedParallelism(2)", Dispatchers.Default.limitedParallelism(2).toString())
// Not overridden at all, limited parallelism returns `this`
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())

assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString())
assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString())
assertEquals("\uD80C\uDE11", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "\uD80C\uDE11").toString())
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())

val limitedNamed = Dispatchers.IO.limitedParallelism(10, "limited")
assertEquals("limited.limitedParallelism(2)", limitedNamed.limitedParallelism(2).toString())
assertEquals("2", limitedNamed.limitedParallelism(2, "2").toString())
// We asked for too many threads with no name, this was returned
assertEquals("limited", limitedNamed.limitedParallelism(12).toString())
assertEquals("12", limitedNamed.limitedParallelism(12, "12").toString())

runBlocking {
val d = coroutineContext[CoroutineDispatcher]!!
assertContains(d.toString(), "BlockingEventLoop")
val limited = d.limitedParallelism(2)
assertContains(limited.toString(), "BlockingEventLoop")
assertFalse(limited.toString().contains("limitedParallelism"))
val named = d.limitedParallelism(2, "Named")
assertEquals("Named", named.toString())
}
}
}
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/native/src/Dispatchers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ internal object DefaultIoScheduler : CoroutineDispatcher() {
private val io = unlimitedPool.limitedParallelism(64) // Default JVM size

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
// See documentation to Dispatchers.IO for the rationale
return unlimitedPool.limitedParallelism(parallelism)
return unlimitedPool.limitedParallelism(parallelism, name)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down
1 change: 1 addition & 0 deletions test-utils/common/src/TestBase.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ public class TestRuntimeException(message: String? = null, private val data: Any
public class RecoverableTestException(message: String? = null) : RuntimeException(message)
public class RecoverableTestCancellationException(message: String? = null) : CancellationException(message)

// Erases identity and equality checks for tests
public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
return object : CoroutineDispatcher() {
Expand Down

0 comments on commit dfbd4a8

Please sign in to comment.