1 Synopsis
Sometimes, we need to spawn a process to do some task asynchronously, without blocking the usual flow of an application. Instead of just calling a method of our model, Ubiquo Jobs provides a way to run jobs automatically at a given time, taking the resources (workers) that we give it.
As an example, here’s how we’d create a job to mail a couple of users a happy new year mail.
class Ubiquo::ArticlesController < UbiquoController
def index
...
MassMailer.run_async(:options => {
:recipients => ['one@one.one', 'two@two.two']
:subject => "Happy new year!"
},
:planified_at => DateTime.now.end_of_year,
:name => "New year's mail" ,
:priority => 1000)
end
MassMailer is a subclass of UbiquoJobs::Jobs::Base, that’s placed in app/jobs directory, and overrides do_job_work. do_job_work is the method that will be called when a worker take this job.
2 Creating job types
For every different kind of work that you want to be done you need to create a new job type. This essentially means that you have to:
- Create a subclass of UbiquoJobs::Jobs::Base
- Override and implement the do_job_work and place there the work that will be performed when the worker starts the job.
The following is a very simple job example, that implements this basic schema that every Job must have
# An example job class
# It simply calculates a Array.size, overriding do_job_work
# Passes the required arguments using the options hash
# Ex:
# ExampleJob.run_async(:options => {:set => [1,2]})
# will execute
# [1,2].size
# and store the result. To display the results use job.output_log
class ExampleJob < UbiquoJobs::Jobs::Base
def do_job_work
set_property :result_output, self.options[:set].size
end
end
By default, new job types should be places in the folder app/jobs/.
2.1 Parameters
If your job needs to be parameterized, use the options hash when creating it to set the needed parameters for it. All the parameters that you pass from the caller through the options hash will be retained until the job is run.
MassMailer.run_async(:options => {
:recipients => ['one@one.one', 'two@two.two']
:subject => t('app.mass_mailer.mail_subject')
:type => :one_time_mail
})
You can use virtual attributes to get parameters, but keep in mind that every thing that is not stored inside options will likely be lost at runtime (when a worker executes your job), since for custom parameters only the options hash has the persistence guaranteed.
2.2 Naming and Job scheduling
Jobs can be scheduled to run at a given time. To configure a job for a concrete time, we must pass a parameter to run_async that will indicate the DateTime when this job should wake up. We can also give it a name to make it easier to track when it’s on waiting queue.
MassMailer.run_async(:options => {
:recipients => ['one@one.one', 'two@two.two']
:subject => "Happy new year!"
:type => :one_time_mail
},
:planified_at => DateTime.now.end_of_year,
:name => "New year's mail" )
When a job is scheduled, it’s added to the job queue (normally kept in the database), and we can manage it from the Jobs tab in the superadmin mode page.
In the superadmin page, users (with appropiate rights) can see the progress of jobs, stop them and see the logs.
Here we see an screenshot of a job that is waiting to mail a new year newsletter

