■ ■ ■ ■ ■ ■
snscrape/modules/telegram.py
| skipped 8 lines |
9 | 9 | | import snscrape.base |
10 | 10 | | import typing |
11 | 11 | | import urllib.parse |
12 | | - | import base64 |
13 | 12 | | |
14 | 13 | | _logger = logging.getLogger(__name__) |
15 | 14 | | _SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$') |
| skipped 41 lines |
57 | 56 | | forwarded: typing.Optional['Channel'] = None |
58 | 57 | | forwardedUrl: typing.Optional[str] = None |
59 | 58 | | media: typing.Optional[typing.List['Medium']] = None |
60 | | - | views: typing.Optional[int] = None |
| 59 | + | views: typing.Optional[snscrape.base.IntWithGranularity] = None |
61 | 60 | | linkPreview: typing.Optional[LinkPreview] = None |
62 | 61 | | |
63 | 62 | | outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks') |
| skipped 112 lines |
176 | 175 | | for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}): |
177 | 176 | | audioUrl = voicePlayer.find('audio')['src'] |
178 | 177 | | durationStr = voicePlayer.find('time').text |
179 | | - | duration = durationStrToSeconds(durationStr) |
| 178 | + | duration = _durationStrToSeconds(durationStr) |
180 | 179 | | barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')] |
181 | 180 | | |
182 | 181 | | media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights)) |
| skipped 18 lines |
201 | 200 | | else: |
202 | 201 | | cls = Video |
203 | 202 | | durationStr = videoPlayer.find('time').text |
204 | | - | mKwargs['duration'] = durationStrToSeconds(durationStr) |
| 203 | + | mKwargs['duration'] = _durationStrToSeconds(durationStr) |
205 | 204 | | media.append(cls(**mKwargs)) |
206 | 205 | | |
207 | 206 | | linkPreview = None |
| skipped 16 lines |
224 | 223 | | outlinks.remove(kwargs['href']) |
225 | 224 | | |
226 | 225 | | viewsSpan = post.find('span', class_ = 'tgme_widget_message_views') |
227 | | - | views = None if viewsSpan is None else parse_num(viewsSpan.text) |
| 226 | + | views = None if viewsSpan is None else _parse_num(viewsSpan.text) |
| 227 | + | |
| 228 | + | outlinks = outlinks if outlinks else None |
| 229 | + | media = media if media else None |
| 230 | + | mentions = mentions if mentions else None |
| 231 | + | hashtags = hashtags if hashtags else None |
228 | 232 | | |
229 | 233 | | yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views) |
230 | 234 | | |
| skipped 22 lines |
253 | 257 | | else: |
254 | 258 | | break |
255 | 259 | | nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href']) |
256 | | - | r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = telegramResponseOkCallback) |
| 260 | + | r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = _telegramResponseOkCallback) |
257 | 261 | | if r.status_code != 200: |
258 | 262 | | raise snscrape.base.ScraperException(f'Got status code {r.status_code}') |
259 | 263 | | soup = bs4.BeautifulSoup(r.text, 'lxml') |
| skipped 6 lines |
266 | 270 | | raise snscrape.base.ScraperException(f'Got status code {r.status_code}') |
267 | 271 | | soup = bs4.BeautifulSoup(r.text, 'lxml') |
268 | 272 | | membersDiv = soup.find('div', class_ = 'tgme_page_extra') |
269 | | - | if membersDiv.text.endswith((' members', ' subscribers')): |
270 | | - | kwargs['members'] = int(''.join(membersDiv.text.split(' ')[:-1])) |
| 273 | + | if membersDiv.text.split(',')[0].endswith((' members', ' subscribers')): |
| 274 | + | membersStr = ''.join(membersDiv.text.split(',')[0].split(' ')[:-1]) |
| 275 | + | if membersStr == 'no': |
| 276 | + | kwargs['members'] = 0 |
| 277 | + | else: |
| 278 | + | kwargs['members'] = int(membersStr) |
271 | 279 | | photoImg = soup.find('img', class_ = 'tgme_page_photo_image') |
272 | 280 | | if photoImg is not None: |
273 | 281 | | kwargs['photo'] = photoImg.attrs['src'] |
| skipped 20 lines |
294 | 302 | | kwargs['description'] = descriptionDiv.text |
295 | 303 | | |
296 | 304 | | for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'): |
297 | | - | value, granularity = parse_num(div.find('span', class_ = 'counter_value').text) |
| 305 | + | value, granularity = _parse_num(div.find('span', class_ = 'counter_value').text) |
298 | 306 | | type_ = div.find('span', class_ = 'counter_type').text |
299 | 307 | | if type_ == 'members': |
300 | 308 | | # Already extracted more accurately from /channel, skip |
| skipped 11 lines |
312 | 320 | | def _cli_from_args(cls, args): |
313 | 321 | | return cls._cli_construct(args, args.channel) |
314 | 322 | | |
315 | | - | def parse_num(s): |
| 323 | + | def _parse_num(s): |
316 | 324 | | s = s.replace(' ', '') |
317 | 325 | | if s.endswith('M'): |
318 | 326 | | return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) |
| skipped 1 lines |
320 | 328 | | return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) |
321 | 329 | | return int(s), 1 |
322 | 330 | | |
323 | | - | def durationStrToSeconds(durationStr): |
| 331 | + | def _durationStrToSeconds(durationStr): |
324 | 332 | | durationList = durationStr.split(':') |
325 | | - | return sum([int(s) * int(g) for s, g in zip([1, 60, 360], reversed(durationList))]) |
| 333 | + | return sum([int(s) * int(g) for s, g in zip([1, 60, 3600], reversed(durationList))]) |
326 | 334 | | |
327 | | - | def telegramResponseOkCallback(r): |
| 335 | + | def _telegramResponseOkCallback(r): |
328 | 336 | | if r.status_code == 200: |
329 | 337 | | return (True, None) |
330 | 338 | | return (False, f'{r.status_code=}') |
| skipped 1 lines |