Defining Workflows
Workflows are the main concept inside Venture. A workflow defines both the jobs that belong to it as well as how these jobs are connected with each other. This allows you to build up complex processes from simple building blocks.
Creating workflows
Workflows are defined as classes that extend from AbstractWorkflow
. The AbstractWorkflow
class defines an abstract method definition
that you will have to implement.
To continue with our example of publishing a podcast, let's create a workflow called PublishPodcastWorkflow
.
use App\Podcast;
use Sassnowski\Venture\AbstractWorkflow;
use Sassnowski\Venture\WorkflowDefinition;
class PublishPodcastWorkflow extends AbstractWorkflow
{
public function __construct(private Podcast $podcast)
{
}
public function definition(): WorkflowDefinition
{
return $this->define('Publish new podcast');
}
}
The define
method accepts the name of the workflow and returns an empty WorkflowDefinition
object. Since a workflow without any jobs isn’t very useful, let’s look at how to add jobs next.
Adding jobs to a workflow
To add a job to a workflow, you may call the addJob
method on the definition instance.
$this->define('Publish new podcast')
->addJob(new ProcessPodcast($this->podcast));
Venture provides a fluent interface to define workflows, so you can simply keep chaining method calls to build up your workflow.
$this->define('Publish new podcast')
->addJob(new ProcessPodcast($this->podcast))
->addJob(new OptimizePodcast($this->podcast));
In this example, we have added two jobs to the workflow. Since we haven’t defined any dependencies for these jobs, both of them would be started in parallel when you start the workflow.
Jobs with dependencies
Venture really starts to shine once you start adding jobs that have dependencies.
To define a job's dependencies, you may pass in an array of class names as the second parameter to the addJob
method.
$this->define('Publish new podcast')
->addJob(new ProcessPodcast($this->podcast))
->addJob(new OptimizePodcast($this->podcast))
->addJob(new PublishPodcastOnTransistorFM($this->podcast), [
ProcessPodcast::class,
OptimizePodcast::class
]);
In this example, you're telling Venture that the PublishPodcastOnTransistorFM
job can only run once both ProcessPodcast
and OptimizePodcast
have finished.
The workflow we have configured so far would look like this.
From this point, you can keep adding jobs to the workflow and it will keep track of all dependencies. All you need to do is to define a job's direct dependencies.
$this->define('Publish new podcast')
->addJob(new ProcessPodcast($this->podcast))
->addJob(new OptimizePodcast($this->podcast))
->addJob(new PublishPodcastOnTransistorFM($this->podcast), [
ProcessPodcast::class,
OptimizePodcast::class
])
->addJob(new PublishPodcastOnApplePodcasts($this->podcast), [
ProcessPodcast::class,
OptimizePodcast::class
])
->addJob(new CreateAudioTranscription($this->podcast), [
ProcessPodcast::class,
])
->addJob(new TranslateAudioTranscription($this->podcast), [
CreateAudioTranscription::class,
]);
Direct and transitive dependencies
If we look at the diagram above, TranslateAudioTranscription
has a direct dependency on CreateAudioTranscription
. CreateAudioTranscription
in turn has a dependency on ProccessPodcast
. This makes ProcessPodcast
a transitive dependency of TranslateAudioTranscription
(think dependency of a dependency).
You don't have to worry about how exactly a job fits into a workflow, however. All you need to know is what other jobs a job directly depends on and Venture will figure out the rest.
Multiple instances of the same job
Since you specify the dependencies of a job by using class names, you might be wondering what happens if the workflow contains multiple instances of the same job. To learn how to deal with this situation, check out the section on using multiple instances of the same job.
Naming jobs
If you want, you can provide an optional name for a job that will be saved in the database.
$this->define('Publish new podcast')
->addJob(new ProcessPodcast($this->podcast), [], 'Process podcast');
If no explicit name is provided, the fully qualified class name (FQCN) of the job will be used instead.
You may access a step's name via its $name
property.
$workflow->jobs[0]->name;
// "Process podcast"
Tip
If you plan on displaying the workflow jobs to your users, you could pass in a translation string as the job name. This way, you would be able to display the localized name of the job in the UI by using Laravel's built-in localization features.
Adding closures as jobs
Venture also allows you to add closures to a workflow.
$this->define('My Workflow')
->addJob(function () {
Log::info('Star Wars is fantasy, change my mind');
}, id: 'log-truth');
When adding a Closure to a workflow, you must specify an explicit ID for the job, since there is no class name to fall back on. You can specify the ID of a job by providing the id
parameter of the addJob
method.
You may typehint any dependencies that the Closure needs and they will be resolved out of Laravel’s container when the job gets executed.
$this->define('My Workflow')
->addJob(function (TruthGenerator $truths) {
Log::info($truths->getHotTake());
}, id: 'log-truth');
You can access the closure's job by accepting a $job
parameter.
use Sassnowski\Venture\WorkflowableJob;
$this->define('My Workflow')
->addJob(function (WorkflowableJob $job) {
Log::info($job->workflow());
});
Note
The parameter name must be $job
in order for the job to get injected properly.
Gated jobs
By default, a job will get dispatched automatically as soon as all of its dependencies have successfully run. Sometimes, it might be useful to require that certain jobs need to be started manually, even if their dependencies have finished. Venture supports this by allowing you to add gated jobs to a workflow.
To add a gated job, you may use the addGatedJob
method when defining your workflow.
$this->define('Publish Podcast')
->addJob(new ProcessPodcast($this->podcast))
->addJob(new OptimizePodcast($this->podcast))
->addJob(new PublishOnTransistorFM($this->podcast), [
ProcessPodcast::class,
OptimizePodcast::class,
])
->addGatedJob(new NotifySubscribers($this->podcast), [
PublishOnTransistorFM::class,
]);
In this example, NotifySubscribers
depends on PublishOnTransistorFM
. However, it won't get started automatically after PublishOnTransistorFM
finished. Instead, it will be marked as gated.
You can check if a job is gated by calling the isGated
method on the WorkflowJob
instance.
// Fetch the job by querying the `jobs` relation on the `Workflow` model.
$job = $workflow
->jobs()
->where('name', NotifySubscribers::class)
->first();
$job->isGated();
// => true
Note that isGated
will return false
if the job’s dependencies have not been finished yet. In other words, if isGated
returns true
, it means that the job is ready to run but needs to be started manually.
To start a gated job, you may call the start
method on the job.
// $job is the same `WorkflowJob` instance from above
$job->start();
After a gated job was started, that branch of the workflow will continue evaluating as normal.
Delaying a job
If you don't want to immediately execute a job as soon as it can be run, you can define a delay for it. To do so, you may call the withDelay
method on the job.
$this->define('Publish new podcast')
->addJob(
(new ProcessPodcast($this->podcast))->delay(now()->addDay())
);
This Laravel’s built-in delayed dispatching feature. For more information check Laravel's documentation on the topic.
You can also pass the delay as the fourth parameter to the addJob
method. In this case, it may be helpful to use named parameters to avoid having to pass the intermediate arguments.
$this->define('Publish new podcast')
->addJob(
new ProcessPodcast($this->podcast),
[],
null,
now()->addDay(),
);
// Or, using named parameters
$this->define('Publish new podcast')
->addJob(
new ProcessPodcast($this->podcast),
delay: now()->addDay(),
);
Conditional Jobs
Sometimes, it might be useful to only add a job or nested workflow to a workflow if a certain condition is true. To do so, you may use the when
method when defining your workflow.
class PublishPodcastWorkflow extends AbstractWorkflow
{
public function __construct(
private User $user,
private Podcast $podcast,
) {
}
public function definition(): WorkflowDefinition
{
return $this->define('Publish Podcast')
->addJob(new ProcessPodcast($this->podcast))
->when($this->user->is_pro, function (WorkflowDefinition $definition) {
$definition->addJob(
new OptimizePodcast($this->podcast),
[ProcessPodcast::class],
);
});
}
}
If the first parameter passed to the when
method is truthy, the provided callback gets called with the current WorkflowDefinition
. You can then add jobs or workflows as normal.
The when
method also takes an optional, third parameter which gets called with the current WorkflowDefinition
if the value is falsy.
return $this->define('Publish Podcast')
->addJob(new ProcessPodcast($this->podcast))
->when($this->user->is_pro, function (WorkflowDefinition $definition) {
$definition->addJob(
new OptimizePodcast($this->podcast),
[ProcessPodcast::class],
);
}, function (WorkflowDefinition $definition) {
$definition
->addJob(
new DowngradePodcastQuality($this->podcast),
[ProcessPodcast::class],
)
->addJob(new SendUpsellingEmail($this->user));
});
The example above produces two possible workflows. One if the user is a pro user and if the user isn’t a pro user.
Depending on conditional jobs
Conditional jobs can change the structure of a workflow. You might be wondering how to deal with a situation where a job should depend on different jobs, depending on whether a certain job was added to the workflow or not.
Let’s look at the example from above. Say we want to add a job called PublishOnTransistorFM
. If the user is a pro user, this job should run after OptimizePodcast
has finshed. If the user isn’t a pro user, it should run after DowngradePodcastQuality
has finished.
To deal with this, you may use the ConditionalDependency
class that comes with Venture.
use Sassnowski\Venture\Graph\ConditionalDependency;
return $this->define('Publish Podcast')
->addJob(new ProcessPodcast($this->podcast))
->when($this->user->is_pro, function (WorkflowDefinition $definition) {
$definition->addJob(
new OptimizePodcast($this->podcast),
[ProcessPodcast::class],
);
}, function (WorkflowDefinition $definition) {
$definition
->addJob(
new DowngradePodcastQuality($this->podcast),
[ProcessPodcast::class],
)
->addJob(new SendUpsellingEmail($this->user));
})
->addJob(
new PublishOnTransistorFM($this->podcast),
[
ConditionalDependency::whenDefined(
OptimizePodcast::class,
DowngradePodcastQuality::class
),
],
)
->addJob(
new PublishOnApplePodcasts($this->podcast),
[
ConditionalDependency::whenDefined(
OptimizePodcast::class,
DowngradePodcastQuality::class
),
],
);
If the OptimizePodcast
job exists in the workflow, it would get added as a dependency for the PublishOnTransistorFM
and PublishOnApplePodcasts
jobs. If not, both jobs would depend on DowngradePodcastQuality
instead.
Testing workflow definitions
Things are starting to get complicated now! You might also want to check out the section on how to go about testing your workflow definitions.
Conditional dependencies can be combined with regular dependencies, too.
$this->define('Publish Podcast')
// ...
->addJob(
new PublishOnTransistorFM($this->podcast),
[
SomeOtherJob::class,
ConditionalDependency::whenDefined(
OptimizePodcast::class,
DowngradePodcastQuality::class,
),
],
);
You can also leave out the second parameter to the whenDefined
method. In this case, the dependency will be completely removed from the job if no corresponding job exists in the workflow. If that leaves a job without any dependencies, it will be dispatched immediately after the workflow starts.
Grouped jobs
It can often be useful to add jobs to a workflow in a loop. For example, instead of adding a single NotifySubscribers
job to the workflow, maybe you want to add one job per subscriber instead. One option to do this would be to loop over the podcast's subscribers and manually call $workflow->addJob
for each subscriber.
$definition = $this->define('Publish Podcast')
->addJob(new ProcessPodcast($this->podcast))
->addJob(new OptimizePodcast($this->podcast))
->addJob(new PublishOnTransistorFM($this->podcast), [
ProcessPodcast::class,
OptimizePodcast::class,
]);
foreach ($this->podcast->subscribers as $subscriber) {
$definition->addJob(
new NotifySubscriber($this->podcast, $subscriber),
[PublishOnTransistorFM::class],
);
}
return $definition;
This works, but it's pretty verbose and forces us to break the fluent interface of the definition. In a case like this, Venture provides an each
method which allows us to add an instance of a job to the workflow for each element in a collection. Using the each
method, we can rewrite the previous example like this, instead.
return $this->define('Publish Podcast')
->addJob(new ProcessPodcast($this->podcast))
->addJob(new OptimizePodcast($this->podcast))
->addJob(new PublishOnTransistorFM($this->podcast), [
ProcessPodcast::class,
OptimizePodcast::class,
])
->each(
$this->podcast->subscribers,
fn (Subscriber $subscriber) => new NotifySubscriber(
$this->podcast,
$subscriber,
),
dependencies: [PublishOnTransitorFM::class],
);
Venture will call the provided callback for each element of the provided collection and add the returned job to the workflow. Behind the scenes, Venture will automatically configure the dependencies of each of the added job.
Venture also takes care of assigning each job a unique id. By default, Venture will enumerate the FQCN of the job, for instance App\Jobs\NotifySubscribers-1
, App\Jobs\NotifySubscribers-2
, and so on.
Configuring grouped jobs
Another benefit of grouping jobs is that it allows us to configure each job in the group the same way without having to repeat ourselves.
$this->define('Publish Podcast')
->each(
$this->podcast->subscribers,
fn (Subscriber $subscriber) => new NotifySubscriber(
$this->podcast,
$subscriber,
),
// Sets the dependencies of each added job
dependencies: [],
// Sets the name of each added job
name: 'Notify podcast subscriber'
// Sets the delay of each added job
delay: 60 * 60,
// Sets the id of each added job
id: 'notify-subscribers',
);
When providing an explicit id, Venture will take care of adding a prefix to each job's id to guarantee unique ids. All we have to do is provide an id the same way we would for a single job.
Passing an explicit id to the each
method also registers the jobs as a group inside the workflow's dependency graph. This allows us to easily reference all jobs inside a group, for example when we want to add a job that should depend on all jobs of a group.
Depending on grouped jobs
Say we want to add a new job to the workflow which should only run once all jobs inside a group have finished. To do so, we can use the GroupDependency
when defining the dependencies of a job.
use Sassnowski\Venture\Graph\GroupDependency;
$this->define('Publish Podcast')
->each(
$this->podcast->subscribers,
fn (Subscriber $subscriber) => new NotifySubscriber(
$this->podcast,
$subscriber,
)
id: 'notify-subscribers',
)
->addJob(
new NotifyAuthor($this->podcast),
[GroupDependency::forGroup('notify-subscribers')],
);
This will make sure the NotifyAuthor
job will only run once all jobs inside the notify-subscribers
group have successfully finished. The group name passed to the GroupDependency::forGroup
method needs to match the id
of the group.
Note
When adding a group dependency, make sure that you defined an explicit id
for the group. Otherwise, Venture won't register the jobs as a group and will be unable to resolve the group referenced inside GroupDependency::forGroup
.
Configuring job queues
You have multiple options to configure which queue or queue connection each job in a workflow should get dispatched on. Since workflow jobs are just regular Laravel jobs, everything from Laravel’s queue documentation still applies to them.
Configuring individual jobs
To configure the queue or queue connection of an individual job inside a workflow, you may call the onQueue
and onConnection
methods on a job instance, respectively.
$this->define('Publish Podcast')
->addJob(
(new ProcessPodcast())->onConnection('sqs'),
)
->addJob(
(new OptimizePodcast($this->podcast))->onQueue('high-priority'),
[ProcessPodcast::class],
);
This will override any defaults that might have been set inside the job class itself.
Configuring all jobs of a workflow at once
If you want to change the queue or queue connections for all jobs in a workflow, you may call the allOnQueue
and allOnConnection
methods on the workflow definition, respectively.
$this->define('Publish Podcast')
->allOnConnection('sqs')
->allOnQueue('high-priority')
->addJob(new ProcessPodcast())
->addJob(
new OptimizePodcast($this->podcast),
[ProcessPodcast::class],
);
Configuration precedence
Note that calling allOnQueue
or allOnConnection
will always take precedence over a job’s individual configuration.
$this->define('Publish Podcast')
->allOnQueue('high-priority')
->addJob(
(new ProcessPodcast())->onQueue('medium-priority'),
)
->addJob(
new OptimizePodcast($this->podcast),
[ProcessPodcast::class],
);
In this example, the ProcessPodcast
job will still get dispatched on the high-priority
queue since allOnQueue
takes precedence.
Starting a workflow
Now that you have defined you workflow, you may start it from anywhere within you application by calling its static start
method.
$workflow = PublishPodcastWorkflow::start($podcast);
Any parameter you pass to the start
method will be passed to the workflow's constructor.
The start
method returns the Workflow
Eloquent model for the workflow that you just started. Check out the section on how to keep track of workflows to learn about what you can do with this model.
Venture will now figure out which jobs can be immediately dispatched and process them in parallel. Every time a job finishes, it will check if any of the job's dependent jobs are now ready to be run. If so, it will dispatch them.
Starting workflows synchronously
Venture also provides a way to start a workflow synchronously. To do so, you may call the startSync
method when starting your workflow.
PublishPodcastWorkflow::startSync($podcast);
What this will do is set the queue connection for all jobs of the workflow to use Laravel’s sync
driver. This can be useful when developing locally or when debugging a workflow.
Synchronous evaluation of workflows
By definition, Venture cannot process multiple jobs in parallel when running a workflow synchronously. Instead Venture will perform a depth-first evaluation of the workflow’s dependency graph.
In a depth-first evaluation, Venture will start by running the first job of the workflow. After that job has finished, Venture will then try to recursively evaluate that job’s dependent jobs before moving on to the next job. In other words, Venture will try and process each branch of the workflow as deeply as it can until it hits a job that is still waiting on another dependency to be resolved.
This won’t change the actual behavior of your workflow. I just thought it was neat.
Starting workflows on different queue connections
It’s also possible to explicitly override the queue connection of all jobs when starting a workflow. To do so, you may call the startOnConnection
method to start your workflow.
ProcessPodcastWorkflow::startOnConnection('sqs', $podcast);
This method takes the queue connection as its first parameter. Any other parameters will be passed to the workflow’s constructor.
SendAnnoyingNewsletterWorkflow::startOnConnection(
'sqs',
$user,
$newsletter,
);
The provided connection has to correspond to one of the connections defined in your application’s queue config.
Defining a completion callback
You might want to perform an action after a workflow has finished successfully. To do so, you may call the then
method on the WorkflowDefinition
and pass in a closure.
<?php
use App\Notifications\PodcastPublished;
use App\Models\Podcast;
use Sassnowski\Venture\AbstractWorkflow;
use Sassnowski\Venture\WorkflowDefinition;
class PublishPodcastWorkflow extends AbstractWorkflow
{
public function __construct(private Podcast $podcast)
{
}
public function definition(): WorkflowDefinition
{
return $this->define('Publish Podcast')
// ...
->then(function (Workflow $workflow) {
$this->podcast->user->notify(
new PodcastPublished($this->podcast)
);
});
}
}
Alternatively, you may pass an invokable class to the then
callback.
$this->define('Publish Podcast')
// ...
->then(new SendNotification());
Global event listeners
If you want to perform some action after any workflow has finished, check out the section on writing plugins.
Defining an error callback
You may want to perform some action whenever a job in your workflow fails. To do so, you may use the catch
method when defining your workflow to register an error handler.
<?php
use Throwable;
use App\Notifications\PublishingPodcastFailed;
use App\Models\Podcast;
use Sassnowski\Venture\AbstractWorkflow;
use Sassnowski\Venture\WorkflowDefinition;
use Sassnowski\Venture\WorkflowableJob;
class PublishPodcastWorkflow extends AbstractWorkflow
{
public function __construct(private Podcast $podcast)
{
}
public function definition(): WorkflowDefinition
{
return $this->define('Publish Podcast')
// ...
->catch(function (Workflow $workflow, WorkflowableJob $step, Throwable $exception) {
$this->podcast->user->notify(
new PublishingPodcastFailed(
$this->podcast,
$step->getName(),
$e
),
);
});
}
}
This callback will be called any time one of the workflow’s jobs fails. The catch-callback takes three parameters:
$workflow
is the Eloquent model of the current workflow$step
is the job instance of the failed job$exception
is the exception that caused the job to fail
By default, Venture will keep processing other jobs of the workflow that are unaffected by the failed job. This means that the catch
callback for a workflow can get called multiple times if multiple jobs fail.
Dealing with errors
The fact that Venture keeps processing a workflow even if one of the workflow’s steps has failed is a feature, not a bug. Check out the section on dealing with errors on why this is a useful property and also how to change this behavior when necessary.
Adding jobs at runtime
It is also possible to add jobs to an instance of a workflow class. This allows you to dynamically add jobs to a workflow that aren’t defined inside the workflows definition
method.
To do so, you may call the tapDefinition
method on an instance of a workflow.
$workflow = new ProcessPodcastWorkflow($podcast);
$workflow->tapDefinition(function (WorkflowDefinition $definition) use ($podcast) {
$definition->addJob(new OptimizePodcast($podcast));
});
This method takes a callback which gets passed the WorkflowDefinition
object of the workflow instance. You can then add jobs or workflows to that definition just like you would inside the definition
method itself.
Note that this only changes the definition for this instance of the workflow.
With great power comes great responsibility
While this feature can be useful in certain situations to dynamically add jobs to a workflow’s definition, it is something you should use sparingly. The recommended approach most of the time is to define all jobs a workflow can have inside the definition
method. This way, you can see the entire structure of a workflow by just looking at this method.
To start a workflow instance, you may call the run
method on it.
$workflow->run();
The run
method takes an optional $connection
parameter that allows you to specifiy the queue connection for all jobs of the workflow.
$workflow->run('sync');
You cannot change a started workflow’s definition
Note that is a way of dynamically changing a workflow’s definition before it gets started. This won’t have an effect for workflows that are already running as their definitions are immutable.
Lifecycle hooks
Workflows expose several hooks which allow you to perform actions during certain parts of a workflow’s lifecycle.
beforeCreate
The beforeCreate
hook gets called before a workflow gets persisted to the database for the first time. This typically happens after the start
method was called on a workflow but before the workflow has actually started.
<?php
use Sassnowski\Venture\Models\Workflow;
use Sassnowski\Venture\AbstractWorkflow;
use Sassnowski\Venture\WorkflowDefinition;
class PublishPodcastWorkflow extends AbstractWorkflow
{
public function beforeCreate(Workflow $workflow): void
{
}
}
The beforeCreate
hook gets passed the Workflow
Eloquent model for the workflow. At this point, the model hasn’t been saved to the database yet, which means it won’t have an ID yet.
Performing the same action for all workflows
The beforeCreate
hook on the workflow allows you define actions that are specific to a workflow. If you want to perform the same action for all of your workflows, you should write a plugin, instead.
If all you want to do is associate workflows with models, you should check out the built-in Entity Aware Workflows plugin.
beforeNesting
The beforeNesting
hook gets called when adding a nested workflow to a workflow’s definition. This happens when the addWorkflow
method gets called on the workflow’s definition.
<?php
use Sassnowski\Venture\Models\Workflow;
use Sassnowski\Venture\AbstractWorkflow;
use Sassnowski\Venture\WorkflowDefinition;
class PublishPodcastWorkflow extends AbstractWorkflow
{
/**
* @param array<string, WorkflowableJob> $jobs
*/
public function beforeNesting(array $jobs): void
{
}
}
The beforeNesting
method gets passed the jobs of the nested workflow. At this point, you can still change these jobs before they get added to the workflow.