Files
nginx-ipng-stats-plugin/src/ngx_http_ipng_stats_module.c
Pim van Pelt df05bae8a3 Support multiple device-pinned listens sharing a single port
Nginx's config-level duplicate-listen check rejected the
documented pattern of `listen 80 device=X ipng_source_tag=A;
listen 80 device=Y ipng_source_tag=B;` with "a duplicate listen
0.0.0.0:80", and even when the dedup was bypassed the kernel
refused the second bind() because the first socket was already
holding the port without SO_BINDTODEVICE.

The listen wrapper now detects same-sockaddr duplicates before
the core handler sees them and records them with `needs_clone=1`.
In init_module, phase 1 clones an ngx_listening_t for each such
duplicate, phase 3 closes every inherited naked fd, and phase 4
rebinds every target with SO_REUSEADDR + SO_REUSEPORT +
SO_BINDTODEVICE set before bind(). SO_REUSEPORT keeps
`nginx -s reload` from colliding with the still-bound sockets
held by old workers during graceful drain; IPV6_V6ONLY matches
nginx's default so the IPv6 listen doesn't claim the IPv4
wildcard and collide with sibling IPv4-specific listens.

Restructure 01-module to cover the pattern end-to-end: four
device-pinned listens on port 8080 (eth1 shares tag `tag1`
across v4 and v6; eth2 splits into `tag2-v4` / `tag2-v6`),
clients and server both get IPv6 addresses, and a new
"Per-(device, family) request count accuracy" case proves that
10 requests on each of the four combinations yields tag1=20,
tag2-v4=10, tag2-v6=10. Mgmt/direct traffic moves to port 9180
so it no longer clashes with the shared-port wildcards.

Document the constraint in docs/user-guide.md: all listens on
a given port must carry `device=`, and direct traffic belongs
on a separate port.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:45:40 +02:00

2976 lines
102 KiB
C

/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2026 Pim van Pelt <pim@ipng.ch>
*
* ngx_http_ipng_stats_module — per-VIP, per-device traffic counters for
* nginx hosts receiving traffic on distinct interfaces.
*
* See docs/design.md in the repository for the full design. The short
* version is:
*
* - Attribution is done by the Linux kernel's TCP socket lookup, via
* SO_BINDTODEVICE on per-tunnel listening sockets. Each `listen`
* directive may carry `device=<ifname>` and `ipng_source_tag=<tag>`
* parameters; this module parses them by replacing the stock
* ngx_http_core_module `listen` command handler at preconfig time.
*
* - Counters are maintained per-worker in a private table (no locks,
* no atomics on the request path) and flushed into a shared-memory
* zone via a per-worker timer. The scrape handler reads only from
* the shared zone.
*
* - The scrape handler content-negotiates between Prometheus text and
* JSON output, filtered server-side by optional `?source_tag=` and
* `?vip=` query parameters.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <net/if.h>
#include <sys/socket.h>
#include <arpa/inet.h>
/* The log module's op/format types are private to ngx_http_log_module.c
 * (no public header). We duplicate the struct layouts under our own
 * names so we can call the compiled log_format ops. The layouts have
 * been stable since nginx 0.7.x — but this is an internal ABI mirror:
 * re-verify against the bundled nginx sources on every version bump. */
typedef struct ipng_log_op_s ipng_log_op_t;
/* Mirror of ngx_http_log_op_run_pt: renders one format field into buf,
 * returning the new write position. */
typedef u_char *(*ipng_log_op_run_pt)(ngx_http_request_t *r, u_char *buf,
ipng_log_op_t *op);
/* Mirror of ngx_http_log_op_getlen_pt: sizes the field before it is
 * rendered. */
typedef size_t (*ipng_log_op_getlen_pt)(ngx_http_request_t *r,
uintptr_t data);
struct ipng_log_op_s {
size_t len; /* NOTE(review): presumed fixed/max emit length, with getlen
             * consulted for variable-width ops — confirm against
             * ngx_http_log_op_t in the nginx tree */
ipng_log_op_getlen_pt getlen;
ipng_log_op_run_pt run;
uintptr_t data; /* op-specific payload (variable index or pointer) */
};
typedef struct {
ngx_str_t name; /* the log_format name this was compiled from */
ngx_array_t *flushes;
ngx_array_t *ops; /* array of ipng_log_op_t */
} ipng_log_fmt_t;
typedef struct {
ngx_array_t formats; /* array of ipng_log_fmt_t */
ngx_uint_t combined_used;
} ipng_log_main_conf_t;
extern ngx_module_t ngx_http_log_module;
/* NGX_HTTP_IPNG_STATS_VERSION is generated at build time from the
 * top-level Makefile's VERSION variable — see `make version-header`. */
#include "version.h"
/* Shared-zone layout version. NOTE(review): the consumer is outside this
 * chunk — presumably init_zone uses it to reject a zone laid out by an
 * older binary; confirm. */
#define NGX_HTTP_IPNG_STATS_SCHEMA_VERSION 2
/* Default histogram buckets in milliseconds (FR-2.3). */
#define NGX_HTTP_IPNG_STATS_DEFAULT_BUCKETS \
{ 1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000 }
/* Must match the entry count of the initializer above. */
#define NGX_HTTP_IPNG_STATS_DEFAULT_BUCKET_COUNT 12
/* Default histogram buckets for request/response sizes, in bytes. */
#define NGX_HTTP_IPNG_STATS_DEFAULT_BYTE_BUCKETS \
{ 100, 1000, 10000, 100000, 1000000, 10000000 }
/* Must match the entry count of the initializer above. */
#define NGX_HTTP_IPNG_STATS_DEFAULT_BYTE_BUCKET_COUNT 6
/* Status-code cardinality reduction: we bin every response into one of six
 * classes — 1xx/2xx/3xx/4xx/5xx, plus a catch-all for codes outside
 * [100, 599] (NFR-3.3). Operators who need full three-digit breakdown
 * should use ipng_stats_logtail (FR-8); the stats zone is deliberately
 * narrow. */
#define NGX_HTTP_IPNG_STATS_CODE_MIN 100
#define NGX_HTTP_IPNG_STATS_CODE_MAX 599
/* Class index 0 is the out-of-range catch-all; 1..5 map to 1xx..5xx. */
#define NGX_HTTP_IPNG_STATS_CLASS_UNKNOWN 0
#define NGX_HTTP_IPNG_STATS_NCLASSES 6
/* A single per-worker slot key: the (source, vip, status-class) tuple a
 * counter slot is attributed to. The strings are not owned here. */
typedef struct {
ngx_str_t source; /* points into shared-zone interning table */
ngx_str_t vip; /* ditto */
ngx_uint_t class; /* 0 unknown, 1..5 for 1xx..5xx */
} ngx_http_ipng_stats_key_t;
/* Shared-zone node: stored in an rbtree keyed by a 32-bit hash of the
 * (source, vip, code) tuple. Collisions are resolved by walking the
 * rbtree's duplicate chain and comparing full keys.
 *
 * Counter lanes are 64-bit and updated via relaxed atomic fetch_add
 * (NFR-1.2). Histogram lanes live in an ngx_palloc'd tail allocation
 * whose size is determined by the configured bucket count. */
typedef struct {
ngx_rbtree_node_t rbnode; /* key = hash; must stay the first member */
ngx_queue_t lru; /* in-zone LRU for eviction on rename (NFR-4.4) */
ngx_uint_t source_id; /* index into shctx sources intern table */
ngx_uint_t vip_id; /* index into shctx vips intern table */
ngx_uint_t class; /* 0 unknown, 1..5 for 1xx..5xx */
ngx_atomic_uint_t requests;
ngx_atomic_uint_t bytes_in;
ngx_atomic_uint_t bytes_out;
ngx_atomic_uint_t duration_sum_ms;
ngx_atomic_uint_t upstream_sum_ms;
/* Followed by histogram lanes (each histogram has one +Inf overflow
 * lane beyond its configured bounds), in order:
 * [0 .. nb] -> request duration (nb = nbuckets)
 * [nb+1 .. 2nb+1] -> upstream duration
 * [2nb+2 .. 2nb+2+nbb] -> bytes_in (nbb = nbytebuckets)
 * [2nb+3+nbb .. 2nb+3+2nbb] -> bytes_out
 */
} ngx_http_ipng_stats_node_t;
/* Per-worker local slot — identical shape to the shared node but without
 * atomics. We maintain a small dynamic array of these, plus a "dirty
 * list" head pointing into the array, so that the flush tick only walks
 * entries touched since the last flush (FR-4.2, NFR-2.2). */
typedef struct ngx_http_ipng_stats_slot_s {
ngx_uint_t hash; /* same hash used as the shared rbtree key */
ngx_uint_t source_id;
ngx_uint_t vip_id;
ngx_uint_t class;
/* Deltas since last flush. */
uint64_t requests;
uint64_t bytes_in;
uint64_t bytes_out;
uint64_t duration_sum_ms;
uint64_t upstream_sum_ms;
uint64_t *dhist; /* nbuckets+1 lanes */
uint64_t *uhist;
uint64_t *bin_hist; /* nbytebuckets+1 lanes */
uint64_t *bout_hist;
/* Intrusive "dirty" linked list — a slot is on the list iff its
 * `dirty` bit is set; `dirty_next` links to the next dirty slot and
 * NULL terminates the chain. */
struct ngx_http_ipng_stats_slot_s *dirty_next;
unsigned dirty:1;
} ngx_http_ipng_stats_slot_t;
/* String interning tables live at the head of the shared-memory zone.
 * They're flat arrays of ngx_str_t whose data pointers reference memory
 * allocated from the zone's slab pool. Workers look up strings by
 * sequential scan — the cardinality is small (tens) so this is fine. */
typedef struct {
ngx_str_t *entries; /* slab-allocated array of interned strings */
ngx_uint_t nelts; /* entries in use */
ngx_uint_t nalloc; /* entries allocated */
} ngx_http_ipng_stats_intern_t;
/* Header of the shared-memory zone. Lives at the start of the slab
 * pool's data area and is shared by all workers. */
typedef struct {
uint32_t magic; /* 0x49504E47 "IPNG" when initialized */
ngx_rbtree_t rbtree; /* counter nodes keyed by tuple hash */
ngx_rbtree_node_t sentinel;
ngx_queue_t lru;
ngx_http_ipng_stats_intern_t sources;
ngx_http_ipng_stats_intern_t vips;
/* Meta-counters for the plugin itself (FR-6 observability of
 * the plugin in the design doc). */
ngx_atomic_uint_t zone_full_events;
ngx_atomic_uint_t flushes_total;
} ngx_http_ipng_stats_shctx_t;
/* Per-listen binding recorded by the listen wrapper at config parse
 * time. Resolved to an ngx_listening_t* at init_module time.
 *
 * `needs_clone` marks a listen directive that shares a sockaddr with
 * an earlier binding. Nginx's core listen handler rejects such
 * duplicates with "a duplicate listen ..."; our wrapper therefore
 * skips the core call for these, and init_module manufactures an
 * ngx_listening_t for each by cloning the template it shares the
 * sockaddr with. */
typedef struct {
ngx_str_t device; /* SO_BINDTODEVICE target, empty = none */
ngx_str_t source; /* source tag; defaults to device name */
ngx_sockaddr_t sockaddr; /* parsed listen address */
socklen_t socklen;
ngx_listening_t *listening; /* filled at init_module */
unsigned needs_clone:1;
} ngx_http_ipng_stats_binding_t;
/* Main (http{}-level) configuration. Created zeroed by pcalloc; fields
 * left at 0/NULL mean "feature not configured". */
typedef struct {
ngx_shm_zone_t *shm_zone; /* NULL = module is a no-op */
ngx_str_t zone_name;
size_t zone_size;
ngx_msec_t flush_interval; /* worker flush tick, >= 100ms */
ngx_str_t default_source; /* tag for traffic with no binding */
ngx_uint_t nbuckets;
ngx_uint_t *bucket_bounds_ms; /* len = nbuckets */
ngx_uint_t nbytebuckets;
ngx_uint_t *byte_bucket_bounds; /* len = nbytebuckets, bytes */
ngx_array_t *bindings; /* ngx_http_ipng_stats_binding_t */
ngx_flag_t enabled;
/* Global logtail (FR-8) — UDP-only. */
ipng_log_fmt_t *logtail_fmt; /* compiled ops from log_format */
struct sockaddr_in logtail_udp_addr; /* destination address */
size_t logtail_buf_size; /* per-worker buffer, default 64k */
ngx_msec_t logtail_flush; /* max flush interval, default 1s */
ngx_uint_t logtail_if_index; /* if=$var: variable index, 0=none */
} ngx_http_ipng_stats_main_conf_t;
/* Location-level configuration: per-context counting toggle. */
typedef struct {
ngx_flag_t enabled; /* count requests ending in this location */
ngx_flag_t is_scrape_handler; /* location serves the scrape endpoint */
} ngx_http_ipng_stats_loc_conf_t;
/* Per-worker runtime state. There is one static instance per worker
 * process (see ngx_http_ipng_stats_worker below); no locking needed. */
