platform: Refactor CommandClient & Connections
This commit is contained in:
@@ -56,6 +56,9 @@ type StartedService struct {
|
||||
urlTestHistoryStorage *urltest.HistoryStorage
|
||||
clashModeSubscriber *observable.Subscriber[struct{}]
|
||||
clashModeObserver *observable.Observer[struct{}]
|
||||
|
||||
connectionEventSubscriber *observable.Subscriber[trafficontrol.ConnectionEvent]
|
||||
connectionEventObserver *observable.Observer[trafficontrol.ConnectionEvent]
|
||||
}
|
||||
|
||||
type ServiceOptions struct {
|
||||
@@ -83,17 +86,19 @@ func NewStartedService(options ServiceOptions) *StartedService {
|
||||
// userID: options.UserID,
|
||||
// groupID: options.GroupID,
|
||||
// systemProxyEnabled: options.SystemProxyEnabled,
|
||||
serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
|
||||
serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
|
||||
logSubscriber: observable.NewSubscriber[*log.Entry](128),
|
||||
urlTestSubscriber: observable.NewSubscriber[struct{}](1),
|
||||
urlTestHistoryStorage: urltest.NewHistoryStorage(),
|
||||
clashModeSubscriber: observable.NewSubscriber[struct{}](1),
|
||||
serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
|
||||
serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
|
||||
logSubscriber: observable.NewSubscriber[*log.Entry](128),
|
||||
urlTestSubscriber: observable.NewSubscriber[struct{}](1),
|
||||
urlTestHistoryStorage: urltest.NewHistoryStorage(),
|
||||
clashModeSubscriber: observable.NewSubscriber[struct{}](1),
|
||||
connectionEventSubscriber: observable.NewSubscriber[trafficontrol.ConnectionEvent](256),
|
||||
}
|
||||
s.serviceStatusObserver = observable.NewObserver(s.serviceStatusSubscriber, 2)
|
||||
s.logObserver = observable.NewObserver(s.logSubscriber, 64)
|
||||
s.urlTestObserver = observable.NewObserver(s.urlTestSubscriber, 1)
|
||||
s.clashModeObserver = observable.NewObserver(s.clashModeSubscriber, 1)
|
||||
s.connectionEventObserver = observable.NewObserver(s.connectionEventSubscriber, 64)
|
||||
return s
|
||||
}
|
||||
|
||||
@@ -183,6 +188,7 @@ func (s *StartedService) StartOrReloadService(profileContent string, options *Ov
|
||||
instance.urlTestHistoryStorage.SetHook(s.urlTestSubscriber)
|
||||
if instance.clashServer != nil {
|
||||
instance.clashServer.SetModeUpdateHook(s.clashModeSubscriber)
|
||||
instance.clashServer.(*clashapi.Server).TrafficManager().SetEventHook(s.connectionEventSubscriber)
|
||||
}
|
||||
s.serviceAccess.Unlock()
|
||||
err = instance.Start()
|
||||
@@ -666,7 +672,7 @@ func (s *StartedService) SetSystemProxyEnabled(ctx context.Context, request *Set
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[Connections]) error {
|
||||
func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[ConnectionEvents]) error {
|
||||
err := s.waitForStarted(server.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -674,69 +680,253 @@ func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsReque
|
||||
s.serviceAccess.RLock()
|
||||
boxService := s.instance
|
||||
s.serviceAccess.RUnlock()
|
||||
ticker := time.NewTicker(time.Duration(request.Interval))
|
||||
defer ticker.Stop()
|
||||
|
||||
if boxService.clashServer == nil {
|
||||
return E.New("clash server not available")
|
||||
}
|
||||
|
||||
trafficManager := boxService.clashServer.(*clashapi.Server).TrafficManager()
|
||||
var (
|
||||
connections = make(map[uuid.UUID]*Connection)
|
||||
outConnections []*Connection
|
||||
)
|
||||
|
||||
subscription, done, err := s.connectionEventObserver.Subscribe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.connectionEventObserver.UnSubscribe(subscription)
|
||||
|
||||
connectionSnapshots := make(map[uuid.UUID]connectionSnapshot)
|
||||
initialEvents := s.buildInitialConnectionState(trafficManager, connectionSnapshots)
|
||||
err = server.Send(&ConnectionEvents{
|
||||
Events: initialEvents,
|
||||
Reset_: true,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
interval := time.Duration(request.Interval)
|
||||
if interval <= 0 {
|
||||
interval = time.Second
|
||||
}
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
outConnections = outConnections[:0]
|
||||
for _, connection := range trafficManager.Connections() {
|
||||
outConnections = append(outConnections, newConnection(connections, connection, false))
|
||||
}
|
||||
for _, connection := range trafficManager.ClosedConnections() {
|
||||
outConnections = append(outConnections, newConnection(connections, connection, true))
|
||||
}
|
||||
err := server.Send(&Connections{Connections: outConnections})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return s.ctx.Err()
|
||||
case <-server.Context().Done():
|
||||
return server.Context().Err()
|
||||
case <-done:
|
||||
return nil
|
||||
|
||||
case event := <-subscription:
|
||||
var pendingEvents []*ConnectionEvent
|
||||
if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
|
||||
pendingEvents = append(pendingEvents, protoEvent)
|
||||
}
|
||||
drain:
|
||||
for {
|
||||
select {
|
||||
case event = <-subscription:
|
||||
if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
|
||||
pendingEvents = append(pendingEvents, protoEvent)
|
||||
}
|
||||
default:
|
||||
break drain
|
||||
}
|
||||
}
|
||||
if len(pendingEvents) > 0 {
|
||||
err = server.Send(&ConnectionEvents{Events: pendingEvents})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
case <-ticker.C:
|
||||
protoEvents := s.buildTrafficUpdates(trafficManager, connectionSnapshots)
|
||||
if len(protoEvents) == 0 {
|
||||
continue
|
||||
}
|
||||
err = server.Send(&ConnectionEvents{Events: protoEvents})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol.TrackerMetadata, isClosed bool) *Connection {
|
||||
if oldConnection, loaded := connections[metadata.ID]; loaded {
|
||||
if isClosed {
|
||||
if oldConnection.ClosedAt == 0 {
|
||||
oldConnection.Uplink = 0
|
||||
oldConnection.Downlink = 0
|
||||
oldConnection.ClosedAt = metadata.ClosedAt.UnixMilli()
|
||||
}
|
||||
return oldConnection
|
||||
type connectionSnapshot struct {
|
||||
uplink int64
|
||||
downlink int64
|
||||
hadTraffic bool
|
||||
}
|
||||
|
||||
func (s *StartedService) buildInitialConnectionState(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
|
||||
var events []*ConnectionEvent
|
||||
|
||||
for _, metadata := range manager.Connections() {
|
||||
events = append(events, &ConnectionEvent{
|
||||
Type: ConnectionEventType_CONNECTION_EVENT_NEW,
|
||||
Id: metadata.ID.String(),
|
||||
Connection: buildConnectionProto(metadata),
|
||||
})
|
||||
snapshots[metadata.ID] = connectionSnapshot{
|
||||
uplink: metadata.Upload.Load(),
|
||||
downlink: metadata.Download.Load(),
|
||||
}
|
||||
lastUplink := oldConnection.UplinkTotal
|
||||
lastDownlink := oldConnection.DownlinkTotal
|
||||
uplinkTotal := metadata.Upload.Load()
|
||||
downlinkTotal := metadata.Download.Load()
|
||||
oldConnection.Uplink = uplinkTotal - lastUplink
|
||||
oldConnection.Downlink = downlinkTotal - lastDownlink
|
||||
oldConnection.UplinkTotal = uplinkTotal
|
||||
oldConnection.DownlinkTotal = downlinkTotal
|
||||
return oldConnection
|
||||
}
|
||||
|
||||
for _, metadata := range manager.ClosedConnections() {
|
||||
conn := buildConnectionProto(metadata)
|
||||
conn.ClosedAt = metadata.ClosedAt.UnixMilli()
|
||||
events = append(events, &ConnectionEvent{
|
||||
Type: ConnectionEventType_CONNECTION_EVENT_NEW,
|
||||
Id: metadata.ID.String(),
|
||||
Connection: conn,
|
||||
})
|
||||
}
|
||||
|
||||
return events
|
||||
}
|
||||
|
||||
func (s *StartedService) applyConnectionEvent(event trafficontrol.ConnectionEvent, snapshots map[uuid.UUID]connectionSnapshot) *ConnectionEvent {
|
||||
switch event.Type {
|
||||
case trafficontrol.ConnectionEventNew:
|
||||
if _, exists := snapshots[event.ID]; exists {
|
||||
return nil
|
||||
}
|
||||
snapshots[event.ID] = connectionSnapshot{
|
||||
uplink: event.Metadata.Upload.Load(),
|
||||
downlink: event.Metadata.Download.Load(),
|
||||
}
|
||||
return &ConnectionEvent{
|
||||
Type: ConnectionEventType_CONNECTION_EVENT_NEW,
|
||||
Id: event.ID.String(),
|
||||
Connection: buildConnectionProto(event.Metadata),
|
||||
}
|
||||
case trafficontrol.ConnectionEventClosed:
|
||||
delete(snapshots, event.ID)
|
||||
protoEvent := &ConnectionEvent{
|
||||
Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
|
||||
Id: event.ID.String(),
|
||||
}
|
||||
closedAt := event.ClosedAt
|
||||
if closedAt.IsZero() && !event.Metadata.ClosedAt.IsZero() {
|
||||
closedAt = event.Metadata.ClosedAt
|
||||
}
|
||||
if closedAt.IsZero() {
|
||||
closedAt = time.Now()
|
||||
}
|
||||
protoEvent.ClosedAt = closedAt.UnixMilli()
|
||||
if event.Metadata.ID != uuid.Nil {
|
||||
conn := buildConnectionProto(event.Metadata)
|
||||
conn.ClosedAt = protoEvent.ClosedAt
|
||||
protoEvent.Connection = conn
|
||||
}
|
||||
return protoEvent
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StartedService) buildTrafficUpdates(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
|
||||
activeConnections := manager.Connections()
|
||||
activeIndex := make(map[uuid.UUID]trafficontrol.TrackerMetadata, len(activeConnections))
|
||||
var events []*ConnectionEvent
|
||||
|
||||
for _, metadata := range activeConnections {
|
||||
activeIndex[metadata.ID] = metadata
|
||||
currentUpload := metadata.Upload.Load()
|
||||
currentDownload := metadata.Download.Load()
|
||||
snapshot, exists := snapshots[metadata.ID]
|
||||
if !exists {
|
||||
snapshots[metadata.ID] = connectionSnapshot{
|
||||
uplink: currentUpload,
|
||||
downlink: currentDownload,
|
||||
}
|
||||
events = append(events, &ConnectionEvent{
|
||||
Type: ConnectionEventType_CONNECTION_EVENT_NEW,
|
||||
Id: metadata.ID.String(),
|
||||
Connection: buildConnectionProto(metadata),
|
||||
})
|
||||
continue
|
||||
}
|
||||
uplinkDelta := currentUpload - snapshot.uplink
|
||||
downlinkDelta := currentDownload - snapshot.downlink
|
||||
if uplinkDelta < 0 || downlinkDelta < 0 {
|
||||
snapshots[metadata.ID] = connectionSnapshot{
|
||||
uplink: currentUpload,
|
||||
downlink: currentDownload,
|
||||
}
|
||||
continue
|
||||
}
|
||||
if uplinkDelta > 0 || downlinkDelta > 0 {
|
||||
snapshots[metadata.ID] = connectionSnapshot{
|
||||
uplink: currentUpload,
|
||||
downlink: currentDownload,
|
||||
hadTraffic: true,
|
||||
}
|
||||
events = append(events, &ConnectionEvent{
|
||||
Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
|
||||
Id: metadata.ID.String(),
|
||||
UplinkDelta: uplinkDelta,
|
||||
DownlinkDelta: downlinkDelta,
|
||||
})
|
||||
continue
|
||||
}
|
||||
if snapshot.hadTraffic {
|
||||
snapshots[metadata.ID] = connectionSnapshot{
|
||||
uplink: currentUpload,
|
||||
downlink: currentDownload,
|
||||
}
|
||||
events = append(events, &ConnectionEvent{
|
||||
Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
|
||||
Id: metadata.ID.String(),
|
||||
UplinkDelta: 0,
|
||||
DownlinkDelta: 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var closedIndex map[uuid.UUID]trafficontrol.TrackerMetadata
|
||||
for id := range snapshots {
|
||||
if _, exists := activeIndex[id]; exists {
|
||||
continue
|
||||
}
|
||||
if closedIndex == nil {
|
||||
closedIndex = make(map[uuid.UUID]trafficontrol.TrackerMetadata)
|
||||
for _, metadata := range manager.ClosedConnections() {
|
||||
closedIndex[metadata.ID] = metadata
|
||||
}
|
||||
}
|
||||
closedAt := time.Now()
|
||||
var conn *Connection
|
||||
if metadata, ok := closedIndex[id]; ok {
|
||||
if !metadata.ClosedAt.IsZero() {
|
||||
closedAt = metadata.ClosedAt
|
||||
}
|
||||
conn = buildConnectionProto(metadata)
|
||||
conn.ClosedAt = closedAt.UnixMilli()
|
||||
}
|
||||
events = append(events, &ConnectionEvent{
|
||||
Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
|
||||
Id: id.String(),
|
||||
ClosedAt: closedAt.UnixMilli(),
|
||||
Connection: conn,
|
||||
})
|
||||
delete(snapshots, id)
|
||||
}
|
||||
|
||||
return events
|
||||
}
|
||||
|
||||
func buildConnectionProto(metadata trafficontrol.TrackerMetadata) *Connection {
|
||||
var rule string
|
||||
if metadata.Rule != nil {
|
||||
rule = metadata.Rule.String()
|
||||
}
|
||||
uplinkTotal := metadata.Upload.Load()
|
||||
downlinkTotal := metadata.Download.Load()
|
||||
uplink := uplinkTotal
|
||||
downlink := downlinkTotal
|
||||
var closedAt int64
|
||||
if !metadata.ClosedAt.IsZero() {
|
||||
closedAt = metadata.ClosedAt.UnixMilli()
|
||||
uplink = 0
|
||||
downlink = 0
|
||||
}
|
||||
var processInfo *ProcessInfo
|
||||
if metadata.Metadata.ProcessInfo != nil {
|
||||
processInfo = &ProcessInfo{
|
||||
@@ -747,7 +937,7 @@ func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol
|
||||
PackageName: metadata.Metadata.ProcessInfo.AndroidPackageName,
|
||||
}
|
||||
}
|
||||
connection := &Connection{
|
||||
return &Connection{
|
||||
Id: metadata.ID.String(),
|
||||
Inbound: metadata.Metadata.Inbound,
|
||||
InboundType: metadata.Metadata.InboundType,
|
||||
@@ -760,9 +950,6 @@ func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol
|
||||
User: metadata.Metadata.User,
|
||||
FromOutbound: metadata.Metadata.Outbound,
|
||||
CreatedAt: metadata.CreatedAt.UnixMilli(),
|
||||
ClosedAt: closedAt,
|
||||
Uplink: uplink,
|
||||
Downlink: downlink,
|
||||
UplinkTotal: uplinkTotal,
|
||||
DownlinkTotal: downlinkTotal,
|
||||
Rule: rule,
|
||||
@@ -771,8 +958,6 @@ func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol
|
||||
ChainList: metadata.Chain,
|
||||
ProcessInfo: processInfo,
|
||||
}
|
||||
connections[metadata.ID] = connection
|
||||
return connection
|
||||
}
|
||||
|
||||
func (s *StartedService) CloseConnection(ctx context.Context, request *CloseConnectionRequest) (*emptypb.Empty, error) {
|
||||
|
||||
Reference in New Issue
Block a user