Projects STRLCPY snscrape Commits 5648e957
🤬
  • improved consistency of code formatting and added _STYLE_MEDIA_URL_PATTERN as variable

  • Loading...
  • Tristan Lee committed 2 years ago
    5648e957
    1 parent 21f7b620
  • ■ ■ ■ ■ ■ ■
    snscrape/modules/telegram.py
    skipped 12 lines
    13 13   
    14 14  _logger = logging.getLogger(__name__)
    15 15  _SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
    16  - 
     16 +_STYLE_MEDIA_URL_PATTERN = re.compile(r'url\(\'(.*?)\'\)')
    17 17   
    18 18  @dataclasses.dataclass
    19 19  class LinkPreview:
    skipped 25 lines
    45 45   def __str__(self):
    46 46   return f'https://t.me/s/{self.username}'
    47 47   
     48 + 
    48 49  @dataclasses.dataclass
    49 50  class TelegramPost(snscrape.base.Item):
    50 51   url: str
    skipped 12 lines
    63 64   
    64 65   def __str__(self):
    65 66   return self.url
     67 + 
    66 68   
    67 69  class Medium:
    68 70   pass
    69 71   
     72 + 
    70 73  @dataclasses.dataclass
    71 74  class Photo(Medium):
    72 75   url: str
     76 + 
    73 77   
    74 78  @dataclasses.dataclass
    75 79  class Video(Medium):
    skipped 1 lines
    77 81   duration: float
    78 82   url: typing.Optional[str] = None
    79 83   
     84 + 
    80 85  @dataclasses.dataclass
    81 86  class VoiceMessage(Medium):
    82 87   url: str
    83 88   duration: str
    84 89   bars:typing.List[float]
     90 + 
    85 91   
    86 92  @dataclasses.dataclass
    87 93  class Gif(Medium):
    88 94   thumbnailUrl: str
    89 95   url: typing.Optional[str] = None
    90 96   
     97 + 
    91 98  class TelegramChannelScraper(snscrape.base.Scraper):
    92 99   name = 'telegram-channel'
    93 100   
    skipped 26 lines
    120 127   date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
    121 128   media = []
    122 129   outlinks = []
     130 + mentions = []
     131 + hashtags = []
    123 132   forwarded = None
    124 133   forwardedUrl = None
    125 134   
    126  - if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
    127  - forwardedUrl = forward_tag['href']
     135 + if (forwardTag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
     136 + forwardedUrl = forwardTag['href']
    128 137   forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0]
    129 138   forwarded = Channel(username = forwardedName)
    130 139   
    skipped 2 lines
    133 142   else:
    134 143   content = None
    135 144   
    136  - outlinks = []
    137  - mentions = []
    138  - hashtags = []
    139 145   for link in post.find_all('a'):
    140 146   if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
    141 147   # Author links at the top (avatar and name)
    skipped 2 lines
    144 150   style = link.attrs.get('style', '')
    145 151   # Generic filter of links to the post itself, catches videos, photos, and the date link
    146 152   if style != '':
    147  - imageUrls = re.findall('url\(\'(.*?)\'\)', style)
     153 + imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
    148 154   if len(imageUrls) == 1:
    149 155   media.append(Photo(url = imageUrls[0]))
    150 156   continue
    151 157   if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
    152 158   style = link.attrs.get('style', '')
    153  - imageUrls = re.findall('url\(\'(.*?)\'\)', style)
     159 + imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
    154 160   if len(imageUrls) == 1:
    155 161   media.append(Photo(url = imageUrls[0]))
    156 162   # resp = self._get(image[0])
    skipped 10 lines
    167 173   if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl):
    168 174   outlinks.append(href)
    169 175   
    170  - for voice_player in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
    171  - audioUrl = voice_player.find('audio')['src']
    172  - durationStr = voice_player.find('time').text.split(':')
     176 + for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
     177 + audioUrl = voicePlayer.find('audio')['src']
     178 + durationStr = voicePlayer.find('time').text
    173 179   duration = durationStrToSeconds(durationStr)
    174  - barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voice_player.find('div', {'class': 'bar'}).find_all('s')]
     180 + barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]
    175 181   
    176 182   media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))
    177 183   
    178  - for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
    179  - iTag = video_player.find('i')
     184 + for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
     185 + iTag = videoPlayer.find('i')
    180 186   if iTag is None:
    181 187   videoUrl = None
    182 188   videoThumbnailUrl = None
    183 189   else:
    184 190   style = iTag['style']
    185  - videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style)[0]
    186  - videoTag = video_player.find('video')
    187  - if videoTag is None:
    188  - videoUrl = None
    189  - else:
    190  - videoUrl = videoTag['src']
     191 + videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0]
     192 + videoTag = videoPlayer.find('video')
     193 + videoUrl = None if videoTag is None else videoTag['src']
    191 194   mKwargs = {
    192 195   'thumbnailUrl': videoThumbnailUrl,
    193 196   'url': videoUrl,
    194 197   }
    195  - timeTag = video_player.find('time')
     198 + timeTag = videoPlayer.find('time')
    196 199   if timeTag is None:
    197 200   cls = Gif
    198 201   else:
    199 202   cls = Video
    200  - durationStr = video_player.find('time').text.split(':')
     203 + durationStr = videoPlayer.find('time').text
    201 204   mKwargs['duration'] = durationStrToSeconds(durationStr)
    202 205   media.append(cls(**mKwargs))
    203 206   
    skipped 17 lines
    221 224   outlinks.remove(kwargs['href'])
    222 225   
    223 226   viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
    224  - if viewsSpan is None:
    225  - views = None
    226  - else:
    227  - views = parse_num(viewsSpan.text)
     227 + views = None if viewsSpan is None else parse_num(viewsSpan.text)
    228 228  
    229 229   yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views)
    230 230   
    skipped 87 lines
    318 318   return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
    319 319   elif s.endswith('K'):
    320 320   return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
    321  - else:
    322  - return int(s), 1
     321 + return int(s), 1
    323 322   
    324 323  def durationStrToSeconds(durationStr):
    325  - return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationStr))])
     324 + durationList = durationStr.split(':')
     325 + return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationList))])
    326 326   
    327 327  def telegramResponseOkCallback(r):
    328 328   if r.status_code == 200:
    329 329   return (True, None)
    330  - elif r.status_code // 100 == 5:
    331  - return (False, f'status code: {r.status_code}')
    332  - else:
    333  - return (False, None)
     330 + return (False, f'{r.status_code=}')
     331 +
Please wait...
Page is in error, reload to recover