typedef struct {
ngx_http_ipng_stats_slot_t *slots;
ngx_uint_t nslots; /* slots in use */
ngx_uint_t nalloc; /* slots allocated */
ngx_http_ipng_stats_slot_t *dirty_head; /* slots touched since last flush */
ngx_event_t flush_ev; /* periodic flush-to-shared-zone timer */
ngx_log_t *log;
/* Histogram lanes for all slots, carved from contiguous arenas so a
 * slot's pointers index into these. */
uint64_t *dhist_arena; /* nslots * (nbuckets+1) u64 */
uint64_t *uhist_arena;
uint64_t *bin_arena; /* nslots * (nbytebuckets+1) u64 */
uint64_t *bout_arena;
/* Global logtail buffer. */
u_char *logtail_buf;
u_char *logtail_pos; /* next write position in logtail_buf */
u_char *logtail_end; /* one past the end of logtail_buf */
ngx_event_t logtail_flush_ev;
ngx_socket_t logtail_udp_fd; /* per-worker UDP socket, or -1 */
} ngx_http_ipng_stats_worker_t;
/* Forward decls. */
/* Configuration lifecycle. */
static ngx_int_t ngx_http_ipng_stats_preconfig(ngx_conf_t *cf);
static ngx_int_t ngx_http_ipng_stats_postconfig(ngx_conf_t *cf);
static void *ngx_http_ipng_stats_create_main_conf(ngx_conf_t *cf);
static char *ngx_http_ipng_stats_init_main_conf(ngx_conf_t *cf, void *conf);
static void *ngx_http_ipng_stats_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_ipng_stats_merge_loc_conf(ngx_conf_t *cf, void *parent,
void *child);
/* Directive handlers. */
static char *ngx_http_ipng_stats_zone(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_http_ipng_stats_buckets(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_http_ipng_stats_byte_buckets(ngx_conf_t *cf,
ngx_command_t *cmd, void *conf);
static char *ngx_http_ipng_stats_scrape(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_http_ipng_stats_logtail(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
/* Logtail (FR-8) runtime. */
static void ngx_http_ipng_stats_logtail_flush_handler(ngx_event_t *ev);
static void ngx_http_ipng_stats_logtail_flush(ngx_http_ipng_stats_worker_t *w,
ngx_http_ipng_stats_main_conf_t *imcf);
static void ngx_http_ipng_stats_logtail_write(ngx_http_request_t *r,
ngx_http_ipng_stats_main_conf_t *imcf,
ngx_http_ipng_stats_worker_t *w);
/* Listen interception and process lifecycle. */
static char *ngx_http_ipng_stats_listen_wrapper(ngx_conf_t *cf,
ngx_command_t *cmd, void *conf);
static ngx_int_t ngx_http_ipng_stats_init_zone(ngx_shm_zone_t *shm_zone,
void *data);
static ngx_int_t ngx_http_ipng_stats_init_module(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_ipng_stats_init_worker(ngx_cycle_t *cycle);
static void ngx_http_ipng_stats_exit_worker(ngx_cycle_t *cycle);
/* Request-path and scrape-path handlers. */
static ngx_int_t ngx_http_ipng_stats_log_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_ipng_stats_content_handler(ngx_http_request_t *r);
static void ngx_http_ipng_stats_flush_timer(ngx_event_t *ev);
static void ngx_http_ipng_stats_do_flush(ngx_http_ipng_stats_worker_t *w,
ngx_http_ipng_stats_main_conf_t *imcf, ngx_log_t *log);
static ngx_uint_t ngx_http_ipng_stats_bucket_index(ngx_uint_t ms,
ngx_uint_t *bounds, ngx_uint_t nbuckets);
static ngx_uint_t ngx_http_ipng_stats_status_index(ngx_uint_t code);
static ngx_int_t ngx_http_ipng_stats_resolve_source(
ngx_http_request_t *r, ngx_http_ipng_stats_main_conf_t *imcf,
ngx_str_t *source_out);
static ngx_int_t ngx_http_ipng_stats_canonical_vip(ngx_http_request_t *r,
u_char *buf, size_t buflen, ngx_str_t *vip_out);
static ngx_int_t ngx_http_ipng_stats_intern_shared(
ngx_http_ipng_stats_shctx_t *sh, ngx_slab_pool_t *slab,
ngx_http_ipng_stats_intern_t *t, ngx_str_t *s, ngx_uint_t *idx_out);
static ngx_int_t ngx_http_ipng_stats_source_variable(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_ipng_stats_render_prom(ngx_http_request_t *r,
ngx_http_ipng_stats_main_conf_t *imcf,
ngx_str_t *filter_source, ngx_str_t *filter_vip);
static ngx_int_t ngx_http_ipng_stats_render_json(ngx_http_request_t *r,
ngx_http_ipng_stats_main_conf_t *imcf,
ngx_str_t *filter_source, ngx_str_t *filter_vip);
/* Module-wide globals. */
/* Saved `set` handler of the stock `listen` directive; non-NULL once the
 * wrapper has been installed (see preconfig). */
static char *(*ngx_http_core_listen_orig)(ngx_conf_t *cf,
ngx_command_t *cmd, void *conf) = NULL;
/* The single per-process worker state instance. */
static ngx_http_ipng_stats_worker_t ngx_http_ipng_stats_worker;
extern ngx_module_t ngx_http_core_module;
/* Directives. */
static ngx_command_t ngx_http_ipng_stats_commands[] = {
/* ipng_stats_zone name:size — declare the shared counter zone;
 * required for the module to do anything at all. */
{ ngx_string("ipng_stats_zone"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_http_ipng_stats_zone,
NGX_HTTP_MAIN_CONF_OFFSET,
0,
NULL },
/* ipng_stats_flush_interval <time> — per-worker flush cadence. */
{ ngx_string("ipng_stats_flush_interval"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_ipng_stats_main_conf_t, flush_interval),
NULL },
/* ipng_stats_default_source <tag> — tag for unattributed traffic. */
{ ngx_string("ipng_stats_default_source"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_ipng_stats_main_conf_t, default_source),
NULL },
/* ipng_stats_buckets <ms>... — latency histogram boundaries. */
{ ngx_string("ipng_stats_buckets"),
NGX_HTTP_MAIN_CONF|NGX_CONF_1MORE,
ngx_http_ipng_stats_buckets,
NGX_HTTP_MAIN_CONF_OFFSET,
0,
NULL },
/* ipng_stats_byte_buckets <size>... — size histogram boundaries. */
{ ngx_string("ipng_stats_byte_buckets"),
NGX_HTTP_MAIN_CONF|NGX_CONF_1MORE,
ngx_http_ipng_stats_byte_buckets,
NGX_HTTP_MAIN_CONF_OFFSET,
0,
NULL },
/* ipng_stats [on|off] — toggle counting, or (bare, in a location)
 * install the scrape handler. */
{ ngx_string("ipng_stats"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF
|NGX_CONF_NOARGS|NGX_CONF_TAKE1,
ngx_http_ipng_stats_scrape,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_ipng_stats_loc_conf_t, enabled),
NULL },
/* ipng_stats_logtail <fmt> udp://host:port [...] — FR-8 log export. */
{ ngx_string("ipng_stats_logtail"),
NGX_HTTP_MAIN_CONF|NGX_CONF_2MORE,
ngx_http_ipng_stats_logtail,
NGX_HTTP_MAIN_CONF_OFFSET,
0,
NULL },
ngx_null_command
};
/* HTTP module context: hooks into nginx's configuration lifecycle. */
static ngx_http_module_t ngx_http_ipng_stats_module_ctx = {
ngx_http_ipng_stats_preconfig, /* preconfiguration */
ngx_http_ipng_stats_postconfig, /* postconfiguration */
ngx_http_ipng_stats_create_main_conf, /* create main configuration */
ngx_http_ipng_stats_init_main_conf, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
ngx_http_ipng_stats_create_loc_conf, /* create location configuration */
ngx_http_ipng_stats_merge_loc_conf /* merge location configuration */
};
/* Module definition: process-lifecycle hooks for socket setup (init
 * module) and per-worker counter state (init/exit process). */
ngx_module_t ngx_http_ipng_stats_module = {
NGX_MODULE_V1,
&ngx_http_ipng_stats_module_ctx, /* module context */
ngx_http_ipng_stats_commands, /* module directives */
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
ngx_http_ipng_stats_init_module, /* init module */
ngx_http_ipng_stats_init_worker, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
ngx_http_ipng_stats_exit_worker, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
/* ----------------------------------------------------------------- */
/* Preconfig: replace ngx_http_core_module's `listen` handler.       */
/* ----------------------------------------------------------------- */
static ngx_int_t
ngx_http_ipng_stats_preconfig(ngx_conf_t *cf)
{
    ngx_command_t  *c;

    /* ngx_http_core_commands is not const in the nginx tree, so its
     * `set` pointer can be rebound.  Do it exactly once per process:
     * on a reload the wrapper from the previous configuration cycle is
     * still installed, and saving cmd->set again would capture our own
     * wrapper as "the original". */
    if (ngx_http_core_listen_orig) {
        return NGX_OK;
    }

    for (c = ngx_http_core_module.commands; c->name.len; c++) {

        if (c->name.len != sizeof("listen") - 1
            || ngx_strncmp(c->name.data, "listen", sizeof("listen") - 1) != 0)
        {
            continue;
        }

        ngx_http_core_listen_orig = c->set;
        c->set = ngx_http_ipng_stats_listen_wrapper;

        return NGX_OK;
    }

    ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
                  "ipng_stats: could not locate ngx_http_core_module "
                  "\"listen\" directive to wrap");
    return NGX_ERROR;
}
/* The wrapper extracts device= and ipng_source_tag= from cf->args,
 * compacting the array in place, then calls the original
 * ngx_http_core_module listen handler (unless the sockaddr duplicates
 * an earlier module-managed listen).  It records one binding per
 * resolved address in imcf->bindings, to be matched to an
 * ngx_listening_t* at init_module time.
 *
 * Returns NGX_CONF_OK/NGX_CONF_ERROR, or whatever the core listen
 * handler returns. */
static char *
ngx_http_ipng_stats_listen_wrapper(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf)
{
    char                             *rv;
    ngx_url_t                         u;
    ngx_str_t                        *value, *bind_arg;
    ngx_str_t                         device, source, addr_text;
    ngx_uint_t                        i, j, dup;
    ngx_http_ipng_stats_binding_t    *b, *existing;
    ngx_http_ipng_stats_main_conf_t  *imcf;

    ngx_str_null(&device);
    ngx_str_null(&source);

    value = cf->args->elts;

    /* Strip device= and ipng_source_tag= out of the argument vector,
     * compacting in place, so the core handler never sees them. */
    i = 1;
    while (i < cf->args->nelts) {

        if (value[i].len > 7
            && ngx_strncmp(value[i].data, "device=", 7) == 0)
        {
            device.data = value[i].data + 7;
            device.len = value[i].len - 7;

            for (j = i; j + 1 < cf->args->nelts; j++) {
                value[j] = value[j + 1];
            }
            cf->args->nelts--;
            continue;
        }

        if (value[i].len > 16
            && ngx_strncmp(value[i].data, "ipng_source_tag=", 16) == 0)
        {
            source.data = value[i].data + 16;
            source.len = value[i].len - 16;

            for (j = i; j + 1 < cf->args->nelts; j++) {
                value[j] = value[j + 1];
            }
            cf->args->nelts--;
            continue;
        }

        i++;
    }

    if (device.len == 0 && source.len == 0) {
        /* Plain listen with no module-specific parameters — let nginx
         * handle it end-to-end. */
        return ngx_http_core_listen_orig(cf, cmd, conf);
    }

    /* BUGFIX: save the address token *before* growing cf->args below.
     * ngx_array_push() may reallocate the array's elts, which would
     * leave `value` dangling; the previous code read value[1] after
     * the push.  Copying the ngx_str_t is safe — its data pointer
     * references pool memory that is not moved by the realloc. */
    addr_text = value[1];

    /* Force nginx to bind a dedicated socket for this address rather
     * than folding it into a wildcard. Without `bind`, nginx's listen
     * optimizer discards specific-address entries covered by a
     * wildcard, which prevents us from applying SO_BINDTODEVICE. */
    bind_arg = ngx_array_push(cf->args);
    if (bind_arg == NULL) {
        return NGX_CONF_ERROR;
    }
    ngx_str_set(bind_arg, "bind");

    /* Parse the listen address ourselves: we need the sockaddr to
     * detect duplicates (multiple `listen 80 device=X` at the same
     * addr) and to match bindings to cycle->listening[] entries in
     * init_module. */
    ngx_memzero(&u, sizeof(ngx_url_t));
    u.url = addr_text;
    u.listen = 1;
    u.default_port = 80;

    if (ngx_parse_url(cf->pool, &u) != NGX_OK || u.naddrs == 0) {
        /* Unparseable here — defer to the core handler, which will
         * produce the canonical error message. */
        return ngx_http_core_listen_orig(cf, cmd, conf);
    }

    imcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_ipng_stats_module);

    if (imcf->bindings == NULL) {
        imcf->bindings = ngx_array_create(cf->pool, 8,
                                      sizeof(ngx_http_ipng_stats_binding_t));
        if (imcf->bindings == NULL) {
            return NGX_CONF_ERROR;
        }
    }

    /* Call the core handler at most once per (addr, port). Any
     * subsequent listen at the same sockaddr would hit nginx's
     * duplicate-listen check — we skip it and let init_module clone
     * the first-seen listening entry for each duplicate. */
    existing = imcf->bindings->elts;
    dup = 0;

    for (i = 0; i < imcf->bindings->nelts; i++) {
        if (existing[i].socklen == u.addrs[0].socklen
            && ngx_cmp_sockaddr((struct sockaddr *) &existing[i].sockaddr,
                                existing[i].socklen,
                                u.addrs[0].sockaddr,
                                u.addrs[0].socklen, 1) == NGX_OK)
        {
            dup = 1;
            break;
        }
    }

    if (!dup) {
        rv = ngx_http_core_listen_orig(cf, cmd, conf);
        if (rv != NGX_CONF_OK) {
            return rv;
        }
    }

    /* Record one binding per resolved address (ngx_parse_url may yield
     * multiple for a hostname; listen specs use literal addresses so
     * naddrs is almost always 1). */
    for (i = 0; i < u.naddrs; i++) {
        b = ngx_array_push(imcf->bindings);
        if (b == NULL) {
            return NGX_CONF_ERROR;
        }
        ngx_memzero(b, sizeof(*b));

        if (device.len > 0) {
            /* Copy out of cf->args storage: the args array is reused
             * for the next directive. */
            b->device.data = ngx_pnalloc(cf->pool, device.len);
            if (b->device.data == NULL) {
                return NGX_CONF_ERROR;
            }
            ngx_memcpy(b->device.data, device.data, device.len);
            b->device.len = device.len;
        }

        if (source.len > 0) {
            b->source.data = ngx_pnalloc(cf->pool, source.len);
            if (b->source.data == NULL) {
                return NGX_CONF_ERROR;
            }
            ngx_memcpy(b->source.data, source.data, source.len);
            b->source.len = source.len;

        } else if (device.len > 0) {
            /* FR-1.4: default source = device name. */
            b->source = b->device;
        }

        b->socklen = u.addrs[i].socklen;
        ngx_memcpy(&b->sockaddr, u.addrs[i].sockaddr, u.addrs[i].socklen);
        b->needs_clone = dup ? 1 : 0;
    }

    return NGX_CONF_OK;
}
/* ----------------------------------------------------------------- */
/* Config create/merge/init                                          */
/* ----------------------------------------------------------------- */
static void *
ngx_http_ipng_stats_create_main_conf(ngx_conf_t *cf)
{
    ngx_http_ipng_stats_main_conf_t  *imcf;

    imcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_ipng_stats_main_conf_t));
    if (imcf == NULL) {
        return NULL;
    }

    /* pcalloc already zeroed the bucket tables, bindings, zone and
     * logtail fields; only the "unset" sentinels need explicit values
     * so init_main_conf can tell defaults from operator choices. */
    imcf->flush_interval = NGX_CONF_UNSET_MSEC;
    imcf->enabled = NGX_CONF_UNSET;

    return imcf;
}
/* Fill in defaults after the whole http{} block has been parsed and
 * validate operator-supplied values. */
static char *
ngx_http_ipng_stats_init_main_conf(ngx_conf_t *cf, void *conf)
{
    ngx_http_ipng_stats_main_conf_t *imcf = conf;

    static const ngx_uint_t  ms_defaults[] =
        NGX_HTTP_IPNG_STATS_DEFAULT_BUCKETS;
    static const ngx_uint_t  byte_defaults[] =
        NGX_HTTP_IPNG_STATS_DEFAULT_BYTE_BUCKETS;

    ngx_conf_init_msec_value(imcf->flush_interval, 1000);

    if (imcf->flush_interval < 100) {
        ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
                      "ipng_stats_flush_interval must be at least 100ms");
        return NGX_CONF_ERROR;
    }

    if (imcf->default_source.len == 0) {
        ngx_str_set(&imcf->default_source, "direct");
    }

    /* Fall back to the compiled-in histogram tables when the operator
     * configured none. */
    if (imcf->nbuckets == 0) {
        imcf->nbuckets = NGX_HTTP_IPNG_STATS_DEFAULT_BUCKET_COUNT;

        imcf->bucket_bounds_ms = ngx_palloc(cf->pool, sizeof(ms_defaults));
        if (imcf->bucket_bounds_ms == NULL) {
            return NGX_CONF_ERROR;
        }
        ngx_memcpy(imcf->bucket_bounds_ms, ms_defaults, sizeof(ms_defaults));
    }

    if (imcf->nbytebuckets == 0) {
        imcf->nbytebuckets = NGX_HTTP_IPNG_STATS_DEFAULT_BYTE_BUCKET_COUNT;

        imcf->byte_bucket_bounds = ngx_palloc(cf->pool,
                                              sizeof(byte_defaults));
        if (imcf->byte_bucket_bounds == NULL) {
            return NGX_CONF_ERROR;
        }
        ngx_memcpy(imcf->byte_bucket_bounds, byte_defaults,
                   sizeof(byte_defaults));
    }

    if (imcf->enabled == NGX_CONF_UNSET) {
        imcf->enabled = 1;
    }

    /* logtail_fmt, logtail_udp_addr, logtail_buf_size, logtail_flush
     * are set by ipng_stats_logtail if the directive is present; they
     * default to NULL/0 from pcalloc. */
    return NGX_CONF_OK;
}
static void *
ngx_http_ipng_stats_create_loc_conf(ngx_conf_t *cf)
{
    ngx_http_ipng_stats_loc_conf_t  *lc;

    lc = ngx_pcalloc(cf->pool, sizeof(ngx_http_ipng_stats_loc_conf_t));
    if (lc == NULL) {
        return NULL;
    }

    /* Leave both flags unset so merge can inherit from the parent. */
    lc->enabled = NGX_CONF_UNSET;
    lc->is_scrape_handler = NGX_CONF_UNSET;

    return lc;
}
/* Inherit the per-location flags from the parent context; counting is
 * on by default, the scrape-handler flag is off by default. */
static char *
ngx_http_ipng_stats_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
    ngx_http_ipng_stats_loc_conf_t *prev = parent;
    ngx_http_ipng_stats_loc_conf_t *conf = child;

    ngx_conf_merge_value(conf->enabled, prev->enabled, 1);
    ngx_conf_merge_value(conf->is_scrape_handler, prev->is_scrape_handler, 0);

    /* Scrape-handler locations never count themselves (FR-5.5). */
    if (conf->is_scrape_handler) {
        conf->enabled = 0;
    }

    return NGX_CONF_OK;
}
/* ipng_stats_zone name:size — declare the shared-memory zone that all
 * workers flush their counters into.  The minimum size is 64 KiB. */
static char *
ngx_http_ipng_stats_zone(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_ipng_stats_main_conf_t *imcf = conf;

    u_char     *sep;
    ssize_t     size;
    ngx_str_t  *value, name, size_str;

    value = cf->args->elts;

    /* Split "name:size" on the first colon. */
    sep = (u_char *) ngx_strchr(value[1].data, ':');
    if (sep == NULL) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
            "invalid ipng_stats_zone \"%V\"; expected name:size",
            &value[1]);
        return NGX_CONF_ERROR;
    }

    name.data = value[1].data;
    name.len = sep - value[1].data;

    size_str.data = sep + 1;
    size_str.len = value[1].len - name.len - 1;

    size = ngx_parse_size(&size_str);
    if (size == NGX_ERROR || size < (ssize_t) (64 * 1024)) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
            "ipng_stats_zone \"%V\" size is too small",
            &value[1]);
        return NGX_CONF_ERROR;
    }

    imcf->zone_name = name;
    imcf->zone_size = size;

    imcf->shm_zone = ngx_shared_memory_add(cf, &name, size,
                                           &ngx_http_ipng_stats_module);
    if (imcf->shm_zone == NULL) {
        return NGX_CONF_ERROR;
    }

    imcf->shm_zone->init = ngx_http_ipng_stats_init_zone;
    imcf->shm_zone->data = imcf;

    return NGX_CONF_OK;
}
/* ipng_stats_buckets <ms> <ms> ... — latency histogram boundaries,
 * strictly increasing positive integers in milliseconds. */
