Projects STRLCPY maigret Commits 314eb25d
🤬
  • Created async requests executors, some sites fixes

  • Loading...
  • Soxoj committed 4 years ago
    314eb25d
    1 parent faa03b62
Revision indexing in progress... (symbol navigation in revisions will be accurate after indexed)
  • ■ ■ ■ ■ ■
    maigret/checking.py
    skipped 2 lines
    3 3  import re
    4 4  import ssl
    5 5  import sys
     6 +import tqdm
     7 +import time
     8 +from typing import Callable, Any, Iterable, Tuple
    6 9   
    7 10  import aiohttp
    8 11  import tqdm.asyncio
    skipped 27 lines
    36 39  }
    37 40   
    38 41  unsupported_characters = '#'
     42 + 
     43 +QueryDraft = Tuple[Callable, Any, Any]
     44 +QueriesDraft = Iterable[QueryDraft]
     45 + 
     46 +class AsyncExecutor:
     47 + def __init__(self, *args, **kwargs):
     48 + self.logger = kwargs['logger']
     49 + 
     50 + async def run(self, tasks: QueriesDraft):
     51 + start_time = time.time()
     52 + results = await self._run(tasks)
     53 + self.execution_time = time.time() - start_time
     54 + self.logger.debug(f'Spent time: {self.execution_time}')
     55 + return results
     56 + 
     57 + async def _run(self, tasks: QueriesDraft):
     58 + await asyncio.sleep(0)
     59 + 
     60 + 
     61 +class AsyncioSimpleExecutor(AsyncExecutor):
     62 + def __init__(self, *args, **kwargs):
     63 + super().__init__(*args, **kwargs)
     64 + 
     65 + async def _run(self, tasks: QueriesDraft):
     66 + futures = [f(*args, **kwargs) for f, args, kwargs in tasks]
     67 + return await asyncio.gather(*futures)
     68 + 
     69 + 
     70 +class AsyncioProgressbarExecutor(AsyncExecutor):
     71 + def __init__(self, *args, **kwargs):
     72 + super().__init__(*args, **kwargs)
     73 + 
     74 + async def _run(self, tasks: QueriesDraft):
     75 + futures = [f(*args, **kwargs) for f, args, kwargs in tasks]
     76 + results = []
     77 + for f in tqdm.asyncio.tqdm.as_completed(futures):
     78 + results.append(await f)
     79 + return results
     80 + 
     81 + 
     82 +class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor):
     83 + def __init__(self, *args, **kwargs):
     84 + super().__init__(*args, **kwargs)
     85 + self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 1))
     86 + 
     87 + async def _run(self, tasks: QueriesDraft):
     88 + async def _wrap_query(q: QueryDraft):
     89 + async with self.semaphore:
     90 + f, args, kwargs = q
     91 + return await f(*args, **kwargs)
     92 + 
     93 + async def semaphore_gather(tasks: QueriesDraft):
     94 + coros = [_wrap_query(q) for q in tasks]
     95 + results = []
     96 + for f in tqdm.asyncio.tqdm.as_completed(coros):
     97 + results.append(await f)
     98 + return results
     99 + 
     100 + return await semaphore_gather(tasks)
     101 + 
     102 + 
     103 +class AsyncioProgressbarQueueExecutor(AsyncExecutor):
     104 + def __init__(self, *args, **kwargs):
     105 + super().__init__(*args, **kwargs)
     106 + self.workers_count = kwargs.get('in_parallel', 10)
     107 + self.progress_func = kwargs.get('progress_func', tqdm.tqdm)
     108 + self.queue = asyncio.Queue(self.workers_count)
     109 + 
     110 + async def worker(self):
     111 + while True:
     112 + f, args, kwargs = await self.queue.get()
     113 + result = await f(*args, **kwargs)
     114 + self.results.append(result)
     115 + self.progress.update(1)
     116 + self.queue.task_done()
     117 + 
     118 + async def _run(self, tasks: QueriesDraft):
     119 + self.results = []
     120 + workers = [asyncio.create_task(self.worker())
     121 + for _ in range(self.workers_count)]
     122 + task_list = list(tasks)
     123 + self.progress = self.progress_func(total=len(task_list))
     124 + for t in task_list:
     125 + await self.queue.put(t)
     126 + await self.queue.join()
     127 + for w in workers:
     128 + w.cancel()
     129 + self.progress.close()
     130 + return self.results
    39 131   
    40 132   
    41 133  async def get_response(request_future, site_name, logger):
    skipped 45 lines
    87 179   return html_text, status_code, error_text, expection_text
    88 180   
    89 181   
    90  -async def update_site_dict_from_response(sitename, site_dict, results_info, semaphore, logger, query_notify):
    91  - async with semaphore:
    92  - site_obj = site_dict[sitename]
    93  - future = site_obj.request_future
    94  - if not future:
    95  - # ignore: search by incompatible id type
    96  - return
     182 +async def update_site_dict_from_response(sitename, site_dict, results_info, logger, query_notify):
     183 + site_obj = site_dict[sitename]
     184 + future = site_obj.request_future
     185 + if not future:
     186 + # ignore: search by incompatible id type
     187 + return
    97 188   
    98  - response = await get_response(request_future=future,
    99  - site_name=sitename,
    100  - logger=logger)
     189 + response = await get_response(request_future=future,
     190 + site_name=sitename,
     191 + logger=logger)
    101 192   
    102  - site_dict[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj)
     193 + return sitename, process_site_result(response, query_notify, logger, results_info, site_obj)
    103 194   
    104 195   
    105 196  # TODO: move to separate class
    skipped 348 lines
    454 545   # Add this site's results into final dictionary with all of the other results.
    455 546   results_total[site_name] = results_site
    456 547   
    457  - # TODO: move into top-level function
    458  - 
    459  - sem = asyncio.Semaphore(max_connections)
    460  - 
    461  - tasks = []
     548 + coroutines = []
    462 549   for sitename, result_obj in results_total.items():
    463  - update_site_coro = update_site_dict_from_response(sitename, site_dict, result_obj, sem, logger, query_notify)
    464  - future = asyncio.ensure_future(update_site_coro)
    465  - tasks.append(future)
     550 + coroutines.append((update_site_dict_from_response, [sitename, site_dict, result_obj, logger, query_notify], {}))
    466 551   
    467 552   if no_progressbar:
    468  - await asyncio.gather(*tasks)
     553 + executor = AsyncioSimpleExecutor(logger=logger)
    469 554   else:
    470  - for f in tqdm.asyncio.tqdm.as_completed(tasks, timeout=timeout):
    471  - try:
    472  - await f
    473  - except asyncio.exceptions.TimeoutError:
    474  - # TODO: write timeout to results
    475  - pass
     555 + executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=max_connections, timeout=timeout+0.5)
     556 + 
     557 + results = await executor.run(coroutines)
    476 558   
    477 559   await session.close()
    478 560   
    479 561   # Notify caller that all queries are finished.
    480 562   query_notify.finish()
    481 563   
    482  - return results_total
     564 + data = {}
     565 + for result in results:
     566 + # TODO: still can be empty
     567 + if result:
     568 + try:
     569 + data[result[0]] = result[1]
     570 + except Exception as e:
     571 + logger.error(e, exc_info=True)
     572 + logger.info(result)
     573 + 
     574 + return data
    483 575   
    484 576   
    485 577  def timeout_check(value):
    skipped 130 lines
  • ■ ■ ■ ■ ■
    maigret/maigret.py
    skipped 260 lines
    261 261   print('Maigret sites database self-checking...')
    262 262   is_need_update = await self_check(db, site_data, logger, max_connections=args.connections)
    263 263   if is_need_update:
    264  - if input('Do you want to save changes permanently? [yYnN]\n').lower() == 'y':
     264 + if input('Do you want to save changes permanently? [Yn]\n').lower() == 'y':
    265 265   db.save_to_file(args.db_file)
    266 266   print('Database was successfully updated.')
    267 267   else:
    skipped 69 lines
    337 337   max_connections=args.connections,
    338 338   )
    339 339   
    340  - username_result = (username, id_type, results)
    341 340   general_results.append((username, id_type, results))
    342 341   
    343 342   # TODO: tests
    344 343   for website_name in results:
    345 344   dictionary = results[website_name]
    346 345   # TODO: fix no site data issue
    347  - if not dictionary:
     346 + if not dictionary or not recursive_search_enabled:
    348 347   continue
    349 348   
    350 349   new_usernames = dictionary.get('ids_usernames')
    skipped 64 lines
  • ■ ■ ■ ■ ■ ■
    maigret/resources/data.json
    skipped 2425 lines
    2426 2426   },
    2427 2427   "Ccmixter": {
    2428 2428   "tags": [
    2429  - "global",
    2430  - "in",
    2431 2429   "us"
    2432 2430   ],
    2433 2431   "checkType": "message",
    skipped 9 lines
    2443 2441   },
    2444 2442   "Cent": {
    2445 2443   "tags": [
    2446  - "in",
    2447  - "mx",
    2448  - "tw",
    2449  - "us"
     2444 + "us",
     2445 + "art",
     2446 + "writing"
    2450 2447   ],
     2448 + "urlProbe": "https://beta.cent.co/data/user/profile?userHandles={username}",
    2451 2449   "checkType": "message",
    2452  - "absenceStrs": "<title>Cent</title>",
     2450 + "presenseStrs": [
     2451 + "display_name"
     2452 + ],
     2453 + "absenceStrs": [
     2454 + "\"results\":[]"
     2455 + ],
    2453 2456   "alexaRank": 31175,
    2454 2457   "url": "https://beta.cent.co/@{username}",
    2455 2458   "urlMain": "https://cent.co/",
    skipped 9257 lines
    11713 11716   "usernameClaimed": "vitaline",
    11714 11717   "usernameUnclaimed": "noonewouldeverusethis7"
    11715 11718   },
    11716  - "Sevenforums": {
     11719 + "SevenForums": {
    11717 11720   "tags": [
    11718 11721   "gb",
    11719 11722   "us"
    11720 11723   ],
    11721  - "checkType": "message",
    11722  - "absenceStrs": "<title>Just a moment...</title>",
     11724 + "engine": "vBulletin",
    11723 11725   "alexaRank": 20828,
    11724  - "url": "https://www.sevenforums.com/members/{username}.html",
    11725 11726   "urlMain": "https://www.sevenforums.com",
    11726 11727   "usernameClaimed": "adam",
    11727 11728   "usernameUnclaimed": "noonewouldeverusethis7"
    skipped 621 lines
    12349 12350   "us"
    12350 12351   ],
    12351 12352   "headers": {
    12352  - "authorization": "Bearer BQA6sdhtUg3hadjln7DCoAK6sLn7KrHfsn2DObW2gr-W3HgF0h1KZGVYgwispRDR1tqRntVeTd0Duvb2q4g"
     12353 + "authorization": "Bearer BQBQDhCkzUqE4QBPyqrSyRZbBRp5pdttS7rj9J8qT7OllWuJazqP6CcE-1eGcNoRkxNl9Ds9JCdgY3soi6U"
    12353 12354   },
    12354 12355   "errors": {
    12355 12356   "Spotify is currently not available in your country.": "Access denied in your country, use proxy/vpn"
    skipped 418 lines
    12774 12775   "usernameUnclaimed": "noonewouldeverusethis7"
    12775 12776   },
    12776 12777   "TJournal": {
     12778 + "similarSearch": true,
    12777 12779   "tags": [
    12778 12780   "ru"
    12779 12781   ],
    skipped 120 lines
    12900 12902   "usernameClaimed": "taplink.ru",
    12901 12903   "usernameUnclaimed": "noonewouldeverusethis77777"
    12902 12904   },
    12903  - "Taringa": {
    12904  - "tags": [
    12905  - "ar"
    12906  - ],
    12907  - "checkType": "message",
    12908  - "absenceStrs": "Moved Permanently",
    12909  - "alexaRank": 4125,
    12910  - "url": "https://www.taringa.net/{username}",
    12911  - "urlMain": "https://taringa.net/",
    12912  - "usernameClaimed": "blue",
    12913  - "usernameUnclaimed": "noonewouldeverusethis7"
    12914  - },
    12915 12905   "TechPowerUp": {
    12916 12906   "tags": [
    12917 12907   "us"
    skipped 772 lines
    13690 13680   "sec-ch-ua": "Google Chrome\";v=\"87\", \" Not;A Brand\";v=\"99\", \"Chromium\";v=\"87\"",
    13691 13681   "authorization": "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA",
    13692 13682   "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36",
    13693  - "x-guest-token": "1372637128920825857"
     13683 + "x-guest-token": "1373308975769391104"
    13694 13684   },
    13695 13685   "errors": {
    13696 13686   "Bad guest token": "x-guest-token update required"
    skipped 168 lines
    13865 13855   "usernameUnclaimed": "noonewouldeverusethis7"
    13866 13856   },
    13867 13857   "VC.ru": {
     13858 + "similarSearch": true,
    13868 13859   "tags": [
    13869 13860   "ru"
    13870 13861   ],
    skipped 191 lines
    14062 14053   "video"
    14063 14054   ],
    14064 14055   "headers": {
    14065  - "Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTYxMDcyNjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.kzWxBf1qCJwjpZYUP6w-Pf4VptBMKpKUaMw8VnYwtPU"
     14056 + "Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTYyNjMwODAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.YKtLE0-AGmaXJNF99dVKjPW8z5_-wDs6tnnjVOybDaQ"
    14066 14057   },
    14067 14058   "activation": {
    14068 14059   "url": "https://vimeo.com/_rv/viewer",
    skipped 22 lines
    14091 14082   "usernameClaimed": "blue",
    14092 14083   "usernameUnclaimed": "noonewouldeverusethis7"
    14093 14084   },
    14094  - "Virtualireland": {
     14085 + "VirtualIreland": {
    14095 14086   "tags": [
    14096 14087   "ie",
    14097 14088   "ru"
    skipped 9559 lines
    23657 23648   "urlMain": "https://linuxpip.org",
    23658 23649   "usernameClaimed": "diehard",
    23659 23650   "usernameUnclaimed": "noonewouldeverusethis7"
     23651 + },
     23652 + "Taringa": {
     23653 + "checkType": "message",
     23654 + "presenseStrs": [
     23655 + "User",
     23656 + " user-username",
     23657 + " UserFeed"
     23658 + ],
     23659 + "absenceStrs": [
     23660 + "problema"
     23661 + ],
     23662 + "url": "https://www.taringa.net/{username}",
     23663 + "urlMain": "https://www.taringa.net",
     23664 + "usernameClaimed": "UniversoGIA",
     23665 + "usernameUnclaimed": "noonewouldeverusethis7"
    23660 23666   }
    23661 23667   },
    23662 23668   "engines": {
    skipped 64 lines
    23727 23733   ],
    23728 23734   "checkType": "message",
    23729 23735   "errors": {
     23736 + "\u041f\u0440\u043e\u0441\u0442\u0438\u0442\u0435, \u043d\u043e \u0432\u0430\u0448 IP \u0432 \u0441\u043f\u0438\u0441\u043a\u0435 \u0437\u0430\u043f\u0440\u0435\u0449\u0435\u043d\u043d\u044b\u0445 \u0430\u0434\u043c\u0438\u043d\u0438\u0441\u0442\u0440\u0430\u0446\u0438\u0435\u0439 \u0444\u043e\u0440\u0443\u043c\u0430": "IP ban",
    23730 23737   "You have been banned": "IP ban",
    23731 23738   "The administrator has banned your IP address": "IP ban",
    23732 23739   "\u0418\u0437\u0432\u0438\u043d\u0438\u0442\u0435, \u0441\u0435\u0440\u0432\u0435\u0440 \u043f\u0435\u0440\u0435\u0433\u0440\u0443\u0436\u0435\u043d. \u041f\u043e\u0436\u0430\u043b\u0443\u0439\u0441\u0442\u0430, \u043f\u043e\u043f\u0440\u043e\u0431\u0443\u0439\u0442\u0435 \u0437\u0430\u0439\u0442\u0438 \u043f\u043e\u0437\u0436\u0435.": "Server is overloaded"
    skipped 29 lines
    23762 23769   "error404"
    23763 23770   ],
    23764 23771   "checkType": "message",
     23772 + "requestHeadOnly": false,
    23765 23773   "url": "{urlMain}/author/{username}/"
    23766 23774   },
    23767 23775   "presenseStrs": [
    23768 23776   "/wp-admin",
    23769 23777   "/wp-includes/wlwmanifest.xml"
     23778 + ]
     23779 + },
     23780 + "Flarum": {
     23781 + "name": "Flarum",
     23782 + "site": {
     23783 + "presenseStrs": [
     23784 + "\"attributes\":{\"username\""
     23785 + ],
     23786 + "absenceStrs": [
     23787 + "NotFound"
     23788 + ],
     23789 + "checkType": "message"
     23790 + },
     23791 + "presenseStrs": [
     23792 + "flarum-loading-error"
    23770 23793   ]
    23771 23794   },
    23772 23795   "engine404": {
    skipped 29 lines
  • ■ ■ ■ ■
    maigret/submit.py
    skipped 169 lines
    170 170   print(f'Sorry, we couldn\'t find params to detect account presence/absence in {site.name}.')
    171 171   print('Try to run this mode again and increase features count or choose others.')
    172 172   else:
    173  - if input(f'Site {site.name} successfully checked. Do you want to save it in the Maigret DB? [yY] ') in 'yY':
     173 + if input(f'Site {site.name} successfully checked. Do you want to save it in the Maigret DB? [Yn] ').lower() in 'y':
    174 174   db.update_site(site)
    175 175   return True
    176 176   
    skipped 2 lines
  • ■ ■ ■ ■ ■ ■
    tests/test_checking.py
     1 +"""Maigret checking logic test functions"""
     2 +import pytest
     3 +import asyncio
     4 +import logging
     5 +from maigret.checking import AsyncioSimpleExecutor, AsyncioProgressbarExecutor, AsyncioProgressbarSemaphoreExecutor, AsyncioProgressbarQueueExecutor
     6 + 
     7 +logger = logging.getLogger(__name__)
     8 + 
     9 +async def func(n):
     10 + await asyncio.sleep(0.1 * (n % 3))
     11 + return n
     12 + 
     13 + 
     14 +@pytest.mark.asyncio
     15 +async def test_simple_asyncio_executor():
     16 + tasks = [(func, [n], {}) for n in range(10)]
     17 + executor = AsyncioSimpleExecutor(logger=logger)
     18 + assert await executor.run(tasks) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
     19 + assert executor.execution_time > 0.2
     20 + assert executor.execution_time < 0.3
     21 + 
     22 +@pytest.mark.asyncio
     23 +async def test_asyncio_progressbar_executor():
     24 + tasks = [(func, [n], {}) for n in range(10)]
     25 + 
     26 + executor = AsyncioProgressbarExecutor(logger=logger)
     27 + # no guarantees for the results order
     28 + assert sorted(await executor.run(tasks)) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
     29 + assert executor.execution_time > 0.2
     30 + assert executor.execution_time < 0.3
     31 + 
     32 + 
     33 +@pytest.mark.asyncio
     34 +async def test_asyncio_progressbar_semaphore_executor():
     35 + tasks = [(func, [n], {}) for n in range(10)]
     36 + 
     37 + executor = AsyncioProgressbarSemaphoreExecutor(logger=logger, in_parallel=5)
     38 + # no guarantees for the results order
     39 + assert sorted(await executor.run(tasks)) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
     40 + assert executor.execution_time > 0.2
     41 + assert executor.execution_time < 0.4
     42 + 
     43 + 
     44 +@pytest.mark.asyncio
     45 +async def test_asyncio_progressbar_queue_executor():
     46 + tasks = [(func, [n], {}) for n in range(10)]
     47 + 
     48 + executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=2)
     49 + assert await executor.run(tasks) == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8]
     50 + assert executor.execution_time > 0.5
     51 + assert executor.execution_time < 0.6
     52 + 
     53 + executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=3)
     54 + assert await executor.run(tasks) == [0, 3, 1, 4, 6, 2, 7, 9, 5, 8]
     55 + assert executor.execution_time > 0.4
     56 + assert executor.execution_time < 0.5
     57 + 
     58 + executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=5)
     59 + assert await executor.run(tasks) == [0, 3, 6, 1, 4, 7, 9, 2, 5, 8]
     60 + assert executor.execution_time > 0.3
     61 + assert executor.execution_time < 0.4
     62 + 
     63 + executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=10)
     64 + assert await executor.run(tasks) == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8]
     65 + assert executor.execution_time > 0.2
     66 + assert executor.execution_time < 0.3
Please wait...
Page is in error, reload to recover