Getting Started

Getting Started

Install

ExtensibleScheduler is a registered package. To add it to your Julia packages, simply do the following in REPL:

Pkg.add("ExtensibleScheduler")

Usage

using ExtensibleScheduler
using TimeFrames
using Dates: now

function print_time_noparam()
    println("From print_time_noparam $(now(Dates.UTC))")
end

function print_time_args(x)
    println("From print_time_args $(now(Dates.UTC)) $x")
end

function print_time_kwargs(; a="default")
    println("From print_time_kwargs $(now(Dates.UTC)) $a")
end

function sample()
    # Use BlockingScheduler with default jobstore, default executor...
    sched = BlockingScheduler()

    # Define what action will be executed
    action = Action(print_time_kwargs; Dict(:a=>"keyword")...)

    # Define when job should be triggered
    trigger = Trigger(DateTime(2017, 12, 17, 20, 8, 0))  # execute one time at given DateTime
    #trigger = Trigger(DateTime(2017, 12, 17, 20, 8, 5))  # execute one time at given DateTime
    #trigger = Trigger(now(Dates.UTC) - Dates.Second(5))  # execute one time at given DateTime (misfire)
    #trigger = Trigger(now(Dates.UTC) + Dates.Second(5))  # execute one time at given DateTime
    #trigger = Trigger(Dates.Time(15, 58), n=1)  # execute one time at given Time (of current day or of next day)
    #trigger = Trigger(Seconds(5), n=1)  # execute one time (5 seconds after being add)
    #trigger = Trigger(tf"5s")  # periodic job ; priority=0 by default
    
    # Add job to jobstore
    add(sched, action, trigger)

    # Run scheduler
    run(sched)
end

sample()

Download example

Library Outline

A generally useful event scheduler module.

The ExtensibleScheduler package provides advanced and extensible Julia events schedulers inspired by Python schedulers APScheduler, schedule and sched.

It's also inspired by Sched.jl, a Julia event scheduler inspired by Python sched.

Schedulers can use real time clock (system time) or simulated time (for simulation purpose).

source

Schedulers

AbstractScheduler is an abstract type for schedulers.

Schedulers are structs which are responsible of running Action at given instants (according a Trigger).

Several kind of schedulers can implement AbstractScheduler.

The most simple scheduler is BlockingScheduler which is monothread.

source
BlockingScheduler(; clock=real_time_clock, delayfunc=_sleep, jobconfig=JobConfig())

BlockingScheduler is the simplest scheduler. It implements AbstractScheduler.

This is a monothread implementation of scheduling job.

