#| Scheduler that cues tasks in sequence using a Channel.
#| Cued code is not executed when it is scheduled: it is sent to a Channel
#| shared by every instance and run, one task at a time, by the END phaser
#| declared at the bottom of this class.
class MessageQueueScheduler does Scheduler {
    # Single channel instance used as the queue across all Schedulers.
    my Channel $queue;

    # Alternate Scheduler that the :at and :in timer Promises are deferred to.
    has Scheduler $!defer;

    #| Create the MessageQueueScheduler.
    #| :$defer is the Scheduler that drives :at / :in timers; it defaults to a
    #| ThreadPoolScheduler limited to one thread. The shared queue is created
    #| by whichever instance is built first.
    method BUILD(Scheduler :$!defer = ThreadPoolScheduler.new(:max_threads(1))) {
        $queue //= Channel.new;
    }

    #| Schedule a task.
    #|   &code   - the code to queue.
    #|   :$at    - Instant before which the code must not be queued.
    #|   :$in    - delay in seconds before the code is queued.
    #|   :&catch - called with the exception if &code dies.
    #|   :$every, :$times - accepted for interface compatibility, but ignored.
    #| Always returns Nil in place of a Cancellation, so a cued task cannot be
    #| cancelled.
    method cue(&code, Instant :$at, :$in, :$every, :$times = 1, :&catch --> Cancellation) {
        # Honour the caller's exception handler. Without this, an exception
        # thrown by the task would propagate into the react block of the END
        # phaser and abort processing of the remaining queue.
        my &task = &catch.defined
            ?? -> { code(); CATCH { default { catch($_) } } }
            !! &code;

        # We defer to the alternate Scheduler for the :at and :in Promises.
        my Promise @promises;
        @promises.push: Promise.at($at, :scheduler($!defer)) if $at;
        @promises.push: Promise.in($in, :scheduler($!defer)) if $in;

        if @promises {
            # Queue the task only once every timer Promise has been kept.
            Promise.allof(@promises).then({ $queue.send: &task });
        }
        else {
            $queue.send: &task;
        }

        # Explicit Nil so the result of the statements above is never returned.
        Nil
    }

    # No load figure is tracked, so this returns Nil.
    # (Original author's note: when does this even get called?)
    method loads { Nil }

    #| Close the queue – helpful at end of processing. Once the queue is closed
    #| and drained, the END phaser finishes without waiting for its timeout.
    method close {
        $queue.close;
    }

    # During the END phaser, process everything in the queue.
    END {
        # $queue is still an undefined type object if no scheduler was built.
        if $queue.defined {
            react {
                whenever $queue -> $code {
                    $code.();
                    # The channel was closed and is now empty: stop right away
                    # instead of sitting out the rest of the timeout.
                    LAST { done }
                }
                whenever Promise.in(2) {
                    # Don't run for more than 2 seconds.
                    $queue.close;
                    done;
                }
            }
        }
    }
}
# Install a MessageQueueScheduler as the scheduler for the remainder of the
# script, so that the asynchronous work below goes through its queue.
my $*SCHEDULER = MessageQueueScheduler.new;

# Everything scheduled from here on is queued and only executes in the END
# phase, after the mainline has finished.
start { say ‘Second’ };

Promise.in(0.2).then(-> $ {
    say ‘Third’;
    Promise.in(0.5).then(-> $ {
        Promise.in(0.1).then(-> $ { say ‘Fifth’ });
        say ‘Fourth’;
    });
});

# The mainline itself keeps running synchronously.
sleep 0.2;
say ‘First’;