Projects STRLCPY snscrape Commits b276c3cc
🤬
  • Fixed an issue where some videos and photos weren't being scraped because they weren't in a post containing a 'tgme_widget_message_text' div.

  • Loading...
  • Tristan Lee committed 2 years ago
    b276c3cc
    1 parent 1e4e0c27
Revision indexing in progress... (symbol navigation in revisions will be accurate after indexed)
  • ■ ■ ■ ■ ■ ■
    snscrape/modules/telegram.py
    skipped 64 lines
    65 65  class Medium:
    66 66   pass
    67 67   
    68  - 
    69 68  @dataclasses.dataclass
    70 69  class Photo(Medium):
    71  - previewUrl: str
    72  - fullUrl: str
    73  - 
    74  -@dataclasses.dataclass
    75  -class Image(Medium):
    76 70   url: str
    77 71   
    78 72  @dataclasses.dataclass
    skipped 1 lines
    80 74   thumbnailUrl: str
    81 75   duration: float
    82 76   url: typing.Optional[str] = None
     77 + 
     78 +@dataclasses.dataclass
     79 +class VoiceMessage(Medium):
     80 + url: str
     81 + duration: str
     82 + bars:typing.List[float]
    83 83   
    84 84  @dataclasses.dataclass
    85 85  class Gif(Medium):
    skipped 31 lines
    117 117   url = rawUrl.replace('//t.me/', '//t.me/s/')
    118 118   date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
    119 119   media = []
     120 + outlinks = []
    120 121   forwarded = None
    121 122   forwardedUrl = None
     123 + 
    122 124   if (message := post.find('div', class_ = 'tgme_widget_message_text')):
    123 125   content = message.get_text(separator="\n")
    124 126   
    125  - for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
    126  - iTag = video_player.find('i')
    127  - if iTag is None:
    128  - videoUrl = None
    129  - videoThumbnailUrl = None
    130  - else:
    131  - style = iTag['style']
    132  - videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style)[0]
    133  - videoTag = video_player.find('video')
    134  - if videoTag is None:
    135  - videoUrl = None
    136  - else:
    137  - videoUrl = videoTag['src']
    138  - mKwargs = {
    139  - 'thumbnailUrl': videoThumbnailUrl,
    140  - 'url': videoUrl,
    141  - }
    142  - timeTag = video_player.find('time')
    143  - if timeTag is None:
    144  - cls = Gif
    145  - else:
    146  - cls = Video
    147  - durationStr = video_player.find('time').text.split(':')
    148  - mKwargs['duration'] = sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationStr))])
    149  - media.append(cls(**mKwargs))
    150 127   if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
    151 128   forwardedUrl = forward_tag['href']
    152 129   forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0]
    153 130   forwarded = Channel(username = forwardedName)
    154 131   
    155  - outlinks = []
    156  - for link in post.find_all('a'):
    157  - if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
    158  - # Author links at the top (avatar and name)
    159  - continue
    160  - if link['href'] == rawUrl or link['href'] == url:
    161  - style = link.attrs.get('style', '')
    162  - # Generic filter of links to the post itself, catches videos, photos, and the date link
    163  - if style != '':
    164  - imageUrls = re.findall('url\(\'(.*?)\'\)', style)
    165  - if len(imageUrls) == 1:
    166  - media.append(Image(url = imageUrls[0]))
    167  - continue
    168  - if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
    169  - style = link.attrs.get('style', '')
     132 + else:
     133 + content = None
     134 + 
     135 + outlinks = []
     136 + for link in post.find_all('a'):
     137 + if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
     138 + # Author links at the top (avatar and name)
     139 + continue
     140 + if link['href'] == rawUrl or link['href'] == url:
     141 + style = link.attrs.get('style', '')
     142 + # Generic filter of links to the post itself, catches videos, photos, and the date link
     143 + if style != '':
    170 144   imageUrls = re.findall('url\(\'(.*?)\'\)', style)
    171 145   if len(imageUrls) == 1:
    172  - media.append(Image(url = imageUrls[0]))
    173  - # resp = self._get(image[0])
    174  - # encoded_string = base64.b64encode(resp.content)
    175  - # Individual photo or video link
     146 + media.append(Photo(url = imageUrls[0]))
    176 147   continue
    177  - href = urllib.parse.urljoin(pageUrl, link['href'])
    178  - if href not in outlinks:
    179  - outlinks.append(href)
    180  - else:
    181  - content = None
    182  - outlinks = []
    183  - media = []
     148 + if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
     149 + style = link.attrs.get('style', '')
     150 + imageUrls = re.findall('url\(\'(.*?)\'\)', style)
     151 + if len(imageUrls) == 1:
     152 + media.append(Photo(url = imageUrls[0]))
     153 + # resp = self._get(image[0])
     154 + # encoded_string = base64.b64encode(resp.content)
     155 + # Individual photo or video link
     156 + continue
     157 + href = urllib.parse.urljoin(pageUrl, link['href'])
     158 + if (href not in outlinks) and (href != rawUrl):
     159 + outlinks.append(href)
     160 + 
     161 + for voice_player in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
     162 + audioUrl = voice_player.find('audio')['src']
     163 + durationStr = voice_player.find('time').text.split(':')
     164 + duration = durationStrToSeconds(durationStr)
     165 + barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voice_player.find('div', {'class': 'bar'}).find_all('s')]
     166 + 
     167 + media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))
     168 + 
     169 + for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
     170 + iTag = video_player.find('i')
     171 + if iTag is None:
     172 + videoUrl = None
     173 + videoThumbnailUrl = None
     174 + else:
     175 + style = iTag['style']
     176 + videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style)[0]
     177 + videoTag = video_player.find('video')
     178 + if videoTag is None:
     179 + videoUrl = None
     180 + else:
     181 + videoUrl = videoTag['src']
     182 + mKwargs = {
     183 + 'thumbnailUrl': videoThumbnailUrl,
     184 + 'url': videoUrl,
     185 + }
     186 + timeTag = video_player.find('time')
     187 + if timeTag is None:
     188 + cls = Gif
     189 + else:
     190 + cls = Video
     191 + durationStr = video_player.find('time').text.split(':')
     192 + mKwargs['duration'] = durationStrToSeconds(durationStr)
     193 + media.append(cls(**mKwargs))
     194 + 
    184 195   linkPreview = None
    185 196   if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
    186 197   kwargs = {}
    skipped 10 lines
    197 208   else:
    198 209   _logger.warning(f'Could not process link preview image on {url}')
    199 210   linkPreview = LinkPreview(**kwargs)
     211 + if kwargs['href'] in outlinks:
     212 + outlinks.remove(kwargs['href'])
     213 + 
    200 214   viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
    201 215   if viewsSpan is None:
    202 216   views = None
    skipped 17 lines
    220 234   else:
    221 235   break
    222 236   nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
    223  - print(f'nextPageUrl: {nextPageUrl}')
    224 237   r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = telegramResponseOkCallback)
    225 238   if r.status_code != 200:
    226 239   raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
    skipped 61 lines
    288 301   return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
    289 302   else:
    290 303   return int(s), 1
     304 + 
     305 +def durationStrToSeconds(durationStr):
     306 + return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationStr))])
    291 307   
    292 308  def telegramResponseOkCallback(r):
    293 309   if r.status_code == 200:
    skipped 5 lines
Please wait...
Page is in error, reload to recover