Skip to content

Commit

Permalink
Merge pull request #21 from ReactiveCocoa/anders/fix-unprocessed-event
Browse files Browse the repository at this point in the history
Fix a scenario which enqueued events are left unprocessed.
  • Loading branch information
sergdort authored Aug 12, 2020
2 parents 84d9d09 + 1c701e0 commit 4eb3c52
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 20 deletions.
44 changes: 26 additions & 18 deletions Loop/Floodgate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
)
}

reducerLock.perform { reentrant in
assert(reentrant == false)
drainEvents()
}
processEnqueuedEvents()
}

override func process(_ event: Event, for token: Token) {
enqueue(event, for: token)
processEnqueuedEvents()
}

private func processEnqueuedEvents() {
var continueToDrain = false

repeat {
Expand All @@ -76,7 +76,13 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
// to exhaustively drain the queue.
continueToDrain = reducerLock.tryPerform { isReentrant in
guard isReentrant == false else { return false }
drainEvents()

// Drain any recursively produced events.
while let next = dequeue() {
reducer(&state, next)
changeObserver.send(value: (state, next))
}

return true
}
} while queue.withValue({ $0.hasEvents }) && continueToDrain
Expand Down Expand Up @@ -107,7 +113,21 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
}

func withValue<Result>(_ action: (State, Bool) -> Result) -> Result {
reducerLock.perform { _ in action(state, hasStarted) }
let (result, isReentrant) = reducerLock.perform { isReentrant in
(action(state, hasStarted), isReentrant)
}

// If the lock acquisition above is not reentrant, we are the lock owner, and events could be enqueued in
// parallel during our brief ownership. So as the lock owner, we are obligated to defensively drain the event
// queue here, even if we did not enqueue any event.
//
// If the lock acquisition is reentrant, it means someone else in the call stack is the lock owner. So we can
// dodge this obligation.
if isReentrant == false {
processEnqueuedEvents()
}

return result
}

func dispose() {
Expand All @@ -132,16 +152,4 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
return $0.events.removeFirst().0
}
}

private func drainEvents() {
// Drain any recursively produced events.
while let next = dequeue() {
consume(next)
}
}

private func consume(_ event: Event) {
reducer(&state, event)
changeObserver.send(value: (state, event))
}
}
33 changes: 31 additions & 2 deletions LoopTests/FeedbackLoopSystemTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class FeedbackLoopSystemTests: XCTestCase {
.startWithValues { values.append($0) }

expect(values) == ["initial", "initial_a", "initial_a_a"]
expect(startCount) == 2
expect(startCount) == 3
}

func test_should_not_miss_delivery_to_reducer_when_started_asynchronously() {
Expand Down Expand Up @@ -334,7 +334,13 @@ class FeedbackLoopSystemTests: XCTestCase {
results.append(value)
}

expect(results) == [0, 2, 1002, 2004, 4006]
// 0
// 0 + 2 # from `then(.init(value: 2))`
// 2 + (2 + 1000) # from the 1st value yielded by `concat(...)`
// 1004 + (1004 + 1000) = 3008 # from the 2nd value yielded by `concat(...)`
// 3008 + (3008 + 1000) = 7016 # from the 3rd value yielded by `concat(...)`

expect(results) == [0, 2, 1004, 3008, 7016]
}

func test_should_not_deadlock_when_feedback_effect_starts_loop_producer_synchronously() {
Expand Down Expand Up @@ -373,6 +379,29 @@ class FeedbackLoopSystemTests: XCTestCase {
#endif
}

func test_should_process_events_enqueued_during_starting_loop_producer() {
let loop = Loop<Int, Int>(
initial: 0,
reducer: { $0 += $1 },
feedbacks: []
)

var latestCount: Int?
var hasSentEvent = false

loop.producer
.on(value: { _ in
// The value event here is delivered in the critical section protected by `Floodgate.withValue`.
if !hasSentEvent {
hasSentEvent = true
loop.send(1000)
}
})
.startWithValues { latestCount = $0 }

expect(latestCount) == 1000
}

func test_events_are_produced_in_correct_order() {
let (feedback, input) = Loop<Int, Int>.Feedback.input
var events: [Int] = []
Expand Down

0 comments on commit 4eb3c52

Please sign in to comment.