Getting Started

Getting started with the Account Aggregator

The account aggregator is a service responsible for monitoring various input streams, compiling and aggregating statistics, and selectively reporting to one or more output streams. It acts primarily as a centralized collector of metrics which may have various aggregations applied before being published to one or more endpoints.

Modes of operation

There are two primary modes of operation: a live-monitoring mode and a reporting mode. The live-monitoring mode measures the account records in real time, then filters and aggregates the data to the various outputs. In this mode, only the most recent data will be considered, and any historical context upon startup may be skipped. In the reporting mode, the account record data will be consumed and processed in the order in which it was published to Kafka, and the service will guarantee that all records still available within the Kafka topic will be processed and reported upon.

Activating the various modes of operation is performed by way of the set of input and output blocks within the configuration file. The file may contain one or more input blocks which specify where the data is sourced, e.g. account records from Kafka, and one or more output blocks which determine how and where the aggregated statistics are published.

While it is possible to specify multiple input and output blocks within a single configuration file, it is highly recommended to separate each pairing of input and output blocks into separate instances running on different nodes. This will yield the best performance and provide for better load balancing, since each instance will be responsible for a single mode of operation.

Real-time account monitoring

In the real-time account monitoring mode, account records, which are sent from each streaming server through the Kafka message broker, are processed by the account aggregator, and current real-time throughput metrics are updated in a Redis database. These metrics, which are constantly being updated, reflect the most current state of the CDN, and can be used by the Convoy Request Router to make real-time routing decisions.

PCX Reporting

In the PCX collector mode, account records are consumed in such a way that past throughput and session statistics can be aggregated to produce billing related reports. These reports are not considered real-time metrics, but represent usage statistics over fixed time intervals. This mode of operation requires a PCX API compatible reporting endpoint. See Appendix B for additional information regarding the PCX reporting format.

Installation

Prerequisites

The account aggregator is shipped as a compressed OCI formatted container image and as such, it requires a supported container runtime such as one of the following:

  • Docker
  • Podman
  • Kubernetes

Any runtime capable of running a Linux container should work the same. For simplicity, the following installation instructions assume that Docker is being used, and that Docker is already configured and running on the target system.

To test that Docker is set up and running, and that the current user has the required privileges to create a container, you may execute the following command.

$ docker run hello-world

Hello from Docker!
This message shows that your installation appears to be working correctly.

To generate this message, Docker took the following steps:
 1. The Docker client contacted the Docker daemon.
 2. The Docker daemon pulled the "hello-world" image from the Docker Hub.
    (amd64)
 3. The Docker daemon created a new container from that image which runs the
    executable that produces the output you are currently reading.
 4. The Docker daemon streamed that output to the Docker client, which sent it
    to your terminal.

To try something more ambitious, you can run an Ubuntu container with:
 $ docker run -it ubuntu bash

Share images, automate workflows, and more with a free Docker ID:
 https://hub.docker.com/

For more examples and ideas, visit:
 https://docs.docker.com/get-started/

If you get a permission denied error, ensure that the current user is a member of the docker group or execute all Docker commands under sudo.

Loading the container image

The container image is delivered as a compressed OCI formatted image, which can be loaded directly via the docker load command. The following assumes that the image is located at /tmp/esb3032-acd-aggregator-0.0.0.gz.

docker load --input /tmp/esb3032-acd-aggregator-0.0.0.gz

You will now be able to verify that the image was loaded successfully by executing the following and looking for the image name in the output.

$ docker images | grep acd-aggregator

images.edgeware.tv/esb3032-acd-aggregator latest   4bbe28b444d3 1 day ago  2.08GB

Creating the configuration file

The configuration file may be located anywhere on the filesystem. However, it is recommended to keep everything under the /opt/edgeware/acd/aggregator folder to be consistent with other products in the ACD product family. If that folder doesn’t already exist, you may create it with the following command.

mkdir -p /opt/edgeware/acd/aggregator

If using a different location, you will need to map the folder to the container while creating the Docker container. Additional information describing how to map the volume is available in the section “Creating and starting the container” below.

The configuration file for the account aggregator is divided into three sections: input, output, and tuning. One or more input blocks may be specified to configure where the data should be sourced from. One or more output blocks may be configured to determine where the resulting aggregated data is published. Finally, the tuning block configures various global settings for how the account aggregator operates, such as the global log_level.

Configuring the input source

As of the current version of the account aggregator, there is only a single type of input source supported, and that is account_records. This input source connects to a Kafka message broker, and consumes account records. Depending on which output types are configured, the Kafka consumer may either start by processing the oldest or most recent records first.

The following configuration block sample will be used as an example in the description below.

Note that the key input is surrounded by double-square-brackets. This is a syntax element to indicate that there may be multiple input sections in the configuration.

[[input]]
type = "account_records"
servers = [
    "kafka://192.0.2.1:9092",
    "kafka://192.0.2.2:9092",
]
group_name = "acd-aggregator"
kafka_max_poll_interval_ms = 30000
kafka_session_timeout_ms = 3000
log_level = "off"

The type property is used to determine the type of input, and the only valid value is account_records.

The servers list must contain at least one Kafka URL, prefixed with the URL scheme kafka://. If the port is not specified, the default Kafka port of 9092 will be used. It is recommended, but not required, to specify all servers here. The Kafka client library will obtain the full list of endpoints from the server on startup, but the initial connection will be made to one or more of the provided URLs.
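
As an illustration of the URL format and the default port described above, the following Python sketch splits a kafka:// URL into a host and port. The function name parse_kafka_url is hypothetical and is not part of the account aggregator; it only mirrors the rules stated in the text.

```python
from urllib.parse import urlsplit

DEFAULT_KAFKA_PORT = 9092  # default port stated in the text above


def parse_kafka_url(url: str) -> tuple[str, int]:
    """Split a kafka:// URL into (host, port), applying the default port."""
    parts = urlsplit(url)
    if parts.scheme != "kafka":
        raise ValueError(f"expected the kafka:// scheme, got {url!r}")
    if not parts.hostname:
        raise ValueError(f"missing host in {url!r}")
    # parts.port is None when the URL carries no explicit port.
    return parts.hostname, parts.port or DEFAULT_KAFKA_PORT


# The second URL omits the port, so the default is applied.
servers = ["kafka://192.0.2.1:9092", "kafka://192.0.2.2"]
endpoints = [parse_kafka_url(url) for url in servers]
print(endpoints)
```

A check of this kind can be useful for validating a servers list before deploying a configuration file.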

The group_name property identifies to which consumer group the aggregator should belong. Due to the type of data which account records represent, each instance of the aggregator connecting to the same Kafka message broker MUST have a unique group name. If two instances belong to the same group, the data will be partitioned among both instances, and the resulting aggregations may not be correct. If only a single instance of the account aggregator is used, this property is optional and defaults to “acd-aggregator”.

The kafka_max_poll_interval_ms and kafka_session_timeout_ms properties are used to tune the connection parameters for the internal Kafka consumer. More details for these properties can be found in the Kafka documentation and in the documentation for the rdkafka library.

The log_level property configures the logging level for the Kafka library and supports the values “off”, “trace”, “debug”, “info”, “warn”, and “error”. By default, logging from this library is disabled. This should only be enabled for troubleshooting purposes, as it is extremely verbose, and any warnings or error messages will be repeated in the account aggregator’s log. The logging level for the Kafka library must be higher than the general logging level for the aggregator, as defined in the “tuning” section, or the lower-level messages from the Kafka library will be skipped.

Configuring output

The account aggregator currently supports two types of output blocks, depending on the desired mode of operation. For reference purposes, both types will be described within this section, but it is recommended to only use a single type per instance of the account aggregator.

Note that the key output is surrounded by double-square-brackets. This is a syntax element to indicate that there may be multiple output sections in the configuration.

[[output]]
type = "account_monitor"
redis_servers = [
    "redis://192.0.2.7:6379/0",
    "redis://:password@192.0.2.8:6379/1",
]
stale_threshold_s = 12
throughput_correction_mbps = 0
minimum_check_interval_ms = 1000

[[output]]
type = "pcx_collector"
report_url = "https://192.0.2.5:8000/v1/collector"
client_id = "edgeware"
secret = "abc123"
report_timeout_ms = 2000
report_interval_s = 30
report_delay_s = 30

Real-time account monitor output

The first output block has the type account_monitor and represents the live account monitoring functionality, which publishes per-account bandwidth metrics to one or more Redis servers. When this type of output block is configured, the account records will be consumed starting with the most recent messages first, and offsets will not be committed. Stopping or restarting the service may cause account records to be skipped. This type of output is suitable for making real-time routing decisions, but should not be relied upon for critical billing or reporting metrics.

The redis_servers list consists of URLs to Redis instances which shall be updated with the current real-time bandwidth metrics. If the Redis instance requires authentication, the global instance password can be specified as part of the URL as in the second entry in the list. Since Redis does not support usernames, anything before the : in the credentials part of the URL will be ignored. At least 1 Redis URL must be provided.
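
The URL rules above can be sketched in Python as follows. The helper parse_redis_url is hypothetical. The fallback port of 6379 is the conventional Redis port, but whether the account aggregator applies it when the port is omitted is an assumption.

```python
from urllib.parse import urlsplit


def parse_redis_url(url: str) -> dict:
    """Extract host, port, database index, and password from a redis:// URL."""
    parts = urlsplit(url)
    if parts.scheme != "redis":
        raise ValueError(f"expected the redis:// scheme, got {url!r}")
    return {
        "host": parts.hostname,
        # Assumption: fall back to the conventional Redis port.
        "port": parts.port or 6379,
        # The path component ("/1") selects the database index.
        "db": int(parts.path.lstrip("/") or 0),
        # Only the password is used; any user name before ":" is ignored.
        "password": parts.password,
    }


with_password = parse_redis_url("redis://:password@192.0.2.8:6379/1")
ignored_user = parse_redis_url("redis://anything:password@192.0.2.8:6379/1")
no_auth = parse_redis_url("redis://192.0.2.7:6379/0")
print(with_password)
```

In this sketch, with_password and ignored_user produce the same result, which matches the statement that anything before the : is ignored.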

The stale_threshold_s property determines the maximum timeout in seconds, after which, if no account records have been received for a given host, the host will be considered stale and removed.
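
A minimal Python sketch of the stale-host rule, assuming that the aggregator tracks the time at which the last account record was seen for each host. The function and host names are hypothetical, and whether a host exactly at the threshold counts as stale is an assumption.

```python
def prune_stale_hosts(
    last_seen: dict[str, float],
    now: float,
    stale_threshold_s: float = 12.0,
) -> dict[str, float]:
    """Keep only hosts whose last account record is recent enough."""
    return {
        host: seen_at
        for host, seen_at in last_seen.items()
        # Assumption: a host exactly at the threshold is still kept.
        if now - seen_at <= stale_threshold_s
    }


# "edge-2" last reported 15 seconds ago, which exceeds the 12 second threshold.
last_seen = {"edge-1": 100.0, "edge-2": 85.0}
active = prune_stale_hosts(last_seen, now=100.0)
print(active)
```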

The throughput_correction_mbps property can be used to add or subtract a fixed correction factor to the bandwidth reported in Redis. This is specified in megabits per second, and this may be either positive or negative. If the value is negative, and the calculated bandwidth is less than the correction factor, a minimum bandwidth of 0 will be reported.
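
The correction and the lower bound of 0 can be illustrated with a short Python sketch. The function name is hypothetical, and the conversion of 1 megabit to 1,000,000 bits is an assumption, since the text does not state whether decimal or binary units are used.

```python
BITS_PER_MEGABIT = 1_000_000  # assumption: decimal megabits


def corrected_bandwidth_bps(measured_bps: int, throughput_correction_mbps: int) -> int:
    """Apply a fixed correction in Mb/s and never report less than 0."""
    corrected = measured_bps + throughput_correction_mbps * BITS_PER_MEGABIT
    return max(0, corrected)


# A positive correction is added to the measured value.
print(corrected_bandwidth_bps(500_000_000, 2000))
# A negative correction larger than the measured value is clamped to 0.
print(corrected_bandwidth_bps(1_000_000, -5))
```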

The minimum_check_interval_ms property is used to throttle how frequently the statistics will be processed. By default, the account aggregator will not recalculate the statistics more than once per second. Setting this value too low will result in potentially higher CPU usage, while setting it too high may result in some account records being missed. The default of 1 second should be adequate for most situations.

PCX Collector output

The pcx_collector type configures the account aggregator as a reporting agent for the PCX API. Whenever this configuration is present, the account record consumer will be configured to always start at the oldest records retained within the Kafka topic. It then processes the records one at a time, committing the Kafka offset each time a report is successfully received. This mode does not make any guarantees as to how recent the data is on which the reports are made, but does guarantee that every record will be counted in the aggregated report.

Stopping or restarting the service will result in the account record consumer resuming processing from the last successful report. This type of reporting is suitable for billing purposes assuming that there are multiple replicated Kafka nodes, and that the service is not stopped for longer than the maximum retention period configured within Kafka. Stopping the service for longer than the retention period will result in messages being unavailable.

Because this type of output requires that the Kafka consumer is processed in a specific order, and will not proceed with reading additional messages until all reports have been successfully received, it is not recommended to have both pcx_collector and the account_monitor type output blocks configured within the same instance.

The report_url property is a single HTTP endpoint URL where the PCX API can be reached. This property is required and may be either an HTTP or HTTPS URL. For HTTPS, the validity of the TLS certificate will be enforced, meaning that self-signed certificates will not be considered valid.

The client_id and secret fields are used to authenticate the client with the PCX API via token-based authentication. These fields are both required, however if not used by the specific PCX API instance, the empty string "" may be provided.

The report_timeout_ms field is an optional maximum timeout for the HTTP connection to the PCX API before the connection will fail. Failed reports will be retried indefinitely.

The report_interval_s property represents the interval bucket size for reporting metrics. The timing for this type of output is based solely on the embedded timestamp value of the account records, meaning that this property is not an absolute period on which the reports will be sent, but instead represents the duration between the start and ending timestamps of the report. Especially upon startup, reports may be sent much more frequently than this interval, but will always cover this duration of time.
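
To make the idea of timestamp-based buckets concrete, the following Python sketch assigns record timestamps to fixed-size intervals. It assumes that the intervals are aligned to multiples of report_interval_s since the Unix epoch, which is not stated in this document. The function names are hypothetical.

```python
def bucket_bounds(record_ts: int, report_interval_s: int = 30) -> tuple[int, int]:
    """Return the (begin, end) timestamps of the interval containing record_ts."""
    # Assumption: intervals are aligned to multiples of the interval size.
    begin = record_ts - (record_ts % report_interval_s)
    return begin, begin + report_interval_s


def group_by_bucket(timestamps: list[int], report_interval_s: int = 30) -> dict:
    """Group record timestamps by the reporting interval they fall into."""
    buckets: dict[tuple[int, int], list[int]] = {}
    for ts in timestamps:
        buckets.setdefault(bucket_bounds(ts, report_interval_s), []).append(ts)
    return buckets


# Three records: two fall into one interval, the third starts the next one.
buckets = group_by_bucket([1674165541, 1674165569, 1674165570])
for (begin, end), records in buckets.items():
    print(begin, end, len(records))
```

Because the grouping depends only on the timestamps carried in the records, a backlog of old records produces many intervals in quick succession, which is why reports may be sent more frequently than the interval at startup.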

The report_delay_s property is an optional offset used to account for both clock synchronization between servers and the propagation delay of the account records through the message broker. The default delay is 30 seconds. This means that the ending timestamp of a given report will be no more recent than this many seconds in the past. It is important to include this delay, as any account record received with a timestamp that falls within a period which has already been reported upon will be dropped.
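
The effect of the delay can be sketched in Python as two simple checks. The function names are hypothetical, and the reference time now is left abstract: this document does not specify whether the aggregator compares against the wall clock or against the newest record timestamp it has seen.

```python
def bucket_is_reportable(timestamp_end: int, now: int, report_delay_s: int = 30) -> bool:
    """A period may be reported once its end is at least the delay in the past."""
    return timestamp_end <= now - report_delay_s


def record_is_late(record_ts: int, last_reported_end: int) -> bool:
    """A record inside an already reported period would be dropped."""
    return record_ts < last_reported_end


# The period ending at 1674165570 becomes reportable 30 seconds later.
print(bucket_is_reportable(1674165570, now=1674165599))
print(bucket_is_reportable(1674165570, now=1674165600))
# A record stamped before the last reported end arrives too late to be counted.
print(record_is_late(1674165560, last_reported_end=1674165570))
```

A larger delay gives slow records more time to arrive before their period is closed, at the cost of reports that lag further behind.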

Tuning the account aggregator

The tuning configuration block represents the global properties for tuning how the account aggregator functions. Currently only one tuning property can be configured, and that is the log_level. The default log_level is “info”, which should be used in normal operation of the account aggregator, however, other possible values in order of verbosity include “trace”, “debug”, “info”, “warn”, “error”, and “off”.

Note that the tuning key is surrounded by single square-brackets. This is TOML syntax meaning that only one instance of tuning is allowed.

[tuning]
log_level = "info"

Example configurations

This section describes some example configuration files which can be used as a starting template depending on which mode of operation is desired.

Real-time account monitoring

This configuration will consume account records from a Kafka server running on 3 hosts, kafka-1, kafka-2, and kafka-3. The account records will be consumed starting with the most recent records. The resulting aggregations will be published to two Redis instances, running on redis-1 and redis-2. The reported bandwidth will have a 2Gb/s correction factor applied.

[[input]]
type = "account_records"
servers = [
    "kafka://kafka-1:9092",
    "kafka://kafka-2:9092",
    "kafka://kafka-3:9092"
]
group_name = "acd-aggregator-live"
# kafka_max_poll_interval_ms = 30000
# kafka_session_timeout_ms = 3000
# log_level = "off"

[[output]]
type = "account_monitor"
redis_servers = [
    "redis://redis-1:6379/0",
    "redis://redis-2:6379/0",
]
# stale_threshold_s = 12
throughput_correction_mbps = 2000
# minimum_check_interval_ms = 1000

[tuning]
log_level = "info"

The keys prefixed by # are commented out, since the default values will be used. They are included in the example for completeness.

PCX collector

This configuration will consume account records starting from the earliest record, calculate aggregated statistics for every 30 seconds, offset with a delay of 30 seconds, and publish the results to https://pcx.example.com/v1/collector.

[[input]]
type = "account_records"
servers = [
    "kafka://kafka-1:9092",
    "kafka://kafka-2:9092",
    "kafka://kafka-3:9092"
]
group_name = "acd-aggregator-pcx"
# kafka_max_poll_interval_ms = 30000
# kafka_session_timeout_ms = 3000
# log_level = "off"

[[output]]
type = "pcx_collector"
report_url = "https://pcx.example.com/v1/collector"
client_id = "edgeware"
secret = "abc123"
# report_timeout_ms = 2000
# report_interval_s = 30
# report_delay_s = 30

[tuning]
log_level = "info"

The keys prefixed by # are commented out, since the default values will be used. They are included in the example for completeness.

Combined PCX collector with real-time account monitoring

While this configuration is possible, it is not recommended, since the pcx_collector output type will force all records to be consumed starting at the earliest record. This will cause the live statistics to be delayed until ALL earlier records have been consumed, and reports have been successfully accepted by the PCX API. This combined role configuration can be used to minimize the number of servers or services running if the above limitations are acceptable.

Note: This is simply the combination of the above two output blocks in the same configuration file.

[[input]]
type = "account_records"
servers = [
    "kafka://kafka-1:9092",
    "kafka://kafka-2:9092",
    "kafka://kafka-3:9092"
]
group_name = "acd-aggregator-combined"
# kafka_max_poll_interval_ms = 30000
# kafka_session_timeout_ms = 3000
# log_level = "off"

[[output]]
type = "account_monitor"
redis_servers = [
    "redis://redis-1:6379/0",
    "redis://redis-2:6379/0",
]
# stale_threshold_s = 12
throughput_correction_mbps = 2000
# minimum_check_interval_ms = 1000

[[output]]
type = "pcx_collector"
report_url = "https://pcx.example.com/v1/collector"
client_id = "edgeware"
secret = "abc123"
# report_timeout_ms = 2000
# report_interval_s = 30
# report_delay_s = 30

[tuning]
log_level = "info"

Upgrading

The upgrade procedure for the aggregator consists of simply stopping the existing container with docker stop acd-aggregator, removing the existing container with docker rm acd-aggregator, and following the steps in “Creating and starting the container” below with the upgraded Docker image.

To roll back to a previous version, simply perform the same steps with the previous image. It is recommended to keep at least one previous image around until you are satisfied with the new version, after which you may remove the previous image with docker rmi images.edgeware.tv/esb3032-acd-aggregator:1.2.3, where “1.2.3” represents the previous version number.

Creating and starting the container

Now that the configuration file has been created, and the image has been loaded, we will need to create and start the container instance. The following docker run command will create a new container called “acd-aggregator”, start the process, and automatically resume the container once the Docker daemon is loaded at startup.

docker run \
  --name "acd-aggregator" \
  --detach \
  --restart=always \
  -v <PATH_TO_CONFIG_FOLDER>:/opt/edgeware/acd/aggregator:ro \
  <IMAGE NAME>:<VERSION> \
  --config /opt/edgeware/acd/aggregator/aggregator.toml

As an example using version 1.4.0:

docker run \
  --name "acd-aggregator" \
  --detach \
  --restart=always \
  -v /opt/edgeware/acd/aggregator:/opt/edgeware/acd/aggregator:ro \
  images.edgeware.tv/esb3032-acd-aggregator:1.4.0 \
  --config /opt/edgeware/acd/aggregator/aggregator.toml

Note: The image tag in the example is “1.4.0”. Replace that tag with the image tag loaded from the compressed OCI formatted image file, which can be obtained by running docker images and searching for the account aggregator image as described in the step “Loading the container image” above.

If the configuration file saved in the previous step was at a different location from /opt/edgeware/acd/aggregator/aggregator.toml you will need to change both the -v option and the --config option in the above command to represent that location. The -v option mounts the containing folder from the host system on the left to the corresponding path inside the container on the right, and the :ro tells Docker that the volume is mounted read-only. The --config should be the absolute path to the configuration file from INSIDE the container. For example, if you saved the configuration file as /host/path/config.toml on the host, and you need to map that to /container/path/config.toml within the container, the lines should be -v /host/path:/container/path:ro and --config /container/path/config.toml respectively.

The --restart=always line tells Docker to automatically restart the container when the Docker runtime is loaded, and is the equivalent in systemd to “enabling” the service.

Starting and Stopping the container

To view the status of the running container, use the docker ps command. This will give a line of output for the acd-aggregator container if it is currently running. Appending the -a flag will also list the aggregator container if it is not running.

Execute the following:

docker ps -a

You should see a line for the container with the container name “acd-aggregator” along with the current state of the container. If all is OK, you should see the container process running at this point, but it may show as “exited” if there was a problem.

To start and stop the container, use the docker start acd-aggregator and docker stop acd-aggregator commands.

Viewing the logs

By default, Docker will maintain the logs of the individual containers within its own internal logging subsystem, which requires the user to use the docker logs command to view them. It is possible to configure the Docker daemon to send logs to the system journal instead; however, configuring that is beyond the scope of this document. Additional details describing how to do that are available at https://docs.docker.com/config/containers/logging/journald/.

To view the complete log for the aggregator the following command can be used.

docker logs acd-aggregator

Supplying the -f flag will “follow” the log until either the process terminates or CTRL+C is pressed.

docker logs -f acd-aggregator

Appendix A: Real-time account monitoring

Redis key value pairs

Each account will have a single key-value pair stored in Redis, holding the current throughput with any correction factor applied. The value is updated in real time every time all hosts for the given account have received a new account record. This should be approximately every 10 seconds, but may vary slightly due to processing time.

The keys are structured in the following format:

bandwidth:<account>:value

and the value is reported in bits-per-second.

For example for accounts foo, bar and baz we may see the following:

bandwidth:foo:value = 123456789
bandwidth:bar:value = 234567890
bandwidth:baz:value = 102400

These values represent the most current throughput for each account, and will be updated periodically. A TTL of 48 hours is added to the keys, such that they will be pruned automatically after 48 hours since the last update. This is to prevent stale keys from remaining in Redis indefinitely. This TTL is not configurable by the end user.
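
For a consumer of these keys, the key format can be handled with two small helpers. The following Python sketch is illustrative only: the function names are hypothetical, and it works on plain strings rather than on a live Redis connection.

```python
KEY_PREFIX = "bandwidth:"
KEY_SUFFIX = ":value"


def bandwidth_key(account: str) -> str:
    """Build the Redis key for an account, following the format above."""
    return f"{KEY_PREFIX}{account}{KEY_SUFFIX}"


def account_from_key(key: str) -> str:
    """Recover the account name from a key, rejecting other key formats."""
    well_formed = (
        len(key) > len(KEY_PREFIX) + len(KEY_SUFFIX)
        and key.startswith(KEY_PREFIX)
        and key.endswith(KEY_SUFFIX)
    )
    if not well_formed:
        raise ValueError(f"not a bandwidth key: {key!r}")
    return key[len(KEY_PREFIX):-len(KEY_SUFFIX)]


# Values are reported in bits per second; convert to megabits for display.
sample = {"bandwidth:foo:value": 123456789, "bandwidth:baz:value": 102400}
for key, bps in sample.items():
    print(account_from_key(key), bps / 1_000_000, "Mb/s")
```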

Appendix B: PCX collector reporting

PCX reporting format

The following is an example of the report sent to the PCX HTTP endpoint.

{
    timestamp_begin: 1674165540,
    timestamp_end: 1674165570,
    writer_id: "writer-1",
    traffic: [
        Traffic {
            account_id: "unknown",
            num_ongoing_sessions: 0,
            bytes_transmitted: 0,
            edges: [
                Edge {
                    server: "orbit-1632",
                    num_ongoing_sessions: 0,
                    bytes_transmitted: 0,
                },
            ],
        },
        Traffic {
            account_id: "default",
            num_ongoing_sessions: 747,
            bytes_transmitted: 75326,
            edges: [
                Edge {
                    server: "orbit-1632",
                    num_ongoing_sessions: 747,
                    bytes_transmitted: 75326,
                },
            ],
        },
    ],
}

The report can be broken down into three parts. The outer root section includes the starting and stopping timestamps, as well as a writer_id field which is currently unused. For each account, a Traffic section contains the aggregated statistics for that account, as well as a detailed breakdown of each Edge. An Edge is the portion of traffic for the account streamed by each server. Within an Edge, the num_ongoing_sessions represents the peak ongoing sessions during the reporting interval, while the bytes_transmitted represents the total egress traffic in bytes over the entire period. For each outer Traffic section, the num_ongoing_sessions and bytes_transmitted represent the sum of the corresponding entries in all Edges.
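
The relationship between a Traffic section and its Edges can be sketched in Python as follows. The dictionaries mirror the field names of the report above, while the function name, account name, and server names are hypothetical.

```python
def summarize_account(account_id: str, edges: list[dict]) -> dict:
    """Build a Traffic entry whose totals are the sums over all of its edges."""
    return {
        "account_id": account_id,
        "num_ongoing_sessions": sum(e["num_ongoing_sessions"] for e in edges),
        "bytes_transmitted": sum(e["bytes_transmitted"] for e in edges),
        "edges": edges,
    }


# Two hypothetical servers streaming traffic for the same account.
edges = [
    {"server": "edge-a", "num_ongoing_sessions": 500, "bytes_transmitted": 50000},
    {"server": "edge-b", "num_ongoing_sessions": 247, "bytes_transmitted": 25326},
]
traffic = summarize_account("default", edges)
print(traffic["num_ongoing_sessions"], traffic["bytes_transmitted"])
```

A receiving endpoint could use the same sums to check that each Traffic section is consistent with its Edge entries.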

Data protection and consistency

The ACD aggregator works by consuming messages from Kafka. Once a report has successfully been submitted, as determined by a 200 OK HTTP status from the reporting endpoint, the position in the Kafka topic will be committed. This means that if the aggregator process stops and is restarted, reporting will resume from the last successful report, and no data will be lost. There is a limitation to this, however, and that has to do with the data retention time of the messages in Kafka and the TTL value specified in the aggregator configuration. Both default to the same value of 24 hours. This means that if the aggregator process is stopped for more than 24 hours, data loss will result since the source account records will have expired from Kafka before they can be reported on by the aggregator.

Upon startup of the aggregator, all records stored in Kafka will be reported on in the order they are read, starting from either the last successful report or the oldest record currently in Kafka. Reports will be sent each time the timestamp in the current record read from Kafka exceeds the reporting interval, meaning that a large burst of reports will be sent at startup to cover each interval. Once the aggregator has caught up with the backlog of account records, it will send a single report roughly every 30 seconds (configurable).

It is not recommended to have more than a single account aggregator instance reading from Kafka at a time, as this will result in partial reports being sent to the HTTP endpoint which will require the endpoint to reconstruct the data upon receipt. All redundancy in the account aggregator is handled by the redundancy within Kafka itself. With this in mind, it is important to ensure that there are multiple Kafka instances running and that the aggregator is configured to read from all of them.