Files
nginx-ipng-stats-plugin/src/ngx_http_ipng_stats_module.c
Pim van Pelt 450391af6b Switch per-device attribution from SO_BINDTODEVICE to IP_PKTINFO
SO_BINDTODEVICE pins both ingress *and* egress to the bound
interface — the kernel uses the listening socket's device
binding when choosing the output interface for the SYN-ACK,
which is sent before accept() returns and therefore can't be
fixed up in userspace. That's fatal for maglev / DSR
deployments where the SYN arrives through a GRE tunnel but the
return path has to leave via the default route; the SYN-ACK
goes out the GRE and is dropped by the uplink, so every new
connection times out.

Rework the listen plumbing so the module never touches
SO_BINDTODEVICE. init_module now enables IP_PKTINFO and
IPV6_RECVPKTINFO on every HTTP listening socket and resolves
each configured `device=` name to an ifindex. At request time
resolve_source calls getsockopt(IP_PKTOPTIONS) on the accepted
fd to read the per-connection in(6)_pktinfo cmsg the kernel
stashed during the handshake, then matches (ifindex, family)
against the bindings table. The listening sockets remain plain
wildcards, so the return path follows the normal routing table
and DSR works.

The wrapper also no longer clones or rebinds sockets: it still
dedups per (cscf, sockaddr) so multiple device-tagged listens
in a single server block coexist, and dedups bindings on
(device, family) so the same device can carry different tags
for v4 and v6 (e.g. tag2-v4 / tag2-v6) but not pointlessly
duplicate when a listen include is shared across server blocks.

Drive-by fixes to unblock `make pkg-deb` after a prior
`make build-asan`:
- debian/rules overrides dh_clean to exclude build/, since
  nginx-asan's install creates nobody:0700 temp dirs dh_clean
  can't traverse.
- Makefile's build-asan removes those unused runtime temp dirs
  so the tree is clean afterwards.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 15:00:46 +02:00

2917 lines
100 KiB
C

/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2026 Pim van Pelt <pim@ipng.ch>
*
* ngx_http_ipng_stats_module — per-VIP, per-device traffic counters for
* nginx hosts receiving traffic on distinct interfaces.
*
* See docs/design.md in the repository for the full design. The short
* version is:
*
* - Attribution is done per-connection via IP_PKTINFO /
* IPV6_RECVPKTINFO: init_module enables the options on the HTTP
* listening sockets, and at request time the ingress ifindex is read
* from the accepted connection and matched against the configured
* device bindings. SO_BINDTODEVICE is deliberately not used — it
* would pin egress too and break DSR (see the binding struct below).
* Each `listen` directive may carry `device=<ifname>` and
* `ipng_source_tag=<tag>` parameters; this module parses them by
* replacing the stock ngx_http_core_module `listen` command handler
* at preconfig time.
*
* - Counters are maintained per-worker in a private table (no locks,
* no atomics on the request path) and flushed into a shared-memory
* zone via a per-worker timer. The scrape handler reads only from
* the shared zone.
*
* - The scrape handler content-negotiates between Prometheus text and
* JSON output, filtered server-side by optional `?source_tag=` and
* `?vip=` query parameters.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <net/if.h>
#include <sys/socket.h>
#include <arpa/inet.h>
/* The log module's op/format types are private to ngx_http_log_module.c
 * (no public header). We duplicate the struct layouts under our own
 * names so we can call the compiled log_format ops. The layouts have
 * been stable since nginx 0.7.x.
 *
 * NOTE(review): these mirrors must stay field-for-field identical to
 * ngx_http_log_op_t / ngx_http_log_fmt_t / ngx_http_log_main_conf_t —
 * re-verify against the nginx sources on every nginx upgrade. */
typedef struct ipng_log_op_s ipng_log_op_t;

/* Renders one op's output into buf; returns the new write position. */
typedef u_char *(*ipng_log_op_run_pt)(ngx_http_request_t *r, u_char *buf,
    ipng_log_op_t *op);

/* Returns an upper bound on the op's rendered length (for sizing). */
typedef size_t (*ipng_log_op_getlen_pt)(ngx_http_request_t *r,
    uintptr_t data);

/* One compiled log_format operation. */
struct ipng_log_op_s {
    size_t                 len;
    ipng_log_op_getlen_pt  getlen;
    ipng_log_op_run_pt     run;
    uintptr_t              data;
};

/* A named, compiled log_format. */
typedef struct {
    ngx_str_t     name;
    ngx_array_t  *flushes;
    ngx_array_t  *ops;        /* array of ipng_log_op_t */
} ipng_log_fmt_t;

/* Head of ngx_http_log_module's main conf; we only read `formats`. */
typedef struct {
    ngx_array_t  formats;     /* array of ipng_log_fmt_t */
    ngx_uint_t   combined_used;
} ipng_log_main_conf_t;

extern ngx_module_t ngx_http_log_module;
/* NGX_HTTP_IPNG_STATS_VERSION is generated at build time from the
* top-level Makefile's VERSION variable — see `make version-header`. */
#include "version.h"
#define NGX_HTTP_IPNG_STATS_SCHEMA_VERSION 2
/* Default histogram buckets in milliseconds (FR-2.3). */
#define NGX_HTTP_IPNG_STATS_DEFAULT_BUCKETS \
{ 1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000 }
#define NGX_HTTP_IPNG_STATS_DEFAULT_BUCKET_COUNT 12
/* Default histogram buckets for request/response sizes, in bytes. */
#define NGX_HTTP_IPNG_STATS_DEFAULT_BYTE_BUCKETS \
{ 100, 1000, 10000, 100000, 1000000, 10000000 }
#define NGX_HTTP_IPNG_STATS_DEFAULT_BYTE_BUCKET_COUNT 6
/* Status-code cardinality reduction: we bin every response into one of six
* classes — 1xx/2xx/3xx/4xx/5xx, plus a catch-all for codes outside
* [100, 599] (NFR-3.3). Operators who need full three-digit breakdown
* should use ipng_stats_logtail (FR-8); the stats zone is deliberately
* narrow. */
#define NGX_HTTP_IPNG_STATS_CODE_MIN 100
#define NGX_HTTP_IPNG_STATS_CODE_MAX 599
#define NGX_HTTP_IPNG_STATS_CLASS_UNKNOWN 0
#define NGX_HTTP_IPNG_STATS_NCLASSES 6
/* A single per-worker slot key. */
typedef struct {
    ngx_str_t   source;  /* points into shared-zone interning table */
    ngx_str_t   vip;     /* ditto */
    ngx_uint_t  class;   /* 0 unknown, 1..5 for 1xx..5xx */
} ngx_http_ipng_stats_key_t;

/* Shared-zone node: stored in an rbtree keyed by a 32-bit hash of the
 * (source, vip, code) tuple. Collisions are resolved by walking the
 * rbtree's duplicate chain and comparing full keys.
 *
 * Counter lanes are 64-bit and updated via relaxed atomic fetch_add
 * (NFR-1.2). Histogram lanes live in an ngx_palloc'd tail allocation
 * whose size is determined by the configured bucket count. */
typedef struct {
    ngx_rbtree_node_t  rbnode;   /* key = hash */
    ngx_queue_t        lru;      /* in-zone LRU for eviction on rename (NFR-4.4) */
    ngx_uint_t         source_id;
    ngx_uint_t         vip_id;
    ngx_uint_t         class;    /* 0 unknown, 1..5 for 1xx..5xx */
    ngx_atomic_uint_t  requests;
    ngx_atomic_uint_t  bytes_in;
    ngx_atomic_uint_t  bytes_out;
    ngx_atomic_uint_t  duration_sum_ms;
    ngx_atomic_uint_t  upstream_sum_ms;
    /* Followed by histogram lanes, in order:
     * [0 .. nb]            -> request duration (nb = nbuckets)
     * [nb+1 .. 2nb+1]      -> upstream duration
     * [2nb+2 .. 2nb+2+nbb] -> bytes_in (nbb = nbytebuckets)
     * [2nb+3+nbb .. 2nb+3+2nbb] -> bytes_out
     */
} ngx_http_ipng_stats_node_t;

/* Per-worker local slot — identical shape to the shared node but without
 * atomics. We maintain a small dynamic array of these, plus a "dirty
 * list" head pointing into the array, so that the flush tick only walks
 * entries touched since the last flush (FR-4.2, NFR-2.2). */
typedef struct ngx_http_ipng_stats_slot_s {
    ngx_uint_t  hash;
    ngx_uint_t  source_id;
    ngx_uint_t  vip_id;
    ngx_uint_t  class;
    /* Deltas since last flush. */
    uint64_t    requests;
    uint64_t    bytes_in;
    uint64_t    bytes_out;
    uint64_t    duration_sum_ms;
    uint64_t    upstream_sum_ms;
    uint64_t   *dhist;      /* nbuckets+1 lanes */
    uint64_t   *uhist;
    uint64_t   *bin_hist;   /* nbytebuckets+1 lanes */
    uint64_t   *bout_hist;
    /* Intrusive "dirty" linked list — dirty_next == NULL and
     * !is_dirty_head means "not on the list". */
    struct ngx_http_ipng_stats_slot_s *dirty_next;
    unsigned    dirty:1;
} ngx_http_ipng_stats_slot_t;

/* String interning tables live at the head of the shared-memory zone.
 * They're flat arrays of ngx_str_t whose data pointers reference memory
 * allocated from the zone's slab pool. Workers look up strings by
 * sequential scan — the cardinality is small (tens) so this is fine. */
typedef struct {
    ngx_str_t  *entries;
    ngx_uint_t  nelts;
    ngx_uint_t  nalloc;
} ngx_http_ipng_stats_intern_t;

/* Header of the shared-memory zone. */
typedef struct {
    uint32_t                      magic;  /* 0x49504E47 "IPNG" when initialized */
    ngx_rbtree_t                  rbtree;
    ngx_rbtree_node_t             sentinel;
    ngx_queue_t                   lru;
    ngx_http_ipng_stats_intern_t  sources;
    ngx_http_ipng_stats_intern_t  vips;
    /* Meta-counters for the plugin itself (FR-6 observability of
     * the plugin in the design doc). */
    ngx_atomic_uint_t             zone_full_events;
    ngx_atomic_uint_t             flushes_total;
} ngx_http_ipng_stats_shctx_t;
/* Per-device binding recorded by the listen wrapper at config parse
 * time. `ifindex` is resolved from `device` at init_module time and
 * becomes the lookup key at request time: the log handler reads the
 * ingress ifindex from the connection's IP_PKTINFO/IPV6_PKTINFO cmsg
 * and matches against this table.
 *
 * We deliberately don't use SO_BINDTODEVICE on the listening sockets —
 * that option pins *egress* to the bound interface too, which breaks
 * maglev / DSR deployments where the SYN arrives via a GRE tunnel and
 * the SYN-ACK must leave via the default route. See docs/design.md. */
typedef struct {
    ngx_str_t    device;
    ngx_str_t    source;
    ngx_uint_t   ifindex;  /* filled at init_module (0 until resolved) */
    sa_family_t  family;   /* AF_INET / AF_INET6 of the listen sockaddr */
} ngx_http_ipng_stats_binding_t;

/* Per-(cscf, sockaddr) tracking used only during config parse. The
 * listen wrapper uses it to avoid invoking the core `listen` handler
 * twice for the same (server block, sockaddr) pair — a valid config
 * with N device-tagged listens on the same port in the same server
 * block generates exactly one core-handler call and one cscf
 * attachment; the remaining device-tagged listens only record
 * bindings (no sockets are cloned or rebound). Across server blocks
 * the same sockaddr legitimately recurs, and each distinct cscf gets
 * its own core-handler call. */
typedef struct {
    void           *cscf;   /* ngx_http_core_srv_conf_t * */
    ngx_sockaddr_t  sockaddr;
    socklen_t       socklen;
} ngx_http_ipng_stats_seen_t;

/* http{}-level configuration. */
typedef struct {
    ngx_shm_zone_t  *shm_zone;
    ngx_str_t        zone_name;
    size_t           zone_size;
    ngx_msec_t       flush_interval;
    ngx_str_t        default_source;
    ngx_uint_t       nbuckets;
    ngx_uint_t      *bucket_bounds_ms;    /* len = nbuckets */
    ngx_uint_t       nbytebuckets;
    ngx_uint_t      *byte_bucket_bounds;  /* len = nbytebuckets, bytes */
    ngx_array_t     *bindings;            /* ngx_http_ipng_stats_binding_t */
    ngx_array_t     *listens_seen;        /* ngx_http_ipng_stats_seen_t */
    ngx_flag_t       enabled;
    /* Global logtail (FR-8) — UDP-only. */
    ipng_log_fmt_t      *logtail_fmt;      /* compiled ops from log_format */
    struct sockaddr_in   logtail_udp_addr; /* destination address */
    size_t               logtail_buf_size; /* per-worker buffer, default 64k */
    ngx_msec_t           logtail_flush;    /* max flush interval, default 1s */
    ngx_uint_t           logtail_if_index; /* if=$var: variable index, 0=none */
} ngx_http_ipng_stats_main_conf_t;

/* Per-location toggles. */
typedef struct {
    ngx_flag_t  enabled;            /* count requests in this location */
    ngx_flag_t  is_scrape_handler;  /* location serves the scrape output */
} ngx_http_ipng_stats_loc_conf_t;

