Prepare v2ray client/server transport

This commit is contained in:
世界
2022-08-22 18:53:47 +08:00
parent 6253e2e24c
commit 082872b2f3
27 changed files with 1490 additions and 74 deletions

View File

@@ -0,0 +1,99 @@
package v2raygrpc
import (
"context"
"crypto/tls"
"net"
"sync"
"time"
"github.com/sagernet/sing-box/adapter"
"github.com/sagernet/sing/common"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
var _ adapter.V2RayClientTransport = (*Client)(nil)
type Client struct {
ctx context.Context
dialer N.Dialer
serverAddr string
serviceName string
dialOptions []grpc.DialOption
conn *grpc.ClientConn
connAccess sync.Mutex
}
func NewClient(ctx context.Context, dialer N.Dialer, serverAddr M.Socksaddr, serviceName string, tlsConfig *tls.Config) adapter.V2RayClientTransport {
var dialOptions []grpc.DialOption
if tlsConfig != nil {
dialOptions = append(dialOptions, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
} else {
dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
dialOptions = append(dialOptions, grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: 500 * time.Millisecond,
Multiplier: 1.5,
Jitter: 0.2,
MaxDelay: 19 * time.Second,
},
MinConnectTimeout: 5 * time.Second,
}))
dialOptions = append(dialOptions, grpc.WithContextDialer(func(ctx context.Context, server string) (net.Conn, error) {
return dialer.DialContext(ctx, N.NetworkTCP, M.ParseSocksaddr(server))
}))
dialOptions = append(dialOptions, grpc.WithReturnConnectionError())
return &Client{
ctx: ctx,
dialer: dialer,
serverAddr: serverAddr.String(),
serviceName: serviceName,
dialOptions: dialOptions,
}
}
func (c *Client) Close() error {
return common.Close(c.conn)
}
func (c *Client) connect() (*grpc.ClientConn, error) {
conn := c.conn
if conn != nil && conn.GetState() != connectivity.Shutdown {
return conn, nil
}
c.connAccess.Lock()
defer c.connAccess.Unlock()
conn = c.conn
if conn != nil && conn.GetState() != connectivity.Shutdown {
return conn, nil
}
conn, err := grpc.DialContext(c.ctx, c.serverAddr, c.dialOptions...)
if err != nil {
return nil, err
}
c.conn = conn
return conn, nil
}
func (c *Client) DialContext(ctx context.Context) (net.Conn, error) {
clientConn, err := c.connect()
if err != nil {
return nil, err
}
client := NewGunServiceClient(clientConn).(GunServiceCustomNameClient)
ctx, cancel := context.WithCancel(ctx)
stream, err := client.TunCustomName(ctx, c.serviceName)
if err != nil {
cancel()
return nil, err
}
return NewGRPCConn(stream, cancel), nil
}

106
transport/v2raygrpc/conn.go Normal file
View File

@@ -0,0 +1,106 @@
package v2raygrpc
import (
"context"
"io"
"net"
"os"
"strings"
"time"
"github.com/sagernet/sing/common/rw"
)
var _ net.Conn = (*GRPCConn)(nil)
type GRPCConn struct {
GunService
cancel context.CancelFunc
cache []byte
}
func NewGRPCConn(service GunService, cancel context.CancelFunc) *GRPCConn {
if client, isClient := service.(GunService_TunClient); isClient {
service = &clientConnWrapper{client}
}
return &GRPCConn{
GunService: service,
cancel: cancel,
}
}
func (c *GRPCConn) Read(b []byte) (n int, err error) {
if len(c.cache) > 0 {
n = copy(b, c.cache)
c.cache = c.cache[n:]
return
}
hunk, err := c.Recv()
err = wrapError(err)
if err != nil {
return
}
n = copy(b, hunk.Data)
if n < len(hunk.Data) {
c.cache = hunk.Data[n:]
}
return
}
func (c *GRPCConn) Write(b []byte) (n int, err error) {
err = wrapError(c.Send(&Hunk{Data: b}))
if err != nil {
return
}
return len(b), nil
}
func (c *GRPCConn) Close() error {
c.cancel()
return nil
}
func (c *GRPCConn) LocalAddr() net.Addr {
return nil
}
func (c *GRPCConn) RemoteAddr() net.Addr {
return nil
}
func (c *GRPCConn) SetDeadline(t time.Time) error {
return os.ErrInvalid
}
func (c *GRPCConn) SetReadDeadline(t time.Time) error {
return os.ErrInvalid
}
func (c *GRPCConn) SetWriteDeadline(t time.Time) error {
return os.ErrInvalid
}
func (c *GRPCConn) Upstream() any {
return c.GunService
}
var _ rw.WriteCloser = (*clientConnWrapper)(nil)
type clientConnWrapper struct {
GunService_TunClient
}
func (c *clientConnWrapper) CloseWrite() error {
return c.CloseSend()
}
func wrapError(err error) error {
// grpc uses stupid internal error types
if err == nil {
return nil
}
if strings.Contains(err.Error(), "EOF") {
return io.EOF
}
return err
}

