Module sched

Sched is a Lua collaborative scheduler: it allows several Lua tasks to run in parallel, and to communicate together when they need to interact.

It offers a convinient way to write programs which address multiple I/O driven issues simultaneously, with much less hassle than with preemptive multithreading frameworks; it also doesn't require developers to adopt unusual programming styles, as expected by Erlang, map-reduce variants, or callback-driven frameworks such as node.js. Among other appropriate usages, it allows to easily write and deploy the applications typically powering machine-to-machine infrastructures.

General principles

Vocabulary

  • processes: concurrent fragments of programs, each having exclusive access to its own memory space.

  • threads: concurrent fragments of programs, sharing the same memory space, and which therefore need to synchronize their memory operations.

  • tasks: concurrent fragments of programs of any sort. Processes and threads are particular kinds of tasks.

Collaborative multi-tasking

"Collaborative" implies that the currently running task only changes when it calls a blocking function, i.e. a function which has to wait for an external event. For instance, an attempt to read on an empty socket will block until more data arrives from the network. If the current task tries to read from an empty socket, it will be unscheduled, and will let other tasks run, until more network data arrive. Most collaborative multi-tasking systems, including sched, cannot leverage multi-core architectures.

Collaborative multi-tasking has one big advantage: it considerably reduces the concurrency issues. Since there are few places where the running task can change, there are much fewer occasions for race conditions, deadlocks, and other concurrency-specific problems. Programmers used to develop preemptively multithreaded applications will be delighted to see how uneventful to debug collaborative concurrent systems are.

To quote the authors of Lua, "we did not (and still do not) believe in the standard multithreading model, which is preemptive concurrency with shared memory: we still think that no one can write correct programs in a language where a=a+1 is not deterministic" (pdf). Hence, Lua coroutines share their memory, but give up non-deterministic preemptive scheduling.

Alternatives

Other languages make the complementary choice: for instance, Erlang retains preemptive concurrency, but forbids shared memory; although such languages allow to reach unmatched levels of reliability (pdf), they deeply change the way programmers have to model their programs. They are therefore less suitable to a generalist audience.

It should be noted that Unix' original design philosophy (many small and short-lived specialized processes, with separated memories, which communicate through file descriptors and IPC), also restrains memory sharing to keep maintainable preemptive scheduling.

Finally, some process oriented (separate memory and message-passing) multi-tasking systems are available in Lua, most notably Luaproc (pdf). Although such systems make more sense for computation-intensive jobs on multi-core and distributed architectures, it is possible to make it cohabit with sched's multitasking (having several Luaproc processes, each running several sched threads).

Collaborative limitation

Collaborative multi-tasking comes with a limitation, compared to the preemptive variant. Greedy tasks, which never pause nor perform any blocking API call, might monopolize the CPU indefinitely. Although it rarely happen unless on purpose, it means that collaborative schedulers are unsuitable for real-time systems. If such real-time needs occur in a sched-based application, they must be addressed in C in a separate process, and the underlying OS must offer the suitable real-time performances.

Another perceived issue is that a rogue task can crash all other tasks in the scheduler, but that's true of any pool of tasks sharing their memory, including systems like pthreads.

Sched principles

Communication between tasks

Sched offers a fundamental communication mechanism, called the signal, over which other mechanisms (mutexes, fifos, synchronized program sections etc.) can be built. You will often find that signals are the simplest and most suitable way to coordinate tasks together, although the classic POSIX-like IPC systems are always available when needed or preferred.

A signal is composed of:

  • an arbitrary emitter object;
  • an event: a string describing what noteworthy event happened to the emitter;
  • optional arguments, which complete the description of the event's circumstances.

So every object in Lua can emit signals, and emitted signals can trigger reactions:

  • a signal can wake up a task which was waiting for it;
  • a signal can trigger the execution of a hook, i.e. a function which is run every time a registered signal is detected.

