ESB3032 ACD Aggregator
1 - Getting Started
The account aggregator is a service responsible for monitoring various input streams, compiling and aggregating statistics, and selectively reporting to one or more output streams. It acts primarily as a centralized collector of metrics, to which various aggregations may be applied before the results are published to one or more endpoints.
Modes of operation
There are two primary modes of operation: a live-monitoring mode and a reporting mode. The live-monitoring mode measures account records in real time, then filters and aggregates the data to the various outputs as it arrives. In this mode, only the most recent data is considered, and any historical records present at startup may be skipped. In the reporting mode, account records are consumed and processed in the order in which they were published to Kafka, and the service guarantees that all records still available within the Kafka topic will be processed and reported upon.
Activating the various modes of operation is performed by way of the set of input and output blocks within the configuration file. The file may contain one or more input blocks, which specify where the data is sourced, e.g. account records from Kafka, and one or more output blocks, which determine how and where the aggregated statistics are published.
While it is possible to specify multiple input and output blocks within a single configuration file, it is highly recommended to separate each pairing of input and output blocks into separate instances running on different nodes. This will yield the best performance and provide for better load balancing, since each instance will be responsible for a single mode of operation.
Real-time account monitoring
In the real-time account monitoring mode, account records, which are sent from each streaming server through the Kafka message broker, are processed by the account aggregator, and current real-time throughput metrics are updated in a Redis database. These metrics, which are constantly being updated, reflect the most current state of the CDN, and can be used by the Convoy Request Router to make real-time routing decisions.
PCX Reporting
In the PCX collector mode, account records are consumed in such a way that past throughput and session statistics can be aggregated to produce billing related reports. These reports are not considered real-time metrics, but represent usage statistics over fixed time intervals. This mode of operation requires a PCX API compatible reporting endpoint. See Appendix B for additional information regarding the PCX reporting format.
Installation
Prerequisites
The account aggregator is shipped as a compressed OCI formatted container image and as such, it requires a supported container runtime such as one of the following:
- Docker
- Podman
- Kubernetes
Any runtime capable of running Linux containers should work equally well. For simplicity, the following installation instructions assume that Docker is being used, and that Docker is already configured and running on the target system.
To test that Docker is set up and running, and that the current user has the required privileges to create a container, you may execute the following command.
$ docker run hello-world
Hello from Docker!
This message shows that your installation appears to be working correctly.
To generate this message, Docker took the following steps:
1. The Docker client contacted the Docker daemon.
2. The Docker daemon pulled the "hello-world" image from the Docker Hub.
(amd64)
3. The Docker daemon created a new container from that image which runs the
executable that produces the output you are currently reading.
4. The Docker daemon streamed that output to the Docker client, which sent it
to your terminal.
To try something more ambitious, you can run an Ubuntu container with:
$ docker run -it ubuntu bash
Share images, automate workflows, and more with a free Docker ID:
https://hub.docker.com/
For more examples and ideas, visit:
https://docs.docker.com/get-started/
If you get a permission denied error, ensure that the current user is a member of the docker group or execute all Docker commands under sudo.
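For example, on most Linux systems the current user can be added to the docker group with the following command (a new login is required for the change to take effect):
sudo usermod -aG docker $USER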
Loading the container image
The container image is delivered as a compressed OCI formatted image, which can be loaded directly via the docker load command. The following assumes that the image is in /tmp/esb3032-acd-aggregator-0.0.0.gz:
docker load --input /tmp/esb3032-acd-aggregator-0.0.0.gz
You will now be able to verify that the image was loaded successfully by executing the following and looking for the image name in the output.
$ docker images | grep acd-aggregator
images.edgeware.tv/esb3032-acd-aggregator latest 4bbe28b444d3 1 day ago 2.08GB
Creating the configuration file
The configuration file may be located anywhere on the filesystem; however, it is recommended to keep everything under the /opt/edgeware/acd/aggregator folder to be consistent with other products in the ACD product family. If that folder doesn’t already exist, you may create it with the following command.
mkdir -p /opt/edgeware/acd/aggregator
If using a different location, you will need to map the folder to the container while creating the Docker container. Additional information describing how to map the volume is available in the section “Creating and starting the container” below.
The configuration file for the account aggregator is divided into several sections: input, output and tuning. One or more input blocks may be specified to configure from where the data should be sourced. One or more output blocks may be configured, which determine to where the resulting aggregated data is published. Finally, the tuning block configures various global settings for how the account aggregator operates, such as the global log_level.
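The overall layout of the file is sketched below; the values are placeholders only, and the individual properties are described in the following sections.
[[input]]
type = "account_records"
# ... input properties ...
[[output]]
type = "account_monitor"   # or "pcx_collector"
# ... output properties ...
[tuning]
log_level = "info"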
Configuring the input source
As of the current version of the account aggregator, only a single type of input source is supported: account_records. This input source connects to a Kafka message broker and consumes account records. Depending on which output types are configured, the Kafka consumer may start by processing either the oldest or the most recent records first.
The following configuration block sample will be used as an example in the description below.
Note that the key input is surrounded by double square brackets. This is a syntax element to indicate that there may be multiple input sections in the configuration.
[[input]]
type = "account_records"
servers = [
"kafka://192.0.2.1:9092",
"kafka://192.0.2.2:9092",
]
group_name = "acd-aggregator"
kafka_max_poll_interval_ms = 30000
kafka_session_timeout_ms = 3000
log_level = "off"
The type property is used to determine the type of input, and the only valid value is account_records.
The servers list must contain at least 1 Kafka URL, prefixed with the URL scheme kafka://. If no port is specified, the default Kafka port of 9092 will be used. It is recommended, but not required, to specify all servers here, as the Kafka client library will obtain the full list of endpoints from the server on startup; however, the initial connection will be made to one or more of the provided URLs.
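For example (the hostnames are illustrative), the following two entries result in the same port being used:
servers = [
"kafka://kafka-1",        # no port given, the default 9092 is used
"kafka://kafka-2:9092",   # port given explicitly
]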
The group_name property identifies to which consumer group the aggregator should belong. Due to the type of data which account records represent, each instance of the aggregator connecting to the same Kafka message broker MUST have a unique group name. If two instances belong to the same group, the data will be partitioned among both instances, and the resulting aggregations may not be correct. If only a single instance of the account aggregator is used, this property is optional and defaults to “acd-aggregator”.
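For example, if one instance handles real-time monitoring and another handles PCX reporting against the same Kafka cluster, each configuration must use its own group name (the names below match the example configurations later in this document):
# in the configuration for instance 1 (real-time account monitoring):
group_name = "acd-aggregator-live"
# in the configuration for instance 2 (PCX reporting):
group_name = "acd-aggregator-pcx"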
The kafka_* properties, for max_poll_interval and session_timeout, are used to tune the connection parameters for the internal Kafka consumer. More details for these properties can be found in the documentation for the rdkafka library; see the Kafka documentation for further details.
The log_level property configures the logging level for the Kafka library and supports the values “off”, “trace”, “debug”, “info”, “warn”, and “error”. By default, logging from this library is disabled. This should only be enabled for troubleshooting purposes, as it is extremely verbose, and any warnings or error messages will be repeated in the account aggregator’s log. The logging level for the Kafka library must be higher than the general logging level for the aggregator, as defined in the “tuning” section, or the lower-level messages from the Kafka library will be skipped.
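For instance, a sketch of enabling Kafka library logging for troubleshooting (remember to adjust the general log_level in the tuning section accordingly, so that these messages are not filtered out):
[[input]]
type = "account_records"
log_level = "debug"   # Kafka library logging; very verbose, troubleshooting only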
Configuring output
The account aggregator currently supports two types of output blocks, depending on the desired mode of operation. For reference purposes, both types will be described within this section, but it is recommended to only use a single type per instance of the account aggregator.
Note that the key output is surrounded by double square brackets. This is a syntax element to indicate that there may be multiple output sections in the configuration.
[[output]]
type = "account_monitor"
redis_servers = [
"redis://192.0.2.7:6379/0",
"redis://:password@192.0.2.8:6379/1",
]
stale_threshold_s = 12
throughput_correction_mbps = 0
minimum_check_interval_ms = 1000
[[output]]
type = "pcx_collector"
report_url = "https://192.0.2.5:8000/v1/collector"
client_id = "edgeware"
secret = "abc123"
report_timeout_ms = 2000
report_interval_s = 30
report_delay_s = 30
Real-time account monitor output
The first output block has the type account_monitor and represents the live account monitoring functionality, which publishes per-account bandwidth metrics to one or more Redis servers. When this type of output block is configured, the account records will be consumed starting with the most recent messages first, and offsets will not be committed. Stopping or restarting the service may cause account records to be skipped. This type of output is suitable for making real-time routing decisions, but should not be relied upon for critical billing or reporting metrics.
The redis_servers list consists of URLs to Redis instances which shall be updated with the current real-time bandwidth metrics. If the Redis instance requires authentication, the global instance password can be specified as part of the URL, as in the second entry in the list. Since Redis does not support usernames, anything before the : in the credentials part of the URL will be ignored. At least 1 Redis URL must be provided.
The stale_threshold_s property determines the maximum timeout in seconds after which, if no account records have been received for a given host, the host will be considered stale and removed.
The throughput_correction_mbps property can be used to add or subtract a fixed correction to the bandwidth reported in Redis. This is specified in megabits per second, and may be either positive or negative. If the value is negative, and the calculated bandwidth is less than the correction factor, a minimum bandwidth of 0 will be reported.
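As an illustration (the throughput figure is arbitrary), assume a calculated throughput of 1500 Mbit/s:
throughput_correction_mbps = 500     # reported throughput: 2000 Mbit/s
# throughput_correction_mbps = -2000 # reported throughput: 0, since the correction exceeds the calculated value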
The minimum_check_interval_ms property is used to throttle how frequently the statistics will be processed. By default, the account aggregator will not recalculate the statistics more than once per second. Setting this value too low will result in potentially higher CPU usage, while setting it too high may result in some account records being missed. The default of 1 second should be adequate for most situations.
PCX Collector output
The pcx_collector type configures the account aggregator as a reporting agent for the PCX API. Whenever this configuration is present, the account record consumer will be configured to always start at the oldest records retained within the Kafka topic. It then processes the records one at a time, committing the Kafka offset each time a report is successfully received. This mode does not make any guarantees as to how recent the data is on which the reports are made, but does guarantee that every record will be counted in the aggregated report. Stopping or restarting the service will result in the account record consumer resuming processing from the last successful report. This type of reporting is suitable for billing purposes, assuming that there are multiple replicated Kafka nodes and that the service is not stopped for longer than the maximum retention period configured within Kafka. Stopping the service for longer than the retention period will result in messages being unavailable. Because this type of output requires that the Kafka records are processed in a specific order, and will not proceed with reading additional messages until all reports have been successfully received, it is not recommended to have both pcx_collector and account_monitor output blocks configured within the same instance.
The report_url property is a single HTTP endpoint URL where the PCX API can be reached. This property is required and may be either an HTTP or HTTPS URL. For HTTPS, the validity of the TLS certificate will be enforced, meaning that self-signed certificates will not be considered valid.
The client_id and secret fields are used to authenticate the client with the PCX API via token-based authentication. These fields are both required; however, if not used by the specific PCX API instance, the empty string "" may be provided.
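For example, against a PCX API instance that does not use token-based authentication, the fields can be left empty:
client_id = ""
secret = ""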
The report_timeout_ms field is an optional maximum timeout for the HTTP connection to the PCX API before the connection will fail. Failed reports will be retried indefinitely.
The report_interval_s property represents the interval bucket size for reporting metrics. The timing for this type of output is based solely on the embedded timestamp value of the account records, meaning that this property is not an absolute period on which the reports will be sent, but instead represents the duration between the starting and ending timestamps of the report. Especially upon startup, reports may be sent much more frequently than this interval, but will always cover this duration of time.
The report_delay_s property is an optional offset used to account for both clock synchronization between servers and propagation delay of the account records through the message broker. The default delay is 30 seconds. This means that the ending timestamp of a given report will be no more recent than this many seconds in the past. It is important to include this delay, as any account records received with a timestamp that falls within a period which has already been reported upon will be dropped.
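As an illustration of the timing (the times are chosen arbitrarily), with the default values:
report_interval_s = 30   # each report covers a 30-second window of record timestamps
report_delay_s = 30      # the window ends no later than 30 seconds in the past
# e.g. at 12:01:00, the most recent window that could be reported covers 12:00:00 - 12:00:30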
Tuning the account aggregator
The tuning configuration block represents the global properties for tuning how the account aggregator functions. Currently only one tuning property can be configured, and that is the log_level. The default log_level is “info”, which should be used in normal operation of the account aggregator; the possible values, in order of decreasing verbosity, are “trace”, “debug”, “info”, “warn”, “error”, and “off”.
Note that the tuning key is surrounded by single square brackets. This is TOML syntax meaning that only one instance of tuning is allowed.
[tuning]
log_level = "info"
Example configurations
This section describes some example configuration files which can be used as a starting template depending on which mode of operation is desired.
Real-time account monitoring
This configuration will consume account records from a Kafka server running on 3 hosts, kafka-1, kafka-2, and kafka-3. The account records will be consumed starting with the most recent records. The resulting aggregations will be published to two Redis instances, running on redis-1 and redis-2. The reported bandwidth will have a 2Gb/s correction factor applied.
[[input]]
type = "account_records"
servers = [
"kafka://kafka-1:9092",
"kafka://kafka-2:9092",
"kafka://kafka-3:9092"
]
group_name = "acd-aggregator-live"
# kafka_max_poll_interval_ms = 30000
# kafka_session_timeout_ms = 3000
# log_level = "off"
[[output]]
type = "account_monitor"
redis_servers = [
"redis://redis-1:6379/0",
"redis://redis-2:6379/0",
]
# stale_threshold_s = 12
throughput_correction_mbps = 2000
# minimum_check_interval_ms = 1000
[tuning]
log_level = "info"
The keys prefixed by # are commented out, since the default values will be used. They are included in the example for completeness.
PCX collector
This configuration will consume account records starting from the earliest record, calculate aggregated statistics for every 30 seconds, offset with a delay of 30 seconds, and publish the results to https://pcx.example.com/v1/collector.
[[input]]
type = "account_records"
servers = [
"kafka://kafka-1:9092",
"kafka://kafka-2:9092",
"kafka://kafka-3:9092"
]
group_name = "acd-aggregator-pcx"
# kafka_max_poll_interval_ms = 30000
# kafka_session_timeout_ms = 3000
# log_level = "off"
[[output]]
type = "pcx_collector"
report_url = "https://pcx.example.com/v1/collector"
client_id = "edgeware"
secret = "abc123"
# report_timeout_ms = 2000
# report_interval_s = 30
# report_delay_s = 30
[tuning]
log_level = "info"
The keys prefixed by # are commented out, since the default values will be used. They are included in the example for completeness.
Combined PCX collector with real-time account monitoring
While this configuration is possible, it is not recommended, since the pcx_collector output type will force all records to be consumed starting at the earliest record. This will cause the live statistics to be delayed until ALL earlier records have been consumed and reports have been successfully accepted by the PCX API. This combined role configuration can be used to minimize the number of servers or services running if the above limitations are acceptable.
Note: This is simply the combination of the above two output blocks in the same configuration file.
[[input]]
type = "account_records"
servers = [
"kafka://kafka-1:9092",
"kafka://kafka-2:9092",
"kafka://kafka-3:9092"
]
group_name = "acd-aggregator-combined"
# kafka_max_poll_interval_ms = 30000
# kafka_session_timeout_ms = 3000
# log_level = "off"
[[output]]
type = "account_monitor"
redis_servers = [
"redis://redis-1:6379/0",
"redis://redis-2:6379/0",
]
# stale_threshold_s = 12
throughput_correction_mbps = 2000
# minimum_check_interval_ms = 1000
[[output]]
type = "pcx_collector"
report_url = "https://pcx.example.com/v1/collector"
client_id = "edgeware"
secret = "abc123"
# report_timeout_ms = 2000
# report_interval_s = 30
# report_delay_s = 30
[tuning]
log_level = "info"
Upgrading
The upgrade procedure for the aggregator consists of simply stopping the existing container with docker stop acd-aggregator, removing the existing container with docker rm acd-aggregator, and following the steps in “Creating and starting the container” below with the upgraded Docker image.
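For example (the image filename and version below are placeholders), an upgrade could look like the following, after which the container is created and started as described below using the new image tag:
docker stop acd-aggregator
docker rm acd-aggregator
docker load --input /tmp/esb3032-acd-aggregator-<NEW_VERSION>.gz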
To roll back to a previous version, simply perform the same steps with the previous image. It is recommended to keep at least one previous image around until you are satisfied with the new version, after which you may remove the previous image with docker rmi images.edgeware.tv/esb3032-acd-aggregator:1.2.3, where “1.2.3” represents the previous version number.
Creating and starting the container
Now that the configuration file has been created, and the image has been loaded, we will need to create and start the container instance. The following docker run command will create a new container called “acd-aggregator”, start the process, and automatically resume the container once the Docker daemon is loaded at startup.
docker run \
--name "acd-aggregator" \
--detach \
--restart=always \
-v <PATH_TO_CONFIG_FOLDER>:/opt/edgeware/acd/aggregator:ro \
<IMAGE NAME>:<VERSION> \
--config /opt/edgeware/acd/aggregator/aggregator.toml
As an example using version 1.4.0:
docker run \
--name "acd-aggregator" \
--detach \
--restart=always \
-v /opt/edgeware/acd/aggregator:/opt/edgeware/acd/aggregator:ro \
images.edgeware.tv/esb3032-acd-aggregator:1.4.0 \
--config /opt/edgeware/acd/aggregator/aggregator.toml
Note: The image tag in the example is “1.4.0”; you will need to replace that tag with the image tag loaded from the compressed OCI formatted image file, which can be obtained by running docker images and searching for the account aggregator image as described in the step “Loading the container image” above.
If the configuration file saved in the previous step was at a different location from /opt/edgeware/acd/aggregator/aggregator.toml, you will need to change both the -v option and the --config option in the above command to reflect that location. The -v option mounts the containing folder from the host system on the left to the corresponding path inside the container on the right, and the :ro tells Docker that the volume is mounted read-only. The --config option should be the absolute path to the configuration file from INSIDE the container. For example, if you saved the configuration file as /host/path/config.toml on the host, and you need to map that to /container/path/config.toml within the container, the lines should be -v /host/path:/container/path:ro and --config /container/path/config.toml respectively.
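Putting that example together (the /host/path and /container/path locations are purely illustrative), the full command would be:
docker run \
--name "acd-aggregator" \
--detach \
--restart=always \
-v /host/path:/container/path:ro \
images.edgeware.tv/esb3032-acd-aggregator:1.4.0 \
--config /container/path/config.toml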
The --restart=always line tells Docker to automatically restart the container when the Docker runtime is loaded, and is the equivalent in systemd terms of “enabling” the service.
Starting and Stopping the container
To view the status of the running container, use the docker ps command. This will give a line of output for the acd-aggregator container if it is currently running. Appending the -a flag will list the aggregator container even if it is not running.
Execute the following:
docker ps -a
You should see a line for the container with the container name “acd-aggregator” along with the current state of the container. If all is OK, you should see the container process running at this point, but it may show as “exited” if there was a problem.
To start and stop the container, the docker start acd-aggregator and docker stop acd-aggregator commands can be used.
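For example:
docker stop acd-aggregator
docker start acd-aggregator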
Viewing the logs
By default, Docker maintains the logs of the individual containers within its own internal logging subsystem, which requires the user to use the docker logs command to view them. It is possible to configure the Docker daemon to send logs to the system journal instead; however, configuring that is beyond the scope of this document. Additional details are available at https://docs.docker.com/config/containers/logging/journald/.
To view the complete log for the aggregator the following command can be used.
docker logs acd-aggregator
The -f flag can be supplied to “follow” the log until either the process terminates or CTRL+C is pressed.
docker logs -f acd-aggregator
Appendix A: Real-time account monitoring
Redis key value pairs
Each account will have a single key-value pair stored in Redis containing the current throughput with any correction factor applied, which will be updated in real time each time a new account record has been received from every host for the given account. This should happen approximately every 10 seconds, but may vary slightly due to processing time.
The keys are structured in the following format, with the value reported in bits per second:
bandwidth:<account>:value
For example, for accounts foo, bar and baz we may see the following:
bandwidth:foo:value = 123456789
bandwidth:bar:value = 234567890
bandwidth:baz:value = 102400
These values represent the most current throughput for each account, and will be updated periodically. A TTL of 48 hours is added to the keys, so that they will be pruned automatically 48 hours after the last update. This prevents stale keys from remaining in Redis indefinitely. The TTL is not configurable by the end user.
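The current value and remaining TTL for a given account can be inspected with redis-cli; a sketch, assuming an account named foo and a Redis instance on the local host:
redis-cli GET bandwidth:foo:value                # current throughput in bits per second
redis-cli TTL bandwidth:foo:value                # seconds remaining until the key expires
redis-cli --scan --pattern 'bandwidth:*:value'   # list all account keys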
Appendix B: PCX collector reporting
PCX reporting format
The following is an example of the report sent to the PCX HTTP endpoint.
{
timestamp_begin: 1674165540,
timestamp_end: 1674165570,
writer_id: "writer-1",
traffic: [
Traffic {
account_id: "unknown",
num_ongoing_sessions: 0,
bytes_transmitted: 0,
edges: [
Edge {
server: "orbit-1632",
num_ongoing_sessions: 0,
bytes_transmitted: 0,
},
],
},
Traffic {
account_id: "default",
num_ongoing_sessions: 747,
bytes_transmitted: 75326,
edges: [
Edge {
server: "orbit-1632",
num_ongoing_sessions: 747,
bytes_transmitted: 75326,
},
],
},
],
}
The report can be broken down into 3 parts. The outer root section includes the starting and ending timestamps, as well as a writer_id field which is currently unused. For each account, a Traffic section contains the aggregated statistics for that account, as well as a detailed breakdown of each Edge. An Edge is the portion of traffic for the account streamed by each server. Within an Edge, the num_ongoing_sessions represents the peak number of ongoing sessions during the reporting interval, while the bytes_transmitted represents the total egress traffic in bytes over the entire period. For each outer Traffic section, the num_ongoing_sessions and bytes_transmitted represent the sum of the corresponding entries in all Edges.
Data protection and consistency
The ACD aggregator works by consuming messages from Kafka. Once a report has successfully been submitted, as determined by a 200 OK HTTP status from the reporting endpoint, the position in the Kafka topic will be committed. This means that if the aggregator process stops and is restarted, reporting will resume from the last successful report, and no data will be lost. There is a limitation to this, however, and that has to do with the data retention time of the messages in Kafka and the TTL value specified in the aggregator configuration. Both default to the same value of 24 hours. This means that if the aggregator process is stopped for more than 24 hours, data loss will result since the source account records will have expired from Kafka before they can be reported on by the aggregator.
Upon startup of the aggregator, all records stored in Kafka will be reported on in the order they are read, starting from either the last successful report or the oldest record currently in Kafka. Reports will be sent each time the timestamp in the current record read from Kafka exceeds the reporting interval, meaning that a large burst of reports will be sent at startup to cover each interval. Once the aggregator has caught up with the backlog of account records, it will send a single report roughly every 30 seconds (configurable).
It is not recommended to have more than a single account aggregator instance reading from Kafka at a time, as this will result in partial reports being sent to the HTTP endpoint which will require the endpoint to reconstruct the data upon receipt. All redundancy in the account aggregator is handled by the redundancy within Kafka itself. With this in mind, it is important to ensure that there are multiple Kafka instances running and that the aggregator is configured to read from all of them.
2 - Releases
2.1 - Release esb3032-0.2.0
Build date
2022-12-21
Release status
Type: devdrop
Change log
- NEW: Use config file instead of command line switches
- NEW: Reports are now aligned with wall-clock time
- NEW: Reporting time no longer contains gaps in coverage
- FIX: Per-account number of sessions only shows largest host
2.2 - Release esb3032-1.0.0
Build date
2023-02-14
Release status
Type: production
Change log
- NEW: Create user documentation for ACD Aggregator
- NEW: Simplify configuration. Changed from YAML to TOML format.
- NEW: Handle account records arriving late
- FIXED: Aggregator hangs if committing to Kafka delays more than 5 minutes
2.3 - Release esb3032-1.2.1
Build date
2023-04-24
Release status
Type: production
Breaking changes
No breaking changes
Change log
- NEW: Port Account Monitor functionality for Convoy Request Router
- NEW: Aggregator Performance Improvements
- FIXED: Reports lost when restarting acd-aggregator
2.4 - Release esb3032-1.4.0
Build date
2023-09-28
Release status
Type: production
Breaking changes
None
Change log
- NEW: Extend aggregator with additional metrics. Per streamer bandwidth and total bandwidth are now updated in Redis. [ESB3032-98]
- FIXED: Not all Redis instances are updated after a failure [ESB3032-99]
- FIXED: Kafka consumer restarts on Partition EOF [ESB3032-100]