First Commmit

This commit is contained in:
CN-JS-HuiBai
2026-04-14 22:41:14 +08:00
commit 9f867b19da
1086 changed files with 147554 additions and 0 deletions

View File

@@ -0,0 +1,181 @@
package trafficontrol
import (
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/sagernet/sing-box/common/compatible"
C "github.com/sagernet/sing-box/constant"
"github.com/sagernet/sing/common"
"github.com/sagernet/sing/common/json"
"github.com/sagernet/sing/common/observable"
"github.com/sagernet/sing/common/x/list"
"github.com/gofrs/uuid/v5"
)
type ConnectionEventType int
const (
ConnectionEventNew ConnectionEventType = iota
ConnectionEventUpdate
ConnectionEventClosed
)
type ConnectionEvent struct {
Type ConnectionEventType
ID uuid.UUID
Metadata *TrackerMetadata
UplinkDelta int64
DownlinkDelta int64
ClosedAt time.Time
}
const closedConnectionsLimit = 1000
type Manager struct {
uploadTotal atomic.Int64
downloadTotal atomic.Int64
connections compatible.Map[uuid.UUID, Tracker]
closedConnectionsAccess sync.Mutex
closedConnections list.List[TrackerMetadata]
memory uint64
eventSubscriber *observable.Subscriber[ConnectionEvent]
}
func NewManager() *Manager {
return &Manager{}
}
func (m *Manager) SetEventHook(subscriber *observable.Subscriber[ConnectionEvent]) {
m.eventSubscriber = subscriber
}
func (m *Manager) Join(c Tracker) {
metadata := c.Metadata()
m.connections.Store(metadata.ID, c)
if m.eventSubscriber != nil {
m.eventSubscriber.Emit(ConnectionEvent{
Type: ConnectionEventNew,
ID: metadata.ID,
Metadata: metadata,
})
}
}
func (m *Manager) Leave(c Tracker) {
metadata := c.Metadata()
_, loaded := m.connections.LoadAndDelete(metadata.ID)
if loaded {
closedAt := time.Now()
metadata.ClosedAt = closedAt
metadataCopy := *metadata
m.closedConnectionsAccess.Lock()
if m.closedConnections.Len() >= closedConnectionsLimit {
m.closedConnections.PopFront()
}
m.closedConnections.PushBack(metadataCopy)
m.closedConnectionsAccess.Unlock()
if m.eventSubscriber != nil {
m.eventSubscriber.Emit(ConnectionEvent{
Type: ConnectionEventClosed,
ID: metadata.ID,
Metadata: &metadataCopy,
ClosedAt: closedAt,
})
}
}
}
func (m *Manager) PushUploaded(size int64) {
m.uploadTotal.Add(size)
}
func (m *Manager) PushDownloaded(size int64) {
m.downloadTotal.Add(size)
}
func (m *Manager) Total() (up int64, down int64) {
return m.uploadTotal.Load(), m.downloadTotal.Load()
}
func (m *Manager) ConnectionsLen() int {
return m.connections.Len()
}
func (m *Manager) Connections() []*TrackerMetadata {
var connections []*TrackerMetadata
m.connections.Range(func(_ uuid.UUID, value Tracker) bool {
connections = append(connections, value.Metadata())
return true
})
return connections
}
func (m *Manager) ClosedConnections() []*TrackerMetadata {
m.closedConnectionsAccess.Lock()
values := m.closedConnections.Array()
m.closedConnectionsAccess.Unlock()
if len(values) == 0 {
return nil
}
connections := make([]*TrackerMetadata, len(values))
for i := range values {
connections[i] = &values[i]
}
return connections
}
func (m *Manager) Connection(id uuid.UUID) Tracker {
connection, loaded := m.connections.Load(id)
if !loaded {
return nil
}
return connection
}
func (m *Manager) Snapshot() *Snapshot {
var connections []Tracker
m.connections.Range(func(_ uuid.UUID, value Tracker) bool {
if value.Metadata().OutboundType != C.TypeDNS {
connections = append(connections, value)
}
return true
})
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
m.memory = memStats.StackInuse + memStats.HeapInuse + memStats.HeapIdle - memStats.HeapReleased
return &Snapshot{
Upload: m.uploadTotal.Load(),
Download: m.downloadTotal.Load(),
Connections: connections,
Memory: m.memory,
}
}
func (m *Manager) ResetStatistic() {
m.uploadTotal.Store(0)
m.downloadTotal.Store(0)
}
type Snapshot struct {
Download int64
Upload int64
Connections []Tracker
Memory uint64
}
func (s *Snapshot) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]any{
"downloadTotal": s.Download,
"uploadTotal": s.Upload,
"connections": common.Map(s.Connections, func(t Tracker) *TrackerMetadata { return t.Metadata() }),
"memory": s.Memory,
})
}