Objects are encouraged to emit signals every time an event of interest happens to them, so that other tasks can react to these events. For instance, the modem module emits a signal every time a SMS arrives; the NTP time synchronizer advertises clock changes through signals; IP interfaces signal important events such as going up or going down; every task emits a 'die' signal when it exits, allowing other tasks to monitor their termination; etc. Many complex systems, such as the telnet shell, the Agent initialization process, or TCP data handling, are internally synchronized through signals.

Blocking tasks on signals with sched.wait

A wait or a hook can be registered to many signals simultaneously, and conversely, a single signal can trigger many hooks and task wake-ups.

Most waits and hooks wait for signals from only one emitter. Consider the following example:

 local event, status, result = sched.wait(some_task, 'die')

It will block the current task until the task some_task terminates. sched.wait returns the triggering event (here always 'die'), and extra signal arguments. In the case of task 'die' events, those are the status (whether the task exited successfully or because of an error), and any result returned by the function (or an error message if status is false).

A task can wait for several events from a single emitter. For instance, the following statement will wait until the the network manager either succeeds or fails at mounting an IP interface:

 sched.wait('NETMAN', {'MOUNTED', 'MOUNT_FAILED'})

If a number is added in the events list, it's a timeout in seconds: if none of the subscribed events occur before the timeout elapses, then sched.wait returns with the event 'timeout':

 local event = sched.wait('NETMAN', {'MOUNTED', 'MOUNT_FAILED', 30})
 if     event == 'MOUNTED' then print "Success!"
 elseif event == 'MOUNT_FAILED' then print "Failure!"
 elseif event == 'timeout' then print "Took more than 30s to mount!"
 else assert(false, "This cannot happen") end

One can also subscribe to every events sent by a given emitter, by subscribing to the special '*' string:

 local event = sched.wait('NETMAN', '*')
 if     event == 'MOUNTED' then print "Success!"
 elseif event == 'MOUNT_FAILED' then print "Failure!"
 else   print("Ignore event "..event) end

Special '*' event can be combined with a timeout, as in sched.wait(X, {'*', 30}).

A task might need to wait on signals sent by several potential emitters. This need is addressed by the sched.multiWait API, which works as sched.wait except that it takes a list of emitters rather than a single one.

Finally, a task can reschedule itself without blocking. It gives other tasks an opportunity to run; once every other task had a chance to run, the rescheduled task can restart. Task rescheduling is performed by calling sched.wait without argument.

for i=1, BIG_NUMBER do
    perform_long_computation_chunk(i)
    sched.wait()
end

Attaching hooks to signals

Hooks can be registered the same way tasks can be blocked: they are attached to a signal emitter, and to one, several or all ('*') events. A hook has a function, which is executed when one of the registered signal is registered. The variants of hook attachment are:

  • sched.sigHook(emitter, events, func, hook_args...): the function func(event, hook_args..., signal_args...) will be run every time sched.signal(emitter, event, signal_args...) is triggered. It is run synchronously, i.e. before sched.signal() returns, so it can't contain any blocking API call. It will keep being triggered every time one of the registered signal occurs, until the reference returned by sched.sigHook is passed to sched.kill.

  • sched.sigOnce(emitter, events, func, hook_args...): behaves as sched.sigHook, except that it's only run once. The hook function is also forbidden from performing blocking calls.

  • sched.sigRun(emitter, events, func, hook_args...): works as sched.sigHook, except that the function is run as a scheduled task. As a result, there is no guarantee that it will be executed as soon as the signal has been sent (there might be a delay), but it is allowed to call blocking APIs. For most usages, this form is to be preferred as simpler.

  • sched.sigRunOnce(emitter, events, func, hook_args...): behaves as sched.sigRun, except that it's only run once. The hook function is also allowed to perform blocking calls.

There is a guarantee that hooks won't miss an signal. Blocking a task might lose some signals, if they are not "consumed" fast enough.

For instance, if one task A produces a signal every 2 seconds, and a task B waits for them, but takes 3 seconds to process each of them, some of them will be lost: they will occur when B processes the previous one, and doesn't wait for any signal.

Therefore, if it is important not to lose any occurrence of a signal, a waiting task is not an adequate solution: either handle them in a hook, or pass them through a pipe.

Example