/* Per-worker runtime state. */
typedef struct {
    ngx_http_ipng_stats_slot_t  *slots;
    ngx_uint_t                   nslots;
    ngx_uint_t                   nalloc;
    ngx_http_ipng_stats_slot_t  *dirty_head;
    ngx_event_t                  flush_ev;
    ngx_log_t                   *log;
    uint64_t                    *dhist_arena;  /* nslots * (nbuckets+1) u64 */
    uint64_t                    *uhist_arena;
    uint64_t                    *bin_arena;    /* nslots * (nbytebuckets+1) u64 */
    uint64_t                    *bout_arena;
    /* Global logtail buffer. */
    u_char                      *logtail_buf;
    u_char                      *logtail_pos;
    u_char                      *logtail_end;
    ngx_event_t                  logtail_flush_ev;
    ngx_socket_t                 logtail_udp_fd;  /* per-worker UDP socket, or -1 */
} ngx_http_ipng_stats_worker_t;
/* Forward decls. */
static ngx_int_t ngx_http_ipng_stats_preconfig(ngx_conf_t *cf);
static ngx_int_t ngx_http_ipng_stats_postconfig(ngx_conf_t *cf);
static void *ngx_http_ipng_stats_create_main_conf(ngx_conf_t *cf);
static char *ngx_http_ipng_stats_init_main_conf(ngx_conf_t *cf, void *conf);
static void *ngx_http_ipng_stats_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_ipng_stats_merge_loc_conf(ngx_conf_t *cf, void *parent,
void *child);
static char *ngx_http_ipng_stats_zone(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_http_ipng_stats_buckets(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_http_ipng_stats_byte_buckets(ngx_conf_t *cf,
ngx_command_t *cmd, void *conf);
static char *ngx_http_ipng_stats_scrape(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_http_ipng_stats_logtail(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static void ngx_http_ipng_stats_logtail_flush_handler(ngx_event_t *ev);
static void ngx_http_ipng_stats_logtail_flush(ngx_http_ipng_stats_worker_t *w,
ngx_http_ipng_stats_main_conf_t *imcf);
static void ngx_http_ipng_stats_logtail_write(ngx_http_request_t *r,
ngx_http_ipng_stats_main_conf_t *imcf,
ngx_http_ipng_stats_worker_t *w);
static char *ngx_http_ipng_stats_listen_wrapper(ngx_conf_t *cf,
ngx_command_t *cmd, void *conf);
static ngx_int_t ngx_http_ipng_stats_init_zone(ngx_shm_zone_t *shm_zone,
void *data);
static ngx_int_t ngx_http_ipng_stats_init_module(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_ipng_stats_init_worker(ngx_cycle_t *cycle);
static void ngx_http_ipng_stats_exit_worker(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_ipng_stats_log_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_ipng_stats_content_handler(ngx_http_request_t *r);
static void ngx_http_ipng_stats_flush_timer(ngx_event_t *ev);
static void ngx_http_ipng_stats_do_flush(ngx_http_ipng_stats_worker_t *w,
ngx_http_ipng_stats_main_conf_t *imcf, ngx_log_t *log);
static ngx_uint_t ngx_http_ipng_stats_bucket_index(ngx_uint_t ms,
ngx_uint_t *bounds, ngx_uint_t nbuckets);
static ngx_uint_t ngx_http_ipng_stats_status_index(ngx_uint_t code);
static ngx_int_t ngx_http_ipng_stats_resolve_source(
ngx_http_request_t *r, ngx_http_ipng_stats_main_conf_t *imcf,
ngx_str_t *source_out);
static ngx_int_t ngx_http_ipng_stats_canonical_vip(ngx_http_request_t *r,
u_char *buf, size_t buflen, ngx_str_t *vip_out);
static ngx_int_t ngx_http_ipng_stats_intern_shared(
ngx_http_ipng_stats_shctx_t *sh, ngx_slab_pool_t *slab,
ngx_http_ipng_stats_intern_t *t, ngx_str_t *s, ngx_uint_t *idx_out);
static ngx_int_t ngx_http_ipng_stats_source_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_ipng_stats_render_prom(ngx_http_request_t *r,
ngx_http_ipng_stats_main_conf_t *imcf,
ngx_str_t *filter_source, ngx_str_t *filter_vip);
static ngx_int_t ngx_http_ipng_stats_render_json(ngx_http_request_t *r,
ngx_http_ipng_stats_main_conf_t *imcf,
ngx_str_t *filter_source, ngx_str_t *filter_vip);
/* Module-wide globals. */

/* Saved pointer to the stock core `listen` handler; non-NULL once the
 * wrapper has been installed by preconfig (done once per process). */
static char *(*ngx_http_core_listen_orig)(ngx_conf_t *cf,
    ngx_command_t *cmd, void *conf) = NULL;

/* Singleton per-worker runtime state (one worker per process). */
static ngx_http_ipng_stats_worker_t ngx_http_ipng_stats_worker;

extern ngx_module_t ngx_http_core_module;
/* Directives. */
static ngx_command_t ngx_http_ipng_stats_commands[] = {

    /* ipng_stats_zone name:size — shared-memory zone for the counters. */
    { ngx_string("ipng_stats_zone"),
      NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
      ngx_http_ipng_stats_zone,
      NGX_HTTP_MAIN_CONF_OFFSET,
      0,
      NULL },

    /* ipng_stats_flush_interval <time> — per-worker flush cadence. */
    { ngx_string("ipng_stats_flush_interval"),
      NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_msec_slot,
      NGX_HTTP_MAIN_CONF_OFFSET,
      offsetof(ngx_http_ipng_stats_main_conf_t, flush_interval),
      NULL },

    /* ipng_stats_default_source <tag> — source for unmatched traffic. */
    { ngx_string("ipng_stats_default_source"),
      NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_str_slot,
      NGX_HTTP_MAIN_CONF_OFFSET,
      offsetof(ngx_http_ipng_stats_main_conf_t, default_source),
      NULL },

    /* ipng_stats_buckets <ms>... — duration histogram boundaries. */
    { ngx_string("ipng_stats_buckets"),
      NGX_HTTP_MAIN_CONF|NGX_CONF_1MORE,
      ngx_http_ipng_stats_buckets,
      NGX_HTTP_MAIN_CONF_OFFSET,
      0,
      NULL },

    /* ipng_stats_byte_buckets <size>... — size histogram boundaries. */
    { ngx_string("ipng_stats_byte_buckets"),
      NGX_HTTP_MAIN_CONF|NGX_CONF_1MORE,
      ngx_http_ipng_stats_byte_buckets,
      NGX_HTTP_MAIN_CONF_OFFSET,
      0,
      NULL },

    /* ipng_stats [on|off] — toggle counting, or (no-arg, in a
     * location) install the scrape content handler. */
    { ngx_string("ipng_stats"),
      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF
      |NGX_CONF_NOARGS|NGX_CONF_TAKE1,
      ngx_http_ipng_stats_scrape,
      NGX_HTTP_LOC_CONF_OFFSET,
      offsetof(ngx_http_ipng_stats_loc_conf_t, enabled),
      NULL },

    /* ipng_stats_logtail <fmt> udp://h:p [buffer=] [flush=] — FR-8. */
    { ngx_string("ipng_stats_logtail"),
      NGX_HTTP_MAIN_CONF|NGX_CONF_2MORE,
      ngx_http_ipng_stats_logtail,
      NGX_HTTP_MAIN_CONF_OFFSET,
      0,
      NULL },

    ngx_null_command
};
/* HTTP-phase hooks for the module. */
static ngx_http_module_t ngx_http_ipng_stats_module_ctx = {
    ngx_http_ipng_stats_preconfig,         /* preconfiguration */
    ngx_http_ipng_stats_postconfig,        /* postconfiguration */
    ngx_http_ipng_stats_create_main_conf,  /* create main configuration */
    ngx_http_ipng_stats_init_main_conf,    /* init main configuration */
    NULL,                                  /* create server configuration */
    NULL,                                  /* merge server configuration */
    ngx_http_ipng_stats_create_loc_conf,   /* create location configuration */
    ngx_http_ipng_stats_merge_loc_conf     /* merge location configuration */
};

/* Process-lifecycle hooks for the module. */
ngx_module_t ngx_http_ipng_stats_module = {
    NGX_MODULE_V1,
    &ngx_http_ipng_stats_module_ctx,  /* module context */
    ngx_http_ipng_stats_commands,     /* module directives */
    NGX_HTTP_MODULE,                  /* module type */
    NULL,                             /* init master */
    ngx_http_ipng_stats_init_module,  /* init module */
    ngx_http_ipng_stats_init_worker,  /* init process */
    NULL,                             /* init thread */
    NULL,                             /* exit thread */
    ngx_http_ipng_stats_exit_worker,  /* exit process */
    NULL,                             /* exit master */
    NGX_MODULE_V1_PADDING
};
/* ----------------------------------------------------------------- */
/* Preconfig: replace ngx_http_core_module's `listen` handler. */
/* ----------------------------------------------------------------- */
static ngx_int_t
ngx_http_ipng_stats_preconfig(ngx_conf_t *cf)
{
    ngx_command_t  *c;

    /* ngx_http_core_commands is not const in the nginx tree, so we can
     * rebind its `set` function pointer here. Done only once per
     * process: after the first preconfig the wrapper stays installed
     * and config reloads reuse it without re-saving the original. */
    if (ngx_http_core_listen_orig != NULL) {
        return NGX_OK;
    }

    for (c = ngx_http_core_module.commands; c->name.len != 0; c++) {

        if (c->name.len != sizeof("listen") - 1) {
            continue;
        }

        if (ngx_strncmp(c->name.data, "listen", sizeof("listen") - 1) != 0) {
            continue;
        }

        ngx_http_core_listen_orig = c->set;
        c->set = ngx_http_ipng_stats_listen_wrapper;
        return NGX_OK;
    }

    ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
                  "ipng_stats: could not locate ngx_http_core_module "
                  "\"listen\" directive to wrap");
    return NGX_ERROR;
}
/* The wrapper extracts device= and ipng_source_tag= from cf->args,
 * compacting the array in place, then calls the original
 * ngx_http_core_module listen handler. After a successful call it
 * records a binding in imcf->bindings; the binding's device name is
 * resolved to an ifindex at init_module time (no sockets are cloned
 * or rebound — see the binding struct's comment above). */
static char *
ngx_http_ipng_stats_listen_wrapper(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf)
{
    ngx_str_t *value;
    ngx_str_t device, source;
    ngx_uint_t i, j;
    char *rv;
    ngx_http_ipng_stats_main_conf_t *imcf;
    ngx_http_ipng_stats_binding_t *b;
    ngx_url_t u;

    ngx_str_null(&device);
    ngx_str_null(&source);

    /* Strip our private parameters out of cf->args so the stock core
     * handler never sees them. `i` is not advanced after a removal
     * because the shift moves the next element into slot i. */
    value = cf->args->elts;
    i = 1;
    while (i < cf->args->nelts) {
        if (value[i].len > 7
            && ngx_strncmp(value[i].data, "device=", 7) == 0)
        {
            device.data = value[i].data + 7;
            device.len = value[i].len - 7;
            for (j = i; j + 1 < cf->args->nelts; j++) {
                value[j] = value[j + 1];
            }
            cf->args->nelts--;
            continue;
        }
        if (value[i].len > 16
            && ngx_strncmp(value[i].data, "ipng_source_tag=", 16) == 0)
        {
            source.data = value[i].data + 16;
            source.len = value[i].len - 16;
            for (j = i; j + 1 < cf->args->nelts; j++) {
                value[j] = value[j + 1];
            }
            cf->args->nelts--;
            continue;
        }
        i++;
    }

    if (device.len == 0 && source.len == 0) {
        /* Plain listen with no module-specific parameters — let nginx
         * handle it end-to-end. */
        return ngx_http_core_listen_orig(cf, cmd, conf);
    }

    /* Parse the listen address ourselves so we can dedup per (cscf,
     * sockaddr) — nginx's core handler rejects the same sockaddr
     * appearing twice in the same server block. On parse failure,
     * fall through to the core handler so it emits the canonical
     * error for the bad address. */
    ngx_memzero(&u, sizeof(ngx_url_t));
    u.url = value[1];
    u.listen = 1;
    u.default_port = 80;
    if (ngx_parse_url(cf->pool, &u) != NGX_OK || u.naddrs == 0) {
        return ngx_http_core_listen_orig(cf, cmd, conf);
    }

    imcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_ipng_stats_module);

    /* Lazily create the parse-time tracking arrays on first use. */
    if (imcf->bindings == NULL) {
        imcf->bindings = ngx_array_create(cf->pool, 8,
            sizeof(ngx_http_ipng_stats_binding_t));
        if (imcf->bindings == NULL) {
            return NGX_CONF_ERROR;
        }
    }
    if (imcf->listens_seen == NULL) {
        imcf->listens_seen = ngx_array_create(cf->pool, 16,
            sizeof(ngx_http_ipng_stats_seen_t));
        if (imcf->listens_seen == NULL) {
            return NGX_CONF_ERROR;
        }
    }

    /* Skip the core handler when this (cscf, sockaddr) pair was
     * already processed — matches nginx's own "duplicate listen"
     * check and lets a server block carry multiple device-tagged
     * listens at the same port. Across different server blocks the
     * same sockaddr re-appears and nginx merges the cscf via
     * ngx_http_add_server. */
    void *cscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_core_module);
    ngx_http_ipng_stats_seen_t *seen = imcf->listens_seen->elts;
    ngx_uint_t same_cscf_sockaddr = 0;
    for (i = 0; i < imcf->listens_seen->nelts; i++) {
        if (seen[i].cscf != cscf) continue;
        if (seen[i].socklen != u.addrs[0].socklen) continue;
        if (ngx_cmp_sockaddr((struct sockaddr *) &seen[i].sockaddr,
                             seen[i].socklen,
                             u.addrs[0].sockaddr,
                             u.addrs[0].socklen, 1) == NGX_OK)
        {
            same_cscf_sockaddr = 1;
            break;
        }
    }

    if (!same_cscf_sockaddr) {
        rv = ngx_http_core_listen_orig(cf, cmd, conf);
        if (rv != NGX_CONF_OK) {
            return rv;
        }
        ngx_http_ipng_stats_seen_t *s = ngx_array_push(imcf->listens_seen);
        if (s == NULL) return NGX_CONF_ERROR;
        s->cscf = cscf;
        s->socklen = u.addrs[0].socklen;
        ngx_memcpy(&s->sockaddr, u.addrs[0].sockaddr, u.addrs[0].socklen);
    }

    /* Dedup bindings on (device, family) pair. Same device appearing
     * under multiple server blocks (because they share a listen
     * include) collapses to a single binding per family. The same
     * device can still carry different source tags for IPv4 vs IPv6
     * — the resolver looks up on (ifindex, family). */
    sa_family_t fam = u.addrs[0].sockaddr->sa_family;
    if (device.len > 0) {
        ngx_http_ipng_stats_binding_t *existing = imcf->bindings->elts;
        for (i = 0; i < imcf->bindings->nelts; i++) {
            if (existing[i].family == fam
                && existing[i].device.len == device.len
                && ngx_memcmp(existing[i].device.data, device.data,
                              device.len) == 0)
            {
                return NGX_CONF_OK;
            }
        }
    }

    b = ngx_array_push(imcf->bindings);
    if (b == NULL) return NGX_CONF_ERROR;
    ngx_memzero(b, sizeof(*b));
    b->family = fam;

    /* device/source still point into cf->args storage; persist copies
     * in cf->pool so the binding outlives config parsing. */
    if (device.len > 0) {
        b->device.data = ngx_pnalloc(cf->pool, device.len);
        if (b->device.data == NULL) return NGX_CONF_ERROR;
        ngx_memcpy(b->device.data, device.data, device.len);
        b->device.len = device.len;
    }
    if (source.len > 0) {
        b->source.data = ngx_pnalloc(cf->pool, source.len);
        if (b->source.data == NULL) return NGX_CONF_ERROR;
        ngx_memcpy(b->source.data, source.data, source.len);
        b->source.len = source.len;
    } else if (device.len > 0) {
        /* FR-1.4: default source = device name. */
        b->source = b->device;
    }

    return NGX_CONF_OK;
}
/* ----------------------------------------------------------------- */
/* Config create/merge/init */
/* ----------------------------------------------------------------- */
static void *
ngx_http_ipng_stats_create_main_conf(ngx_conf_t *cf)
{
    ngx_http_ipng_stats_main_conf_t  *mc;

    mc = ngx_pcalloc(cf->pool, sizeof(ngx_http_ipng_stats_main_conf_t));
    if (mc == NULL) {
        return NULL;
    }

    /* pcalloc zeroes everything; only the UNSET sentinels need
     * explicit values. The bucket fields stay 0/NULL here and are
     * given defaults in init_main_conf when no directive set them. */
    mc->flush_interval = NGX_CONF_UNSET_MSEC;
    mc->enabled = NGX_CONF_UNSET;

    mc->nbuckets = 0;
    mc->bucket_bounds_ms = NULL;
    mc->nbytebuckets = 0;
    mc->byte_bucket_bounds = NULL;

    return mc;
}
static char *
ngx_http_ipng_stats_init_main_conf(ngx_conf_t *cf, void *conf)
{
    static const ngx_uint_t ms_defaults[] =
        NGX_HTTP_IPNG_STATS_DEFAULT_BUCKETS;
    static const ngx_uint_t byte_defaults[] =
        NGX_HTTP_IPNG_STATS_DEFAULT_BYTE_BUCKETS;

    ngx_http_ipng_stats_main_conf_t *imcf = conf;
    ngx_uint_t                       n;

    /* Flush interval: default 1s, hard floor of 100ms. */
    ngx_conf_init_msec_value(imcf->flush_interval, 1000);
    if (imcf->flush_interval < 100) {
        ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
                      "ipng_stats_flush_interval must be at least 100ms");
        return NGX_CONF_ERROR;
    }

    if (imcf->default_source.len == 0) {
        ngx_str_set(&imcf->default_source, "direct");
    }

    /* Install default duration buckets unless ipng_stats_buckets set
     * them already. */
    if (imcf->nbuckets == 0) {
        imcf->nbuckets = NGX_HTTP_IPNG_STATS_DEFAULT_BUCKET_COUNT;
        imcf->bucket_bounds_ms =
            ngx_palloc(cf->pool, imcf->nbuckets * sizeof(ngx_uint_t));
        if (imcf->bucket_bounds_ms == NULL) {
            return NGX_CONF_ERROR;
        }
        for (n = 0; n < imcf->nbuckets; n++) {
            imcf->bucket_bounds_ms[n] = ms_defaults[n];
        }
    }

    /* Likewise for the byte-size buckets. */
    if (imcf->nbytebuckets == 0) {
        imcf->nbytebuckets = NGX_HTTP_IPNG_STATS_DEFAULT_BYTE_BUCKET_COUNT;
        imcf->byte_bucket_bounds =
            ngx_palloc(cf->pool, imcf->nbytebuckets * sizeof(ngx_uint_t));
        if (imcf->byte_bucket_bounds == NULL) {
            return NGX_CONF_ERROR;
        }
        for (n = 0; n < imcf->nbytebuckets; n++) {
            imcf->byte_bucket_bounds[n] = byte_defaults[n];
        }
    }

    if (imcf->enabled == NGX_CONF_UNSET) {
        imcf->enabled = 1;
    }

    /* logtail_fmt, logtail_udp_addr, logtail_buf_size, logtail_flush
     * are set by ipng_stats_logtail if the directive is present; they
     * default to NULL/0 from pcalloc. */
    return NGX_CONF_OK;
}
static void *
ngx_http_ipng_stats_create_loc_conf(ngx_conf_t *cf)
{
    ngx_http_ipng_stats_loc_conf_t  *lc;

    lc = ngx_pcalloc(cf->pool, sizeof(ngx_http_ipng_stats_loc_conf_t));
    if (lc == NULL) {
        return NULL;
    }

    /* Both flags start UNSET so merge_loc_conf can inherit them. */
    lc->enabled = NGX_CONF_UNSET;
    lc->is_scrape_handler = NGX_CONF_UNSET;

    return lc;
}
static char *
ngx_http_ipng_stats_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
    ngx_http_ipng_stats_loc_conf_t  *prev = parent;
    ngx_http_ipng_stats_loc_conf_t  *conf = child;

    /* Counting defaults to on; scrape-handler flag defaults to off. */
    ngx_conf_merge_value(conf->is_scrape_handler, prev->is_scrape_handler, 0);
    ngx_conf_merge_value(conf->enabled, prev->enabled, 1);

    if (conf->is_scrape_handler) {
        /* Scrape-handler locations never count themselves (FR-5.5). */
        conf->enabled = 0;
    }

    return NGX_CONF_OK;
}
/* ipng_stats_zone name:size
 *
 * Registers the shared-memory zone holding the counters. The single
 * argument is "<name>:<size>"; size must parse via ngx_parse_size and
 * be at least 64 KiB. Both the name and the size part must be
 * non-empty (previously ":1m" slipped through with an empty zone
 * name, and "name:" handed a zero-length string to ngx_parse_size). */
static char *
ngx_http_ipng_stats_zone(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_ipng_stats_main_conf_t *imcf = conf;
    ngx_str_t *value;
    u_char *p;
    ssize_t size;
    ngx_str_t name, size_str;

    value = cf->args->elts;

    /* conf args are NUL-terminated, so ngx_strchr is safe here. */
    p = (u_char *) ngx_strchr(value[1].data, ':');
    if (p == NULL) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
            "invalid ipng_stats_zone \"%V\"; expected name:size",
            &value[1]);
        return NGX_CONF_ERROR;
    }

    name.data = value[1].data;
    name.len = p - value[1].data;
    p++;

    size_str.data = p;
    size_str.len = value[1].len - (p - value[1].data);

    /* Reject empty name (":1m") and empty size ("name:") outright
     * instead of passing them to ngx_parse_size/ngx_shared_memory_add. */
    if (name.len == 0 || size_str.len == 0) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
            "invalid ipng_stats_zone \"%V\"; expected name:size",
            &value[1]);
        return NGX_CONF_ERROR;
    }

    size = ngx_parse_size(&size_str);
    if (size == NGX_ERROR || size < (ssize_t) (64 * 1024)) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
            "ipng_stats_zone \"%V\" size is too small",
            &value[1]);
        return NGX_CONF_ERROR;
    }

    imcf->zone_name = name;
    imcf->zone_size = size;
    imcf->shm_zone = ngx_shared_memory_add(cf, &name, size,
                                           &ngx_http_ipng_stats_module);
    if (imcf->shm_zone == NULL) {
        return NGX_CONF_ERROR;
    }
    imcf->shm_zone->init = ngx_http_ipng_stats_init_zone;
    imcf->shm_zone->data = imcf;

    return NGX_CONF_OK;
}
/* ipng_stats_buckets <ms> <ms> ...
 *
 * Parses the duration-histogram boundaries: positive integers, in
 * strictly increasing order. */
