Job Queue

3 min read · last updated at

Sometimes you need to do work, that takes long, while not being urgent. If you build a podacsting site, your users might upload a audio file, which you then transcode into several different codecs, generate a transcript, and finally notify a third-party API that a new episode is ready. All of this might take a few minutes – a time no user wants to wait and stare at loading spinners.

Motivation

The simplest solution would be to just run all tasks directly when a user uploads their episode and show them a loading spinner. Again, the whole process takes several minutes to complete. This might look something like this:

app/Http/Controller/PodcastController.php
<?php
namespace App\Http\Controller;
use App\Http\Requests\UploadNewEpisodeRequest;
class PodcastController
{
public function store(UploadNewEpisodeRequest $episode)
{
transcodeAudioCodec($episode->audioFile);
generateTranscript($episode->audioFile);
notifyThirdPartyApi($episode->title);
return "Everything went well!";
}
}
Sources/App/Controllers/PodcastController.swift
import Vapor
struct PodcastController: RouteCollection {
func boot(routes: RoutesBuilder) throws {
let podcasts = routes.grouped("podcasts")
podcasts.post(use: create)
}
func create(req: Request) async throws -> UploadNewEpisodeRequest in {
let episode = try req.content.decode(UploadNewEpisodeRequest.self)
transcodeAudioCodec(episode.audioFile)
generateTranscript(episode.audioFile)
notifyThirdPartyApi(episode.title)
return "Everything went well!"
}
}

This approach works, but when more and more requests arrive at the system, running such time-intensive tasks in a web context can be problematic. Your server likely spawns several worker threads that handle incoming requests and run your logic. The amount of these worker threads is limited by the number of physical CPU cores the machine running your code has. When all worker threads are occupied by long running transcoding tasks, a new incoming request won’t be answered until a worker is free. Therefore, a user requesting your homepage will not get a timely response, since your whole web server is busy transcoding audio files – not ideal!

A job queue is a commonly used solution for such a problem. Instead of occupying your precious HTTP threads with non urgent tasks, you create separate queue workers soley responsible for running code asynchronously. By splitting the responsibilities you guarantee that all web requests can be answered, regardless of how many long running tasks you are processing.

The way it usually works is that the data necessary to run the job, in our case the name of the uploaded file, is stored persistently somewhere. This could be a relational database like PostgreSQL or MySQL, but NoSQL stores like MongoDB or specialized queueing system like Redis or RabbitMQ are also commonly used. The queue workers then periodically ask the data store if new job data is available and process it when it arrives.

For our example, we could instead queue our logic to be run in a queue worker. This could look like this:

We now first store the audio somewhere on our system (so we can retrieve it later) and then place a new ProcessEpisode job in our queue. Laravel handles the serialization of the job data for us. Now we also inform the user, that the publishing will happen asynchronously.

TODO: Model Identifiers with link to docs

app/Http/Controller/PodcastController.php
<?php
namespace App\Http\Controller;
use App\Http\Requests\UploadNewEpisodeRequest;
use App\Jobs\ProcessEpisode;
class PodcastController
{
public function store(UploadNewEpisodeRequest $episode)
{
// Important! We need to store the file somewhere,
// so we can retrieve it later when we actually
// run our job.
$path = storeAudioFile($episode->audioFile);
ProcessEpisode::dispatch($path, $episode->title);
return "Your episode was successfully uploaded! "
. "We still need to generate the transcript "
. "and process the audio, so it might take a bit "
. "until it's released.";
}
}

The job then looks like this:

app/Jobs/ProcessEpisode.php
<?php
namespace App\Jobs;
use App\Http\Requests\UploadNewEpisodeRequest;
class ProcessEpisode
{
public function __construct(
private string $path,
private string $title,
) {
}
public function __invoke(): void
{
$audioFile = retrieveAudioFile($this->path);
transcodeAudioCodec($audioFile);
generateTranscript($audioFile);
notifyThirdPartyApi($this->title);
}
}

The job data only contained the path to the file, since serializing a whole file and storing it on the job queue is not encouraged. The worker and job queue might run on different systems, requiring data to be transferred.

Otherwise, the job definition simply contains the functions we moved from the controller.

We now first store the audio somewhere on our system (so we can retrieve it later) and then place a new ProcessEpisode job in our queue. Vapor handles the serialization of the job data for us. Now we also inform the user, that the publishing will happen asynchronously.

Sources/App/Controllers/PodcastController.swift
import Vapor
struct PodcastController: RouteCollection {
func boot(routes: RoutesBuilder) throws {
let podcasts = routes.grouped("podcasts")
podcasts.post(use: create)
}
// ... Maybe somehow collapse the top section
func create(req: Request) async throws -> UploadNewEpisodeRequest in {
// Important! We need to store the file somewhere,
// so we can retrieve it later when we actually
// run our job.
let episode = try req.content.decode(UploadNewEpisodeRequest.self)
let path = await storeAudioFile(episode, req)
try await req.queue.dispatch(
ProcessEpisode.self,
.init(path: path, title: episode.title))
return "Your episode was successfully uploaded! "
+ "We still need to generate the transcript "
+ "and process the audio, so it might take a bit "
+ "until it's released."
}
}

The job then looks like this:

Sources/App/Jobs/ProcessEpisode.swift
import Vapor
import Foundation
import Queues
struct PodcastEpisode: Codable {
let path: String
let title: String
}
struct ProcessEpisode: AsyncJob {
typealias Payload = PodcastEpisode
func dequeue(_ context: QueueContext, _ payload: PodcastEpisode) -> async throws {
await transcodeAudioCodec(episode.audioFile)
await generateTranscript(episode.audioFile)
await notifyThirdPartyApi(episode.title)
}
}

Orchestration

To optimize our example we could also divide up our job into more granular tasks, each running inside ther own job The transcoding and transcript generation do not depend on each other, so they can run in parallel. Only once both have finished, we can notify the external API, since it will read the generated files and transcript.

Frameworks have sevaral different ways of defining such complex workflows. Below are some examples:

We now focus on the controller again, since the jobs would simply call the functions we used earlier.

To run jobs in parallel and get notified once they terminate we can leverage Laravel’s chains and batches:

app/Http/Controller/PodcastController
<?php
namespace App\Http\Controller;
use Illuminate\Support\Facades\Bus;
use App\Http\Requests\UploadNewEpisodeRequest;
use App\Jobs\{TranscodeAudio, GenerateTranscript};
class PodcastController
{
public function store(UploadNewEpisodeRequest $episode)
{
$path = storeAudioFile($episode->audioFile);
$parallelJobs = [
new TranscodeAudio($path),
new GenerateTranscript($path),
];
// Grouping the jobs in a batch Batching the jobs
// enables us to process them both in parallel.
Bus::batch($parallelJobs)
// When our batch (both parallel running jobs) is
// ready, we can finally notify the API.
->then(fn () => notifyThirdPartyApi($episode->title))
->dispatch();
return "Your episode was successfully uploaded! "
. "We still need to generate the transcript "
. "and process the audio, so it might take a bit "
. "until it's released.";
}
}

Retrying

This also surfaces another advantage of splitting up our work into separate jobs: we can retry them in case of an error. Imagine the API has scheduled maintainance or introduced a bug that leads to them returning a non-successful HTTP-response. When running everything in our web process, we would have no choice but responding with an error, but