Tasks & Scheduling

The task registry manages one-shot and delayed asynchronous tasks, including their completion conditions and timeouts.

Task Registry

cpp
auto& tasks = app.get_state()->of_tasks();

Scheduling a One-Shot Task

schedule_once registers a task that completes when its condition reports that the expected responses have arrived, or when the timeout expires, whichever comes first.

Parameters

| Param | Type | Description |
| --- | --- | --- |
| tids | vector<uuid> | Transaction IDs to track. When process_response() is called with one of these IDs, the on_response callback fires. |
| expected | size_t | Number of responses required to satisfy the condition. |
| condition(current, expected) | callback | Called after each response. Return true when the task is complete. current is an atomic counter you increment in on_response. |
| on_response(msg, current) | callback | Called for each matching response. Increment current here. msg is the incoming FlatBuffers Message. |
| on_completion(ec, received) | callback | Called when the condition is met or the timeout fires. ec is set on timeout. received is the final count. |
| timeout | milliseconds | Maximum wait time. If the condition is not met before this, on_completion fires with an error code. |

cpp
std::vector<boost::uuids::uuid> tids = {/* ... */};
size_t expected = 3;
std::chrono::milliseconds timeout(10000);

tasks.schedule_once(tids, expected,
    /* condition: return true when done */
    [](std::atomic<size_t>& current, size_t expected) -> bool {
        return current >= expected;
    },
    /* on_response: called for each response */
    [](const protocol::fbs::Message* msg, std::atomic<size_t>& current) {
        current++;
    },
    /* on_completion: called when condition is met or timeout */
    [](boost::system::error_code ec, size_t received) {
        if (ec) {
            fmt::println("Task timed out, got {} responses", received);
        } else {
            fmt::println("Task completed with {} responses", received);
        }
    },
    timeout
);
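
To make the interplay between condition, on_response, and on_completion easier to follow, here is a minimal standalone model of the callback contract described in the parameter table. It is a sketch, not the registry's implementation. OneShotModel, Outcome, and simulate are hypothetical names, std::error_code stands in for boost::system::error_code, the msg argument is omitted, and the timer is replaced by an explicit expire() call. The model also assumes that on_completion fires at most once and that responses arriving after completion are ignored. The documentation above does not confirm either point.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <system_error>
#include <utility>

// Hypothetical model of one pending one-shot task. This is NOT the registry's
// real type; it only mirrors the callback contract from the parameter table.
class OneShotModel {
public:
    using Condition    = std::function<bool(std::atomic<std::size_t>&, std::size_t)>;
    using OnResponse   = std::function<void(std::atomic<std::size_t>&)>;
    using OnCompletion = std::function<void(std::error_code, std::size_t)>;

    OneShotModel(std::size_t expected, Condition cond, OnResponse on_resp, OnCompletion on_done)
        : expected_(expected), condition_(std::move(cond)),
          on_response_(std::move(on_resp)), on_completion_(std::move(on_done)) {}

    // A matching response arrived: run on_response, then re-check the condition.
    void respond() {
        if (finished_) return;  // assumption: late responses are ignored
        on_response_(current_);
        if (condition_(current_, expected_)) finish(std::error_code{});
    }

    // The timer expired before the condition was met.
    void expire() {
        if (finished_) return;  // assumption: completion fires at most once
        finish(std::make_error_code(std::errc::timed_out));
    }

private:
    void finish(std::error_code ec) {
        finished_ = true;
        on_completion_(ec, current_.load());
    }

    std::size_t expected_;
    Condition condition_;
    OnResponse on_response_;
    OnCompletion on_completion_;
    std::atomic<std::size_t> current_{0};
    bool finished_ = false;
};

// Records what on_completion reported.
struct Outcome {
    bool fired = false;
    bool timed_out = false;
    std::size_t received = 0;
};

// Drives the model: `responses` arrivals followed by a timer expiry.
Outcome simulate(std::size_t expected, std::size_t responses) {
    Outcome out;
    OneShotModel task(
        expected,
        [](std::atomic<std::size_t>& current, std::size_t exp) { return current >= exp; },
        [](std::atomic<std::size_t>& current) { current++; },
        [&out](std::error_code ec, std::size_t received) {
            out.fired = true;
            out.timed_out = static_cast<bool>(ec);
            out.received = received;
        });
    for (std::size_t i = 0; i < responses; ++i) task.respond();
    task.expire();
    return out;
}
```

In this model, simulate(3, 3) completes without an error code, while simulate(3, 2) completes through the timeout path with received equal to 2. That matches the two branches handled in the on_completion callback of the example above.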

Scheduling a Delayed Task

cpp
// Run a callback after a fixed delay
tasks.schedule_delayed(std::chrono::seconds(30), []() {
    fmt::println("30 seconds have passed");
});

Processing Responses

When a message arrives that matches a tracked transaction ID, call:

cpp
tasks.process_response(transaction_id, message);

This triggers the on_response callback for any matching pending task.
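
The lookup that process_response performs can be pictured as a map from transaction ID to handler. The sketch below illustrates that idea only. ResponseRouterModel and track are hypothetical names, plain strings stand in for boost::uuids::uuid and for the FlatBuffers message, and the bool return value is an assumption added so the example can show what happens with an untracked ID. The real method's return type is not documented here.

```cpp
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical routing table: transaction ID -> on_response handler.
// Strings stand in for UUIDs and for the message payload.
class ResponseRouterModel {
public:
    using Handler = std::function<void(const std::string& payload)>;

    // Tracks every ID in `tids` against the same handler, as one task would.
    void track(const std::vector<std::string>& tids, const Handler& handler) {
        for (const auto& tid : tids) handlers_[tid] = handler;
    }

    // Invokes the handler registered for `tid`, if any.
    // Returns false for untracked IDs (an assumption of this model).
    bool process_response(const std::string& tid, const std::string& payload) {
        auto it = handlers_.find(tid);
        if (it == handlers_.end()) return false;
        it->second(payload);
        return true;
    }

private:
    std::unordered_map<std::string, Handler> handlers_;
};
```

In the model, a response for an ID that no task tracks simply falls through without calling any handler.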

Use Cases

  • Waiting for acknowledgments from multiple nodes
  • Delayed retry logic
  • Scheduled state cleanup
  • Timeout detection for distributed operations