static char *
ngx_http_ipng_stats_buckets(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_ipng_stats_main_conf_t *imcf = conf;
    ngx_str_t  *value = cf->args->elts;
    ngx_uint_t  n = cf->args->nelts - 1;
    ngx_uint_t  idx, last;
    ngx_int_t   v;

    if (n < 1) {
        return "requires at least one bucket boundary";
    }

    imcf->bucket_bounds_ms = ngx_palloc(cf->pool, n * sizeof(ngx_uint_t));
    if (imcf->bucket_bounds_ms == NULL) {
        return NGX_CONF_ERROR;
    }
    imcf->nbuckets = n;

    last = 0;
    for (idx = 0; idx < n; idx++) {
        v = ngx_atoi(value[idx + 1].data, value[idx + 1].len);
        if (v == NGX_ERROR || v <= 0 || (ngx_uint_t) v <= last) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                "ipng_stats_buckets values must be strictly increasing positive "
                "integers; got \"%V\"", &value[idx + 1]);
            return NGX_CONF_ERROR;
        }
        last = (ngx_uint_t) v;
        imcf->bucket_bounds_ms[idx] = last;
    }

    return NGX_CONF_OK;
}
/* ipng_stats_byte_buckets <bytes> <bytes> ...
 *
 * Parses the size-histogram boundaries: positive sizes (k/m suffixes
 * accepted by ngx_parse_size), in strictly increasing order. */
static char *
ngx_http_ipng_stats_byte_buckets(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf)
{
    ngx_http_ipng_stats_main_conf_t *imcf = conf;
    ngx_str_t  *value = cf->args->elts;
    ngx_uint_t  n = cf->args->nelts - 1;
    ngx_uint_t  idx, last;
    ssize_t     v;

    if (n < 1) {
        return "requires at least one bucket boundary";
    }

    imcf->byte_bucket_bounds = ngx_palloc(cf->pool, n * sizeof(ngx_uint_t));
    if (imcf->byte_bucket_bounds == NULL) {
        return NGX_CONF_ERROR;
    }
    imcf->nbytebuckets = n;

    last = 0;
    for (idx = 0; idx < n; idx++) {
        v = ngx_parse_size(&value[idx + 1]);
        if (v == NGX_ERROR || v <= 0 || (ngx_uint_t) v <= last) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                "ipng_stats_byte_buckets values must be strictly increasing "
                "positive sizes; got \"%V\"", &value[idx + 1]);
            return NGX_CONF_ERROR;
        }
        last = (ngx_uint_t) v;
        imcf->byte_bucket_bounds[idx] = last;
    }

    return NGX_CONF_OK;
}
static char *
ngx_http_ipng_stats_scrape(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_ipng_stats_loc_conf_t *ilcf = conf;
    ngx_http_core_loc_conf_t *clcf;
    ngx_str_t *arg;

    if (cf->args->nelts == 1) {
        /* Argument-less form inside a location block turns that
         * location into the scrape handler (and disables counting
         * for it). */
        clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
        clcf->handler = ngx_http_ipng_stats_content_handler;
        ilcf->is_scrape_handler = 1;
        ilcf->enabled = 0;
        return NGX_CONF_OK;
    }

    /* With an on/off argument, toggles counting in the current
     * context. */
    arg = &((ngx_str_t *) cf->args->elts)[1];

    if (arg->len == 2 && ngx_strncmp(arg->data, "on", 2) == 0) {
        ilcf->enabled = 1;
    } else if (arg->len == 3 && ngx_strncmp(arg->data, "off", 3) == 0) {
        ilcf->enabled = 0;
    } else {
        return "expects \"on\", \"off\", or no argument";
    }

    return NGX_CONF_OK;
}
/* ipng_stats_logtail <format_name> udp://host:port [buffer=<size>] [flush=<duration>]
*
* Enables a global access log that fires for every request via our
* log-phase handler, using a compiled log_format looked up by name
* from ngx_http_log_module. Per-worker buffered UDP datagrams. */
/*
 * Directive handler for:
 *   ipng_stats_logtail <format_name> udp://host:port
 *                      [buffer=<size>] [flush=<duration>] [if=$var]
 *
 * Resolves the named compiled log_format from ngx_http_log_module,
 * parses the UDP destination (literal IPv4 only), and records buffer
 * size / flush interval / optional condition variable in the module's
 * main conf.  Returns NGX_CONF_OK, a literal error string, or
 * NGX_CONF_ERROR after logging a specific diagnostic.
 */
static char *
ngx_http_ipng_stats_logtail(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_ipng_stats_main_conf_t *imcf = conf;
    ipng_log_main_conf_t *lmcf;
    ipng_log_fmt_t *fmt;
    ngx_str_t *value;
    ngx_uint_t i;
    ssize_t buf_size;
    ngx_msec_t flush_ms;

    /* The directive may appear only once. */
    if (imcf->logtail_fmt != NULL) {
        return "is duplicate";
    }
    value = cf->args->elts;

    /* Look up the named log_format in ngx_http_log_module. */
    lmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_log_module);
    if (lmcf == NULL) {
        return "ngx_http_log_module is not available";
    }
    fmt = lmcf->formats.elts;
    imcf->logtail_fmt = NULL;
    /* Linear scan of compiled formats; value[1] is the format name. */
    for (i = 0; i < lmcf->formats.nelts; i++) {
        if (fmt[i].name.len == value[1].len
            && ngx_strncmp(fmt[i].name.data, value[1].data, value[1].len) == 0)
        {
            imcf->logtail_fmt = &fmt[i];
            break;
        }
    }
    if (imcf->logtail_fmt == NULL) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "ipng_stats_logtail: unknown log_format \"%V\"",
                           &value[1]);
        return NGX_CONF_ERROR;
    }

    /* Destination: udp://host:port (only UDP is supported).
     * len <= 6 also rejects a bare "udp://" with nothing after it. */
    if (value[2].len <= 6
        || ngx_strncmp(value[2].data, "udp://", 6) != 0)
    {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "ipng_stats_logtail: destination must be udp://host:port, "
                           "got \"%V\"", &value[2]);
        return NGX_CONF_ERROR;
    }
    {
        /* Parse udp://host:port into a sockaddr_in. */
        u_char *colon;
        ngx_str_t host_str, port_str;
        ngx_int_t port;
        struct in_addr addr;

        host_str.data = value[2].data + 6;
        host_str.len = value[2].len - 6;
        colon = ngx_strlchr(host_str.data, host_str.data + host_str.len, ':');
        if (colon == NULL) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "ipng_stats_logtail: udp:// requires host:port, got \"%V\"",
                               &value[2]);
            return NGX_CONF_ERROR;
        }
        /* Split host:port at the colon. */
        port_str.data = colon + 1;
        port_str.len = host_str.len - (size_t)(colon + 1 - host_str.data);
        host_str.len = (size_t)(colon - host_str.data);
        /* ngx_atoi returns NGX_ERROR (-1) for empty/non-numeric input,
         * so the range check below covers that case too. */
        port = ngx_atoi(port_str.data, port_str.len);
        if (port < 1 || port > 65535) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "ipng_stats_logtail: invalid UDP port in \"%V\"", &value[2]);
            return NGX_CONF_ERROR;
        }
        /* Resolve host — only literal IPv4 for simplicity.  NUL-terminate
         * a stack copy since inet_aton wants a C string. */
        u_char tmp[16];
        if (host_str.len >= sizeof(tmp)) {
            return "udp:// host too long";
        }
        ngx_memcpy(tmp, host_str.data, host_str.len);
        tmp[host_str.len] = '\0';
        if (inet_aton((char *) tmp, &addr) == 0) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "ipng_stats_logtail: invalid IPv4 address in \"%V\"",
                               &value[2]);
            return NGX_CONF_ERROR;
        }
        ngx_memzero(&imcf->logtail_udp_addr, sizeof(struct sockaddr_in));
        imcf->logtail_udp_addr.sin_family = AF_INET;
        imcf->logtail_udp_addr.sin_port = htons((in_port_t) port);
        imcf->logtail_udp_addr.sin_addr = addr;
    }

    /* Defaults: 64 KiB buffer, 1 s flush. */
    buf_size = 64 * 1024;
    flush_ms = 1000;

    /* Parse optional key=value parameters (args 3..). */
    for (i = 3; i < cf->args->nelts; i++) {
        if (value[i].len > 7
            && ngx_strncmp(value[i].data, "buffer=", 7) == 0)
        {
            ngx_str_t s = { value[i].len - 7, value[i].data + 7 };
            buf_size = ngx_parse_size(&s);
            /* Enforce a 1 KiB floor so a single log line has room. */
            if (buf_size == NGX_ERROR || buf_size < 1024) {
                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                   "ipng_stats_logtail: invalid buffer size \"%V\"",
                                   &value[i]);
                return NGX_CONF_ERROR;
            }
            continue;
        }
        if (value[i].len > 6
            && ngx_strncmp(value[i].data, "flush=", 6) == 0)
        {
            ngx_str_t s = { value[i].len - 6, value[i].data + 6 };
            flush_ms = ngx_parse_time(&s, 0);
            /* 100 ms floor keeps the timer from busy-spinning. */
            if (flush_ms == (ngx_msec_t) NGX_ERROR || flush_ms < 100) {
                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                   "ipng_stats_logtail: invalid flush interval \"%V\"",
                                   &value[i]);
                return NGX_CONF_ERROR;
            }
            continue;
        }
        if (value[i].len > 3
            && ngx_strncmp(value[i].data, "if=", 3) == 0)
        {
            /* if=$variable: requests are skipped when the variable is
             * empty or "0" (checked in logtail_write). */
            ngx_str_t var_name = { value[i].len - 3, value[i].data + 3 };
            if (var_name.len < 2 || var_name.data[0] != '$') {
                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                   "ipng_stats_logtail: if= requires a $variable, "
                                   "got \"%V\"", &value[i]);
                return NGX_CONF_ERROR;
            }
            /* Strip the leading '$' before indexing. */
            var_name.data++;
            var_name.len--;
            imcf->logtail_if_index = ngx_http_get_variable_index(cf,
                                                                 &var_name);
            if (imcf->logtail_if_index == (ngx_uint_t) NGX_ERROR) {
                return NGX_CONF_ERROR;
            }
            /* Shift from 0-based to 1-based so 0 means "not set". */
            imcf->logtail_if_index++;
            continue;
        }
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "ipng_stats_logtail: unknown parameter \"%V\"", &value[i]);
        return NGX_CONF_ERROR;
    }
    imcf->logtail_buf_size = (size_t) buf_size;
    imcf->logtail_flush = flush_ms;
    return NGX_CONF_OK;
}
/* ----------------------------------------------------------------- */
/* Postconfig: install log-phase handler */
/* ----------------------------------------------------------------- */
/*
 * Postconfiguration: register the $ipng_source_tag variable and, when
 * the module is actually configured (ipng_stats_zone present), append
 * our handler to the log phase.
 */
static ngx_int_t
ngx_http_ipng_stats_postconfig(ngx_conf_t *cf)
{
    ngx_http_ipng_stats_main_conf_t  *imcf;
    ngx_http_core_main_conf_t        *cmcf;
    ngx_http_variable_t              *var;
    ngx_http_handler_pt              *h;
    ngx_str_t                         v_source = ngx_string("ipng_source_tag");

    /*
     * $ipng_source_tag is registered unconditionally: it is useful in
     * log_format, map, add_header, etc. even without ipng_stats_zone.
     * For the VIP address, the built-in $server_addr is equivalent.
     */
    var = ngx_http_add_variable(cf, &v_source, NGX_HTTP_VAR_NOCACHEABLE);
    if (var == NULL) {
        return NGX_ERROR;
    }

    var->get_handler = ngx_http_ipng_stats_source_variable;

    imcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_ipng_stats_module);

    /*
     * Without ipng_stats_zone the module must be a complete no-op, so
     * the Debian package can be installed ahead of configuration.  Skip
     * the log handler entirely in that case.
     */
    if (imcf->shm_zone == NULL) {
        return NGX_OK;
    }

    cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_core_module);

    h = ngx_array_push(&cmcf->phases[NGX_HTTP_LOG_PHASE].handlers);
    if (h == NULL) {
        return NGX_ERROR;
    }

    *h = ngx_http_ipng_stats_log_handler;

    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* Shared-memory zone init */
/* ----------------------------------------------------------------- */
/*
 * rbtree insert callback: plain hash-keyed insertion.  Equal keys
 * (hash collisions) descend to the right, forming sibling chains that
 * lookups resolve by comparing the full (source, vip, class) key.
 */
