Add is_tor plumbing from collector->aggregator->frontend/cli
This commit is contained in:
@@ -6,22 +6,25 @@ import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
// LogRecord holds the four dimensions extracted from a single nginx log line.
|
||||
// LogRecord holds the dimensions extracted from a single nginx log line.
|
||||
type LogRecord struct {
|
||||
Website string
|
||||
ClientPrefix string
|
||||
URI string
|
||||
Status string
|
||||
IsTor bool
|
||||
}
|
||||
|
||||
// ParseLine parses a tab-separated logtail log line:
|
||||
//
|
||||
// $host \t $remote_addr \t $msec \t $request_method \t $request_uri \t $status \t $body_bytes_sent \t $request_time
|
||||
// $host \t $remote_addr \t $msec \t $request_method \t $request_uri \t $status \t $body_bytes_sent \t $request_time \t $is_tor
|
||||
//
|
||||
// The is_tor field (0 or 1) is optional for backward compatibility with
|
||||
// older log files that omit it; it defaults to false when absent.
|
||||
// Returns false for lines with fewer than 8 fields.
|
||||
func ParseLine(line string, v4bits, v6bits int) (LogRecord, bool) {
|
||||
// SplitN caps allocations; we need exactly 8 fields.
|
||||
fields := strings.SplitN(line, "\t", 8)
|
||||
// SplitN caps allocations; we need up to 9 fields.
|
||||
fields := strings.SplitN(line, "\t", 9)
|
||||
if len(fields) < 8 {
|
||||
return LogRecord{}, false
|
||||
}
|
||||
@@ -36,11 +39,14 @@ func ParseLine(line string, v4bits, v6bits int) (LogRecord, bool) {
|
||||
return LogRecord{}, false
|
||||
}
|
||||
|
||||
isTor := len(fields) == 9 && fields[8] == "1"
|
||||
|
||||
return LogRecord{
|
||||
Website: fields[0],
|
||||
ClientPrefix: prefix,
|
||||
URI: uri,
|
||||
Status: fields[5],
|
||||
IsTor: isTor,
|
||||
}, true
|
||||
}
|
||||
|
||||
|
||||
@@ -72,6 +72,42 @@ func TestParseLine(t *testing.T) {
|
||||
Status: "429",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "is_tor=1 sets IsTor true",
|
||||
line: "tor.example.com\t1.2.3.4\t0\tGET\t/\t200\t0\t0.001\t1",
|
||||
wantOK: true,
|
||||
want: LogRecord{
|
||||
Website: "tor.example.com",
|
||||
ClientPrefix: "1.2.3.0/24",
|
||||
URI: "/",
|
||||
Status: "200",
|
||||
IsTor: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "is_tor=0 sets IsTor false",
|
||||
line: "normal.example.com\t1.2.3.4\t0\tGET\t/\t200\t0\t0.001\t0",
|
||||
wantOK: true,
|
||||
want: LogRecord{
|
||||
Website: "normal.example.com",
|
||||
ClientPrefix: "1.2.3.0/24",
|
||||
URI: "/",
|
||||
Status: "200",
|
||||
IsTor: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "missing is_tor field defaults to false (backward compat)",
|
||||
line: "old.example.com\t1.2.3.4\t0\tGET\t/\t200\t0\t0.001",
|
||||
wantOK: true,
|
||||
want: LogRecord{
|
||||
Website: "old.example.com",
|
||||
ClientPrefix: "1.2.3.0/24",
|
||||
URI: "/",
|
||||
Status: "200",
|
||||
IsTor: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
|
||||
@@ -104,10 +104,10 @@ func TestGRPCEndToEnd(t *testing.T) {
|
||||
|
||||
// Pre-populate with known data then rotate so it's queryable
|
||||
for i := 0; i < 500; i++ {
|
||||
store.ingest(LogRecord{"busy.com", "1.2.3.0/24", "/api", "200"})
|
||||
store.ingest(LogRecord{Website: "busy.com", ClientPrefix: "1.2.3.0/24", URI: "/api", Status: "200"})
|
||||
}
|
||||
for i := 0; i < 200; i++ {
|
||||
store.ingest(LogRecord{"quiet.com", "5.6.7.0/24", "/", "429"})
|
||||
store.ingest(LogRecord{Website: "quiet.com", ClientPrefix: "5.6.7.0/24", URI: "/", Status: "429"})
|
||||
}
|
||||
store.rotate(time.Now())
|
||||
|
||||
@@ -192,7 +192,7 @@ func TestGRPCEndToEnd(t *testing.T) {
|
||||
t.Fatalf("StreamSnapshots error: %v", err)
|
||||
}
|
||||
|
||||
store.ingest(LogRecord{"new.com", "9.9.9.0/24", "/new", "200"})
|
||||
store.ingest(LogRecord{Website: "new.com", ClientPrefix: "9.9.9.0/24", URI: "/new", Status: "200"})
|
||||
store.rotate(time.Now())
|
||||
|
||||
snap, err := stream.Recv()
|
||||
|
||||
@@ -15,7 +15,7 @@ type Store struct {
|
||||
source string
|
||||
|
||||
// live map — written only by the Run goroutine; no locking needed on writes
|
||||
live map[st.Tuple4]int64
|
||||
live map[st.Tuple5]int64
|
||||
liveLen int
|
||||
|
||||
// ring buffers — protected by mu for reads
|
||||
@@ -36,7 +36,7 @@ type Store struct {
|
||||
func NewStore(source string) *Store {
|
||||
return &Store{
|
||||
source: source,
|
||||
live: make(map[st.Tuple4]int64, liveMapCap),
|
||||
live: make(map[st.Tuple5]int64, liveMapCap),
|
||||
subs: make(map[chan st.Snapshot]struct{}),
|
||||
}
|
||||
}
|
||||
@@ -44,7 +44,7 @@ func NewStore(source string) *Store {
|
||||
// ingest records one log record into the live map.
|
||||
// Must only be called from the Run goroutine.
|
||||
func (s *Store) ingest(r LogRecord) {
|
||||
key := st.Tuple4{Website: r.Website, Prefix: r.ClientPrefix, URI: r.URI, Status: r.Status}
|
||||
key := st.Tuple5{Website: r.Website, Prefix: r.ClientPrefix, URI: r.URI, Status: r.Status, IsTor: r.IsTor}
|
||||
if _, exists := s.live[key]; !exists {
|
||||
if s.liveLen >= liveMapCap {
|
||||
return
|
||||
@@ -77,7 +77,7 @@ func (s *Store) rotate(now time.Time) {
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
s.live = make(map[st.Tuple4]int64, liveMapCap)
|
||||
s.live = make(map[st.Tuple5]int64, liveMapCap)
|
||||
s.liveLen = 0
|
||||
|
||||
s.broadcast(fine)
|
||||
|
||||
@@ -15,7 +15,7 @@ func makeStore() *Store {
|
||||
|
||||
func ingestN(s *Store, website, prefix, uri, status string, n int) {
|
||||
for i := 0; i < n; i++ {
|
||||
s.ingest(LogRecord{website, prefix, uri, status})
|
||||
s.ingest(LogRecord{Website: website, ClientPrefix: prefix, URI: uri, Status: status})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user