1 | | - | package signal |
2 | | - | |
3 | | - | import ( |
4 | | - | "context" |
5 | | - | "sync" |
6 | | - | |
7 | | - | "github.com/DVKunion/SeaMoon/pkg/api/enum" |
8 | | - | "github.com/DVKunion/SeaMoon/pkg/api/service" |
9 | | - | "github.com/DVKunion/SeaMoon/pkg/listener" |
10 | | - | "github.com/DVKunion/SeaMoon/pkg/system/xlog" |
11 | | - | ) |
12 | | - | |
13 | | - | func (sb *Bus) proxyHandler(ctx context.Context, pys *proxySignal) { |
14 | | - | // proxy sync change task |
15 | | - | // 如果是需要同步的,记得释放锁 |
16 | | - | defer func() { |
17 | | - | if pys.wg != nil { |
18 | | - | pys.wg.Done() |
19 | | - | } |
20 | | - | }() |
21 | | - | proxy, err := service.SVC.GetProxyById(ctx, pys.id) |
22 | | - | if err != nil { |
23 | | - | xlog.Error(xlog.SignalGetObjError, "obj", "proxy", "err", err) |
24 | | - | service.SVC.UpdateProxyStatus(ctx, pys.id, enum.ProxyStatusError, err.Error()) |
25 | | - | return |
26 | | - | } |
27 | | - | // 缓冲逻辑:状态没改变时候,不需要处理 |
28 | | - | if proxy.Status == &pys.next { |
29 | | - | xlog.Warn(xlog.SignalMissOperationWarn, "id", pys.id, "type", "proxy", "status", pys.next) |
30 | | - | return |
31 | | - | } |
32 | | - | service.SVC.UpdateProxyStatus(ctx, pys.id, pys.next, "") |
33 | | - | switch pys.next { |
34 | | - | case enum.ProxyStatusActive, enum.ProxyStatusRecover: |
35 | | - | sigCtx, cancel := context.WithCancel(ctx) |
36 | | - | if server, err := listener.TCPListen(sigCtx, proxy); err != nil { |
37 | | - | xlog.Error(xlog.SignalListenerError, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr(), "err", err) |
38 | | - | service.SVC.UpdateProxyStatus(ctx, pys.id, enum.ProxyStatusError, err.Error()) |
39 | | - | cancel() |
40 | | - | return |
41 | | - | } else { |
42 | | - | sb.canceler[pys.id] = cancel |
43 | | - | sb.listener[pys.id] = server |
44 | | - | } |
45 | | - | xlog.Info(xlog.SignalStartProxy, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr()) |
46 | | - | service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusActive, "") |
47 | | - | case enum.ProxyStatusInactive: |
48 | | - | if cancel, ok := sb.canceler[pys.id]; ok { |
49 | | - | // 先调一下 cancel |
50 | | - | cancel() |
51 | | - | if ln, exist := sb.listener[pys.id]; exist { |
52 | | - | // 尝试着去停一下 ln, 防止泄漏 |
53 | | - | err := ln.Close() |
54 | | - | if err != nil { |
55 | | - | // 错了就错了吧,说明 ctx 挂了一般 goroutines 也跟着挂了 |
56 | | - | xlog.Error(xlog.SignalListenerError, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr(), "err", err) |
57 | | - | } |
58 | | - | } |
59 | | - | } |
60 | | - | xlog.Info(xlog.SignalStopProxy, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr()) |
61 | | - | case enum.ProxyStatusDelete: |
62 | | - | // 先同步停止服务 |
63 | | - | wg := &sync.WaitGroup{} |
64 | | - | wg.Add(1) |
65 | | - | sb.SendProxySignal(pys.id, enum.ProxyStatusInactive, wg) |
66 | | - | wg.Wait() |
67 | | - | // 最后删除数据 |
68 | | - | if err = service.SVC.SpeedProxy(ctx, proxy); err != nil { |
69 | | - | xlog.Error(xlog.SignalSpeedProxyError, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr(), "err", err) |
70 | | - | service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusError, err.Error()) |
71 | | - | return |
72 | | - | } |
73 | | - | xlog.Info(xlog.SignalDeleteProxy, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr()) |
74 | | - | case enum.ProxyStatusSpeeding: |
75 | | - | if err = service.SVC.SpeedProxy(ctx, proxy); err != nil { |
76 | | - | xlog.Error(xlog.SignalSpeedProxyError, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr(), "err", err) |
77 | | - | service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusError, err.Error()) |
78 | | - | return |
79 | | - | } |
80 | | - | xlog.Info(xlog.SignalSpeedProxy, "id", pys.id, "type", proxy.Type, "addr", proxy.Addr()) |
81 | | - | service.SVC.UpdateProxyStatus(ctx, proxy.ID, enum.ProxyStatusActive, "") |
82 | | - | } |
83 | | - | } |
84 | | - | |
85 | | - | func (sb *Bus) providerHandler(ctx context.Context, prs *providerSignal) { |
86 | | - | // proxy sync change task |
87 | | - | // 如果是需要同步的,记得释放锁 |
88 | | - | defer func() { |
89 | | - | if prs.wg != nil { |
90 | | - | prs.wg.Done() |
91 | | - | } |
92 | | - | }() |
93 | | - | provider, err := service.SVC.GetProviderById(ctx, prs.id) |
94 | | - | if err != nil { |
95 | | - | xlog.Error(xlog.SignalGetObjError, "obj", "provider", "err", err) |
96 | | - | service.SVC.UpdateProviderStatus(ctx, prs.id, enum.ProvStatusFailed, err.Error()) |
97 | | - | return |
98 | | - | } |
99 | | - | // 缓冲逻辑:状态没改变时候,不需要处理 |
100 | | - | if provider.Status == &prs.next { |
101 | | - | xlog.Warn(xlog.SignalMissOperationWarn, "id", prs.id, "type", "provider", "status", prs.next) |
102 | | - | return |
103 | | - | } |
104 | | - | service.SVC.UpdateProviderStatus(ctx, provider.ID, prs.next, "") |
105 | | - | switch prs.next { |
106 | | - | case enum.ProvStatusSync: |
107 | | - | if err = service.SVC.SyncProvider(ctx, provider); err != nil { |
108 | | - | xlog.Error(xlog.SignalSyncProviderError, "obj", "provider", "err", err) |
109 | | - | service.SVC.UpdateProviderStatus(ctx, provider.ID, enum.ProvStatusSyncError, err.Error()) |
110 | | - | return |
111 | | - | } |
112 | | - | xlog.Info(xlog.SignalSyncProvider, "id", provider.ID, "type", provider.Type) |
113 | | - | service.SVC.UpdateProviderStatus(ctx, provider.ID, enum.ProvStatusSuccess, "") |
114 | | - | case enum.ProvStatusDelete: |
115 | | - | wg := &sync.WaitGroup{} |
116 | | - | for _, tun := range provider.Tunnels { |
117 | | - | wg.Add(1) |
118 | | - | sb.SendTunnelSignal(tun.ID, enum.TunnelDelete, wg) |
119 | | - | } |
120 | | - | wg.Wait() |
121 | | - | // 然后删除数据 |
122 | | - | if err = service.SVC.DeleteProvider(ctx, provider.ID); err != nil { |
123 | | - | xlog.Error(xlog.SignalSyncProviderError, "obj", "provider", "err", err) |
124 | | - | service.SVC.UpdateProviderStatus(ctx, provider.ID, enum.ProvStatusSyncError, err.Error()) |
125 | | - | return |
126 | | - | } |
127 | | - | xlog.Info(xlog.SignalDeleteProvider, "id", provider.ID, "type", provider.Type) |
128 | | - | } |
129 | | - | } |
130 | | - | |
131 | | - | func (sb *Bus) tunnelHandler(ctx context.Context, ts *tunnelSignal) { |
132 | | - | // proxy sync change task |
133 | | - | // 如果是需要同步的,记得释放锁 |
134 | | - | defer func() { |
135 | | - | if ts.wg != nil { |
136 | | - | ts.wg.Done() |
137 | | - | } |
138 | | - | }() |
139 | | - | tun, err := service.SVC.GetTunnelById(ctx, ts.id) |
140 | | - | if err != nil { |
141 | | - | xlog.Error(xlog.SignalGetObjError, "obj", "tunnel", "err", err) |
142 | | - | service.SVC.UpdateTunnelStatus(ctx, ts.id, enum.TunnelError, err.Error()) |
143 | | - | return |
144 | | - | } |
145 | | - | // 缓冲逻辑:状态没改变时候,不需要处理 |
146 | | - | if tun.Status == &ts.next { |
147 | | - | xlog.Warn(xlog.SignalMissOperationWarn, "id", ts.id, "type", "tunnel", "status", ts.next) |
148 | | - | return |
149 | | - | } |
150 | | - | service.SVC.UpdateTunnelStatus(ctx, tun.ID, ts.next, "") |
151 | | - | switch ts.next { |
152 | | - | case enum.TunnelActive: |
153 | | - | if addr, err := service.SVC.DeployTunnel(ctx, tun); err != nil { |
154 | | - | xlog.Error(xlog.SignalDeployTunError, "obj", "tunnel", "err", err) |
155 | | - | service.SVC.UpdateTunnelStatus(ctx, tun.ID, enum.TunnelError, err.Error()) |
156 | | - | return |
157 | | - | } else { |
158 | | - | service.SVC.UpdateTunnelAddr(ctx, tun.ID, addr) |
159 | | - | } |
160 | | - | xlog.Info(xlog.SignalDeployTunnel, "id", tun.ID, "type", tun.Type) |
161 | | - | case enum.TunnelInactive: |
162 | | - | if err := service.SVC.StopTunnel(ctx, tun); err != nil { |
163 | | - | xlog.Error(xlog.SignalStopTunError, "obj", "tunnel", "err", err) |
164 | | - | service.SVC.UpdateTunnelStatus(ctx, tun.ID, enum.TunnelError, err.Error()) |
165 | | - | return |
166 | | - | } |
167 | | - | xlog.Info(xlog.SignalStopTunnel, "id", tun.ID, "type", tun.Type) |
168 | | - | case enum.TunnelDelete: |
169 | | - | // 先停掉本地的服务 |
170 | | - | wg := &sync.WaitGroup{} |
171 | | - | for _, py := range tun.Proxies { |
172 | | - | wg.Add(1) |
173 | | - | sb.SendProxySignal(py.ID, enum.ProxyStatusDelete, wg) |
174 | | - | } |
175 | | - | wg.Wait() |
176 | | - | wg = &sync.WaitGroup{} |
177 | | - | // 再停掉远端的服务 |
178 | | - | wg.Add(1) |
179 | | - | sb.SendTunnelSignal(tun.ID, enum.TunnelInactive, wg) |
180 | | - | wg.Wait() |
181 | | - | // 最后删除服务即可 |
182 | | - | if err := service.SVC.DeleteTunnel(ctx, tun.ID); err != nil { |
183 | | - | xlog.Error(xlog.SignalDeleteTunError, "obj", "tunnel", "err", err) |
184 | | - | service.SVC.UpdateTunnelStatus(ctx, tun.ID, enum.TunnelError, err.Error()) |
185 | | - | return |
186 | | - | } |
187 | | - | xlog.Info(xlog.SignalDeleteTunnel, "id", tun.ID, "type", tun.Type) |
188 | | - | } |
189 | | - | } |
190 | | - | |