static char *
ngx_http_ipng_stats_buckets(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_ipng_stats_main_conf_t *imcf = conf;

    ngx_int_t    v;
    ngx_str_t   *value;
    ngx_uint_t   i, n, prev;

    value = cf->args->elts;
    n = cf->args->nelts - 1;

    if (n < 1) {
        return "requires at least one bucket boundary";
    }

    imcf->bucket_bounds_ms = ngx_palloc(cf->pool, n * sizeof(ngx_uint_t));
    if (imcf->bucket_bounds_ms == NULL) {
        return NGX_CONF_ERROR;
    }
    imcf->nbuckets = n;

    for (i = 0, prev = 0; i < n; i++) {

        v = ngx_atoi(value[i + 1].data, value[i + 1].len);

        /* NGX_ERROR is -1, so the v <= 0 test also catches parse
         * failures; prev enforces strict monotonicity. */
        if (v == NGX_ERROR || v <= 0 || (ngx_uint_t) v <= prev) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                "ipng_stats_buckets values must be strictly increasing positive "
                "integers; got \"%V\"", &value[i + 1]);
            return NGX_CONF_ERROR;
        }

        prev = (ngx_uint_t) v;
        imcf->bucket_bounds_ms[i] = prev;
    }

    return NGX_CONF_OK;
}
/* ipng_stats_byte_buckets <bytes> <bytes> ... — size histogram
 * boundaries, strictly increasing positive sizes (k/m suffixes OK). */
static char *
ngx_http_ipng_stats_byte_buckets(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf)
{
    ngx_http_ipng_stats_main_conf_t *imcf = conf;

    ssize_t      v;
    ngx_str_t   *value;
    ngx_uint_t   i, n, prev;

    value = cf->args->elts;
    n = cf->args->nelts - 1;

    if (n < 1) {
        return "requires at least one bucket boundary";
    }

    imcf->byte_bucket_bounds = ngx_palloc(cf->pool, n * sizeof(ngx_uint_t));
    if (imcf->byte_bucket_bounds == NULL) {
        return NGX_CONF_ERROR;
    }
    imcf->nbytebuckets = n;

    for (i = 0, prev = 0; i < n; i++) {

        v = ngx_parse_size(&value[i + 1]);

        /* NGX_ERROR is -1, so v <= 0 also covers parse failures. */
        if (v == NGX_ERROR || v <= 0 || (ngx_uint_t) v <= prev) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                "ipng_stats_byte_buckets values must be strictly increasing "
                "positive sizes; got \"%V\"", &value[i + 1]);
            return NGX_CONF_ERROR;
        }

        prev = (ngx_uint_t) v;
        imcf->byte_bucket_bounds[i] = prev;
    }

    return NGX_CONF_OK;
}
/* `ipng_stats;` (bare, in a location) installs the scrape content
 * handler; `ipng_stats on|off;` toggles counting in the context. */
static char *
ngx_http_ipng_stats_scrape(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_ipng_stats_loc_conf_t  *ilcf = conf;
    ngx_http_core_loc_conf_t        *clcf;
    ngx_str_t                       *value;

    if (cf->args->nelts == 1) {
        /* Bare form: this location becomes the scrape endpoint and is
         * excluded from its own counters. */
        clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
        clcf->handler = ngx_http_ipng_stats_content_handler;

        ilcf->is_scrape_handler = 1;
        ilcf->enabled = 0;

        return NGX_CONF_OK;
    }

    value = cf->args->elts;

    if (value[1].len == 2 && ngx_strncmp(value[1].data, "on", 2) == 0) {
        ilcf->enabled = 1;

    } else if (value[1].len == 3
               && ngx_strncmp(value[1].data, "off", 3) == 0)
    {
        ilcf->enabled = 0;

    } else {
        return "expects \"on\", \"off\", or no argument";
    }

    return NGX_CONF_OK;
}
/* ipng_stats_logtail <format_name> udp://host:port [buffer=<size>] [flush=<duration>]
*
* Enables a global access log that fires for every request via our
* log-phase handler, using a compiled log_format looked up by name
* from ngx_http_log_module. Per-worker buffered UDP datagrams.
*
* Optional parameters:
*   buffer=<size>    per-worker datagram buffer, >= 1k (default 64k)
*   flush=<time>     max buffering delay, >= 100ms (default 1s)
*   if=$var          gate logging on a variable (consumed by the
*                    log-phase handler, not visible in this function)
*
* NOTE(review): the format lookup scans whatever log_format directives
* have already been parsed when this directive is reached — confirm the
* documented ordering requirement (log_format before ipng_stats_logtail). */
static char *
ngx_http_ipng_stats_logtail(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_ipng_stats_main_conf_t *imcf = conf;
ipng_log_main_conf_t *lmcf;
ipng_log_fmt_t *fmt;
ngx_str_t *value;
ngx_uint_t i;
ssize_t buf_size;
ngx_msec_t flush_ms;
/* The directive may appear at most once per configuration. */
if (imcf->logtail_fmt != NULL) {
return "is duplicate";
}
value = cf->args->elts;
/* Look up the named log_format in ngx_http_log_module. */
lmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_log_module);
if (lmcf == NULL) {
return "ngx_http_log_module is not available";
}
fmt = lmcf->formats.elts;
imcf->logtail_fmt = NULL;
for (i = 0; i < lmcf->formats.nelts; i++) {
if (fmt[i].name.len == value[1].len
&& ngx_strncmp(fmt[i].name.data, value[1].data, value[1].len) == 0)
{
imcf->logtail_fmt = &fmt[i];
break;
}
}
if (imcf->logtail_fmt == NULL) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"ipng_stats_logtail: unknown log_format \"%V\"",
&value[1]);
return NGX_CONF_ERROR;
}
/* Destination: udp://host:port (only UDP is supported). */
if (value[2].len <= 6
|| ngx_strncmp(value[2].data, "udp://", 6) != 0)
{
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"ipng_stats_logtail: destination must be udp://host:port, "
"got \"%V\"", &value[2]);
return NGX_CONF_ERROR;
}
{
/* Parse udp://host:port into logtail_udp_addr. */
u_char *colon;
ngx_str_t host_str, port_str;
ngx_int_t port;
struct in_addr addr;
host_str.data = value[2].data + 6;
host_str.len = value[2].len - 6;
colon = ngx_strlchr(host_str.data, host_str.data + host_str.len, ':');
if (colon == NULL) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"ipng_stats_logtail: udp:// requires host:port, got \"%V\"",
&value[2]);
return NGX_CONF_ERROR;
}
port_str.data = colon + 1;
port_str.len = host_str.len - (size_t)(colon + 1 - host_str.data);
host_str.len = (size_t)(colon - host_str.data);
/* ngx_atoi returns NGX_ERROR (-1) on junk; the range check below
 * rejects that too. */
port = ngx_atoi(port_str.data, port_str.len);
if (port < 1 || port > 65535) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"ipng_stats_logtail: invalid UDP port in \"%V\"", &value[2]);
return NGX_CONF_ERROR;
}
/* Resolve host — only literal IPv4 for simplicity. */
u_char tmp[16]; /* dotted-quad maximum is 15 chars + NUL */
if (host_str.len >= sizeof(tmp)) {
return "udp:// host too long";
}
ngx_memcpy(tmp, host_str.data, host_str.len);
tmp[host_str.len] = '\0';
if (inet_aton((char *) tmp, &addr) == 0) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"ipng_stats_logtail: invalid IPv4 address in \"%V\"",
&value[2]);
return NGX_CONF_ERROR;
}
ngx_memzero(&imcf->logtail_udp_addr, sizeof(struct sockaddr_in));
imcf->logtail_udp_addr.sin_family = AF_INET;
imcf->logtail_udp_addr.sin_port = htons((in_port_t) port);
imcf->logtail_udp_addr.sin_addr = addr;
}
/* Defaults. */
buf_size = 64 * 1024;
flush_ms = 1000;
/* Parse optional key=value parameters. */
for (i = 3; i < cf->args->nelts; i++) {
if (value[i].len > 7
&& ngx_strncmp(value[i].data, "buffer=", 7) == 0)
{
ngx_str_t s = { value[i].len - 7, value[i].data + 7 };
buf_size = ngx_parse_size(&s);
if (buf_size == NGX_ERROR || buf_size < 1024) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"ipng_stats_logtail: invalid buffer size \"%V\"",
&value[i]);
return NGX_CONF_ERROR;
}
continue;
}
if (value[i].len > 6
&& ngx_strncmp(value[i].data, "flush=", 6) == 0)
{
ngx_str_t s = { value[i].len - 6, value[i].data + 6 };
flush_ms = ngx_parse_time(&s, 0);
if (flush_ms == (ngx_msec_t) NGX_ERROR || flush_ms < 100) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"ipng_stats_logtail: invalid flush interval \"%V\"",
&value[i]);
return NGX_CONF_ERROR;
}
continue;
}
if (value[i].len > 3
&& ngx_strncmp(value[i].data, "if=", 3) == 0)
{
ngx_str_t var_name = { value[i].len - 3, value[i].data + 3 };
if (var_name.len < 2 || var_name.data[0] != '$') {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"ipng_stats_logtail: if= requires a $variable, "
"got \"%V\"", &value[i]);
return NGX_CONF_ERROR;
}
var_name.data++;
var_name.len--;
imcf->logtail_if_index = ngx_http_get_variable_index(cf,
&var_name);
if (imcf->logtail_if_index == (ngx_uint_t) NGX_ERROR) {
return NGX_CONF_ERROR;
}
/* Shift from 0-based to 1-based so 0 means "not set". */
imcf->logtail_if_index++;
continue;
}
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"ipng_stats_logtail: unknown parameter \"%V\"", &value[i]);
return NGX_CONF_ERROR;
}
imcf->logtail_buf_size = (size_t) buf_size;
imcf->logtail_flush = flush_ms;
return NGX_CONF_OK;
}
/* ----------------------------------------------------------------- */
/* Postconfig: install log-phase handler */
/* ----------------------------------------------------------------- */
static ngx_int_t
ngx_http_ipng_stats_postconfig(ngx_conf_t *cf)
{
    ngx_str_t                         var_name = ngx_string("ipng_source_tag");
    ngx_http_variable_t              *v;
    ngx_http_ipng_stats_main_conf_t  *imcf;
    ngx_http_core_main_conf_t        *cmcf;
    ngx_http_handler_pt              *hp;
    /* $ipng_source_tag is registered even when ipng_stats_zone is
     * absent: it remains handy in log_format, map, add_header, etc.
     * For the VIP address operators use the built-in $server_addr,
     * which is functionally identical. */
    v = ngx_http_add_variable(cf, &var_name, NGX_HTTP_VAR_NOCACHEABLE);
    if (v == NULL) {
        return NGX_ERROR;
    }
    v->get_handler = ngx_http_ipng_stats_source_variable;
    imcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_ipng_stats_module);
    /* Without an ipng_stats_zone the module must stay dormant: the
     * Debian package may be installed on hosts that have not yet
     * configured it, so skip installing the log-phase handler. */
    if (imcf->shm_zone == NULL) {
        return NGX_OK;
    }
    cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_core_module);
    hp = ngx_array_push(&cmcf->phases[NGX_HTTP_LOG_PHASE].handlers);
    if (hp == NULL) {
        return NGX_ERROR;
    }
    *hp = ngx_http_ipng_stats_log_handler;
    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* Shared-memory zone init */
/* ----------------------------------------------------------------- */
static void
ngx_http_ipng_stats_rbtree_insert(ngx_rbtree_node_t *temp,
    ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
{
    ngx_rbtree_node_t  **link;
    /* Hash-keyed insertion with no tie-breaking: equal keys descend
     * right, and lookups disambiguate hash collisions by walking the
     * right-hand sibling chain. */
    for ( ;; ) {
        link = (node->key < temp->key) ? &temp->left : &temp->right;
        if (*link == sentinel) {
            *link = node;
            break;
        }
        temp = *link;
    }
    node->parent = temp;
    node->left = sentinel;
    node->right = sentinel;
    ngx_rbt_red(node);
}
static ngx_int_t
ngx_http_ipng_stats_init_zone(ngx_shm_zone_t *shm_zone, void *data)
{
    ngx_http_ipng_stats_main_conf_t *imcf = shm_zone->data;
    ngx_slab_pool_t *slab;
    ngx_http_ipng_stats_shctx_t *sh;
    slab = (ngx_slab_pool_t *) shm_zone->shm.addr;
    /* Reload handling: with dynamic-module lifecycle subtleties,
     * shm.exists is not a reliable signal. Trust slab->data instead —
     * a previous cycle pins our shctx there, and the magic tag
     * confirms it really is ours, so the zone is reusable regardless
     * of what shm.exists says. */
    sh = slab->data;
    if (sh != NULL && sh->magic == 0x49504E47u /* "IPNG" */) {
        ngx_log_error(NGX_LOG_NOTICE, shm_zone->shm.log, 0,
            "ipng_stats: init_zone: reusing existing zone "
            "(sources=%ui, vips=%ui)",
            sh->sources.nelts, sh->vips.nelts);
        shm_zone->data = sh;
        return NGX_OK;
    }
    /* Fresh zone: allocate a zeroed shctx and pin it on the slab. */
    sh = ngx_slab_calloc(slab, sizeof(ngx_http_ipng_stats_shctx_t));
    if (sh == NULL) {
        return NGX_ERROR;
    }
    sh->magic = 0x49504E47u; /* "IPNG" */
    slab->data = sh;
    ngx_rbtree_init(&sh->rbtree, &sh->sentinel,
        ngx_http_ipng_stats_rbtree_insert);
    ngx_queue_init(&sh->lru);
    /* Interning tables start small and grow by doubling later. */
    sh->sources.nelts = 0;
    sh->sources.nalloc = 16;
    sh->sources.entries = ngx_slab_alloc(slab,
        sh->sources.nalloc * sizeof(ngx_str_t));
    if (sh->sources.entries == NULL) {
        return NGX_ERROR;
    }
    sh->vips.nelts = 0;
    sh->vips.nalloc = 64;
    sh->vips.entries = ngx_slab_alloc(slab,
        sh->vips.nalloc * sizeof(ngx_str_t));
    if (sh->vips.entries == NULL) {
        return NGX_ERROR;
    }
    imcf->shm_zone->data = sh;
    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* init_module: rebind listen sockets with SO_BINDTODEVICE */
/* ----------------------------------------------------------------- */
/* Create a device-pinned listening socket: socket + SO_REUSEADDR +
 * SO_REUSEPORT + SO_BINDTODEVICE (set *before* bind, which is what
 * lets the kernel permit multiple sockets on the same wildcard
 * addr+port) + bind + listen + nonblocking.
 *
 * SO_REUSEPORT is required so that on `nginx -s reload` the new
 * master's rebind doesn't collide with the still-bound sockets held
 * by the old workers during their graceful-drain window. The kernel
 * still uses SO_BINDTODEVICE to filter per device, so traffic
 * attribution stays correct.
 *
 * Returns the new fd, or -1 on failure with errno preserved: the
 * failing call's errno is captured before logging (ngx_log_error may
 * write(2)) and restored after close(2), both of which can clobber
 * it. */
static ngx_socket_t
ngx_http_ipng_stats_open_dev_socket(ngx_cycle_t *cycle,
    struct sockaddr *sa, socklen_t salen, int backlog, ngx_str_t *device)
{
    ngx_socket_t s;
    ngx_err_t err;
    int one = 1;
    char devname[IFNAMSIZ];
    size_t dlen;
    s = socket(sa->sa_family, SOCK_STREAM, 0);
    if (s == (ngx_socket_t) -1) {
        ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
            "ipng_stats: socket() failed");
        return (ngx_socket_t) -1;
    }
    /* NUL-terminate the device name up front: the kernel wants a C
     * string and ngx_str_t is not NUL-terminated. Truncate to
     * IFNAMSIZ-1 as before. */
    dlen = device->len < IFNAMSIZ - 1 ? device->len : IFNAMSIZ - 1;
    ngx_memcpy(devname, device->data, dlen);
    devname[dlen] = '\0';
    if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) == -1) {
        err = ngx_errno;
        ngx_log_error(NGX_LOG_EMERG, cycle->log, err,
            "ipng_stats: setsockopt(SO_REUSEADDR) failed");
        goto failed;
    }
    if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) == -1) {
        err = ngx_errno;
        ngx_log_error(NGX_LOG_EMERG, cycle->log, err,
            "ipng_stats: setsockopt(SO_REUSEPORT) failed");
        goto failed;
    }
