sched
(c) Sierra Wireless, 2007-2012
Sched is a Lua collaborative scheduler: it allows several Lua tasks to
run in parallel, and to communicate with each other when they need to
interact.
It offers a convenient way to write programs which address multiple I/O-driven issues simultaneously, with much less hassle than preemptive multithreading frameworks; it also doesn't require developers to adopt unusual programming styles, such as those required by Erlang, map-reduce variants, or callback-driven frameworks such as node.js. Among other appropriate usages, it makes it easy to write and deploy the applications that typically power machine-to-machine infrastructures.
General principles
processes: concurrent fragments of programs, each having exclusive access to its own memory space.
threads: concurrent fragments of programs, sharing the same memory space, and which therefore need to synchronize their memory operations.
tasks: concurrent fragments of programs of any sort. Processes and threads are particular kinds of tasks.
"Collaborative" implies that the currently running task only changes when it calls a blocking function, i.e. a function which has to wait for an external event. For instance, an attempt to read from an empty socket blocks until more data arrives from the network: the reading task is unscheduled, and other tasks run until more network data arrives. Most collaborative multi-tasking systems, including sched, cannot leverage multi-core architectures.
Collaborative multi-tasking has one big advantage: it considerably reduces concurrency issues. Since there are few places where the running task can change, there are far fewer occasions for race conditions, deadlocks, and other concurrency-specific problems. Programmers used to developing preemptively multithreaded applications will be delighted to see how uneventful debugging collaborative concurrent systems is.
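The switching rule can be observed with plain Lua coroutines (sched tasks are regular coroutines). In this minimal sketch, which does not use sched at all, a hypothetical `read` on an empty in-memory buffer yields, and that yield is the only point where control passes to the other task. The buffer, `read` and the round-robin loop are illustrative assumptions, not sched code.

```lua
-- Plain Lua sketch (no sched involved): a hypothetical in-memory "socket".
local buffer = {}
local log = {}

-- Blocking read: while no data is available, yield so that other tasks can run.
local function read()
  while #buffer == 0 do
    coroutine.yield()
  end
  return table.remove(buffer, 1)
end

local reader = coroutine.create(function()
  log[#log + 1] = "reader: waiting"
  local data = read()                 -- the task switch happens here, and only here
  log[#log + 1] = "reader: got " .. data
end)

local writer = coroutine.create(function()
  log[#log + 1] = "writer: sending"
  buffer[#buffer + 1] = "hello"
end)

-- Naive round-robin: resume every task that is not dead, until all are dead.
local tasks = { reader, writer }
repeat
  local alive = false
  for _, t in ipairs(tasks) do
    if coroutine.status(t) ~= "dead" then
      assert(coroutine.resume(t))
      alive = alive or coroutine.status(t) ~= "dead"
    end
  end
until not alive

print(table.concat(log, "; "))
--> reader: waiting; writer: sending; reader: got hello
```

Between two yields, each task runs without interruption, which is why there are so few places where a race condition can occur.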
To quote the authors of Lua, "we did not (and still do not) believe in the standard multithreading model, which is preemptive concurrency with shared memory: we still think that no one can write correct programs in a language where a=a+1 is not deterministic" [pdf]. Hence, Lua coroutines share their memory, but give up non-deterministic preemptive scheduling.
Other languages make the complementary choice: for instance, Erlang retains preemptive concurrency, but forbids shared memory; although such languages allow programs to reach unmatched levels of reliability [pdf], they deeply change the way programmers have to model their programs. They are therefore less suitable for a general audience.
It should be noted that Unix's original design philosophy (many small, short-lived, specialized processes with separate memories, which communicate through file descriptors and IPC) also restricts memory sharing, in order to keep preemptive scheduling maintainable.
Finally, some process-oriented (separate memory and message-passing) multi-tasking systems are available in Lua, most notably Luaproc [pdf]. Although such systems make more sense for computation-intensive jobs on multi-core and distributed architectures, they can cohabit with sched's multitasking (having several Luaproc processes, each running several sched threads).
Collaborative multi-tasking comes with a limitation compared to the preemptive variant: greedy tasks, which neither pause nor perform any blocking API call, might monopolize the CPU indefinitely. Although this rarely happens except on purpose, it means that collaborative schedulers are unsuitable for real-time systems. If such real-time needs occur in a sched-based application, they must be addressed in C, in a separate process, and the underlying OS must offer suitable real-time performance.
Another perceived issue is that a rogue task can crash all other tasks in the scheduler, but that's true of any pool of tasks sharing their memory, including systems like pthreads.
Sched principles
Sched offers a fundamental communication mechanism, called the signal, over which other mechanisms (mutexes, FIFOs, synchronized program sections, etc.) can be built. You will often find that signals are the simplest and most suitable way to coordinate tasks, although classic POSIX-like IPC mechanisms are always available when needed or preferred.
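A signal is composed of an emitter, which can be any Lua object; an event, a string describing the kind of event; and optional extra arguments.
So every object in Lua can emit signals, and emitted signals can trigger reactions: tasks blocked in #sched.wait on that signal are woken up, and hooks attached to it with #sched.sigHook and its variants are run.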
Objects are encouraged to emit signals every time an event of interest
happens to them, so that other tasks can react to these events. For
instance, the modem module emits a signal every time an SMS arrives; the
NTP time synchronizer advertises clock changes through signals; IP
interfaces signal important events such as going up or going down;
every task emits a 'die'
signal when it exits, allowing other tasks
to monitor their termination; etc. Many complex systems, such as the
telnet shell, the ReadyAgent initialization procedure, or TCP data
handling, are internally synchronized through signals.
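To make the emitter/event vocabulary concrete, here is a minimal sketch of a callback registry indexed first by emitter and then by event, where any Lua value (a string, a table) can serve as emitter. The `hooks` table and the `on` and `emit` functions are hypothetical: they only approximate what #sched.sigHook and sched.signal do, and support neither '*', timeouts nor detaching.

```lua
-- hooks[emitter][event] = list of callbacks; any Lua value can be an emitter.
local hooks = {}

-- Hypothetical stand-in for sched.sigHook (no '*', no timeout, no detaching).
local function on(emitter, event, f)
  local by_event = hooks[emitter] or {}
  hooks[emitter] = by_event
  local list = by_event[event] or {}
  by_event[event] = list
  list[#list + 1] = f
end

-- Hypothetical stand-in for sched.signal: runs the matching callbacks
-- synchronously, passing the event and the extra arguments.
local function emit(emitter, event, ...)
  local by_event = hooks[emitter]
  local list = by_event and by_event[event]
  if not list then return end
  for _, f in ipairs(list) do
    f(event, ...)
  end
end

local received = {}
local modem = {}   -- a plain table used as an emitter object

on(modem, 'SMS', function(event, from, text)
  received[#received + 1] = event .. ' from ' .. from .. ': ' .. text
end)
on('NETMAN', 'MOUNTED', function(event)
  received[#received + 1] = event
end)

emit(modem, 'SMS', '12345', 'hi')
emit('NETMAN', 'MOUNTED')
emit('NETMAN', 'MOUNT_FAILED')   -- no callback registered: nothing happens
print(table.concat(received, ' / '))  --> SMS from 12345: hi / MOUNTED
```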
sched.wait
A wait or a hook can be registered to many signals simultaneously, and conversely, a single signal can trigger many hooks and task wake-ups.
Most waits and hooks wait for signals from only one emitter. Consider the following example:
local event, status, result = sched.wait(some_task, 'die')
It will block the current task until the task some_task
terminates.
#sched.wait returns the triggering event (here always 'die'), and extra signal arguments. In the case of task 'die' events, those are the status (whether the task exited successfully or because of an error), and any result returned by the function (or an error message if status is false).
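As a rough model of this mechanism, the sketch below implements hypothetical `run`, `wait`, `signal` and `step` functions on top of plain Lua coroutines: blocked coroutines are indexed by emitter and event, and a task that finishes emits 'die' with the results of coroutine.resume (true followed by the returned values, or false followed by the error message). It assumes nothing about sched's actual internals, and ignores '*', timeouts and kills.

```lua
local unpack = table.unpack or unpack
local function pack(...) return { n = select('#', ...), ... } end

local ready = {}    -- { co = coroutine, args = packed resume arguments }
local waiting = {}  -- waiting[emitter][event] = list of blocked coroutines

-- Hypothetical wait: index the current coroutine by (emitter, event), then yield.
local function wait(emitter, event)
  local by_event = waiting[emitter] or {}
  waiting[emitter] = by_event
  local list = by_event[event] or {}
  by_event[event] = list
  list[#list + 1] = coroutine.running()
  return coroutine.yield()
end

-- Hypothetical signal: make every coroutine blocked on (emitter, event) ready.
local function signal(emitter, event, ...)
  local by_event = waiting[emitter]
  local list = by_event and by_event[event]
  if not list then return end
  by_event[event] = nil
  for _, co in ipairs(list) do
    ready[#ready + 1] = { co = co, args = pack(event, ...) }
  end
end

-- Hypothetical run: queue a new task; it does not start immediately.
local function run(f, ...)
  local co = coroutine.create(f)
  ready[#ready + 1] = { co = co, args = pack(...) }
  return co
end

-- Resume ready tasks until every task is blocked or dead. A task that ends
-- emits 'die' with coroutine.resume's results: true, results... or false, message.
local function step()
  while #ready > 0 do
    local entry = table.remove(ready, 1)
    local res = pack(coroutine.resume(entry.co, unpack(entry.args, 1, entry.args.n)))
    if coroutine.status(entry.co) == 'dead' then
      signal(entry.co, 'die', unpack(res, 1, res.n))
    end
  end
end

local log = {}
local worker = run(function(x)
  wait('TIMER', 'tick')               -- block until TIMER emits 'tick'
  return x * 2
end, 21)

run(function()
  local event, status, result = wait(worker, 'die')
  log[#log + 1] = event .. ' ' .. tostring(status) .. ' ' .. tostring(result)
end)

step()                  -- worker blocks on TIMER.tick, the watcher on worker.die
signal('TIMER', 'tick')
step()                  -- worker returns 42, which wakes the watcher up
print(log[1])           --> die true 42
```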
A task can wait for several events from a single emitter. For instance, the following statement will wait until the network manager either succeeds or fails at mounting an IP interface:
sched.wait('NETMAN', {'MOUNTED', 'MOUNT_FAILED'})
If a number is added to the events list, it is a timeout in seconds: if none of the subscribed events occurs before the timeout elapses, then #sched.wait returns with the event 'timeout':
local event = sched.wait('NETMAN', {'MOUNTED', 'MOUNT_FAILED', 30})
if event == 'MOUNTED' then print "Success!"
elseif event == 'MOUNT_FAILED' then print "Failure!"
elseif event == 'timeout' then print "Took more than 30s to mount!"
else assert(false, "This cannot happen") end
One can also subscribe to every event sent by a given emitter, by subscribing to the special '*' string:
local event = sched.wait('NETMAN', '*')
if event == 'MOUNTED' then print "Success!"
elseif event == 'MOUNT_FAILED' then print "Failure!"
else print("Ignore event "..event) end
The special '*' event can be combined with a timeout, as in sched.wait(X, {'*', 30}).
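One plausible way to interpret these event lists is sketched below: numbers are treated as a timeout, strings as event names, and '*' as a wildcard matching any event. `parse_events` and `matches` are hypothetical helpers written for illustration; sched's own parsing may differ, for instance in how it handles several numbers in the same list (here, the last one wins).

```lua
-- Hypothetical helper: split a wait specification into a set of event names
-- and an optional timeout (any number in the list).
local function parse_events(spec)
  if type(spec) ~= 'table' then spec = { spec } end
  local events, timeout = {}, nil
  for _, e in ipairs(spec) do
    if type(e) == 'number' then
      timeout = e
    else
      events[e] = true
    end
  end
  return events, timeout
end

-- Would `event` wake up a task subscribed to `events`? '*' matches anything.
local function matches(events, event)
  return events['*'] == true or events[event] == true
end

local events, timeout = parse_events({'MOUNTED', 'MOUNT_FAILED', 30})
print(matches(events, 'MOUNTED'))     --> true
print(matches(events, 'UNMOUNTED'))   --> false
print(timeout)                        --> 30

local any = parse_events({'*', 30})
print(matches(any, 'UNMOUNTED'))      --> true
```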
A task might need to wait on signals sent by several potential emitters. This need is addressed by the sched.multiWait API, which works like sched.wait except that it takes a list of emitters rather than a single one.
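A possible model of multiWait on plain Lua coroutines: the waiting task is registered on every (emitter, event) pair, unregistered from all of them when the first one fires, and resumed with the emitter in front of the usual event and arguments. `multiWait`, `signal` and the `waiting` table are hypothetical; unlike sched, this `signal` resumes the task immediately rather than through a ready queue.

```lua
local unpack = table.unpack or unpack

local waiting = {}  -- waiting[emitter][event] = list of blocked coroutines

-- Hypothetical multiWait: register the current coroutine on every
-- (emitter, event) pair, then yield until one of them is signaled.
local function multiWait(emitters, events)
  local co = coroutine.running()
  for _, em in ipairs(emitters) do
    waiting[em] = waiting[em] or {}
    for _, ev in ipairs(events) do
      waiting[em][ev] = waiting[em][ev] or {}
      table.insert(waiting[em][ev], co)
    end
  end
  return coroutine.yield()
end

-- Remove a woken coroutine from all of its other registrations.
local function unregister(co)
  for _, by_event in pairs(waiting) do
    for _, list in pairs(by_event) do
      for i = #list, 1, -1 do
        if list[i] == co then table.remove(list, i) end
      end
    end
  end
end

-- Hypothetical signal: resume matching coroutines directly (no ready queue),
-- passing the emitter first so that the woken task knows who woke it.
local function signal(emitter, event, ...)
  local list = waiting[emitter] and waiting[emitter][event]
  if not list then return end
  local woken = { unpack(list) }      -- copy: unregister() edits the lists
  for _, co in ipairs(woken) do
    unregister(co)
    assert(coroutine.resume(co, emitter, event, ...))
  end
end

local got = {}
local task = coroutine.create(function()
  got = { multiWait({'NETMAN', 'MODEM'}, {'UP', 'DOWN'}) }
end)
assert(coroutine.resume(task))  -- blocks on 4 (emitter, event) pairs
signal('MODEM', 'UP', 'gprs')    -- wakes it up with emitter, event, args
signal('NETMAN', 'UP')           -- no effect: the task is no longer registered
print(got[1], got[2], got[3])    -- MODEM, UP and gprs, tab-separated
```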
Finally, a task can reschedule itself without blocking. This gives other tasks an opportunity to run; once every other task has had a chance to run, the rescheduled task can resume. Task rescheduling is performed by calling sched.wait without arguments.
for i=1, BIG_NUMBER do
  perform_long_computation_chunk(i)
  sched.wait()
end
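Rescheduling without blocking amounts to moving the current task to the back of the ready queue. The sketch below models it with plain coroutines, where `coroutine.yield()` stands for `sched.wait()` without arguments; the `spawn` helper and the FIFO loop are hypothetical. Two chunked computations end up interleaved instead of running one after the other.

```lua
local ready = {}   -- hypothetical FIFO of ready coroutines
local trace = {}

-- Hypothetical helper: queue a task that performs `chunks` computation chunks.
local function spawn(name, chunks)
  ready[#ready + 1] = coroutine.create(function()
    for i = 1, chunks do
      trace[#trace + 1] = name .. i  -- stands for perform_long_computation_chunk(i)
      coroutine.yield()              -- stands for sched.wait() without arguments
    end
  end)
end

spawn('A', 3)
spawn('B', 2)

-- Resume the head of the queue; a task that yielded goes back to the end.
while #ready > 0 do
  local co = table.remove(ready, 1)
  assert(coroutine.resume(co))
  if coroutine.status(co) ~= 'dead' then
    ready[#ready + 1] = co
  end
end

print(table.concat(trace, ' '))  --> A1 B1 A2 B2 A3
```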
Hooks can be registered the same way tasks can be blocked: they are attached to a signal emitter, and to one, several or all ('*') events. A hook has a function, which is executed when one of the registered signals is emitted. The variants of hook attachment are:
#sched.sigHook(emitter, events, func, hook_args...): the function func(event, hook_args..., signal_args...) will be run every time sched.signal(emitter, event, signal_args...) is triggered. It is run synchronously, i.e. before sched.signal() returns, so it can't contain any blocking API call. It will keep being triggered every time one of the registered signals occurs, until the reference returned by #sched.sigHook is passed to #sched.kill.
#sched.sigOnce(emitter, events, func, hook_args...): behaves as #sched.sigHook, except that it's only run once. The hook function is also forbidden from performing blocking calls.
#sched.sigRun(emitter, events, func, hook_args...): works as #sched.sigHook, except that the function is run as a scheduled task. As a result, there is no guarantee that it will be executed as soon as the signal has been sent (there might be a delay), but it is allowed to call blocking APIs. For most usages, this form should be preferred, as it is simpler.
#sched.sigRunOnce(emitter, events, func, hook_args...): behaves as #sched.sigRun, except that it's only run once. The hook function is also allowed to perform blocking calls.
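The synchronous variants can be modeled as a list of registered hooks that the signal function walks before returning. The sketch below uses hypothetical `sigHook`, `sigOnce`, `kill` and `signal` functions named after sched's, but it is not sched's implementation: it matches emitters and events naively, supports '*' but not timeouts, and leaves out sigRun and sigRunOnce, which would queue `f` as a new task instead of calling it.

```lua
local unpack = table.unpack or unpack

local hooks = {}  -- list of { emitter, event, f, args, once }

local function attach(once, emitter, event, f, ...)
  local hook = { emitter = emitter, event = event, f = f, once = once,
                 args = { n = select('#', ...), ... } }
  hooks[#hooks + 1] = hook
  return hook
end

-- Hypothetical counterparts of sched.sigHook / sched.sigOnce / sched.kill.
local function sigHook(emitter, event, f, ...) return attach(false, emitter, event, f, ...) end
local function sigOnce(emitter, event, f, ...) return attach(true, emitter, event, f, ...) end

local function kill(hook)
  for i = #hooks, 1, -1 do
    if hooks[i] == hook then table.remove(hooks, i) end
  end
end

-- Runs every matching hook synchronously, as f(event, hook_args..., signal_args...),
-- before returning.
local function signal(emitter, event, ...)
  local sargs = { n = select('#', ...), ... }
  for _, h in ipairs({ unpack(hooks) }) do   -- copy: once-hooks detach themselves
    if h.emitter == emitter and (h.event == event or h.event == '*') then
      if h.once then kill(h) end
      local a, n = { event }, 1
      for i = 1, h.args.n do n = n + 1; a[n] = h.args[i] end
      for i = 1, sargs.n do n = n + 1; a[n] = sargs[i] end
      h.f(unpack(a, 1, n))
    end
  end
end

local log = {}
local function record(event, tag, x) log[#log + 1] = tag .. event .. x end

local h = sigHook('FOO', 'BAR', record, 'hook:')
sigOnce('FOO', '*', record, 'once:')

signal('FOO', 'BAR', 1)   -- both hooks run; the sigOnce hook detaches itself
signal('FOO', 'BAR', 2)   -- only the sigHook hook runs
kill(h)
signal('FOO', 'BAR', 3)   -- no hook left
print(table.concat(log, ' '))  --> hook:BAR1 once:BAR1 hook:BAR2
```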
There is a guarantee that hooks won't miss a signal. Blocking a task, on the other hand, might lose some signals, if they are not "consumed" fast enough.
For instance, if a task A produces a signal every 2 seconds, and a task B waits for them but takes 3 seconds to process each of them, some of them will be lost: they will occur while B is processing the previous one, and therefore isn't waiting for any signal.
Therefore, if it is important not to lose any occurrence of a signal, a waiting task is not an adequate solution: either handle them in a hook, or pass them through a pipe.
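The arithmetic of this example can be checked with a small, deterministic simulation in plain Lua (no scheduler involved): a consumer that only catches signals while it is idle and waiting, versus one fed through a queue standing in for a pipe. The timeline and both functions are hypothetical.

```lua
-- Hypothetical timeline, in seconds: task A emits a signal at each of these times.
local signals = { 0, 2, 4, 6, 8 }
local COST = 3   -- seconds task B needs to process one signal

-- B blocks on the signal only while idle: signals emitted while it is busy
-- processing are simply lost, since nobody is waiting for them.
local function consume_with_wait(times, cost)
  local handled, busy_until = {}, -math.huge
  for _, t in ipairs(times) do
    if t >= busy_until then
      handled[#handled + 1] = t
      busy_until = t + cost
    end
  end
  return handled
end

-- A synchronous hook pushes every signal into a queue (the "pipe"), and B
-- drains it at its own pace: nothing is lost, but processing falls behind.
local function consume_with_pipe(times, cost)
  local pipe, handled, now = {}, {}, -math.huge
  for _, t in ipairs(times) do pipe[#pipe + 1] = t end  -- hooks never miss a signal
  for _, t in ipairs(pipe) do
    now = math.max(now, t) + cost   -- start once both the item and B are available
    handled[#handled + 1] = t
  end
  return handled, now
end

print(#consume_with_wait(signals, COST))   --> 3   (signals at t = 0, 4 and 8)
local all, finished = consume_with_pipe(signals, COST)
print(#all, finished)                      -- 5 signals handled, the last one done at t = 15
```

The pipe only defers the problem when the producer is consistently faster than the consumer: no signal is lost, but the queue keeps growing.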
sched.sigRun('FOO', 'BAR', function(event, arg)
  print(">>> sigRun FOO.BAR received event " .. event .. ", arg " .. arg)
end)
sched.sigRunOnce('FOO', 'BAR', function(event, arg)
  print(">>> sigRunOnce FOO.BAR received event " .. event .. ", arg " .. arg)
end)
sched.sigRun('FOO', '*', function(event, arg)
  print(">>> sigRun FOO.* received event " .. event .. ", arg " .. arg)
end)
sched.sigRunOnce('FOO', '*', function(event, arg)
  print(">>> sigRunOnce FOO.* received event " .. event .. ", arg " .. arg)
end)
sched.signal('FOO', 'GNAT', 1)
>>> sigRun FOO.* received event GNAT, arg 1
>>> sigRunOnce FOO.* received event GNAT, arg 1
sched.signal('FOO', 'BAR', 2) -- sigRunOnce FOO.* now detached
>>> sigRun FOO.BAR received event BAR, arg 2
>>> sigRunOnce FOO.BAR received event BAR, arg 2
>>> sigRun FOO.* received event BAR, arg 2
sched.signal('FOO', 'BAR', 3) -- sigRunOnce FOO.BAR now detached
>>> sigRun FOO.BAR received event BAR, arg 3
>>> sigRun FOO.* received event BAR, arg 3
sched.signal('GNAT', 'BAR', 2) -- wrong emitter, no hook triggered
Tasks are created by #sched.run: it takes a function, and optionally arguments to this function, and runs it as a separate, parallel task. It also returns the created task, as a regular coroutine. The returned task is mainly used either to cancel it with #sched.kill, or to wait for its termination with sched.wait(task, 'die').
Tasks are stored in an internal table:
blocked tasks are indexed, in __tasks.waiting, by the emitters and events which might unblock them;
tasks which are ready to run are stacked in a __tasks.ready list.
A task created with #sched.run doesn't start immediately: it is merely stacked in __tasks.ready. When the current task dies or blocks, the next one in __tasks.ready takes over; it can be the last created one, or another one which became ready before it.
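This FIFO behavior can be sketched with hypothetical `run` and `step` functions over a plain Lua list standing in for __tasks.ready (blocked tasks and __tasks.waiting are left out). Note that the creating code keeps running after each `run` call, and that a task created by another task is queued behind the tasks that were already ready.

```lua
local unpack = table.unpack or unpack

local ready = {}   -- hypothetical stand-in for __tasks.ready (FIFO)
local order = {}

-- Hypothetical run: create the coroutine, but only queue it.
local function run(f, ...)
  local co = coroutine.create(f)
  ready[#ready + 1] = { co = co, args = { n = select('#', ...), ... } }
  return co
end

-- Hypothetical step: resume ready tasks in FIFO order until none is left
-- (none of these tasks ever blocks, so none is re-queued).
local function step()
  while #ready > 0 do
    local t = table.remove(ready, 1)
    assert(coroutine.resume(t.co, unpack(t.args, 1, t.args.n)))
  end
end

run(function(name)
  order[#order + 1] = name
  run(function() order[#order + 1] = 'child of ' .. name end)
end, 'first')
run(function(name) order[#order + 1] = name end, 'second')
order[#order + 1] = 'creator'   -- the creating code keeps running first

step()
print(table.concat(order, ', '))  --> creator, first, second, child of first
```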
The termination of a task is advertised by a 'die' signal. By waiting for this signal, one can synchronize on a task's completion. The 'die' signal has additional arguments: a status (true if the task returned successfully, false if it has been aborted by an error, "killed" if the task has been cancelled by #sched.kill), followed by either the returned value(s) or the error message. Here are some examples:
-- Common case, the task terminates successfully:
task = sched.run(function()
  for i=1,3 do
    print ">>> plop"
    sched.wait(2)
  end
  return "I'm done plopping."
end)
sched.sigHook(task, 'die', function(event, ...)
  print(">>> He's dead, Jim. His last words were: "..sprint{...})
end)
>>> plop
>>> plop
>>> plop
>>> He's dead, Jim. His last words were: { true, "I'm done plopping." }
-- If the task is interrupted with sched.kill(task):
>>> plop
sched.kill(task)
>>> He's dead, Jim. His last words were: { "killed" }
-- If task dies with an error:
task = sched.run(function()
  for i=1,3 do
    print ">>> plop"
    sched.wait(2)
  end
  error "Aaargh!"
end)
sched.sigHook(task, 'die', function(event, ...)
  print(">>> He's dead, Jim. His last words were: "..sprint{...})
end)
>>> plop
>>> plop
>>> plop
>>> He's dead, Jim. His last words were: { false, "Aaargh!" }
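The three possible 'die' statuses can be derived from what coroutine.resume reports when a task ends. In this sketch, `KILL_TOKEN`, `die_args` and `finish` are hypothetical, and tasks are assumed never to yield; `error("Aaargh!", 0)` is used so that plain Lua does not prefix the message with a source position.

```lua
local KILL_TOKEN = {}   -- hypothetical unique value thrown to kill a task

-- Hypothetical: turn the results of coroutine.resume on a finished task into
-- the arguments of its 'die' signal.
local function die_args(ok, ...)
  if ok then return true, ... end        -- normal return: true, results...
  local err = ...
  if err == KILL_TOKEN then return 'killed' end
  return false, err                      -- error: false, error message
end

-- Runs f as a coroutine (assumed never to yield) and returns its 'die' arguments.
local function finish(f)
  return die_args(coroutine.resume(coroutine.create(f)))
end

print(finish(function() return "I'm done plopping." end))  -- true, then the result
print(finish(function() error("Aaargh!", 0) end))           -- false, then Aaargh!
print(finish(function() error(KILL_TOKEN) end))             -- killed
```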
Finally, there is the issue of launching the scheduler at the top level. This issue is OS-dependent, but on POSIX-like systems, it is done by calling sched.loop: this function will run every task in a loop, and perform timer management in order to avoid busy waits. It never returns, unless you call os.exit. Therefore, before starting the loop, it is important to have prepared some tasks to launch, with #sched.run.
As an example, here is a complete program which starts a telnet server and, in parallel, writes the time to a file every 10 seconds.
Notice that shell.telnet.init creates a listening socket, which will launch a new telnet client for each connection request, and each of these clients will run concurrently with other tasks.
require 'sched'
require 'shell.telnet'
sched.run(function()
  shell.telnet.init{
    address = '0.0.0.0',
    port = 2000,
    editmode = "edit",
    historysize = 100 }
end)
sched.run(function()
  local f = io.open('/tmp/time.txt', 'w')
  while true do
    f:write(os.date(), '\n')
    f:flush()
    sched.wait(10)
  end
end)
sched.loop()
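The timer management performed by sched.loop can be pictured as follows: run every ready task, then, instead of polling, jump straight to the earliest pending deadline. This sketch uses a simulated clock (`now`) and hypothetical `sleep`, `run` and `loop` functions; a real loop would block the process in an OS call until that deadline, and unlike sched.loop this one returns once no timer is left.

```lua
local now = 0        -- simulated clock, in seconds
local timers = {}    -- pending { at = deadline, co = coroutine }
local ready = {}
local log = {}

-- Hypothetical sleep, standing for sched.wait(seconds).
local function sleep(seconds)
  timers[#timers + 1] = { at = now + seconds, co = coroutine.running() }
  coroutine.yield()
end

local function run(f) ready[#ready + 1] = coroutine.create(f) end

-- Hypothetical loop: run everything that is ready, then advance the clock
-- directly to the earliest deadline instead of polling in a busy wait.
local function loop()
  while true do
    while #ready > 0 do
      assert(coroutine.resume(table.remove(ready, 1)))
    end
    if #timers == 0 then return end   -- unlike sched.loop, stop when idle
    table.sort(timers, function(a, b) return a.at < b.at end)
    local next_timer = table.remove(timers, 1)
    now = next_timer.at               -- a real loop would sleep until then
    ready[#ready + 1] = next_timer.co
  end
end

run(function()
  for _ = 1, 3 do log[#log + 1] = 'time ' .. now; sleep(10) end
end)
run(function()
  for _ = 1, 2 do log[#log + 1] = 'tick ' .. now; sleep(15) end
end)
loop()
print(table.concat(log, ', '))  --> time 0, tick 0, time 10, tick 15, time 20
```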
sched
sched.gc() | Performs a full garbage collection and removes dead tasks from waiting lists. |
sched.kill(x) | Kills a task. |
sched.killSelf() | Kills the current task. |
sched.multiWait(emitters, events) | Waits on several emitters. |
sched.run(f) | Runs a function as a new thread. |
sched.sigHook(emitter, events, f, varargs) | Hooks a callback function to a set of signals. |
sched.sigOnce(emitter, events, f, varargs) | Hooks a callback function to a set of signals, to be triggered only once. |
sched.sigRun(emitters, events, f, varargs) | Hooks a callback function, run as a separate task, to a set of signals. |
sched.sigRunOnce(emitters, events, f, varargs) | Hooks a callback function, run as a separate task, to a set of signals, to be triggered only once. |
sched.signal(emitter, event) | Sends a signal from emitter with message event, plus optional extra args. |
sched.step() | Runs all the tasks which are ready to run, until every task is blocked, waiting for an event (i.e. __tasks.ready is empty). |
sched.wait(emitter, varargs) | Forces the currently running task out of scheduling until a certain signal is received. |
sched
sched.gc()
Performs a full garbage collection and removes dead tasks from waiting lists. There's usually no need to call this function explicitly: it will be triggered automatically when it's needed and the scheduler is about to go idle.
Returns: memory available (in number of bytes) after gc.
sched.kill(x)
Implementation principle: the task is killed either by raising a KILL_TOKEN error, if it is currently running, or by rescheduling it with KILL_TOKEN as an argument, which in turn makes #sched.wait raise a KILL_TOKEN error.
x: task to kill, as returned by #sched.sigHook for example.
Returns: nil if it killed another task; never returns if it killed the calling task.
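The implementation principle of sched.kill can be modeled with plain coroutines. In this sketch, `KILL_TOKEN`, `wait` and `kill` are hypothetical stand-ins: a blocked task is resumed with the token so that its pending `wait()` raises it, while a running task raises it directly; in both cases the coroutine ends with the token as its error value.

```lua
local KILL_TOKEN = {}   -- hypothetical unique sentinel value

-- Hypothetical blocking wait: being resumed with KILL_TOKEN unwinds the task.
local function wait()
  local first = coroutine.yield()
  if first == KILL_TOKEN then error(KILL_TOKEN) end
  return first
end

-- Hypothetical kill: a running task raises KILL_TOKEN at once; a blocked task
-- is resumed with KILL_TOKEN, so that its pending wait() raises it instead.
local function kill(co)
  if co == coroutine.running() then error(KILL_TOKEN) end
  local ok, err = coroutine.resume(co, KILL_TOKEN)
  assert(not ok and err == KILL_TOKEN, "task did not die")
  return nil
end

local reached = false
local task = coroutine.create(function()
  wait()            -- blocks: nothing else will ever resume it normally
  reached = true    -- never executed once the task is killed
end)
assert(coroutine.resume(task))           -- run the task until it blocks
kill(task)
print(coroutine.status(task), reached)   -- dead, false

local suicidal = coroutine.create(function() kill(coroutine.running()) end)
local ok, err = coroutine.resume(suicidal)
print(ok, err == KILL_TOKEN)             -- false, true
```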
sched.killSelf()
never returns, since the task is killed.
sched.multiWait(emitters, events)
Same as #sched.wait, except that:
it returns emitter, event, args... instead of just event, args...; indeed, the caller might want to know which emitter rescheduled it;
it takes a list of emitters rather than a single one.
emitters: table containing a list of the emitters to wait on.
events: table containing a list of the events to wait on, a string describing an event's kind, or a number defining a timeout for this call.
Returns: the emitter, event and args that caused this call to end.
sched.run(f)
sched.run(f[, args...]) schedules f(args...) as a new task, which will run in parallel with the current one, after the current one yields.
f: function or task to run.
Returns: the new thread created to run the function.
sched.sigHook(emitter, events, f, varargs)
The callback is called synchronously as soon as the corresponding signal is emitted, and every time it is emitted. That is, the function f will have returned before the call to #sched.signal which triggered it returns.
This puts a constraint on f: it must not block and try to reschedule itself. If a hook function calls a blocking API, it will trigger an error.
The hook will receive as arguments the triggering event, and any extra params passed along with the signal.
emitter: list of signal emitters to watch, or a string describing a single emitter to watch.
events: events to watch from the emitters: a table containing a list of the events to wait on, a string describing an event's kind, or a number defining a timeout for this call.
f: function to be used as hook.
varargs: extra optional params to be given to hook when called.
Returns: registered hook, which can be passed to #sched.kill to cancel the registration.
sched.sigOnce(emitter, events, f, varargs)
emitter: a list of signal emitters to watch, or a string describing a single emitter to watch.
events: events to watch from the emitters: a table containing a list of the events to wait on, a string describing an event's kind, or a number defining a timeout for this call.
f: function to be used as hook.
varargs: extra optional params to be given to hook when called.
Returns: registered hook.
sched.sigRun(emitters, events, f, varargs)
This function has the same API as #sched.sigHook, except that f runs in a separate thread. The consequences are:
f is not run immediately, but after the task which emitted the triggering signal has rescheduled;
f is allowed to call blocking APIs.
emitters: a list of signal emitters to watch, or a string describing a single emitter to watch.
events: events to watch from the emitters: a table containing a list of the events to wait on, a string describing an event's kind, or a number defining a timeout for this call.
f: function to be used as hook.
varargs: extra optional params to be given to hook when called.
Returns: registered hook, which can be passed to #sched.kill to cancel the registration.
sched.sigRunOnce(emitters, events, f, varargs)
This function has the same API and behavior as #sched.sigRun, except that the hook will only be run once: it detaches itself from all of its registrations when it's first triggered.
emitters: a list of signal emitters to watch, or a string describing a single emitter to watch.
events: events to watch from the emitters: a table containing a list of the events to wait on, a string describing an event's kind, or a number defining a timeout for this call.
f: function to be used as hook.
varargs: extra optional params to be given to hook when called.
Returns: registered hook, which can be passed to #sched.kill to cancel the registration.
sched.signal(emitter, event)
Sends a signal from emitter with message event, plus optional extra args.
This means:
emitter.event
signal;once
field.
emitter: arbitrary Lua object which sends the signal.
event: a string representing the event's kind.
Returns: nothing.
sched.step()
Runs all the tasks which are ready to run, until every task is blocked, waiting for an event (i.e. __tasks.ready is empty).
This function is used by the scheduler's top-level loop, but shouldn't be called explicitly by users.
sched.wait(emitter, varargs)
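Forces the currently running task out of scheduling until a certain signal is received. sched.wait(x, "open") is the same as sched.wait({x}, {"open"}).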
If no emitter or no event is listed, the task waits for nothing in particular, i.e. it puts itself back at the end of the scheduling queue, thus giving other threads a chance to run.
There must be a task currently running, i.e. __tasks.running ~= nil.
Cf. description of __tasks.waiting for a description of how tasks are put to wait on signals.
emitter: table listing emitters to wait on. Can also be a string to define a single emitter, or a number to specify a timeout (wait used as a sleep function).
varargs: optional varargs: can be an events list in a table, or several events in several arguments. The last event can be a number to specify a timeout for the call.
Returns: the event and extra parameters of the signal which unlocked the task.
-- reschedules current task, giving other tasks a chance to run:
sched.wait()
-- blocks the current task for `timeout` seconds:
sched.wait(timeout)
-- waits until `emitter` signals this `event`:
sched.wait(emitter, event)
-- waits until `emitter` signals any event:
sched.wait(emitter, "*")
-- waits until `emitter` signals any one of the `eventN` in the list:
sched.wait(emitter, {event1, event2, ...})
-- waits until `emitter` signals any one of the `eventN` in the list,
-- or `timeout` seconds have elapsed, whichever occurs first:
sched.wait(emitter, {event1, event2, ..., timeout})
-- admissible shortcut for `sched.wait(emitter, {event1, event2, ...})`:
sched.wait(emitter, event1, event2, ...)
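The argument forms listed above can all be reduced to one normalized shape. The sketch below is a hypothetical `normalize` function (not sched's code) that turns them into a list of emitters, a list of event names and an optional timeout, following the rules stated in the parameter descriptions.

```lua
-- Hypothetical normalization of sched.wait-style arguments into
-- (list of emitters, list of event names, timeout or nil).
local function normalize(emitter, ...)
  if type(emitter) == 'number' then               -- sched.wait(timeout)
    return {}, {}, emitter
  end
  if emitter == nil or select('#', ...) == 0 then -- sched.wait(): just reschedule
    return {}, {}, nil
  end
  -- naive: any table is taken as a list of emitters
  local emitters = type(emitter) == 'table' and emitter or { emitter }
  local events = (...)
  if type(events) ~= 'table' then events = { ... } end  -- events as varargs
  local names, timeout = {}, nil
  for _, e in ipairs(events) do
    if type(e) == 'number' then timeout = e else names[#names + 1] = e end
  end
  return emitters, names, timeout
end

-- Renders the normalized form as "emitters | events | timeout".
local function show(...)
  local emitters, events, timeout = normalize(...)
  return table.concat(emitters, ',') .. ' | ' .. table.concat(events, ',')
    .. ' | ' .. tostring(timeout)
end

print(show('x', 'open'))             --> x | open | nil
print(show({'x'}, {'open'}))         --> x | open | nil
print(show('x', {'a', 'b', 30}))     --> x | a,b | 30
print(show('x', 'a', 'b'))           --> x | a,b | nil
```

Because this naive version treats every table as a list of emitters, an emitter that is itself a table (such as an object emitting signals) would have to be wrapped, as in {obj}; how sched itself resolves this ambiguity is not covered here.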