Skip to content

Dispatching Jobs

Jobs are units of work that run asynchronously on a worker pool. Use them for tasks that should not block the request-response cycle: sending emails, processing uploads, generating reports, or any background work.

Defining a Job

Create a class that inherits from framework::jobs::job and implements three pure virtual methods.

cpp
#include <framework/jobs/job.hpp>

using namespace framework;
using namespace framework::jobs;

class SendEmailJob : public job {
public:
    // Unique name used to identify this job type
    std::string name() const override { return "send_email"; }

    // Which queue this job belongs to
    std::string queue() const override { return "emails"; }

    // The actual work — called by a worker thread
    void handle() override {
        // Access framework state via this->state()
        auto& cache = state().of_cache();
        fmt::println("Sending email to {}...", to);
    }

    // Custom fields for job data
    std::string to;
    std::string subject;
};
MethodPurpose
name()Returns a unique string identifying this job type. Used with register_job().
queue()Returns the queue name this job should be routed to.
handle()Contains the job logic. Called when a worker picks up the job.

Registering a Job Factory

Before dispatching, register a factory function that creates instances of your job. The framework uses this to deserialize jobs received from other nodes.

cpp
auto& queue_manager = app.get_state()->of_jobs();

queue_manager.register_job("send_email", []() -> job_ptr {
    return std::make_shared<SendEmailJob>();
});

The factory must return a job_ptr (std::shared_ptr<job>).


Creating a Queue

Queues organize jobs by type and control concurrency. Create one before dispatching.

cpp
queue_config email_config;
email_config.name_ = "emails";
email_config.workers_ = 3;
email_config.max_attempts_ = 3;
email_config.retry_delay_ms_ = 5000;

queue_manager.create_queue("emails", 3, email_config);

The first parameter to create_queue is the queue name (required). The second is the number of worker threads. The optional third parameter is a queue_config with additional settings. The queue_config::name_ field must match the first argument.

Note: The queue name is specified twice (as the first arg AND in config.name_) for validation purposes — they must match.


Dispatching a Job

dispatch(job) — Default (any node)

Sends the job to a worker on the local node.

cpp
auto job = std::make_shared<SendEmailJob>();
job->to = "user@example.com";
job->subject = "Welcome!";

auto job_id = queue_manager.dispatch(job);

Returns the UUID assigned to the job.

dispatch(job, target_node_id) — Specific node

Sends the job to a specific node in the mesh. Useful for data locality.

cpp
boost::uuids::uuid target_node = /* ... */;
queue_manager.dispatch(job, target_node);

broadcast(job) — All nodes

Sends the job to every node in the mesh. Each node processes it independently.

cpp
queue_manager.broadcast(job);

Job Serialization

Jobs dispatched across the mesh need to be serialized. Override these methods for custom data:

cpp
class SendEmailJob : public job {
    // For binary transport
    std::vector<uint8_t> serialize() const override { /* pack fields */ }
    void deserialize(const std::vector<uint8_t>& data) override { /* unpack */ }

    // For JSON transport
    boost::json::value serialize_as_json() const override { /* ... */ }
    void deserialize_as_json(const boost::json::value& json) override { /* ... */ }
};