#if (NGX_HAVE_INET6)
    if (sa->sa_family == AF_INET6) {
        /* Match nginx's default (ipv6only=on): keep the [::]:X listen
         * strictly IPv6. Without this, Linux's bindv6only=0 default
         * makes the socket claim the IPv4 wildcard too, which collides
         * with sibling IPv4-specific listens on the same port (e.g. a
         * mgmt-address listener). */
        if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) == -1) {
            err = ngx_errno;
            ngx_log_error(NGX_LOG_EMERG, cycle->log, err,
                "ipng_stats: setsockopt(IPV6_V6ONLY) failed");
            goto failed;
        }
    }
#endif
    if (setsockopt(s, SOL_SOCKET, SO_BINDTODEVICE, devname,
            (socklen_t) (dlen + 1)) == -1)
    {
        err = ngx_errno;
        ngx_log_error(NGX_LOG_EMERG, cycle->log, err,
            "ipng_stats: setsockopt(SO_BINDTODEVICE, \"%s\") failed",
            devname);
        goto failed;
    }
    if (bind(s, sa, salen) == -1) {
        err = ngx_errno;
        ngx_log_error(NGX_LOG_EMERG, cycle->log, err,
            "ipng_stats: bind() failed for device \"%s\"", devname);
        goto failed;
    }
    if (listen(s, backlog) == -1) {
        err = ngx_errno;
        ngx_log_error(NGX_LOG_EMERG, cycle->log, err,
            "ipng_stats: listen() failed for device \"%s\"", devname);
        goto failed;
    }
    if (ngx_nonblocking(s) == -1) {
        err = ngx_errno;
        ngx_log_error(NGX_LOG_EMERG, cycle->log, err,
            "ipng_stats: fcntl(O_NONBLOCK) failed");
        goto failed;
    }
    return s;
failed:
    /* Single cleanup path for all post-socket() failures; restore the
     * original cause so the caller's "errno preserved" contract holds
     * even though close() may have overwritten it. */
    close(s);
    ngx_set_errno(err);
    return (ngx_socket_t) -1;
}
/* Rewire cycle->listening so every device-pinned binding owns its own
 * socket. Runs in the master, after ngx_open_listening_sockets() has
 * already bound the first-seen listen at each sockaddr:
 *   phase 1  append a cloned ngx_listening_t per duplicate-sockaddr
 *            binding (needs_clone, set by the listen wrapper),
 *   phase 2  map each remaining binding onto an existing entry,
 *   phase 3  close nginx's naked pre-bound fds on every target,
 *   phase 4  rebind each target via open_dev_socket() with
 *            SO_BINDTODEVICE set before bind().
 * Returns NGX_OK (also when the module is unconfigured), NGX_ERROR on
 * any allocation, mapping, or rebind failure. */
static ngx_int_t
ngx_http_ipng_stats_init_module(ngx_cycle_t *cycle)
{
    ngx_http_ipng_stats_main_conf_t *imcf;
    ngx_http_ipng_stats_binding_t *bindings;
    ngx_listening_t *ls, *tmpl, *dup;
    ngx_uint_t i, j;
    ngx_uint_t *target_idx;
    u_char *claimed;
    imcf = ngx_http_cycle_get_module_main_conf(cycle,
        ngx_http_ipng_stats_module);
    /* No ipng_stats config at all: leave nginx's sockets untouched. */
    if (imcf == NULL || imcf->bindings == NULL) {
        return NGX_OK;
    }
    bindings = imcf->bindings->elts;
    /* Phase 1: append a cloned ngx_listening_t for every duplicate
     * binding. ngx_array_push may reallocate elts, so we store indices
     * not pointers. `target_idx[j]` is the index into cycle->listening
     * that binding j ends up owning. */
    target_idx = ngx_pcalloc(cycle->pool,
        imcf->bindings->nelts * sizeof(ngx_uint_t));
    if (target_idx == NULL) {
        return NGX_ERROR;
    }
    for (j = 0; j < imcf->bindings->nelts; j++) {
        if (!bindings[j].needs_clone) continue;
        /* Re-fetch elts every iteration: the push below may move it. */
        ls = cycle->listening.elts;
        tmpl = NULL;
        for (i = 0; i < cycle->listening.nelts; i++) {
            if (ls[i].socklen == bindings[j].socklen
                && ngx_cmp_sockaddr(ls[i].sockaddr, ls[i].socklen,
                       (struct sockaddr *) &bindings[j].sockaddr,
                       bindings[j].socklen, 1) == NGX_OK)
            {
                tmpl = &ls[i];
                break;
            }
        }
        if (tmpl == NULL) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
                "ipng_stats: init_module: no template listening "
                "found for cloned binding (source=\"%V\")",
                &bindings[j].source);
            return NGX_ERROR;
        }
        dup = ngx_array_push(&cycle->listening);
        if (dup == NGX_ERROR_OR_NULL_PLACEHOLDER) return NGX_ERROR;
        *dup = *tmpl;
        /* The clone has no fd yet; phase 4 opens a fresh one. */
        dup->fd = (ngx_socket_t) -1;
        dup->previous = NULL;
        target_idx[j] = cycle->listening.nelts - 1;
    }
    /* Phase 2: map every non-clone binding to an existing listening
     * entry (by sockaddr, first unclaimed wins). Clone targets are
     * pre-marked claimed so a later non-clone cannot steal them. */
    claimed = ngx_pcalloc(cycle->pool, cycle->listening.nelts);
    if (claimed == NULL) return NGX_ERROR;
    for (j = 0; j < imcf->bindings->nelts; j++) {
        if (bindings[j].needs_clone) {
            claimed[target_idx[j]] = 1;
            continue;
        }
        ls = cycle->listening.elts;
        for (i = 0; i < cycle->listening.nelts; i++) {
            if (claimed[i]) continue;
            if (ls[i].socklen != bindings[j].socklen) continue;
            if (ngx_cmp_sockaddr(ls[i].sockaddr, ls[i].socklen,
                    (struct sockaddr *) &bindings[j].sockaddr,
                    bindings[j].socklen, 1) != NGX_OK)
            {
                continue;
            }
            target_idx[j] = i;
            claimed[i] = 1;
            goto found;
        }
        /* Loop fell through without a match: configuration and
         * cycle->listening are out of sync — fatal. */
        ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
            "ipng_stats: init_module: no listening entry for "
            "binding (source=\"%V\")", &bindings[j].source);
        return NGX_ERROR;
    found:
        ;
    }
    /* Phase 3: close every pre-bound fd on a target listening entry.
     * nginx ran ngx_open_listening_sockets before init_module, so the
     * first-seen listen at each sockaddr has a naked bind that would
     * block subsequent device-pinned binds on the same addr. Free the
     * ports before we rebind. */
    ls = cycle->listening.elts;
    for (j = 0; j < imcf->bindings->nelts; j++) {
        ngx_listening_t *t = &ls[target_idx[j]];
        if (t->fd != (ngx_socket_t) -1) {
            close(t->fd);
            t->fd = (ngx_socket_t) -1;
        }
    }
    /* Phase 4: rebind each target with SO_BINDTODEVICE set before
     * bind(). `ls->inherited = 1` tells nginx not to touch the socket
     * in any subsequent setup pass. */
    for (j = 0; j < imcf->bindings->nelts; j++) {
        ngx_listening_t *t = &ls[target_idx[j]];
        ngx_socket_t s;
        s = ngx_http_ipng_stats_open_dev_socket(cycle,
            t->sockaddr, t->socklen, t->backlog, &bindings[j].device);
        if (s == (ngx_socket_t) -1) {
            return NGX_ERROR;
        }
        t->fd = s;
        t->inherited = 1;
        /* Remember the owning listening so resolve_source() can map
         * accepted connections back to this binding's source tag. */
        bindings[j].listening = t;
        ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
            "ipng_stats: listen %V bound to device \"%V\" "
            "(source=\"%V\", fd=%d)",
            &t->addr_text, &bindings[j].device,
            &bindings[j].source, (int) s);
    }
    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* Worker init/exit and flush timer */
/* ----------------------------------------------------------------- */
/* Per-worker setup: allocate the local slot table and histogram
 * arenas, arm the stats-flush timer, and — when a logtail format is
 * configured — allocate the logtail buffer, open its per-worker UDP
 * socket, and arm the logtail flush timer. Runs only in worker or
 * single-process mode; helper processes (cache manager etc.) are
 * skipped. Returns NGX_ERROR only on allocation/socket failure. */
static ngx_int_t
ngx_http_ipng_stats_init_worker(ngx_cycle_t *cycle)
{
    ngx_http_ipng_stats_main_conf_t *imcf;
    ngx_http_ipng_stats_worker_t *w = &ngx_http_ipng_stats_worker;
    size_t arena_bytes;
    if (ngx_process != NGX_PROCESS_WORKER
        && ngx_process != NGX_PROCESS_SINGLE)
    {
        return NGX_OK;
    }
    imcf = ngx_http_cycle_get_module_main_conf(cycle,
        ngx_http_ipng_stats_module);
    /* Module unconfigured: stay dormant in this worker too. */
    if (imcf == NULL || imcf->shm_zone == NULL) {
        return NGX_OK;
    }
    w->log = cycle->log;
    /* Fixed-capacity slot table; worker_slot() indexes the arenas
     * below as nalloc rows of (nbuckets+1) / (nbytebuckets+1) lanes,
     * so these sizes must stay in lockstep. */
    w->nalloc = 256;
    w->nslots = 0;
    w->dirty_head = NULL;
    w->slots = ngx_pcalloc(cycle->pool,
        w->nalloc * sizeof(ngx_http_ipng_stats_slot_t));
    if (w->slots == NULL) {
        return NGX_ERROR;
    }
    /* Duration histograms: one lane per bucket plus the +Inf lane. */
    arena_bytes = w->nalloc * (imcf->nbuckets + 1) * sizeof(uint64_t);
    w->dhist_arena = ngx_pcalloc(cycle->pool, arena_bytes);
    w->uhist_arena = ngx_pcalloc(cycle->pool, arena_bytes);
    if (w->dhist_arena == NULL || w->uhist_arena == NULL) {
        return NGX_ERROR;
    }
    /* Byte-size histograms, same layout with their own bucket count. */
    arena_bytes = w->nalloc * (imcf->nbytebuckets + 1) * sizeof(uint64_t);
    w->bin_arena = ngx_pcalloc(cycle->pool, arena_bytes);
    w->bout_arena = ngx_pcalloc(cycle->pool, arena_bytes);
    if (w->bin_arena == NULL || w->bout_arena == NULL) {
        return NGX_ERROR;
    }
    /* Schedule first flush tick. The event struct must be zeroed and
     * its handler/log/data fields set before ngx_add_timer. */
    ngx_memzero(&w->flush_ev, sizeof(w->flush_ev));
    w->flush_ev.handler = ngx_http_ipng_stats_flush_timer;
    w->flush_ev.log = cycle->log;
    w->flush_ev.data = w;
    w->flush_ev.cancelable = 1;
    ngx_add_timer(&w->flush_ev, imcf->flush_interval);
    /* Logtail buffer + flush timer (UDP only). */
    w->logtail_udp_fd = (ngx_socket_t) -1;
    if (imcf->logtail_fmt != NULL) {
        w->logtail_buf = ngx_palloc(cycle->pool, imcf->logtail_buf_size);
        if (w->logtail_buf == NULL) {
            return NGX_ERROR;
        }
        w->logtail_pos = w->logtail_buf;
        w->logtail_end = w->logtail_buf + imcf->logtail_buf_size;
        /* Open per-worker UDP socket. */
        w->logtail_udp_fd = socket(AF_INET, SOCK_DGRAM, 0);
        if (w->logtail_udp_fd == (ngx_socket_t) -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                "ipng_stats: socket(SOCK_DGRAM) failed");
            return NGX_ERROR;
        }
        ngx_memzero(&w->logtail_flush_ev, sizeof(w->logtail_flush_ev));
        w->logtail_flush_ev.handler = ngx_http_ipng_stats_logtail_flush_handler;
        w->logtail_flush_ev.log = cycle->log;
        w->logtail_flush_ev.data = w;
        w->logtail_flush_ev.cancelable = 1;
        ngx_add_timer(&w->logtail_flush_ev, imcf->logtail_flush);
    }
    return NGX_OK;
}
static void
ngx_http_ipng_stats_exit_worker(ngx_cycle_t *cycle)
{
    ngx_http_ipng_stats_main_conf_t  *imcf;
    ngx_http_ipng_stats_worker_t     *w = &ngx_http_ipng_stats_worker;
    /* Tear down in reverse: stop the stats timer, push any buffered
     * logtail lines out, stop the logtail timer, close the UDP fd. */
    if (w->flush_ev.timer_set) {
        ngx_del_timer(&w->flush_ev);
    }
    imcf = ngx_http_cycle_get_module_main_conf(cycle,
        ngx_http_ipng_stats_module);
    if (imcf != NULL) {
        ngx_http_ipng_stats_logtail_flush(w, imcf);
    }
    if (w->logtail_flush_ev.timer_set) {
        ngx_del_timer(&w->logtail_flush_ev);
    }
    if (w->logtail_udp_fd != (ngx_socket_t) -1) {
        close(w->logtail_udp_fd);
        w->logtail_udp_fd = (ngx_socket_t) -1;
    }
}
static void
ngx_http_ipng_stats_flush_timer(ngx_event_t *ev)
{
    ngx_http_ipng_stats_main_conf_t  *imcf;
    ngx_http_ipng_stats_worker_t     *w = ev->data;
    imcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,
        ngx_http_ipng_stats_module);
    if (imcf == NULL) {
        return;
    }
    ngx_http_ipng_stats_do_flush(w, imcf, ev->log);
    /* Keep ticking until the worker starts shutting down. */
    if (!ngx_exiting && !ngx_quit) {
        ngx_add_timer(&w->flush_ev, imcf->flush_interval);
    }
}
/* Drain this worker's dirty-slot list into the shared zone, all under
 * the slab mutex. For each dirty slot: find (or insert) the shared
 * rbtree node keyed by the slot hash — with (source_id, vip_id, class)
 * verified on collision by walking right siblings — add the local
 * deltas into the node's atomic counters and histogram lanes, zero
 * the local deltas, and move the node to the LRU tail. On slab
 * exhaustion the slot's deltas are dropped and zone_full_events is
 * bumped; they reaccumulate on subsequent requests. */
