Projects STRLCPY snscrape Commits 0a4bd39c
🤬
Revision indexing in progress... (symbol navigation in revisions will be accurate after indexed)
  • ■ ■ ■ ■ ■ ■
    snscrape/modules/telegram.py
    skipped 12 lines
    13 13   
    14 14  _logger = logging.getLogger(__name__)
    15 15  _SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$')
    16  - 
     16 +_STYLE_MEDIA_URL_PATTERN = re.compile(r'url\(\'(.*?)\'\)')
    17 17   
    18 18  @dataclasses.dataclass
    19 19  class LinkPreview:
    skipped 5 lines
    25 25   
    26 26   
    27 27  @dataclasses.dataclass
    28  -class TelegramPost(snscrape.base.Item):
    29  - url: str
    30  - date: datetime.datetime
    31  - content: str
    32  - outlinks: list
    33  - images: list
    34  - videos: list
    35  - forwarded: str
    36  - linkPreview: typing.Optional[LinkPreview] = None
    37  - 
    38  - outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')
    39  - 
    40  - def __str__(self):
    41  - return self.url
    42  - 
    43  - 
    44  -@dataclasses.dataclass
    45 28  class Channel(snscrape.base.Entity):
    46 29   username: str
    47  - title: str
    48  - verified: bool
    49  - photo: str
     30 + title: typing.Optional[str] = None
     31 + verified: typing.Optional[bool] = None
     32 + photo: typing.Optional[str] = None
    50 33   description: typing.Optional[str] = None
    51 34   members: typing.Optional[int] = None
    52 35   photos: typing.Optional[snscrape.base.IntWithGranularity] = None
    skipped 10 lines
    63 46   return f'https://t.me/s/{self.username}'
    64 47   
    65 48   
@dataclasses.dataclass
class TelegramPost(snscrape.base.Item):
    """A single post scraped from a Telegram channel's public post list."""

    # URL of the post; this is also what str() of the post returns.
    url: str
    # Timestamp parsed from the post's <time datetime=...> attribute.
    date: datetime.datetime
    # Text of the message; the scraper passes None when the post has no text element.
    content: typing.Optional[str]
    # Links found in the post, excluding links to the post itself, the forward source,
    # and the link preview target.
    outlinks: typing.Optional[typing.List[str]] = None
    # Link texts that started with '@', with the '@' characters stripped.
    mentions: typing.Optional[typing.List[str]] = None
    # Link texts that started with '#', with the '#' characters stripped.
    hashtags: typing.Optional[typing.List[str]] = None
    # Channel the post was forwarded from (built from the username in forwardedUrl), if any.
    forwarded: typing.Optional['Channel'] = None
    # href of the "forwarded from" link, if any.
    forwardedUrl: typing.Optional[str] = None
    # Photo / Video / VoiceMessage / Gif attachments collected from the post.
    media: typing.Optional[typing.List['Medium']] = None
    # NOTE(review): the scraper assigns the return value of parse_num() here, which is a
    # (value, granularity) tuple rather than a bare int -- confirm the intended type.
    views: typing.Optional[int] = None
    linkPreview: typing.Optional[LinkPreview] = None

    # Deprecated alias kept for backwards compatibility: the outlinks joined by spaces.
    outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks')

    def __str__(self):
        return self.url
     67 + 
     68 + 
class Medium:
    """Common base class for the media attachment types of a post."""
    pass
     71 + 
     72 + 
@dataclasses.dataclass
class Photo(Medium):
    """A photo attached to a post."""

    # Image URL; the scraper extracts it from the url('...') value in a link's style attribute.
    url: str
     76 + 
     77 + 
@dataclasses.dataclass
class Video(Medium):
    """A video attached to a post (a video player element that has a <time> tag)."""

    # Thumbnail URL from the player's <i> style attribute; the scraper passes None
    # when the player has no <i> tag.
    thumbnailUrl: typing.Optional[str]
    # Length in seconds, as computed by durationStrToSeconds() from the <time> text.
    duration: float
    # src of the <video> tag, or None when the player has no <video> tag.
    url: typing.Optional[str] = None
     83 + 
     84 + 
@dataclasses.dataclass
class VoiceMessage(Medium):
    """A voice message attached to a post."""

    # src of the player's <audio> tag.
    url: str
    # Length in seconds; the scraper passes the integer result of durationStrToSeconds().
    duration: int
    # Heights of the waveform bars, parsed from each bar's style attribute (percent values).
    bars: typing.List[float]
     90 + 
     91 + 
