| skipped 12 lines |
13 | 13 | | |
14 | 14 | | _logger = logging.getLogger(__name__) |
15 | 15 | | _SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$') |
16 | | - | |
| 16 | + | _STYLE_MEDIA_URL_PATTERN = re.compile(r'url\(\'(.*?)\'\)') |
17 | 17 | | |
18 | 18 | | @dataclasses.dataclass |
19 | 19 | | class LinkPreview: |
| skipped 5 lines |
25 | 25 | | |
26 | 26 | | |
27 | 27 | | @dataclasses.dataclass |
28 | | - | class TelegramPost(snscrape.base.Item): |
29 | | - | url: str |
30 | | - | date: datetime.datetime |
31 | | - | content: str |
32 | | - | outlinks: list |
33 | | - | images: list |
34 | | - | videos: list |
35 | | - | forwarded: str |
36 | | - | linkPreview: typing.Optional[LinkPreview] = None |
37 | | - | |
38 | | - | outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks') |
39 | | - | |
40 | | - | def __str__(self): |
41 | | - | return self.url |
42 | | - | |
43 | | - | |
44 | | - | @dataclasses.dataclass |
45 | 28 | | class Channel(snscrape.base.Entity): |
46 | 29 | | username: str |
47 | | - | title: str |
48 | | - | verified: bool |
49 | | - | photo: str |
| 30 | + | title: typing.Optional[str] = None |
| 31 | + | verified: typing.Optional[bool] = None |
| 32 | + | photo: typing.Optional[str] = None |
50 | 33 | | description: typing.Optional[str] = None |
51 | 34 | | members: typing.Optional[int] = None |
52 | 35 | | photos: typing.Optional[snscrape.base.IntWithGranularity] = None |
| skipped 10 lines |
63 | 46 | | return f'https://t.me/s/{self.username}' |
64 | 47 | | |
65 | 48 | | |
@dataclasses.dataclass
class TelegramPost(snscrape.base.Item):
    '''A single post as extracted from a channel's public preview page.'''

    url: str
    date: datetime.datetime
    # The scraper passes None when the post has no text element.
    content: typing.Optional[str]
    # The collection fields default to None (not to an empty list), so they are optional.
    outlinks: typing.Optional[typing.List[str]] = None
    mentions: typing.Optional[typing.List[str]] = None
    hashtags: typing.Optional[typing.List[str]] = None
    # Channel the post was forwarded from, and the link that identified it.
    forwarded: typing.Optional['Channel'] = None
    forwardedUrl: typing.Optional[str] = None
    media: typing.Optional[typing.List['Medium']] = None
    views: typing.Optional[int] = None
    linkPreview: typing.Optional[LinkPreview] = None

    # Deprecated space-joined form of outlinks. Falls back to an empty list so the
    # None default yields '' instead of raising TypeError.
    outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks or []), 'outlinks')

    def __str__(self):
        return self.url
| 67 | + | |
| 68 | + | |
class Medium:
    '''Common base class of the media attachment types (Photo, Video, VoiceMessage, Gif).'''
| 71 | + | |
| 72 | + | |
@dataclasses.dataclass
class Photo(Medium):
    '''Photo attached to a post.'''

    # Image URL, taken by the scraper from the url('...') part of a link's inline style.
    url: str
| 76 | + | |
| 77 | + | |
@dataclasses.dataclass
class Video(Medium):
    '''Video attached to a post (a video player element that has a duration).'''

    thumbnailUrl: str
    # Length in seconds, as computed by durationStrToSeconds.
    duration: float
    # None when the player element contains no <video> tag.
    url: typing.Optional[str] = None
| 83 | + | |
| 84 | + | |
@dataclasses.dataclass
class VoiceMessage(Medium):
    '''Voice message attached to a post.'''

    # Source URL of the audio element.
    url: str
    # Numeric, like Video.duration: the scraper stores the result of
    # durationStrToSeconds here, not the raw 'm:ss' text.
    duration: float
    # Waveform bar heights, parsed from the percentage values in the player's inline styles.
    bars: typing.List[float]