static void
ngx_http_ipng_stats_do_flush(ngx_http_ipng_stats_worker_t *w,
    ngx_http_ipng_stats_main_conf_t *imcf, ngx_log_t *log)
{
    ngx_http_ipng_stats_shctx_t *sh;
    ngx_slab_pool_t *slab;
    ngx_http_ipng_stats_slot_t *slot, *next;
    ngx_http_ipng_stats_node_t *n;
    ngx_rbtree_node_t *rb;
    ngx_uint_t i;
    ngx_uint_t nbuckets, nbytebuckets;
    size_t node_size;
    ngx_atomic_uint_t *shared_lanes;
    if (imcf->shm_zone == NULL) {
        return;
    }
    slab = (ngx_slab_pool_t *) imcf->shm_zone->shm.addr;
    sh = imcf->shm_zone->data;
    nbuckets = imcf->nbuckets;
    nbytebuckets = imcf->nbytebuckets;
    /* Node layout: struct, then four lane arrays back to back:
     * [dhist | uhist] of (nbuckets+1) and [bin | bout] of
     * (nbytebuckets+1) atomic counters each. */
    node_size = sizeof(ngx_http_ipng_stats_node_t)
        + 2 * (nbuckets + 1) * sizeof(ngx_atomic_uint_t)
        + 2 * (nbytebuckets + 1) * sizeof(ngx_atomic_uint_t);
    ngx_shmtx_lock(&slab->mutex);
    (void) ngx_atomic_fetch_add(&sh->flushes_total, 1);
    for (slot = w->dirty_head; slot != NULL; slot = next) {
        /* Unlink eagerly; the slot may be re-marked dirty later. */
        next = slot->dirty_next;
        slot->dirty_next = NULL;
        slot->dirty = 0;
        /* Find or insert the shared-zone node for this hash. */
        rb = sh->rbtree.root;
        n = NULL;
        while (rb != &sh->sentinel) {
            if (slot->hash < rb->key) {
                rb = rb->left;
                continue;
            }
            if (slot->hash > rb->key) {
                rb = rb->right;
                continue;
            }
            /* hash match: verify full key. */
            n = (ngx_http_ipng_stats_node_t *) rb;
            if (n->source_id == slot->source_id
                && n->vip_id == slot->vip_id
                && n->class == slot->class)
            {
                break;
            }
            /* collision — walk right (matches rbtree_insert, which
             * sends equal keys right). */
            rb = rb->right;
            n = NULL;
        }
        if (n == NULL) {
            n = ngx_slab_calloc_locked(slab, node_size);
            if (n == NULL) {
                (void) ngx_atomic_fetch_add(&sh->zone_full_events, 1);
                /* Drop this slot's dirty deltas; they will reaccumulate
                 * on the next request if the operator resizes the zone. */
                slot->requests = 0;
                slot->bytes_in = 0;
                slot->bytes_out = 0;
                slot->duration_sum_ms = 0;
                slot->upstream_sum_ms = 0;
                for (i = 0; i <= nbuckets; i++) {
                    slot->dhist[i] = 0;
                    slot->uhist[i] = 0;
                }
                for (i = 0; i <= nbytebuckets; i++) {
                    slot->bin_hist[i] = 0;
                    slot->bout_hist[i] = 0;
                }
                continue;
            }
            n->rbnode.key = slot->hash;
            n->source_id = slot->source_id;
            n->vip_id = slot->vip_id;
            n->class = slot->class;
            ngx_rbtree_insert(&sh->rbtree, &n->rbnode);
            ngx_queue_insert_tail(&sh->lru, &n->lru);
        }
        /* Accumulate local deltas into the shared atomic counters. */
        (void) ngx_atomic_fetch_add(&n->requests, slot->requests);
        (void) ngx_atomic_fetch_add(&n->bytes_in, slot->bytes_in);
        (void) ngx_atomic_fetch_add(&n->bytes_out, slot->bytes_out);
        (void) ngx_atomic_fetch_add(&n->duration_sum_ms, slot->duration_sum_ms);
        (void) ngx_atomic_fetch_add(&n->upstream_sum_ms, slot->upstream_sum_ms);
        /* Lanes live immediately after the node struct (see node_size
         * above): dhist first, then uhist, then byte histograms. */
        shared_lanes = (ngx_atomic_uint_t *) (n + 1);
        for (i = 0; i <= nbuckets; i++) {
            if (slot->dhist[i]) {
                (void) ngx_atomic_fetch_add(&shared_lanes[i], slot->dhist[i]);
            }
            if (slot->uhist[i]) {
                (void) ngx_atomic_fetch_add(
                    &shared_lanes[nbuckets + 1 + i], slot->uhist[i]);
            }
        }
        {
            ngx_atomic_uint_t *bin_lanes = shared_lanes + 2 * (nbuckets + 1);
            ngx_atomic_uint_t *bout_lanes = bin_lanes + (nbytebuckets + 1);
            for (i = 0; i <= nbytebuckets; i++) {
                if (slot->bin_hist[i]) {
                    (void) ngx_atomic_fetch_add(&bin_lanes[i],
                        slot->bin_hist[i]);
                }
                if (slot->bout_hist[i]) {
                    (void) ngx_atomic_fetch_add(&bout_lanes[i],
                        slot->bout_hist[i]);
                }
            }
        }
        /* Clear local deltas. */
        slot->requests = 0;
        slot->bytes_in = 0;
        slot->bytes_out = 0;
        slot->duration_sum_ms = 0;
        slot->upstream_sum_ms = 0;
        for (i = 0; i <= nbuckets; i++) {
            slot->dhist[i] = 0;
            slot->uhist[i] = 0;
        }
        for (i = 0; i <= nbytebuckets; i++) {
            slot->bin_hist[i] = 0;
            slot->bout_hist[i] = 0;
        }
        /* Freshly-touched node moves to the LRU tail. */
        ngx_queue_remove(&n->lru);
        ngx_queue_insert_tail(&sh->lru, &n->lru);
    }
    w->dirty_head = NULL;
    ngx_shmtx_unlock(&slab->mutex);
}
/* ----------------------------------------------------------------- */
/* Log-phase handler */
/* ----------------------------------------------------------------- */
static ngx_uint_t
ngx_http_ipng_stats_status_index(ngx_uint_t code)
{
    /* Map an HTTP status to its class index: 1..5 for 1xx..5xx,
     * CLASS_UNKNOWN for anything outside the supported range. */
    if (code >= NGX_HTTP_IPNG_STATS_CODE_MIN
        && code <= NGX_HTTP_IPNG_STATS_CODE_MAX)
    {
        return code / 100; /* 1..5 */
    }
    return NGX_HTTP_IPNG_STATS_CLASS_UNKNOWN;
}
static const char *
ngx_http_ipng_stats_class_label(ngx_uint_t class)
{
    /* Human-readable label for a status class index; out-of-range
     * indices fold into "unknown". */
    static const char *labels[NGX_HTTP_IPNG_STATS_NCLASSES] = {
        "unknown", "1xx", "2xx", "3xx", "4xx", "5xx",
    };
    return (class < NGX_HTTP_IPNG_STATS_NCLASSES) ? labels[class] : "unknown";
}
static ngx_uint_t
ngx_http_ipng_stats_bucket_index(ngx_uint_t ms, ngx_uint_t *bounds,
    ngx_uint_t nbuckets)
{
    /* Lower-bound binary search: index of the first bound >= ms, or
     * nbuckets — the implicit +Inf bucket — when ms exceeds every
     * bound. */
    ngx_uint_t left, right, mid;
    left = 0;
    right = nbuckets;
    while (left < right) {
        mid = left + (right - left) / 2;
        if (bounds[mid] < ms) {
            left = mid + 1;
        } else {
            right = mid;
        }
    }
    return left;
}
static ngx_int_t
ngx_http_ipng_stats_resolve_source(ngx_http_request_t *r,
    ngx_http_ipng_stats_main_conf_t *imcf, ngx_str_t *source_out)
{
    ngx_uint_t                      i;
    ngx_http_ipng_stats_binding_t  *b;
    /* A request that arrived on a device-pinned listener inherits
     * that binding's source tag; everything else falls back to the
     * configured default tag. */
    if (imcf->bindings != NULL) {
        b = imcf->bindings->elts;
        for (i = 0; i < imcf->bindings->nelts; i++) {
            if (b[i].listening != r->connection->listening) {
                continue;
            }
            *source_out = b[i].source;
            return NGX_OK;
        }
    }
    *source_out = imcf->default_source;
    return NGX_OK;
}
static ngx_int_t
ngx_http_ipng_stats_canonical_vip(ngx_http_request_t *r, u_char *buf,
    size_t buflen, ngx_str_t *vip_out)
{
    size_t written;
    if (r->connection->local_sockaddr == NULL) {
        return NGX_ERROR;
    }
    /* Port argument 0 => address only. ngx_sock_ntop renders IPv6 in
     * RFC 5952 canonical form and drops scope-ids as part of the
     * format, which is what FR-2.5 asks for. */
    written = ngx_sock_ntop(r->connection->local_sockaddr,
                            r->connection->local_socklen, buf, buflen, 0);
    if (written == 0) {
        return NGX_ERROR;
    }
    vip_out->len = written;
    vip_out->data = buf;
    return NGX_OK;
}
/* Return the per-worker slot for (source_id, vip_id, class), creating
 * one if the table has room; the caller adds its deltas. Lookup is a
 * linear scan over the in-use prefix — `nslots` stays small, so this
 * beats any hashing scheme here. Returns NULL when the table is full,
 * in which case this key's stats are dropped until capacity frees. */
static ngx_http_ipng_stats_slot_t *
ngx_http_ipng_stats_worker_slot(ngx_http_ipng_stats_worker_t *w,
    ngx_http_ipng_stats_main_conf_t *imcf,
    ngx_uint_t source_id, ngx_uint_t vip_id, ngx_uint_t class)
{
    ngx_uint_t                   k, hash;
    ngx_http_ipng_stats_slot_t  *s;
    hash = (source_id * 2654435761u)
           ^ (vip_id * 40503u)
           ^ (class * 131071u);
    for (k = 0; k < w->nslots; k++) {
        s = &w->slots[k];
        if (s->hash == hash
            && s->source_id == source_id
            && s->vip_id == vip_id
            && s->class == class)
        {
            return s;
        }
    }
    if (w->nslots >= w->nalloc) {
        /* Over the per-worker cap; drop this key for now. */
        return NULL;
    }
    /* Claim the next free row; histogram lanes live at fixed offsets
     * in the arenas, one row per slot index. */
    k = w->nslots;
    s = &w->slots[k];
    s->hash = hash;
    s->source_id = source_id;
    s->vip_id = vip_id;
    s->class = class;
    s->dhist     = &w->dhist_arena[k * (imcf->nbuckets + 1)];
    s->uhist     = &w->uhist_arena[k * (imcf->nbuckets + 1)];
    s->bin_hist  = &w->bin_arena [k * (imcf->nbytebuckets + 1)];
    s->bout_hist = &w->bout_arena[k * (imcf->nbytebuckets + 1)];
    w->nslots = k + 1;
    return s;
}
static void
ngx_http_ipng_stats_mark_dirty(ngx_http_ipng_stats_worker_t *w,
    ngx_http_ipng_stats_slot_t *s)
{
    /* Idempotently push the slot onto the singly-linked dirty list
     * that the flush timer drains. */
    if (!s->dirty) {
        s->dirty = 1;
        s->dirty_next = w->dirty_head;
        w->dirty_head = s;
    }
}
/* NGX_HTTP_LOG_PHASE handler: attribute the finished request to a
 * (source_tag, vip, status-class) key, accumulate counters and
 * histogram observations into the per-worker slot, and optionally
 * emit a logtail line. Always returns NGX_OK — stats failures must
 * never affect request processing. */