static void
ngx_http_ipng_stats_rbtree_insert(ngx_rbtree_node_t *temp,
    ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
{
    ngx_rbtree_node_t  **p;

    for ( ;; ) {
        p = (node->key < temp->key) ? &temp->left : &temp->right;

        if (*p == sentinel) {
            break;
        }

        temp = *p;
    }

    *p = node;
    node->parent = temp;
    node->left = sentinel;
    node->right = sentinel;
    ngx_rbt_red(node);
}
/*
 * Shared-zone initializer.  Either adopts an existing shctx left by a
 * previous cycle (reload path) or carves a fresh one out of the slab,
 * complete with rbtree, LRU queue, and the two intern tables.
 */
static ngx_int_t
ngx_http_ipng_stats_init_zone(ngx_shm_zone_t *shm_zone, void *data)
{
    ngx_http_ipng_stats_main_conf_t  *imcf = shm_zone->data;
    ngx_slab_pool_t                  *slab;
    ngx_http_ipng_stats_shctx_t      *sh;

    slab = (ngx_slab_pool_t *) shm_zone->shm.addr;

    /*
     * shm.exists is unreliable across dynamic-module reloads, so probe
     * slab->data directly: if our shctx is already pinned there from a
     * previous cycle (magic matches), the zone is reusable.
     */
    sh = slab->data;
    if (sh != NULL && sh->magic == 0x49504E47u /* "IPNG" */) {
        ngx_log_error(NGX_LOG_NOTICE, shm_zone->shm.log, 0,
                      "ipng_stats: init_zone: reusing existing zone "
                      "(sources=%ui, vips=%ui)",
                      sh->sources.nelts, sh->vips.nelts);
        shm_zone->data = sh;
        return NGX_OK;
    }

    /* Fresh zone: allocate and zero the shared context. */
    sh = ngx_slab_alloc(slab, sizeof(*sh));
    if (sh == NULL) {
        return NGX_ERROR;
    }

    ngx_memzero(sh, sizeof(*sh));
    sh->magic = 0x49504E47u; /* "IPNG" */
    slab->data = sh;

    ngx_rbtree_init(&sh->rbtree, &sh->sentinel,
                    ngx_http_ipng_stats_rbtree_insert);
    ngx_queue_init(&sh->lru);

    /* Intern table for source tags. */
    sh->sources.nalloc = 16;
    sh->sources.entries = ngx_slab_alloc(slab,
                                         sh->sources.nalloc
                                         * sizeof(ngx_str_t));
    if (sh->sources.entries == NULL) {
        return NGX_ERROR;
    }
    sh->sources.nelts = 0;

    /* Intern table for VIP addresses. */
    sh->vips.nalloc = 64;
    sh->vips.entries = ngx_slab_alloc(slab,
                                      sh->vips.nalloc * sizeof(ngx_str_t));
    if (sh->vips.entries == NULL) {
        return NGX_ERROR;
    }
    sh->vips.nelts = 0;

    imcf->shm_zone->data = sh;

    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* init_module: enable per-connection pktinfo on listen sockets      */
/* ----------------------------------------------------------------- */
/* init_module: enable IP_PKTINFO / IPV6_RECVPKTINFO on every HTTP
* listening socket so that each accepted TCP connection carries an
* IP_PKTINFO/IPV6_PKTINFO cmsg the log handler can retrieve via
* getsockopt(IP_PKTOPTIONS). Resolve each configured device= name
* to an ifindex once, up front, since the log handler's attribution
* lookup is ifindex-keyed on the hot path.
*
* Crucially the listening sockets themselves are left as plain
* wildcards — no SO_BINDTODEVICE, no extra sockets — so the kernel
* returns outgoing packets via the normal routing table. This is
* what makes DSR / maglev deployments work: SYN arrives through a
* GRE tunnel, SYN-ACK leaves via the default route. */
/*
 * Master-process hook.  Two jobs:
 *
 *  1. Resolve each configured device= name to an ifindex once, so the
 *     request-time attribution lookup is a cheap integer compare.
 *     Resolution failures are WARN-level only — traffic via that
 *     interface falls back to the default source.
 *
 *  2. Enable IP_PKTINFO / IPV6_RECVPKTINFO on every TCP listening
 *     socket so the kernel stashes an in(6)_pktinfo cmsg on each
 *     accepted connection, retrievable later via
 *     getsockopt(IP_PKTOPTIONS).  The sockets stay plain wildcards:
 *     no SO_BINDTODEVICE, so return traffic follows the routing table
 *     (required for DSR / maglev).
 */
static ngx_int_t
ngx_http_ipng_stats_init_module(ngx_cycle_t *cycle)
{
    ngx_http_ipng_stats_main_conf_t *imcf;
    ngx_http_ipng_stats_binding_t *bindings;
    ngx_listening_t *ls;
    ngx_uint_t i;
    int one = 1;
    char devname[IFNAMSIZ];
    size_t dlen;

    imcf = ngx_http_cycle_get_module_main_conf(cycle,
        ngx_http_ipng_stats_module);
    /* No bindings configured: nothing to resolve or enable. */
    if (imcf == NULL || imcf->bindings == NULL) {
        return NGX_OK;
    }

    /* Pass 1: device name -> ifindex for every binding. */
    bindings = imcf->bindings->elts;
    for (i = 0; i < imcf->bindings->nelts; i++) {
        if (bindings[i].device.len == 0) continue;
        /* Copy into a NUL-terminated buffer, truncated to IFNAMSIZ-1,
         * since if_nametoindex() takes a C string. */
        dlen = bindings[i].device.len < IFNAMSIZ - 1
            ? bindings[i].device.len : IFNAMSIZ - 1;
        ngx_memcpy(devname, bindings[i].device.data, dlen);
        devname[dlen] = '\0';
        bindings[i].ifindex = if_nametoindex(devname);
        if (bindings[i].ifindex == 0) {
            /* Non-fatal: the interface may not exist yet. */
            ngx_log_error(NGX_LOG_WARN, cycle->log, ngx_errno,
                "ipng_stats: if_nametoindex(\"%s\") failed — "
                "traffic via that interface will fall back to "
                "the default source", devname);
            continue;
        }
        ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
            "ipng_stats: device \"%V\" -> ifindex %ui "
            "(source=\"%V\")",
            &bindings[i].device, bindings[i].ifindex,
            &bindings[i].source);
    }

    /* Pass 2: turn on pktinfo reception on each live TCP listener. */
    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {
        if (ls[i].fd == (ngx_socket_t) -1) continue;
        if (ls[i].sockaddr == NULL) continue;
        if (ls[i].type != SOCK_STREAM) continue;
        if (ls[i].sockaddr->sa_family == AF_INET) {
            if (setsockopt(ls[i].fd, IPPROTO_IP, IP_PKTINFO,
                           &one, sizeof(one)) == -1)
            {
                /* WARN only: attribution degrades, serving continues. */
                ngx_log_error(NGX_LOG_WARN, cycle->log, ngx_errno,
                    "ipng_stats: setsockopt(IP_PKTINFO) failed "
                    "on listen fd %d", (int) ls[i].fd);
            }
#if (NGX_HAVE_INET6)
        } else if (ls[i].sockaddr->sa_family == AF_INET6) {
            if (setsockopt(ls[i].fd, IPPROTO_IPV6, IPV6_RECVPKTINFO,
                           &one, sizeof(one)) == -1)
            {
                ngx_log_error(NGX_LOG_WARN, cycle->log, ngx_errno,
                    "ipng_stats: setsockopt(IPV6_RECVPKTINFO) "
                    "failed on listen fd %d", (int) ls[i].fd);
            }
#endif
        }
    }
    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* Worker init/exit and flush timer */
/* ----------------------------------------------------------------- */
/*
 * Per-worker initialization: allocate the local slot table and its
 * four histogram arenas, arm the periodic flush timer, and — if
 * ipng_stats_logtail is configured — set up the per-worker UDP socket,
 * line buffer, and logtail flush timer.
 */
static ngx_int_t
ngx_http_ipng_stats_init_worker(ngx_cycle_t *cycle)
{
    ngx_http_ipng_stats_main_conf_t *imcf;
    ngx_http_ipng_stats_worker_t *w = &ngx_http_ipng_stats_worker;
    size_t arena_bytes;

    /* Only real request-serving processes need worker state; skip
     * cache manager/loader etc. */
    if (ngx_process != NGX_PROCESS_WORKER
        && ngx_process != NGX_PROCESS_SINGLE)
    {
        return NGX_OK;
    }
    imcf = ngx_http_cycle_get_module_main_conf(cycle,
        ngx_http_ipng_stats_module);
    /* Module unconfigured: stay idle. */
    if (imcf == NULL || imcf->shm_zone == NULL) {
        return NGX_OK;
    }

    w->log = cycle->log;
    w->nalloc = 256;                 /* per-worker slot cap */
    w->nslots = 0;
    w->dirty_head = NULL;
    w->slots = ngx_pcalloc(cycle->pool,
        w->nalloc * sizeof(ngx_http_ipng_stats_slot_t));
    if (w->slots == NULL) {
        return NGX_ERROR;
    }

    /* Duration histograms: nbuckets bounds + the implicit +Inf lane,
     * one row per slot, for both downstream and upstream timings. */
    arena_bytes = w->nalloc * (imcf->nbuckets + 1) * sizeof(uint64_t);
    w->dhist_arena = ngx_pcalloc(cycle->pool, arena_bytes);
    w->uhist_arena = ngx_pcalloc(cycle->pool, arena_bytes);
    if (w->dhist_arena == NULL || w->uhist_arena == NULL) {
        return NGX_ERROR;
    }
    /* Byte-size histograms, same layout with nbytebuckets bounds. */
    arena_bytes = w->nalloc * (imcf->nbytebuckets + 1) * sizeof(uint64_t);
    w->bin_arena = ngx_pcalloc(cycle->pool, arena_bytes);
    w->bout_arena = ngx_pcalloc(cycle->pool, arena_bytes);
    if (w->bin_arena == NULL || w->bout_arena == NULL) {
        return NGX_ERROR;
    }

    /* Schedule first flush tick. */
    ngx_memzero(&w->flush_ev, sizeof(w->flush_ev));
    w->flush_ev.handler = ngx_http_ipng_stats_flush_timer;
    w->flush_ev.log = cycle->log;
    w->flush_ev.data = w;
    w->flush_ev.cancelable = 1;
    ngx_add_timer(&w->flush_ev, imcf->flush_interval);

    /* Logtail buffer + flush timer (UDP only). */
    w->logtail_udp_fd = (ngx_socket_t) -1;
    if (imcf->logtail_fmt != NULL) {
        w->logtail_buf = ngx_palloc(cycle->pool, imcf->logtail_buf_size);
        if (w->logtail_buf == NULL) {
            return NGX_ERROR;
        }
        w->logtail_pos = w->logtail_buf;
        w->logtail_end = w->logtail_buf + imcf->logtail_buf_size;
        /* Open per-worker UDP socket; failure aborts worker startup. */
        w->logtail_udp_fd = socket(AF_INET, SOCK_DGRAM, 0);
        if (w->logtail_udp_fd == (ngx_socket_t) -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                "ipng_stats: socket(SOCK_DGRAM) failed");
            return NGX_ERROR;
        }
        ngx_memzero(&w->logtail_flush_ev, sizeof(w->logtail_flush_ev));
        w->logtail_flush_ev.handler = ngx_http_ipng_stats_logtail_flush_handler;
        w->logtail_flush_ev.log = cycle->log;
        w->logtail_flush_ev.data = w;
        w->logtail_flush_ev.cancelable = 1;
        ngx_add_timer(&w->logtail_flush_ev, imcf->logtail_flush);
    }
    return NGX_OK;
}
/*
 * Per-worker teardown: cancel the stats flush timer, push any buffered
 * logtail lines out one last time, cancel the logtail timer, and close
 * the per-worker UDP socket.
 */
static void
ngx_http_ipng_stats_exit_worker(ngx_cycle_t *cycle)
{
    ngx_http_ipng_stats_main_conf_t  *imcf;
    ngx_http_ipng_stats_worker_t     *w;

    w = &ngx_http_ipng_stats_worker;

    if (w->flush_ev.timer_set) {
        ngx_del_timer(&w->flush_ev);
    }

    /* Final logtail flush before the process goes away. */
    imcf = ngx_http_cycle_get_module_main_conf(cycle,
                                               ngx_http_ipng_stats_module);
    if (imcf != NULL) {
        ngx_http_ipng_stats_logtail_flush(w, imcf);
    }

    if (w->logtail_flush_ev.timer_set) {
        ngx_del_timer(&w->logtail_flush_ev);
    }

    if (w->logtail_udp_fd != (ngx_socket_t) -1) {
        close(w->logtail_udp_fd);
        w->logtail_udp_fd = (ngx_socket_t) -1;
    }
}
/*
 * Periodic flush tick: merge worker-local deltas into the shared zone,
 * then re-arm unless the worker is shutting down.
 */
static void
ngx_http_ipng_stats_flush_timer(ngx_event_t *ev)
{
    ngx_http_ipng_stats_main_conf_t  *imcf;
    ngx_http_ipng_stats_worker_t     *w;

    w = ev->data;

    imcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,
                                               ngx_http_ipng_stats_module);
    if (imcf == NULL) {
        return;
    }

    ngx_http_ipng_stats_do_flush(w, imcf, ev->log);

    if (!ngx_exiting && !ngx_quit) {
        ngx_add_timer(&w->flush_ev, imcf->flush_interval);
    }
}
/*
 * Merge every dirty worker-local slot into the shared zone under the
 * slab mutex.  For each slot: find (or allocate) the matching shared
 * node in the hash-keyed rbtree, add the slot's deltas into the node's
 * atomic counters and histogram lanes, zero the slot, and touch the
 * node's LRU position.  If the slab is full, the slot's deltas are
 * dropped and zone_full_events is bumped.
 */
