Module sched
Sched is a Lua collaborative scheduler: it allows several Lua tasks to run in parallel, and to communicate together when they need to interact.
It offers a convenient way to write programs which address multiple I/O driven issues simultaneously, with much less hassle than with preemptive multithreading frameworks; it also doesn't require developers to adopt unusual programming styles, as expected by Erlang, map-reduce variants, or callback-driven frameworks such as node.js. Among other appropriate usages, it makes it easy to write and deploy the applications typically powering machine-to-machine infrastructures.
General principles
Vocabulary
processes: concurrent fragments of programs, each having exclusive access to its own memory space.
threads: concurrent fragments of programs, sharing the same memory space, and which therefore need to synchronize their memory operations.
tasks: concurrent fragments of programs of any sort. Processes and threads are particular kinds of tasks.
Collaborative multi-tasking
"Collaborative" implies that the currently running task only changes when it calls a blocking function, i.e. a function which has to wait for an external event. For instance, an attempt to read on an empty socket will block until more data arrives from the network. If the current task tries to read from an empty socket, it will be unscheduled, and will let other tasks run, until more network data arrive. Most collaborative multi-tasking systems, including sched, cannot leverage multi-core architectures.
Collaborative multi-tasking has one big advantage: it considerably reduces concurrency issues. Since there are few places where the running task can change, there are far fewer occasions for race conditions, deadlocks, and other concurrency-specific problems. Programmers used to developing preemptively multithreaded applications will be delighted to see how uneventful debugging collaborative concurrent systems is.
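To make the switching rule concrete, here is a minimal sketch of a round-robin scheduler written with plain Lua coroutines. It is only a model of the idea, not sched's actual implementation; the names spawn, run_all and worker are made up for this illustration.

```lua
-- Toy round-robin scheduler built on plain Lua coroutines.
-- It only models collaborative switching; it is not sched's code.
local ready = {}   -- FIFO queue of runnable coroutines
local log = {}     -- records the order in which task steps execute

-- Queue a function as a new task (hypothetical helper, not sched.run).
local function spawn(f)
  ready[#ready + 1] = coroutine.create(f)
end

-- Run tasks until none is left; a task only loses the CPU when it yields.
local function run_all()
  while #ready > 0 do
    local co = table.remove(ready, 1)
    local ok, err = coroutine.resume(co)
    if not ok then error(err) end
    if coroutine.status(co) ~= 'dead' then
      ready[#ready + 1] = co   -- still alive: back to the end of the queue
    end
  end
end

-- Build a task body which logs `steps` entries, yielding after each one.
local function worker(name, steps)
  return function()
    for i = 1, steps do
      log[#log + 1] = name .. i
      coroutine.yield()        -- the only point where another task can take over
    end
  end
end

spawn(worker('A', 2))
spawn(worker('B', 2))
run_all()
print(table.concat(log, ' '))  -- A1 B1 A2 B2
```

Because a task can only be interrupted at its explicit yield, everything between two yields runs atomically with respect to the other tasks, which is where the reduction in race conditions comes from.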
To quote the authors of Lua, "we did not (and still do not) believe in the standard multithreading model, which is preemptive concurrency with shared memory: we still think that no one can write correct programs in a language where a=a+1 is not deterministic" (pdf). Hence, Lua coroutines share their memory, but give up non-deterministic preemptive scheduling.
Alternatives
Other languages make the complementary choice: for instance, Erlang retains preemptive concurrency, but forbids shared memory; although such languages make it possible to reach unmatched levels of reliability (pdf), they deeply change the way programmers have to model their programs. They are therefore less suitable for a generalist audience.
It should be noted that Unix's original design philosophy (many small and short-lived specialized processes, with separated memories, which communicate through file descriptors and IPC) also restricts memory sharing in order to keep preemptive scheduling maintainable.
Finally, some process oriented (separate memory and message-passing) multi-tasking systems are available in Lua, most notably Luaproc (pdf). Although such systems make more sense for computation-intensive jobs on multi-core and distributed architectures, it is possible to make them cohabit with sched's multitasking (having several Luaproc processes, each running several sched threads).
Collaborative limitation
Collaborative multi-tasking comes with a limitation, compared to the preemptive variant. Greedy tasks, which never pause nor perform any blocking API call, might monopolize the CPU indefinitely. Although this rarely happens unless on purpose, it means that collaborative schedulers are unsuitable for real-time systems. If such real-time needs occur in a sched-based application, they must be addressed in C in a separate process, and the underlying OS must offer suitable real-time performance.
Another perceived issue is that a rogue task can crash all other tasks in the scheduler, but that's true of any pool of tasks sharing their memory, including systems like pthreads.
Sched principles
Communication between tasks
Sched offers a fundamental communication mechanism, called the signal, over which other mechanisms (mutexes, fifos, synchronized program sections etc.) can be built. You will often find that signals are the simplest and most suitable way to coordinate tasks together, although the classic POSIX-like IPC systems are always available when needed or preferred.
A signal is composed of:
- an arbitrary emitter object;
- an event: a string describing what noteworthy event happened to the emitter;
- optional arguments, which complete the description of the event's circumstances.
So every object in Lua can emit signals, and emitted signals can trigger reactions:
- a signal can wake up a task which was waiting for it;
- a signal can trigger the execution of a hook, i.e. a function which is run every time a registered signal is detected.
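The emitter / event / arguments structure can be sketched with a small registry in plain Lua. This is a simplified model which only covers hooks; the helpers on and emit are hypothetical names, and sched's real data structures may be organized differently.

```lua
-- Toy signal registry: hooks are indexed by emitter, then by event name.
-- Hypothetical helpers for illustration; sched's real internals may differ.
local hooks = {}

-- Register function f for `event` on `emitter` ('*' means every event).
local function on(emitter, event, f)
  hooks[emitter] = hooks[emitter] or {}
  local cell = hooks[emitter][event] or {}
  hooks[emitter][event] = cell
  cell[#cell + 1] = f
end

-- Deliver a signal: run hooks registered for this exact event, then '*' hooks.
-- Returns the number of hooks which have been run.
local function emit(emitter, event, ...)
  local by_event = hooks[emitter]
  if not by_event then return 0 end
  local count = 0
  for _, key in ipairs{ event, '*' } do
    for _, f in ipairs(by_event[key] or {}) do
      f(event, ...)
      count = count + 1
    end
  end
  return count
end

local modem = {}     -- any Lua value can serve as an emitter
local received = {}
on(modem, 'sms', function(event, from, text)
  received[#received + 1] = event .. ' from ' .. from .. ': ' .. text
end)
on(modem, '*', function(event)
  received[#received + 1] = 'saw ' .. event
end)

emit(modem, 'sms', 'alice', 'hello')   -- runs both hooks
emit(modem, 'ring')                    -- runs only the '*' hook
emit({}, 'sms', 'nobody', 'listens')   -- unknown emitter: nothing runs
```

Indexing by emitter first keeps unrelated objects from seeing each other's events, even when they use the same event names.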
Objects are encouraged to emit signals every time an event of interest happens to them, so that other tasks can react to these events. For instance, the modem module emits a signal every time an SMS arrives; the NTP time synchronizer advertises clock changes through signals; IP interfaces signal important events such as going up or going down; every task emits a 'die' signal when it exits, allowing other tasks to monitor their termination; etc. Many complex systems, such as the telnet shell, the Agent initialization process, or TCP data handling, are internally synchronized through signals.
Blocking tasks on signals with sched.wait
A wait or a hook can be registered to many signals simultaneously, and conversely, a single signal can trigger many hooks and task wake-ups.
Most waits and hooks wait for signals from only one emitter. Consider the following example:
local event, status, result = sched.wait(some_task, 'die')
It will block the current task until the task some_task terminates. sched.wait returns the triggering event (here always 'die'), and extra signal arguments. In the case of task 'die' events, those are the status (whether the task exited successfully or because of an error), and any result returned by the function (or an error message if status is false).
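As an illustration of how these extra arguments might be interpreted, here is a small plain-Lua sketch. The helper describe_exit is hypothetical and not part of sched; it assumes the three status values described in the task life cycle part of this document (true, false and "killed").

```lua
-- Turn the extra arguments of a 'die' signal into a readable message.
-- `describe_exit` is a hypothetical helper, not part of sched.
local function describe_exit(status, ...)
  if status == true then
    -- Successful termination: the remaining arguments are the returned values.
    local parts = {}
    for i = 1, select('#', ...) do
      parts[i] = tostring((select(i, ...)))
    end
    return 'finished, returned: ' .. table.concat(parts, ', ')
  elseif status == 'killed' then
    return 'cancelled by sched.kill'
  else
    -- status is false: the first extra argument is the error message.
    return 'failed with error: ' .. tostring((...))
  end
end

print(describe_exit(true, "I'm done plopping."))
print(describe_exit('killed'))
print(describe_exit(false, 'Aaargh!'))
```

With sched loaded, such a helper could be fed directly from the values returned by sched.wait(some_task, 'die'), after dropping the leading event name.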
A task can wait for several events from a single emitter. For instance, the following statement will wait until the network manager either succeeds or fails at mounting an IP interface:
sched.wait('NETMAN', {'MOUNTED', 'MOUNT_FAILED'})
If a number is added in the events list, it's a timeout in seconds: if none of the subscribed events occur before the timeout elapses, then sched.wait returns with the event 'timeout':
local event = sched.wait('NETMAN', {'MOUNTED', 'MOUNT_FAILED', 30})
if event == 'MOUNTED' then print "Success!"
elseif event == 'MOUNT_FAILED' then print "Failure!"
elseif event == 'timeout' then print "Took more than 30s to mount!"
else assert(false, "This cannot happen") end
One can also subscribe to every event sent by a given emitter, by subscribing to the special '*' string:
local event = sched.wait('NETMAN', '*')
if event == 'MOUNTED' then print "Success!"
elseif event == 'MOUNT_FAILED' then print "Failure!"
else print("Ignore event "..event) end
The special '*' event can be combined with a timeout, as in sched.wait(X, {'*', 30}).
A task might need to wait on signals sent by several potential emitters. This need is addressed by the sched.multiWait API, which works as sched.wait except that it takes a list of emitters rather than a single one.
Finally, a task can reschedule itself without blocking. It gives other tasks an opportunity to run; once every other task has had a chance to run, the rescheduled task can restart. Task rescheduling is performed by calling sched.wait without arguments.
for i=1, BIG_NUMBER do
    perform_long_computation_chunk(i)
    sched.wait()
end
Attaching hooks to signals
Hooks can be registered the same way tasks can be blocked: they are attached to a signal emitter, and to one, several or all ('*') events. A hook has a function, which is executed when one of the registered signals is emitted. The variants of hook attachment are:
- sched.sigHook(emitter, events, func, hook_args...): the function func(event, hook_args..., signal_args...) will be run every time sched.signal(emitter, event, signal_args...) is triggered. It is run synchronously, i.e. before sched.signal() returns, so it can't contain any blocking API call. It will keep being triggered every time one of the registered signals occurs, until the reference returned by sched.sigHook is passed to sched.kill.
- sched.sigOnce(emitter, events, func, hook_args...): behaves as sched.sigHook, except that it's only run once. The hook function is also forbidden from performing blocking calls.
- sched.sigRun(emitter, events, func, hook_args...): works as sched.sigHook, except that the function is run as a scheduled task. As a result, there is no guarantee that it will be executed as soon as the signal has been sent (there might be a delay), but it is allowed to call blocking APIs. For most usages, this form is to be preferred as simpler.
- sched.sigRunOnce(emitter, events, func, hook_args...): behaves as sched.sigRun, except that it's only run once. The hook function is also allowed to perform blocking calls.
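The difference between persistent and one-shot hooks can be modelled in a few lines of plain Lua. This is a hedged sketch for a single signal; hook and fire are hypothetical names, and the `once` flag simply mirrors the idea that one-shot hooks are not reattached after being triggered.

```lua
-- Toy model of persistent versus one-shot hooks on a single signal.
-- `hook` and `fire` are hypothetical helpers, not sched functions.
local cells = {}

-- Register f; when `once` is true the hook detaches after its first run.
local function hook(f, once)
  local cell = { f = f, once = once }
  cells[#cells + 1] = cell
  return cell
end

-- Trigger the signal: run every hook, then keep only the persistent ones.
local function fire(...)
  local kept = {}
  for _, cell in ipairs(cells) do
    cell.f(...)
    if not cell.once then kept[#kept + 1] = cell end
  end
  cells = kept
end

local calls = { always = 0, once = 0 }
hook(function() calls.always = calls.always + 1 end, false)
hook(function() calls.once = calls.once + 1 end, true)

fire(); fire(); fire()
print(calls.always .. ' ' .. calls.once)  -- 3 1
```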
There is a guarantee that hooks won't miss a signal. A blocked task, on the other hand, might lose some signals if they are not "consumed" fast enough.
For instance, if a task A produces a signal every 2 seconds, and a task B waits for them but takes 3 seconds to process each of them, some of them will be lost: they will occur while B is still processing the previous one and isn't waiting for any signal.
Therefore, if it is important not to lose any occurrence of a signal, a waiting task is not an adequate solution: either handle them in a hook, or pass them through a pipe.
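The 2 seconds / 3 seconds scenario can be checked with a small discrete-time simulation in plain Lua. This is only a model under simplifying assumptions (integer time steps, six signals emitted at t = 2, 4, ..., 12); simulate is a hypothetical function and no sched API is involved.

```lua
-- Discrete-time model: a producer emits at t = 2, 4, ..., 12 and a consumer
-- needs 3 time units per signal. Hypothetical simulation, not sched code.
local function simulate(use_queue)
  local handled, queue = {}, {}
  local busy_until = 0            -- the consumer is processing until this time
  for t = 1, 30 do
    local emitted = (t % 2 == 0 and t <= 12) and t or nil
    if use_queue then
      -- Hook style: every signal is recorded at emission time...
      if emitted then queue[#queue + 1] = emitted end
      -- ...and the consumer picks the oldest one whenever it is free.
      if t >= busy_until and #queue > 0 then
        handled[#handled + 1] = table.remove(queue, 1)
        busy_until = t + 3
      end
    elseif emitted and t >= busy_until then
      -- Wait style: the signal only counts if the consumer is waiting now.
      handled[#handled + 1] = emitted
      busy_until = t + 3
    end
  end
  return handled
end

print(table.concat(simulate(false), ' '))  -- 2 6 10
print(table.concat(simulate(true), ' '))   -- 2 4 6 8 10 12
```

In the wait-style run, half of the signals fall while the consumer is busy and are never seen; in the queued run, every signal is eventually processed, at the cost of a growing backlog.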
Example
sched.sigRun('FOO', 'BAR', function(event, arg)
    print(">>> sigRun FOO.BAR received event " .. event .. ", arg " .. arg)
end)
sched.sigRunOnce('FOO', 'BAR', function(event, arg)
    print(">>> sigRunOnce FOO.BAR received event " .. event .. ", arg " .. arg)
end)
sched.sigRun('FOO', '*', function(event, arg)
    print(">>> sigRun FOO.* received event " .. event .. ", arg " .. arg)
end)
sched.sigRunOnce('FOO', '*', function(event, arg)
    print(">>> sigRunOnce FOO.* received event " .. event .. ", arg " .. arg)
end)
sched.signal('FOO', 'GNAT', 1)
>>> sigRun FOO.* received event GNAT, arg 1
>>> sigRunOnce FOO.* received event GNAT, arg 1
sched.signal('FOO', 'BAR', 2) -- sigRunOnce FOO.* now detached
>>> sigRun FOO.BAR received event BAR, arg 2
>>> sigRunOnce FOO.BAR received event BAR, arg 2
>>> sigRun FOO.* received event BAR, arg 2
sched.signal('FOO', 'BAR', 3) -- sigRunOnce FOO.BAR now detached
>>> sigRun FOO.BAR received event BAR, arg 3
>>> sigRun FOO.* received event BAR, arg 3
sched.signal('GNAT', 'BAR', 2) -- wrong emitter, no hook triggered
Tasks life cycle
Tasks are created by sched.run: it takes a function, and optionally arguments to this function, and runs it as a separate, parallel task. It also returns the created task, as a regular coroutine. The main use for this is to either cancel it with sched.kill, or wait for its termination with sched.wait(task, 'die').
Tasks are sorted in an internal table:
- blocked tasks are indexed, in __tasks.waiting, by the emitters and events which might unblock them;
- tasks which are ready to run are stacked in a __tasks.ready list.
A task created with sched.run doesn't start immediately; it is merely stacked in __tasks.ready. When the current task dies or blocks, the next one in __tasks.ready takes over; it can be the last created one, or another one which went ready before it.
The termination of a task is advertised by a 'die' signal. By waiting for this signal, one can synchronize on a task's completion. The 'die' signal has additional arguments: a status (true if the task returned successfully, false if it has been aborted by an error, "killed" if the task has been cancelled by sched.kill), followed by either the returned value(s) or the error message. Here are some examples:
-- Common case, the task terminates successfully:
task = sched.run(function()
    for i=1,3 do
        print ">>> plop"
        sched.wait(2)
    end
    return "I'm done plopping."
end)
sched.sigHook(task, 'die', function(event, ...)
    print(">>> He's dead, Jim. His last words were: "..sprint{...})
end)
>>> plop
>>> plop
>>> plop
>>> He's dead, Jim. His last words were: { true, "I'm done plopping." }
-- If the task is interrupted with sched.kill(task):
>>> plop
sched.kill(task)
>>> He's dead, Jim. His last words were: { "killed" }
-- If task dies with an error:
task = sched.run(function()
    for i=1,3 do
        print ">>> plop"
        sched.wait(2)
    end
    error "Aaargh!"
end)
sched.sigHook(task, 'die', function(event, ...)
    print(">>> He's dead, Jim. His last words were: "..sprint{...})
end)
>>> plop
>>> plop
>>> plop
>>> He's dead, Jim. His last words were: { false, "Aaargh!" }
Finally, there is the issue of launching the scheduler at the top-level. This issue is OS-dependent, but on POSIX-like systems, this is done by calling sched.loop: this function will run every task in a loop, and perform timer management in order to avoid busy waits. It never returns, unless you call os.exit. Therefore, before starting the loop, it is important to have prepared some tasks to launch, with sched.run.
As an example, here is a complete program which starts a telnet server and writes the time in parallel, in a file, every 10 seconds.
Notice that shell.telnet.init creates a listening socket, which will launch a new telnet client for each connection request, and each of these clients will run concurrently with other tasks.
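require 'sched'
require 'shell.telnet'
-- Task 1: start the telnet server
sched.run(function()
    shell.telnet.init{
        address     = '0.0.0.0',
        port        = 2000,
        editmode    = "edit",
        historysize = 100 }
end)
-- Task 2: write the current time to a file every 10 seconds
sched.run(function()
    -- fail loudly if the file cannot be opened
    local f = assert(io.open('/tmp/time.txt', 'w'))
    while true do
        f:write(os.date(), '\n')
        f:flush()
        sched.wait(10)
    end
end)
-- Start the scheduler; this call does not return
sched.loop()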
Type sched
sched.gc() | Does a full garbage collection and removes dead tasks from waiting lists.
sched.kill(x) | Kills a task.
sched.killSelf() | Kills the current task.
sched.multiWait(emitters, events) | Waits on several emitters.
sched.run(f, ...) | Runs a function as a new thread.
sched.sigHook(emitter, events, f, varargs) | Hooks a callback function to a set of signals.
sched.sigOnce(emitter, events, f, varargs) | Hooks a callback function to a set of signals, to be triggered only once.
sched.sigRun(emitter, events, f, varargs) | Hooks a callback function to a set of signals; the callback runs in a separate thread.
sched.sigRunOnce(emitters, events, f, varargs) | Hooks a callback function to a set of signals; the callback runs in a new thread, and only once.
sched.signal(emitter, event, ...) | Sends a signal from emitter with message event, plus optional extra args.
sched.step() | Runs all the tasks which are ready to run, until every task is blocked, waiting for an event (i.e. until __tasks.ready is empty).
sched.wait(emitter, varargs) | Forces the currently running task out of scheduling until a certain signal is received.
Type sched
Field(s)
- sched.gc()
-
Does a full garbage collection and removes dead tasks from waiting lists.
Dead tasks are removed when the expected event happens or when the expected event emitter dies. If that never occurs, and you still want to reclaim the memory associated with these dead tasks, you can always call this function and it will remove them.
There's usually no need to call this function explicitly: it will be triggered automatically when it's needed and the scheduler is about to go idle.
Return value
memory available (in number of bytes) after gc.
- sched.kill(x)
-
Kills a task.
Implementation principle: the task is killed by either
- making it send a KILL_TOKEN error if it is currently running;
- waking it up from a sched.wait yielding with KILL_TOKEN as an argument, which in turn makes sched.wait send a KILL_TOKEN error.
Parameter
- x: task to kill, as returned by sched.sigHook for example.
Return values
- nil if it killed another task,
- never returns if it killed the calling task.
- sched.killSelf()
-
Kills the current task.
Return value
never returns, since the task is killed.
- sched.multiWait(emitters, events)
-
Waits on several emitters.
Same as sched.wait, except that:
- the first argument is a list of emitters rather than a single emitter;
- it returns emitter, event, args... instead of just event, args...; indeed, the caller might want to know which emitter rescheduled it;
- it's illegal not to enclose events in a list.
Parameters
- emitters: table containing a list of the emitters to wait on
- events: table containing a list of the events to wait on, or a string describing an event's kind, or a number defining a timeout for this call.
Return value
emitter, event, args that caused this call to end.
- sched.run(f, ...)
-
Runs a function as a new thread.
sched.run(f[, args...]) schedules f(args...) as a new task, which will run in parallel with the current one, after the current one yields.
Parameters
- f: function or task to run
- ...: optional arguments to pass to param f
Return value
new thread created to run the function
- sched.sigHook(emitter, events, f, varargs)
-
Hooks a callback function to a set of signals.
Signals are described as for sched.wait. See this function for more details.
The callback is called synchronously as soon as the corresponding signal is emitted, and every time it is emitted. That is, the function f will have returned before the call to sched.signal which triggered it returns.
This puts a constraint on f: it must not block nor try to reschedule itself. If a hook function calls a blocking API, it will trigger an error.
The hook will receive as arguments the triggering event, the extra params given at registration, and any extra params passed along with the signal.
Parameters
- emitter: list of signal emitters to watch, or a string describing a single emitter to watch
- events: events to watch from the emitters: a table containing a list of the events to wait on, a string describing an event's kind, or a number defining a timeout for this call.
- f: function to be used as hook
- varargs: extra optional params to be given to hook when called
Return value
registered hook, which can be passed to sched.kill to cancel the registration.
- sched.sigOnce(emitter, events, f, varargs)
-
Hooks a callback function to a set of signals, to be triggered only once.
This function has the same API and behavior as sched.sigHook, except that the hook will only be run once: it detaches itself from all of its registrations when it's first triggered.
Parameters
- emitter: a list of signal emitters to watch, or a string describing a single emitter to watch
- events: events to watch from the emitters: a table containing a list of the events to wait on, a string describing an event's kind, or a number defining a timeout for this call.
- f: function to be used as hook
- varargs: extra optional params to be given to hook when called
Return value
registered hook
- sched.sigRun(emitter, events, f, varargs)
-
Hooks a callback function to a set of signals.
This function has the same API as sched.sigHook, except that f runs in a separate thread. The consequences are:
- f is not run immediately, but after the task which emitted the triggering signal has rescheduled;
- f is allowed to call blocking APIs.
Parameters
- emitter: a signal emitter to watch
- events: events to watch from the emitters: a table containing a list of the events to wait on, a string describing an event's kind, or a number defining a timeout for this call.
- f: function to be used as hook
- varargs: extra optional params to be given to hook when called
Return value
registered hook, which can be passed to sched.kill to cancel the registration.
- sched.sigRunOnce(emitters, events, f, varargs)
-
Hooks a callback function to a set of signals.
The hook will be called in a new thread (allowing the hook to block), and only one time.
This function has the same API and behavior as sched.sigRun, except that the hook will only be run once: it detaches itself from all of its registrations when it's first triggered.
Parameters
- emitters: a signal emitter to watch
- events: events to watch from the emitters: a table containing a list of the events to wait on, a string describing an event's kind, or a number defining a timeout for this call.
- f: function to be used as hook
- varargs: extra optional params to be given to hook when called
Return value
registered hook, which can be passed to sched.kill to cancel the registration.
- sched.signal(emitter, event, ...)
-
Sends a signal from emitter with message event, plus optional extra args.
This means:
- rescheduling every task waiting for this emitter.event signal;
- immediately running the hooks listening to this signal;
- reattaching the hook cells which don't have a once field.
Parameters
- emitter: arbitrary Lua object which sends the signal.
- event: a string representing the event's kind.
- ...: extra args passed to the woken up tasks and triggered hooks.
Return value
nothing.
- sched.step()
-
Runs all the tasks which are ready to run, until every task is blocked, waiting for an event (i.e. until __tasks.ready is empty).
This function is used by the scheduler's top-level loop, but shouldn't be called explicitly by users.
- sched.wait(emitter, varargs)
-
Forces the currently running task out of scheduling until a certain signal is received.
Takes a list of emitters, and a list of events. The task will be rescheduled the next time one of the listed events happens to one of the listed emitters. If there is a single emitter (resp. a single event), it can be passed outside of a table, i.e.
wait(x, "open")
is the same as
wait({x}, {"open"})
If no emitter or no event is listed, the task waits for nothing in particular, i.e. it puts itself back at the end of the scheduling queue, thus giving other threads a chance to run.
There must be a task currently running, i.e. __tasks.running ~= nil.
Cf. description of __tasks.waiting for a description of how tasks are put to wait on signals.
Parameters
- emitter: table listing emitters to wait on. Can also be a string to define a single emitter, or a number to specify a timeout in seconds (wait used as a sleep function)
- varargs: optional varargs: can be an events list in a table, or several events in several arguments. The last event can be a number to specify a timeout.
Return value
the event and extra parameters of the signal which unblocked the task.
Usage:
-- reschedules current task, giving other tasks a chance to run:
sched.wait()
-- blocks the current task for `timeout` seconds:
sched.wait(timeout)
-- waits until `emitter` signals this `event`:
sched.wait(emitter, event)
-- waits until `emitter` signals any event:
sched.wait(emitter, "*")
-- waits until `emitter` signals any one of the `eventN` in the list:
sched.wait(emitter, {event1, event2, ...})
-- waits until `emitter` signals any one of the `eventN` in the list,
-- or `timeout` seconds have elapsed, whichever occurs first:
sched.wait(emitter, {event1, event2, ..., timeout})
-- admissible shortcut for `sched.wait(emitter, {event1, event2, ...})`:
sched.wait(emitter, event1, event2, ...)
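The various argument shapes accepted by sched.wait could be brought to a single canonical form along the following lines. This is a speculative plain-Lua sketch: normalize_wait_args is a hypothetical function, it handles a single emitter only, and the real implementation may well proceed differently.

```lua
-- Sketch of how the argument shapes accepted by sched.wait could be normalized.
-- `normalize_wait_args` is hypothetical; the real implementation may differ.
local function normalize_wait_args(emitter, ...)
  local nargs = select('#', ...)
  if emitter == nil then
    return { reschedule = true }                 -- form: sched.wait()
  elseif type(emitter) == 'number' and nargs == 0 then
    return { timeout = emitter, events = {} }    -- form: sched.wait(timeout)
  end
  -- Events come either as one table or as several separate arguments.
  local first = ...
  local list = (nargs == 1 and type(first) == 'table') and first or { ... }
  local events, timeout = {}, nil
  for _, e in ipairs(list) do
    if type(e) == 'number' then
      timeout = e                                -- a number in the list is a timeout
    else
      events[#events + 1] = e
    end
  end
  return { emitter = emitter, events = events, timeout = timeout }
end

local a = normalize_wait_args('NETMAN', {'MOUNTED', 'MOUNT_FAILED', 30})
print(#a.events, a.timeout)   -- two events and a 30 second timeout
local b = normalize_wait_args('NETMAN', 'MOUNTED', 'MOUNT_FAILED')
print(#b.events, b.timeout)   -- two events, no timeout
```

Normalizing early means the rest of a scheduler only has to deal with one shape: an emitter, a plain list of event names, and an optional timeout.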