■ ■ ■ ■ ■ ■
snscrape/modules/telegram.py
| skipped 12 lines |
13 | 13 | | |
14 | 14 | | _logger = logging.getLogger(__name__) |
15 | 15 | | _SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$') |
16 | | - | |
| 16 | + | _STYLE_MEDIA_URL_PATTERN = re.compile(r'url\(\'(.*?)\'\)') |
17 | 17 | | |
18 | 18 | | @dataclasses.dataclass |
19 | 19 | | class LinkPreview: |
| skipped 25 lines |
45 | 45 | | def __str__(self): |
46 | 46 | | return f'https://t.me/s/{self.username}' |
47 | 47 | | |
| 48 | + | |
48 | 49 | | @dataclasses.dataclass |
49 | 50 | | class TelegramPost(snscrape.base.Item): |
50 | 51 | | url: str |
| skipped 12 lines |
63 | 64 | | |
64 | 65 | | def __str__(self): |
65 | 66 | | return self.url |
| 67 | + | |
66 | 68 | | |
class Medium:
    """Common base class for the media types that can be attached to a post."""
69 | 71 | | |
| 72 | + | |
@dataclasses.dataclass
class Photo(Medium):
    """A photo attached to a post."""

    # Image address; the scraper takes it from the url('...') part of a link's inline style.
    url: str
| 76 | + | |
73 | 77 | | |
74 | 78 | | @dataclasses.dataclass |
75 | 79 | | class Video(Medium): |
| skipped 1 lines |
77 | 81 | | duration: float |
78 | 82 | | url: typing.Optional[str] = None |
79 | 83 | | |
| 84 | + | |
@dataclasses.dataclass
class VoiceMessage(Medium):
    """A voice message attached to a post."""

    # Address of the audio file (the `src` of the player's <audio> tag).
    url: str
    # Length in seconds. The scraper passes the numeric result of
    # durationStrToSeconds() here, so this is a number, not a string.
    duration: float
    # One value per <s> element of the player's bar display, parsed from each
    # element's inline style (presumably percentages -- the scraper strips a trailing '%').
    bars: typing.List[float]
| 90 | + | |
85 | 91 | | |
@dataclasses.dataclass
class Gif(Medium):
    """A GIF-style clip attached to a post (a video player without a <time> tag)."""

    # Thumbnail address taken from the player's <i> tag style; the scraper
    # passes None when the player has no <i> tag, hence Optional.
    thumbnailUrl: typing.Optional[str]
    # `src` of the player's <video> tag, or None when there is no such tag.
    url: typing.Optional[str] = None
