Files
SingboxForPanel/service/xboard/service.go
2026-04-14 23:43:25 +08:00

415 lines
10 KiB
Go

package xboard
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/sagernet/sing-box/adapter"
boxService "github.com/sagernet/sing-box/adapter/service"
C "github.com/sagernet/sing-box/constant"
"github.com/sagernet/sing-box/log"
"github.com/sagernet/sing-box/option"
"github.com/sagernet/sing-box/service/ssmapi"
E "github.com/sagernet/sing/common/exceptions"
"github.com/sagernet/sing/common/json/badoption"
"github.com/sagernet/sing/service"
)
func RegisterService(registry *boxService.Registry) {
boxService.Register[option.XBoardServiceOptions](registry, C.TypeXBoard, NewService)
}
type Service struct {
boxService.Adapter
ctx context.Context
cancel context.CancelFunc
logger log.ContextLogger
options option.XBoardServiceOptions
httpClient *http.Client
traffics map[string]*ssmapi.TrafficManager
users map[string]*ssmapi.UserManager
servers map[string]adapter.ManagedSSMServer
localUsers map[string]userData
inboundTags []string
syncTicker *time.Ticker
reportTicker *time.Ticker
access sync.Mutex
inboundManager adapter.InboundManager
}
type XNodeConfig struct {
Port int `json:"port"`
Protocol string `json:"protocol"`
Settings json.RawMessage `json:"settings"`
StreamSettings json.RawMessage `json:"streamSettings"`
}
type XRealitySettings struct {
Dest string `json:"dest"`
ServerNames []string `json:"serverNames"`
PrivateKey string `json:"privateKey"`
ShortId string `json:"shortId"`
}
type XStreamSettings struct {
Network string `json:"network"`
Security string `json:"security"`
RealitySettings XRealitySettings `json:"realitySettings"`
}
func NewService(ctx context.Context, logger log.ContextLogger, tag string, options option.XBoardServiceOptions) (adapter.Service, error) {
ctx, cancel := context.WithCancel(ctx)
s := &Service{
Adapter: boxService.NewAdapter(C.TypeXBoard, tag),
ctx: ctx,
cancel: cancel,
logger: logger,
options: options,
httpClient: &http.Client{Timeout: 10 * time.Second},
traffics: make(map[string]*ssmapi.TrafficManager),
users: make(map[string]*ssmapi.UserManager),
servers: make(map[string]adapter.ManagedSSMServer),
syncTicker: time.NewTicker(time.Duration(options.SyncInterval)),
reportTicker: time.NewTicker(time.Duration(options.ReportInterval)),
inboundManager: service.FromContext[adapter.InboundManager](ctx),
}
if s.options.SyncInterval == 0 {
s.syncTicker.Stop()
s.syncTicker = time.NewTicker(1 * time.Minute)
}
if s.options.ReportInterval == 0 {
s.reportTicker.Stop()
s.reportTicker = time.NewTicker(1 * time.Minute)
}
return s, nil
}
func (s *Service) Start(stage adapter.StartStage) error {
if stage != adapter.StartStateStart {
return nil
}
// Fetch node config and setup inbound
err := s.setupNode()
if err != nil {
s.logger.Error("Xboard setup error: ", err)
// Don't return error to allow sing-box to continue, service will retry in loop
}
go s.loop()
return nil
}
func (s *Service) setupNode() error {
s.logger.Info("Xboard fetching node config...")
config, err := s.fetchConfig()
if err != nil {
return err
}
inboundTag := "xboard-inbound"
var inboundOptions any
switch config.Protocol {
case "vless":
vlessOptions := option.VLESSInboundOptions{
ListenOptions: option.ListenOptions{
ListenPort: uint16(config.Port),
},
}
// Handle Reality
var streamSettings XStreamSettings
json.Unmarshal(config.StreamSettings, &streamSettings)
if streamSettings.Security == "reality" {
vlessOptions.TLS = &option.InboundTLSOptions{
Enabled: true,
ServerName: streamSettings.RealitySettings.ServerNames[0],
Reality: &option.InboundRealityOptions{
Enabled: true,
Handshake: option.InboundRealityHandshakeOptions{
ServerOptions: option.ServerOptions{
Server: streamSettings.RealitySettings.Dest,
ServerPort: 443, // Default for most reality setups
},
},
PrivateKey: streamSettings.RealitySettings.PrivateKey,
ShortID: badoption.Listable[string]{streamSettings.RealitySettings.ShortId},
},
}
}
inboundOptions = vlessOptions
case "vmess":
vmessOptions := option.VMessInboundOptions{
ListenOptions: option.ListenOptions{
ListenPort: uint16(config.Port),
},
}
inboundOptions = vmessOptions
default:
return fmt.Errorf("unsupported protocol: %s", config.Protocol)
}
// Remove old if exists
s.inboundManager.Remove(inboundTag)
// Create new inbound
err = s.inboundManager.Create(s.ctx, nil, s.logger, inboundTag, config.Protocol, inboundOptions)
if err != nil {
return err
}
// Register the new inbound in our managed list
inbound, _ := s.inboundManager.Get(inboundTag)
managedServer, isManaged := inbound.(adapter.ManagedSSMServer)
if isManaged {
traffic := ssmapi.NewTrafficManager()
managedServer.SetTracker(traffic)
user := ssmapi.NewUserManager(managedServer, traffic)
s.access.Lock()
s.traffics[inboundTag] = traffic
s.users[inboundTag] = user
s.servers[inboundTag] = managedServer
s.inboundTags = []string{inboundTag}
s.access.Unlock()
s.logger.Info("Xboard dynamic inbound [", inboundTag, "] created on port ", config.Port)
}
return nil
}
func (s *Service) fetchConfig() (*XNodeConfig, error) {
url := fmt.Sprintf("%s/api/v1/server/UniProxy/config?node_id=%d&node_type=%s&token=%s", s.options.PanelURL, s.options.NodeID, s.options.NodeType, s.options.Key)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("User-Agent", "sing-box/xboard")
resp, err := s.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
respBody, _ := io.ReadAll(resp.Body)
return nil, E.New("failed to fetch config, status: ", resp.Status, ", body: ", string(respBody))
}
var result struct {
Data XNodeConfig `json:"data"`
}
err = json.NewDecoder(resp.Body).Decode(&result)
if err != nil {
return nil, err
}
return &result.Data, nil
}
func (s *Service) loop() {
// Initial sync
s.syncUsers()
for {
select {
case <-s.ctx.Done():
return
case <-s.syncTicker.C:
s.syncUsers()
case <-s.reportTicker.C:
s.reportTraffic()
}
}
}
type userData struct {
ID int
Email string
Key string
Flow string
}
func (s *Service) syncUsers() {
s.logger.Info("Xboard sync users...")
xUsers, err := s.fetchUsers()
if err != nil {
s.logger.Error("Xboard sync error: ", err)
return
}
s.access.Lock()
defer s.access.Unlock()
newUsers := make(map[string]userData)
for _, u := range xUsers {
key := u.UUID
if key == "" {
key = u.Passwd
}
if key == "" {
continue
}
newUsers[u.Email] = userData{
ID: u.ID,
Email: u.Email,
Key: key,
Flow: u.Flow,
}
}
for tag, server := range s.servers {
// Update users in each manager
users := make([]string, 0, len(newUsers))
keys := make([]string, 0, len(newUsers))
flows := make([]string, 0, len(newUsers))
for _, u := range newUsers {
users = append(users, u.Email)
keys = append(keys, u.Key)
flows = append(flows, u.Flow)
}
err = server.UpdateUsers(users, keys, flows)
if err != nil {
s.logger.Error("Update users for inbound ", tag, ": ", err)
}
}
// Update local ID mapping
s.localUsers = newUsers
s.logger.Info("Xboard sync completed, total users: ", len(xUsers))
}
func (s *Service) reportTraffic() {
s.logger.Trace("Xboard reporting traffic...")
s.access.Lock()
localUsers := s.localUsers
s.access.Unlock()
if len(localUsers) == 0 {
return
}
type pushItem struct {
UserID int `json:"user_id"`
U int64 `json:"u"`
D int64 `json:"d"`
}
usageMap := make(map[int]*pushItem)
for _, trafficManager := range s.traffics {
users := make([]*ssmapi.UserObject, 0, len(localUsers))
for email := range localUsers {
users = append(users, &ssmapi.UserObject{UserName: email})
}
// Read incremental usage
trafficManager.ReadUsers(users, true)
for _, u := range users {
if u.UplinkBytes == 0 && u.DownlinkBytes == 0 {
continue
}
meta, ok := localUsers[u.UserName]
if !ok {
continue
}
item, ok := usageMap[meta.ID]
if !ok {
item = &pushItem{UserID: meta.ID}
usageMap[meta.ID] = item
}
item.U += u.UplinkBytes
item.D += u.DownlinkBytes
}
}
if len(usageMap) == 0 {
return
}
pushData := make([]*pushItem, 0, len(usageMap))
for _, item := range usageMap {
pushData = append(pushData, item)
}
err := s.pushTraffic(pushData)
if err != nil {
s.logger.Error("Xboard report error: ", err)
} else {
s.logger.Info("Xboard report completed, users reported: ", len(pushData))
}
}
func (s *Service) pushTraffic(data any) error {
url := fmt.Sprintf("%s/api/v1/server/UniProxy/push?node_id=%d&node_type=%s&token=%s", s.options.PanelURL, s.options.NodeID, s.options.NodeType, s.options.Key)
body, _ := json.Marshal(data)
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "sing-box/xboard")
resp, err := s.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
respBody, _ := io.ReadAll(resp.Body)
return E.New("failed to push traffic, status: ", resp.Status, ", body: ", string(respBody))
}
return nil
}
func (s *Service) Close() error {
s.cancel()
s.syncTicker.Stop()
s.reportTicker.Stop()
return nil
}
// Xboard User Model
type XUser struct {
ID int `json:"id"`
Email string `json:"email"`
UUID string `json:"uuid"`
Passwd string `json:"passwd"`
Flow string `json:"flow"`
}
func (s *Service) fetchUsers() ([]XUser, error) {
url := fmt.Sprintf("%s/api/v1/server/UniProxy/user?node_id=%d&node_type=%s&token=%s", s.options.PanelURL, s.options.NodeID, s.options.NodeType, s.options.Key)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("User-Agent", "sing-box/xboard")
resp, err := s.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
respBody, _ := io.ReadAll(resp.Body)
return nil, E.New("failed to fetch users, status: ", resp.Status, ", body: ", string(respBody))
}
var result struct {
Data []XUser `json:"data"`
}
err = json.NewDecoder(resp.Body).Decode(&result)
if err != nil {
return nil, err
}
return result.Data, nil
}