View File

@@ -0,0 +1,254 @@
package trafficontrol
import (
"net"
"sync/atomic"
"time"
"github.com/sagernet/sing-box/adapter"
"github.com/sagernet/sing/common"
"github.com/sagernet/sing/common/bufio"
F "github.com/sagernet/sing/common/format"
"github.com/sagernet/sing/common/json"
N "github.com/sagernet/sing/common/network"
"github.com/gofrs/uuid/v5"
)
type TrackerMetadata struct {
ID uuid.UUID
Metadata adapter.InboundContext
CreatedAt time.Time
ClosedAt time.Time
Upload *atomic.Int64
Download *atomic.Int64
Chain []string
Rule adapter.Rule
Outbound string
OutboundType string
}
func (t TrackerMetadata) MarshalJSON() ([]byte, error) {
var inbound string
if t.Metadata.Inbound != "" {
inbound = t.Metadata.InboundType + "/" + t.Metadata.Inbound
} else {
inbound = t.Metadata.InboundType
}
var domain string
if t.Metadata.Domain != "" {
domain = t.Metadata.Domain
} else {
domain = t.Metadata.Destination.Fqdn
}
var processPath string
if t.Metadata.ProcessInfo != nil {
if t.Metadata.ProcessInfo.ProcessPath != "" {
processPath = t.Metadata.ProcessInfo.ProcessPath
} else if len(t.Metadata.ProcessInfo.AndroidPackageNames) > 0 {
processPath = t.Metadata.ProcessInfo.AndroidPackageNames[0]
}
if processPath == "" {
if t.Metadata.ProcessInfo.UserId != -1 {
processPath = F.ToString(t.Metadata.ProcessInfo.UserId)
}
} else if t.Metadata.ProcessInfo.UserName != "" {
processPath = F.ToString(processPath, " (", t.Metadata.ProcessInfo.UserName, ")")
} else if t.Metadata.ProcessInfo.UserId != -1 {
processPath = F.ToString(processPath, " (", t.Metadata.ProcessInfo.UserId, ")")
}
}
var rule string
if t.Rule != nil {
rule = F.ToString(t.Rule, " => ", t.Rule.Action())
} else {
rule = "final"
}
return json.Marshal(map[string]any{
"id": t.ID,
"metadata": map[string]any{
"network": t.Metadata.Network,
"type": inbound,
"sourceIP": t.Metadata.Source.Addr,
"destinationIP": t.Metadata.Destination.Addr,
"sourcePort": F.ToString(t.Metadata.Source.Port),
"destinationPort": F.ToString(t.Metadata.Destination.Port),
"host": domain,
"dnsMode": "normal",
"processPath": processPath,
},
"upload": t.Upload.Load(),
"download": t.Download.Load(),
"start": t.CreatedAt,
"chains": t.Chain,
"rule": rule,
"rulePayload": "",
})
}
type Tracker interface {
Metadata() *TrackerMetadata
Close() error
}
type TCPConn struct {
N.ExtendedConn
metadata TrackerMetadata
manager *Manager
}
func (tt *TCPConn) Metadata() *TrackerMetadata {
return &tt.metadata
}
func (tt *TCPConn) Close() error {
tt.manager.Leave(tt)
return tt.ExtendedConn.Close()
}
func (tt *TCPConn) Upstream() any {
return tt.ExtendedConn
}
func (tt *TCPConn) ReaderReplaceable() bool {
return true
}
func (tt *TCPConn) WriterReplaceable() bool {
return true
}
func NewTCPTracker(conn net.Conn, manager *Manager, metadata adapter.InboundContext, outboundManager adapter.OutboundManager, matchRule adapter.Rule, matchOutbound adapter.Outbound) *TCPConn {
id, _ := uuid.NewV4()
var (
chain []string
next string
outbound string
outboundType string
)
if matchOutbound != nil {
next = matchOutbound.Tag()
} else {
next = outboundManager.Default().Tag()
}
for {
detour, loaded := outboundManager.Outbound(next)
if !loaded {
break
}
chain = append(chain, next)
outbound = detour.Tag()
outboundType = detour.Type()
group, isGroup := detour.(adapter.OutboundGroup)
if !isGroup {
break
}
next = group.Now()
}
upload := new(atomic.Int64)
download := new(atomic.Int64)
tracker := &TCPConn{
ExtendedConn: bufio.NewCounterConn(conn, []N.CountFunc{func(n int64) {
upload.Add(n)
manager.PushUploaded(n)
}}, []N.CountFunc{func(n int64) {
download.Add(n)
manager.PushDownloaded(n)
}}),
metadata: TrackerMetadata{
ID: id,
Metadata: metadata,
CreatedAt: time.Now(),
Upload: upload,
Download: download,
Chain: common.Reverse(chain),
Rule: matchRule,
Outbound: outbound,
OutboundType: outboundType,
},
manager: manager,
}
manager.Join(tracker)
return tracker
}
type UDPConn struct {
N.PacketConn `json:"-"`
metadata TrackerMetadata
manager *Manager
}
func (ut *UDPConn) Metadata() *TrackerMetadata {
return &ut.metadata
}
func (ut *UDPConn) Close() error {
ut.manager.Leave(ut)
return ut.PacketConn.Close()
}
func (ut *UDPConn) Upstream() any {
return ut.PacketConn
}
func (ut *UDPConn) ReaderReplaceable() bool {
return true
}
func (ut *UDPConn) WriterReplaceable() bool {
return true
}
func NewUDPTracker(conn N.PacketConn, manager *Manager, metadata adapter.InboundContext, outboundManager adapter.OutboundManager, matchRule adapter.Rule, matchOutbound adapter.Outbound) *UDPConn {
id, _ := uuid.NewV4()
var (
chain []string
next string
outbound string
outboundType string
)
if matchOutbound != nil {
next = matchOutbound.Tag()
} else {
next = outboundManager.Default().Tag()
}
for {
detour, loaded := outboundManager.Outbound(next)
if !loaded {
break
}
chain = append(chain, next)
outbound = detour.Tag()
outboundType = detour.Type()
group, isGroup := detour.(adapter.OutboundGroup)
if !isGroup {
break
}
next = group.Now()
}
upload := new(atomic.Int64)
download := new(atomic.Int64)
trackerConn := &UDPConn{
PacketConn: bufio.NewCounterPacketConn(conn, []N.CountFunc{func(n int64) {
upload.Add(n)
manager.PushUploaded(n)
}}, []N.CountFunc{func(n int64) {
download.Add(n)
manager.PushDownloaded(n)
}}),
metadata: TrackerMetadata{
ID: id,
Metadata: metadata,
CreatedAt: time.Now(),
Upload: upload,
Download: download,
Chain: common.Reverse(chain),
Rule: matchRule,
Outbound: outbound,
OutboundType: outboundType,
},
manager: manager,
}
manager.Join(trackerConn)
return trackerConn
}