90 | 96 | | |
| 97 | + | |
91 | 98 | | class TelegramChannelScraper(snscrape.base.Scraper): |
92 | 99 | | name = 'telegram-channel' |
93 | 100 | | |
| skipped 26 lines |
120 | 127 | | date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') |
121 | 128 | | media = [] |
122 | 129 | | outlinks = [] |
| 130 | + | mentions = [] |
| 131 | + | hashtags = [] |
123 | 132 | | forwarded = None |
124 | 133 | | forwardedUrl = None |
125 | 134 | | |
126 | | - | if (forward_tag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')): |
127 | | - | forwardedUrl = forward_tag['href'] |
| 135 | + | if (forwardTag := post.find('a', class_ = 'tgme_widget_message_forwarded_from_name')): |
| 136 | + | forwardedUrl = forwardTag['href'] |
128 | 137 | | forwardedName = forwardedUrl.split('t.me/')[1].split('/')[0] |
129 | 138 | | forwarded = Channel(username = forwardedName) |
130 | 139 | | |
| skipped 2 lines |
133 | 142 | | else: |
134 | 143 | | content = None |
135 | 144 | | |
136 | | - | outlinks = [] |
137 | | - | mentions = [] |
138 | | - | hashtags = [] |
139 | 145 | | for link in post.find_all('a'): |
140 | 146 | | if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): |
141 | 147 | | # Author links at the top (avatar and name) |
| skipped 2 lines |
144 | 150 | | style = link.attrs.get('style', '') |
145 | 151 | | # Generic filter of links to the post itself, catches videos, photos, and the date link |
146 | 152 | | if style != '': |
147 | | - | imageUrls = re.findall('url\(\'(.*?)\'\)', style) |
| 153 | + | imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style) |
148 | 154 | | if len(imageUrls) == 1: |
149 | 155 | | media.append(Photo(url = imageUrls[0])) |
150 | 156 | | continue |
151 | 157 | | if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): |
152 | 158 | | style = link.attrs.get('style', '') |
153 | | - | imageUrls = re.findall('url\(\'(.*?)\'\)', style) |
| 159 | + | imageUrls = _STYLE_MEDIA_URL_PATTERN.findall(style) |
154 | 160 | | if len(imageUrls) == 1: |
155 | 161 | | media.append(Photo(url = imageUrls[0])) |
156 | 162 | | # resp = self._get(image[0]) |
| skipped 10 lines |
167 | 173 | | if (href not in outlinks) and (href != rawUrl) and (href != forwardedUrl): |
168 | 174 | | outlinks.append(href) |
169 | 175 | | |
170 | | - | for voice_player in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}): |
171 | | - | audioUrl = voice_player.find('audio')['src'] |
172 | | - | durationStr = voice_player.find('time').text.split(':') |
| 176 | + | for voicePlayer in post.find_all('a', {'class': 'tgme_widget_message_voice_player'}): |
| 177 | + | audioUrl = voicePlayer.find('audio')['src'] |
| 178 | + | durationStr = voicePlayer.find('time').text |
173 | 179 | | duration = durationStrToSeconds(durationStr) |
174 | | - | barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voice_player.find('div', {'class': 'bar'}).find_all('s')] |
| 180 | + | barHeights = [float(s['style'].split(':')[-1].strip(';%')) for s in voicePlayer.find('div', {'class': 'bar'}).find_all('s')] |
175 | 181 | | |
176 | 182 | | media.append(VoiceMessage(url = audioUrl, duration = duration, bars = barHeights)) |
177 | 183 | | |
178 | | - | for video_player in post.find_all('a', {'class': 'tgme_widget_message_video_player'}): |
179 | | - | iTag = video_player.find('i') |
| 184 | + | for videoPlayer in post.find_all('a', {'class': 'tgme_widget_message_video_player'}): |
| 185 | + | iTag = videoPlayer.find('i') |
180 | 186 | | if iTag is None: |
181 | 187 | | videoUrl = None |
182 | 188 | | videoThumbnailUrl = None |
183 | 189 | | else: |
184 | 190 | | style = iTag['style'] |
185 | | - | videoThumbnailUrl = re.findall('url\(\'(.*?)\'\)', style)[0] |
186 | | - | videoTag = video_player.find('video') |
187 | | - | if videoTag is None: |
188 | | - | videoUrl = None |
189 | | - | else: |
190 | | - | videoUrl = videoTag['src'] |
| 191 | + | videoThumbnailUrl = _STYLE_MEDIA_URL_PATTERN.findall(style)[0] |
| 192 | + | videoTag = videoPlayer.find('video') |
| 193 | + | videoUrl = None if videoTag is None else videoTag['src'] |
191 | 194 | | mKwargs = { |
192 | 195 | | 'thumbnailUrl': videoThumbnailUrl, |
193 | 196 | | 'url': videoUrl, |
194 | 197 | | } |
195 | | - | timeTag = video_player.find('time') |
| 198 | + | timeTag = videoPlayer.find('time') |
196 | 199 | | if timeTag is None: |
197 | 200 | | cls = Gif |
198 | 201 | | else: |
199 | 202 | | cls = Video |
200 | | - | durationStr = video_player.find('time').text.split(':') |
| 203 | + | durationStr = videoPlayer.find('time').text |
201 | 204 | | mKwargs['duration'] = durationStrToSeconds(durationStr) |
202 | 205 | | media.append(cls(**mKwargs)) |
203 | 206 | | |
| skipped 17 lines |
221 | 224 | | outlinks.remove(kwargs['href']) |
222 | 225 | | |
223 | 226 | | viewsSpan = post.find('span', class_ = 'tgme_widget_message_views') |
224 | | - | if viewsSpan is None: |
225 | | - | views = None |
226 | | - | else: |
227 | | - | views = parse_num(viewsSpan.text) |
| 227 | + | views = None if viewsSpan is None else parse_num(viewsSpan.text) |
228 | 228 | | |
229 | 229 | | yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, mentions = mentions, hashtags = hashtags, linkPreview = linkPreview, media = media, forwarded = forwarded, forwardedUrl = forwardedUrl, views = views) |
230 | 230 | | |
| skipped 87 lines |
318 | 318 | | return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) |
319 | 319 | | elif s.endswith('K'): |
320 | 320 | | return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) |
321 | | - | else: |
322 | | - | return int(s), 1 |
| 321 | + | return int(s), 1 |
323 | 322 | | |
def durationStrToSeconds(durationStr):
    """Convert a colon-separated duration string into a number of seconds.

    The fields are read from right to left as seconds, minutes and hours,
    so 'SS', 'M:SS' and 'H:MM:SS' are all accepted. Any fields beyond the
    third from the right are ignored.

    Args:
        durationStr: The duration text, e.g. '1:30' or '2:03:04'.

    Returns:
        The duration in whole seconds as an int.

    Raises:
        ValueError: If a field is not a valid integer (including an empty string).
    """
    # Seconds per unit, ordered to line up with the reversed fields:
    # seconds, minutes, hours. An hour is 3600 seconds (not 360).
    secondsPerUnit = (1, 60, 3600)
    fields = durationStr.split(':')
    return sum(weight * int(field) for weight, field in zip(secondsPerUnit, reversed(fields)))
326 | 326 | | |
def telegramResponseOkCallback(r):
    """Classify a response by its HTTP status code.

    Args:
        r: An object exposing a `status_code` attribute.

    Returns:
        A 2-tuple `(ok, message)`: `(True, None)` for status 200, otherwise
        `(False, 'r.status_code=<code>')`.
    """
    if r.status_code != 200:
        # The '=' specifier renders the expression text followed by its value.
        return (False, f'{r.status_code=}')
    return (True, None)
| 331 | + | |