View File

@@ -0,0 +1,51 @@
package v2raygrpc
import (
"context"
"google.golang.org/grpc"
)
type GunService interface {
Context() context.Context
Send(*Hunk) error
Recv() (*Hunk, error)
}
func ServerDesc(name string) grpc.ServiceDesc {
return grpc.ServiceDesc{
ServiceName: name,
HandlerType: (*GunServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Tun",
Handler: _GunService_Tun_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "gun.proto",
}
}
func (c *gunServiceClient) TunCustomName(ctx context.Context, name string, opts ...grpc.CallOption) (GunService_TunClient, error) {
stream, err := c.cc.NewStream(ctx, &ServerDesc(name).Streams[0], "/"+name+"/Tun", opts...)
if err != nil {
return nil, err
}
x := &gunServiceTunClient{stream}
return x, nil
}
var _ GunServiceCustomNameClient = (*gunServiceClient)(nil)
type GunServiceCustomNameClient interface {
TunCustomName(ctx context.Context, name string, opts ...grpc.CallOption) (GunService_TunClient, error)
Tun(ctx context.Context, opts ...grpc.CallOption) (GunService_TunClient, error)
}
func RegisterGunServiceCustomNameServer(s *grpc.Server, srv GunServiceServer, name string) {
desc := ServerDesc(name)
s.RegisterService(&desc, srv)
}

View File

@@ -0,0 +1,53 @@
package v2raygrpc
import (
"context"
"crypto/tls"
"net"
"github.com/sagernet/sing-box/adapter"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
var _ adapter.V2RayServerTransport = (*Server)(nil)
type Server struct {
ctx context.Context
handler N.TCPConnectionHandler
server *grpc.Server
}
func NewServer(ctx context.Context, serviceName string, tlsConfig *tls.Config, handler N.TCPConnectionHandler) *Server {
var serverOptions []grpc.ServerOption
if tlsConfig != nil {
tlsConfig.NextProtos = []string{"h2"}
serverOptions = append(serverOptions, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
server := &Server{ctx, handler, grpc.NewServer(serverOptions...)}
RegisterGunServiceCustomNameServer(server.server, server, serviceName)
return server
}
func (s *Server) Tun(server GunService_TunServer) error {
ctx, cancel := context.WithCancel(s.ctx)
conn := NewGRPCConn(server, cancel)
go s.handler.NewConnection(ctx, conn, M.Metadata{})
<-ctx.Done()
return nil
}
func (s *Server) mustEmbedUnimplementedGunServiceServer() {
}
func (s *Server) Serve(listener net.Listener) error {
return s.server.Serve(listener)
}
func (s *Server) Close() error {
s.server.Stop()
return nil
}

View File

@@ -0,0 +1,150 @@
package v2raygrpc
import (
reflect "reflect"
sync "sync"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Hunk struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
}
func (x *Hunk) Reset() {
*x = Hunk{}
if protoimpl.UnsafeEnabled {
mi := &file_transport_v2raygrpc_stream_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Hunk) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Hunk) ProtoMessage() {}
func (x *Hunk) ProtoReflect() protoreflect.Message {
mi := &file_transport_v2raygrpc_stream_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Hunk.ProtoReflect.Descriptor instead.
func (*Hunk) Descriptor() ([]byte, []int) {
return file_transport_v2raygrpc_stream_proto_rawDescGZIP(), []int{0}
}
func (x *Hunk) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
var File_transport_v2raygrpc_stream_proto protoreflect.FileDescriptor
var file_transport_v2raygrpc_stream_proto_rawDesc = []byte{
0x0a, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x76, 0x32, 0x72, 0x61,
0x79, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x13, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x76, 0x32,
0x72, 0x61, 0x79, 0x67, 0x72, 0x70, 0x63, 0x22, 0x1a, 0x0a, 0x04, 0x48, 0x75, 0x6e, 0x6b, 0x12,
0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64,
0x61, 0x74, 0x61, 0x32, 0x4d, 0x0a, 0x0a, 0x47, 0x75, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63,
0x65, 0x12, 0x3f, 0x0a, 0x03, 0x54, 0x75, 0x6e, 0x12, 0x19, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73,
0x70, 0x6f, 0x72, 0x74, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x48,
0x75, 0x6e, 0x6b, 0x1a, 0x19, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e,
0x76, 0x32, 0x72, 0x61, 0x79, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x75, 0x6e, 0x6b, 0x28, 0x01,
0x30, 0x01, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x73, 0x61, 0x67, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x73, 0x69, 0x6e, 0x67, 0x2d, 0x62,
0x6f, 0x78, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x76, 0x32, 0x72,
0x61, 0x79, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_transport_v2raygrpc_stream_proto_rawDescOnce sync.Once
file_transport_v2raygrpc_stream_proto_rawDescData = file_transport_v2raygrpc_stream_proto_rawDesc
)
func file_transport_v2raygrpc_stream_proto_rawDescGZIP() []byte {
file_transport_v2raygrpc_stream_proto_rawDescOnce.Do(func() {
file_transport_v2raygrpc_stream_proto_rawDescData = protoimpl.X.CompressGZIP(file_transport_v2raygrpc_stream_proto_rawDescData)
})
return file_transport_v2raygrpc_stream_proto_rawDescData
}
var (
file_transport_v2raygrpc_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
file_transport_v2raygrpc_stream_proto_goTypes = []interface{}{
(*Hunk)(nil), // 0: transport.v2raygrpc.Hunk
}
)
var file_transport_v2raygrpc_stream_proto_depIdxs = []int32{
0, // 0: transport.v2raygrpc.GunService.Tun:input_type -> transport.v2raygrpc.Hunk
0, // 1: transport.v2raygrpc.GunService.Tun:output_type -> transport.v2raygrpc.Hunk
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_transport_v2raygrpc_stream_proto_init() }
func file_transport_v2raygrpc_stream_proto_init() {
if File_transport_v2raygrpc_stream_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_transport_v2raygrpc_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Hunk); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_transport_v2raygrpc_stream_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_transport_v2raygrpc_stream_proto_goTypes,
DependencyIndexes: file_transport_v2raygrpc_stream_proto_depIdxs,
MessageInfos: file_transport_v2raygrpc_stream_proto_msgTypes,
}.Build()
File_transport_v2raygrpc_stream_proto = out.File
file_transport_v2raygrpc_stream_proto_rawDesc = nil
file_transport_v2raygrpc_stream_proto_goTypes = nil
file_transport_v2raygrpc_stream_proto_depIdxs = nil
}

View File

@@ -0,0 +1,12 @@
syntax = "proto3";
package transport.v2raygrpc;
option go_package = "github.com/sagernet/sing-box/transport/v2raygrpc";
message Hunk {
bytes data = 1;
}
service GunService {
rpc Tun (stream Hunk) returns (stream Hunk);
}

View File

@@ -0,0 +1,131 @@
package v2raygrpc
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// GunServiceClient is the client API for GunService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type GunServiceClient interface {
Tun(ctx context.Context, opts ...grpc.CallOption) (GunService_TunClient, error)
}
type gunServiceClient struct {
cc grpc.ClientConnInterface
}
func NewGunServiceClient(cc grpc.ClientConnInterface) GunServiceClient {
return &gunServiceClient{cc}
}
func (c *gunServiceClient) Tun(ctx context.Context, opts ...grpc.CallOption) (GunService_TunClient, error) {
stream, err := c.cc.NewStream(ctx, &GunService_ServiceDesc.Streams[0], "/transport.v2raygrpc.GunService/Tun", opts...)
if err != nil {
return nil, err
}
x := &gunServiceTunClient{stream}
return x, nil
}
type GunService_TunClient interface {
Send(*Hunk) error
Recv() (*Hunk, error)
grpc.ClientStream
}
type gunServiceTunClient struct {
grpc.ClientStream
}
func (x *gunServiceTunClient) Send(m *Hunk) error {
return x.ClientStream.SendMsg(m)
}
func (x *gunServiceTunClient) Recv() (*Hunk, error) {
m := new(Hunk)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// GunServiceServer is the server API for GunService service.
// All implementations must embed UnimplementedGunServiceServer
// for forward compatibility
type GunServiceServer interface {
Tun(GunService_TunServer) error
mustEmbedUnimplementedGunServiceServer()
}
// UnimplementedGunServiceServer must be embedded to have forward compatible implementations.
type UnimplementedGunServiceServer struct{}
func (UnimplementedGunServiceServer) Tun(GunService_TunServer) error {
return status.Errorf(codes.Unimplemented, "method Tun not implemented")
}
func (UnimplementedGunServiceServer) mustEmbedUnimplementedGunServiceServer() {}
// UnsafeGunServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to GunServiceServer will
// result in compilation errors.
type UnsafeGunServiceServer interface {
mustEmbedUnimplementedGunServiceServer()
}
func RegisterGunServiceServer(s grpc.ServiceRegistrar, srv GunServiceServer) {
s.RegisterService(&GunService_ServiceDesc, srv)
}
func _GunService_Tun_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(GunServiceServer).Tun(&gunServiceTunServer{stream})
}
type GunService_TunServer interface {
Send(*Hunk) error
Recv() (*Hunk, error)
grpc.ServerStream
}
type gunServiceTunServer struct {
grpc.ServerStream
}
func (x *gunServiceTunServer) Send(m *Hunk) error {
return x.ServerStream.SendMsg(m)
}
func (x *gunServiceTunServer) Recv() (*Hunk, error) {
m := new(Hunk)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// GunService_ServiceDesc is the grpc.ServiceDesc for GunService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var GunService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "transport.v2raygrpc.GunService",
HandlerType: (*GunServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Tun",
Handler: _GunService_Tun_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "transport/v2raygrpc/stream.proto",
}