Skip to content

Commit

Permalink
Progress meter for reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
tkf committed Oct 16, 2019
1 parent 7e8a367 commit 8673ac0
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 0 deletions.
85 changes: 85 additions & 0 deletions src/progress.jl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,91 @@ end

Base.eltype(::ProgressLoggingFoldable{T}) where {T} = eltype(T)

struct LogProgressOnCombine{C} <: Transducer
chan::C
progress_interval::Float64
end

# Recrod started time:
start(rf::R_{LogProgressOnCombine}, result) =
wrap(rf, (time(), 0), start(inner(rf), result))

# Count processed number of items:
@inline next(rf::R_{LogProgressOnCombine}, result, input) =
wrapping(rf, result) do (t0, n), iresult
(t0, n + 1), next(inner(rf), iresult, input)
end

@inline complete(rf::R_{LogProgressOnCombine}, result) =
complete(inner(rf), unwrap(rf, result)[2])

# Send number of processed item to progress channel:
@inline function combine(rf::R_{LogProgressOnCombine}, a, b)
(ta, na), ira = unwrap(rf, a)
(tb, nb), irb = unwrap(rf, b)
irc = combine(inner(rf), ira, irb)

tc = time()
t0 = min(ta, tb)
nc = na + nb
if tc - t0 > xform(rf).progress_interval
put!(xform(rf).chan, nc)
nc = 0
else
tc = t0
end

return wrap(rf, (tc, nc), irc)
end

function _reduce_progress(reduce_impl, rf0, init, coll)
if rf0 isa R_{UseSIMD}
xf0 = xform(rf0)
rfinner = inner(rf0)
else
xf0 = IdentityTransducer()
rfinner = rf0
end

chan = Channel{Int}(0)
xf = xf0 |> LogProgressOnCombine(chan, coll.reducible.interval)
rf = Reduction(xf, rfinner)

reducible = @set coll.reducible = coll.reducible.foldable
progress_task = @async let n = length(coll.reducible.foldable)
__progress() do id
foreach(Scan(+), chan) do i
@debug "reduce" _id=id progress=i/n
end
end
end
try
result = reduce_impl(rf, init, reducible)
result isa Reduced && return result
# Manually unwrap LogProgressOnCombine's private state:
_, iresult = unwrap(rf, result)
return iresult
finally
close(chan)
wait(progress_task)
end
end

_reduce(ctx, rf, init, coll::SizedReducible{<:ProgressLoggingFoldable}) =
_reduce_progress(rf, init, coll) do rf, init, coll
_reduce(ctx, rf, init, coll)
end

if VERSION >= v"1.2"
_reduce_threads_for(rf, init, coll::SizedReducible{<:ProgressLoggingFoldable}) =
_reduce_progress(_reduce_threads_for, rf, init, coll)
else
# In earlier versions, Channel was not thread-safe (?)
# https://github.com/JuliaLang/julia/pull/30186
_reduce_threads_for(rf, init, coll::SizedReducible{<:ProgressLoggingFoldable}) =
_reduce_threads_for(rf, init, SizedReducible(coll.reducible.foldable))
end

struct RemoteFoldlWithLogging{C} <: Function
chan::C
progress_interval::Float64
Expand Down
14 changes: 14 additions & 0 deletions test/test_parallel_reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,18 @@ end
end
end

@testset "withprogress" begin
xf = Map() do x
sleep(0.01)
x
end
@test reduce(+, xf, withprogress(1:100; interval=0); basesize=1) == 5050

xf2 = ScanEmit(0) do u, x
y = u + x
y, y
end
@test reduce(right, xf2, withprogress(1:100; interval=0); basesize=1) == 5050
end

end # module

0 comments on commit 8673ac0

Please sign in to comment.