Appearance
Tasks & Scheduling
The tasks registry manages one-shot and delayed asynchronous tasks with conditions and timeouts.
Task Registry
cpp
auto& tasks = app.get_state()->of_tasks();Scheduling a One-Shot Task
Schedules a task that fires when all expected responses arrive or when the timeout expires (whichever comes first).
Parameters
| Param | Type | Description |
|---|---|---|
tids | vector<uuid> | Transaction IDs to track. When process_response() is called with one of these IDs, the on_response callback fires. |
expected | size_t | Number of responses required to satisfy the condition. |
condition(current, expected) | callback | Called after each response. Return true when the task is complete. current is an atomic counter you increment in on_response. |
on_response(msg, current) | callback | Called for each matching response. Increment current here. msg is the incoming FlatBuffers Message. |
on_completion(ec, received) | callback | Called when the condition is met or the timeout fires. ec is set on timeout. received is the final count. |
timeout | milliseconds | Maximum wait time. If the condition is not met before this, on_completion fires with an error code. |
cpp
std::vector<boost::uuids::uuid> tids = {/* ... */};
size_t expected = 3;
std::chrono::milliseconds timeout(10000);
tasks.schedule_once(tids, expected,
/* condition: return true when done */
[](std::atomic<size_t>& current, size_t expected) -> bool {
return current >= expected;
},
/* on_response: called for each response */
[](const protocol::fbs::Message* msg, std::atomic<size_t>& current) {
current++;
},
/* on_completion: called when condition is met or timeout */
[](boost::system::error_code ec, size_t received) {
if (ec) {
fmt::println("Task timed out, got {} responses", received);
} else {
fmt::println("Task completed with {} responses", received);
}
},
timeout
);Scheduling a Delayed Task
cpp
// Run a callback after a fixed delay
tasks.schedule_delayed(std::chrono::seconds(30), []() {
fmt::println("30 seconds have passed");
});Processing Responses
When a message arrives that matches a tracked transaction ID, call:
cpp
tasks.process_response(transaction_id, message);This triggers the on_response callback for any matching pending task.
Use Cases
- Waiting for acknowledgments from multiple nodes
- Delayed retry logic
- Scheduled state cleanup
- Timeout detection for distributed operations