Optional arguments

  • clock::AbstractClock: clock that will be used by scheduler (it's by default real_time_clock, which is system UTC time but a SimClock struct can also be passed for simulation purpose).
  • delayfunc::DelayFunc: functor which is responsible (when called) of waiting until next task should be fired (_sleep is used by default but a NoSleep struct can also be passed for simulation purpose).
  • jobconfig::JobConfig: job configuration default settings (misfire_grace_period...)
source
add(sched, action, trigger; name=DEFAULT_JOB_NAME, priority=DEFAULT_PRIORITY)

Schedule when an Action named action should be triggered (according trigger).

source
Base.runMethod.
run(sched)

Run (in a blocking loop) a scheduler named sched.

source
run_pending(sched)

Run pending tasks of a scheduler sched.

This function should be called instead of run when using scheduler in simulation mode.

source
shutdown(sched)

Shutdown scheduler sched.

source

Action

Action(func, args...; kwargs...)

An Action is a structure (a functor in fact) which stores function, arguments and keyword arguments.

An Action can be run (in fact it's run internally by a scheduler when a Job is triggered.)

source
Base.runMethod.
run(action::Action)

Run action.

This function shouldn't be called directly. It's called by scheduler when a job is triggered.

source

JobStore

AbstractJobStore is an abstract type for jobstores

A jobstore is a data structure which is responsible of storing jobs that should be executed later.

source
MemoryJobStore()

MemoryJobStore implements AbstractJobStore.

This is a data structure which is responsible of storing into memory jobs that should be executed later.

source

Triggers

Triggers construction

Trigger(dt::DateTime)

Return an InstantTrigger which should trigger job at a given DateTime dt

source
Trigger(d::Date)

Return an InstantTrigger which should trigger job at a given Date d(at midnight)

source
Trigger(t::Dates.Time[, n=number_of_times])

Return an TimeTrigger which should trigger a job daily at a given time (once, a finite number of times or indefinitely).

source
Trigger(td::Dates.Period[, n=number_of_times])

Return an PeriodTrigger which should trigger a job after a given period (DatePeriod or TimePeriod).

source
Trigger(tf::TimeFrame[, n=number_of_times])

Return an TimeFrameTrigger which should trigger a job at a given instant according timeframe periodicity. (from TimeFrames.jl)

source
Trigger(f::Function[, n=number_of_times])

Return an CustomTrigger which should trigger a job according a function f.

source

Trigger iteration

Iteration can help to know when job should be triggered.

Base.iterateMethod.
iterate(trigger, dt[, n=number_of_times])

Iterate from instant dt using trigger with a given iteration number n if n < 0 (-1 by default), it iterates indefinitely.

Usage

julia> trigger = Trigger(Dates.Time(20, 30))

julia> for dt in iterate(trigger, DateTime(2020, 1, 1), n=3)
         @show dt
       end
dt = 2020-01-01T20:30:00
dt = 2020-01-02T20:30:00
dt = 2020-01-03T20:30:00

julia> collect(iterate(trigger, DateTime(2020, 1, 1), n=3))
3-element Array{Any,1}:
 2020-01-01T20:30:00
 2020-01-02T20:30:00
 2020-01-03T20:30:00
source

Private

InstantTrigger(dt::DateTime)

A trigger which should trigger job at a given instant (a given DateTime for example)

source
TimeTrigger(t::Dates.Time[, n=number_of_times])

A trigger which should trigger a job daily at a given time.

Optional parameter

  • n=1: trigger once
  • n=-1 (default): trigger every day indefinitely
  • n=value: trigger just a number of times
source
PeriodTrigger(t::Dates.Time[, n=number_of_times])

A trigger which should trigger a job after a given period (DatePeriod or TimePeriod)

Optional parameter

  • n=1: trigger once
  • n=-1 (default): trigger every day indefinitely
  • n=value: trigger just a number of times
source
TimeFrameTrigger(tf::TimeFrame)

A trigger which should trigger a job at a given instant according timeframe periodicity (from TimeFrames.jl)

Example

TimeFrameTrigger("H")

should run a job every hour

source
CustomTrigger(f::Function[, n=number_of_times])

A trigger which should trigger a job according a function f.

It's generally a better idea (cleaner implementation) to write your own trigger from AbstractTrigger, AbstractFiniteTrigger or AbstractInfiniteTrigger but passing a function to a CustomTrigger can be quite handy

Job can be triggered:

  • once (n=1)
  • a finite number of times (n=number_of_times)
  • indefinitely (without setting n)

Example

f = (dt_previous_fire, dt_now) -> dt_now + Dates.Minute(5)
trigger = CustomTrigger(f)

should run a job every 5 minutes

source

NoTrigger define a trigger that never trigger.

It's a useful struct for triggers operations such as applying offset or jitter to a trigger.

source

Triggers operations

TriggerOffset(trigger, offset)

or

TriggerOffset(offset)

A trigger operation to shift instant when a job should be triggered (adding an offset)

Addition + and substraction - are implemented so it's possible to define a new trigger using

Trigger("H") + TriggerOffset(Date.Minute(3))

to be able to run a job every hour at 3 minutes after round after.

This is same as:

TriggerOffset(Trigger("H"), Date.Minute(3))
source
TriggerJitter(trigger, offset)

or

TriggerOffset(offset)

A trigger operation that apply jitter to instant when a job should be triggered.

Addition + and substraction - are implemented so it's possible to define a new trigger using

Trigger("H") + TriggerJitter(Date.Minute(3))

to be able to run a job every hour with a random jitter of 3 minutes. This is same as:

TriggerJitter(Trigger("H"), Date.Minute(3))

Randomize next_dt_fire by adding or subtracting a random value (the jitter). If the resulting DateTime is in the past, returns the initial next_dt_fire without jitter.

nextdtfire - jitter <= result <= nextdtfire + jitter

source

Internals

Job

Job(id, action, trigger, name, priority, dt_created, dt_updated, dt_next_fire, n_triggered, config)

A job is an internal structure which store what action should be executed when triggered.

It also store several properties such as priority level, number of time a job is triggered, when will next trigger should occur...

source

Priority

Priority(time_, priority)

Priority of events.

Comparison is first done by time, and after (if same time) using priority value.

As in UNIX, lower priority numbers mean higher priority.

source

See also