Projects STRLCPY SeaMoon Commits 29562547
🤬
  • ■ ■ ■ ■ ■ ■
    pkg/api/controller/v1/tunnel.go
    skipped 3 lines
    4 4   "context"
    5 5   "net/http"
    6 6   "reflect"
     7 + "sync"
    7 8   
    8 9   "github.com/gin-gonic/gin"
    9 10   
    10 11   "github.com/DVKunion/SeaMoon/pkg/api/controller/servant"
     12 + "github.com/DVKunion/SeaMoon/pkg/api/enum"
    11 13   "github.com/DVKunion/SeaMoon/pkg/api/models"
    12 14   "github.com/DVKunion/SeaMoon/pkg/api/service"
     15 + "github.com/DVKunion/SeaMoon/pkg/signal"
    13 16   "github.com/DVKunion/SeaMoon/pkg/system/errors"
    14 17   "github.com/DVKunion/SeaMoon/pkg/system/xlog"
    15 18  )
    skipped 74 lines
    90 93   return
    91 94   }
    92 95   
    93  - if err = service.SVC.DeleteTunnel(c, uint(id)); err != nil {
    94  - servant.ErrorMsg(c, http.StatusInternalServerError, errors.ApiError(xlog.ApiServiceError, err))
    95  - } else {
    96  - servant.SuccessMsg(c, 1, nil)
    97  - }
     96 + wg := &sync.WaitGroup{}
     97 + wg.Add(1)
     98 + signal.Signal().SendTunnelSignal(uint(id), enum.TunnelDelete, wg)
     99 + wg.Wait()
     100 + 
     101 + servant.SuccessMsg(c, 1, nil)
    98 102  }
    99 103   
    100 104  func extra() func(api interface{}) {
    skipped 15 lines
  • ■ ■ ■ ■ ■
    pkg/api/enum/provider.go
    skipped 12 lines
    13 13  type ProviderStatus int8
    14 14   
    15 15  const (
    16  - ProvStatusCrete ProviderStatus = iota + 1
     16 + ProvStatusCreate ProviderStatus = iota + 1
    17 17   ProvStatusSuccess
    18 18   ProvStatusFailed
    19 19   ProvStatusSync
    20 20   ProvStatusForbidden
    21 21   ProvStatusSyncError
    22  - ProvStatusArrearsError
    23 22   ProvStatusDelete
    24 23  )
    25 24   
  • ■ ■ ■ ■ ■ ■
    pkg/api/service/proxy.go
    skipped 7 lines
    8 8   "github.com/DVKunion/SeaMoon/pkg/api/database/dao"
    9 9   "github.com/DVKunion/SeaMoon/pkg/api/enum"
    10 10   "github.com/DVKunion/SeaMoon/pkg/api/models"
    11  - "github.com/DVKunion/SeaMoon/pkg/system/errors"
    12 11   "github.com/DVKunion/SeaMoon/pkg/system/xlog"
    13 12  )
    14 13   
    skipped 46 lines
    61 60   if _, err := query.WithContext(ctx).Where(query.ID.Eq(id)).UpdateSimple(query.Conn.Add(1)); err != nil {
    62 61   xlog.Error(xlog.ServiceDBUpdateFiledError, "type", "proxy_conn", "err", err)
    63 62   }
    64  - case 2:
     63 + case -1:
    65 64   if _, err := query.WithContext(ctx).Where(query.ID.Eq(id)).UpdateSimple(query.Conn.Sub(1)); err != nil {
    66 65   xlog.Error(xlog.ServiceDBUpdateFiledError, "type", "proxy_conn", "err", err)
    67 66   }
    skipped 11 lines
    79 78   }
    80 79  }
    81 80   
     81 +func (p *proxy) UpdateProxyNetworkLag(ctx context.Context, id uint, lag int64) {
     82 + query := dao.Q.Proxy
     83 + 
     84 + if _, err := query.WithContext(ctx).Where(query.ID.Eq(id)).Update(
     85 + query.Lag, lag,
     86 + ); err != nil {
     87 + xlog.Error(xlog.ServiceDBUpdateFiledError, "type", "proxy_lag", "err", err)
     88 + }
     89 +}
     90 + 
    82 91  func (p *proxy) UpdateProxyStatus(ctx context.Context, id uint, status enum.ProxyStatus, msg string) {
    83 92   query := dao.Q.Proxy
    84 93   
    skipped 6 lines
    91 100  }
    92 101   
    93 102  func (p *proxy) DeleteProxy(ctx context.Context, id uint) error {
    94  - target, err := p.GetProxyById(ctx, id)
    95  - if err != nil {
    96  - return err
    97  - }
    98  - if *target.Status == enum.ProxyStatusActive {
    99  - return errors.New(xlog.ServiceDBDeleteError)
    100  - }
    101 103   query := dao.Q.Proxy
    102 104   res, err := query.WithContext(ctx).Where(query.ID.Eq(id)).Delete()
    103 105   if err != nil || res.Error != nil {
    skipped 47 lines
  • ■ ■ ■ ■ ■ ■
    pkg/listener/tcp.go
    skipped 10 lines
    11 11   "github.com/DVKunion/SeaMoon/pkg/service"
    12 12   "github.com/DVKunion/SeaMoon/pkg/system/errors"
    13 13   "github.com/DVKunion/SeaMoon/pkg/system/xlog"
    14  - "github.com/DVKunion/SeaMoon/pkg/tools"
    15 14  )
    16 15   
    17 16  func TCPListen(ctx context.Context, py *models.Proxy) (net.Listener, error) {
    skipped 34 lines
    52 51   continue
    53 52   }
    54 53   go func() {
    55  - if _, err = db_service.SVC.UpdateProxy(ctx, id, &models.Proxy{
    56  - Lag: tools.Int64Ptr(destConn.Delay()),
    57  - }); err != nil {
    58  - xlog.Error(xlog.ListenerLagError, "id", id, "err", err)
    59  - }
    60  - }()
    61  - go func() {
    62 54   in, out, err := network.Transport(conn, destConn)
    63 55   if err != nil {
    64 56   xlog.Error(xlog.NetworkTransportError, "err", err)
    65 57   }
    66 58   db_service.SVC.UpdateProxyConn(ctx, id, -1)
    67 59   db_service.SVC.UpdateProxyNetworkInfo(ctx, id, in, out)
     60 + }()
     61 + go func() {
     62 + db_service.SVC.UpdateProxyNetworkLag(ctx, id, destConn.Delay())
    68 63   }()
    69 64   }
    70 65   }
    skipped 2 lines
  • ■ ■ ■ ■ ■ ■
    pkg/network/transport.go
    skipped 5 lines
    6 6   "sync"
    7 7   
    8 8   "github.com/gorilla/websocket"
    9  - 
    10  - "github.com/DVKunion/SeaMoon/pkg/system/errors"
    11 9  )
    12 10   
    13 11  const bufferSize = 64 * 1024
    skipped 43 lines
    57 55   case e := <-done:
    58 56   wg.Wait()
    59 57   // 忽略 websocket 正常断开
    60  - if errors.As(e, net.OpError{}) &&
    61  - errors.As(e.(*net.OpError).Err, websocket.CloseError{}) &&
    62  - e.(*net.OpError).Err.(*websocket.CloseError).Code == websocket.CloseNormalClosure {
    63  - e = nil
     58 + if opErr, ok := e.(*net.OpError); ok {
     59 + if closeErr, ok := opErr.Err.(*websocket.CloseError); ok {
     60 + if closeErr.Code == websocket.CloseNormalClosure {
     61 + e = nil
     62 + }
     63 + }
    64 64   }
    65 65   return inbound, outbound, e
    66 66   }
    skipped 10 lines
  • ■ ■ ■ ■ ■ ■
    pkg/signal/handler.go
    1  -package signal
    2  - 
    3  -import (
    4  - "context"
    5  - "sync"
    6  - 
    7  - "github.com/DVKunion/SeaMoon/pkg/api/enum"
    8  - "github.com/DVKunion/SeaMoon/pkg/api/service"
    9  - "github.com/DVKunion/SeaMoon/pkg/listener"
    10  - "github.com/DVKunion/SeaMoon/pkg/system/xlog"
    11  -)
    12  - 
    13  -func (sb *Bus) proxyHandler(ctx context.Context, pys *proxySignal) {
    14  - // proxy sync change task
    15  - // 如果是需要同步的,记得释放锁
    16  - defer func() {
    17  - if pys.wg != nil {
    18  - pys.wg.Done()
    19  - }
    20  - }()
    21  - proxy, err := service.SVC.GetProxyById(ctx, pys.id)
    22  - if err != nil {
    23  - xlog.Error(xlog.SignalGetObjError, "obj", "proxy", "err", err)
    24  - service.SVC.UpdateProxyStatus(ctx, pys.id, enum.ProxyStatusError, err.Error())
    25  - return
    26  - }
    27  - // 缓冲逻辑:状态没改变时候,不需要处理
    28  - if proxy.Status == &pys.next {
    29  - xlog.Warn(xlog.SignalMissOperationWarn, "id", pys.id, "type", "proxy", "status", pys.next)
    30  - return
    31  - }
    32  - service.SVC.UpdateProxyStatus(ctx, pys.id, pys.next, "")
    33  - switch pys.next {
    34  - case enum.ProxyStatusActive, enum.ProxyStatusRecover:
    35  - sigCtx, cancel := context.WithCancel(ctx)
    36  - if server, err := listener.TCPListen(sigCtx, proxy); err != nil {
    37  - xlog.Error(xlog.SignalListenerError, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr(), "err", err)
    38  - service.SVC.UpdateProxyStatus(ctx, pys.id, enum.ProxyStatusError, err.Error())
    39  - cancel()
    40  - return
    41  - } else {
    42  - sb.canceler[pys.id] = cancel
    43  - sb.listener[pys.id] = server
    44  - }
    45  - xlog.Info(xlog.SignalStartProxy, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr())
    46  - service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusActive, "")
    47  - case enum.ProxyStatusInactive:
    48  - if cancel, ok := sb.canceler[pys.id]; ok {
    49  - // 先调一下 cancel
    50  - cancel()
    51  - if ln, exist := sb.listener[pys.id]; exist {
    52  - // 尝试着去停一下 ln, 防止泄漏
    53  - err := ln.Close()
    54  - if err != nil {
    55  - // 错了就错了吧,说明 ctx 挂了一般 goroutines 也跟着挂了
    56  - xlog.Error(xlog.SignalListenerError, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr(), "err", err)
    57  - }
    58  - }
    59  - }
    60  - xlog.Info(xlog.SignalStopProxy, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr())
    61  - case enum.ProxyStatusDelete:
    62  - // 先同步停止服务
    63  - wg := &sync.WaitGroup{}
    64  - wg.Add(1)
    65  - sb.SendProxySignal(pys.id, enum.ProxyStatusInactive, wg)
    66  - wg.Wait()
    67  - // 最后删除数据
    68  - if err = service.SVC.SpeedProxy(ctx, proxy); err != nil {
    69  - xlog.Error(xlog.SignalSpeedProxyError, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr(), "err", err)
    70  - service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusError, err.Error())
    71  - return
    72  - }
    73  - xlog.Info(xlog.SignalDeleteProxy, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr())
    74  - case enum.ProxyStatusSpeeding:
    75  - if err = service.SVC.SpeedProxy(ctx, proxy); err != nil {
    76  - xlog.Error(xlog.SignalSpeedProxyError, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr(), "err", err)
    77  - service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusError, err.Error())
    78  - return
    79  - }
    80  - xlog.Info(xlog.SignalSpeedProxy, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr())
    81  - service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusActive, "")
    82  - }
    83  -}
    84  - 
    85  -func (sb *Bus) providerHandler(ctx context.Context, prs *providerSignal) {
    86  - // proxy sync change task
    87  - // 如果是需要同步的,记得释放锁
    88  - defer func() {
    89  - if prs.wg != nil {
    90  - prs.wg.Done()
    91  - }
    92  - }()
    93  - provider, err := service.SVC.GetProviderById(ctx, prs.id)
    94  - if err != nil {
    95  - xlog.Error(xlog.SignalGetObjError, "obj", "provider", "err", err)
    96  - service.SVC.UpdateProviderStatus(ctx, prs.id, enum.ProvStatusFailed, err.Error())
    97  - return
    98  - }
    99  - // 缓冲逻辑:状态没改变时候,不需要处理
    100  - if provider.Status == &prs.next {
    101  - xlog.Warn(xlog.SignalMissOperationWarn, "id", prs.id, "type", "provider", "status", prs.next)
    102  - return
    103  - }
    104  - service.SVC.UpdateProviderStatus(ctx, provider.ID, prs.next, "")
    105  - switch prs.next {
    106  - case enum.ProvStatusSync:
    107  - if err = service.SVC.SyncProvider(ctx, provider); err != nil {
    108  - xlog.Error(xlog.SignalSyncProviderError, "obj", "provider", "err", err)
    109  - service.SVC.UpdateProviderStatus(ctx, provider.ID, enum.ProvStatusSyncError, err.Error())
    110  - return
    111  - }
    112  - xlog.Info(xlog.SignalSyncProvider, "id", provider.ID, "type", provider.Type)
    113  - service.SVC.UpdateProviderStatus(ctx, provider.ID, enum.ProvStatusSuccess, "")
    114  - case enum.ProvStatusDelete:
    115  - wg := &sync.WaitGroup{}
    116  - for _, tun := range provider.Tunnels {
    117  - wg.Add(1)
    118  - sb.SendTunnelSignal(tun.ID, enum.TunnelDelete, wg)
    119  - }
    120  - wg.Wait()
    121  - // 然后删除数据
    122  - if err = service.SVC.DeleteProvider(ctx, provider.ID); err != nil {
    123  - xlog.Error(xlog.SignalSyncProviderError, "obj", "provider", "err", err)
    124  - service.SVC.UpdateProviderStatus(ctx, provider.ID, enum.ProvStatusSyncError, err.Error())
    125  - return
    126  - }
    127  - xlog.Info(xlog.SignalDeleteProvider, "id", provider.ID, "type", provider.Type)
    128  - }
    129  -}
    130  - 
    131  -func (sb *Bus) tunnelHandler(ctx context.Context, ts *tunnelSignal) {
    132  - // proxy sync change task
    133  - // 如果是需要同步的,记得释放锁
    134  - defer func() {
    135  - if ts.wg != nil {
    136  - ts.wg.Done()
    137  - }
    138  - }()
    139  - tun, err := service.SVC.GetTunnelById(ctx, ts.id)
    140  - if err != nil {
    141  - xlog.Error(xlog.SignalGetObjError, "obj", "tunnel", "err", err)
    142  - service.SVC.UpdateTunnelStatus(ctx, ts.id, enum.TunnelError, err.Error())
    143  - return
    144  - }
    145  - // 缓冲逻辑:状态没改变时候,不需要处理
    146  - if tun.Status == &ts.next {
    147  - xlog.Warn(xlog.SignalMissOperationWarn, "id", ts.id, "type", "tunnel", "status", ts.next)
    148  - return
    149  - }
    150  - service.SVC.UpdateTunnelStatus(ctx, tun.ID, ts.next, "")
    151  - switch ts.next {
    152  - case enum.TunnelActive:
    153  - if addr, err := service.SVC.DeployTunnel(ctx, tun); err != nil {
    154  - xlog.Error(xlog.SignalDeployTunError, "obj", "tunnel", "err", err)
    155  - service.SVC.UpdateTunnelStatus(ctx, tun.ID, enum.TunnelError, err.Error())
    156  - return
    157  - } else {
    158  - service.SVC.UpdateTunnelAddr(ctx, tun.ID, addr)
    159  - }
    160  - xlog.Info(xlog.SignalDeployTunnel, "id", tun.ID, "type", tun.Type)
    161  - case enum.TunnelInactive:
    162  - if err := service.SVC.StopTunnel(ctx, tun); err != nil {
    163  - xlog.Error(xlog.SignalStopTunError, "obj", "tunnel", "err", err)
    164  - service.SVC.UpdateTunnelStatus(ctx, tun.ID, enum.TunnelError, err.Error())
    165  - return
    166  - }
    167  - xlog.Info(xlog.SignalStopTunnel, "id", tun.ID, "type", tun.Type)
    168  - case enum.TunnelDelete:
    169  - // 先停掉本地的服务
    170  - wg := &sync.WaitGroup{}
    171  - for _, py := range tun.Proxies {
    172  - wg.Add(1)
    173  - sb.SendProxySignal(py.ID, enum.ProxyStatusDelete, wg)
    174  - }
    175  - wg.Wait()
    176  - wg = &sync.WaitGroup{}
    177  - // 再停掉远端的服务
    178  - wg.Add(1)
    179  - sb.SendTunnelSignal(tun.ID, enum.TunnelInactive, wg)
    180  - wg.Wait()
    181  - // 最后删除服务即可
    182  - if err := service.SVC.DeleteTunnel(ctx, tun.ID); err != nil {
    183  - xlog.Error(xlog.SignalDeleteTunError, "obj", "tunnel", "err", err)
    184  - service.SVC.UpdateTunnelStatus(ctx, tun.ID, enum.TunnelError, err.Error())
    185  - return
    186  - }
    187  - xlog.Info(xlog.SignalDeleteTunnel, "id", tun.ID, "type", tun.Type)
    188  - }
    189  -}
    190  - 
  • ■ ■ ■ ■ ■ ■
    pkg/signal/handler_provider.go
     1 +package signal
     2 + 
     3 +import (
     4 + "context"
     5 + 
     6 + "github.com/DVKunion/SeaMoon/pkg/api/enum"
     7 + "github.com/DVKunion/SeaMoon/pkg/api/service"
     8 + "github.com/DVKunion/SeaMoon/pkg/system/xlog"
     9 +)
     10 + 
     11 +func (sb *Bus) providerHandler(ctx context.Context, prs *providerSignal) {
     12 + // proxy sync change task
     13 + // 如果是需要同步的,记得释放锁
     14 + defer func() {
     15 + if prs.wg != nil {
     16 + prs.wg.Done()
     17 + }
     18 + }()
     19 + provider, err := service.SVC.GetProviderById(ctx, prs.id)
     20 + if err != nil {
     21 + xlog.Error(xlog.SignalGetObjError, "obj", "provider", "err", err)
     22 + service.SVC.UpdateProviderStatus(ctx, prs.id, enum.ProvStatusFailed, err.Error())
     23 + return
     24 + }
     25 + // 缓冲逻辑:状态没改变时候,不需要处理
     26 + if *provider.Status == prs.next {
     27 + xlog.Warn(xlog.SignalMissOperationWarn, "id", prs.id, "type", "provider", "status", prs.next)
     28 + return
     29 + }
     30 + service.SVC.UpdateProviderStatus(ctx, provider.ID, prs.next, "")
     31 + switch prs.next {
     32 + case enum.ProvStatusSync:
     33 + if err = service.SVC.SyncProvider(ctx, provider); err != nil {
     34 + xlog.Error(xlog.SignalSyncProviderError, "obj", "provider", "err", err)
     35 + service.SVC.UpdateProviderStatus(ctx, provider.ID, enum.ProvStatusSyncError, err.Error())
     36 + return
     37 + }
     38 + xlog.Info(xlog.SignalSyncProvider, "id", provider.ID, "type", *provider.Type)
     39 + service.SVC.UpdateProviderStatus(ctx, provider.ID, enum.ProvStatusSuccess, "")
     40 + case enum.ProvStatusDelete:
     41 + for _, tun := range provider.Tunnels {
     42 + sb.deleteTunnel(ctx, &tun)
     43 + }
     44 + // 然后删除数据
     45 + if err = service.SVC.DeleteProvider(ctx, provider.ID); err != nil {
     46 + xlog.Error(xlog.SignalSyncProviderError, "obj", "provider", "err", err)
     47 + service.SVC.UpdateProviderStatus(ctx, provider.ID, enum.ProvStatusSyncError, err.Error())
     48 + return
     49 + }
     50 + xlog.Info(xlog.SignalDeleteProvider, "id", provider.ID, "type", provider.Type)
     51 + }
     52 +}
     53 + 
  • ■ ■ ■ ■ ■ ■
    pkg/signal/handler_proxy.go
     1 +package signal
     2 + 
     3 +import (
     4 + "context"
     5 + 
     6 + "github.com/DVKunion/SeaMoon/pkg/api/enum"
     7 + "github.com/DVKunion/SeaMoon/pkg/api/models"
     8 + "github.com/DVKunion/SeaMoon/pkg/api/service"
     9 + "github.com/DVKunion/SeaMoon/pkg/listener"
     10 + "github.com/DVKunion/SeaMoon/pkg/system/xlog"
     11 +)
     12 + 
     13 +func (sb *Bus) proxyHandler(ctx context.Context, pys *proxySignal) {
     14 + // proxy sync change task
     15 + // 如果是需要同步的,记得释放锁
     16 + defer func() {
     17 + if pys.wg != nil {
     18 + pys.wg.Done()
     19 + }
     20 + }()
     21 + proxy, err := service.SVC.GetProxyById(ctx, pys.id)
     22 + if err != nil {
     23 + xlog.Error(xlog.SignalGetObjError, "obj", "proxy", "err", err)
     24 + service.SVC.UpdateProxyStatus(ctx, pys.id, enum.ProxyStatusError, err.Error())
     25 + return
     26 + }
     27 + // 缓冲逻辑:状态没改变时候,不需要处理
     28 + if *proxy.Status == pys.next {
     29 + xlog.Warn(xlog.SignalMissOperationWarn, "id", pys.id, "type", "proxy", "status", pys.next)
     30 + return
     31 + }
     32 + service.SVC.UpdateProxyStatus(ctx, pys.id, pys.next, "")
     33 + switch pys.next {
     34 + case enum.ProxyStatusActive, enum.ProxyStatusRecover:
     35 + sigCtx, cancel := context.WithCancel(ctx)
     36 + if server, err := listener.TCPListen(sigCtx, proxy); err != nil {
     37 + xlog.Error(xlog.SignalListenerError, "id", pys.id, "type", *proxy.Type, "addr", proxy.Addr(), "err", err)
     38 + service.SVC.UpdateProxyStatus(ctx, pys.id, enum.ProxyStatusError, err.Error())
     39 + cancel()
     40 + return
     41 + } else {
     42 + sb.canceler[pys.id] = cancel
     43 + sb.listener[pys.id] = server
     44 + }
     45 + xlog.Info(xlog.SignalStartProxy, "id", pys.id, "type", *proxy.Type, "addr", proxy.Addr())
     46 + service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusActive, "")
     47 + case enum.ProxyStatusInactive:
     48 + sb.stopProxy(proxy)
     49 + case enum.ProxyStatusDelete:
     50 + sb.deleteProxy(ctx, proxy)
     51 + case enum.ProxyStatusSpeeding:
     52 + if err = service.SVC.SpeedProxy(ctx, proxy); err != nil {
     53 + xlog.Error(xlog.SignalSpeedProxyError, "id", pys.id, "type", *proxy.Type, "addr", proxy.Addr(), "err", err)
     54 + service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusError, err.Error())
     55 + return
     56 + }
     57 + xlog.Info(xlog.SignalSpeedProxy, "id", pys.id, "type", *proxy.Type, "addr", proxy.Addr())
     58 + service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusActive, "")
     59 + }
     60 +}
     61 + 
     62 +func (sb *Bus) stopProxy(proxy *models.Proxy) {
     63 + if cancel, ok := sb.canceler[proxy.ID]; ok {
     64 + // 先调一下 cancel
     65 + cancel()
     66 + if ln, exist := sb.listener[proxy.ID]; exist {
     67 + // 尝试着去停一下 ln, 防止泄漏
     68 + err := ln.Close()
     69 + if err != nil {
     70 + // 错了就错了吧,说明 ctx 挂了一般 goroutines 也跟着挂了
     71 + xlog.Error(xlog.SignalListenerError, "id", proxy.ID, "type", *proxy.Type, "addr", proxy.Addr(), "err", err)
     72 + }
     73 + }
     74 + }
     75 + xlog.Info(xlog.SignalStopProxy, "id", proxy.ID, "type", *proxy.Type, "addr", proxy.Addr())
     76 +}
     77 + 
     78 +func (sb *Bus) deleteProxy(ctx context.Context, proxy *models.Proxy) {
     79 + sb.stopProxy(proxy)
     80 + // 最后删除数据
     81 + if err := service.SVC.DeleteProxy(ctx, proxy.ID); err != nil {
     82 + xlog.Error(xlog.ServiceDBDeleteError, "id", proxy.ID, "type", *proxy.Type, "addr", proxy.Addr(), "err", err)
     83 + service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusError, err.Error())
     84 + return
     85 + }
     86 + xlog.Info(xlog.SignalDeleteProxy, "id", proxy.ID, "type", *proxy.Type, "addr", proxy.Addr())
     87 + 
     88 +}
     89 + 
  • ■ ■ ■ ■ ■ ■
    pkg/signal/handler_tunnel.go
     1 +package signal
     2 + 
     3 +import (
     4 + "context"
     5 + 
     6 + "github.com/DVKunion/SeaMoon/pkg/api/enum"
     7 + "github.com/DVKunion/SeaMoon/pkg/api/models"
     8 + "github.com/DVKunion/SeaMoon/pkg/api/service"
     9 + "github.com/DVKunion/SeaMoon/pkg/system/xlog"
     10 +)
     11 + 
     12 +func (sb *Bus) tunnelHandler(ctx context.Context, ts *tunnelSignal) {
     13 + // proxy sync change task
     14 + // 如果是需要同步的,记得释放锁
     15 + defer func() {
     16 + if ts.wg != nil {
     17 + ts.wg.Done()
     18 + }
     19 + }()
     20 + tun, err := service.SVC.GetTunnelById(ctx, ts.id)
     21 + if err != nil {
     22 + xlog.Error(xlog.SignalGetObjError, "obj", "tunnel", "err", err)
     23 + service.SVC.UpdateTunnelStatus(ctx, ts.id, enum.TunnelError, err.Error())
     24 + return
     25 + }
     26 + // 缓冲逻辑:状态没改变时候,不需要处理
     27 + if *tun.Status == ts.next {
     28 + xlog.Warn(xlog.SignalMissOperationWarn, "id", ts.id, "type", "tunnel", "status", ts.next)
     29 + return
     30 + }
     31 + service.SVC.UpdateTunnelStatus(ctx, tun.ID, ts.next, "")
     32 + switch ts.next {
     33 + case enum.TunnelActive:
     34 + if addr, err := service.SVC.DeployTunnel(ctx, tun); err != nil {
     35 + xlog.Error(xlog.SignalDeployTunError, "obj", "tunnel", "err", err)
     36 + service.SVC.UpdateTunnelStatus(ctx, tun.ID, enum.TunnelError, err.Error())
     37 + return
     38 + } else {
     39 + service.SVC.UpdateTunnelAddr(ctx, tun.ID, addr)
     40 + }
     41 + xlog.Info(xlog.SignalDeployTunnel, "id", tun.ID, "type", tun.Type)
     42 + case enum.TunnelInactive:
     43 + _ = sb.stopTunnel(ctx, tun)
     44 + case enum.TunnelDelete:
     45 + sb.deleteTunnel(ctx, tun)
     46 + }
     47 +}
     48 + 
     49 +func (sb *Bus) stopTunnel(ctx context.Context, tun *models.Tunnel) error {
     50 + if err := service.SVC.StopTunnel(ctx, tun); err != nil {
     51 + xlog.Error(xlog.SignalStopTunError, "obj", "tunnel", "err", err)
     52 + service.SVC.UpdateTunnelStatus(ctx, tun.ID, enum.TunnelError, err.Error())
     53 + return err
     54 + }
     55 + xlog.Info(xlog.SignalStopTunnel, "id", tun.ID, "type", tun.Type)
     56 + return nil
     57 +}
     58 + 
     59 +func (sb *Bus) deleteTunnel(ctx context.Context, tun *models.Tunnel) {
     60 + // 先停掉本地的服务
     61 + for _, py := range tun.Proxies {
     62 + sb.deleteProxy(ctx, &py)
     63 + }
     64 + if err := sb.stopTunnel(ctx, tun); err != nil {
     65 + xlog.Error(xlog.SignalDeleteTunError, "obj", "tunnel", "err", err)
     66 + service.SVC.UpdateTunnelStatus(ctx, tun.ID, enum.TunnelError, err.Error())
     67 + return
     68 + }
     69 + // 最后删除服务即可
     70 + if err := service.SVC.DeleteTunnel(ctx, tun.ID); err != nil {
     71 + xlog.Error(xlog.SignalDeleteTunError, "obj", "tunnel", "err", err)
     72 + service.SVC.UpdateTunnelStatus(ctx, tun.ID, enum.TunnelError, err.Error())
     73 + return
     74 + }
     75 + xlog.Info(xlog.SignalDeleteTunnel, "id", tun.ID, "type", tun.Type)
     76 +}
     77 + 
  • ■ ■ ■ ■ ■
    pkg/system/xlog/consts.go
    skipped 102 lines
    103 103  const (
    104 104   ListenerAcceptError = "listener unexpect error"
    105 105   ListenerDailError = "listener dail to remote error"
    106  - ListenerLagError = "listener latency calculation error"
    107 106  )
    108 107   
    109 108  // SIGNAL 相关错误
    skipped 3 lines
    113 112   SignalListenerError = "signal listener unexpect error"
    114 113   SignalRecoverProxyError = "signal try recover active proxy error"
    115 114   SignalSpeedProxyError = "signal try test speed error"
    116  - SignalSyncProviderError = " signal try sync provider error"
     115 + SignalSyncProviderError = "signal try sync provider error"
    117 116   SignalDeployTunError = "signal try deploy tunnel error"
    118 117   SignalStopTunError = "signal try stop tunnel error"
    119 118   SignalDeleteTunError = "signal try delete tunnel error"
    skipped 2 lines
Please wait...
Page is in error, reload to recover