| 90 | + | |
| 91 | + | |
@dataclasses.dataclass
class Gif(Medium):
    '''Animation attached to a post (a video player element without a duration).'''

    thumbnailUrl: str
    # None when the player element contains no <video> tag.
    url: typing.Optional[str] = None
| 96 | + | |
| 97 | + | |
66 | 98 | | class TelegramChannelScraper(snscrape.base.Scraper): |
67 | 99 | | name = 'telegram-channel' |
68 | 100 | | |
| skipped 24 lines |
93 | 125 | | _logger.warning(f'Possibly incorrect URL: {rawUrl!r}') |
94 | 126 | | url = rawUrl.replace('//t.me/', '//t.me/s/') |
95 | 127 | | date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') |
96 | | - | images = [] |
97 | | - | videos = [] |
| 128 | + | media = [] |
| 129 | + | outlinks = [] |
| 130 | + | mentions = [] |
| 131 | + | hashtags = [] |
98 | 132 | | forwarded = None |
| 133 | + | forwardedUrl = None |
| 134 | + | |
| 135 | + | if (forwardTag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')): |
| 136 | + | forwardedUrl = forwardTag['href'] |
| 137 | + | forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0] |
| 138 | + | forwarded = Channel(username = forwardedName) |
| 139 | + | |
99 | 140 | | if (message := post.find('div', class_ = 'tgme_widget_message_text')): |
100 | 141 | | content = message.get_text(separator="\n") |
| 142 | + | else: |
| 143 | + | content = None |
101 | 144 | | |
102 | | - | for video_tag in post.find_all('video'): |
103 | | - | videos.append(video_tag['src']) |
| 145 | + | for link in post.find_all('a'): |
| 146 | + | if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): |
| 147 | + | # Author links at the top (avatar and name) |
| 148 | + | continue |
| 149 | + | if link['href'] == rawUrl or link['href'] == url: |
| 150 | + | style = link.attrs.get('style', '') |
| 151 | + | # Generic filter of links to the post itself, catches videos, photos, and the date link |
| 152 | + | if style != '': |
| 153 | + | imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style) |
| 154 | + | if len(imageUrls) == 1: |
| 155 | + | media.append(Photo(url = imageUrls[0])) |
| 156 | + | continue |
| 157 | + | if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): |
| 158 | + | style = link.attrs.get('style', '') |
| 159 | + | imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style) |
| 160 | + | if len(imageUrls) == 1: |
| 161 | + | media.append(Photo(url = imageUrls[0])) |
| 162 | + | # resp = self._get(image[0]) |
| 163 | + | # encoded_string = base64.b64encode(resp.content) |
| 164 | + | # Individual photo or video link |
| 165 | + | continue |
| 166 | + | if link.text.startswith('@'): |
| 167 | + | mentions.append(link.text.strip('@')) |
| 168 | + | continue |
| 169 | + | if link.text.startswith('#'): |
| 170 | + | hashtags.append(link.text.strip('#')) |
| 171 | + | continue |
| 172 | + | href = urllib.parse.urljoin(pageUrl, link['href']) |
| 173 | + | if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl): |
| 174 | + | outlinks.append(href) |
| 175 | + | |
| 176 | + | for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}): |
| 177 | + | audioUrl = voicePlayer.find('audio')['src'] |
| 178 | + | durationStr = voicePlayer.find('time').text |
| 179 | + | duration = durationStrToSeconds(durationStr) |
| 180 | + | barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')] |
| 181 | + | |
| 182 | + | media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights)) |
104 | 183 | | |
105 | | - | if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')): |
106 | | - | forwarded = forward_tag['href'].split('t.me/')[1].split('/')[0] |
| 184 | + | for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}): |
| 185 | + | iTag = videoPlayer.find('i') |
| 186 | + | if iTag is None: |
| 187 | + | videoUrl = None |
| 188 | + | videoThumbnailUrl = None |
| 189 | + | else: |
| 190 | + | style = iTag['style'] |
| 191 | + | videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0] |
| 192 | + | videoTag = videoPlayer.find('video') |
| 193 | + | videoUrl = None if videoTag is None else videoTag['src'] |
| 194 | + | mKwargs = { |
| 195 | + | 'thumbnailUrl': videoThumbnailUrl, |
| 196 | + | 'url': videoUrl, |
| 197 | + | } |
| 198 | + | timeTag = videoPlayer.find('time') |
| 199 | + | if timeTag is None: |
| 200 | + | cls = Gif |
| 201 | + | else: |
| 202 | + | cls = Video |
| 203 | + | durationStr = videoPlayer.find('time').text |
| 204 | + | mKwargs['duration'] = durationStrToSeconds(durationStr) |
| 205 | + | media.append(cls(**mKwargs)) |
107 | 206 | | |
108 | | - | outlinks = [] |
109 | | - | for link in post.find_all('a'): |
110 | | - | if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): |
111 | | - | # Author links at the top (avatar and name) |
112 | | - | continue |
113 | | - | if link['href'] == rawUrl or link['href'] == url: |
114 | | - | style = link.attrs.get('style', '') |
115 | | - | # Generic filter of links to the post itself, catches videos, photos, and the date link |
116 | | - | if style != '': |
117 | | - | image = re.findall('url\(\'(.*?)\'\)', style) |
118 | | - | if len(image) == 1: |
119 | | - | images.append(image[0]) |
120 | | - | continue |
121 | | - | if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): |
122 | | - | style = link.attrs.get('style', '') |
123 | | - | image = re.findall('url\(\'(.*?)\'\)', style) |
124 | | - | if len(image) == 1: |
125 | | - | images.append(image[0]) |
126 | | - | # resp = self._get(image[0]) |
127 | | - | # encoded_string = base64.b64encode(resp.content) |
128 | | - | # Individual photo or video link |
129 | | - | continue |
130 | | - | href = urllib.parse.urljoin(pageUrl, link['href']) |
131 | | - | if href not in outlinks: |
132 | | - | outlinks.append(href) |
133 | | - | else: |
134 | | - | content = None |
135 | | - | outlinks = [] |
136 | | - | images = [] |
137 | | - | videos = [] |
138 | 207 | | linkPreview = None |
139 | 208 | | if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')): |
140 | 209 | | kwargs = {} |
| skipped 10 lines |
151 | 220 | | else: |
152 | 221 | | _logger.warning(f'Could not process link preview image on {url}') |
153 | 222 | | linkPreview = LinkPreview(**kwargs) |
154 | | - | yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview, images = images, videos = videos, forwarded = forwarded) |
| 223 | + | if kwargs['href'] in outlinks: |
| 224 | + | outlinks.remove(kwargs['href']) |
| 225 | + | |
| 226 | + | viewsSpan = post.find('span', class_ = 'tgme_widget_message_views') |
| 227 | + | views = None if viewsSpan is None else parse_num(viewsSpan.text) |
| 228 | + | |
| 229 | + | yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views) |
155 | 230 | | |
156 | 231 | | def get_items(self): |
157 | 232 | | r, soup = self._initial_page() |
158 | 233 | | if '/s/' not in r.url: |
159 | 234 | | _logger.warning('No public post list for this user') |
160 | 235 | | return |
| 236 | + | nextPageUrl = '' |
161 | 237 | | while True: |
162 | 238 | | yield from self._soup_to_items(soup, r.url) |
| 239 | + | try: |
| 240 | + | if soup.find('a', attrs = {'class': 'tgme_widget_message_date'}, href = True)['href'].split('/')[-1] == '1': |
| 241 | + | # if message 1 is the first message in the page, terminate scraping |
| 242 | + | break |
| 243 | + | except: |
| 244 | + | pass |
163 | 245 | | pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True}) |
164 | 246 | | if not pageLink: |
165 | | - | break |
| 247 | + | # some pages are missing a "tme_messages_more" tag, causing early termination |
| 248 | + | if '=' not in nextPageUrl: |
| 249 | + | nextPageUrl = soup.find('link', attrs = {'rel': 'canonical'}, href = True)['href'] |
| 250 | + | nextPostIndex = int(nextPageUrl.split('=')[-1]) - 20 |
| 251 | + | if nextPostIndex > 20: |
| 252 | + | pageLink = {'href': nextPageUrl.split('=')[0] + f'={nextPostIndex}'} |
| 253 | + | else: |
| 254 | + | break |
166 | 255 | | nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href']) |
167 | | - | r = self._get(nextPageUrl, headers = self._headers) |
| 256 | + | r = self._get(nextPageUrl, headers = self._headers, responseOkCallback = telegramResponseOkCallback) |
168 | 257 | | if r.status_code != 200: |
169 | 258 | | raise snscrape.base.ScraperException(f'Got status code {r.status_code}') |
170 | 259 | | soup = bs4.BeautifulSoup(r.text, 'lxml') |
| skipped 33 lines |
204 | 293 | | if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')): |
205 | 294 | | kwargs['description'] = descriptionDiv.text |
206 | 295 | | |
207 | | - | def parse_num(s): |
208 | | - | s = s.replace(' ', '') |
209 | | - | if s.endswith('M'): |
210 | | - | return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) |
211 | | - | elif s.endswith('K'): |
212 | | - | return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) |
213 | | - | else: |
214 | | - | return int(s), 1 |
215 | | - | |
216 | 296 | | for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'): |
217 | 297 | | value, granularity = parse_num(div.find('span', class_ = 'counter_value').text) |
218 | 298 | | type_ = div.find('span', class_ = 'counter_type').text |
| skipped 13 lines |
232 | 312 | | def _cli_from_args(cls, args): |
233 | 313 | | return cls._cli_construct(args, args.channel) |
234 | 314 | | |
def parse_num(s):
    '''Parse an abbreviated counter such as '1 234', '1.2K' or '3.45M'.

    Spaces are ignored. Returns a tuple (value, granularity): value is the
    integer count and granularity is the precision of the abbreviation,
    e.g. '1.2K' -> (1200, 100). Plain integers have granularity 1.
    Raises ValueError if the text is not a number.
    '''
    s = s.replace(' ', '')
    for suffix, exponent in (('M', 6), ('K', 3)):
        if not s.endswith(suffix):
            continue
        number = s[:-1]
        decimals = len(number.split('.')[1]) if '.' in number else 0
        # Round instead of truncating: the float product can land just below the
        # exact value (1.001 * 1000 == 1000.9999999999999), which int() would cut to 1000.
        return round(float(number) * 10 ** exponent), 10 ** (exponent - decimals)
    return int(s), 1
| 322 | + | |
def durationStrToSeconds(durationStr):
    '''Convert a duration written as 'ss', 'mm:ss' or 'hh:mm:ss' into whole seconds.

    Raises ValueError if a component is not an integer.
    '''
    # Seconds per component, least significant first. An hour is 3600 seconds (not 360).
    unitSeconds = (1, 60, 3600)
    components = durationStr.split(':')
    return sum(unit * int(component) for unit, component in zip(unitSeconds, reversed(components)))
| 326 | + | |
def telegramResponseOkCallback(r):
    '''Response check handed to the scraper's _get() as responseOkCallback.

    Returns (True, None) for HTTP 200, otherwise (False, message) where the
    message names the status code that was received.
    '''
    isOk = r.status_code == 200
    errorMessage = None if isOk else f'{r.status_code=}'
    return (isOk, errorMessage)
| 331 | + | |