Skip to main content

Reactr

The next-gen function scheduler for Go and WebAssembly

Reactr Logo

The Basics

First, install Reactr's core package rt:

go get github.com/suborbital/reactr/rt

And then get started by defining something Runnable:

package main

import (
"fmt"
"github.com/suborbital/reactr/rt"
)

type generic struct{}

// Run runs a generic job
func (g generic) Run(job rt.Job, ctx *rt.Ctx) (interface{}, error) {
fmt.Println("doing job:", job.String()) // get the string value of the job's data

// do your work here

return fmt.Sprintf("finished %s", job.String()), nil
}

// OnChange is called when Reactr starts or stops a worker to handle jobs,
// and allows the Runnable to set up before receiving jobs or tear down if needed.
func (g generic) OnChange(change rt.ChangeEvent) error {
return nil
}

A Runnable is something that can take care of a job - all it needs to do is conform to the Runnable interface as you see above.

Once you have a Runnable, create a Reactr instance, register it, and Do some work:

package main

import (
"fmt"
"log"

"github.com/suborbital/reactr/rt"
)

func main() {
r := rt.New()

r.Register("generic", generic{})

result := r.Do(r.Job("generic", "hard work"))

res, err := result.Then()
if err != nil {
log.Fatal(err)
}

fmt.Println("done!", res.(string))
}

When you Do some work, you get a Result. A result is like a Rust future or a JavaScript promise; it is something you can get the job's result from once it is finished.

Calling Then() will block until the job is complete, and then give you the return value from the Runnable's Run. Cool, right?

Runnables pt. 2

There are some more complicated things you can do with Runnables:

type recursive struct{}

// Run runs a recursive job
func (r recursive) Run(job rt.Job, ctx *rt.Ctx) (interface{}, error) {
fmt.Println("doing job:", job.String())

if job.String() == "first" {
return ctx.Do(rt.NewJob("recursive", "second")), nil
} else if job.String() == "second" {
return ctx.Do(rt.NewJob("recursive", "last")), nil
}

return fmt.Sprintf("finished %s", job.String()), nil
}

func (r recursive) OnChange(change rt.ChangeEvent) error {
return nil
}

The rt.Ctx you see there is the job context, and one of the things it can do is run more things!

Calling ctx.Do will schedule another job to be executed and give you a Result. If you return a Result from Run, then the caller will recursively receive that Result when they call Then()!

For example:

r := r.Do(r.Job("recursive", "first"))

res, err := r.Then()
if err != nil {
log.Fatal(err)
}

fmt.Println("done!", res.(string))

Will cause this output:

doing job: first
doing job: second
doing job: last
done! finished last

The ability to chain jobs is quite powerful!

You won't always need or care about a job's output, and in those cases, make sure to call Discard() on the result to allow the underlying resources to be deallocated!

r.Do(r.Job("recursive", "first")).Discard()

To do something asynchronously with the Result once it completes, call ThenDo on the result:

r.Do(r.Job("generic", "first")).ThenDo(func(res interface{}, err error) {
if err != nil {
// do something with the error
}

//do something with the result
})

ThenDo will return immediately, and provided callback will be run on a background goroutine. This is useful for handling results that don't need to be consumed by your main program execution.

Groups

A reactr Group is a set of Results that belong together. If you're familiar with Go's errgroup.Group{}, it is similar. Adding results to a group will allow you to evaluate them all together at a later time.

grp := rt.NewGroup()

grp.Add(ctx.Do(rt.NewJob("recursive", "first")))
grp.Add(ctx.Do(rt.NewJob("generic", "group work")))
grp.Add(ctx.Do(rt.NewJob("generic", "group work")))

if err := grp.Wait(); err != nil {
log.Fatal(err)
}

Will print:

doing job: first
doing job: group work
doing job: group work
doing job: second
doing job: last

As you can see, the "recursive" jobs from the generic runner get queued up after the two jobs that don't recurse.

Note that you cannot get result values from result groups, the error returned from Wait() will be the first error from any of the results in the group, if any. To get result values from a group of jobs, put them in an array and call Then on them individually.

TIP If you return a group from a Runnable's Run, calling Then() on the result will recursively call Wait() on the group and return the error to the original caller! You can easily chain jobs and job groups in various orders.

Pools

Each Runnable that you register is given a worker to process their jobs. By default, each worker has one work thread processing jobs in sequence. If you want a particular worker to process more than one job concurrently, you can increase its PoolSize:

doGeneric := r.Register("generic", generic{}, rt.PoolSize(3))

grp := rt.NewGroup()
grp.Add(doGeneric("first"))
grp.Add(doGeneric("second"))
grp.Add(doGeneric("random"))

