Goby v2
goby-acomms: queue (Message Priority Queuing)


Understanding dynamic priority queuing

Each queue has a base value ($V_{base}$) and a time-to-live ($ttl$) that create the priority ($P(t)$) at any given time ($t$):

\[ P(t) = V_{base} \frac{(t-t_{last})}{ttl} \]

where $t_{last}$ is the time of the last send from this queue.

This means that for every queue, the user controls two variables ($V_{base}$ and $ttl$). $V_{base}$ is intended to capture how important the message type is in general: higher base values mean the message is of higher importance. The $ttl$ governs the number of seconds the message lives from creation until it is destroyed by queue. The $ttl$ also factors into the priority calculation since, all else being equal (same $V_{base}$), it is preferable to send more time-sensitive messages first. With these two parameters, the user can capture both the overall value ($V_{base}$) and the latency tolerance ($ttl$) of the message queue.
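As a worked example with assumed values (chosen purely for illustration, not taken from the graph or from any Goby configuration), take queue A with $V_{base} = 1$ and $ttl = 1800$, and queue B with $V_{base} = 2$ and $ttl = 600$. If both queues last sent 300 seconds ago ($t - t_{last} = 300$):

\[ P_A = 1 \cdot \frac{300}{1800} \approx 0.17, \qquad P_B = 2 \cdot \frac{300}{600} = 1 \]

Queue B would be chosen: it has both the higher base value and the shorter time-to-live, so its priority grows six times faster than that of queue A.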

The following graph illustrates the priority growth over time of three queues with different $ttl$ and $V_{base}$. A message is sent every 100 seconds and the queue that is chosen is marked on the graph.

[Figure: priority_graph.png, priority growth over time for three queues]
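The behavior described above can be approximated with a small stand-alone simulation. This is only a sketch of the priority formula, not Goby code: the `QueueParams` struct, the function names, and the three parameter sets in the usage note are hypothetical, and blackout time, expiry, and empty queues are ignored.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for one queue's priority parameters (not a Goby type).
struct QueueParams
{
    double value_base; // V_base
    double ttl;        // time-to-live in seconds, assumed > 0 in this sketch
    double t_last;     // time of the last send from this queue, in seconds
};

// P(t) = V_base * (t - t_last) / ttl
double priority(const QueueParams& q, double t)
{
    return q.value_base * (t - q.t_last) / q.ttl;
}

// Index of the queue with the highest priority at time t (the first queue wins ties).
std::size_t choose_queue(const std::vector<QueueParams>& queues, double t)
{
    std::size_t best = 0;
    for (std::size_t i = 1; i < queues.size(); ++i)
    {
        if (priority(queues[i], t) > priority(queues[best], t))
            best = i;
    }
    return best;
}

// Sends one message every `period` seconds and records which queue was chosen each time.
std::vector<std::size_t> simulate(std::vector<QueueParams> queues, double period, int sends)
{
    std::vector<std::size_t> chosen;
    for (int n = 1; n <= sends; ++n)
    {
        const double t = n * period;
        const std::size_t i = choose_queue(queues, t);
        queues[i].t_last = t; // sending resets this queue's priority to zero
        chosen.push_back(i);
    }
    return chosen;
}
```

With the assumed queues `{1.0, 1000.0, 0.0}`, `{2.0, 500.0, 0.0}` and `{4.0, 400.0, 0.0}` and a send every 100 seconds, `simulate(queues, 100.0, 11)` picks queues 2, 2, 1, 2, 2, 1, 2, 2, 1, 2, 0. The slowest-growing queue is eventually served once its priority has had time to accumulate.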

Queuing Protobuf Options

This section gives an overview of the queue configuration options available. The full list is available in queue.proto (as the message goby::acomms::protobuf::QueuedMessageEntry).

Queue message options:

name | type | default | description
ack | bool | true | Whether an acoustic acknowledgment should be requested for messages sent from this queue.
blackout_time | uint32 | 0 | Minimum number of seconds allowed between sending messages from this queue.
max_queue | uint32 | 0 | Allowed size of the queue before overflow. If newest_first is true, the oldest elements are removed upon overflow; otherwise the newest elements are removed (the queue blocks). 0 is a special value signifying infinity (no maximum).
newest_first | bool | true | Whether to send the newest messages in the queue first (true = FILO) or the oldest first (false = FIFO).
ttl | int32 | 1800 | The time in seconds a message lives after its creation before being discarded. This time-to-live also factors into the growth in priority of a queue; see value_base for the main discussion. 0 is a special value indicating infinite life (i.e. ttl = 0 is effectively the same as ttl = $\infty$).
value_base | double | 1 | Base priority value for this message queue. Priorities are calculated on a request for data by the modem (to send a message). The queue with the highest priority that is not in blackout is chosen. The actual priority ($P$) is calculated by $P(t) = V_{base} \frac{(t-t_{last})}{ttl}$ where $V_{base}$ is the value set here, $t$ is the current time (in seconds), $t_{last}$ is the time of the last send from this queue, and $ttl$ is the ttl option. Essentially, a queue with a low ttl regains priority quickly after a send (its priority line grows faster). See Understanding dynamic priority queuing for further discussion.
encode_on_demand | bool | false | (Advanced) Enable on-demand encoding where, rather than queuing data, the data request is forwarded up to the application level via the signal goby::acomms::QueueManager::signal_data_on_demand.
on_demand_skew_seconds | double | 1 | (Advanced) If encode_on_demand == true, this sets the number of seconds before data encoded on demand are considered stale and thus must be demanded again with the signal goby::acomms::QueueManager::signal_data_on_demand. Setting this to 0 is inadvisable, as it will cause many calls to goby::acomms::QueueManager::signal_data_on_demand and thus waste CPU cycles needlessly encoding.
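The interaction between max_queue and newest_first can be sketched with a small stand-alone model. This is not the Goby implementation: `BoundedQueue` is a hypothetical class, and it assumes that "the queue blocks" means an incoming message is rejected when a FIFO queue is full.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical model of one queue's overflow and send-order options (not a Goby class).
class BoundedQueue
{
  public:
    BoundedQueue(std::size_t max_queue, bool newest_first)
        : max_queue_(max_queue), newest_first_(newest_first)
    {
    }

    // Adds a message; returns false if the message was rejected because the queue is full.
    bool push(const std::string& msg)
    {
        const bool full = (max_queue_ != 0) && (messages_.size() >= max_queue_);
        if (full)
        {
            if (!newest_first_)
                return false;      // FIFO: the newest (incoming) message is dropped
            messages_.pop_front(); // FILO: the oldest message is dropped to make room
        }
        messages_.push_back(msg);
        return true;
    }

    // Removes and returns the next message to send; the queue must not be empty.
    std::string pop()
    {
        std::string msg;
        if (newest_first_)
        {
            msg = messages_.back();
            messages_.pop_back();
        }
        else
        {
            msg = messages_.front();
            messages_.pop_front();
        }
        return msg;
    }

    std::size_t size() const { return messages_.size(); }

  private:
    std::size_t max_queue_; // 0 means no maximum
    bool newest_first_;
    std::deque<std::string> messages_;
};
```

For instance, pushing "a", "b", "c" into `BoundedQueue(2, true)` leaves "b" and "c" queued and sends "c" first, whereas `BoundedQueue(2, false)` rejects "c" and sends "a" first.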

Queue Role options:

Queue needs to know how to address a message (the source ID and destination ID) as well as the time the message was generated. This information is either read from the fields of the DCCL message (setting: FIELD_VALUE) or statically configured (setting: STATIC). In the latter case, the configuration value "static_value" is set and used for every DCCL message of this type that gets queued by this QueueManager.

In the former case (the default), you can tag a given field of a DCCL message to a particular "role." This takes the place of a fixed transport layer header that protocols such as UDP use. The fields used in a given role can be anywhere within the message. The field is identified by its name (in the configuration value "field"). Submessage fields can be used by separating the field names by periods (".") until the child is a primitive type (e.g. uint32).

RoleType | allowed field types | description
SOURCE_ID | All integer types (uint32, int32, uint64, int64, ...) | The value in this field is used to represent the sending address (similar to an IP address) of the message.
DESTINATION_ID | All integer types (uint32, int32, uint64, int64, ...) | The value in this field is used to represent the destination address (similar to an IP address) of the message. 0 is reserved to indicate broadcast.
TIMESTAMP | uint64 or double | The value in this field is used as the timestamp of the message. If the type is double, it must be seconds (and fractional seconds) since the UNIX epoch (1970-01-01 midnight UTC). If it is a uint64, it must be microseconds since the UNIX epoch. This field is used for expiring messages that exceed their ttl and thus must, in general, be set and correct.
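The two TIMESTAMP representations can be produced from a std::chrono time point as sketched below. The helper names are hypothetical (they are not part of Goby), and the sketch assumes that std::chrono::system_clock counts from the UNIX epoch, which is guaranteed from C++20 and holds in practice on mainstream implementations before that.

```cpp
#include <chrono>
#include <cstdint>

// Microseconds since the UNIX epoch: the form described for a uint64 TIMESTAMP field.
std::uint64_t to_unix_microseconds(std::chrono::system_clock::time_point tp)
{
    using namespace std::chrono;
    return static_cast<std::uint64_t>(
        duration_cast<microseconds>(tp.time_since_epoch()).count());
}

// Seconds (with fractional part) since the UNIX epoch: the form described for a double TIMESTAMP field.
double to_unix_seconds(std::chrono::system_clock::time_point tp)
{
    using namespace std::chrono;
    return duration_cast<duration<double>>(tp.time_since_epoch()).count();
}
```

Calling either helper with `std::chrono::system_clock::now()` yields a value suitable for the corresponding field type; mixing the two units (for example, storing seconds in a uint64 field) would make messages appear expired or far in the future.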

Interacting with the QueueManager

Instantiate and configure

The goby::acomms::QueueManager is configured similarly to the goby::acomms::DCCLCodec. You need to set a unique identification number for this platform (the "modem ID") through the goby::acomms::protobuf::QueueManagerConfig.

You can configure queues by adding entries to the QueueManagerConfig's repeated message_entry field, or by calling goby::acomms::QueueManager::add_queue() directly.

When using goby::acomms::QueueManager, you will rarely need to use the goby::acomms::DCCLCodec directly. All messages are pushed to the queues unencoded and are encoded automatically by goby::acomms::QueueManager before sending. Likewise, all received messages are decoded before being provided on the signal goby::acomms::QueueManager::signal_receive.

For example, this code configures the QueueManager with a single queue (DCCL type GobyMessage):

// cfg carries the QueueManager configuration; our_id is this platform's modem ID
goby::acomms::protobuf::QueueManagerConfig cfg;
cfg.set_modem_id(our_id);
// add one queue for the DCCL type GobyMessage, sending the newest messages first
goby::acomms::protobuf::QueuedMessageEntry* q_entry = cfg.add_message_entry();
q_entry->set_protobuf_name("GobyMessage");
q_entry->set_newest_first(true);
// tag the message fields that fill the destination, timestamp, and source roles
goby::acomms::protobuf::QueuedMessageEntry::Role* dest_role = q_entry->add_role();
dest_role->set_type(goby::acomms::protobuf::QueuedMessageEntry::DESTINATION_ID);
dest_role->set_field("header.dest_platform");
goby::acomms::protobuf::QueuedMessageEntry::Role* time_role = q_entry->add_role();
time_role->set_type(goby::acomms::protobuf::QueuedMessageEntry::TIMESTAMP);
time_role->set_field("header.time");
goby::acomms::protobuf::QueuedMessageEntry::Role* src_role = q_entry->add_role();
src_role->set_type(goby::acomms::protobuf::QueuedMessageEntry::SOURCE_ID);
src_role->set_field("header.source_platform");
// apply the configuration to the QueueManager instance (q_manager)
q_manager.set_cfg(cfg);

Signals and (application layer) slots

Then, you need to do a few more initialization chores:

Operation

At this point the goby::acomms::QueueManager is ready to use. At the application layer, new messages are pushed to the queues for sending using goby::acomms::QueueManager::push_message. Each queue is identified by its DCCL (Protobuf) name.

At the driver layer, messages are requested using goby::acomms::QueueManager::handle_modem_data_request and incoming messages (including acknowledgments) are published using goby::acomms::QueueManager::handle_modem_receive. If using the goby-acomms drivers (i.e. some class derived from goby::acomms::ModemDriverBase), simply call goby::acomms::bind(ModemDriverBase&, QueueManager&) and these methods (slots) will be invoked automatically from the proper driver signals.

You must run goby::acomms::QueueManager::do_work() regularly (faster than 1 Hz; 10 Hz works well) to process expired messages (goby::acomms::QueueManager::signal_expire). All other signals are emitted in response to a driver-level signal (and thus are called during a call to goby::acomms::ModemDriverBase::do_work() if using the Goby modem driver).
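One way to meet the timing requirement is a fixed-rate loop. The sketch below is generic C++ rather than Goby code: `run_periodically` is a hypothetical helper, and the `work` callable stands in for a call such as the QueueManager's do_work().

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Calls `work` `iterations` times at roughly `hz` calls per second.
// `work` is a hypothetical stand-in for a call such as q_manager.do_work().
void run_periodically(const std::function<void()>& work, double hz, int iterations)
{
    const auto period = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
        std::chrono::duration<double>(1.0 / hz));
    auto next = std::chrono::steady_clock::now();
    for (int i = 0; i < iterations; ++i)
    {
        work();
        next += period;                      // schedule relative to the previous deadline
        std::this_thread::sleep_until(next); // avoids drift from the time spent in work()
    }
}
```

In an application the callable would wrap the QueueManager's do_work() and, if a Goby driver is used, the driver's do_work() as well, with `hz` set to 10. Sleeping until an absolute deadline rather than for a fixed interval keeps the rate steady even when a call takes a noticeable fraction of the period.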

See queue_simple.cpp for a basic complete example.

Example messages

This section provides a listing of queue example Protobuf messages used in the code examples and unit tests.

Minimal functional DCCL / Queue message

simple.proto

import "dccl/option_extensions.proto";

message Simple
{
    // see http://gobysoft.org/wiki/DcclIdTable
    option (dccl.msg).id = 124;

    // if, for example, we want to use on the WHOI Micro-Modem rate 0
    option (dccl.msg).max_bytes = 32;

    required string telegram = 1 [(dccl.field).max_length = 30];
}

Test1

queue1/test.proto

import "dccl/option_extensions.proto";

message TestMsg
{
    option (dccl.msg).id = 2;
    option (dccl.msg).max_bytes = 32;

    // test default enc/dec
    optional double double_default_optional = 1 [
        (dccl.field).min = -100,
        (dccl.field).max = 126,
        (dccl.field).precision = 2,
        (dccl.field).in_head = true
    ];
    optional float float_default_optional = 2 [
        (dccl.field).min = -20,
        (dccl.field).max = 150,
        (dccl.field).precision = 3
    ];
}
See also
queue1/test.cpp

Test2, Test3, Test4

dccl3/test.proto

import "goby/common/protobuf/option_extensions.proto";
import "dccl/option_extensions.proto";
import "goby/test/acomms/dccl3/header.proto";

message GobyMessage
{
    option (dccl.msg).id = 4;
    option (dccl.msg).max_bytes = 32;

    required string telegram = 1 [(dccl.field).max_length = 10];
    required Header header = 2;
}

protobuf/header.proto

import "goby/common/protobuf/option_extensions.proto";
import "dccl/option_extensions.proto";

// required fields will be filled in for you by ApplicationBase
// if you choose not to do so yourself
message Header
{
    //
    // time
    //

    // result of goby::util::as<std::string>(goby_time())
    // e.g. "2002-01-20 23:59:59.000"
    required string time = 10
        [(dccl.field).codec = "_time", (dccl.field).in_head = true];

    //
    // source
    //
    required string source_platform = 11 [
        (dccl.field).codec = "_platform<->modem_id",
        (dccl.field).in_head = true
    ];
    optional string source_app = 12 [(dccl.field).omit = true];

    //
    // destination
    //
    enum PublishDestination
    {
        PUBLISH_SELF = 1;
        PUBLISH_OTHER = 2;
        PUBLISH_ALL = 3;
    }
    optional PublishDestination dest_type = 13
        [default = PUBLISH_SELF, (dccl.field).in_head = true];

    optional string dest_platform = 14 [
        (dccl.field).codec = "_platform<->modem_id",
        (dccl.field).in_head = true
    ];  // required if dest_type == other
}
See also
queue2/test.cpp
queue3/test.cpp
queue4/test.cpp

Test5

queue5/test.proto

import "dccl/option_extensions.proto";

message GobyMessage
{
    option (dccl.msg).id = 4;
    option (dccl.msg).max_bytes = 32;

    // one byte
    required int32 telegram = 1 [(dccl.field).min = 0, (dccl.field).max = 255];
}
See also
queue5/test.cpp
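
The "one byte" comment on the telegram field in queue5/test.proto follows from the field bounds, assuming that a required bounded integer field is encoded with the minimum number of bits needed to distinguish every value in $[min, max]$ (an assumption about the default DCCL codec, not something stated on this page):

\[ \lceil \log_2(max - min + 1) \rceil = \lceil \log_2(255 - 0 + 1) \rceil = \lceil \log_2 256 \rceil = 8 \text{ bits} = 1 \text{ byte} \]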