static void
ngx_http_ipng_stats_do_flush(ngx_http_ipng_stats_worker_t *w,
    ngx_http_ipng_stats_main_conf_t *imcf, ngx_log_t *log)
{
    ngx_http_ipng_stats_shctx_t *sh;
    ngx_slab_pool_t *slab;
    ngx_http_ipng_stats_slot_t *slot, *next;
    ngx_http_ipng_stats_node_t *n;
    ngx_rbtree_node_t *rb;
    ngx_uint_t i;
    ngx_uint_t nbuckets, nbytebuckets;
    size_t node_size;
    ngx_atomic_uint_t *shared_lanes;

    if (imcf->shm_zone == NULL) {
        return;
    }
    slab = (ngx_slab_pool_t *) imcf->shm_zone->shm.addr;
    sh = imcf->shm_zone->data;
    nbuckets = imcf->nbuckets;
    nbytebuckets = imcf->nbytebuckets;
    /* Node layout: header struct followed by four histogram lane
     * arrays — dhist, uhist (nbuckets+1 each), then bin, bout
     * (nbytebuckets+1 each). */
    node_size = sizeof(ngx_http_ipng_stats_node_t)
        + 2 * (nbuckets + 1) * sizeof(ngx_atomic_uint_t)
        + 2 * (nbytebuckets + 1) * sizeof(ngx_atomic_uint_t);

    ngx_shmtx_lock(&slab->mutex);
    (void) ngx_atomic_fetch_add(&sh->flushes_total, 1);

    for (slot = w->dirty_head; slot != NULL; slot = next) {
        /* Unlink from the dirty list before processing. */
        next = slot->dirty_next;
        slot->dirty_next = NULL;
        slot->dirty = 0;

        /* Find or insert the shared-zone node for this hash. */
        rb = sh->rbtree.root;
        n = NULL;
        while (rb != &sh->sentinel) {
            if (slot->hash < rb->key) {
                rb = rb->left;
                continue;
            }
            if (slot->hash > rb->key) {
                rb = rb->right;
                continue;
            }
            /* hash match: verify full key. */
            n = (ngx_http_ipng_stats_node_t *) rb;
            if (n->source_id == slot->source_id
                && n->vip_id == slot->vip_id
                && n->class == slot->class)
            {
                break;
            }
            /* collision — equal keys were inserted rightward, so
             * keep walking right. */
            rb = rb->right;
            n = NULL;
        }

        if (n == NULL) {
            n = ngx_slab_calloc_locked(slab, node_size);
            if (n == NULL) {
                (void) ngx_atomic_fetch_add(&sh->zone_full_events, 1);
                /* Drop this slot's dirty deltas; they will reaccumulate
                 * on the next request if the operator resizes the zone. */
                slot->requests = 0;
                slot->bytes_in = 0;
                slot->bytes_out = 0;
                slot->duration_sum_ms = 0;
                slot->upstream_sum_ms = 0;
                for (i = 0; i <= nbuckets; i++) {
                    slot->dhist[i] = 0;
                    slot->uhist[i] = 0;
                }
                for (i = 0; i <= nbytebuckets; i++) {
                    slot->bin_hist[i] = 0;
                    slot->bout_hist[i] = 0;
                }
                continue;
            }
            /* Fresh node: key it, insert it, and append to the LRU. */
            n->rbnode.key = slot->hash;
            n->source_id = slot->source_id;
            n->vip_id = slot->vip_id;
            n->class = slot->class;
            ngx_rbtree_insert(&sh->rbtree, &n->rbnode);
            ngx_queue_insert_tail(&sh->lru, &n->lru);
        }

        /* Fold the scalar deltas into the shared counters. */
        (void) ngx_atomic_fetch_add(&n->requests, slot->requests);
        (void) ngx_atomic_fetch_add(&n->bytes_in, slot->bytes_in);
        (void) ngx_atomic_fetch_add(&n->bytes_out, slot->bytes_out);
        (void) ngx_atomic_fetch_add(&n->duration_sum_ms, slot->duration_sum_ms);
        (void) ngx_atomic_fetch_add(&n->upstream_sum_ms, slot->upstream_sum_ms);

        /* Histogram lanes start immediately after the node header. */
        shared_lanes = (ngx_atomic_uint_t *) (n + 1);
        for (i = 0; i <= nbuckets; i++) {
            if (slot->dhist[i]) {
                (void) ngx_atomic_fetch_add(&shared_lanes[i], slot->dhist[i]);
            }
            if (slot->uhist[i]) {
                (void) ngx_atomic_fetch_add(
                    &shared_lanes[nbuckets + 1 + i], slot->uhist[i]);
            }
        }
        {
            /* Byte histograms follow the two duration histograms. */
            ngx_atomic_uint_t *bin_lanes = shared_lanes + 2 * (nbuckets + 1);
            ngx_atomic_uint_t *bout_lanes = bin_lanes + (nbytebuckets + 1);
            for (i = 0; i <= nbytebuckets; i++) {
                if (slot->bin_hist[i]) {
                    (void) ngx_atomic_fetch_add(&bin_lanes[i],
                                                slot->bin_hist[i]);
                }
                if (slot->bout_hist[i]) {
                    (void) ngx_atomic_fetch_add(&bout_lanes[i],
                                                slot->bout_hist[i]);
                }
            }
        }

        /* Clear local deltas. */
        slot->requests = 0;
        slot->bytes_in = 0;
        slot->bytes_out = 0;
        slot->duration_sum_ms = 0;
        slot->upstream_sum_ms = 0;
        for (i = 0; i <= nbuckets; i++) {
            slot->dhist[i] = 0;
            slot->uhist[i] = 0;
        }
        for (i = 0; i <= nbytebuckets; i++) {
            slot->bin_hist[i] = 0;
            slot->bout_hist[i] = 0;
        }

        /* Move the node to the tail (most-recently-used) of the LRU. */
        ngx_queue_remove(&n->lru);
        ngx_queue_insert_tail(&sh->lru, &n->lru);
    }
    w->dirty_head = NULL;
    ngx_shmtx_unlock(&slab->mutex);
}
/* ----------------------------------------------------------------- */
/* Log-phase handler */
/* ----------------------------------------------------------------- */
/*
 * Map an HTTP status code to its class index: code/100 (1..5) for
 * codes within [CODE_MIN, CODE_MAX], CLASS_UNKNOWN otherwise.
 */
static ngx_uint_t
ngx_http_ipng_stats_status_index(ngx_uint_t code)
{
    if (code >= NGX_HTTP_IPNG_STATS_CODE_MIN
        && code <= NGX_HTTP_IPNG_STATS_CODE_MAX)
    {
        return code / 100; /* 1..5 */
    }

    return NGX_HTTP_IPNG_STATS_CLASS_UNKNOWN;
}
/*
 * Human-readable label for a status-class index; out-of-range indices
 * collapse to "unknown".
 */
static const char *
ngx_http_ipng_stats_class_label(ngx_uint_t class)
{
    static const char  *labels[NGX_HTTP_IPNG_STATS_NCLASSES] = {
        "unknown", "1xx", "2xx", "3xx", "4xx", "5xx",
    };

    if (class < NGX_HTTP_IPNG_STATS_NCLASSES) {
        return labels[class];
    }

    return "unknown";
}
/*
 * Lower-bound binary search: index of the smallest bound >= ms, or
 * nbuckets (the implicit +Inf bucket) when ms exceeds every bound.
 */
static ngx_uint_t
ngx_http_ipng_stats_bucket_index(ngx_uint_t ms, ngx_uint_t *bounds,
    ngx_uint_t nbuckets)
{
    ngx_uint_t  lo, hi, mid;

    lo = 0;
    hi = nbuckets;

    while (lo < hi) {
        mid = lo + (hi - lo) / 2;

        if (bounds[mid] < ms) {
            lo = mid + 1;
        } else {
            hi = mid;
        }
    }

    return lo;
}
/* Determine the source tag for a request by asking the kernel which
* interface the connection's SYN came in on. IP_PKTINFO /
* IPV6_RECVPKTINFO (enabled on the listening socket at init_module
* time) tells the kernel to stash an in(6)_pktinfo cmsg on each
* accepted connection; getsockopt(IP_PKTOPTIONS) returns that cmsg
* here. The ifindex lookup is cheap — a short linear scan, since a
* host has O(10) attributed interfaces — and avoids the
* SO_BINDTODEVICE egress-pinning problem. */
/*
 * Resolve the source tag for a request from the kernel's record of the
 * interface the connection's SYN arrived on (IP_PKTINFO /
 * IPV6_RECVPKTINFO, enabled at init_module time, retrieved here via
 * getsockopt(IP_PKTOPTIONS)).  Falls back to imcf->default_source when
 * no bindings are configured, the getsockopt fails, no pktinfo cmsg is
 * present, or no binding matches (ifindex, family).
 *
 * Fix: the control buffer is now a union with struct cmsghdr.
 * CMSG_FIRSTHDR() casts msg_control to struct cmsghdr *, and a bare
 * u_char array carries no alignment guarantee — a misaligned cmsghdr
 * access is undefined behavior on strict-alignment targets.  The
 * union forces correct alignment (the standard cmsg(3) idiom).
 */
static ngx_int_t
ngx_http_ipng_stats_resolve_source(ngx_http_request_t *r,
    ngx_http_ipng_stats_main_conf_t *imcf, ngx_str_t *source_out)
{
    ngx_http_ipng_stats_binding_t *b;
    ngx_uint_t i, ifindex = 0;
    union {
        struct cmsghdr align;     /* forces cmsghdr alignment */
        u_char buf[256];
    } cbuf;
    socklen_t clen = sizeof(cbuf.buf);
    struct cmsghdr *cm;
    struct msghdr mh;
    int level, optname;

    /* No device= bindings: everything attributes to the default. */
    if (imcf->bindings == NULL || imcf->bindings->nelts == 0) {
        *source_out = imcf->default_source;
        return NGX_OK;
    }

    if (r->connection->local_sockaddr
        && r->connection->local_sockaddr->sa_family == AF_INET6)
    {
        level = IPPROTO_IPV6;
        /* glibc's <netinet/in.h> only exposes the legacy name for
         * this option; the numeric value (6) is the same as the
         * unnamed "modern" IPV6_PKTOPTIONS. */
        optname = IPV6_2292PKTOPTIONS;
    } else {
        level = IPPROTO_IP;
        optname = IP_PKTOPTIONS;
    }

    /* Failure here is non-fatal: fall through to the default source. */
    if (getsockopt(r->connection->fd, level, optname,
                   cbuf.buf, &clen) == 0)
    {
        /* Wrap the returned bytes in a msghdr so the CMSG_* macros
         * can walk them. */
        ngx_memzero(&mh, sizeof(mh));
        mh.msg_control = cbuf.buf;
        mh.msg_controllen = clen;

        for (cm = CMSG_FIRSTHDR(&mh); cm; cm = CMSG_NXTHDR(&mh, cm)) {
            if (cm->cmsg_level == IPPROTO_IP && cm->cmsg_type == IP_PKTINFO) {
                ifindex = ((struct in_pktinfo *) CMSG_DATA(cm))->ipi_ifindex;
                break;
            }
#if (NGX_HAVE_INET6)
            if (cm->cmsg_level == IPPROTO_IPV6
                && cm->cmsg_type == IPV6_PKTINFO)
            {
                ifindex = ((struct in6_pktinfo *) CMSG_DATA(cm))->ipi6_ifindex;
                break;
            }
#endif
        }
    }

    if (ifindex > 0) {
        /* Bindings match on (ifindex, family): one device may carry
         * distinct v4 and v6 tags. */
        sa_family_t fam = r->connection->local_sockaddr
            ? r->connection->local_sockaddr->sa_family
            : AF_INET;

        b = imcf->bindings->elts;
        for (i = 0; i < imcf->bindings->nelts; i++) {
            if (b[i].ifindex == ifindex && b[i].family == fam) {
                *source_out = b[i].source;
                return NGX_OK;
            }
        }
    }

    *source_out = imcf->default_source;
    return NGX_OK;
}
/*
 * Render the connection's local (destination) address into buf as the
 * canonical VIP string: no port, IPv6 in RFC 5952 form, scope-ids
 * dropped (ngx_sock_ntop's format — matching FR-2.5).  Returns
 * NGX_ERROR when there is no local sockaddr or formatting fails.
 */
static ngx_int_t
ngx_http_ipng_stats_canonical_vip(ngx_http_request_t *r, u_char *buf,
    size_t buflen, ngx_str_t *vip_out)
{
    size_t  written;

    if (r->connection->local_sockaddr == NULL) {
        return NGX_ERROR;
    }

    written = ngx_sock_ntop(r->connection->local_sockaddr,
                            r->connection->local_socklen,
                            buf, buflen, 0);

    if (written == 0) {
        return NGX_ERROR;
    }

    vip_out->len = written;
    vip_out->data = buf;

    return NGX_OK;
}
/* Find or allocate a per-worker slot for this key; caller fills in
* deltas. The per-worker table is a flat array with linear scan over
* already-used slots — fine because `nslots` is small. */
/*
 * Find or allocate the per-worker slot for (source_id, vip_id, class);
 * the caller accumulates deltas into it.  The table is a flat array
 * scanned linearly — fine, nslots stays small.  Returns NULL when the
 * per-worker cap is reached (the key is simply dropped until capacity
 * frees up).
 */
static ngx_http_ipng_stats_slot_t *
ngx_http_ipng_stats_worker_slot(ngx_http_ipng_stats_worker_t *w,
    ngx_http_ipng_stats_main_conf_t *imcf,
    ngx_uint_t source_id, ngx_uint_t vip_id, ngx_uint_t class)
{
    ngx_http_ipng_stats_slot_t  *s, *end;
    ngx_uint_t                   key, row;

    /* Mixed key over the triple; full-field comparison below
     * disambiguates collisions. */
    key = (source_id * 2654435761u)
          ^ (vip_id * 40503u)
          ^ (class * 131071u);

    for (s = w->slots, end = w->slots + w->nslots; s < end; s++) {
        if (s->hash == key
            && s->source_id == source_id
            && s->vip_id == vip_id
            && s->class == class)
        {
            return s;
        }
    }

    if (w->nslots >= w->nalloc) {
        /* Over the per-worker cap: drop.  The flush sees no dirty
         * slot for this key and the shared zone learns nothing of it
         * until capacity frees up. */
        return NULL;
    }

    s = &w->slots[w->nslots];
    s->hash = key;
    s->source_id = source_id;
    s->vip_id = vip_id;
    s->class = class;

    /* Point the slot at its rows in the preallocated arenas. */
    row = w->nslots * (imcf->nbuckets + 1);
    s->dhist = &w->dhist_arena[row];
    s->uhist = &w->uhist_arena[row];

    row = w->nslots * (imcf->nbytebuckets + 1);
    s->bin_hist = &w->bin_arena[row];
    s->bout_hist = &w->bout_arena[row];

    w->nslots++;

    return s;
}
/*
 * Push the slot onto the worker's singly-linked dirty list, exactly
 * once per flush interval.
 */
static void
ngx_http_ipng_stats_mark_dirty(ngx_http_ipng_stats_worker_t *w,
    ngx_http_ipng_stats_slot_t *s)
{
    if (!s->dirty) {
        s->dirty = 1;
        s->dirty_next = w->dirty_head;
        w->dirty_head = s;
    }
}
/*
 * Log-phase handler: per-request accounting.  Resolves (source tag,
 * canonical VIP, status class), interns the strings in the shared
 * zone (rare, mutex-guarded), then accumulates all deltas into a
 * worker-local slot — the hot path touches no shared memory.  Always
 * returns NGX_OK: accounting failures never affect the request.
 */