2.3 Priority
There are three different priorities that will rule the resource management when there are more jobs than workers.
We can set the priority when defining our jobs or change the priority of a given job from the Jobs list in superadmin view.
To set a different priority from the default (low), we can set our own priority with a :priority parameter, being 1 the highest priority.
Defaults are 10 for high, 100 for medium and 1000 for low priority (default).
2.4 Successful execution
The job manager needs to know if a job was executed successfully in order to set its new status. By default, if a job raises a standard error, it will be considered that its execution failed. Otherwise, if the job is executed without raising any standard error, it will be considered it was executed successfully and will be marked as finished.
The property result_output is where we can put a text that will be displayed in the jobs list telling how did the process go. When a worker takes a job, it puts itself as the runner of the job,
3 ActiveJob
UbiquoJobs::Jobs::ActiveJob is the default implementation of jobs. If you know for sure that you will be using the ActiveJob implementation, you can freely use ActiveModel/ActiveRecord goodies in your job classes, including validations or callbacks.
It’s possible to implement different Job backend, putting them in files, for example.
3.1 Validations
Validations will usually be done only on creation
attr_accessor :parts, :file
validates_presence_of :parts, :file, :on => :create
3.2 Callbacks
You can use the usual lifecycle callbacks too
attr_accessor :parts, :file
after_validation_on_create :store_options
def store_options
self.options = {:parts => parts * 20, :file => 'file://' + file}
end
Take in mind that you will usually just touch jobs on creation time, and then the handling is done automatically. For this reason, if your job needs some kind of parameter parsing, like in the above example, do it on creation to leave the job ready for execution.
4 Shell Jobs
If the job type you want to create will primarily execute a shell command, you will probably want to use the ShellJob helpers to do this a no-brainer. The main difference if you are using ShellJob is that, instead of implementing the do_job_work function, you need to override set_command. This is because what happens in execution time (command execution, and output/error handling) is automatically managed, so you just have to state the concrete command that will be run.
# An example shell job class
# Overrides set_command method using the 'path' virtual attribute
# Ex:
# ExampleShellJob.run_async(:path => '.')
# will execute
# ls .
class ExampleShellJob < UbiquoJobs::Jobs::Base
include UbiquoJobs::Helpers::ShellJob
attr_accessor :path
def set_command
self.command = 'ls ' + path
end
end
As you can see, we can use a virtual attribute here, since set_command is called in creation time.
5 Workers
Now that we know how to create new job types, it’s time to know how to execute them, The process that executes planified jobs is called a worker. Hence, all what is needed here is to start a worker.
Think of a worker as a thread or process that will be given a procedure to run. This procedure is one of the jobs that are in the job queue, so workers are just CPU power units not tied to a given task.
To spawn a worker, there’s a rake task for it.
rake ubiquo:worker:start[name,interval]
Where name stands for a user-friendly name of the worker, and interval is the interval you want the worker to search for new jobs to execute, in seconds.
ZSH users note: http://scottw.com/zsh-rake-parameters
Every worker must have a unique and known id. This is because when a job is started, the Job Manager needs to identify which worker is doing it. In the case that a worker is killed while executing a job, starting a new worker with the same id will alert the Manager that the previous one did not finish the job, and will make this one again available to be executed.
That’s all – The worker will find available jobs (with the planification threshold overcame) and execute them immediately.
To stop all the running workers use:
script/ubiquo_worker --stop
6 Creating your own managers
Sometimes can be useful to create different managers. An example of this situation is when you want to run different kind of jobs in different circumstances.
Ubiquo Jobs provides a default manager which will get ActiveJob jobs depending on priorities and schedule times:
def self.get(runner)
recovery(runner)
candidate_jobs = job_class.all(
:conditions => [
'planified_at <= ? AND state = ?',
Time.now.utc,
UbiquoJobs::Jobs::Base::STATES[:waiting]
],
:order => 'priority asc'
)
job = first_without_dependencies(candidate_jobs)
job.update_attributes({
:state => UbiquoJobs::Jobs::Base::STATES[:instantiated],
:runner => runner
}) if job
job
end
The job_class variable defaults to UbiquoJobs::Jobs::ActiveJob. If you want to make your own manager to handle special jobs, or change the way the jobs are picked, the best way to do so is to implement your own manager. A nice rails-like way to do that is include them in the lib/ folder of your ubiquo project. The class you should inherit from is UbiquoJobs::Managers::ActiveManager. If you wanted the manager to just pick up a specific subclass of ubiquo jobs, it would suffice to reimplement the self.job_class class method to return your own kind of job:
def self.job_class
UbiquoJobs::Jobs::YourJobClass
end
However, there’s a better way to do this. For this special case, the default UbiquoJob class provides a special member which stores the job’s class name, allowing you to select all objects subclasses of ActiveJob by its classname. For example, imagine you have a kind of job for special tasks that you know for sure will take a long time to complete. Seems reasonable to have a different manager to handle those jobs. You would create a new job in the file app/jobs/very_long_job.rb:
class VeryLongJob < UbiquoJobs::Jobs::ActiveJob
def do_job_work
#Do what needs to be done here
return 0
end
end
Then you could create a manager that handles only those kind of jobs by implementing your own subclass of the UbiquoJobs::Managers::ActiveManager class:
module JobManagers
class VeryLongJobManager < UbiquoJobs::Managers::ActiveManager
def self.get(runner)
recovery(runner)
candidate_jobs = job_class.all(
:conditions => [
'planified_at <= ? AND state = ? AND type = ?',
Time.now.utc,
UbiquoJobs::Jobs::Base::STATES[:waiting],
'VeryLongJob'
],
:order => 'priority asc'
)
job = first_without_dependencies(candidate_jobs)
job.update_attributes({
:state => UbiquoJobs::Jobs::Base::STATES[:instantiated],
:runner => runner
}) if job
job
end
end
end
The code is exactly the same as the default ActiveManager class, but the finder will take an extra parameter, 'VeryLongJob', to indicate that only the ActiveJob objects that are of the subclass VerylongJob should be taken.
After that, you need to modify the task that calls the workers so it takes your manager, or create a new task that will run your manager. The default task that will start a worker looks as this:
desc "Starts a new ubiquo worker"
task :start, [:name, :interval] => [:environment] do |t, args|
options = {
:sleep_time => args.interval.to_f
}.delete_if { |k,v| v.blank? }
UbiquoWorker.init(args.name, options)
end
This uses a special configuration parameter to determine the manager to use. This configuration option is stored in Ubiquo::Config.context(:ubiquo_jobs), the name of the configuration option is :job_manager_class, and takes the manager class as a value. So in order to create a task that will use your manager, you should create a new task like this one:
desc "Starts a new ubiquo worker"
task :start_very_long_jobs, [:name, :interval] => [:environment] do |t, args|
options = {
:sleep_time => args.interval.to_f
}.delete_if { |k,v| v.blank? }
Ubiquo::Config.context(:ubiquo_jobs).set(:job_manager_class, JobManagers::VeryLongJobManager)
UbiquoWorker.init(args.name, options)
end
Your should call this task like this (assuming it’s on the same namespace as the default task):
rake ubiquo:worker:start_very_long_jobs[name,interval]