116 lines
2.3 KiB
Go
116 lines
2.3 KiB
Go
package ipstat
|
|
|
|
import (
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.nyne.dev/o/ip_stat/internal"
|
|
)
|
|
|
|
// defaultBucketSize is the bucket duration NewIPStat uses when no option
// overrides it.
const defaultBucketSize = 5 * time.Minute
|
|
|
|
// IPStat is used to count the number of unique IPs in a given window. It is thread-safe.
|
|
type IPStat struct {
|
|
count map[string]int64
|
|
window time.Duration
|
|
buckets []*bucket
|
|
current *bucket
|
|
index int
|
|
ipChan chan string
|
|
stat atomic.Int64
|
|
bucketSize time.Duration
|
|
}
|
|
|
|
type bucket struct {
|
|
ips *internal.Set[string]
|
|
}
|
|
|
|
func (b *bucket) add(ip string) {
|
|
b.ips.Add(ip)
|
|
}
|
|
|
|
// NewIPStat creates a new IPStat with the given window size. A new goroutine is started to maintain the IPStat. Remember to call Stop() to clean up the goroutine.
|
|
func NewIPStat(window time.Duration, opts ...Option) *IPStat {
|
|
options := &Options{
|
|
bucketSize: defaultBucketSize,
|
|
}
|
|
for _, opt := range opts {
|
|
opt(options)
|
|
}
|
|
bucketSize := options.bucketSize
|
|
if window < bucketSize {
|
|
panic("window size is too small")
|
|
}
|
|
window = window.Truncate(bucketSize)
|
|
numBuckets := int(window / bucketSize)
|
|
buckets := make([]*bucket, numBuckets)
|
|
for i := 0; i < numBuckets; i++ {
|
|
buckets[i] = &bucket{
|
|
ips: internal.NewSet[string](),
|
|
}
|
|
}
|
|
stat := &IPStat{
|
|
count: make(map[string]int64),
|
|
window: window,
|
|
buckets: buckets,
|
|
current: &bucket{
|
|
ips: internal.NewSet[string](),
|
|
},
|
|
ipChan: make(chan string, 100),
|
|
bucketSize: bucketSize,
|
|
}
|
|
stat.runMaintainer()
|
|
return stat
|
|
}
|
|
|
|
// Add adds an IP to the IPStat.
|
|
func (s *IPStat) Add(ip string) {
|
|
s.ipChan <- ip
|
|
}
|
|
|
|
func (s *IPStat) runMaintainer() {
|
|
go func() {
|
|
ticker := time.NewTicker(s.bucketSize)
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
s.buckets[s.index].ips.Range(func(ip string) bool {
|
|
s.count[ip]--
|
|
if s.count[ip] == 0 {
|
|
delete(s.count, ip)
|
|
}
|
|
return true
|
|
})
|
|
s.current.ips.Range(func(ip string) bool {
|
|
s.count[ip]++
|
|
return true
|
|
})
|
|
s.buckets[s.index] = s.current
|
|
s.index = (s.index + 1) % len(s.buckets)
|
|
s.current = &bucket{
|
|
ips: internal.NewSet[string](),
|
|
}
|
|
s.stat.Store(int64(len(s.count)))
|
|
case ip, ok := <-s.ipChan:
|
|
if !ok {
|
|
ticker.Stop()
|
|
return
|
|
}
|
|
s.current.add(ip)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop stops the IPStat.
|
|
func (s *IPStat) Stop() {
|
|
close(s.ipChan)
|
|
}
|
|
|
|
// Count returns the number of unique IPs in the IPStat.
|
|
func (s *IPStat) Count() int64 {
|
|
return s.stat.Load()
|
|
}
|