Skip to content

Commit cdd9a84

Browse files
committed
Add Fiber.timeout and #resolve_timeout?
Also `Fiber::TimeoutToken` and `Fiber::TimeoutResult`.
1 parent 142d42a commit cdd9a84

File tree

1 file changed

+118
-0
lines changed

1 file changed

+118
-0
lines changed

src/fiber.cr

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,124 @@ class Fiber
300300
Fiber.current.cancel_timeout
301301
end
302302

303+
struct TimeoutToken
304+
# :nodoc:
305+
getter value : UInt32
306+
307+
def initialize(@value : UInt32)
308+
end
309+
end
310+
311+
enum TimeoutResult
312+
EXPIRED
313+
CANCELED
314+
end
315+
316+
private TIMEOUT_FLAG = 1_u32
317+
private TIMEOUT_COUNTER = 2_u32
318+
319+
@timeout = Atomic(UInt32).new(0_u32)
320+
321+
# Suspends the current `Fiber` for *duration*.
322+
#
323+
# Yields a `TimeoutToken` before suspending the fiber. The token is required
324+
# to manually cancel the timeout before *duration* expires. See
325+
# `#resolve_timeout?` for details.
326+
#
327+
# The fiber will be automatically resumed after *duration* has elapsed, but it
328+
# may be resumed earlier if the timeout has been manually canceled, yet the
329+
# fiber will only ever be resumed once. The returned `TimeoutResult` can be
330+
# used to determine what happened and act accordingly, for example do some
331+
# cleanup or raise an exception if the timeout expired.
332+
#
333+
# ```
334+
# result = Fiber.timeout(5.seconds) do |cancelation_token|
335+
# enqueue_waiter(Fiber.current, cancelation_token)
336+
# end
337+
#
338+
# if result.expired?
339+
# dequeue_waiter(Fiber.current)
340+
# end
341+
# ```
342+
#
343+
# Consider `::sleep` if you don't need to cancel the timeout.
344+
def self.timeout(duration : Time::Span, & : TimeoutToken ->) : TimeoutResult
345+
token = Fiber.current.create_timeout
346+
yield token
347+
Crystal::EventLoop.current.timeout(Time.monotonic + duration, token)
348+
end
349+
350+
# Identical to `.timeout` but suspending the fiber until an absolute time, as
351+
# per the monotonic clock, is reached.
352+
#
353+
# For example, we can retry something until 5 seconds have elapsed:
354+
#
355+
# ```
356+
# time = Time.monotonic + 5.seconds
357+
# loop do
358+
# break if try_something?
359+
# result = Fiber.timeout(until: time) { |token| add_waiter(token) }
360+
# raise "timeout" if result.expired?
361+
# end
362+
# ```
363+
def self.timeout(*, until time : Time::Span, & : TimeoutToken ->) : TimeoutResult
364+
token = Fiber.current.create_timeout
365+
yield token
366+
Crystal::EventLoop.current.timeout(time, token)
367+
end
368+
369+
# Sets the timeout flag and increments the counter to avoid ABA issues with
370+
# parallel threads trying to resolve the timeout while the timeout was unset
371+
# then set again (new timeout). Since the current fiber is the only one that
372+
# can set the timeout, we can merely set the atomic (no need for CAS).
373+
protected def create_timeout : TimeoutToken
374+
value = (@timeout.get(:relaxed) | TIMEOUT_FLAG) &+ TIMEOUT_COUNTER
375+
@timeout.set(value, :relaxed)
376+
TimeoutToken.new(value)
377+
end
378+
379+
# Tries to resolve the timeout previously set on `Fiber` using the cancelation
380+
# *token*. See `Fiber.timeout` for details on setting the timeout.
381+
#
382+
# Returns true when the timeout has been resolved, false otherwise.
383+
#
384+
# The caller that succeeded to resolve the timeout owns the fiber and must
385+
# eventually enqueue it. Failing to do so means that the fiber will never be
386+
# resumed.
387+
#
388+
# A caller that failed to resolve the timeout must skip the fiber. Trying to
389+
# enqueue the fiber would lead the fiber to be resumed twice!
390+
#
391+
# ```
392+
# require "wait_group"
393+
#
394+
# WaitGroup.wait do |wg|
395+
# cancelation_token = nil
396+
#
397+
# suspended_fiber = wg.spawn do
398+
# result = Fiber.timeout(5.seconds) do |token|
399+
# # save the token so another fiber can try to cancel the timeout
400+
# cancelation_token = token
401+
# end
402+
#
403+
# # prints either EXPIRED or CANCELED
404+
# puts result
405+
# end
406+
#
407+
# sleep rand(4..6).seconds
408+
#
409+
# # let's try to cancel the timeout
410+
# if suspended_fiber.resolve_timeout?(cancelation_token)
411+
# # canceled: we must enqueue the fiber
412+
# suspended_fiber.enqueue
413+
# end
414+
# end
415+
# ```
416+
def resolve_timeout?(token : TimeoutToken) : Bool
417+
_, success = @timeout.compare_and_set(token.value, token.value & ~TIMEOUT_FLAG, :relaxed, :relaxed)
418+
success
419+
end
420+
303421
# Yields to the scheduler and allows it to swap execution to other
304422
# waiting fibers.
305423
#

0 commit comments

Comments
 (0)