| skipped 16 lines |
17 | 17 | | |
18 | 18 | | "github.com/iximiuz/cdebug/pkg/cliutil" |
19 | 19 | | "github.com/iximiuz/cdebug/pkg/docker" |
| 20 | + | "github.com/iximiuz/cdebug/pkg/jsonutil" |
20 | 21 | | "github.com/iximiuz/cdebug/pkg/signalutil" |
21 | 22 | | "github.com/iximiuz/cdebug/pkg/uuid" |
22 | 23 | | ) |
| skipped 89 lines |
112 | 113 | | ) |
113 | 114 | | |
114 | 115 | | flags.DurationVar( |
115 | | - | &opts.output, |
116 | | - | "running-timeout", |
117 | 116 | | &opts.runningTimeout, |
| 117 | + | "running-timeout", |
| 118 | + | 10*time.Second, |
118 | 119 | | `How long to wait until the target is up and running`, |
119 | 120 | | ) |
120 | 121 | | |
| skipped 5 lines |
126 | 127 | | `Suppress verbose output`, |
127 | 128 | | ) |
128 | 129 | | |
129 | | - | flags.StringVarP( |
130 | | - | &opts.output, |
131 | | - | "output", |
132 | | - | "o", |
133 | | - | outFormatText, |
134 | | - | `Output format ("text" | "json")`, |
135 | | - | ) |
136 | | - | |
137 | 130 | | return cmd |
138 | 131 | | } |
139 | 132 | | |
| skipped 3 lines |
143 | 136 | | return err |
144 | 137 | | } |
145 | 138 | | |
146 | | - | target, err := getRunningTarget(ctx, opts.target, opts.runningTimeout) |
| 139 | + | cli.PrintAux("Pulling forwarder image...\n") |
| 140 | + | if err := client.ImagePullEx(ctx, forwarderImage, types.ImagePullOptions{}); err != nil { |
| 141 | + | return fmt.Errorf("cannot pull forwarder image %q: %w", forwarderImage, err) |
| 142 | + | } |
| 143 | + | |
| 144 | + | ctx, cancel := context.WithCancel(signalutil.InterruptibleContext(ctx)) |
| 145 | + | defer cancel() |
| 146 | + | |
| 147 | + | for { |
| 148 | + | cont, err := runLocalPortForward(ctx, cli, client, opts) |
| 149 | + | if err != nil { |
| 150 | + | return err |
| 151 | + | } |
| 152 | + | if !cont || ctx.Err() != nil { |
| 153 | + | cli.PrintAux("Forwarding's done. Exiting...\n") |
| 154 | + | return nil |
| 155 | + | } |
| 156 | + | |
| 157 | + | cli.PrintAux("Giving target %s to get up and running again...\n", opts.runningTimeout) |
| 158 | + | } |
| 159 | + | } |
| 160 | + | |
| 161 | + | func runLocalPortForward( |
| 162 | + | ctx context.Context, |
| 163 | + | cli cliutil.CLI, |
| 164 | + | client dockerclient.CommonAPIClient, |
| 165 | + | opts *options, |
| 166 | + | ) (bool, error) { |
| 167 | + | target, err := getRunningTarget(ctx, client, opts.target, opts.runningTimeout) |
| 168 | + | if err != nil { |
| 169 | + | return false, err |
| 170 | + | } |
| 171 | + | |
147 | 172 | | if err := validateTarget(target); err != nil { |
148 | | - | return err |
| 173 | + | return false, err |
149 | 174 | | } |
150 | 175 | | |
151 | 176 | | locals, err := parseLocalForwardings(target, opts.locals) |
152 | 177 | | if err != nil { |
153 | | - | return err |
| 178 | + | return false, err |
154 | 179 | | } |
155 | 180 | | |
156 | | - | cli.PrintAux("Pulling forwarder image...\n") |
157 | | - | if err := client.ImagePullEx(ctx, forwarderImage, types.ImagePullOptions{}); err != nil { |
158 | | - | return fmt.Errorf("cannot pull forwarder image %q: %w", forwarderImage, err) |
159 | | - | } |
| 181 | + | // Start a new context bound to a single target lifecycle. |
| 182 | + | // It'll be used mostly to terminate the forwarders if a |
| 183 | + | // given instance of the target terminates. |
| 184 | + | ctx, cancel := context.WithCancel(ctx) |
| 185 | + | defer cancel() |
160 | 186 | | |
161 | | - | ctx, cancel := context.WithCancel(signalutil.InterruptibleContext(ctx)) |
| 187 | + | fwdersErrorCh := startLocalForwarders(ctx, cli, client, target, locals) |
162 | 188 | | |
163 | | - | var wg sync.WaitGroup |
164 | | - | for _, fwd := range locals { |
165 | | - | wg.Add(1) |
| 189 | + | targetStatusCh, targetErrorCh := client.ContainerWait( |
| 190 | + | ctx, |
| 191 | + | target.ID, |
| 192 | + | container.WaitConditionNotRunning, |
| 193 | + | ) |
166 | 194 | | |
167 | | - | go func() { |
168 | | - | defer wg.Done() |
| 195 | + | select { |
| 196 | + | case err := <-fwdersErrorCh: |
| 197 | + | // Couldn't start or keep one or more forwarders running. |
| 198 | + | // All forwarders must be down (best effort) at this time. |
| 199 | + | return false, err |
169 | 200 | | |
170 | | - | if err := runLocalForwarder(ctx, client, target, fwd); err != nil { |
171 | | - | logger.Warnf("forwading error: %s", err) |
172 | | - | } |
173 | | - | }() |
| 201 | + | case <-targetStatusCh: |
| 202 | + | // Target exited/restarting. |
| 203 | + | cli.PrintAux("Target exited\n") |
| 204 | + | |
| 205 | + | case err := <-targetErrorCh: |
| 206 | + | // No idea what happened to the target, but better restart the forwarders |
| 207 | + | // (or exit while trying because the target is already gone). |
| 208 | + | logrus.Debugf("Target error: %s", err) |
174 | 209 | | } |
175 | | - | wg.Wait() |
176 | 210 | | |
177 | | - | return nil |
| 211 | + | cli.PrintAux("Stopping the forwarders...\n") |
| 212 | + | cancel() // Tell the forwarders it's time to stop. |
| 213 | + | if err := <-fwdersErrorCh; err != nil { |
| 214 | + | logrus.Debugf("Error stopping forwarder(s): %s", err) |
| 215 | + | } |
| 216 | + | |
| 217 | + | if opts.runningTimeout == 0 { |
| 218 | + | return false, nil |
| 219 | + | } |
| 220 | + | |
| 221 | + | return true, nil |
178 | 222 | | } |
179 | 223 | | |
180 | 224 | | func getRunningTarget( |
181 | 225 | | ctx context.Context, |
| 226 | + | client dockerclient.CommonAPIClient, |
182 | 227 | | target string, |
183 | 228 | | runningTimeout time.Duration, |
184 | | - | ) (target types.ContainerJSON, err error) { |
| 229 | + | ) (types.ContainerJSON, error) { |
185 | 230 | | ctx, cancel := context.WithTimeout(ctx, runningTimeout) |
186 | 231 | | defer cancel() |
187 | 232 | | |
188 | 233 | | for { |
189 | | - | target, err = client.ContainerInspect(ctx, opts.target) |
| 234 | + | cont, err := client.ContainerInspect(ctx, target) |
190 | 235 | | if err != nil { |
191 | | - | return target, err |
| 236 | + | return cont, err |
192 | 237 | | } |
193 | | - | if target.State != nil && target.State.Running { |
194 | | - | return target, nil |
| 238 | + | if cont.State != nil && cont.State.Running { |
| 239 | + | return cont, nil |
195 | 240 | | } |
196 | 241 | | |
197 | 242 | | select { |
198 | 243 | | case <-ctx.Done(): |
199 | | - | return target, fmt.Errorf("target is not running after %s", runningTimeout) |
| 244 | + | return cont, fmt.Errorf("target is not running after %s", runningTimeout) |
200 | 245 | | case <-time.After(100 * time.Millisecond): |
201 | 246 | | } |
202 | 247 | | } |
| skipped 79 lines |
282 | 327 | | |
283 | 328 | | if len(parts) == 3 { |
284 | 329 | | // Either LOCAL_PORT:REMOTE_HOST:REMOTE_PORT or (LOCAL_HOST:LOCAL_PORT:REMOTE_PORT | LOCAL_HOST::REMOTE_PORT) |
285 | | - | if _, err := nat.ParsePort(parts[3]); err != nil { |
| 330 | + | if _, err := nat.ParsePort(parts[2]); err != nil { |
286 | 331 | | return forwarding{}, errBadRemotePort |
287 | 332 | | } |
288 | 333 | | |
289 | 334 | | if _, err := nat.ParsePort(parts[0]); err == nil { |
290 | 335 | | // Case 4: LOCAL_PORT:REMOTE_HOST:REMOTE_PORT |
291 | | - | if _, err := nat.ParsePort(parts[2]); err != nil { |
292 | | - | return forwarding{}, errBadRemotePort |
293 | | - | } |
294 | 336 | | if len(parts[1]) == 0 { |
295 | 337 | | return forwarding{}, errBadRemoteHost |
296 | 338 | | } |
| skipped 76 lines |
373 | 415 | | return "", errors.New("cannot derive remote host") |
374 | 416 | | } |
375 | 417 | | |
| 418 | + | func lookupPortBindings(target types.ContainerJSON, targetPort string) []nat.PortBinding { |
| 419 | + | for port, bindings := range target.NetworkSettings.Ports { |
| 420 | + | if targetPort == port.Port() { |
| 421 | + | return bindings |
| 422 | + | } |
| 423 | + | } |
| 424 | + | return nil |
| 425 | + | } |
| 426 | + | |
376 | 427 | | func targetNetworkByIP(target types.ContainerJSON, ip string) (string, error) { |
377 | 428 | | for name, net := range target.NetworkSettings.Networks { |
378 | 429 | | if net.IPAddress == ip { |
| skipped 3 lines |
382 | 433 | | return "", errors.New("cannot deduce target network by IP") |
383 | 434 | | } |
384 | 435 | | |
385 | | - | func startLocalForwarder( |
| 436 | + | func startLocalForwarders( |
| 437 | + | ctx context.Context, |
| 438 | + | cli cliutil.CLI, |
| 439 | + | client dockerclient.CommonAPIClient, |
| 440 | + | target types.ContainerJSON, |
| 441 | + | locals []forwarding, |
| 442 | + | ) <-chan error { |
| 443 | + | doneCh := make(chan error, 1) |
| 444 | + | |
| 445 | + | go func() { |
| 446 | + | var errored bool |
| 447 | + | var wg sync.WaitGroup |
| 448 | + | |
| 449 | + | for _, fwd := range locals { |
| 450 | + | wg.Add(1) |
| 451 | + | |
| 452 | + | go func() { |
| 453 | + | defer wg.Done() |
| 454 | + | |
| 455 | + | if err := runLocalForwarder(ctx, cli, client, target, fwd); err != nil { |
| 456 | + | logrus.Debugf("Forwarding error: %s", err) |
| 457 | + | errored = true |
| 458 | + | } |
| 459 | + | }() |
| 460 | + | } |
| 461 | + | |
| 462 | + | wg.Wait() |
| 463 | + | if errored { |
| 464 | + | doneCh <- errors.New("one or more forwarders failed") |
| 465 | + | } |
| 466 | + | close(doneCh) |
| 467 | + | }() |
| 468 | + | |
| 469 | + | return doneCh |
| 470 | + | } |
| 471 | + | |
| 472 | + | func runLocalForwarder( |
386 | 473 | | ctx context.Context, |
| 474 | + | cli cliutil.CLI, |
387 | 475 | | client dockerclient.CommonAPIClient, |
388 | 476 | | target types.ContainerJSON, |
389 | 477 | | fwd forwarding, |
390 | 478 | | ) error { |
391 | 479 | | if len(fwd.remoteHost) == 0 { |
392 | | - | forwarder, err := startLocalForwarderToTargetPublicPort( |
| 480 | + | remoteIP, err := unambiguousIP(target) |
| 481 | + | if err != nil { |
| 482 | + | return err |
| 483 | + | } |
| 484 | + | return runLocalForwarderToTargetPublicPort( |
393 | 485 | | ctx, |
| 486 | + | cli, |
394 | 487 | | client, |
395 | 488 | | target, |
396 | 489 | | fwd.localHost, |
397 | 490 | | fwd.localPort, |
| 491 | + | remoteIP, |
398 | 492 | | fwd.remotePort, |
399 | 493 | | ) |
400 | | - | if err != nil { |
401 | | - | return err |
| 494 | + | } |
| 495 | + | |
| 496 | + | if remoteIP, err := lookupTargetIP(target, fwd.remoteHost); err == nil { |
| 497 | + | return runLocalForwarderToTargetPublicPort( |
| 498 | + | ctx, |
| 499 | + | cli, |
| 500 | + | client, |
| 501 | + | target, |
| 502 | + | fwd.localHost, |
| 503 | + | fwd.localPort, |
| 504 | + | remoteIP, |
| 505 | + | fwd.remotePort, |
| 506 | + | ) |
| 507 | + | } |
| 508 | + | |
| 509 | + | // TODO: runLocalForwarderThroughTargetNetns() |
| 510 | + | return errors.New("implement me!") |
| 511 | + | } |
| 512 | + | |
| 513 | + | func runLocalForwarderToTargetPublicPort( |
| 514 | + | ctx context.Context, |
| 515 | + | cli cliutil.CLI, |
| 516 | + | client dockerclient.CommonAPIClient, |
| 517 | + | target types.ContainerJSON, |
| 518 | + | localHost string, |
| 519 | + | localPort string, |
| 520 | + | remoteIP string, |
| 521 | + | remotePort string, |
| 522 | + | ) error { |
| 523 | + | // TODO: Try start() N times. |
| 524 | + | forwarder, err := startLocalForwarderToTargetPublicPort( |
| 525 | + | ctx, |
| 526 | + | cli, |
| 527 | + | client, |
| 528 | + | target, |
| 529 | + | localHost, |
| 530 | + | localPort, |
| 531 | + | remoteIP, |
| 532 | + | remotePort, |
| 533 | + | ) |
| 534 | + | if err != nil { |
| 535 | + | return err |
| 536 | + | } |
| 537 | + | |
| 538 | + | killForwarder := func() { |
| 539 | + | ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) |
| 540 | + | defer cancel() |
| 541 | + | |
| 542 | + | if err := client.ContainerKill(ctx, forwarder.ID, "KILL"); err != nil { |
| 543 | + | logrus.Debugf("Cannot kill forwarder container: %s", err) |
402 | 544 | | } |
403 | | - | return maintainLocalForwarderToTargetPublicPort(ctx, client, target, forwarder) |
404 | 545 | | } |
405 | 546 | | |
406 | | - | // TODO: startLocalForwarderThroughTargetNetns() |
407 | | - | return errors.New("Implement me!") |
| 547 | + | fwderStatusCh, fwderErrCh := client.ContainerWait( |
| 548 | + | ctx, |
| 549 | + | forwarder.ID, |
| 550 | + | container.WaitConditionNotRunning, |
| 551 | + | ) |
| 552 | + | |
| 553 | + | // TODO: If a forwarder was alive long enough, we may want to restart it w/o |
| 554 | + | // decreasing the number of attempts. |
| 555 | + | select { |
| 556 | + | case <-ctx.Done(): |
| 557 | + | killForwarder() |
| 558 | + | return nil |
| 559 | + | |
| 560 | + | case status := <-fwderStatusCh: |
| 561 | + | return fmt.Errorf( |
| 562 | + | "forwarder %s exited with code %d: %w", |
| 563 | + | forwarder.ID, status.StatusCode, status.Error, |
| 564 | + | ) |
| 565 | + | |
| 566 | + | case err := <-fwderErrCh: |
| 567 | + | logrus.Debugf("Forwarder error: %s", err) |
| 568 | + | killForwarder() // Just in case... |
| 569 | + | return fmt.Errorf("forwarder %s hiccuped: %w", forwarder.ID, err) |
| 570 | + | } |
408 | 571 | | } |
409 | 572 | | |
410 | 573 | | func startLocalForwarderToTargetPublicPort( |
411 | 574 | | ctx context.Context, |
| 575 | + | cli cliutil.CLI, |
412 | 576 | | client dockerclient.CommonAPIClient, |
413 | 577 | | target types.ContainerJSON, |
414 | 578 | | localHost string, |
415 | 579 | | localPort string, |
| 580 | + | remoteIP string, |
416 | 581 | | remotePort string, |
417 | | - | ) (forwarder types.ContainerJSON, err error) { |
418 | | - | remoteIP, err := unambiguousIP(target) |
419 | | - | if err != nil { |
420 | | - | return forwarder, err |
| 582 | + | ) (types.ContainerJSON, error) { |
| 583 | + | if len(localHost) == 0 { |
| 584 | + | localHost = "127.0.0.1" |
421 | 585 | | } |
| 586 | + | |
422 | 587 | | network, err := targetNetworkByIP(target, remoteIP) |
423 | 588 | | if err != nil { |
424 | | - | return forwarder, err |
| 589 | + | return types.ContainerJSON{}, err |
425 | 590 | | } |
426 | 591 | | |
427 | | - | exposedPorts, portBindings, err := nat.ParsePortSpecs([]string{ |
428 | | - | localHost + ":" + localPort + ":" + remotePort, |
429 | | - | }) |
| 592 | + | portMapSpec := localHost + ":" + localPort + ":" + remotePort |
| 593 | + | exposedPorts, portBindings, err := nat.ParsePortSpecs([]string{portMapSpec}) |
430 | 594 | | if err != nil { |
431 | | - | return forwarder, err |
| 595 | + | return types.ContainerJSON{}, err |
432 | 596 | | } |
433 | 597 | | |
434 | 598 | | resp, err := client.ContainerCreate( |
| skipped 17 lines |
452 | 616 | | "cdebug-fwd-"+uuid.ShortID(), |
453 | 617 | | ) |
454 | 618 | | if err != nil { |
455 | | - | return forwarder, fmt.Errorf("cannot create forwarder container: %w", err) |
| 619 | + | return types.ContainerJSON{}, fmt.Errorf("cannot create forwarder container: %w", err) |
456 | 620 | | } |
457 | 621 | | |
458 | 622 | | if err := client.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { |
459 | | - | return forwarder, fmt.Errorf("cannot start forwarder container: %w", err) |
| 623 | + | return types.ContainerJSON{}, fmt.Errorf("cannot start forwarder container: %w", err) |
460 | 624 | | } |
461 | 625 | | |
462 | | - | forwarder, err = client.ContainerInspect(ctx, resp.ID) |
| 626 | + | forwarder, err := client.ContainerInspect(ctx, resp.ID) |
463 | 627 | | if err != nil { |
464 | 628 | | return forwarder, fmt.Errorf("cannot inspect forwarder container: %w", err) |
465 | 629 | | } |
466 | | - | if forwarder.State != nil && forwarder.State.Running { |
467 | | - | return forwarder, fmt.Errorf("cannot inspect forwarder container: %w", err) |
| 630 | + | if forwarder.State == nil && !forwarder.State.Running { |
| 631 | + | return forwarder, fmt.Errorf("unexpected forwarder container state: %v", jsonutil.Dump(forwarder.State)) |
468 | 632 | | } |
469 | 633 | | |
470 | | - | return forwarder, nil |
471 | | - | } |
472 | | - | |
473 | | - | func maintainLocalForwarderToTargetPublicPort( |
474 | | - | ctx context.Context, |
475 | | - | client dockerclient.CommonAPIClient, |
476 | | - | target types.ContainerJSON, |
477 | | - | forwarder types.ContainerJSON, |
478 | | - | ) error { |
479 | | - | targetStatusCh, targetErrCh := client.ContainerWait(ctx, target.ID, container.WaitConditionNotRunning) |
480 | | - | fwderStatusCh, fwderErrCh := client.ContainerWait(ctx, forwarder.ID, container.WaitConditionNotRunning) |
481 | | - | |
482 | | - | defer func() { |
483 | | - | if err := client.ContainerKill(ctx, forwarder.ID, "KILL"); err != nil { |
484 | | - | logrus.Debugf("Cannot kill forwarder container: %s", err) |
| 634 | + | if len(localPort) == 0 { |
| 635 | + | bindings := lookupPortBindings(forwarder, remotePort) |
| 636 | + | if len(bindings) == 0 { |
| 637 | + | logrus.Debugf("Empty bindings for port map spec %s in forwarder %s", portMapSpec, forwarder.ID) |
| 638 | + | localPort = "<unknown>" |
| 639 | + | } else { |
| 640 | + | // Every forwarder should have just one port exposed. |
| 641 | + | localPort = bindings[0].HostPort |
485 | 642 | | } |
486 | | - | }() |
| 643 | + | } |
487 | 644 | | |
488 | | - | // TODO: If target exits - kill forwarder, wait for N seconds, restart forwarder (or exit). |
489 | | - | // If forwarder exits - try restarting it (< K times); otherwise, exit. |
490 | | - | for { |
491 | | - | select { |
492 | | - | case err := <-targetErrCh: |
493 | | - | if err != nil { |
494 | | - | cli.PrintAux("Exiting...") |
495 | | - | cancel() |
496 | | - | return fmt.Errorf("waiting for target container failed: %w", err) |
497 | | - | } |
498 | | - | |
499 | | - | case <-targetStatusCh: |
500 | | - | target, err = getRunningTarget(ctx, opts.target, opts.runningTimeout) |
501 | | - | if err != nil { |
502 | | - | cli.PrintAux("Exiting...") |
503 | | - | cancel() |
504 | | - | } else { |
505 | | - | targetStatusCh, targetErrCh = client.ContainerWait(ctx, target.ID, container.WaitConditionNotRunning) |
506 | | - | } |
507 | | - | } |
508 | | - | } |
| 645 | + | cli.PrintOut("Forwarding %s:%s to %s:%s\n", localHost, localPort, remoteIP, remotePort) |
| 646 | + | return forwarder, nil |
509 | 647 | | } |
510 | 648 | | |
511 | | - | // func localForwardingsToJSON(fwds []forwarding) string { |
512 | | - | // out := []map[string]string{} |
513 | | - | // for _, f := range fwds { |
514 | | - | // out = append(out, map[string]string{ |
515 | | - | // "localHost": f.localHost, |
516 | | - | // "localPort": f.localPort, |
517 | | - | // "remoteHost": f.remoteHost, |
518 | | - | // "remotePort": f.remotePort, |
519 | | - | // }) |
520 | | - | // } |
521 | | - | // return jsonutil.DumpIndent(out) |
522 | | - | // } |
523 | | - | // |
524 | | - | // func localForwardingsToText(fwds []forwarding) string { |
525 | | - | // out := []string{} |
526 | | - | // for _, f := range fwds { |
527 | | - | // out = append(out, fmt.Sprintf( |
528 | | - | // "Forwarding %s:%s to %s:%s", |
529 | | - | // f.localHost, f.localPort, f.remoteHost, f.remotePort, |
530 | | - | // )) |
531 | | - | // } |
532 | | - | // return strings.Join(out, "\n") |
533 | | - | // } |
534 | | - | |