Projects STRLCPY maigret Commits b269c4a8
🤬
Revision indexing in progress... (symbol navigation in revisions will be accurate after indexed)
  • ■ ■ ■ ■ ■ ■
    maigret/executors.py
     1 +import asyncio
     2 +import time
     3 +import tqdm
     4 +import sys
     5 +from typing import Iterable
     6 + 
     7 +from .types import QueryDraft
     8 + 
     9 + 
     10 +def create_task_func():
     11 + if sys.version_info.minor > 6:
     12 + create_asyncio_task = asyncio.create_task
     13 + else:
     14 + loop = asyncio.get_event_loop()
     15 + create_asyncio_task = loop.create_task
     16 + return create_asyncio_task
     17 + 
     18 + 
     19 +class AsyncExecutor:
     20 + def __init__(self, *args, **kwargs):
     21 + self.logger = kwargs['logger']
     22 + 
     23 + async def run(self, tasks: Iterable[QueryDraft]):
     24 + start_time = time.time()
     25 + results = await self._run(tasks)
     26 + self.execution_time = time.time() - start_time
     27 + self.logger.debug(f'Spent time: {self.execution_time}')
     28 + return results
     29 + 
     30 + async def _run(self, tasks: Iterable[QueryDraft]):
     31 + await asyncio.sleep(0)
     32 + 
     33 + 
     34 +class AsyncioSimpleExecutor(AsyncExecutor):
     35 + def __init__(self, *args, **kwargs):
     36 + super().__init__(*args, **kwargs)
     37 + 
     38 + async def _run(self, tasks: Iterable[QueryDraft]):
     39 + futures = [f(*args, **kwargs) for f, args, kwargs in tasks]
     40 + return await asyncio.gather(*futures)
     41 + 
     42 + 
     43 +class AsyncioProgressbarExecutor(AsyncExecutor):
     44 + def __init__(self, *args, **kwargs):
     45 + super().__init__(*args, **kwargs)
     46 + 
     47 + async def _run(self, tasks: Iterable[QueryDraft]):
     48 + futures = [f(*args, **kwargs) for f, args, kwargs in tasks]
     49 + results = []
     50 + for f in tqdm.asyncio.tqdm.as_completed(futures):
     51 + results.append(await f)
     52 + return results
     53 + 
     54 + 
     55 +class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor):
     56 + def __init__(self, *args, **kwargs):
     57 + super().__init__(*args, **kwargs)
     58 + self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 1))
     59 + 
     60 + async def _run(self, tasks: Iterable[QueryDraft]):
     61 + async def _wrap_query(q: QueryDraft):
     62 + async with self.semaphore:
     63 + f, args, kwargs = q
     64 + return await f(*args, **kwargs)
     65 + 
     66 + async def semaphore_gather(tasks: Iterable[QueryDraft]):
     67 + coros = [_wrap_query(q) for q in tasks]
     68 + results = []
     69 + for f in tqdm.asyncio.tqdm.as_completed(coros):
     70 + results.append(await f)
     71 + return results
     72 + 
     73 + return await semaphore_gather(tasks)
     74 + 
     75 + 
     76 +class AsyncioProgressbarQueueExecutor(AsyncExecutor):
     77 + def __init__(self, *args, **kwargs):
     78 + super().__init__(*args, **kwargs)
     79 + self.workers_count = kwargs.get('in_parallel', 10)
     80 + self.progress_func = kwargs.get('progress_func', tqdm.tqdm)
     81 + self.queue = asyncio.Queue(self.workers_count)
     82 + self.timeout = kwargs.get('timeout')
     83 + 
     84 + async def worker(self):
     85 + while True:
     86 + try:
     87 + f, args, kwargs = self.queue.get_nowait()
     88 + except asyncio.QueueEmpty:
     89 + return
     90 + 
     91 + query_future = f(*args, **kwargs)
     92 + query_task = create_task_func()(query_future)
     93 + try:
     94 + result = await asyncio.wait_for(query_task, timeout=self.timeout)
     95 + except asyncio.TimeoutError:
     96 + result = None
     97 + 
     98 + self.results.append(result)
     99 + self.progress.update(1)
     100 + self.queue.task_done()
     101 + 
     102 + async def _run(self, queries: Iterable[QueryDraft]):
     103 + self.results = []
     104 + 
     105 + queries_list = list(queries)
     106 + 
     107 + min_workers = min(len(queries_list), self.workers_count)
     108 + 
     109 + workers = [create_task_func()(self.worker())
     110 + for _ in range(min_workers)]
     111 + 
     112 + self.progress = self.progress_func(total=len(queries_list))
     113 + for t in queries_list:
     114 + await self.queue.put(t)
     115 + await self.queue.join()
     116 + for w in workers:
     117 + w.cancel()
     118 + self.progress.close()
     119 + return self.results
     120 + 
  • ■ ■ ■ ■ ■ ■
    maigret/types.py
     1 +from typing import Callable, Any, Tuple
     2 + 
     3 + 
# A search query awaiting execution: a ``(function, args, kwargs)`` triple.
# The executors unpack it and await ``function(*args, **kwargs)``.
QueryDraft = Tuple[Callable, Any, Any]
     6 + 
     7 +# error got as a result of completed search query
     8 +class CheckError:
     9 + _type = 'Unknown'
     10 + _desc = ''
     11 + 
     12 + def __init__(self, typename, desc=''):
     13 + self._type = typename
     14 + self._desc = desc
     15 + 
     16 + def __str__(self):
     17 + if not self._desc:
     18 + return f'{self._type} error'
     19 + 
     20 + return f'{self._type} error: {self._desc}'
     21 + 
     22 + @property
     23 + def type(self):
     24 + return self._type
     25 +
     26 + @property
     27 + def desc(self):
     28 + return self._desc
     29 + 
Please wait...
Page is in error, reload to recover