@dataclasses.dataclass
class Gif(Medium):
    """An animation attached to a post (a video player element without a <time> tag)."""

    # Thumbnail URL from the player's <i> style attribute.
    thumbnailUrl: str
    # src of the <video> tag, or None when the player has no <video> tag.
    url: typing.Optional[str] = None
     96 + 
     97 + 
    66 98  class TelegramChannelScraper(snscrape.base.Scraper):
    67 99   name = 'telegram-channel'
    68 100   
    skipped 24 lines
    93 125   _logger.warning(f'Possibly incorrect URL: {rawUrl!r}')
    94 126   url = rawUrl.replace('//t.me/', '//t.me/s/')
    95 127   date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z')
    96  - images = []
    97  - videos = []
     128 + media = []
     129 + outlinks = []
     130 + mentions = []
     131 + hashtags = []
    98 132   forwarded = None
     133 + forwardedUrl = None
     134 + 
     135 + if (forwardTag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
     136 + forwardedUrl = forwardTag['href']
     137 + forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0]
     138 + forwarded = Channel(username = forwardedName)
     139 + 
    99 140   if (message := post.find('div', class_ = 'tgme_widget_message_text')):
    100 141   content = message.get_text(separator="\n")
     142 + else:
     143 + content = None
    101 144   
    102  - for video_tag in post.find_all('video'):
    103  - videos.append(video_tag['src'])
     145 + for link in post.find_all('a'):
     146 + if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
     147 + # Author links at the top (avatar and name)
     148 + continue
     149 + if link['href'] == rawUrl or link['href'] == url:
     150 + style = link.attrs.get('style', '')
     151 + # Generic filter of links to the post itself, catches videos, photos, and the date link
     152 + if style != '':
     153 + imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
     154 + if len(imageUrls) == 1:
     155 + media.append(Photo(url = imageUrls[0]))
     156 + continue
     157 + if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
     158 + style = link.attrs.get('style', '')
     159 + imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style)
     160 + if len(imageUrls) == 1:
     161 + media.append(Photo(url = imageUrls[0]))
     162 + # resp = self._get(image[0])
     163 + # encoded_string = base64.b64encode(resp.content)
     164 + # Individual photo or video link
     165 + continue
     166 + if link.text.startswith('@'):
     167 + mentions.append(link.text.strip('@'))
     168 + continue
     169 + if link.text.startswith('#'):
     170 + hashtags.append(link.text.strip('#'))
     171 + continue
     172 + href = urllib.parse.urljoin(pageUrl, link['href'])
     173 + if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl):
     174 + outlinks.append(href)
     175 + 
     176 + for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}):
     177 + audioUrl = voicePlayer.find('audio')['src']
     178 + durationStr = voicePlayer.find('time').text
     179 + duration = durationStrToSeconds(durationStr)
     180 + barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')]
     181 + 
     182 + media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights))
    104 183   
    105  - if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')):
    106  - forwarded = forward_tag['href'].split('t.me/')[1].split('/')[0]
     184 + for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}):
     185 + iTag = videoPlayer.find('i')
     186 + if iTag is None:
     187 + videoUrl = None
     188 + videoThumbnailUrl = None
     189 + else:
     190 + style = iTag['style']
     191 + videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0]
     192 + videoTag = videoPlayer.find('video')
     193 + videoUrl = None if videoTag is None else videoTag['src']
     194 + mKwargs = {
     195 + 'thumbnailUrl': videoThumbnailUrl,
     196 + 'url': videoUrl,
     197 + }
     198 + timeTag = videoPlayer.find('time')
     199 + if timeTag is None:
     200 + cls = Gif
     201 + else:
     202 + cls = Video
     203 + durationStr = videoPlayer.find('time').text
     204 + mKwargs['duration'] = durationStrToSeconds(durationStr)
     205 + media.append(cls(**mKwargs))
    107 206   
    108  - outlinks = []
    109  - for link in post.find_all('a'):
    110  - if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')):
    111  - # Author links at the top (avatar and name)
    112  - continue
    113  - if link['href'] == rawUrl or link['href'] == url:
    114  - style = link.attrs.get('style', '')
    115  - # Generic filter of links to the post itself, catches videos, photos, and the date link
    116  - if style != '':
    117  - image = re.findall('url\(\'(.*?)\'\)', style)
    118  - if len(image) == 1:
    119  - images.append(image[0])
    120  - continue
    121  - if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']):
    122  - style = link.attrs.get('style', '')
    123  - image = re.findall('url\(\'(.*?)\'\)', style)
    124  - if len(image) == 1:
    125  - images.append(image[0])
    126  - # resp = self._get(image[0])
    127  - # encoded_string = base64.b64encode(resp.content)
    128  - # Individual photo or video link
    129  - continue
    130  - href = urllib.parse.urljoin(pageUrl, link['href'])
    131  - if href not in outlinks:
    132  - outlinks.append(href)
    133  - else:
    134  - content = None
    135  - outlinks = []
    136  - images = []
    137  - videos = []
    138 207   linkPreview = None
    139 208   if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')):
    140 209   kwargs = {}
    skipped 10 lines
    151 220   else:
    152 221   _logger.warning(f'Could not process link preview image on {url}')
    153 222   linkPreview = LinkPreview(**kwargs)
    154  - yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview, images = images, videos = videos, forwarded = forwarded)
     223 + if kwargs['href'] in outlinks:
     224 + outlinks.remove(kwargs['href'])
     225 + 
     226 + viewsSpan = post.find('span', class_ = 'tgme_widget_message_views')
     227 + views = None if viewsSpan is None else parse_num(viewsSpan.text)
     228 +
     229 + yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views)
    155 230   
    156 231   def get_items(self):
    157 232   r, soup = self._initial_page()
    158 233   if '/s/' not in r.url:
    159 234   _logger.warning('No public post list for this user')
    160 235   return
     236 + nextPageUrl = ''
    161 237   while True:
    162 238   yield from self._soup_to_items(soup, r.url)
     239 + try:
     240 + if soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)['href'].split('/')[-1] == '1':
     241 + # if message 1 is the first message in the page, terminate scraping
     242 + break
     243 + except:
     244 + pass
    163 245   pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True})
    164 246   if not pageLink:
    165  - break
     247 + # some pages are missing a "tme_messages_more" tag, causing early termination
     248 + if '=' not in nextPageUrl:
     249 + nextPageUrl = soup.find('link', attrs = {'rel': 'canonical'}, href = True)['href']
     250 + nextPostIndex = int(nextPageUrl.split('=')[-1]) - 20
     251 + if nextPostIndex > 20:
     252 + pageLink = {'href': nextPageUrl.split('=')[0] + f'={nextPostIndex}'}
     253 + else:
     254 + break
    166 255   nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href'])
    167  - r = self._get(nextPageUrl, headers = self._headers)
     256 + r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = telegramResponseOkCallback)
    168 257   if r.status_code != 200:
    169 258   raise snscrape.base.ScraperException(f'Got status code {r.status_code}')
    170 259   soup = bs4.BeautifulSoup(r.text, 'lxml')
    skipped 33 lines
    204 293   if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')):
    205 294   kwargs['description'] = descriptionDiv.text
    206 295   
    207  - def parse_num(s):
    208  - s = s.replace(' ', '')
    209  - if s.endswith('M'):
    210  - return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
    211  - elif s.endswith('K'):
    212  - return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
    213  - else:
    214  - return int(s), 1
    215  - 
    216 296   for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'):
    217 297   value, granularity = parse_num(div.find('span', class_ = 'counter_value').text)
    218 298   type_ = div.find('span', class_ = 'counter_type').text
    skipped 13 lines
    232 312   def _cli_from_args(cls, args):
    233 313   return cls._cli_construct(args, args.channel)
    234 314   
     315 +def parse_num(s):
     316 + s = s.replace(' ', '')
     317 + if s.endswith('M'):
     318 + return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1]))
     319 + elif s.endswith('K'):
     320 + return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1]))
     321 + return int(s), 1
     322 + 
     323 +def durationStrToSeconds(durationStr):
     324 + durationList = durationStr.split(':')
     325 + return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationList))])
     326 + 
     327 +def telegramResponseOkCallback(r):
     328 + if r.status_code == 200:
     329 + return (True, None)
     330 + return (False, f'{r.status_code=}')
     331 +
Please wait...
Page is in error, reload to recover