sched.sigRun('FOO', 'BAR', function(event, arg)
    print(">>> sigRun FOO.BAR received event " .. event .. ", arg " .. arg)
end

sched.sigRunOnce('FOO', 'BAR', function(event, arg)
    print(">>> sigRunOnce FOO.BAR received event " .. event .. ", arg " .. arg)
end

sched.sigRun('FOO', '*', function(event, arg)
    print(">>> sigRun FOO.* received event " .. event .. ", arg " .. arg)
end

sched.sigRunOnce('FOO', '*', function(event, arg)
    print(">>> sigRunOnce FOO.* received event " .. event .. ", arg " .. arg)
end

sched.signal('FOO', 'GNAT', 1)
>>> sigRun FOO.* received event GNAT, arg 1
>>> sigRunOnce FOO.* received event GNAT, arg 1

sched.signal('FOO', 'BAR', 2) -- sigRunOnce FOO.* now detached
>>> sigRun FOO.BAR received event FOO, arg 2
>>> sigRunOnce FOO.BAR received event FOO, arg 2
>>> sigRun FOO.* received event FOO, arg 2

sched.signal('FOO', 'BAR', 3) -- sigRunOnce FOO.BAR now detached
>>> sigRun FOO.BAR received event FOO, arg 3
>>> sigRun FOO.* received event FOO, arg 2

sched.signal('GNAT', 'BAR', 2) -- wrong emitter, no hook triggered

Tasks life cycle

Tasks are created by sched.run: it takes a function, and optionally arguments to this function, and runs it as a separate, parallel task. It also returns the created task, as a regular coroutine. The main use for this is to either cancel it with sched.kill, or wait for its termination with sched.wait(task,'die').

Tasks are sorted in an internal table:

  • blocked tasks are indexed, in __tasks.waiting, by the emitters and events which might unblock them;

  • tasks which are ready to run are stacked in a __tasks.ready list.

A task created with sched.run doesn't start immediately, it is merely stacked in __tasks.ready. When the current task dies or blocks, the next one in __tasks.ready takes over; it can be the last created one, or another one which went ready before it.

The termination of a task is advertised by a 'die' signal. By waiting for this signal, one can synchornize on a task's completion. The 'die' signal has additional arguments: a status (true if the task returned successfully, false if it has been aborted by an error, "killed" if the task has been cancelled by sched.kill), followed by either the returned value(s) or the error message. Here are some examples:

-- Common case, the task terminates successfully:
task = sched.run(function()
    for i=1,3 do
       print ">>> plop"
       wait (2)
    end
    return "I'm done plopping."
end)
sched.sigHook(task, 'die', function(event, ...)
    print(">>> He's dead, Jim. His last words were: "..sprint{...})
end)
>>> plop
>>> plop
>>> plop
>>> He's dead, Jim. His last words were: { true, "I'm done plopping" }

-- If the task is interrupted with sched.kill(task):
>>> plop
sched.kill(task)
>>> He's dead, Jim. His last words were: { "killed" }

-- If task dies with an error:
task = sched.run(function()
    for i=1,3 do
       print ">>> plop"
       wait (2)
    end
    error "Aaargh!"
end)
sched.sigHook(task, 'die', function(event, ...)
    print(">>> He's dead, Jim. His last words were: "..sprint{...})
end)
>>> plop
>>> plop
>>> plop
>>> He's dead, Jim. His last words were: { false, "Aaargh!" }

Finally, there is the issue of launching the scheduler at the top-level. This issue is OS-dependent, but on POSIX-like systems, this is done by calling sched.loop: this function will run every task in a loop, and perform timer management in order to avoid busy waits. It never returns, unless you call os.exit. Therefore, before starting the loop, it is important to have prepared some tasks to launch, with sched.run.

As an example, here is a complete program which starts a telnet server and writes the time in parallel, in a file, every 10 seconds.

Notice that shell.telnet.init creates a listening socket, which will launch a new telnet client for each connection request, and each of these clients will run concurrently with other tasks.

require 'sched'
require 'shell.telnet'

sched.run (function()
    shell.telnet.init{
        address     = '0.0.0.0',
        port        = 2000,
        editmode    = "edit",
        historysize = 100 }
end)

sched.run(function()
    local f = io.open('/tmp/time.txt', 'w')
    while true do
        f :write(os.date(), '\n')
        f :flush()
        wait(10)
    end
end)

sched.loop()

Type sched

sched.gc()

Does a full Garbage Collect and removes dead tasks from waiting lists.

sched.kill(x)

Kills a task.

sched.killSelf()

Kills the current task.

sched.multiWait(emitters, events)

Waits on several emitters.

sched.run(f, ...)

Runs a function as a new thread.

sched.sigHook(emitter, events, f, varargs)

Hooks a callback function to a set of signals.

sched.sigOnce(emitter, events, f, varargs)

Hooks a callback function to a set of signals, to be triggered only once.

sched.sigRun(emitter, events, f, varargs)

Hooks a callback function to a set of signals.

sched.sigRunOnce(emitters, events, f, varargs)

Hooks a callback function to a set of signals.

sched.signal(emitter, event, ...)

Sends a signal from emitter with message event, plus optional extra args.

sched.step()

Runs all the tasks which are ready to run, until every task is blocked, waiting for an event (i.e.

sched.wait(emitter, varargs)

Forces the currently running task out of scheduling until a certain signal is received.

Type sched

Field(s)

sched.gc()

Does a full Garbage Collect and removes dead tasks from waiting lists.

Dead tasks are removed when the expected event happens or when the expected event emitter dies. If that never occurs, and you still want to claim the memory associated with these dead tasks, you can always call this function and it will remove them.

There's usually no need to call this function explicitly, it will be triggered automatically when it's needed and the scheduler is about to go idle.

Return value

memory available (in number of bytes) after gc.

sched.kill(x)

Kills a task.

Implementation principle: the task is killed by either

  • making it send a KILL_TOKEN error if it is currently running
  • waking it up from a sched.wait yielding with KILL_TOKEN as an argument, which in turn makes sched.wait send a KILL_TOKEN error.

Parameter

Return values

  1. nil if it killed another task,

  2. never returns if it killed the calling task.

sched.killSelf()

Kills the current task.

Return value

never returns, since the task is killed.

sched.multiWait(emitters, events)

Waits on several emitters.

Same as sched.wait, except that:

  • the first argument is a list of emitters rather than a single emitter;
  • it returns emitter, event, args... instead of just event, args...; indeed, the caller might want to know which emitter rescheduled it;
  • it's illegal not to enclose events in a list.

Parameters

  • emitters : table containing a list of the emitters to wait on

  • events : table containing a list of the events to wait on, or a string describing an event's kind, or a number defining timeout for this call.

Return value

emitter, event, args that caused this call to end.

sched.run(f, ...)

Runs a function as a new thread.

sched.run(f[, args...]) schedules f(args...) as a new task, which will run in parallel with the current one, after the current one yields.

Parameters

  • f : function or task to run

  • ... : optional arguments to pass to param f

Return value

new thread created to run the function

sched.sigHook(emitter, events, f, varargs)

Hooks a callback function to a set of signals.

Signals are described as for sched.wait. See this function for more details.

The callback is called synchronously as soon as the corresponding signal is emitted, and every time it is emittes. That is, the function f will have returned before the call to sched.signal which triggered it returns.

This puts a constraint on f: it must not block and try to reschedule itself. If a hook function calls a blocking API, it will trigger an error.

The hook will receive as arguments the triggering event, and any extra params passed along with the signal.

Parameters

  • emitter : list of signal emitters to watch or a string describing single emitter to watch

  • events : events to watch from the emitters: a table containing a list of the events to wait on, a string discribing an event's kind, or a number defining timeout for this call.

  • f : function to be used as hook

  • varargs : extra optional params to be given to hook when called

Return value

registered hook, which can be passed to sched.kill to cancel the registration.

sched.sigOnce(emitter, events, f, varargs)

Hooks a callback function to a set of signals, to be triggered only once.

This function has the same API and behavior as sched.sigHook, except that the hook will only be run once: it detaches itself from all of its registrations when it's first triggered.

Parameters

  • emitter : a list of signal emitters to watch or a string describing a single emitter to watch

  • events : events to watch from the emitters: a table containing a list of the events to wait on, a string describing an event's kind, or a number defining timeout for this call.

  • f : function to be used as hook

  • varargs : extra optional params to be given to hook when called

Return value

registred hook

sched.sigRun(emitter, events, f, varargs)

Hooks a callback function to a set of signals.

This function has the same API as sched.sigHook, except that f runs in a separate thread. The consequences are:

  • f is not run immediately, but after the task which called the triggering signal rescheduled;
  • f is allowed to call blocking APIs.

Parameters

  • emitter : a signal emitter to watch

  • events : events to watch from the emitters: a table containing a list of the events to wait on, a string describing an event's kind, or a number defining timeout for this call.

  • f : function to be used as hook

  • varargs : extra optional params to be given to hook when called

Return value

registered hook, which can be passed to sched.kill to cancel the registration.

sched.sigRunOnce(emitters, events, f, varargs)

Hooks a callback function to a set of signals.

The hook will be called in a new thread (allowing the hook to block), and only one time.

This function has the same API and behavior as sched.sigRun, except that the hook will only be run once: it detaches itself from all of its registrations when it's first triggered.

Parameters

  • emitters : a signal emitter to watch

  • events : events to watch from the emitters: a table containing a list of the events to wait on, a string describing an event's kind, or a number defining timeout for this call.

  • f : function to be used as hook

  • varargs : extra optional params to be given to hook when called

Return value

registered hook, which can be passed to sched.kill to cancel the registration.

sched.signal(emitter, event, ...)

Sends a signal from emitter with message event, plus optional extra args.

This means:

  • rescheduling every task waiting for this emitter.event signal;
  • immediately running the hooks listening to this signal;
  • reattaching the hook cells which don't have a once field.

Parameters

  • emitter : arbitrary Lua object which sends the signal.

  • event : a string representing the event's kind.

  • ... : extra args passed to the woken up tasks and triggered hooks.

Return value

nothing.

sched.step()

Runs all the tasks which are ready to run, until every task is blocked, waiting for an event (i.e.

until __tasks.ready is empty).

This function is used by the scheduler's top-level loop, but shouldn't be called explicitly by users.

sched.wait(emitter, varargs)

Forces the currently running task out of scheduling until a certain signal is received.

Take a list of emitters, and a list of events. The task will be rescheduled the next time one of the listed events happens to one of the listed emitters. If there is a single emitter (resp. a single event), it can be passed outside of a table, i.e.

 wait(x, "open")

is the same as

 wait({x}, {"open"})

If no emitter or no event is listed, the task waits for nothing in particular, i.e. it puts itself back at the end of the scheduling queue, thus giving other threads a chance to run.

There must be a task currently running, i.e. __tasks.running ~= nil.

Cf. description of __tasks.waiting for a description of how tasks are put to wait on signals.

Parameters

  • emitter : table listing emitters to wait on. Can also be string to define single emitter, or a number to specify a timeout in seconds (wait used as sleep function)

  • varargs : optional varargs: can be events list in a table, or several events in several arguments. Last event can be a number to specify a timeout call.

Return value

the event and extra parameters of the signal which unlocked the task.

Usage:


    --reschedules current task, giving other tasks a chance to run:
    sched.wait()

    -- blocks the current task for `timeout` seconds:
    sched.wait(timeout)

    -- waits until `emitter` signals this `event`:
    sched.wait(emitter, event)

    -- waits until `emitter` signals any event:
    sched.wait(emitter, "*")

    -- waits until `emitter` signals any one of the `eventN` in the list:
    sched.wait(emitter, {event1, event2, ...})

    -- waits until `emitter` signals any one of the `eventN` in the list,
    -- or `timeout` seconds have elapsed, whichever occurs first:
    sched.wait(emitter, {event1, event2, ...timeout})

    -- admissible shorcut for `sched.wait(emitter, {event1, event2...}):
    sched.wait(emitter, event1, event2, ...)