Projects STRLCPY cdebug Commits 80e0169a
🤬
Revision indexing in progress... (symbol navigation in revisions will be accurate after indexed)
  • ■ ■ ■ ■ ■ ■
    cmd/portforward/portforward.go
    skipped 16 lines
    17 17   
    18 18   "github.com/iximiuz/cdebug/pkg/cliutil"
    19 19   "github.com/iximiuz/cdebug/pkg/docker"
     20 + "github.com/iximiuz/cdebug/pkg/jsonutil"
    20 21   "github.com/iximiuz/cdebug/pkg/signalutil"
    21 22   "github.com/iximiuz/cdebug/pkg/uuid"
    22 23  )
    skipped 89 lines
    112 113   )
    113 114   
    114 115   flags.DurationVar(
    115  - &opts.output,
    116  - "running-timeout",
    117 116   &opts.runningTimeout,
     117 + "running-timeout",
     118 + 10*time.Second,
    118 119   `How long to wait until the target is up and running`,
    119 120   )
    120 121   
    skipped 5 lines
    126 127   `Suppress verbose output`,
    127 128   )
    128 129   
    129  - flags.StringVarP(
    130  - &opts.output,
    131  - "output",
    132  - "o",
    133  - outFormatText,
    134  - `Output format ("text" | "json")`,
    135  - )
    136  - 
    137 130   return cmd
    138 131  }
    139 132   
    skipped 3 lines
    143 136   return err
    144 137   }
    145 138   
    146  - target, err := getRunningTarget(ctx, opts.target, opts.runningTimeout)
     139 + cli.PrintAux("Pulling forwarder image...\n")
     140 + if err := client.ImagePullEx(ctx, forwarderImage, types.ImagePullOptions{}); err != nil {
     141 + return fmt.Errorf("cannot pull forwarder image %q: %w", forwarderImage, err)
     142 + }
     143 + 
     144 + ctx, cancel := context.WithCancel(signalutil.InterruptibleContext(ctx))
     145 + defer cancel()
     146 + 
     147 + for {
     148 + cont, err := runLocalPortForward(ctx, cli, client, opts)
     149 + if err != nil {
     150 + return err
     151 + }
     152 + if !cont || ctx.Err() != nil {
     153 + cli.PrintAux("Forwarding's done. Exiting...\n")
     154 + return nil
     155 + }
     156 + 
     157 + cli.PrintAux("Giving target %s to get up and running again...\n", opts.runningTimeout)
     158 + }
     159 +}
     160 + 
     161 +func runLocalPortForward(
     162 + ctx context.Context,
     163 + cli cliutil.CLI,
     164 + client dockerclient.CommonAPIClient,
     165 + opts *options,
     166 +) (bool, error) {
     167 + target, err := getRunningTarget(ctx, client, opts.target, opts.runningTimeout)
     168 + if err != nil {
     169 + return false, err
     170 + }
     171 + 
    147 172   if err := validateTarget(target); err != nil {
    148  - return err
     173 + return false, err
    149 174   }
    150 175   
    151 176   locals, err := parseLocalForwardings(target, opts.locals)
    152 177   if err != nil {
    153  - return err
     178 + return false, err
    154 179   }
    155 180   
    156  - cli.PrintAux("Pulling forwarder image...\n")
    157  - if err := client.ImagePullEx(ctx, forwarderImage, types.ImagePullOptions{}); err != nil {
    158  - return fmt.Errorf("cannot pull forwarder image %q: %w", forwarderImage, err)
    159  - }
     181 + // Start a new context bound to a single target lifecycle.
     182 + // It'll be used mostly to terminate the forwarders if a
     183 + // given instance of the target terminates.
     184 + ctx, cancel := context.WithCancel(ctx)
     185 + defer cancel()
    160 186   
    161  - ctx, cancel := context.WithCancel(signalutil.InterruptibleContext(ctx))
     187 + fwdersErrorCh := startLocalForwarders(ctx, cli, client, target, locals)
    162 188   
    163  - var wg sync.WaitGroup
    164  - for _, fwd := range locals {
    165  - wg.Add(1)
     189 + targetStatusCh, targetErrorCh := client.ContainerWait(
     190 + ctx,
     191 + target.ID,
     192 + container.WaitConditionNotRunning,
     193 + )
    166 194   
    167  - go func() {
    168  - defer wg.Done()
     195 + select {
     196 + case err := <-fwdersErrorCh:
     197 + // Couldn't start or keep one or more forwarders running.
     198 + // All forwarders must be down (best effort) at this time.
     199 + return false, err
    169 200   
    170  - if err := runLocalForwarder(ctx, client, target, fwd); err != nil {
    171  - logger.Warnf("forwading error: %s", err)
    172  - }
    173  - }()
     201 + case <-targetStatusCh:
     202 + // Target exited/restarting.
     203 + cli.PrintAux("Target exited\n")
     204 + 
     205 + case err := <-targetErrorCh:
     206 + // No idea what happened to the target, but better restart the forwarders
     207 + // (or exit while trying because the target is already gone).
     208 + logrus.Debugf("Target error: %s", err)
    174 209   }
    175  - wg.Wait()
    176 210   
    177  - return nil
     211 + cli.PrintAux("Stopping the forwarders...\n")
     212 + cancel() // Tell the forwarders it's time to stop.
     213 + if err := <-fwdersErrorCh; err != nil {
     214 + logrus.Debugf("Error stopping forwarder(s): %s", err)
     215 + }
     216 + 
     217 + if opts.runningTimeout == 0 {
     218 + return false, nil
     219 + }
     220 + 
     221 + return true, nil
    178 222  }
    179 223   
    180 224  func getRunningTarget(
    181 225   ctx context.Context,
     226 + client dockerclient.CommonAPIClient,
    182 227   target string,
    183 228   runningTimeout time.Duration,
    184  -) (target types.ContainerJSON, err error) {
     229 +) (types.ContainerJSON, error) {
    185 230   ctx, cancel := context.WithTimeout(ctx, runningTimeout)
    186 231   defer cancel()
    187 232   
    188 233   for {
    189  - target, err = client.ContainerInspect(ctx, opts.target)
     234 + cont, err := client.ContainerInspect(ctx, target)
    190 235   if err != nil {
    191  - return target, err
     236 + return cont, err
    192 237   }
    193  - if target.State != nil && target.State.Running {
    194  - return target, nil
     238 + if cont.State != nil && cont.State.Running {
     239 + return cont, nil
    195 240   }
    196 241   
    197 242   select {
    198 243   case <-ctx.Done():
    199  - return target, fmt.Errorf("target is not running after %s", runningTimeout)
     244 + return cont, fmt.Errorf("target is not running after %s", runningTimeout)
    200 245   case <-time.After(100 * time.Millisecond):
    201 246   }
    202 247   }
    skipped 79 lines
    282 327   
    283 328   if len(parts) == 3 {
    284 329   // Either LOCAL_PORT:REMOTE_HOST:REMOTE_PORT or (LOCAL_HOST:LOCAL_PORT:REMOTE_PORT | LOCAL_HOST::REMOTE_PORT)
    285  - if _, err := nat.ParsePort(parts[3]); err != nil {
     330 + if _, err := nat.ParsePort(parts[2]); err != nil {
    286 331   return forwarding{}, errBadRemotePort
    287 332   }
    288 333   
    289 334   if _, err := nat.ParsePort(parts[0]); err == nil {
    290 335   // Case 4: LOCAL_PORT:REMOTE_HOST:REMOTE_PORT
    291  - if _, err := nat.ParsePort(parts[2]); err != nil {
    292  - return forwarding{}, errBadRemotePort
    293  - }
    294 336   if len(parts[1]) == 0 {
    295 337   return forwarding{}, errBadRemoteHost
    296 338   }
    skipped 76 lines
    373 415   return "", errors.New("cannot derive remote host")
    374 416  }
    375 417   
     418 +func lookupPortBindings(target types.ContainerJSON, targetPort string) []nat.PortBinding {
     419 + for port, bindings := range target.NetworkSettings.Ports {
     420 + if targetPort == port.Port() {
     421 + return bindings
     422 + }
     423 + }
     424 + return nil
     425 +}
     426 + 
    376 427  func targetNetworkByIP(target types.ContainerJSON, ip string) (string, error) {
    377 428   for name, net := range target.NetworkSettings.Networks {
    378 429   if net.IPAddress == ip {
    skipped 3 lines
    382 433   return "", errors.New("cannot deduce target network by IP")
    383 434  }
    384 435   
    385  -func startLocalForwarder(
     436 +func startLocalForwarders(
     437 + ctx context.Context,
     438 + cli cliutil.CLI,
     439 + client dockerclient.CommonAPIClient,
     440 + target types.ContainerJSON,
     441 + locals []forwarding,
     442 +) <-chan error {
     443 + doneCh := make(chan error, 1)
     444 + 
     445 + go func() {
     446 + var errored bool
     447 + var wg sync.WaitGroup
     448 + 
     449 + for _, fwd := range locals {
     450 + wg.Add(1)
     451 + 
     452 + go func() {
     453 + defer wg.Done()
     454 + 
     455 + if err := runLocalForwarder(ctx, cli, client, target, fwd); err != nil {
     456 + logrus.Debugf("Forwarding error: %s", err)
     457 + errored = true
     458 + }
     459 + }()
     460 + }
     461 + 
     462 + wg.Wait()
     463 + if errored {
     464 + doneCh <- errors.New("one or more forwarders failed")
     465 + }
     466 + close(doneCh)
     467 + }()
     468 + 
     469 + return doneCh
     470 +}
     471 + 
     472 +func runLocalForwarder(
    386 473   ctx context.Context,
     474 + cli cliutil.CLI,
    387 475   client dockerclient.CommonAPIClient,
    388 476   target types.ContainerJSON,
    389 477   fwd forwarding,
    390 478  ) error {
    391 479   if len(fwd.remoteHost) == 0 {
    392  - forwarder, err := startLocalForwarderToTargetPublicPort(
     480 + remoteIP, err := unambiguousIP(target)
     481 + if err != nil {
     482 + return err
     483 + }
     484 + return runLocalForwarderToTargetPublicPort(
    393 485   ctx,
     486 + cli,
    394 487   client,
    395 488   target,
    396 489   fwd.localHost,
    397 490   fwd.localPort,
     491 + remoteIP,
    398 492   fwd.remotePort,
    399 493   )
    400  - if err != nil {
    401  - return err
     494 + }
     495 + 
     496 + if remoteIP, err := lookupTargetIP(target, fwd.remoteHost); err == nil {
     497 + return runLocalForwarderToTargetPublicPort(
     498 + ctx,
     499 + cli,
     500 + client,
     501 + target,
     502 + fwd.localHost,
     503 + fwd.localPort,
     504 + remoteIP,
     505 + fwd.remotePort,
     506 + )
     507 + }
     508 + 
     509 + // TODO: runLocalForwarderThroughTargetNetns()
     510 + return errors.New("implement me!")
     511 +}
     512 + 
     513 +func runLocalForwarderToTargetPublicPort(
     514 + ctx context.Context,
     515 + cli cliutil.CLI,
     516 + client dockerclient.CommonAPIClient,
     517 + target types.ContainerJSON,
     518 + localHost string,
     519 + localPort string,
     520 + remoteIP string,
     521 + remotePort string,
     522 +) error {
     523 + // TODO: Try start() N times.
     524 + forwarder, err := startLocalForwarderToTargetPublicPort(
     525 + ctx,
     526 + cli,
     527 + client,
     528 + target,
     529 + localHost,
     530 + localPort,
     531 + remoteIP,
     532 + remotePort,
     533 + )
     534 + if err != nil {
     535 + return err
     536 + }
     537 + 
     538 + killForwarder := func() {
     539 + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
     540 + defer cancel()
     541 + 
     542 + if err := client.ContainerKill(ctx, forwarder.ID, "KILL"); err != nil {
     543 + logrus.Debugf("Cannot kill forwarder container: %s", err)
    402 544   }
    403  - return maintainLocalForwarderToTargetPublicPort(ctx, client, target, forwarder)
    404 545   }
    405 546   
    406  - // TODO: startLocalForwarderThroughTargetNetns()
    407  - return errors.New("Implement me!")
     547 + fwderStatusCh, fwderErrCh := client.ContainerWait(
     548 + ctx,
     549 + forwarder.ID,
     550 + container.WaitConditionNotRunning,
     551 + )
     552 + 
     553 + // TODO: If a forwarder was alive long enough, we may want to restart it w/o
     554 + // decreasing the number of attempts.
     555 + select {
     556 + case <-ctx.Done():
     557 + killForwarder()
     558 + return nil
     559 + 
     560 + case status := <-fwderStatusCh:
     561 + return fmt.Errorf(
     562 + "forwarder %s exited with code %d: %w",
     563 + forwarder.ID, status.StatusCode, status.Error,
     564 + )
     565 + 
     566 + case err := <-fwderErrCh:
     567 + logrus.Debugf("Forwarder error: %s", err)
     568 + killForwarder() // Just in case...
     569 + return fmt.Errorf("forwarder %s hiccuped: %w", forwarder.ID, err)
     570 + }
    408 571  }
    409 572   
    410 573  func startLocalForwarderToTargetPublicPort(
    411 574   ctx context.Context,
     575 + cli cliutil.CLI,
    412 576   client dockerclient.CommonAPIClient,
    413 577   target types.ContainerJSON,
    414 578   localHost string,
    415 579   localPort string,
     580 + remoteIP string,
    416 581   remotePort string,
    417  -) (forwarder types.ContainerJSON, err error) {
    418  - remoteIP, err := unambiguousIP(target)
    419  - if err != nil {
    420  - return forwarder, err
     582 +) (types.ContainerJSON, error) {
     583 + if len(localHost) == 0 {
     584 + localHost = "127.0.0.1"
    421 585   }
     586 + 
    422 587   network, err := targetNetworkByIP(target, remoteIP)
    423 588   if err != nil {
    424  - return forwarder, err
     589 + return types.ContainerJSON{}, err
    425 590   }
    426 591   
    427  - exposedPorts, portBindings, err := nat.ParsePortSpecs([]string{
    428  - localHost + ":" + localPort + ":" + remotePort,
    429  - })
     592 + portMapSpec := localHost + ":" + localPort + ":" + remotePort
     593 + exposedPorts, portBindings, err := nat.ParsePortSpecs([]string{portMapSpec})
    430 594   if err != nil {
    431  - return forwarder, err
     595 + return types.ContainerJSON{}, err
    432 596   }
    433 597   
    434 598   resp, err := client.ContainerCreate(
    skipped 17 lines
    452 616   "cdebug-fwd-"+uuid.ShortID(),
    453 617   )
    454 618   if err != nil {
    455  - return forwarder, fmt.Errorf("cannot create forwarder container: %w", err)
     619 + return types.ContainerJSON{}, fmt.Errorf("cannot create forwarder container: %w", err)
    456 620   }
    457 621   
    458 622   if err := client.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
    459  - return forwarder, fmt.Errorf("cannot start forwarder container: %w", err)
     623 + return types.ContainerJSON{}, fmt.Errorf("cannot start forwarder container: %w", err)
    460 624   }
    461 625   
    462  - forwarder, err = client.ContainerInspect(ctx, resp.ID)
     626 + forwarder, err := client.ContainerInspect(ctx, resp.ID)
    463 627   if err != nil {
    464 628   return forwarder, fmt.Errorf("cannot inspect forwarder container: %w", err)
    465 629   }
    466  - if forwarder.State != nil && forwarder.State.Running {
    467  - return forwarder, fmt.Errorf("cannot inspect forwarder container: %w", err)
     630 + if forwarder.State == nil && !forwarder.State.Running {
     631 + return forwarder, fmt.Errorf("unexpected forwarder container state: %v", jsonutil.Dump(forwarder.State))
    468 632   }
    469 633   
    470  - return forwarder, nil
    471  -}
    472  - 
    473  -func maintainLocalForwarderToTargetPublicPort(
    474  - ctx context.Context,
    475  - client dockerclient.CommonAPIClient,
    476  - target types.ContainerJSON,
    477  - forwarder types.ContainerJSON,
    478  -) error {
    479  - targetStatusCh, targetErrCh := client.ContainerWait(ctx, target.ID, container.WaitConditionNotRunning)
    480  - fwderStatusCh, fwderErrCh := client.ContainerWait(ctx, forwarder.ID, container.WaitConditionNotRunning)
    481  - 
    482  - defer func() {
    483  - if err := client.ContainerKill(ctx, forwarder.ID, "KILL"); err != nil {
    484  - logrus.Debugf("Cannot kill forwarder container: %s", err)
     634 + if len(localPort) == 0 {
     635 + bindings := lookupPortBindings(forwarder, remotePort)
     636 + if len(bindings) == 0 {
     637 + logrus.Debugf("Empty bindings for port map spec %s in forwarder %s", portMapSpec, forwarder.ID)
     638 + localPort = "<unknown>"
     639 + } else {
     640 + // Every forwarder should have just one port exposed.
     641 + localPort = bindings[0].HostPort
    485 642   }
    486  - }()
     643 + }
    487 644   
    488  - // TODO: If target exits - kill forwarder, wait for N seconds, restart forwarder (or exit).
    489  - // If forwarder exits - try restarting it (< K times); otherwise, exit.
    490  - for {
    491  - select {
    492  - case err := <-targetErrCh:
    493  - if err != nil {
    494  - cli.PrintAux("Exiting...")
    495  - cancel()
    496  - return fmt.Errorf("waiting for target container failed: %w", err)
    497  - }
    498  - 
    499  - case <-targetStatusCh:
    500  - target, err = getRunningTarget(ctx, opts.target, opts.runningTimeout)
    501  - if err != nil {
    502  - cli.PrintAux("Exiting...")
    503  - cancel()
    504  - } else {
    505  - targetStatusCh, targetErrCh = client.ContainerWait(ctx, target.ID, container.WaitConditionNotRunning)
    506  - }
    507  - }
    508  - }
     645 + cli.PrintOut("Forwarding %s:%s to %s:%s\n", localHost, localPort, remoteIP, remotePort)
     646 + return forwarder, nil
    509 647  }
    510 648   
    511  -// func localForwardingsToJSON(fwds []forwarding) string {
    512  -// out := []map[string]string{}
    513  -// for _, f := range fwds {
    514  -// out = append(out, map[string]string{
    515  -// "localHost": f.localHost,
    516  -// "localPort": f.localPort,
    517  -// "remoteHost": f.remoteHost,
    518  -// "remotePort": f.remotePort,
    519  -// })
    520  -// }
    521  -// return jsonutil.DumpIndent(out)
    522  -// }
    523  -//
    524  -// func localForwardingsToText(fwds []forwarding) string {
    525  -// out := []string{}
    526  -// for _, f := range fwds {
    527  -// out = append(out, fmt.Sprintf(
    528  -// "Forwarding %s:%s to %s:%s",
    529  -// f.localHost, f.localPort, f.remoteHost, f.remotePort,
    530  -// ))
    531  -// }
    532  -// return strings.Join(out, "\n")
    533  -// }
    534  - 
Please wait...
Page is in error, reload to recover