static ngx_int_t
ngx_http_ipng_stats_log_handler(ngx_http_request_t *r)
{
    ngx_http_ipng_stats_main_conf_t *imcf;
    ngx_http_ipng_stats_loc_conf_t *ilcf;
    ngx_http_ipng_stats_worker_t *w = &ngx_http_ipng_stats_worker;
    ngx_http_ipng_stats_shctx_t *sh;
    ngx_slab_pool_t *slab;
    ngx_http_ipng_stats_slot_t *slot;
    ngx_str_t source, vip;
    u_char vipbuf[NGX_SOCKADDR_STRLEN];
    ngx_uint_t source_id, vip_id, class;
    ngx_uint_t bucket;
    uint64_t bin_sz, bout_sz;
    ngx_msec_int_t elapsed_ms;
    ngx_time_t *tp;
    imcf = ngx_http_get_module_main_conf(r, ngx_http_ipng_stats_module);
    if (imcf == NULL || imcf->shm_zone == NULL || !imcf->enabled) {
        return NGX_OK;
    }
    ilcf = ngx_http_get_module_loc_conf(r, ngx_http_ipng_stats_module);
    if (ilcf == NULL || !ilcf->enabled) {
        return NGX_OK;
    }
    if (ngx_http_ipng_stats_resolve_source(r, imcf, &source) != NGX_OK) {
        return NGX_OK;
    }
    /* For wildcard listeners (0.0.0.0 / ::), local_sockaddr initially
     * holds the bind address, not the real VIP. Force a getsockname()
     * so we get the actual destination address the client connected to. */
    if (ngx_connection_local_sockaddr(r->connection, NULL, 0) != NGX_OK) {
        return NGX_OK;
    }
    if (ngx_http_ipng_stats_canonical_vip(r, vipbuf, sizeof(vipbuf), &vip)
        != NGX_OK)
    {
        return NGX_OK;
    }
    class = ngx_http_ipng_stats_status_index(r->headers_out.status);
    /* Intern source and vip in the shared zone. This is the only
     * shared-zone write outside of flush — it runs rarely (once per
     * new (source, vip) pair, ever) and takes the slab mutex. */
    slab = (ngx_slab_pool_t *) imcf->shm_zone->shm.addr;
    sh = imcf->shm_zone->data;
    ngx_shmtx_lock(&slab->mutex);
    if (ngx_http_ipng_stats_intern_shared(sh, slab, &sh->sources, &source,
            &source_id) != NGX_OK
        || ngx_http_ipng_stats_intern_shared(sh, slab, &sh->vips, &vip,
            &vip_id) != NGX_OK)
    {
        ngx_shmtx_unlock(&slab->mutex);
        return NGX_OK;
    }
    ngx_shmtx_unlock(&slab->mutex);
    slot = ngx_http_ipng_stats_worker_slot(w, imcf, source_id, vip_id, class);
    if (slot == NULL) {
        /* Per-worker slot table full — silently drop this request. */
        return NGX_OK;
    }
    /* request_length may be unset (<= 0) for some aborted requests. */
    bin_sz = r->request_length > 0 ? (uint64_t) r->request_length : 0;
    bout_sz = (uint64_t) r->connection->sent;
    slot->requests += 1;
    slot->bytes_in += bin_sz;
    slot->bytes_out += bout_sz;
    bucket = ngx_http_ipng_stats_bucket_index((ngx_uint_t) bin_sz,
                                              imcf->byte_bucket_bounds,
                                              imcf->nbytebuckets);
    slot->bin_hist[bucket] += 1;
    bucket = ngx_http_ipng_stats_bucket_index((ngx_uint_t) bout_sz,
                                              imcf->byte_bucket_bounds,
                                              imcf->nbytebuckets);
    slot->bout_hist[bucket] += 1;
    /* Use the same formula nginx uses for $request_time: two-field
     * subtraction via ngx_timeofday(), which is the canonical way to
     * compute elapsed time in nginx's cached-time model. */
    tp = ngx_timeofday();
    elapsed_ms = (ngx_msec_int_t)
        ((tp->sec - r->start_sec) * 1000 + (tp->msec - r->start_msec));
    if (elapsed_ms < 0) elapsed_ms = 0;
    slot->duration_sum_ms += (uint64_t) elapsed_ms;
    bucket = ngx_http_ipng_stats_bucket_index((ngx_uint_t) elapsed_ms,
                                              imcf->bucket_bounds_ms,
                                              imcf->nbuckets);
    slot->dhist[bucket] += 1;
    /* NOTE(review): only the first upstream state (us[0]) is counted;
     * retries / multi-peer attempts beyond it are ignored — confirm
     * this is the intended accounting. */
    if (r->upstream_states != NULL && r->upstream_states->nelts > 0) {
        ngx_http_upstream_state_t *us = r->upstream_states->elts;
        ngx_msec_int_t up_ms = (ngx_msec_int_t) us[0].response_time;
        if (up_ms > 0) {
            slot->upstream_sum_ms += (uint64_t) up_ms;
            bucket = ngx_http_ipng_stats_bucket_index((ngx_uint_t) up_ms,
                                                      imcf->bucket_bounds_ms,
                                                      imcf->nbuckets);
            slot->uhist[bucket] += 1;
        }
    }
    ngx_http_ipng_stats_mark_dirty(w, slot);
    /* Global logtail — runs for every request, uses compiled log_format
     * ops, writes to a per-worker buffer. */
    if (imcf->logtail_fmt != NULL) {
        ngx_http_ipng_stats_logtail_write(r, imcf, w);
    }
    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* String interning (called under slab mutex) */
/* ----------------------------------------------------------------- */
/* Intern string s into table t (slab-backed, caller holds the slab
 * mutex) and return its stable index via idx_out. Existing entries
 * are matched by exact bytes; the table doubles when full. Returns
 * NGX_ERROR on slab exhaustion, bumping zone_full_events. */
static ngx_int_t
ngx_http_ipng_stats_intern_shared(ngx_http_ipng_stats_shctx_t *sh,
    ngx_slab_pool_t *slab, ngx_http_ipng_stats_intern_t *t, ngx_str_t *s,
    ngx_uint_t *idx_out)
{
    u_char      *data;
    ngx_str_t   *entry;
    ngx_uint_t   i;
    /* Already interned? Hand back the existing index. */
    for (i = 0; i < t->nelts; i++) {
        entry = &t->entries[i];
        if (entry->len == s->len
            && ngx_memcmp(entry->data, s->data, s->len) == 0)
        {
            *idx_out = i;
            return NGX_OK;
        }
    }
    /* Table full: double the entry array in the slab. */
    if (t->nelts >= t->nalloc) {
        ngx_uint_t  cap = t->nalloc * 2;
        ngx_str_t  *grown;
        grown = ngx_slab_alloc_locked(slab, cap * sizeof(ngx_str_t));
        if (grown == NULL) {
            (void) ngx_atomic_fetch_add(&sh->zone_full_events, 1);
            return NGX_ERROR;
        }
        ngx_memcpy(grown, t->entries, t->nelts * sizeof(ngx_str_t));
        ngx_slab_free_locked(slab, t->entries);
        t->entries = grown;
        t->nalloc = cap;
    }
    /* Copy the bytes into the slab; interned entries live forever. */
    data = ngx_slab_alloc_locked(slab, s->len);
    if (data == NULL) {
        (void) ngx_atomic_fetch_add(&sh->zone_full_events, 1);
        return NGX_ERROR;
    }
    ngx_memcpy(data, s->data, s->len);
    entry = &t->entries[t->nelts];
    entry->data = data;
    entry->len = s->len;
    *idx_out = t->nelts;
    t->nelts++;
    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* Global logtail: write + flush */
/* ----------------------------------------------------------------- */
/* Format one logtail line for request r using the compiled log_format
 * ops and append it to the per-worker buffer. If the line would not
 * fit, the buffer is flushed first; an oversized line (longer than
 * the whole buffer) is formatted into a temporary r->pool allocation
 * and sent as its own datagram. Ordering is preserved because the
 * buffer is always drained before the direct send. Best-effort
 * throughout: failures drop the line, never the request. */
static void
ngx_http_ipng_stats_logtail_write(ngx_http_request_t *r,
    ngx_http_ipng_stats_main_conf_t *imcf,
    ngx_http_ipng_stats_worker_t *w)
{
    ipng_log_op_t *ops;
    ngx_uint_t i, nops;
    size_t line_len;
    u_char *p;
    if (imcf->logtail_fmt == NULL || w->logtail_buf == NULL) {
        return;
    }
    /* if=$variable: skip this request when the variable is empty or "0". */
    if (imcf->logtail_if_index) {
        ngx_http_variable_value_t *val;
        /* Index is stored 1-based so 0 can mean "not set". */
        val = ngx_http_get_indexed_variable(r, imcf->logtail_if_index - 1);
        if (val == NULL || val->not_found
            || val->len == 0
            || (val->len == 1 && val->data[0] == '0'))
        {
            return;
        }
    }
    ops = imcf->logtail_fmt->ops->elts;
    nops = imcf->logtail_fmt->ops->nelts;
    /* Compute line length. Fixed-size ops carry len directly; the
     * variable-size ones (len == 0) are measured via getlen. */
    line_len = 1; /* trailing newline */
    for (i = 0; i < nops; i++) {
        if (ops[i].len == 0) {
            line_len += ops[i].getlen(r, ops[i].data);
        } else {
            line_len += ops[i].len;
        }
    }
    /* Flush if the line won't fit. */
    if ((size_t)(w->logtail_end - w->logtail_pos) < line_len) {
        ngx_http_ipng_stats_logtail_flush(w, imcf);
    }
    /* If it STILL doesn't fit (single line > buffer), write directly. */
    if ((size_t)(w->logtail_end - w->logtail_pos) < line_len) {
        u_char *tmp = ngx_pnalloc(r->pool, line_len);
        if (tmp == NULL) {
            return;
        }
        p = tmp;
        for (i = 0; i < nops; i++) {
            p = ops[i].run(r, p, &ops[i]);
        }
        *p++ = '\n';
        if (w->logtail_udp_fd != (ngx_socket_t) -1) {
            (void) sendto(w->logtail_udp_fd, tmp, p - tmp, 0,
                          (struct sockaddr *) &imcf->logtail_udp_addr,
                          sizeof(struct sockaddr_in));
        }
        return;
    }
    /* Append to buffer. */
    p = w->logtail_pos;
    for (i = 0; i < nops; i++) {
        p = ops[i].run(r, p, &ops[i]);
    }
    *p++ = '\n';
    w->logtail_pos = p;
}
static void
ngx_http_ipng_stats_logtail_flush(ngx_http_ipng_stats_worker_t *w,
    ngx_http_ipng_stats_main_conf_t *imcf)
{
    size_t pending;
    if (w->logtail_buf == NULL) {
        return;
    }
    pending = w->logtail_pos - w->logtail_buf;
    if (pending == 0) {
        return;
    }
    /* UDP mode: fire-and-forget datagram. With no listener the
     * kernel silently drops it, which is the behaviour we want. */
    if (w->logtail_udp_fd != (ngx_socket_t) -1) {
        (void) sendto(w->logtail_udp_fd, w->logtail_buf, pending, 0,
                      (struct sockaddr *) &imcf->logtail_udp_addr,
                      sizeof(struct sockaddr_in));
    }
    w->logtail_pos = w->logtail_buf;
}
static void
ngx_http_ipng_stats_logtail_flush_handler(ngx_event_t *ev)
{
    ngx_http_ipng_stats_main_conf_t  *imcf;
    ngx_http_ipng_stats_worker_t     *w = ev->data;
    imcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,
        ngx_http_ipng_stats_module);
    if (imcf != NULL) {
        ngx_http_ipng_stats_logtail_flush(w, imcf);
    }
    /* Re-arm unless the worker is shutting down; fall back to 1s
     * when the main conf is (transiently) unavailable. */
    if (!ngx_exiting && !ngx_quit) {
        ngx_add_timer(&w->logtail_flush_ev,
                      imcf != NULL ? imcf->logtail_flush : 1000);
    }
}
/* ----------------------------------------------------------------- */
/* Nginx variables: $ipng_source_tag */
/* ----------------------------------------------------------------- */
static ngx_int_t
ngx_http_ipng_stats_source_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data)
{
    ngx_str_t                         tag;
    ngx_http_ipng_stats_main_conf_t  *imcf;
    (void) data;
    /* $ipng_source_tag: the binding's tag for device-pinned listens,
     * the default tag otherwise; not_found when the module conf is
     * unavailable. */
    imcf = ngx_http_get_module_main_conf(r, ngx_http_ipng_stats_module);
    if (imcf == NULL
        || ngx_http_ipng_stats_resolve_source(r, imcf, &tag) != NGX_OK)
    {
        v->not_found = 1;
        return NGX_OK;
    }
    v->data = tag.data;
    v->len = tag.len;
    v->valid = 1;
    v->no_cacheable = 0;
    v->not_found = 0;
    return NGX_OK;
}
/* ----------------------------------------------------------------- */
/* Scrape content handler */
/* ----------------------------------------------------------------- */
static ngx_int_t
ngx_http_ipng_stats_parse_filters(ngx_http_request_t *r, ngx_str_t *src,
    ngx_str_t *vip)
{
    u_char     *p, *end;
    ngx_str_t   key, val;
    /* Extract ?source_tag=...&vip=... from the query string; absent
     * keys leave the outputs null. Keys without '=' get null values. */
    ngx_str_null(src);
    ngx_str_null(vip);
    if (r->args.len == 0) {
        return NGX_OK;
    }
    p = r->args.data;
    end = r->args.data + r->args.len;
    while (p < end) {
        key.data = p;
        while (p < end && *p != '=' && *p != '&') {
            p++;
        }
        key.len = p - key.data;
        ngx_str_null(&val);
        if (p < end && *p == '=') {
            val.data = ++p;
            while (p < end && *p != '&') {
                p++;
            }
            val.len = p - val.data;
        }
        if (p < end && *p == '&') {
            p++;
        }
        if (key.len == 10 && ngx_strncmp(key.data, "source_tag", 10) == 0) {
            *src = val;
        } else if (key.len == 3 && ngx_strncmp(key.data, "vip", 3) == 0) {
            *vip = val;
        }
    }
    return NGX_OK;
}
static ngx_int_t
ngx_http_ipng_stats_want_json(ngx_http_request_t *r)
{
    ngx_uint_t        i;
    ngx_list_part_t  *part;
    ngx_table_elt_t  *h;
    /* Content negotiation: the first Accept header decides — 1 when
     * it mentions application/json, 0 otherwise (including when no
     * Accept header is present). */
    part = &r->headers_in.headers.part;
    h = part->elts;
    for (i = 0; /* void */; i++) {
        if (i >= part->nelts) {
            if (part->next == NULL) {
                break;
            }
            part = part->next;
            h = part->elts;
            i = 0;
        }
        if (h[i].key.len != 6
            || ngx_strncasecmp(h[i].key.data, (u_char *) "accept", 6) != 0)
        {
            continue;
        }
        return ngx_strlcasestrn(h[i].value.data,
                                h[i].value.data + h[i].value.len,
                                (u_char *) "application/json", 16 - 1)
               != NULL ? 1 : 0;
    }
    return 0;
}
static ngx_int_t
ngx_http_ipng_stats_content_handler(ngx_http_request_t *r)
{
    ngx_int_t                         rc;
    ngx_str_t                         f_src, f_vip;
    ngx_http_ipng_stats_main_conf_t  *imcf;
    /* Scrape endpoint: GET/HEAD only, request body discarded. */
    if (r->method != NGX_HTTP_GET && r->method != NGX_HTTP_HEAD) {
        return NGX_HTTP_NOT_ALLOWED;
    }
    rc = ngx_http_discard_request_body(r);
    if (rc != NGX_OK) {
        return rc;
    }
    imcf = ngx_http_get_module_main_conf(r, ngx_http_ipng_stats_module);
    if (imcf == NULL || imcf->shm_zone == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    ngx_http_ipng_stats_parse_filters(r, &f_src, &f_vip);
    /* Accept: application/json selects the JSON renderer; everything
     * else gets Prometheus exposition text. */
    if (ngx_http_ipng_stats_want_json(r)) {
        return ngx_http_ipng_stats_render_json(r, imcf, &f_src, &f_vip);
    }
    return ngx_http_ipng_stats_render_prom(r, imcf, &f_src, &f_vip);
}
/* Scrape rendering.
*
* Cardinality model (post-schema-2): counters are keyed by
* (source_tag, vip, code-class), where code-class is one of
* "1xx".."5xx" or "unknown" — six values rather than ~60 full codes.
* Histograms drop the code label entirely and aggregate across classes
* per (source_tag, vip). Operators who need a full per-code breakdown
* should enable ipng_stats_logtail (FR-8) and process the access-log
* stream off the hot path.
*
* The renderers below walk the shared-zone LRU once under the slab
* mutex, emit per-node counters inline, and accumulate histograms into
* a per-(source, vip) aggregation table allocated from r->pool. The
* aggregation table is then drained to produce one histogram group per
* (source, vip). */
/* Per-(source, vip) histogram aggregation row used by the renderers;
 * allocated from r->pool and filled while draining the shared-zone
 * LRU, then emitted as one histogram group per (source, vip). */
typedef struct {
    ngx_uint_t source_id;
    ngx_uint_t vip_id;
    ngx_uint_t used; /* row occupied (open-addressed table) */
    uint64_t duration_sum_ms; /* histogram _sum for request_duration */
    uint64_t upstream_sum_ms;
    uint64_t bytes_in_sum; /* histogram _sum for bytes_in */
    uint64_t bytes_out_sum;
    uint64_t req_total; /* total requests for this (source, vip) */
    uint64_t up_total; /* upstream observations */
    uint64_t *dhist; /* nbuckets+1 */
    uint64_t *uhist;
    uint64_t *bin_hist; /* nbytebuckets+1 */
    uint64_t *bout_hist;
} ngx_http_ipng_stats_agg_t;
/* Allocate a single-buffer chain link of `size` bytes from r->pool.
 * Returns the new link, or NULL when either allocation fails (pool
 * allocations are freed with the request, so no cleanup is needed). */
static ngx_chain_t *
ngx_http_ipng_stats_chain_buf(ngx_http_request_t *r, size_t size)
{
    ngx_chain_t  *link;
    ngx_buf_t    *buf;

    buf = ngx_create_temp_buf(r->pool, size);
    if (buf == NULL) {
        return NULL;
    }

    link = ngx_alloc_chain_link(r->pool);
    if (link == NULL) {
        return NULL;
    }

    link->buf = buf;
    link->next = NULL;

    return link;
}
/* Finalize a scrape response: set status/content-type, send headers,
 * and push the body chain (if any) through the output filter chain.
 * Content-Length is left unset (-1), so nginx picks chunked encoding
 * or connection-close framing as appropriate.  The last link in `out`
 * is flagged as the final buffer before sending. */
static ngx_int_t
ngx_http_ipng_stats_send(ngx_http_request_t *r, ngx_str_t *ctype,
    ngx_chain_t *out)
{
    ngx_int_t     rc;
    ngx_chain_t  *tail;

    r->headers_out.status = NGX_HTTP_OK;
    r->headers_out.content_type = *ctype;
    r->headers_out.content_type_len = ctype->len;
    r->headers_out.content_length_n = -1;  /* body length unknown */

    rc = ngx_http_send_header(r);
    if (rc == NGX_ERROR || rc > NGX_OK || r->header_only) {
        return rc;
    }

    if (out == NULL) {
        /* No body at all: just terminate the response stream. */
        return ngx_http_send_special(r, NGX_HTTP_LAST);
    }

    for (tail = out; tail->next != NULL; tail = tail->next) { /* seek tail */ }

    tail->buf->last_buf = 1;
    tail->buf->last_in_chain = 1;

    return ngx_http_output_filter(r, out);
}
/* Append chain link `cl` to the list whose tail-next pointer is tracked
 * through `last`, advancing the tail.  A NULL `cl` yields NGX_ERROR, so
 * callers can collapse "allocate buffer, append, check" into one test. */
static ngx_int_t
ngx_http_ipng_stats_append(ngx_chain_t ***last, ngx_chain_t *cl)
{
    if (cl != NULL) {
        **last = cl;
        *last = &cl->next;
        return NGX_OK;
    }

    return NGX_ERROR;
}
/* Per-node snapshot used by the Prometheus and JSON renderers. The
 * renderers drain the shared-zone LRU into an array of these under
 * the slab mutex, and then release the mutex before doing any output
 * buffer allocation. That keeps ngx_pcalloc / malloc off the locked
 * path — a worker crash during rendering can never leave the slab
 * mutex held (NFR-4.3), and slab-lock contention no longer blocks on
 * glibc's allocator. */
typedef struct {
    ngx_uint_t source_id;        /* interned index into the sources table */
    ngx_uint_t vip_id;           /* interned index into the vips table */
    ngx_uint_t class;            /* status-code class (1xx..5xx / unknown) */
    uint64_t requests;           /* request count for this node */
    uint64_t bytes_in;           /* request bytes received */
    uint64_t bytes_out;          /* response bytes sent */
    uint64_t duration_sum_ms;    /* summed request durations, milliseconds */
    uint64_t upstream_sum_ms;    /* summed upstream response times, ms */
} ngx_http_ipng_stats_snap_t;
/* Sanity cap on shared-zone cardinality, above which the renderer
 * refuses to snapshot rather than try to allocate a potentially
 * runaway buffer. Applied independently to the sources and vips
 * tables. This is a belt-and-suspenders guard against a corrupt
 * `sh->sources.nelts` or `sh->vips.nelts` — see design.md on reload
 * hardening. */
#define NGX_HTTP_IPNG_STATS_MAX_INTERN 10000
/* Return the aggregation slot for (source_id, vip_id), creating one in
 * the next free position when no match exists.  Lookup is a linear
 * scan — the table is small in practice (< 100 entries).  Returns NULL
 * when the table is full, leaving *naggs untouched. */
static ngx_http_ipng_stats_agg_t *
ngx_http_ipng_stats_agg_get(ngx_http_ipng_stats_agg_t *aggs, ngx_uint_t *naggs,
    ngx_uint_t naggs_alloc, ngx_uint_t source_id, ngx_uint_t vip_id)
{
    ngx_uint_t                  k;
    ngx_http_ipng_stats_agg_t  *slot;

    for (k = 0; k < *naggs; k++) {
        slot = &aggs[k];
        if (slot->source_id == source_id && slot->vip_id == vip_id) {
            return slot;
        }
    }

    if (*naggs >= naggs_alloc) {
        return NULL;
    }

    slot = &aggs[*naggs];
    slot->source_id = source_id;
    slot->vip_id = vip_id;
    slot->used = 1;

    (*naggs)++;

    return slot;
}
/* Deep-copy the sources and vips interning tables into r->pool so the
 * renderers can dereference strings without holding the slab mutex.
 * Caller holds the slab mutex.  The copied count is clamped to the
 * caller's allocated table length; the number of valid entries is
 * written via *n_src_out / *n_vip_out.  Returns NGX_ERROR on pool
 * allocation failure (partial copies are left in place; the request
 * is aborted by the caller anyway). */
static ngx_int_t
ngx_http_ipng_stats_snapshot_strings(ngx_http_request_t *r,
    ngx_http_ipng_stats_shctx_t *sh,
    ngx_str_t *src_tbl, ngx_uint_t n_src_alloc, ngx_uint_t *n_src_out,
    ngx_str_t *vip_tbl, ngx_uint_t n_vip_alloc, ngx_uint_t *n_vip_out)
{
    ngx_uint_t  i, count;
    size_t      len;
    u_char     *copy;

    count = ngx_min(sh->sources.nelts, n_src_alloc);

    for (i = 0; i < count; i++) {
        len = sh->sources.entries[i].len;
        if (len == 0) {
            continue;          /* empty slot: leave the zeroed entry */
        }
        copy = ngx_pnalloc(r->pool, len);
        if (copy == NULL) {
            return NGX_ERROR;
        }
        ngx_memcpy(copy, sh->sources.entries[i].data, len);
        src_tbl[i].data = copy;
        src_tbl[i].len = len;
    }

    *n_src_out = count;

    count = ngx_min(sh->vips.nelts, n_vip_alloc);

    for (i = 0; i < count; i++) {
        len = sh->vips.entries[i].len;
        if (len == 0) {
            continue;
        }
        copy = ngx_pnalloc(r->pool, len);
        if (copy == NULL) {
            return NGX_ERROR;
        }
        ngx_memcpy(copy, sh->vips.entries[i].data, len);
        vip_tbl[i].data = copy;
        vip_tbl[i].len = len;
    }

    *n_vip_out = count;

    return NGX_OK;
}
/* Allocate the aggregation table plus one contiguous zeroed arena
 * holding every entry's four histogram lanes, all from r->pool.  Each
 * lane array has bucket-count + 1 slots (the extra slot is the +Inf
 * overflow lane).  A request for zero entries is rounded up to one so
 * callers never receive an empty table.  Writes the table and its
 * capacity through aggs_out / nalloc_out; returns NGX_ERROR on
 * allocation failure. */
static ngx_int_t
ngx_http_ipng_stats_agg_alloc(ngx_http_request_t *r,
    ngx_http_ipng_stats_main_conf_t *imcf, ngx_uint_t n,
    ngx_http_ipng_stats_agg_t **aggs_out, ngx_uint_t *nalloc_out)
{
    ngx_uint_t                  i, dur_lanes, byte_lanes;
    size_t                      lanes_per;
    uint64_t                   *arena, *lane;
    ngx_http_ipng_stats_agg_t  *tbl;

    dur_lanes = imcf->nbuckets + 1;        /* +1 for the +Inf lane */
    byte_lanes = imcf->nbytebuckets + 1;
    lanes_per = 2 * dur_lanes + 2 * byte_lanes;

    if (n == 0) {
        n = 1;
    }

    tbl = ngx_pcalloc(r->pool, n * sizeof(*tbl));
    arena = ngx_pcalloc(r->pool, n * lanes_per * sizeof(uint64_t));
    if (tbl == NULL || arena == NULL) {
        return NGX_ERROR;
    }

    /* Carve the arena into per-entry lane quartets:
     * [dhist][uhist][bin_hist][bout_hist]. */
    for (i = 0; i < n; i++) {
        lane = arena + i * lanes_per;
        tbl[i].dhist = lane;
        tbl[i].uhist = lane + dur_lanes;
        tbl[i].bin_hist = lane + 2 * dur_lanes;
        tbl[i].bout_hist = lane + 2 * dur_lanes + byte_lanes;
    }

    *aggs_out = tbl;
    *nalloc_out = n;

    return NGX_OK;
}
/* Walk the LRU under the slab mutex, draining matching nodes into the
 * caller-supplied `snaps` array and aggregating histogram lanes into
 * `aggs`. All output buffers are pre-allocated by the caller — this
 * function does not touch r->pool, so a crash here cannot leave the
 * slab mutex held via a glibc abort.
 *
 * `src_tbl` / `vip_tbl` hold the deep-copied interning strings and are
 * used only for filter comparisons; the renderer downstream indexes
 * them again. Nodes whose ids fall outside the snapshotted range are
 * skipped (a belt-and-suspenders guard against a stale id after a
 * concurrent reload).
 *
 * Outputs: *nsnaps_out / *naggs_out receive the number of populated
 * entries. When `snaps` fills up, further nodes still aggregate into
 * `aggs`; when `aggs` fills up, nodes are silently dropped from the
 * histogram aggregation. */
static void
ngx_http_ipng_stats_snapshot_nodes(ngx_http_ipng_stats_shctx_t *sh,
    ngx_http_ipng_stats_main_conf_t *imcf,
    ngx_str_t *filter_src, ngx_str_t *filter_vip,
    ngx_str_t *src_tbl, ngx_uint_t n_src,
    ngx_str_t *vip_tbl, ngx_uint_t n_vip,
    ngx_http_ipng_stats_snap_t *snaps, ngx_uint_t nsnaps_alloc,
    ngx_uint_t *nsnaps_out,
    ngx_http_ipng_stats_agg_t *aggs, ngx_uint_t naggs_alloc,
    ngx_uint_t *naggs_out)
{
    ngx_queue_t *q;
    ngx_http_ipng_stats_node_t *n;
    ngx_str_t *src_entry, *vip_entry;
    ngx_atomic_uint_t *lanes, *blanes;
    ngx_http_ipng_stats_agg_t *a;
    ngx_uint_t i;
    ngx_uint_t nb = imcf->nbuckets;
    ngx_uint_t nbb = imcf->nbytebuckets;
    ngx_uint_t nsnaps = 0, naggs = 0;
    for (q = ngx_queue_head(&sh->lru);
         q != ngx_queue_sentinel(&sh->lru);
         q = ngx_queue_next(q))
    {
        n = ngx_queue_data(q, ngx_http_ipng_stats_node_t, lru);
        /* Stale-id guard: ids past the snapshotted table lengths would
         * index uninitialized strings below. */
        if (n->source_id >= n_src || n->vip_id >= n_vip) continue;
        src_entry = &src_tbl[n->source_id];
        vip_entry = &vip_tbl[n->vip_id];
        /* Empty filter means "match everything". */
        if (filter_src->len > 0
            && (src_entry->len != filter_src->len
                || ngx_memcmp(src_entry->data, filter_src->data,
                              filter_src->len) != 0))
        {
            continue;
        }
        if (filter_vip->len > 0
            && (vip_entry->len != filter_vip->len
                || ngx_memcmp(vip_entry->data, filter_vip->data,
                              filter_vip->len) != 0))
        {
            continue;
        }
        /* Per-node counter snapshot (one row per code-class). */
        if (nsnaps < nsnaps_alloc) {
            snaps[nsnaps].source_id = n->source_id;
            snaps[nsnaps].vip_id = n->vip_id;
            snaps[nsnaps].class = n->class;
            snaps[nsnaps].requests = n->requests;
            snaps[nsnaps].bytes_in = n->bytes_in;
            snaps[nsnaps].bytes_out = n->bytes_out;
            snaps[nsnaps].duration_sum_ms = n->duration_sum_ms;
            snaps[nsnaps].upstream_sum_ms = n->upstream_sum_ms;
            nsnaps++;
        }
        /* Histogram lanes collapse across code-classes into one
         * (source, vip) aggregation entry. */
        a = ngx_http_ipng_stats_agg_get(aggs, &naggs, naggs_alloc,
                                        n->source_id, n->vip_id);
        if (a == NULL) continue;
        a->duration_sum_ms += n->duration_sum_ms;
        a->upstream_sum_ms += n->upstream_sum_ms;
        a->bytes_in_sum += n->bytes_in;
        a->bytes_out_sum += n->bytes_out;
        a->req_total += n->requests;
        /* Lane layout, trailing the node struct in shared memory:
         * [duration: nb+1][upstream: nb+1][bytes_in: nbb+1][bytes_out: nbb+1] */
        lanes = (ngx_atomic_uint_t *) (n + 1);
        blanes = lanes + 2 * (nb + 1);
        for (i = 0; i <= nb; i++) {
            a->dhist[i] += lanes[i];
            a->uhist[i] += lanes[nb + 1 + i];
            /* Upstream observation count is the sum of upstream lanes. */
            a->up_total += lanes[nb + 1 + i];
        }
        for (i = 0; i <= nbb; i++) {
            a->bin_hist[i] += blanes[i];
            a->bout_hist[i] += blanes[nbb + 1 + i];
        }
    }
    *nsnaps_out = nsnaps;
    *naggs_out = naggs;
}
/* -- Prometheus ---------------------------------------------------- */
/* Render one Prometheus histogram group (_bucket series, _sum, _count)
 * for `metric` labelled with (source_tag, vip) into the buffer at `p`.
 * `lanes` has nb+1 entries, the last being the +Inf overflow lane;
 * bucket values are emitted cumulatively, as Prometheus requires.
 * When `is_seconds` is set, `bounds` are interpreted as milliseconds
 * and rendered as fractional seconds.  `sum_units` is printed as the
 * _sum value as-is.  Returns the advanced write pointer; no bounds
 * checking is performed. */
static u_char *
ngx_http_ipng_stats_render_hist(u_char *p, const char *metric,
    ngx_str_t *src, ngx_str_t *vip, ngx_uint_t *bounds, ngx_uint_t nb,
    uint64_t *lanes, double sum_units, int is_seconds)
{
    uint64_t    running = 0;
    ngx_uint_t  b;

    for (b = 0; b < nb; b++) {
        running += lanes[b];

        if (is_seconds) {
            /* millisecond bound rendered as seconds, three decimals */
            p = ngx_sprintf(p,
                "%s_bucket{source_tag=\"%V\",vip=\"%V\",le=\"%.3f\"} %uL\n",
                metric, src, vip, (double) bounds[b] / 1000.0, running);
            continue;
        }

        p = ngx_sprintf(p,
            "%s_bucket{source_tag=\"%V\",vip=\"%V\",le=\"%ui\"} %uL\n",
            metric, src, vip, bounds[b], running);
    }

    /* Overflow lane closes the cumulative count; +Inf equals _count. */
    running += lanes[nb];

    p = ngx_sprintf(p,
        "%s_bucket{source_tag=\"%V\",vip=\"%V\",le=\"+Inf\"} %uL\n"
        "%s_sum{source_tag=\"%V\",vip=\"%V\"} %.3f\n"
        "%s_count{source_tag=\"%V\",vip=\"%V\"} %uL\n",
        metric, src, vip, running,
        metric, src, vip, sum_units,
        metric, src, vip, running);

    return p;
}
/* Render the Prometheus text exposition for all nodes matching the
 * optional source/vip filters.  Flow: emit HELP/TYPE preamble, size
 * and allocate snapshot buffers (outside the lock), snapshot strings
 * and nodes under one brief lock, then render counters per node and
 * histograms per (source, vip) from the local copies. */
static ngx_int_t
ngx_http_ipng_stats_render_prom(ngx_http_request_t *r,
    ngx_http_ipng_stats_main_conf_t *imcf,
    ngx_str_t *filter_source, ngx_str_t *filter_vip)
{
    ngx_http_ipng_stats_shctx_t *sh = imcf->shm_zone->data;
    ngx_slab_pool_t *slab;
    ngx_chain_t *out = NULL, *cl;
    ngx_chain_t **last = &out;
    ngx_str_t ctype = ngx_string("text/plain; version=0.0.4");
    ngx_http_ipng_stats_agg_t *aggs;
    ngx_http_ipng_stats_snap_t *snaps;
    ngx_str_t *src_tbl = NULL, *vip_tbl = NULL;
    ngx_uint_t n_src_alloc, n_vip_alloc;
    ngx_uint_t n_src = 0, n_vip = 0;
    ngx_uint_t naggs = 0, naggs_alloc;
    ngx_uint_t nsnaps = 0, nsnaps_alloc;
    ngx_uint_t i;
    ngx_uint_t nb = imcf->nbuckets;
    ngx_uint_t nbb = imcf->nbytebuckets;
    size_t hist_sz;
    slab = (ngx_slab_pool_t *) imcf->shm_zone->shm.addr;
    /* Static preamble: version banner plus HELP/TYPE for every metric. */
    cl = ngx_http_ipng_stats_chain_buf(r, 1536);
    if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    cl->buf->last = ngx_sprintf(cl->buf->last,
        "# nginx-ipng-stats-plugin %s (schema=%d)\n"
        "# HELP nginx_ipng_requests_total Total HTTP requests.\n"
        "# TYPE nginx_ipng_requests_total counter\n"
        "# HELP nginx_ipng_bytes_in_total Request bytes received.\n"
        "# TYPE nginx_ipng_bytes_in_total counter\n"
        "# HELP nginx_ipng_bytes_out_total Response bytes sent.\n"
        "# TYPE nginx_ipng_bytes_out_total counter\n"
        "# HELP nginx_ipng_latency_total Sum of request durations in seconds.\n"
        "# TYPE nginx_ipng_latency_total counter\n"
        "# HELP nginx_ipng_request_duration_seconds Request duration histogram.\n"
        "# TYPE nginx_ipng_request_duration_seconds histogram\n"
        "# HELP nginx_ipng_upstream_response_seconds Upstream response-time histogram.\n"
        "# TYPE nginx_ipng_upstream_response_seconds histogram\n"
        "# HELP nginx_ipng_bytes_in Request size histogram in bytes.\n"
        "# TYPE nginx_ipng_bytes_in histogram\n"
        "# HELP nginx_ipng_bytes_out Request size histogram in bytes.\n"
        "# TYPE nginx_ipng_bytes_out histogram\n",
        NGX_HTTP_IPNG_STATS_VERSION, NGX_HTTP_IPNG_STATS_SCHEMA_VERSION);
    if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    /* Read cardinality with a brief lock. We release before allocating
     * the snapshot buffers so glibc's allocator is never entered with
     * the slab mutex held — a worker crash during allocation cannot
     * leave the shared-memory zone locked. */
    ngx_shmtx_lock(&slab->mutex);
    n_src_alloc = sh->sources.nelts;
    n_vip_alloc = sh->vips.nelts;
    ngx_shmtx_unlock(&slab->mutex);
    if (n_src_alloc > NGX_HTTP_IPNG_STATS_MAX_INTERN
        || n_vip_alloc > NGX_HTTP_IPNG_STATS_MAX_INTERN)
    {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "ipng_stats: render: refusing to render, cardinality "
                      "out of range (sources=%ui, vips=%ui)",
                      n_src_alloc, n_vip_alloc);
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    /* Upper bounds: one agg per (source, vip) pair, one snap per
     * (source, vip, class) triple; clamp to 1 so allocation never sees
     * a zero size. */
    naggs_alloc = n_src_alloc * n_vip_alloc;
    nsnaps_alloc = naggs_alloc * NGX_HTTP_IPNG_STATS_NCLASSES;
    if (naggs_alloc == 0) naggs_alloc = 1;
    if (nsnaps_alloc == 0) nsnaps_alloc = 1;
    if (ngx_http_ipng_stats_agg_alloc(r, imcf, naggs_alloc,
                                      &aggs, &naggs_alloc) != NGX_OK)
    {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    snaps = ngx_pcalloc(r->pool, nsnaps_alloc * sizeof(*snaps));
    if (n_src_alloc > 0) {
        src_tbl = ngx_pcalloc(r->pool, n_src_alloc * sizeof(ngx_str_t));
        if (src_tbl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    if (n_vip_alloc > 0) {
        vip_tbl = ngx_pcalloc(r->pool, n_vip_alloc * sizeof(ngx_str_t));
        if (vip_tbl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    if (snaps == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    /* Second (and last) critical section: deep-copy the interning
     * strings, then drain node counters and histogram lanes. */
    ngx_shmtx_lock(&slab->mutex);
    if (ngx_http_ipng_stats_snapshot_strings(r, sh,
            src_tbl, n_src_alloc, &n_src,
            vip_tbl, n_vip_alloc, &n_vip) != NGX_OK)
    {
        ngx_shmtx_unlock(&slab->mutex);
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    ngx_http_ipng_stats_snapshot_nodes(sh, imcf,
        filter_source, filter_vip,
        src_tbl, n_src, vip_tbl, n_vip,
        snaps, nsnaps_alloc, &nsnaps,
        aggs, naggs_alloc, &naggs);
    ngx_shmtx_unlock(&slab->mutex);
    /* Per-node counters.
     * NOTE(review): the 1024-byte buffer assumes source/vip label
     * strings stay short; ngx_sprintf does not bound-check — confirm
     * tags/vips are length-limited where they are interned. */
    for (i = 0; i < nsnaps; i++) {
        ngx_http_ipng_stats_snap_t *s = &snaps[i];
        ngx_str_t *src = &src_tbl[s->source_id];
        ngx_str_t *vip = &vip_tbl[s->vip_id];
        const char *cls = ngx_http_ipng_stats_class_label(s->class);
        cl = ngx_http_ipng_stats_chain_buf(r, 1024);
        if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
        cl->buf->last = ngx_sprintf(cl->buf->last,
            "nginx_ipng_requests_total{source_tag=\"%V\",vip=\"%V\",code=\"%s\"} %uL\n"
            "nginx_ipng_bytes_in_total{source_tag=\"%V\",vip=\"%V\",code=\"%s\"} %uL\n"
            "nginx_ipng_bytes_out_total{source_tag=\"%V\",vip=\"%V\",code=\"%s\"} %uL\n"
            "nginx_ipng_latency_total{source_tag=\"%V\",vip=\"%V\",code=\"%s\"} %.3f\n",
            src, vip, cls, s->requests,
            src, vip, cls, s->bytes_in,
            src, vip, cls, s->bytes_out,
            src, vip, cls, (double) s->duration_sum_ms / 1000.0);
        if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }
    }
    /* One chain link per (source, vip) for the four aggregated histograms.
     * Size: per-bucket line ~96B, + sum/count/+Inf per metric ~96B each. */
    hist_sz = 256 + 96 * (2 * (nb + 1) + 2 * (nbb + 1)) + 4 * 200;
    for (i = 0; i < naggs; i++) {
        ngx_http_ipng_stats_agg_t *a = &aggs[i];
        ngx_str_t *src = &src_tbl[a->source_id];
        ngx_str_t *vip = &vip_tbl[a->vip_id];
        u_char *p;
        cl = ngx_http_ipng_stats_chain_buf(r, hist_sz);
        if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
        p = cl->buf->last;
        p = ngx_http_ipng_stats_render_hist(p,
            "nginx_ipng_request_duration_seconds", src, vip,
            imcf->bucket_bounds_ms, nb, a->dhist,
            (double) a->duration_sum_ms / 1000.0, 1);
        p = ngx_http_ipng_stats_render_hist(p,
            "nginx_ipng_upstream_response_seconds", src, vip,
            imcf->bucket_bounds_ms, nb, a->uhist,
            (double) a->upstream_sum_ms / 1000.0, 1);
        p = ngx_http_ipng_stats_render_hist(p,
            "nginx_ipng_bytes_in", src, vip,
            imcf->byte_bucket_bounds, nbb, a->bin_hist,
            (double) a->bytes_in_sum, 0);
        p = ngx_http_ipng_stats_render_hist(p,
            "nginx_ipng_bytes_out", src, vip,
            imcf->byte_bucket_bounds, nbb, a->bout_hist,
            (double) a->bytes_out_sum, 0);
        cl->buf->last = p;
        if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }
    }
    return ngx_http_ipng_stats_send(r, &ctype, out);
}
/* -- JSON ---------------------------------------------------------- */
/* Render the JSON representation of the stats, mirroring the Prometheus
 * renderer's lock/alloc choreography: read cardinality briefly,
 * allocate snapshot buffers unlocked, snapshot strings + nodes under
 * one lock, then emit one JSON record per aggregated (source, vip)
 * with nested per-class counters and four bucketed histograms. */
static ngx_int_t
ngx_http_ipng_stats_render_json(ngx_http_request_t *r,
    ngx_http_ipng_stats_main_conf_t *imcf,
    ngx_str_t *filter_source, ngx_str_t *filter_vip)
{
    ngx_http_ipng_stats_shctx_t *sh = imcf->shm_zone->data;
    ngx_slab_pool_t *slab;
    ngx_chain_t *out = NULL, *cl;
    ngx_chain_t **last = &out;
    ngx_str_t ctype = ngx_string("application/json");
    ngx_http_ipng_stats_agg_t *aggs;
    ngx_http_ipng_stats_snap_t *snaps;
    ngx_str_t *src_tbl = NULL, *vip_tbl = NULL;
    ngx_uint_t n_src_alloc, n_vip_alloc;
    ngx_uint_t n_src = 0, n_vip = 0;
    ngx_uint_t nsnaps = 0, nsnaps_alloc;
    ngx_uint_t naggs = 0, naggs_alloc;
    ngx_uint_t i, j, emitted = 0;
    ngx_uint_t nb = imcf->nbuckets;
    ngx_uint_t nbb = imcf->nbytebuckets;
    ngx_http_ipng_stats_agg_t *a;
    size_t rec_sz;
    slab = (ngx_slab_pool_t *) imcf->shm_zone->shm.addr;
    /* Document envelope: schema version plus an open records array. */
    cl = ngx_http_ipng_stats_chain_buf(r, 64);
    if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    cl->buf->last = ngx_sprintf(cl->buf->last,
        "{\"schema\":%d,\"records\":[",
        NGX_HTTP_IPNG_STATS_SCHEMA_VERSION);
    if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    /* Brief lock to read cardinality; buffers are allocated unlocked
     * so the slab mutex is never held across glibc's allocator. */
    ngx_shmtx_lock(&slab->mutex);
    n_src_alloc = sh->sources.nelts;
    n_vip_alloc = sh->vips.nelts;
    ngx_shmtx_unlock(&slab->mutex);
    if (n_src_alloc > NGX_HTTP_IPNG_STATS_MAX_INTERN
        || n_vip_alloc > NGX_HTTP_IPNG_STATS_MAX_INTERN)
    {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "ipng_stats: render: refusing to render, cardinality "
                      "out of range (sources=%ui, vips=%ui)",
                      n_src_alloc, n_vip_alloc);
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    /* Upper bounds, clamped to 1 so allocation never sees zero. */
    naggs_alloc = n_src_alloc * n_vip_alloc;
    nsnaps_alloc = naggs_alloc * NGX_HTTP_IPNG_STATS_NCLASSES;
    if (naggs_alloc == 0) naggs_alloc = 1;
    if (nsnaps_alloc == 0) nsnaps_alloc = 1;
    if (ngx_http_ipng_stats_agg_alloc(r, imcf, naggs_alloc,
                                      &aggs, &naggs_alloc) != NGX_OK)
    {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    snaps = ngx_pcalloc(r->pool, nsnaps_alloc * sizeof(*snaps));
    if (n_src_alloc > 0) {
        src_tbl = ngx_pcalloc(r->pool, n_src_alloc * sizeof(ngx_str_t));
        if (src_tbl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    if (n_vip_alloc > 0) {
        vip_tbl = ngx_pcalloc(r->pool, n_vip_alloc * sizeof(ngx_str_t));
        if (vip_tbl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    if (snaps == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    /* Single critical section for the actual snapshot. */
    ngx_shmtx_lock(&slab->mutex);
    if (ngx_http_ipng_stats_snapshot_strings(r, sh,
            src_tbl, n_src_alloc, &n_src,
            vip_tbl, n_vip_alloc, &n_vip) != NGX_OK)
    {
        ngx_shmtx_unlock(&slab->mutex);
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    ngx_http_ipng_stats_snapshot_nodes(sh, imcf,
        filter_source, filter_vip,
        src_tbl, n_src, vip_tbl, n_vip,
        snaps, nsnaps_alloc, &nsnaps,
        aggs, naggs_alloc, &naggs);
    ngx_shmtx_unlock(&slab->mutex);
    /* One JSON record per aggregated (source, vip). Size upper-bound
     * accounts for: fixed overhead, up to NCLASSES class entries, 4
     * histograms.
     * NOTE(review): sizing assumes source/vip strings stay short;
     * ngx_sprintf does not bound-check — confirm tags are
     * length-limited at interning time. */
    rec_sz = 512
             + 160 * NGX_HTTP_IPNG_STATS_NCLASSES
             + 48 * (2 * (nb + 1) + 2 * (nbb + 1))
             + 4 * 128;
    for (i = 0; i < naggs; i++) {
        a = &aggs[i];
        ngx_str_t *src = &src_tbl[a->source_id];
        ngx_str_t *vip = &vip_tbl[a->vip_id];
        u_char *p;
        ngx_uint_t first_class = 1;
        cl = ngx_http_ipng_stats_chain_buf(r, rec_sz);
        if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
        p = cl->buf->last;
        p = ngx_sprintf(p,
            "%s{\"source_tag\":\"%V\",\"vip\":\"%V\",\"classes\":{",
            (emitted == 0) ? "" : ",", src, vip);
        /* Per-class counters: linear scan of the snaps array for rows
         * belonging to this (source, vip). */
        for (j = 0; j < nsnaps; j++) {
            if (snaps[j].source_id != a->source_id
                || snaps[j].vip_id != a->vip_id) continue;
            p = ngx_sprintf(p,
                "%s\"%s\":{\"requests\":%uL,\"bytes_in\":%uL,"
                "\"bytes_out\":%uL,\"latency_ms\":%uL,"
                "\"upstream_latency_ms\":%uL}",
                first_class ? "" : ",",
                ngx_http_ipng_stats_class_label(snaps[j].class),
                snaps[j].requests, snaps[j].bytes_in,
                snaps[j].bytes_out, snaps[j].duration_sum_ms,
                snaps[j].upstream_sum_ms);
            first_class = 0;
        }
        /* Histogram buckets are emitted per-lane (non-cumulative), with
         * the final lane labelled "+Inf". */
        p = ngx_sprintf(p, "},\"request_duration_ms\":{\"sum\":%uL,"
                        "\"count\":%uL,\"buckets\":{",
                        a->duration_sum_ms, a->req_total);
        for (j = 0; j <= nb; j++) {
            if (j == nb) {
                p = ngx_sprintf(p, "%s\"+Inf\":%uL", j == 0 ? "" : ",",
                                a->dhist[j]);
            } else {
                p = ngx_sprintf(p, "%s\"%ui\":%uL", j == 0 ? "" : ",",
                                imcf->bucket_bounds_ms[j], a->dhist[j]);
            }
        }
        p = ngx_sprintf(p, "}},\"upstream_response_ms\":{\"sum\":%uL,"
                        "\"count\":%uL,\"buckets\":{",
                        a->upstream_sum_ms, a->up_total);
        for (j = 0; j <= nb; j++) {
            if (j == nb) {
                p = ngx_sprintf(p, "%s\"+Inf\":%uL", j == 0 ? "" : ",",
                                a->uhist[j]);
            } else {
                p = ngx_sprintf(p, "%s\"%ui\":%uL", j == 0 ? "" : ",",
                                imcf->bucket_bounds_ms[j], a->uhist[j]);
            }
        }
        p = ngx_sprintf(p,
            "}},\"bytes_in\":{\"sum\":%uL,\"count\":%uL,\"buckets\":{",
            a->bytes_in_sum, a->req_total);
        for (j = 0; j <= nbb; j++) {
            if (j == nbb) {
                p = ngx_sprintf(p, "%s\"+Inf\":%uL", j == 0 ? "" : ",",
                                a->bin_hist[j]);
            } else {
                p = ngx_sprintf(p, "%s\"%ui\":%uL", j == 0 ? "" : ",",
                                imcf->byte_bucket_bounds[j], a->bin_hist[j]);
            }
        }
        p = ngx_sprintf(p,
            "}},\"bytes_out\":{\"sum\":%uL,\"count\":%uL,\"buckets\":{",
            a->bytes_out_sum, a->req_total);
        for (j = 0; j <= nbb; j++) {
            if (j == nbb) {
                p = ngx_sprintf(p, "%s\"+Inf\":%uL", j == 0 ? "" : ",",
                                a->bout_hist[j]);
            } else {
                p = ngx_sprintf(p, "%s\"%ui\":%uL", j == 0 ? "" : ",",
                                imcf->byte_bucket_bounds[j], a->bout_hist[j]);
            }
        }
        p = ngx_sprintf(p, "}}}");
        cl->buf->last = p;
        if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }
        emitted++;
    }
    /* Close the records array and the envelope. */
    cl = ngx_http_ipng_stats_chain_buf(r, 8);
    if (cl == NULL) return NGX_HTTP_INTERNAL_SERVER_ERROR;
    cl->buf->last = ngx_sprintf(cl->buf->last, "]}\n");
    if (ngx_http_ipng_stats_append(&last, cl) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
    return ngx_http_ipng_stats_send(r, &ctype, out);
}