if err := grp.Wait(); err != nil {
log.Fatal(err)
}

Passing PoolSize(3) will spawn three work threads to process generic jobs.

Autoscaling pools

By default, defining a pool size causes a static number of work threads to be started and will continue to run for the duration of the program's lifetime. If you have more variable workloads and need to scale your compute up and down to compensate, Reactr can handle that with the Autoscale option:

doGeneric := r.Register("generic", generic{}, rt.Autoscale(0))

for i := 0; i < 10000; i++ {
doGeneric("lots to do").Discard()
}

By passing the rt.Autoscale option, we indicate to Reactr that the worker should create and destroy threads as needed to handle the amount of work to be done. The parameter passed to Autoscale indicates the maximum number of threads. If you pass 0, it will default to the number of available CPUs.

Timeouts

By default, if a job becomes stuck and is blocking execution, it will block forever. If you want to have a worker time out after a certain amount of seconds on a stuck job, pass rt.TimeoutSeconds to Handle:

h := rt.New()

doTimeout := r.Register("timeout", timeoutRunner{}, rt.TimeoutSeconds(3))

When TimeoutSeconds is set and a job executes for longer than the provided number of seconds, the worker will move on to the next job and ErrJobTimeout will be returned to the Result. The failed job will continue to execute in the background, but its result will be discarded.

Schedules

The r.Do method will run your job immediately, but if you need to run a job at a later time, at a regular interval, or on some other schedule, then the Schedule interface will help. The Schedule interface allows for an object to choose when to execute a job. Any object that conforms to the interface can be used as a Schedule:

// Schedule is a type that returns an *optional* job if there is something that should be scheduled.
// Reactr will poll the Check() method at regular intervals to see if work is available.
type Schedule interface {
Check() *Job
Done() bool
}

The r.Schedule method will allow you to register a Schedule, and there are two built-in schedules(Every and After) to help:

r := rt.New()

r.Register("worker", &workerRunner{})

// runs every hour
r.Schedule(rt.Every(60*60, func() Job {
return NewJob("worker", nil)
}))

Reactr will poll all registered Schedules at a 1 second interval to Check for new jobs. Schedules can end their own execution by returning false from the Done method. You can use the Schedules provided with Reactr or develop your own.

Scheduled jobs' results are discarded automatically using Discard()

Advanced Runnables

The Runnable interface defines an OnChange function which gives the Runnable a chance to prepare itself for changes to the worker running it. For example, when a Runnable is registered with a pool size greater than 1, the Runnable may need to provision resources for itself to enable handling jobs concurrently, and OnChange will be called once each time a new worker starts up. Our Wasm implementation is a good example of this.

Most Runnables can return nil from this function, however returning an error will cause the worker start to be paused and retried until the required pool size has been achieved. The number of seconds between retries (default 3) and the maximum number of retries (default 5) can be configured when registering a Runnable:

doBad := r.Register("badRunner", badRunner{}, rt.RetrySeconds(1), rt.MaxRetries(10))

Any error from a failed worker will be returned to the first job that is attempted for that Runnable.

Pre-warming

When a Runnable is mounted, it is simply registered as available to receive work. The Runnable is not actually invoked until the first job of the given type is received. For basic Runnables, this is normally fine, but for Runnables who use the OnChange method to provision resources, this can cause the first job to be slow. The PreWarm option is available to allow Runnables to be started as soon as they are mounted, rather than waiting for the first job. This mitigates cold-starts when anything expensive is needed at startup.

doExpensive := r.Register("expensive", expensiveRunnable{}, rt.PreWarm())

Shortcuts

There are also some shortcuts to make working with Reactr a bit easier:

type input struct {
First, Second int
}

type math struct{}

// Run runs a math job
func (g math) Run(job rt.Job, ctx *rt.Ctx) (interface{}, error) {
in := job.Data().(input)

return in.First + in.Second, nil
}
doMath := r.Register("math", math{})

for i := 1; i < 10; i++ {
equals, _ := doMath(input{i, i * 3}).ThenInt()
fmt.Println("result", equals)
}

The Register function returns an optional helper function. Instead of passing a job name and full Job into r.Do, you can use the helper function to instead just pass the input data for the job, and you receive a Result as normal. doMath!

Additional features

Reactr can integrate with Grav, which is the decentralized message bus developed as part of the Suborbital Development Platform. Read about the integration on the grav documentation page.

Reactr provides the building blocks for scalable asynchronous systems. This should be everything you need to help you improve the performance of your application. When you are looking to take advantage of Reactr's other features, check out its Wasm capabilities!