static ngx_int_t
ngx_http_ipng_stats_log_handler(ngx_http_request_t *r)
{
    ngx_http_ipng_stats_main_conf_t *imcf;
    ngx_http_ipng_stats_loc_conf_t *ilcf;
    ngx_http_ipng_stats_worker_t *w = &ngx_http_ipng_stats_worker;
    ngx_http_ipng_stats_shctx_t *sh;
    ngx_slab_pool_t *slab;
    ngx_http_ipng_stats_slot_t *slot;
    ngx_str_t source, vip;
    u_char vipbuf[NGX_SOCKADDR_STRLEN];
    ngx_uint_t source_id, vip_id, class;
    ngx_uint_t bucket;
    uint64_t bin_sz, bout_sz;
    ngx_msec_int_t elapsed_ms;
    ngx_time_t *tp;

    /* Counting must be enabled both globally and in this location. */
    imcf = ngx_http_get_module_main_conf(r, ngx_http_ipng_stats_module);
    if (imcf == NULL || imcf->shm_zone == NULL || !imcf->enabled) {
        return NGX_OK;
    }
    ilcf = ngx_http_get_module_loc_conf(r, ngx_http_ipng_stats_module);
    if (ilcf == NULL || !ilcf->enabled) {
        return NGX_OK;
    }
    if (ngx_http_ipng_stats_resolve_source(r, imcf, &source) != NGX_OK) {
        return NGX_OK;
    }
    /* For wildcard listeners (0.0.0.0 / ::), local_sockaddr initially
     * holds the bind address, not the real VIP. Force a getsockname()
     * so we get the actual destination address the client connected to. */
    if (ngx_connection_local_sockaddr(r->connection, NULL, 0) != NGX_OK) {
        return NGX_OK;
    }
    if (ngx_http_ipng_stats_canonical_vip(r, vipbuf, sizeof(vipbuf), &vip)
        != NGX_OK)
    {
        return NGX_OK;
    }
    class = ngx_http_ipng_stats_status_index(r->headers_out.status);

    /* Intern source and vip in the shared zone. This is the only
     * shared-zone write outside of flush — it runs rarely (once per
     * new (source, vip) pair, ever) and takes the slab mutex. */
    slab = (ngx_slab_pool_t *) imcf->shm_zone->shm.addr;
    sh = imcf->shm_zone->data;
    ngx_shmtx_lock(&slab->mutex);
    if (ngx_http_ipng_stats_intern_shared(sh, slab, &sh->sources, &source,
                                          &source_id) != NGX_OK
        || ngx_http_ipng_stats_intern_shared(sh, slab, &sh->vips, &vip,
                                             &vip_id) != NGX_OK)
    {
        ngx_shmtx_unlock(&slab->mutex);
        return NGX_OK;
    }
    ngx_shmtx_unlock(&slab->mutex);

    slot = ngx_http_ipng_stats_worker_slot(w, imcf, source_id, vip_id, class);
    if (slot == NULL) {
        /* Per-worker slot table full: drop this observation. */
        return NGX_OK;
    }

    /* Byte counters and byte-size histograms. */
    bin_sz = r->request_length > 0 ? (uint64_t) r->request_length : 0;
    bout_sz = (uint64_t) r->connection->sent;
    slot->requests += 1;
    slot->bytes_in += bin_sz;
    slot->bytes_out += bout_sz;
    bucket = ngx_http_ipng_stats_bucket_index((ngx_uint_t) bin_sz,
                                              imcf->byte_bucket_bounds,
                                              imcf->nbytebuckets);
    slot->bin_hist[bucket] += 1;
    bucket = ngx_http_ipng_stats_bucket_index((ngx_uint_t) bout_sz,
                                              imcf->byte_bucket_bounds,
                                              imcf->nbytebuckets);
    slot->bout_hist[bucket] += 1;

    /* Use the same formula nginx uses for $request_time: two-field
     * subtraction via ngx_timeofday(), which is the canonical way to
     * compute elapsed time in nginx's cached-time model. */
    tp = ngx_timeofday();
    elapsed_ms = (ngx_msec_int_t)
        ((tp->sec - r->start_sec) * 1000 + (tp->msec - r->start_msec));
    if (elapsed_ms < 0) elapsed_ms = 0;
    slot->duration_sum_ms += (uint64_t) elapsed_ms;
    bucket = ngx_http_ipng_stats_bucket_index((ngx_uint_t) elapsed_ms,
                                              imcf->bucket_bounds_ms,
                                              imcf->nbuckets);
    slot->dhist[bucket] += 1;

    /* NOTE(review): only the first upstream state is counted here;
     * retried upstream attempts (nelts > 1) are ignored — confirm this
     * is intentional vs. summing all response_times as $upstream_response_time
     * does. */
    if (r->upstream_states != NULL && r->upstream_states->nelts > 0) {
        ngx_http_upstream_state_t *us = r->upstream_states->elts;
        ngx_msec_int_t up_ms = (ngx_msec_int_t) us[0].response_time;
        if (up_ms > 0) {
            slot->upstream_sum_ms += (uint64_t) up_ms;
            bucket = ngx_http_ipng_stats_bucket_index((ngx_uint_t) up_ms,
                                                      imcf->bucket_bounds_ms,
                                                      imcf->nbuckets);
            slot->uhist[bucket] += 1;
        }
    }
    ngx_http_ipng_stats_mark_dirty(w, slot);

    /* Global logtail — runs for every request, uses compiled log_format
     * ops, writes to a per-worker buffer. */
    if (imcf->logtail_fmt != NULL) {
        ngx_http_ipng_stats_logtail_write(r, imcf, w);
    }
    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* String interning (called under slab mutex) */
/* ----------------------------------------------------------------- */
/*
 * Intern string s into table t, writing its stable index to *idx_out.
 * Runs under the slab mutex.  The table doubles when full; on any slab
 * allocation failure, zone_full_events is bumped and NGX_ERROR is
 * returned (the string stays un-interned).
 */
static ngx_int_t
ngx_http_ipng_stats_intern_shared(ngx_http_ipng_stats_shctx_t *sh,
    ngx_slab_pool_t *slab, ngx_http_ipng_stats_intern_t *t, ngx_str_t *s,
    ngx_uint_t *idx_out)
{
    u_char      *data;
    ngx_str_t   *entry;
    ngx_uint_t   i;

    /* Already interned?  Return the existing index. */
    for (i = 0; i < t->nelts; i++) {
        entry = &t->entries[i];

        if (entry->len == s->len
            && ngx_memcmp(entry->data, s->data, s->len) == 0)
        {
            *idx_out = i;
            return NGX_OK;
        }
    }

    /* Grow by 2x when the table is full. */
    if (t->nelts >= t->nalloc) {
        ngx_uint_t   cap = t->nalloc * 2;
        ngx_str_t   *grown;

        grown = ngx_slab_alloc_locked(slab, cap * sizeof(ngx_str_t));
        if (grown == NULL) {
            (void) ngx_atomic_fetch_add(&sh->zone_full_events, 1);
            return NGX_ERROR;
        }

        ngx_memcpy(grown, t->entries, t->nelts * sizeof(ngx_str_t));
        ngx_slab_free_locked(slab, t->entries);
        t->entries = grown;
        t->nalloc = cap;
    }

    /* Copy the string bytes into the slab and append the entry. */
    data = ngx_slab_alloc_locked(slab, s->len);
    if (data == NULL) {
        (void) ngx_atomic_fetch_add(&sh->zone_full_events, 1);
        return NGX_ERROR;
    }

    ngx_memcpy(data, s->data, s->len);

    entry = &t->entries[t->nelts];
    entry->data = data;
    entry->len = s->len;

    *idx_out = t->nelts;
    t->nelts++;

    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* Global logtail: write + flush */
/* ----------------------------------------------------------------- */
/*
 * Format one access-log line for r using the compiled log_format ops
 * and append it to the per-worker logtail buffer.  If the line does
 * not fit, the buffer is flushed first; a line larger than the whole
 * buffer is formatted into a temporary from r->pool and sent directly.
 * Never fails the request — all errors are silently dropped.
 */
static void
ngx_http_ipng_stats_logtail_write(ngx_http_request_t *r,
    ngx_http_ipng_stats_main_conf_t *imcf,
    ngx_http_ipng_stats_worker_t *w)
{
    ipng_log_op_t *ops;
    ngx_uint_t i, nops;
    size_t line_len;
    u_char *p;

    if (imcf->logtail_fmt == NULL || w->logtail_buf == NULL) {
        return;
    }
    /* if=$variable: skip this request when the variable is empty or "0". */
    if (imcf->logtail_if_index) {
        ngx_http_variable_value_t *val;
        /* logtail_if_index is stored 1-based (0 = unset). */
        val = ngx_http_get_indexed_variable(r, imcf->logtail_if_index - 1);
        if (val == NULL || val->not_found
            || val->len == 0
            || (val->len == 1 && val->data[0] == '0'))
        {
            return;
        }
    }
    ops = imcf->logtail_fmt->ops->elts;
    nops = imcf->logtail_fmt->ops->nelts;

    /* Compute line length: ops with len == 0 are variable-sized and
     * provide a getlen() callback; the rest contribute a fixed len. */
    line_len = 1; /* trailing newline */
    for (i = 0; i < nops; i++) {
        if (ops[i].len == 0) {
            line_len += ops[i].getlen(r, ops[i].data);
        } else {
            line_len += ops[i].len;
        }
    }

    /* Flush if the line won't fit. */
    if ((size_t)(w->logtail_end - w->logtail_pos) < line_len) {
        ngx_http_ipng_stats_logtail_flush(w, imcf);
    }
    /* If it STILL doesn't fit (single line > buffer), write directly. */
    if ((size_t)(w->logtail_end - w->logtail_pos) < line_len) {
        u_char *tmp = ngx_pnalloc(r->pool, line_len);
        if (tmp == NULL) {
            return;
        }
        p = tmp;
        for (i = 0; i < nops; i++) {
            p = ops[i].run(r, p, &ops[i]);
        }
        *p++ = '\n';
        /* Fire-and-forget datagram straight to the destination. */
        if (w->logtail_udp_fd != (ngx_socket_t) -1) {
            (void) sendto(w->logtail_udp_fd, tmp, p - tmp, 0,
                          (struct sockaddr *) &imcf->logtail_udp_addr,
                          sizeof(struct sockaddr_in));
        }
        return;
    }

    /* Append to buffer. */
    p = w->logtail_pos;
    for (i = 0; i < nops; i++) {
        p = ops[i].run(r, p, &ops[i]);
    }
    *p++ = '\n';
    w->logtail_pos = p;
}
/*
 * Send the buffered logtail bytes as one UDP datagram and reset the
 * buffer.  Fire-and-forget: if nobody is listening, the kernel
 * silently drops the datagram.
 */
static void
ngx_http_ipng_stats_logtail_flush(ngx_http_ipng_stats_worker_t *w,
    ngx_http_ipng_stats_main_conf_t *imcf)
{
    size_t  len;

    /* Nothing buffered (or logtail not set up) — nothing to do. */
    if (w->logtail_buf == NULL || w->logtail_pos == w->logtail_buf) {
        return;
    }

    len = w->logtail_pos - w->logtail_buf;

    if (w->logtail_udp_fd != (ngx_socket_t) -1) {
        (void) sendto(w->logtail_udp_fd, w->logtail_buf, len, 0,
                      (struct sockaddr *) &imcf->logtail_udp_addr,
                      sizeof(struct sockaddr_in));
    }

    w->logtail_pos = w->logtail_buf;
}
/*
 * Logtail flush timer tick: push out the buffer, then re-arm unless
 * the worker is shutting down.  If main conf lookup fails, re-arm
 * with a 1 s fallback interval.
 */
static void
ngx_http_ipng_stats_logtail_flush_handler(ngx_event_t *ev)
{
    ngx_http_ipng_stats_main_conf_t  *imcf;
    ngx_http_ipng_stats_worker_t     *w;

    w = ev->data;

    imcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,
                                               ngx_http_ipng_stats_module);
    if (imcf != NULL) {
        ngx_http_ipng_stats_logtail_flush(w, imcf);
    }

    if (!ngx_exiting && !ngx_quit) {
        ngx_add_timer(&w->logtail_flush_ev,
                      imcf ? imcf->logtail_flush : 1000);
    }
}
/* ----------------------------------------------------------------- */
/* Nginx variables: $ipng_source_tag */
/* ----------------------------------------------------------------- */
/*
 * Variable get handler for $ipng_source_tag: resolve the source tag
 * for the current connection, or report not_found if the module conf
 * is missing or resolution fails.
 */
static ngx_int_t
ngx_http_ipng_stats_source_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data)
{
    ngx_str_t                         source;
    ngx_http_ipng_stats_main_conf_t  *imcf;

    (void) data;

    imcf = ngx_http_get_module_main_conf(r, ngx_http_ipng_stats_module);

    if (imcf == NULL
        || ngx_http_ipng_stats_resolve_source(r, imcf, &source) != NGX_OK)
    {
        v->not_found = 1;
        return NGX_OK;
    }

    v->valid = 1;
    v->no_cacheable = 0;
    v->not_found = 0;
    v->data = source.data;
    v->len = source.len;

    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* Scrape content handler */
/* ----------------------------------------------------------------- */
/*
 * Extract the optional ?source_tag=...&vip=... query filters from the
 * request args.  Unrecognized keys are ignored; a key with no '=' gets
 * a null value.  Outputs are null-initialized, so "no filter" shows as
 * a zero-length ngx_str_t.  Always returns NGX_OK.
 */
static ngx_int_t
ngx_http_ipng_stats_parse_filters(ngx_http_request_t *r, ngx_str_t *src,
    ngx_str_t *vip)
{
    u_char     *p, *end;
    ngx_str_t   key, val;

    ngx_str_null(src);
    ngx_str_null(vip);

    if (r->args.len == 0) {
        return NGX_OK;
    }

    p = r->args.data;
    end = p + r->args.len;

    while (p < end) {

        /* Key runs up to '=', '&', or end of args. */
        key.data = p;
        while (p < end && *p != '=' && *p != '&') {
            p++;
        }
        key.len = p - key.data;

        /* Optional "=value" runs up to '&' or end of args. */
        if (p < end && *p == '=') {
            val.data = ++p;
            while (p < end && *p != '&') {
                p++;
            }
            val.len = p - val.data;

        } else {
            ngx_str_null(&val);
        }

        if (p < end && *p == '&') {
            p++;
        }

        if (key.len == 10 && ngx_strncmp(key.data, "source_tag", 10) == 0) {
            *src = val;

        } else if (key.len == 3 && ngx_strncmp(key.data, "vip", 3) == 0) {
            *vip = val;
        }
    }

    return NGX_OK;
}
/*
 * Content negotiation for the scrape endpoint: returns 1 when the
 * request's Accept header contains "application/json" (case-
 * insensitive substring), 0 otherwise.  The first Accept header found
 * decides; no Accept header means 0 (Prometheus text format).
 */
static ngx_int_t
ngx_http_ipng_stats_want_json(ngx_http_request_t *r)
{
    ngx_table_elt_t *h;
    ngx_list_part_t *part;
    ngx_uint_t i;

    /* Standard nginx list walk: headers_in is a chunked list, so
     * advance to the next part when i runs past the current one. */
    part = &r->headers_in.headers.part;
    h = part->elts;
    for (i = 0; /* void */; i++) {
        if (i >= part->nelts) {
            if (part->next == NULL) break;
            part = part->next;
            h = part->elts;
            i = 0;
        }
        if (h[i].key.len == 6
            && ngx_strncasecmp(h[i].key.data, (u_char *) "accept", 6) == 0)
        {
            if (ngx_strlcasestrn(h[i].value.data,
                                 h[i].value.data + h[i].value.len,
                                 (u_char *) "application/json", 16 - 1)
                != NULL)
            {
                return 1;
            }
            return 0;
        }
    }
    return 0;
}
/* Scrape endpoint content handler. Accepts GET/HEAD only, discards any
 * request body, parses the optional source_tag/vip query filters, and
 * dispatches to the JSON or Prometheus renderer based on the Accept
 * header. */
static ngx_int_t
ngx_http_ipng_stats_content_handler(ngx_http_request_t *r)
{
    ngx_int_t                         rc;
    ngx_str_t                         fsrc, fvip;
    ngx_http_ipng_stats_main_conf_t  *imcf;

    switch (r->method) {
    case NGX_HTTP_GET:
    case NGX_HTTP_HEAD:
        break;
    default:
        return NGX_HTTP_NOT_ALLOWED;
    }

    rc = ngx_http_discard_request_body(r);
    if (rc != NGX_OK) {
        return rc;
    }

    imcf = ngx_http_get_module_main_conf(r, ngx_http_ipng_stats_module);
    if (imcf == NULL || imcf->shm_zone == NULL) {
        /* module not configured or shared zone missing */
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    ngx_http_ipng_stats_parse_filters(r, &fsrc, &fvip);

    if (ngx_http_ipng_stats_want_json(r)) {
        return ngx_http_ipng_stats_render_json(r, imcf, &fsrc, &fvip);
    }

    return ngx_http_ipng_stats_render_prom(r, imcf, &fsrc, &fvip);
}
/* Scrape rendering.
*
* Cardinality model (post-schema-2): counters are keyed by
* (source_tag, vip, code-class), where code-class is one of
* "1xx".."5xx" or "unknown" — six values rather than ~60 full codes.
* Histograms drop the code label entirely and aggregate across classes
* per (source_tag, vip). Operators who need a full per-code breakdown
* should enable ipng_stats_logtail (FR-8) and process the access-log
* stream off the hot path.
*
* The renderers below walk the shared-zone LRU once under the slab
* mutex, emit per-node counters inline, and accumulate histograms into
* a per-(source, vip) aggregation table allocated from r->pool. The
* aggregation table is then drained to produce one histogram group per
* (source, vip). */
/* One per-(source_tag, vip) histogram aggregation entry. The four lane
 * arrays point into a single arena allocated alongside the table (see
 * ngx_http_ipng_stats_agg_alloc); each lane i counts observations in
 * bucket i, with index nbuckets / nbytebuckets being the +Inf lane. */
typedef struct {
    ngx_uint_t source_id;        /* interned source-tag id */
    ngx_uint_t vip_id;           /* interned vip id */
    ngx_uint_t used;             /* slot occupied (set by agg_get) */
    uint64_t duration_sum_ms;    /* histogram _sum for request_duration */
    uint64_t upstream_sum_ms;    /* histogram _sum for upstream_response */
    uint64_t bytes_in_sum;       /* histogram _sum for bytes_in */
    uint64_t bytes_out_sum;      /* histogram _sum for bytes_out */
    uint64_t req_total;          /* total requests for this (source, vip) */
    uint64_t up_total;           /* upstream observations */
    uint64_t *dhist;             /* nbuckets+1 */
    uint64_t *uhist;             /* nbuckets+1, upstream lanes */
    uint64_t *bin_hist;          /* nbytebuckets+1 */
    uint64_t *bout_hist;         /* nbytebuckets+1 */
} ngx_http_ipng_stats_agg_t;
/* Allocate a single-link output chain with a temp buffer of `size`
 * bytes from r->pool. Returns NULL on allocation failure. */
static ngx_chain_t *
ngx_http_ipng_stats_chain_buf(ngx_http_request_t *r, size_t size)
{
    ngx_chain_t  *link;
    ngx_buf_t    *buf;

    buf = ngx_create_temp_buf(r->pool, size);
    if (buf == NULL) {
        return NULL;
    }

    link = ngx_alloc_chain_link(r->pool);
    if (link == NULL) {
        return NULL;
    }

    link->buf = buf;
    link->next = NULL;

    return link;
}
/* Send a 200 response with the given content type and body chain.
 * Content length is unset (-1), so the core applies chunked/close
 * framing. A NULL chain sends headers plus an empty last body. The
 * final buffer in the chain is flagged last_buf/last_in_chain before
 * handing off to the output filter. */
static ngx_int_t
ngx_http_ipng_stats_send(ngx_http_request_t *r, ngx_str_t *ctype,
    ngx_chain_t *out)
{
    ngx_int_t     rc;
    ngx_chain_t  *tail;

    r->headers_out.status = NGX_HTTP_OK;
    r->headers_out.content_type = *ctype;
    r->headers_out.content_type_len = ctype->len;
    r->headers_out.content_length_n = -1;

    rc = ngx_http_send_header(r);

    if (rc == NGX_ERROR || rc > NGX_OK || r->header_only) {
        return rc;
    }

    if (out == NULL) {
        return ngx_http_send_special(r, NGX_HTTP_LAST);
    }

    for (tail = out; tail->next != NULL; tail = tail->next) { /* void */ }

    tail->buf->last_buf = 1;
    tail->buf->last_in_chain = 1;

    return ngx_http_output_filter(r, out);
}
/* Append `cl` to the chain whose tail pointer is tracked by **last,
 * advancing *last to the new tail. NGX_ERROR when cl is NULL so
 * callers can collapse "alloc failed" and "append" into one check. */
static ngx_int_t
ngx_http_ipng_stats_append(ngx_chain_t ***last, ngx_chain_t *cl)
{
    if (cl == NULL) {
        return NGX_ERROR;
    }

    **last = cl;
    *last = &cl->next;

    return NGX_OK;
}
/* Per-node snapshot used by the Prometheus and JSON renderers. The
* renderers drain the shared-zone LRU into an array of these under
* the slab mutex, and then release the mutex before doing any output
* buffer allocation. That keeps ngx_pcalloc / malloc off the locked
* path — a worker crash during rendering can never leave the slab
* mutex held (NFR-4.3), and slab-lock contention no longer blocks on
* glibc's allocator. */
typedef struct {
    ngx_uint_t source_id;        /* interned source-tag id */
    ngx_uint_t vip_id;           /* interned vip id */
    ngx_uint_t class;            /* status-code class index */
    uint64_t requests;           /* request count for this node */
    uint64_t bytes_in;           /* cumulative request bytes */
    uint64_t bytes_out;          /* cumulative response bytes */
    uint64_t duration_sum_ms;    /* summed request duration, ms */
    uint64_t upstream_sum_ms;    /* summed upstream response time, ms */
} ngx_http_ipng_stats_snap_t;
/* Sanity cap on shared-zone cardinality, above which the renderer
* refuses to snapshot rather than try to allocate a potentially
* runaway buffer. This is a belt-and-suspenders guard against a
* corrupt `sh->sources.nelts` or `sh->vips.nelts` — see design.md
* on reload hardening. */
#define NGX_HTTP_IPNG_STATS_MAX_INTERN 10000
/* Find or create an aggregation entry for (source_id, vip_id). Linear
* scan — the table is small in practice (< 100 entries). */
static ngx_http_ipng_stats_agg_t *
ngx_http_ipng_stats_agg_get(ngx_http_ipng_stats_agg_t *aggs, ngx_uint_t *naggs,
    ngx_uint_t naggs_alloc, ngx_uint_t source_id, ngx_uint_t vip_id)
{
    ngx_uint_t                  slot;
    ngx_http_ipng_stats_agg_t  *a;

    /* existing entry? */
    for (slot = 0; slot < *naggs; slot++) {
        a = &aggs[slot];
        if (a->source_id == source_id && a->vip_id == vip_id) {
            return a;
        }
    }

    /* table full: caller skips aggregation for this node */
    if (*naggs >= naggs_alloc) {
        return NULL;
    }

    a = &aggs[*naggs];
    a->source_id = source_id;
    a->vip_id = vip_id;
    a->used = 1;

    (*naggs)++;

    return a;
}
/* Deep-copy the sources and vips interning tables into r->pool so the
* renderers can dereference strings without holding the slab mutex.
* Caller holds the slab mutex. Writes the number of valid entries
* (possibly smaller than the allocated length) via *n_src_out / *n_vip_out. */
/* Deep-copy the sources and vips interning tables into r->pool so the
 * renderers can dereference label strings after the slab mutex is
 * released. Caller holds the slab mutex. Writes the number of valid
 * entries (possibly smaller than the allocated length, if the zone
 * grew between the sizing read and this call) to *n_src_out /
 * *n_vip_out. Zero-length entries keep their zeroed table slot. */
static ngx_int_t
ngx_http_ipng_stats_snapshot_strings(ngx_http_request_t *r,
    ngx_http_ipng_stats_shctx_t *sh,
    ngx_str_t *src_tbl, ngx_uint_t n_src_alloc, ngx_uint_t *n_src_out,
    ngx_str_t *vip_tbl, ngx_uint_t n_vip_alloc, ngx_uint_t *n_vip_out)
{
    u_char      *copy;
    ngx_str_t   *ent;
    ngx_uint_t   idx, count;

    count = (sh->sources.nelts < n_src_alloc)
            ? sh->sources.nelts : n_src_alloc;

    for (idx = 0; idx < count; idx++) {
        ent = &sh->sources.entries[idx];

        if (ent->len == 0) {
            continue;
        }

        copy = ngx_pnalloc(r->pool, ent->len);
        if (copy == NULL) {
            return NGX_ERROR;
        }

        ngx_memcpy(copy, ent->data, ent->len);
        src_tbl[idx].data = copy;
        src_tbl[idx].len = ent->len;
    }

    *n_src_out = count;

    count = (sh->vips.nelts < n_vip_alloc)
            ? sh->vips.nelts : n_vip_alloc;

    for (idx = 0; idx < count; idx++) {
        ent = &sh->vips.entries[idx];

        if (ent->len == 0) {
            continue;
        }

        copy = ngx_pnalloc(r->pool, ent->len);
        if (copy == NULL) {
            return NGX_ERROR;
        }

        ngx_memcpy(copy, ent->data, ent->len);
        vip_tbl[idx].data = copy;
        vip_tbl[idx].len = ent->len;
    }

    *n_vip_out = count;

    return NGX_OK;
}
/* Allocate the aggregation table plus its lane arena from r->pool. */
/* Allocate the aggregation table plus its lane arena from r->pool.
 *
 * Each of the `n` entries owns four histogram lanes carved out of one
 * contiguous arena: duration (nb+1), upstream (nb+1), bytes-in (nbb+1)
 * and bytes-out (nbb+1) uint64 slots, in that order.
 *
 * `n` comes from n_src * n_vip, each bounded by
 * NGX_HTTP_IPNG_STATS_MAX_INTERN (10000), so the product can reach
 * 1e8; on 32-bit targets `n * lanes_per * sizeof(uint64_t)` could
 * overflow size_t and silently under-allocate. Guard the two
 * multiplications explicitly and fail cleanly instead. */
static ngx_int_t
ngx_http_ipng_stats_agg_alloc(ngx_http_request_t *r,
    ngx_http_ipng_stats_main_conf_t *imcf, ngx_uint_t n,
    ngx_http_ipng_stats_agg_t **aggs_out, ngx_uint_t *nalloc_out)
{
    ngx_uint_t nb = imcf->nbuckets;
    ngx_uint_t nbb = imcf->nbytebuckets;
    size_t lanes_per = 2 * (nb + 1) + 2 * (nbb + 1);
    uint64_t *arena;
    ngx_http_ipng_stats_agg_t *aggs;
    ngx_uint_t i;
    if (n == 0) n = 1;
    /* overflow guards for the two allocation sizes below */
    if ((size_t) n > SIZE_MAX / sizeof(*aggs)
        || lanes_per > SIZE_MAX / sizeof(uint64_t) / n)
    {
        return NGX_ERROR;
    }
    aggs = ngx_pcalloc(r->pool, n * sizeof(*aggs));
    arena = ngx_pcalloc(r->pool, n * lanes_per * sizeof(uint64_t));
    if (aggs == NULL || arena == NULL) return NGX_ERROR;
    for (i = 0; i < n; i++) {
        uint64_t *base = &arena[i * lanes_per];
        aggs[i].dhist = base;
        aggs[i].uhist = base + (nb + 1);
        aggs[i].bin_hist = base + 2 * (nb + 1);
        aggs[i].bout_hist = base + 2 * (nb + 1) + (nbb + 1);
    }
    *aggs_out = aggs;
    *nalloc_out = n;
    return NGX_OK;
}
/* Walk the LRU under the slab mutex, draining matching nodes into the
* caller-supplied `snaps` array and aggregating histogram lanes into
* `aggs`. All output buffers are pre-allocated by the caller — this
* function does not touch r->pool, so a crash here cannot leave the
* slab mutex held via a glibc abort.
*
* `src_tbl` / `vip_tbl` hold the deep-copied interning strings and are
* used only for filter comparisons; the renderer downstream indexes
* them again. Nodes whose ids fall outside the snapshotted range are
* skipped (a belt-and-suspenders guard against a stale id after a
* concurrent reload). */
static void
ngx_http_ipng_stats_snapshot_nodes(ngx_http_ipng_stats_shctx_t *sh,
    ngx_http_ipng_stats_main_conf_t *imcf,
    ngx_str_t *filter_src, ngx_str_t *filter_vip,
    ngx_str_t *src_tbl, ngx_uint_t n_src,
    ngx_str_t *vip_tbl, ngx_uint_t n_vip,
    ngx_http_ipng_stats_snap_t *snaps, ngx_uint_t nsnaps_alloc,
    ngx_uint_t *nsnaps_out,
    ngx_http_ipng_stats_agg_t *aggs, ngx_uint_t naggs_alloc,
    ngx_uint_t *naggs_out)
{
    ngx_queue_t *q;
    ngx_http_ipng_stats_node_t *n;
    ngx_str_t *src_entry, *vip_entry;
    ngx_atomic_uint_t *lanes, *blanes;
    ngx_http_ipng_stats_agg_t *a;
    ngx_uint_t i;
    ngx_uint_t nb = imcf->nbuckets;
    ngx_uint_t nbb = imcf->nbytebuckets;
    ngx_uint_t nsnaps = 0, naggs = 0;
    for (q = ngx_queue_head(&sh->lru);
         q != ngx_queue_sentinel(&sh->lru);
         q = ngx_queue_next(q))
    {
        n = ngx_queue_data(q, ngx_http_ipng_stats_node_t, lru);
        /* Skip nodes whose interned ids fall outside the snapshotted
         * string tables (stale id after a concurrent reload). */
        if (n->source_id >= n_src || n->vip_id >= n_vip) continue;
        src_entry = &src_tbl[n->source_id];
        vip_entry = &vip_tbl[n->vip_id];
        /* Apply the ?source_tag= filter: empty filter matches all. */
        if (filter_src->len > 0
            && (src_entry->len != filter_src->len
                || ngx_memcmp(src_entry->data, filter_src->data,
                              filter_src->len) != 0))
        {
            continue;
        }
        /* Apply the ?vip= filter the same way. */
        if (filter_vip->len > 0
            && (vip_entry->len != filter_vip->len
                || ngx_memcmp(vip_entry->data, filter_vip->data,
                              filter_vip->len) != 0))
        {
            continue;
        }
        /* Copy the per-node counters into the snaps array; if it is
         * full, counters are dropped but histogram aggregation below
         * still proceeds. */
        if (nsnaps < nsnaps_alloc) {
            snaps[nsnaps].source_id = n->source_id;
            snaps[nsnaps].vip_id = n->vip_id;
            snaps[nsnaps].class = n->class;
            snaps[nsnaps].requests = n->requests;
            snaps[nsnaps].bytes_in = n->bytes_in;
            snaps[nsnaps].bytes_out = n->bytes_out;
            snaps[nsnaps].duration_sum_ms = n->duration_sum_ms;
            snaps[nsnaps].upstream_sum_ms = n->upstream_sum_ms;
            nsnaps++;
        }
        /* Fold this node's histogram lanes into the per-(source, vip)
         * aggregate; NULL means the agg table is full — skip. */
        a = ngx_http_ipng_stats_agg_get(aggs, &naggs, naggs_alloc,
                                        n->source_id, n->vip_id);
        if (a == NULL) continue;
        a->duration_sum_ms += n->duration_sum_ms;
        a->upstream_sum_ms += n->upstream_sum_ms;
        a->bytes_in_sum += n->bytes_in;
        a->bytes_out_sum += n->bytes_out;
        a->req_total += n->requests;
        /* Lane layout in shared memory, immediately after the node:
         * [0 .. nb]                duration buckets (incl. +Inf)
         * [nb+1 .. 2nb+1]          upstream buckets (incl. +Inf)
         * then 2*(nbb+1) byte-size buckets (in, then out). */
        lanes = (ngx_atomic_uint_t *) (n + 1);
        blanes = lanes + 2 * (nb + 1);
        for (i = 0; i <= nb; i++) {
            a->dhist[i] += lanes[i];
            a->uhist[i] += lanes[nb + 1 + i];
            a->up_total += lanes[nb + 1 + i];
        }
        for (i = 0; i <= nbb; i++) {
            a->bin_hist[i] += blanes[i];
            a->bout_hist[i] += blanes[nbb + 1 + i];
        }
    }
    *nsnaps_out = nsnaps;
    *naggs_out = naggs;
}
/* -- Prometheus ---------------------------------------------------- */
/* Emit one Prometheus histogram group (bucket lines, _sum, _count)
 * into the buffer at `p`, returning the new write position. `lanes`
 * holds nb finite buckets plus a +Inf lane at index nb; bucket counts
 * are emitted cumulatively. When is_seconds is set the bounds (stored
 * in milliseconds) are printed as fractional seconds. `sum_units` is
 * the already-converted _sum value. */
static u_char *
ngx_http_ipng_stats_render_hist(u_char *p, const char *metric,
    ngx_str_t *src, ngx_str_t *vip, ngx_uint_t *bounds, ngx_uint_t nb,
    uint64_t *lanes, double sum_units, int is_seconds)
{
    uint64_t    running;
    ngx_uint_t  b;

    running = 0;

    for (b = 0; b < nb; b++) {
        running += lanes[b];

        if (is_seconds) {
            p = ngx_sprintf(p,
                "%s_bucket{source_tag=\"%V\",vip=\"%V\",le=\"%.3f\"} %uL\n",
                metric, src, vip,
                (double) bounds[b] / 1000.0, running);

        } else {
            p = ngx_sprintf(p,
                "%s_bucket{source_tag=\"%V\",vip=\"%V\",le=\"%ui\"} %uL\n",
                metric, src, vip, bounds[b], running);
        }
    }

    /* +Inf lane closes the cumulative count; _count equals it. */
    running += lanes[nb];

    p = ngx_sprintf(p,
        "%s_bucket{source_tag=\"%V\",vip=\"%V\",le=\"+Inf\"} %uL\n"
        "%s_sum{source_tag=\"%V\",vip=\"%V\"} %.3f\n"
        "%s_count{source_tag=\"%V\",vip=\"%V\"} %uL\n",
        metric, src, vip, running,
        metric, src, vip, sum_units,
        metric, src, vip, running);

    return p;
}
/* Render the Prometheus text exposition. Locking discipline: the slab
 * mutex is taken twice, briefly — once to read cardinality, once to
 * snapshot — and is never held across pool/heap allocation or output
 * buffer formatting (NFR-4.3). */
static ngx_int_t
ngx_http_ipng_stats_render_prom(ngx_http_request_t *r,
    ngx_http_ipng_stats_main_conf_t *imcf,
    ngx_str_t *filter_source, ngx_str_t *filter_vip)
{
    ngx_http_ipng_stats_shctx_t *sh = imcf->shm_zone->data;
    ngx_slab_pool_t *slab;
    ngx_chain_t *out = NULL, *cl;
    ngx_chain_t **last = &out;
    ngx_str_t ctype = ngx_string("text/plain; version=0.0.4");
    ngx_http_ipng_stats_agg_t *aggs;
    ngx_http_ipng_stats_snap_t *snaps;
    ngx_str_t *src_tbl = NULL, *vip_tbl = NULL;
    ngx_uint_t n_src_alloc, n_vip_alloc;
    ngx_uint_t n_src = 0, n_vip = 0;
    ngx_uint_t naggs = 0, naggs_alloc;
    ngx_uint_t nsnaps = 0, nsnaps_alloc;
    ngx_uint_t i;
    ngx_uint_t nb = imcf->nbuckets;
    ngx_uint_t nbb = imcf->nbytebuckets;
    size_t hist_sz;
    slab = (ngx_slab_pool_t *) imcf->shm_zone->shm.addr;
    /* Static preamble: version banner plus HELP/TYPE metadata for
     * every metric family emitted below. */
    cl = ngx_http_ipng_stats_chain_buf(r, 1536);
    if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    cl->buf->last = ngx_sprintf(cl->buf->last,
        "# nginx-ipng-stats-plugin %s (schema=%d)\n"
        "# HELP nginx_ipng_requests_total Total HTTP requests.\n"
        "# TYPE nginx_ipng_requests_total counter\n"
        "# HELP nginx_ipng_bytes_in_total Request bytes received.\n"
        "# TYPE nginx_ipng_bytes_in_total counter\n"
        "# HELP nginx_ipng_bytes_out_total Response bytes sent.\n"
        "# TYPE nginx_ipng_bytes_out_total counter\n"
        "# HELP nginx_ipng_latency_total Sum of request durations in seconds.\n"
        "# TYPE nginx_ipng_latency_total counter\n"
        "# HELP nginx_ipng_request_duration_seconds Request duration histogram.\n"
        "# TYPE nginx_ipng_request_duration_seconds histogram\n"
        "# HELP nginx_ipng_upstream_response_seconds Upstream response-time histogram.\n"
        "# TYPE nginx_ipng_upstream_response_seconds histogram\n"
        "# HELP nginx_ipng_bytes_in Request size histogram in bytes.\n"
        "# TYPE nginx_ipng_bytes_in histogram\n"
        "# HELP nginx_ipng_bytes_out Request size histogram in bytes.\n"
        "# TYPE nginx_ipng_bytes_out histogram\n",
        NGX_HTTP_IPNG_STATS_VERSION, NGX_HTTP_IPNG_STATS_SCHEMA_VERSION);
    if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    /* Read cardinality with a brief lock. We release before allocating
     * the snapshot buffers so glibc's allocator is never entered with
     * the slab mutex held — a worker crash during allocation cannot
     * leave the shared-memory zone locked. */
    ngx_shmtx_lock(&slab->mutex);
    n_src_alloc = sh->sources.nelts;
    n_vip_alloc = sh->vips.nelts;
    ngx_shmtx_unlock(&slab->mutex);
    /* Belt-and-suspenders guard against a corrupt nelts (see
     * NGX_HTTP_IPNG_STATS_MAX_INTERN above). */
    if (n_src_alloc > NGX_HTTP_IPNG_STATS_MAX_INTERN
        || n_vip_alloc > NGX_HTTP_IPNG_STATS_MAX_INTERN)
    {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "ipng_stats: render: refusing to render, cardinality "
                      "out of range (sources=%ui, vips=%ui)",
                      n_src_alloc, n_vip_alloc);
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    /* Worst case: one agg per (source, vip) pair, one snap per
     * (source, vip, class) triple. */
    naggs_alloc = n_src_alloc * n_vip_alloc;
    nsnaps_alloc = naggs_alloc * NGX_HTTP_IPNG_STATS_NCLASSES;
    if (naggs_alloc == 0) naggs_alloc = 1;
    if (nsnaps_alloc == 0) nsnaps_alloc = 1;
    if (ngx_http_ipng_stats_agg_alloc(r, imcf, naggs_alloc,
                                      &aggs, &naggs_alloc) != NGX_OK)
    {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    snaps = ngx_pcalloc(r->pool, nsnaps_alloc * sizeof(*snaps));
    if (n_src_alloc > 0) {
        src_tbl = ngx_pcalloc(r->pool, n_src_alloc * sizeof(ngx_str_t));
        if (src_tbl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    if (n_vip_alloc > 0) {
        vip_tbl = ngx_pcalloc(r->pool, n_vip_alloc * sizeof(ngx_str_t));
        if (vip_tbl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    if (snaps == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    /* Second lock: deep-copy label strings, then drain the LRU into
     * snaps/aggs. snapshot_nodes does not allocate. */
    ngx_shmtx_lock(&slab->mutex);
    if (ngx_http_ipng_stats_snapshot_strings(r, sh,
            src_tbl, n_src_alloc, &n_src,
            vip_tbl, n_vip_alloc, &n_vip) != NGX_OK)
    {
        ngx_shmtx_unlock(&slab->mutex);
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    ngx_http_ipng_stats_snapshot_nodes(sh, imcf,
        filter_source, filter_vip,
        src_tbl, n_src, vip_tbl, n_vip,
        snaps, nsnaps_alloc, &nsnaps,
        aggs, naggs_alloc, &naggs);
    ngx_shmtx_unlock(&slab->mutex);
    /* Per-node counters. */
    for (i = 0; i < nsnaps; i++) {
        ngx_http_ipng_stats_snap_t *s = &snaps[i];
        ngx_str_t *src = &src_tbl[s->source_id];
        ngx_str_t *vip = &vip_tbl[s->vip_id];
        const char *cls = ngx_http_ipng_stats_class_label(s->class);
        cl = ngx_http_ipng_stats_chain_buf(r, 1024);
        if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
        cl->buf->last = ngx_sprintf(cl->buf->last,
            "nginx_ipng_requests_total{source_tag=\"%V\",vip=\"%V\",code=\"%s\"} %uL\n"
            "nginx_ipng_bytes_in_total{source_tag=\"%V\",vip=\"%V\",code=\"%s\"} %uL\n"
            "nginx_ipng_bytes_out_total{source_tag=\"%V\",vip=\"%V\",code=\"%s\"} %uL\n"
            "nginx_ipng_latency_total{source_tag=\"%V\",vip=\"%V\",code=\"%s\"} %.3f\n",
            src, vip, cls, s->requests,
            src, vip, cls, s->bytes_in,
            src, vip, cls, s->bytes_out,
            src, vip, cls, (double) s->duration_sum_ms / 1000.0);
        if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }
    }
    /* One chain link per (source, vip) for the four aggregated histograms.
     * Size: per-bucket line ~96B, + sum/count/+Inf per metric ~96B each. */
    hist_sz = 256 + 96 * (2 * (nb + 1) + 2 * (nbb + 1)) + 4 * 200;
    for (i = 0; i < naggs; i++) {
        ngx_http_ipng_stats_agg_t *a = &aggs[i];
        ngx_str_t *src = &src_tbl[a->source_id];
        ngx_str_t *vip = &vip_tbl[a->vip_id];
        u_char *p;
        cl = ngx_http_ipng_stats_chain_buf(r, hist_sz);
        if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
        p = cl->buf->last;
        p = ngx_http_ipng_stats_render_hist(p,
            "nginx_ipng_request_duration_seconds", src, vip,
            imcf->bucket_bounds_ms, nb, a->dhist,
            (double) a->duration_sum_ms / 1000.0, 1);
        p = ngx_http_ipng_stats_render_hist(p,
            "nginx_ipng_upstream_response_seconds", src, vip,
            imcf->bucket_bounds_ms, nb, a->uhist,
            (double) a->upstream_sum_ms / 1000.0, 1);
        p = ngx_http_ipng_stats_render_hist(p,
            "nginx_ipng_bytes_in", src, vip,
            imcf->byte_bucket_bounds, nbb, a->bin_hist,
            (double) a->bytes_in_sum, 0);
        p = ngx_http_ipng_stats_render_hist(p,
            "nginx_ipng_bytes_out", src, vip,
            imcf->byte_bucket_bounds, nbb, a->bout_hist,
            (double) a->bytes_out_sum, 0);
        cl->buf->last = p;
        if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }
    }
    return ngx_http_ipng_stats_send(r, &ctype, out);
}
/* -- JSON ---------------------------------------------------------- */
/* Render the JSON exposition: one record per aggregated (source, vip)
 * containing per-class counters plus the four histograms. Same
 * lock/allocate ordering as the Prometheus renderer: the slab mutex
 * is never held across allocation or formatting. */
static ngx_int_t
ngx_http_ipng_stats_render_json(ngx_http_request_t *r,
    ngx_http_ipng_stats_main_conf_t *imcf,
    ngx_str_t *filter_source, ngx_str_t *filter_vip)
{
    ngx_http_ipng_stats_shctx_t *sh = imcf->shm_zone->data;
    ngx_slab_pool_t *slab;
    ngx_chain_t *out = NULL, *cl;
    ngx_chain_t **last = &out;
    ngx_str_t ctype = ngx_string("application/json");
    ngx_http_ipng_stats_agg_t *aggs;
    ngx_http_ipng_stats_snap_t *snaps;
    ngx_str_t *src_tbl = NULL, *vip_tbl = NULL;
    ngx_uint_t n_src_alloc, n_vip_alloc;
    ngx_uint_t n_src = 0, n_vip = 0;
    ngx_uint_t nsnaps = 0, nsnaps_alloc;
    ngx_uint_t naggs = 0, naggs_alloc;
    ngx_uint_t i, j, emitted = 0;
    ngx_uint_t nb = imcf->nbuckets;
    ngx_uint_t nbb = imcf->nbytebuckets;
    ngx_http_ipng_stats_agg_t *a;
    size_t rec_sz;
    slab = (ngx_slab_pool_t *) imcf->shm_zone->shm.addr;
    /* Document envelope opener; the matching "]}" is appended last. */
    cl = ngx_http_ipng_stats_chain_buf(r, 64);
    if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    cl->buf->last = ngx_sprintf(cl->buf->last,
        "{\"schema\":%d,\"records\":[",
        NGX_HTTP_IPNG_STATS_SCHEMA_VERSION);
    if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    /* Brief lock to read cardinality; released before any allocation
     * (see the Prometheus renderer for rationale). */
    ngx_shmtx_lock(&slab->mutex);
    n_src_alloc = sh->sources.nelts;
    n_vip_alloc = sh->vips.nelts;
    ngx_shmtx_unlock(&slab->mutex);
    if (n_src_alloc > NGX_HTTP_IPNG_STATS_MAX_INTERN
        || n_vip_alloc > NGX_HTTP_IPNG_STATS_MAX_INTERN)
    {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "ipng_stats: render: refusing to render, cardinality "
                      "out of range (sources=%ui, vips=%ui)",
                      n_src_alloc, n_vip_alloc);
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    /* Worst-case table sizes; clamp zero to one so the allocators
     * always return usable pointers. */
    naggs_alloc = n_src_alloc * n_vip_alloc;
    nsnaps_alloc = naggs_alloc * NGX_HTTP_IPNG_STATS_NCLASSES;
    if (naggs_alloc == 0) naggs_alloc = 1;
    if (nsnaps_alloc == 0) nsnaps_alloc = 1;
    if (ngx_http_ipng_stats_agg_alloc(r, imcf, naggs_alloc,
                                      &aggs, &naggs_alloc) != NGX_OK)
    {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    snaps = ngx_pcalloc(r->pool, nsnaps_alloc * sizeof(*snaps));
    if (n_src_alloc > 0) {
        src_tbl = ngx_pcalloc(r->pool, n_src_alloc * sizeof(ngx_str_t));
        if (src_tbl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    if (n_vip_alloc > 0) {
        vip_tbl = ngx_pcalloc(r->pool, n_vip_alloc * sizeof(ngx_str_t));
        if (vip_tbl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    if (snaps == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    /* Second lock: copy label strings, drain the LRU (no allocation
     * inside either call after the string copies). */
    ngx_shmtx_lock(&slab->mutex);
    if (ngx_http_ipng_stats_snapshot_strings(r, sh,
            src_tbl, n_src_alloc, &n_src,
            vip_tbl, n_vip_alloc, &n_vip) != NGX_OK)
    {
        ngx_shmtx_unlock(&slab->mutex);
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    ngx_http_ipng_stats_snapshot_nodes(sh, imcf,
        filter_source, filter_vip,
        src_tbl, n_src, vip_tbl, n_vip,
        snaps, nsnaps_alloc, &nsnaps,
        aggs, naggs_alloc, &naggs);
    ngx_shmtx_unlock(&slab->mutex);
    /* One JSON record per aggregated (source, vip). Size upper-bound
     * accounts for: fixed overhead, up to NCLASSES class entries, 4
     * histograms. */
    rec_sz = 512
             + 160 * NGX_HTTP_IPNG_STATS_NCLASSES
             + 48 * (2 * (nb + 1) + 2 * (nbb + 1))
             + 4 * 128;
    for (i = 0; i < naggs; i++) {
        a = &aggs[i];
        ngx_str_t *src = &src_tbl[a->source_id];
        ngx_str_t *vip = &vip_tbl[a->vip_id];
        u_char *p;
        ngx_uint_t first_class = 1;
        cl = ngx_http_ipng_stats_chain_buf(r, rec_sz);
        if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
        p = cl->buf->last;
        /* `emitted` tracks whether a comma is needed before this
         * record; `first_class` does the same within "classes". */
        p = ngx_sprintf(p,
            "%s{\"source_tag\":\"%V\",\"vip\":\"%V\",\"classes\":{",
            (emitted == 0) ? "" : ",", src, vip);
        /* Per-class counters: linear rescan of snaps for this pair
         * (nsnaps is small — bounded by pairs * NCLASSES). */
        for (j = 0; j < nsnaps; j++) {
            if (snaps[j].source_id != a->source_id
                || snaps[j].vip_id != a->vip_id) continue;
            p = ngx_sprintf(p,
                "%s\"%s\":{\"requests\":%uL,\"bytes_in\":%uL,"
                "\"bytes_out\":%uL,\"latency_ms\":%uL,"
                "\"upstream_latency_ms\":%uL}",
                first_class ? "" : ",",
                ngx_http_ipng_stats_class_label(snaps[j].class),
                snaps[j].requests, snaps[j].bytes_in,
                snaps[j].bytes_out, snaps[j].duration_sum_ms,
                snaps[j].upstream_sum_ms);
            first_class = 0;
        }
        /* Duration histogram: bucket keys are the finite bounds in
         * ms, the last lane is emitted as "+Inf". */
        p = ngx_sprintf(p, "},\"request_duration_ms\":{\"sum\":%uL,"
                        "\"count\":%uL,\"buckets\":{",
                        a->duration_sum_ms, a->req_total);
        for (j = 0; j <= nb; j++) {
            if (j == nb) {
                p = ngx_sprintf(p, "%s\"+Inf\":%uL", j == 0 ? "" : ",",
                                a->dhist[j]);
            } else {
                p = ngx_sprintf(p, "%s\"%ui\":%uL", j == 0 ? "" : ",",
                                imcf->bucket_bounds_ms[j], a->dhist[j]);
            }
        }
        /* Upstream histogram, same bounds as duration. */
        p = ngx_sprintf(p, "}},\"upstream_response_ms\":{\"sum\":%uL,"
                        "\"count\":%uL,\"buckets\":{",
                        a->upstream_sum_ms, a->up_total);
        for (j = 0; j <= nb; j++) {
            if (j == nb) {
                p = ngx_sprintf(p, "%s\"+Inf\":%uL", j == 0 ? "" : ",",
                                a->uhist[j]);
            } else {
                p = ngx_sprintf(p, "%s\"%ui\":%uL", j == 0 ? "" : ",",
                                imcf->bucket_bounds_ms[j], a->uhist[j]);
            }
        }
        /* Byte-size histograms use the byte bucket bounds. */
        p = ngx_sprintf(p,
            "}},\"bytes_in\":{\"sum\":%uL,\"count\":%uL,\"buckets\":{",
            a->bytes_in_sum, a->req_total);
        for (j = 0; j <= nbb; j++) {
            if (j == nbb) {
                p = ngx_sprintf(p, "%s\"+Inf\":%uL", j == 0 ? "" : ",",
                                a->bin_hist[j]);
            } else {
                p = ngx_sprintf(p, "%s\"%ui\":%uL", j == 0 ? "" : ",",
                                imcf->byte_bucket_bounds[j], a->bin_hist[j]);
            }
        }
        p = ngx_sprintf(p,
            "}},\"bytes_out\":{\"sum\":%uL,\"count\":%uL,\"buckets\":{",
            a->bytes_out_sum, a->req_total);
        for (j = 0; j <= nbb; j++) {
            if (j == nbb) {
                p = ngx_sprintf(p, "%s\"+Inf\":%uL", j == 0 ? "" : ",",
                                a->bout_hist[j]);
            } else {
                p = ngx_sprintf(p, "%s\"%ui\":%uL", j == 0 ? "" : ",",
                                imcf->byte_bucket_bounds[j], a->bout_hist[j]);
            }
        }
        p = ngx_sprintf(p, "}}}");
        cl->buf->last = p;
        if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }
        emitted++;
    }
    /* Close the records array and the envelope. */
    cl = ngx_http_ipng_stats_chain_buf(r, 8);
    if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    cl->buf->last = ngx_sprintf(cl->buf->last, "]}\n");
    if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    return ngx_http_ipng_stats_send(r, &ctype, out);
}