Projects STRLCPY cdebug Commits c2f50c5a
🤬
  • ■ ■ ■ ■ ■
    cmd/portforward/portforward.go
    skipped 3 lines
    4 4   "context"
    5 5   "errors"
    6 6   "fmt"
    7  - "os"
    8  - "os/signal"
     7 + "math/rand"
    9 8   "strings"
    10  - "syscall"
     9 + "sync"
     10 + "time"
    11 11   
    12 12   "github.com/docker/docker/api/types"
    13 13   "github.com/docker/docker/api/types/container"
    skipped 4 lines
    18 18   
    19 19   "github.com/iximiuz/cdebug/pkg/cliutil"
    20 20   "github.com/iximiuz/cdebug/pkg/docker"
    21  - "github.com/iximiuz/cdebug/pkg/jsonutil"
     21 + "github.com/iximiuz/cdebug/pkg/signalutil"
    22 22   "github.com/iximiuz/cdebug/pkg/uuid"
    23 23  )
    24 24   
    skipped 5 lines
    30 30  // - Remote port forwarding: implement me!
    31 31  //
    32 32  // Local port forwarding's possible modes (kinda sorta as in ssh -L):
    33  -// - REMOTE_PORT # binds REMOTE_IP:REMOTE_PORT to a random port on localhost
    34  -// - REMOTE_<IP|ALIAS|NET>:REMOTE_PORT # The second form is needed to:
    35  -// # 1) allow exposing target's localhost ports
    36  -// # 2) specify a concrete IP for a multi-network target
     33 +// - REMOTE_PORT # binds TARGET_IP:REMOTE_PORT to a random port on localhost
     34 +// - REMOTE_<IP|ALIAS|NET>:REMOTE_PORT # binds arbitrary REMOTE_IP:REMOTE_PORT to a random port on localhost
     35 +// # 1) allows exposing target's localhost ports
     36 +// # 2) allows specifying a concrete IP for a multi-network target
     37 +// # 3) allows specifying an arbitrary destination reachable from the target
    37 38  //
    38  -// - LOCAL_PORT:REMOTE_PORT # binds REMOTE_IP:REMOTE_PORT to LOCAL_PORT on localhost
     39 +// - LOCAL_PORT:REMOTE_PORT # much like the above form but uses a concrete port on the host system
    39 40  // - LOCAL_PORT:REMOTE_<IP|ALIAS|NET>:REMOTE_PORT
    40 41  //
    41  -// - LOCAL_IP:LOCAL_PORT:REMOTE_PORT # similar to LOCAL_PORT:REMOTE_PORT but LOCAL_IP is used instead of localhost
    42  -// - LOCAL_IP:LOCAL_PORT:REMOTE_<IP|ALIAS|NET>:REMOTE_PORT
     42 +// - LOCAL_HOST:LOCAL_PORT:REMOTE_PORT # similar to LOCAL_PORT:REMOTE_PORT but LOCAL_HOST is used instead of 127.0.0.1
     43 +// - LOCAL_HOST:LOCAL_PORT:REMOTE_<IP|ALIAS|NET>:REMOTE_PORT
    43 44  //
    44 45  // Remote port forwarding's possible modes (kinda sorta as in ssh -R):
    45 46  // - coming soon...
    skipped 3 lines
    49 50   
    50 51   outFormatText = "text"
    51 52   outFormatJSON = "json"
     53 + 
     54 + cleanupTimeout = 3 * time.Second
    52 55  )
    53 56   
    54 57  var (
    55 58   errNoAddr = errors.New("target container must have at least one IP address")
    56 59   errBadLocalPort = errors.New("bad local port")
     60 + errBadRemoteHost = errors.New("bad remote host")
    57 61   errBadRemotePort = errors.New("bad remote port")
    58 62  )
    59 63   
    60 64  type options struct {
    61  - target string
    62  - locals []string
    63  - remotes []string
    64  - output string
    65  - quiet bool
     65 + target string
     66 + locals []string
     67 + remotes []string
     68 + runningTimeout time.Duration
     69 + output string
     70 + quiet bool
    66 71  }
    67 72   
    68 73  func NewCommand(cli cliutil.CLI) *cobra.Command {
    skipped 30 lines
    99 104   "local",
    100 105   "L",
    101 106   nil,
    102  - `Local port forwarding in the form [[LOCAL_IP:]LOCAL_PORT:][REMOTE_IP|REMOTE_NETWORK|REMOTE_ALIAS:]REMOTE_PORT`,
     107 + `Local port forwarding in the form [[LOCAL_HOST:]LOCAL_PORT:][REMOTE_HOST:]REMOTE_PORT`,
    103 108   )
    104 109   
    105 110   flags.StringSliceVarP(
    skipped 1 lines
    107 112   "remote",
    108 113   "R",
    109 114   nil,
    110  - `Remote port forwarding in the form [REMOTE_IP|REMOTE_NETWORK|REMOTE_ALIAS:]REMOTE_PORT:LOCAL_IP:LOCAL_PORT`,
     115 + `Remote port forwarding in the form [REMOTE_HOST:]REMOTE_PORT:LOCAL_HOST:LOCAL_PORT`,
     116 + )
     117 + 
     118 + flags.DurationVar(
     119 + &opts.runningTimeout,
     120 + "running-timeout",
     121 + 10*time.Second,
     122 + `How long to wait until the target is up and running`,
    111 123   )
    112 124   
    113 125   flags.BoolVarP(
    skipped 4 lines
    118 130   `Suppress verbose output`,
    119 131   )
    120 132   
    121  - flags.StringVarP(
    122  - &opts.output,
    123  - "output",
    124  - "o",
    125  - outFormatText,
    126  - `Output format ("text" | "json")`,
    127  - )
    128  - 
    129 133   return cmd
    130 134  }
    131 135   
    skipped 3 lines
    135 139   return err
    136 140   }
    137 141   
    138  - target, err := client.ContainerInspect(ctx, opts.target)
    139  - if err != nil {
    140  - return err
    141  - }
    142  - if err := validateTarget(target); err != nil {
    143  - return err
    144  - }
    145  - 
     142 + // TODO: Pull only if not present locally.
    146 143   cli.PrintAux("Pulling forwarder image...\n")
    147 144   if err := client.ImagePullEx(ctx, forwarderImage, types.ImagePullOptions{}); err != nil {
    148 145   return fmt.Errorf("cannot pull forwarder image %q: %w", forwarderImage, err)
    149 146   }
    150 147   
    151  - locals, err := parseLocalForwardings(target, opts.locals)
    152  - if err != nil {
    153  - return err
    154  - }
     148 + ctx, cancel := context.WithCancel(signalutil.InterruptibleContext(ctx))
     149 + defer cancel()
    155 150   
    156  - // TODO: It's probably a good idea to monitor forwarders too.
    157  - for i, l := range locals {
    158  - forwarder, err := startLocalForwarder(ctx, client, target, l)
     151 + for {
     152 + cont, err := runLocalPortForwarding(ctx, cli, client, opts)
    159 153   if err != nil {
    160 154   return err
    161 155   }
    162  - defer func() {
    163  - if err := client.ContainerKill(ctx, forwarder.ID, "KILL"); err != nil {
    164  - logrus.Debugf("Cannot kill forwarder container: %s", err)
    165  - }
    166  - }()
     156 + if !cont || ctx.Err() != nil {
     157 + cli.PrintAux("Forwarding's done. Exiting...\n")
     158 + return nil
     159 + }
     160 + 
     161 + cli.PrintAux("Giving target %s to get up and running again...\n", opts.runningTimeout)
     162 + }
     163 +}
     164 + 
     165 +func runLocalPortForwarding(
     166 + ctx context.Context,
     167 + cli cliutil.CLI,
     168 + client dockerclient.CommonAPIClient,
     169 + opts *options,
     170 +) (bool, error) {
     171 + target, err := getRunningTarget(ctx, client, opts.target, opts.runningTimeout)
     172 + if err != nil {
     173 + return false, err
     174 + }
    167 175   
    168  - if len(l.localPort) == 0 {
    169  - for _, bindings := range forwarder.NetworkSettings.Ports {
    170  - locals[i].localPort = bindings[0].HostPort
    171  - break
    172  - }
    173  - }
     176 + if err := validateTarget(target); err != nil {
     177 + return false, err
    174 178   }
    175 179   
    176  - switch opts.output {
    177  - case outFormatJSON:
    178  - cli.PrintOut(localForwardingsToJSON(locals) + "\n")
    179  - case outFormatText:
    180  - cli.PrintOut(localForwardingsToText(locals) + "\n")
    181  - default:
    182  - panic("unreachable!")
     180 + locals, err := parseLocalForwardings(target, opts.locals)
     181 + if err != nil {
     182 + return false, err
    183 183   }
    184 184   
    185  - signalCh := make(chan os.Signal, 128)
    186  - signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
    187  - defer close(signalCh)
     185 + // Start a new context bound to a single target lifecycle.
     186 + // It'll be used mostly to terminate the forwarders if a
     187 + // given instance of the target terminates.
     188 + ctx, cancel := context.WithCancel(ctx)
     189 + defer cancel()
     190 + 
     191 + fwdersErrorCh := startLocalForwarders(ctx, cli, client, target, locals)
     192 + 
     193 + targetStatusCh, targetErrorCh := client.ContainerWait(
     194 + ctx,
     195 + target.ID,
     196 + container.WaitConditionNotRunning,
     197 + )
    188 198   
    189  - targetStatusCh, targetErrCh := client.ContainerWait(ctx, target.ID, container.WaitConditionNotRunning)
    190 199   select {
    191  - case <-signalCh:
    192  - cli.PrintAux("Exiting...")
     200 + case err := <-fwdersErrorCh:
     201 + // Couldn't start or keep one or more forwarders running.
     202 + // All forwarders must be down (best effort) at this time.
     203 + return false, err
     204 + 
     205 + case <-targetStatusCh:
     206 + // Target exited/restarting.
     207 + cli.PrintAux("Target exited\n")
    193 208   
    194  - case err := <-targetErrCh:
    195  - if err != nil {
    196  - return fmt.Errorf("waiting for target container failed: %w", err)
     209 + case err := <-targetErrorCh:
     210 + // No idea what happened to the target, but better restart the forwarders
     211 + // (or exit while trying because the target is already gone).
     212 + if ctx.Err() == nil { // Ignoring 'context canceled' errors...
     213 + logrus.Debugf("Target error: %s", err)
    197 214   }
     215 + }
    198 216   
    199  - case <-targetStatusCh:
     217 + cli.PrintAux("Stopping the forwarders...\n")
     218 + cancel() // Tell the forwarders it's time to stop.
     219 + if err := <-fwdersErrorCh; err != nil {
     220 + logrus.Debugf("Error stopping forwarder(s): %s", err)
     221 + }
     222 + 
     223 + if opts.runningTimeout == 0 {
     224 + return false, nil
    200 225   }
    201 226   
    202  - return nil
     227 + return true, nil
    203 228  }
    204 229   
    205  -func validateTarget(target types.ContainerJSON) error {
    206  - if target.State == nil || !target.State.Running {
    207  - return errors.New("target container found but it's not running")
     230 +func getRunningTarget(
     231 + ctx context.Context,
     232 + client dockerclient.CommonAPIClient,
     233 + target string,
     234 + runningTimeout time.Duration,
     235 +) (types.ContainerJSON, error) {
     236 + ctx, cancel := context.WithTimeout(ctx, runningTimeout)
     237 + defer cancel()
     238 + 
     239 + for {
     240 + cont, err := client.ContainerInspect(ctx, target)
     241 + if err != nil {
     242 + return cont, err
     243 + }
     244 + if cont.State != nil && cont.State.Running {
     245 + return cont, nil
     246 + }
     247 + 
     248 + select {
     249 + case <-ctx.Done():
     250 + return cont, fmt.Errorf("target is not running after %s", runningTimeout)
     251 + case <-time.After(100 * time.Millisecond):
     252 + }
    208 253   }
     254 +}
    209 255   
     256 +func validateTarget(target types.ContainerJSON) error {
    210 257   hasIP := false
    211 258   for _, net := range target.NetworkSettings.Networks {
    212 259   hasIP = hasIP || len(net.IPAddress) > 0
    skipped 6 lines
    219 266  }
    220 267   
    221 268  type forwarding struct {
    222  - localIP string
     269 + localHost string
    223 270   localPort string
    224  - remoteIP string
     271 + remoteHost string
    225 272   remotePort string
    226 273  }
    227 274   
    228  -func (f forwarding) toDockerPortSpec() string {
    229  - // ip:hostPort:containerPort | ip::containerPort | hostPort:containerPort | containerPort
    230  - return f.localIP + ":" + f.localPort + ":" + f.remotePort
     275 +type directForwarding struct {
     276 + forwarding
     277 + targetNetwork string
     278 +}
     279 + 
     280 +type sidecarForwarding struct {
     281 + forwarding
     282 + targetID string // for netns
     283 + targetNetwork string
     284 + targetHost string
     285 + sidecarPort string
    231 286  }
    232 287   
    233 288  func parseLocalForwardings(
    skipped 22 lines
    256 311   return forwarding{}, errBadRemotePort
    257 312   }
    258 313   
    259  - remoteIP, err := unambiguousIP(target)
    260  - if err != nil {
     314 + if _, err := unambiguousIP(target); err != nil {
    261 315   return forwarding{}, err
    262 316   }
    263 317   
    264  - // localPort will be later assigned by Docker dynamically
    265 318   return forwarding{
    266  - localIP: "127.0.0.1",
    267  - remoteIP: remoteIP,
    268 319   remotePort: parts[0],
    269 320   }, nil
    270 321   }
    271 322   
    272  - if len(parts) == 2 {
    273  - // Either LOCAL_PORT:REMOTE_PORT or REMOTE_<IP|ALIAS|NETWORK>:REMOTE_PORT
    274  - 
     323 + if len(parts) == 2 { // Either LOCAL_PORT:REMOTE_PORT or REMOTE_HOST:REMOTE_PORT
    275 324   if _, err := nat.ParsePort(parts[1]); err != nil {
    276 325   return forwarding{}, errBadRemotePort
    277 326   }
    278 327   
    279 328   if _, err := nat.ParsePort(parts[0]); err == nil {
    280 329   // Case 2: LOCAL_PORT:REMOTE_PORT
    281  - remoteIP, err := unambiguousIP(target)
    282  - if err != nil {
     330 + if _, err := unambiguousIP(target); err != nil {
    283 331   return forwarding{}, err
    284 332   }
    285 333   
    286 334   return forwarding{
    287  - localIP: "127.0.0.1",
    288 335   localPort: parts[0],
    289  - remoteIP: remoteIP,
    290 336   remotePort: parts[1],
    291 337   }, nil
    292 338   }
    293 339   
    294  - // Case 3: REMOTE_<IP|ALIAS|NETWORK>:REMOTE_PORT
    295  - remoteIP, err := lookupTargetIP(target, parts[0])
    296  - if err != nil {
    297  - return forwarding{}, err
    298  - }
    299  - 
    300  - // localPort will be later assigned by Docker dynamically
     340 + // Case 3: REMOTE_HOST:REMOTE_PORT
    301 341   return forwarding{
    302  - localIP: "127.0.0.1",
     342 + remoteHost: parts[0],
    303 343   remotePort: parts[1],
    304  - remoteIP: remoteIP,
    305 344   }, nil
    306 345   }
    307 346   
    308 347   if len(parts) == 3 {
    309  - // Either LOCAL_PORT:REMOTE_<IP|ALIAS|NET>:REMOTE_PORT or LOCAL_IP:LOCAL_PORT:REMOTE_PORT
     348 + // Either LOCAL_PORT:REMOTE_HOST:REMOTE_PORT or (LOCAL_HOST:LOCAL_PORT:REMOTE_PORT | LOCAL_HOST::REMOTE_PORT)
     349 + if _, err := nat.ParsePort(parts[2]); err != nil {
     350 + return forwarding{}, errBadRemotePort
     351 + }
    310 352   
    311 353   if _, err := nat.ParsePort(parts[0]); err == nil {
    312  - // Case 4: LOCAL_PORT:REMOTE_<IP|ALIAS|NET>:REMOTE_PORT
    313  - remoteIP, err := lookupTargetIP(target, parts[1])
    314  - if err != nil {
    315  - return forwarding{}, err
    316  - }
    317  - 
    318  - if _, err := nat.ParsePort(parts[2]); err != nil {
    319  - return forwarding{}, errBadRemotePort
     354 + // Case 4: LOCAL_PORT:REMOTE_HOST:REMOTE_PORT
     355 + if len(parts[1]) == 0 {
     356 + return forwarding{}, errBadRemoteHost
    320 357   }
    321 358   
    322 359   return forwarding{
    323  - localIP: "127.0.0.1",
    324 360   localPort: parts[0],
    325  - remoteIP: remoteIP,
     361 + remoteHost: parts[1],
    326 362   remotePort: parts[2],
    327 363   }, nil
    328 364   }
    329 365   
    330  - // Case 5: LOCAL_IP:LOCAL_PORT:REMOTE_PORT
    331  - remoteIP, err := unambiguousIP(target)
    332  - if err != nil {
     366 + // Case 5: LOCAL_HOST:LOCAL_PORT:REMOTE_PORT or LOCAL_HOST::REMOTE_PORT
     367 + if _, err := unambiguousIP(target); err != nil {
    333 368   return forwarding{}, err
    334 369   }
    335 370   
    336 371   return forwarding{
    337  - localIP: parts[0],
     372 + localHost: parts[0],
    338 373   localPort: parts[1],
    339  - remoteIP: remoteIP,
    340 374   remotePort: parts[2],
    341 375   }, nil
    342 376   }
    343 377   
    344  - // Case 6: LOCAL_IP:LOCAL_PORT:REMOTE_<IP|ALIAS|NET>:REMOTE_PORT
    345  - if _, err := nat.ParsePort(parts[1]); err != nil {
     378 + // Case 6: LOCAL_HOST:LOCAL_PORT:REMOTE_HOST:REMOTE_PORT or LOCAL_HOST::REMOTE_HOST:REMOTE_PORT
     379 + if _, err := nat.ParsePort(parts[1]); err != nil && len(parts[1]) > 0 {
    346 380   return forwarding{}, errBadLocalPort
    347 381   }
    348 382   if _, err := nat.ParsePort(parts[3]); err != nil {
    349 383   return forwarding{}, errBadRemotePort
    350 384   }
    351 385   
    352  - remoteIP, err := lookupTargetIP(target, parts[2])
    353  - if err != nil {
    354  - return forwarding{}, err
    355  - }
    356  - 
    357 386   return forwarding{
    358  - localIP: parts[0],
     387 + localHost: parts[0],
    359 388   localPort: parts[1],
    360  - remoteIP: remoteIP,
     389 + remoteHost: parts[2],
    361 390   remotePort: parts[3],
    362 391   }, nil
    363 392  }
    skipped 41 lines
    405 434   return "", errors.New("cannot derive remote host")
    406 435  }
    407 436   
     437 +func lookupPortBindings(target types.ContainerJSON, targetPort string) []nat.PortBinding {
     438 + for port, bindings := range target.NetworkSettings.Ports {
     439 + if targetPort == port.Port() {
     440 + return bindings
     441 + }
     442 + }
     443 + return nil
     444 +}
     445 + 
    408 446  func targetNetworkByIP(target types.ContainerJSON, ip string) (string, error) {
    409 447   for name, net := range target.NetworkSettings.Networks {
    410 448   if net.IPAddress == ip {
    skipped 3 lines
    414 452   return "", errors.New("cannot deduce target network by IP")
    415 453  }
    416 454   
    417  -func startLocalForwarder(
     455 +func startLocalForwarders(
     456 + ctx context.Context,
     457 + cli cliutil.CLI,
     458 + client dockerclient.CommonAPIClient,
     459 + target types.ContainerJSON,
     460 + locals []forwarding,
     461 +) <-chan error {
     462 + doneCh := make(chan error, 1)
     463 + 
     464 + go func() {
     465 + var errored bool
     466 + var wg sync.WaitGroup
     467 + 
     468 + for _, fwd := range locals {
     469 + wg.Add(1)
     470 + 
     471 + go func(fwd forwarding) {
     472 + defer wg.Done()
     473 + 
     474 + if err := runLocalForwarder(ctx, cli, client, target, fwd); err != nil {
     475 + logrus.Debugf("Forwarding error: %s", err)
     476 + errored = true
     477 + }
     478 + }(fwd)
     479 + }
     480 + 
     481 + wg.Wait()
     482 + if errored {
     483 + doneCh <- errors.New("one or more forwarders failed")
     484 + }
     485 + close(doneCh)
     486 + }()
     487 + 
     488 + return doneCh
     489 +}
     490 + 
     491 +func runLocalForwarder(
    418 492   ctx context.Context,
     493 + cli cliutil.CLI,
    419 494   client dockerclient.CommonAPIClient,
    420 495   target types.ContainerJSON,
    421 496   fwd forwarding,
    422  -) (types.ContainerJSON, error) {
    423  - exposedPorts, portBindings, err := nat.ParsePortSpecs([]string{fwd.toDockerPortSpec()})
     497 +) error {
     498 + if len(fwd.localHost) == 0 {
     499 + fwd.localHost = "127.0.0.1"
     500 + }
     501 + 
     502 + if len(fwd.remoteHost) == 0 {
     503 + remoteIP, err := unambiguousIP(target)
     504 + if err != nil {
     505 + return err
     506 + }
     507 + 
     508 + network, err := targetNetworkByIP(target, remoteIP)
     509 + if err != nil {
     510 + return err
     511 + }
     512 + 
     513 + return runLocalDirectForwarder(
     514 + ctx,
     515 + cli,
     516 + client,
     517 + directForwarding{
     518 + targetNetwork: network,
     519 + forwarding: forwarding{
     520 + localHost: fwd.localHost,
     521 + localPort: fwd.localPort,
     522 + remoteHost: remoteIP,
     523 + remotePort: fwd.remotePort,
     524 + },
     525 + },
     526 + )
     527 + }
     528 + 
     529 + if remoteIP, err := lookupTargetIP(target, fwd.remoteHost); err == nil {
     530 + network, err := targetNetworkByIP(target, remoteIP)
     531 + if err != nil {
     532 + return err
     533 + }
     534 + 
     535 + return runLocalDirectForwarder(
     536 + ctx,
     537 + cli,
     538 + client,
     539 + directForwarding{
     540 + targetNetwork: network,
     541 + forwarding: forwarding{
     542 + localHost: fwd.localHost,
     543 + localPort: fwd.localPort,
     544 + remoteHost: remoteIP,
     545 + remotePort: fwd.remotePort,
     546 + },
     547 + },
     548 + )
     549 + }
     550 + 
     551 + // In a multi-network case, pick a random one.
     552 + var targetNetwork, targetIP string
     553 + for name, settings := range target.NetworkSettings.Networks {
     554 + if len(settings.IPAddress) > 0 {
     555 + targetNetwork = name
     556 + targetIP = settings.IPAddress
     557 + break
     558 + }
     559 + }
     560 + if len(targetNetwork) == 0 || len(targetIP) == 0 {
     561 + return errors.New("target is not attached to any networks")
     562 + }
     563 + 
     564 + return runLocalSidecarForwarder(
     565 + ctx,
     566 + cli,
     567 + client,
     568 + sidecarForwarding{
     569 + targetID: target.ID,
     570 + targetNetwork: targetNetwork,
     571 + targetHost: targetIP,
     572 + forwarding: fwd, // as is
     573 + },
     574 + )
     575 +}
     576 + 
     577 +func runLocalDirectForwarder(
     578 + ctx context.Context,
     579 + cli cliutil.CLI,
     580 + client dockerclient.CommonAPIClient,
     581 + fwd directForwarding,
     582 +) error {
     583 + // TODO: Try start() N times.
     584 + 
     585 + forwarderID, err := startLocalDirectForwarder(ctx, client, fwd)
     586 + defer cleanupContainerIfExist(client, forwarderID)
    424 587   if err != nil {
    425  - return types.ContainerJSON{}, err
     588 + return fmt.Errorf("starting forwarder failed: %w", err)
     589 + }
     590 + 
     591 + if err := printLocalDirectForwarding(ctx, cli, client, fwd, forwarderID); err != nil {
     592 + return err
     593 + }
     594 + 
     595 + fwderStatusCh, fwderErrCh := client.ContainerWait(
     596 + ctx,
     597 + forwarderID,
     598 + container.WaitConditionNotRunning,
     599 + )
     600 + 
     601 + // TODO: If a forwarder was alive long enough, but then suddenly exited,
     602 + // we may want to restart it w/o decreasing the number of attempts.
     603 + select {
     604 + case <-ctx.Done():
     605 + return nil
     606 + 
     607 + case status := <-fwderStatusCh:
     608 + return fmt.Errorf(
     609 + "forwarder %s exited with code %d: %v",
     610 + forwarderID, status.StatusCode, status.Error,
     611 + )
     612 + 
     613 + case err := <-fwderErrCh:
     614 + logrus.Debugf("Forwarder error: %s", err)
     615 + return fmt.Errorf("forwarder %s hiccuped: %w", forwarderID, err)
    426 616   }
     617 +}
    427 618   
    428  - network, err := targetNetworkByIP(target, fwd.remoteIP)
     619 +func startLocalDirectForwarder(
     620 + ctx context.Context,
     621 + client dockerclient.CommonAPIClient,
     622 + fwd directForwarding,
     623 +) (string, error) {
     624 + portMapSpec := fwd.localHost + ":" + fwd.localPort + ":" + fwd.remotePort
     625 + exposedPorts, portBindings, err := nat.ParsePortSpecs([]string{portMapSpec})
    429 626   if err != nil {
    430  - return types.ContainerJSON{}, err
     627 + return "", err
    431 628   }
    432 629   
    433 630   resp, err := client.ContainerCreate(
    skipped 2 lines
    436 633   Image: forwarderImage,
    437 634   Entrypoint: []string{"socat"},
    438 635   Cmd: []string{
    439  - fmt.Sprintf("TCP-LISTEN:%s,fork", fwd.remotePort),
    440  - fmt.Sprintf("TCP-CONNECT:%s:%s", fwd.remoteIP, fwd.remotePort),
     636 + fmt.Sprintf("TCP4-LISTEN:%s,fork", fwd.remotePort),
     637 + fmt.Sprintf("TCP-CONNECT:%s:%s", fwd.remoteHost, fwd.remotePort),
    441 638   },
     639 + Env: []string{"SOCAT_DEFAULT_LISTEN_IP=0.0.0.0"},
    442 640   ExposedPorts: exposedPorts,
    443 641   },
    444 642   &container.HostConfig{
    445  - AutoRemove: true,
    446 643   PortBindings: portBindings,
    447  - NetworkMode: container.NetworkMode(network),
     644 + NetworkMode: container.NetworkMode(fwd.targetNetwork),
    448 645   },
    449 646   nil,
    450 647   nil,
    451 648   "cdebug-fwd-"+uuid.ShortID(),
    452 649   )
    453 650   if err != nil {
    454  - return types.ContainerJSON{}, fmt.Errorf("cannot create forwarder container: %w", err)
     651 + return "", fmt.Errorf("cannot create forwarder container: %w", err)
    455 652   }
    456 653   
    457 654   if err := client.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
    458  - return types.ContainerJSON{}, fmt.Errorf("cannot start forwarder container: %w", err)
     655 + return resp.ID, fmt.Errorf("cannot start forwarder container: %w", err)
     656 + }
     657 + 
     658 + return resp.ID, nil
     659 +}
     660 + 
     661 +func runLocalSidecarForwarder(
     662 + ctx context.Context,
     663 + cli cliutil.CLI,
     664 + client dockerclient.CommonAPIClient,
     665 + fwd sidecarForwarding,
     666 +) error {
     667 + // TODO: Try starting sidecar and forwarder N times.
     668 + 
     669 + sidecarID, sidecarPort, err := startLocalSidecarForwarder(
     670 + ctx, client, fwd.targetID, fwd.remoteHost, fwd.remotePort,
     671 + )
     672 + defer cleanupContainerIfExist(client, sidecarID)
     673 + if err != nil {
     674 + return fmt.Errorf("starting forwarder sidecar failed: %w", err)
    459 675   }
    460 676   
    461  - forwarder, err := client.ContainerInspect(ctx, resp.ID)
     677 + fwd.sidecarPort = sidecarPort // randomly chosen
     678 + 
     679 + forwarderID, err := startLocalDirectForwarder(
     680 + ctx,
     681 + client,
     682 + directForwarding{
     683 + targetNetwork: fwd.targetNetwork,
     684 + forwarding: forwarding{
     685 + localHost: fwd.localHost,
     686 + localPort: fwd.localPort,
     687 + remoteHost: fwd.targetHost,
     688 + remotePort: fwd.sidecarPort,
     689 + },
     690 + },
     691 + )
     692 + defer cleanupContainerIfExist(client, forwarderID)
    462 693   if err != nil {
    463  - return forwarder, fmt.Errorf("cannot inspect forwarder container: %w", err)
     694 + return fmt.Errorf("starting forwarder faield: %w", err)
    464 695   }
    465  - return forwarder, nil
     696 + 
     697 + if err := printLocalSidecarForwarding(ctx, cli, client, fwd, forwarderID); err != nil {
     698 + return err
     699 + }
     700 + 
     701 + sidecarStatusCh, sidecarErrCh := client.ContainerWait(
     702 + ctx,
     703 + sidecarID,
     704 + container.WaitConditionNotRunning,
     705 + )
     706 + 
     707 + fwderStatusCh, fwderErrCh := client.ContainerWait(
     708 + ctx,
     709 + forwarderID,
     710 + container.WaitConditionNotRunning,
     711 + )
     712 + 
     713 + // TODO: If a forwarder and/or sidecar was alive long enough, we may want to
     714 + // restart them w/o decreasing the number of attempts.
     715 + select {
     716 + case <-ctx.Done():
     717 + return nil
     718 + 
     719 + case status := <-sidecarStatusCh:
     720 + return fmt.Errorf(
     721 + "forwarder sidecar %s exited with code %d: %v",
     722 + sidecarID, status.StatusCode, status.Error,
     723 + )
     724 + 
     725 + case status := <-fwderStatusCh:
     726 + return fmt.Errorf(
     727 + "forwarder %s exited with code %d: %v",
     728 + forwarderID, status.StatusCode, status.Error,
     729 + )
     730 + 
     731 + case err := <-sidecarErrCh:
     732 + logrus.Debugf("Forwarder sidecar error: %s", err)
     733 + return fmt.Errorf("forwarder sidecar %s hiccuped: %w", sidecarID, err)
     734 + 
     735 + case err := <-fwderErrCh:
     736 + logrus.Debugf("Forwarder error: %s", err)
     737 + return fmt.Errorf("forwarder %s hiccuped: %w", forwarderID, err)
     738 + }
    466 739  }
    467 740   
    468  -func localForwardingsToJSON(fwds []forwarding) string {
    469  - out := []map[string]string{}
    470  - for _, f := range fwds {
    471  - out = append(out, map[string]string{
    472  - "localHost": f.localIP,
    473  - "localPort": f.localPort,
    474  - "remoteHost": f.remoteIP,
    475  - "remotePort": f.remotePort,
    476  - })
     741 +func startLocalSidecarForwarder(
     742 + ctx context.Context,
     743 + client dockerclient.CommonAPIClient,
     744 + targetID string,
     745 + remoteHost string,
     746 + remotePort string,
     747 +) (string, string, error) {
     748 + // TODO: This random port may conflict with a port already used by the
     749 + // target container. Instead, we should use socat TCP-LISTEN:0 and
     750 + // detect what port was assigned by the OS with a separate command.
     751 + randomPort := fmt.Sprintf("%d", 32000+rand.Intn(25000))
     752 + resp, err := client.ContainerCreate(
     753 + ctx,
     754 + &container.Config{
     755 + Image: forwarderImage,
     756 + Entrypoint: []string{"socat"},
     757 + Cmd: []string{
     758 + fmt.Sprintf("TCP4-LISTEN:%s,fork", randomPort),
     759 + fmt.Sprintf("TCP-CONNECT:%s:%s", remoteHost, remotePort),
     760 + },
     761 + Env: []string{"SOCAT_DEFAULT_LISTEN_IP=0.0.0.0"},
     762 + },
     763 + &container.HostConfig{
     764 + NetworkMode: container.NetworkMode("container:" + targetID),
     765 + },
     766 + nil,
     767 + nil,
     768 + "cdebug-fwd-sidecar-"+uuid.ShortID(),
     769 + )
     770 + if err != nil {
     771 + return "", "", fmt.Errorf("cannot create forwarder sidecar container: %w", err)
     772 + }
     773 + 
     774 + if err := client.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
     775 + return resp.ID, "", fmt.Errorf("cannot start forwarder sidecar container: %w", err)
     776 + }
     777 + 
     778 + return resp.ID, randomPort, nil
     779 +}
     780 + 
     781 +func printLocalDirectForwarding(
     782 + ctx context.Context,
     783 + cli cliutil.CLI,
     784 + client dockerclient.CommonAPIClient,
     785 + fwd directForwarding,
     786 + forwarderID string,
     787 +) error {
     788 + if len(fwd.localPort) == 0 {
     789 + forwarder, err := client.ContainerInspect(ctx, forwarderID)
     790 + if err != nil {
     791 + return fmt.Errorf("cannot inspect forwarder container: %w", err)
     792 + }
     793 + 
     794 + bindings := lookupPortBindings(forwarder, fwd.remotePort)
     795 + if len(bindings) == 0 {
     796 + logrus.Debugf("Empty port bindings in forwarder %s", forwarder.ID)
     797 + fwd.localPort = "<unknown>"
     798 + } else {
     799 + // Every forwarder should have just one port exposed.
     800 + fwd.localPort = bindings[0].HostPort
     801 + }
     802 + }
     803 + 
     804 + cli.PrintOut(
     805 + "Forwarding %s:%s to %s:%s\n",
     806 + fwd.localHost, fwd.localPort,
     807 + fwd.remoteHost, fwd.remotePort,
     808 + )
     809 + 
     810 + return nil
     811 +}
     812 + 
     813 +func printLocalSidecarForwarding(
     814 + ctx context.Context,
     815 + cli cliutil.CLI,
     816 + client dockerclient.CommonAPIClient,
     817 + fwd sidecarForwarding,
     818 + forwarderID string,
     819 +) error {
     820 + if len(fwd.localPort) == 0 {
     821 + forwarder, err := client.ContainerInspect(ctx, forwarderID)
     822 + if err != nil {
     823 + return fmt.Errorf("cannot inspect forwarder container: %w", err)
     824 + }
     825 + 
     826 + bindings := lookupPortBindings(forwarder, fwd.sidecarPort)
     827 + if len(bindings) == 0 {
     828 + logrus.Debugf("Empty port bindings in forwarder %s", forwarder.ID)
     829 + fwd.localPort = "<unknown>"
     830 + } else {
     831 + // Every forwarder should have just one port exposed.
     832 + fwd.localPort = bindings[0].HostPort
     833 + }
    477 834   }
    478  - return jsonutil.DumpIndent(out)
     835 + 
     836 + cli.PrintOut(
     837 + "Forwarding %s:%s to %s:%s through %s:%s\n",
     838 + fwd.localHost, fwd.localPort,
     839 + fwd.remoteHost, fwd.remotePort,
     840 + fwd.targetHost, fwd.sidecarPort,
     841 + )
     842 + 
     843 + return nil
    479 844  }
    480 845   
    481  -func localForwardingsToText(fwds []forwarding) string {
    482  - out := []string{}
    483  - for _, f := range fwds {
    484  - out = append(out, fmt.Sprintf(
    485  - "Forwarding %s:%s to %s:%s",
    486  - f.localIP, f.localPort, f.remoteIP, f.remotePort,
    487  - ))
     846 +func cleanupContainerIfExist(
     847 + client dockerclient.CommonAPIClient,
     848 + contID string,
     849 +) {
     850 + if len(contID) == 0 {
     851 + return
     852 + }
     853 + 
     854 + ctx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
     855 + defer cancel()
     856 + 
     857 + if err := client.ContainerRemove(ctx, contID, types.ContainerRemoveOptions{Force: true}); err != nil {
     858 + logrus.Debugf("Cannot force-remove container %s: %s", contID, err)
    488 859   }
    489  - return strings.Join(out, "\n")
    490 860  }
    491 861   
  • ■ ■ ■ ■ ■ ■
    main.go
    1 1  package main
    2 2   
    3 3  import (
     4 + cryptorand "crypto/rand"
     5 + "encoding/binary"
    4 6   "fmt"
     7 + mathrand "math/rand"
    5 8   "os"
     9 + "time"
    6 10   
    7 11   "github.com/moby/term"
    8 12   "github.com/sirupsen/logrus"
    skipped 9 lines
    18 22   commit = "none"
    19 23   date = "unknown"
    20 24  )
     25 + 
     26 +func init() {
     27 + var buf [8]byte
     28 + if _, err := cryptorand.Read(buf[:]); err == nil {
     29 + mathrand.Seed(int64(binary.LittleEndian.Uint64(buf[:])))
     30 + } else {
     31 + mathrand.Seed(time.Now().UnixNano())
     32 + }
     33 +}
    21 34   
    22 35  func main() {
    23 36   stdin, stdout, stderr := term.StdStreams()
    skipped 56 lines
  • ■ ■ ■ ■ ■ ■
    pkg/signalutil/signalutil.go
     1 +package signalutil
     2 + 
     3 +import (
     4 + "context"
     5 + "os"
     6 + "os/signal"
     7 + "syscall"
     8 +)
     9 + 
     10 +func InterruptibleContext(ctx context.Context) context.Context {
     11 + ctx, cancel := context.WithCancel(ctx)
     12 + 
     13 + go func() {
     14 + defer cancel()
     15 + 
     16 + signalCh := make(chan os.Signal, 128)
     17 + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
     18 + defer signal.Stop(signalCh)
     19 + 
     20 + select {
     21 + case <-signalCh:
     22 + case <-ctx.Done():
     23 + }
     24 + }()
     25 + 
     26 + return ctx
     27 +}
     28 + 
Please wait...
Page is in error, reload to recover