maigret: commit 009d51c3
  • maigret/checking.py
    skipped 36 lines
    37 37   "uidme_uguid",
    38 38  )
    39 39   
    40  -unsupported_characters = "#"
     40 +BAD_CHARS = "#"
    41 41   
    42 42   
    43 43  async def get_response(request_future, logger) -> Tuple[str, int, Optional[CheckError]]:
    skipped 683 lines
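    Aside (not part of the commit): a minimal sketch of how the renamed BAD_CHARS constant is meant to be used; the same intersection check appears in maigret.py further down, and contains_unsupported_chars is a hypothetical helper name.

        from maigret.checking import BAD_CHARS

        def contains_unsupported_chars(username: str) -> bool:
            # BAD_CHARS is "#": any overlap with the username means sites cannot check it
            return bool(set(BAD_CHARS).intersection(set(username)))

        contains_unsupported_chars('user#name')  # True
        contains_unsupported_chars('username')   # False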
  • maigret/errors.py
    1 1  from typing import Dict, List, Any
    2 2   
    3 3  from .result import QueryResult
     4 +from .types import QueryResultWrapper
    4 5   
    5 6   
    6 7  # error got as a result of completed search query
    skipped 97 lines
    104 105   return ERRORS_TYPES.get(err_type, '')
    105 106   
    106 107   
    107  -def extract_and_group(search_res: dict) -> List[Dict[str, Any]]:
     108 +def extract_and_group(search_res: QueryResultWrapper) -> List[Dict[str, Any]]:
    108 109   errors_counts: Dict[str, int] = {}
    109  - for r in search_res:
     110 + for r in search_res.values():
    110 111   if r and isinstance(r, dict) and r.get('status'):
    111 112   if not isinstance(r['status'], QueryResult):
    112 113   continue
    skipped 18 lines
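    For context (not part of the commit): with the QueryResultWrapper annotation the caller now passes the whole results mapping and extract_and_group iterates its values itself; the call site simplifies roughly as below, where search_results is assumed to be the per-site results dict produced by maigret().

        from maigret import errors

        # before this commit
        errs = errors.extract_and_group(search_results.values())
        # after this commit
        errs = errors.extract_and_group(search_results)
        important = [e for e in errs if errors.is_important(e)]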
  • maigret/maigret.py
    skipped 7 lines
    8 8  import sys
    9 9  import platform
    10 10  from argparse import ArgumentParser, RawDescriptionHelpFormatter
     11 +from typing import List, Tuple
    11 12   
    12 13  import requests
    13 14  from socid_extractor import extract, parse, __version__ as socid_version
    skipped 2 lines
    16 17   timeout_check,
    17 18   SUPPORTED_IDS,
    18 19   self_check,
    19  - unsupported_characters,
     20 + BAD_CHARS,
    20 21   maigret,
    21 22  )
    22 23  from . import errors
    skipped 10 lines
    33 34  )
    34 35  from .sites import MaigretDatabase
    35 36  from .submit import submit_dialog
     37 +from .types import QueryResultWrapper
    36 38  from .utils import get_dict_ascii_tree
    37 39   
    38 40  __version__ = '0.2.1'
    39 41   
    40 42   
    41  -def notify_about_errors(search_results, query_notify):
    42  - errs = errors.extract_and_group(search_results.values())
     43 +def notify_about_errors(search_results: QueryResultWrapper, query_notify):
     44 + errs = errors.extract_and_group(search_results)
    43 45   was_errs_displayed = False
    44 46   for e in errs:
    45 47   if not errors.is_important(e):
    skipped 10 lines
    56 58   query_notify.warning(
    57 59   'You can see detailed site check errors with a flag `--print-errors`'
    58 60   )
     61 + 
     62 + 
     63 +def extract_ids_from_page(url, logger, timeout=5) -> dict:
     64 + results = {}
     65 + # url, headers
     66 + reqs: List[Tuple[str, set]] = [(url, set())]
     67 + try:
     68 + # temporary workaround for URL mutations MVP
     69 + from socid_extractor import mutate_url
     70 + 
     71 + reqs += list(mutate_url(url))
     72 + except Exception as e:
     73 + logger.warning(e)
     74 + 
     75 + for req in reqs:
     76 + url, headers = req
     77 + print(f'Scanning webpage by URL {url}...')
     78 + page, _ = parse(url, cookies_str='', headers=headers, timeout=timeout)
     79 + logger.debug(page)
     80 + info = extract(page)
     81 + if not info:
     82 + print('Nothing extracted')
     83 + else:
     84 + print(get_dict_ascii_tree(info.items(), new_line=False), ' ')
     85 + for k, v in info.items():
     86 + if 'username' in k:
     87 + results[v] = 'username'
     88 + if k in SUPPORTED_IDS:
     89 + results[v] = k
     90 + 
     91 + return results
     92 + 
     93 + 
     94 +def extract_ids_from_results(results: QueryResultWrapper, db: MaigretDatabase) -> dict:
     95 + ids_results = {}
     96 + for website_name in results:
     97 + dictionary = results[website_name]
     98 + # TODO: fix no site data issue
     99 + if not dictionary:
     100 + continue
     101 + 
     102 + new_usernames = dictionary.get('ids_usernames')
     103 + if new_usernames:
     104 + for u, utype in new_usernames.items():
     105 + ids_results[u] = utype
     106 + 
     107 + for url in dictionary.get('ids_links', []):
     108 + for s in db.sites:
     109 + u = s.detect_username(url)
     110 + if u:
     111 + ids_results[u] = 'username'
     112 + return ids_results
    59 113   
    60 114   
    61 115  def setup_arguments_parser():
    skipped 330 lines
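    Not part of the diff: the expected return shapes of the two new helpers, taken from the tests added below; both return a flat {identifier: id_type} dict that can be merged into the usernames map with dict.update(). logger, results, db, and usernames are assumed to come from the surrounding main() code.

        from maigret.maigret import extract_ids_from_page, extract_ids_from_results

        found = extract_ids_from_page('https://www.reddit.com/user/test', logger, timeout=5)
        # {'test': 'username'}

        found = extract_ids_from_results(results, db)  # results: QueryResultWrapper, db: MaigretDatabase
        # {'test1': 'yandex_public_id', 'test2': 'username'}  (for the example data used in the tests)

        usernames.update(found)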
    392 446   print("Using the proxy: " + args.proxy)
    393 447   
    394 448   if args.parse_url:
    395  - # url, headers
    396  - reqs = [(args.parse_url, set())]
    397  - try:
    398  - # temporary workaround for URL mutations MVP
    399  - from socid_extractor import mutate_url
    400  - 
    401  - reqs += list(mutate_url(args.parse_url))
    402  - except Exception as e:
    403  - logger.warning(e)
    404  - pass
    405  - 
    406  - for req in reqs:
    407  - url, headers = req
    408  - print(f'Scanning webpage by URL {url}...')
    409  - page, _ = parse(url, cookies_str='', headers=headers)
    410  - info = extract(page)
    411  - if not info:
    412  - print('Nothing extracted')
    413  - else:
    414  - print(get_dict_ascii_tree(info.items(), new_line=False), ' ')
    415  - for k, v in info.items():
    416  - if 'username' in k:
    417  - usernames[v] = 'username'
    418  - if k in SUPPORTED_IDS:
    419  - usernames[v] = k
     449 + extracted_ids = extract_ids_from_page(args.parse_url, logger, timeout=args.timeout)
     450 + usernames.update(extracted_ids)
    420 451   
    421 452   if args.tags:
    422 453   args.tags = list(set(str(args.tags).split(',')))
    skipped 48 lines
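    For reference (not part of the commit): this branch is driven by the page-parsing and timeout options of the CLI. The exact flag spellings are defined in setup_arguments_parser(), which is in the skipped lines above, so the invocation below is an assumption rather than a verified command line.

        # hypothetical invocation: scan one page for identifiers, then search the extracted usernames
        maigret --parse-url https://www.reddit.com/user/test --timeout 10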
    471 502   print('Updates will be applied only for current search session.')
    472 503   print(db.get_scan_stats(site_data))
    473 504   
     505 + # Database statistics
    474 506   if args.stats:
    475 507   print(db.get_db_stats(db.sites_dict))
    476 508   
    skipped 3 lines
    480 512   # Define one report filename template
    481 513   report_filepath_tpl = os.path.join(args.folderoutput, 'report_{username}{postfix}')
    482 514   
    483  - # Database stats
    484  - # TODO: verbose info about filtered sites
    485  - # enabled_count = len(list(filter(lambda x: not x.disabled, site_data.values())))
    486  - # print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}')
    487  - 
    488 515   if usernames == {}:
    489 516   # magic params to exit after init
    490 517   query_notify.warning('No usernames to check, exiting.')
    skipped 2 lines
    493 520   if not site_data:
    494 521   query_notify.warning('No sites to check, exiting!')
    495 522   sys.exit(2)
    496  - else:
     523 + 
     524 + query_notify.warning(
     525 + f'Starting a search on top {len(site_data)} sites from the Maigret database...'
     526 + )
     527 + if not args.all_sites:
    497 528   query_notify.warning(
    498  - f'Starting a search on top {len(site_data)} sites from the Maigret database...'
     529 + 'You can run search by full list of sites with flag `-a`', '!'
    499 530   )
    500  - if not args.all_sites:
    501  - query_notify.warning(
    502  - 'You can run search by full list of sites with flag `-a`', '!'
    503  - )
    504 531   
    505 532   already_checked = set()
    506 533   general_results = []
    skipped 4 lines
    511 538   
    512 539   if username.lower() in already_checked:
    513 540   continue
    514  - else:
    515  - already_checked.add(username.lower())
     541 + 
     542 + already_checked.add(username.lower())
    516 543   
    517 544   if username in args.ignore_ids_list:
    518 545   query_notify.warning(
    skipped 2 lines
    521 548   continue
    522 549   
    523 550   # check for characters do not supported by sites generally
    524  - found_unsupported_chars = set(unsupported_characters).intersection(
    525  - set(username)
    526  - )
    527  - 
     551 + found_unsupported_chars = set(BAD_CHARS).intersection(set(username))
    528 552   if found_unsupported_chars:
    529 553   pretty_chars_str = ','.join(
    530 554   map(lambda s: f'"{s}"', found_unsupported_chars)
    skipped 27 lines
    558 582   general_results.append((username, id_type, results))
    559 583   
    560 584   # TODO: tests
    561  - for website_name in results:
    562  - dictionary = results[website_name]
    563  - # TODO: fix no site data issue
    564  - if not dictionary or not recursive_search_enabled:
    565  - continue
    566  - 
    567  - new_usernames = dictionary.get('ids_usernames')
    568  - if new_usernames:
    569  - for u, utype in new_usernames.items():
    570  - usernames[u] = utype
    571  - 
    572  - for url in dictionary.get('ids_links', []):
    573  - for s in db.sites:
    574  - u = s.detect_username(url)
    575  - if u:
    576  - usernames[u] = 'username'
     585 + if recursive_search_enabled:
     586 + extracted_ids = extract_ids_from_results(results, db)
     587 + usernames.update(extracted_ids)
    577 588   
    578 589   # reporting for a one username
    579 590   if args.xmind:
    skipped 56 lines
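    Not part of the diff: the cleanup above consistently replaces else-after-exit and else-after-continue with guard clauses; in miniature (names from the surrounding main()):

        if not site_data:
            query_notify.warning('No sites to check, exiting!')
            sys.exit(2)

        # no `else:` needed: this line is only reached when site_data is non-empty
        query_notify.warning(f'Starting a search on top {len(site_data)} sites from the Maigret database...')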
  • maigret/report.py
    skipped 2 lines
    3 3  import json
    4 4  import logging
    5 5  import os
    6  -from argparse import ArgumentTypeError
    7 6  from datetime import datetime
    8 7  from typing import Dict, Any
    9 8   
    skipped 353 lines
  • tests/conftest.py
    skipped 11 lines
    12 12  CUR_PATH = os.path.dirname(os.path.realpath(__file__))
    13 13  JSON_FILE = os.path.join(CUR_PATH, '../maigret/resources/data.json')
    14 14  TEST_JSON_FILE = os.path.join(CUR_PATH, 'db.json')
    15  -empty_mark = Mark('', [], {})
     15 +empty_mark = Mark('', (), {})
    16 16   
    17 17   
    18 18  def by_slow_marker(item):
    skipped 43 lines
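    Side note (not part of the commit): pytest annotates Mark.args as a tuple of positional marker arguments, which is why the empty mark is now built with () instead of []; a minimal sketch, assuming the import path used by recent pytest internals.

        from _pytest.mark.structures import Mark  # assumed import; the real one is in the skipped lines above

        empty_mark = Mark('', (), {})  # name, args (tuple), kwargs (dict)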
  • tests/test_maigret.py
    1 1  """Maigret main module test functions"""
    2 2  import asyncio
     3 +import copy
    3 4   
    4 5  import pytest
    5 6  from mock import Mock
    6 7   
    7  -from maigret.maigret import self_check, maigret
     8 +from maigret.maigret import self_check, maigret, extract_ids_from_page, extract_ids_from_results
    8 9  from maigret.sites import MaigretSite
    9 10  from maigret.result import QueryResult, QueryStatus
     11 + 
     12 + 
     13 +RESULTS_EXAMPLE = {
     14 + 'Reddit': {
     15 + 'cookies': None,
     16 + 'parsing_enabled': False,
     17 + 'url_main': 'https://www.reddit.com/',
     18 + 'username': 'Facebook',
     19 + },
     20 + 'GooglePlayStore': {
     21 + 'cookies': None,
     22 + 'http_status': 200,
     23 + 'is_similar': False,
     24 + 'parsing_enabled': False,
     25 + 'rank': 1,
     26 + 'url_main': 'https://play.google.com/store',
     27 + 'url_user': 'https://play.google.com/store/apps/developer?id=Facebook',
     28 + 'username': 'Facebook',
     29 + },
     30 +}
    10 31   
    11 32   
    12 33  @pytest.mark.slow
    skipped 100 lines
    113 134   assert results['Reddit'].get('future') is None
    114 135   del results['GooglePlayStore']['future']
    115 136   
    116  - assert results == {
    117  - 'Reddit': {
    118  - 'cookies': None,
    119  - 'parsing_enabled': False,
    120  - 'url_main': 'https://www.reddit.com/',
    121  - 'username': 'Facebook',
    122  - },
    123  - 'GooglePlayStore': {
    124  - 'cookies': None,
    125  - 'http_status': 200,
    126  - 'is_similar': False,
    127  - 'parsing_enabled': False,
    128  - 'rank': 1,
    129  - 'url_main': 'https://play.google.com/store',
    130  - 'url_user': 'https://play.google.com/store/apps/developer?id=Facebook',
    131  - 'username': 'Facebook',
    132  - },
    133  - }
     137 + assert results == RESULTS_EXAMPLE
     138 + 
     139 + 
     140 +@pytest.mark.slow
     141 +def test_extract_ids_from_page(test_db):
     142 + logger = Mock()
     143 + found_ids = extract_ids_from_page('https://www.reddit.com/user/test', logger)
     144 + assert found_ids == {'test': 'username'}
     145 + 
     146 + 
     147 +def test_extract_ids_from_results(test_db):
     148 + TEST_EXAMPLE = copy.deepcopy(RESULTS_EXAMPLE)
     149 + TEST_EXAMPLE['Reddit']['ids_usernames'] = {'test1': 'yandex_public_id'}
     150 + TEST_EXAMPLE['Reddit']['ids_links'] = ['https://www.reddit.com/user/test2']
     151 + 
     152 + found_ids = extract_ids_from_results(TEST_EXAMPLE, test_db)
     153 + assert found_ids == {'test1': 'yandex_public_id', 'test2': 'username'}
    134 154   
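    Not part of the diff: the new tests can be exercised in isolation; test_extract_ids_from_page is marked slow because it makes a live request to reddit.com, so a quick offline run might look like this (assuming the slow marker is registered in the project's pytest configuration).

        # run only the new extraction tests, skipping the network-bound one
        pytest tests/test_maigret.py -k "extract_ids" -m "not slow"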