snscrape, commit 613395d1
    snscrape/modules/twitter.py
    skipped 2 lines
    3 3  import json
    4 4  import random
    5 5  import logging
     6 +import re
    6 7  import snscrape.base
    7 8  import typing
     9 +import urllib.parse
    8 10   
    9 11   
    10 12  logger = logging.getLogger(__name__)
    skipped 75 lines
    86 88  class TwitterSearchScraper(TwitterCommonScraper):
    87 89      name = 'twitter-search'
    88 90  
    89  -     def __init__(self, query, maxPosition = None, **kwargs):
     91 +     def __init__(self, query, cursor = None, **kwargs):
    90 92          super().__init__(**kwargs)
    91 93          self._query = query
    92  -         self._maxPosition = maxPosition
     94 +         self._cursor = cursor
     95 +         self._userAgent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.{random.randint(0, 9999)} Safari/537.{random.randint(0, 99)}'
     96 +         self._baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': self._query, 'src': 'spelling_expansion_revert_click'})
     97 + 
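[Not part of the commit] A quick sketch of what the new _baseUrl works out to, using the same urllib.parse.urlencode call as above; the query value is just an example:

    import urllib.parse

    query = 'python'  # example query
    baseUrl = 'https://twitter.com/search?' + urllib.parse.urlencode({'f': 'live', 'lang': 'en', 'q': query, 'src': 'spelling_expansion_revert_click'})
    print(baseUrl)
    # https://twitter.com/search?f=live&lang=en&q=python&src=spelling_expansion_revert_click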
     98 +     def _get_guest_token(self):
     99 +         logger.info(f'Retrieving guest token from search page')
     100 +         r = self._get(self._baseUrl, headers = {'User-Agent': self._userAgent})
     101 +         match = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+);', r.text)
     102 +         if not match:
     103 +             raise RuntimeError('Unable to find guest token')
     104 +         return match.group(1)
    93 105   
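[Not part of the commit] A standalone sketch of the guest token extraction that the new _get_guest_token performs. It uses the requests library directly instead of snscrape's internal _get helper and assumes the search page still embeds the token in a document.cookie = decodeURIComponent("gt=...") snippet, which is what the regex above matches:

    import re
    import requests

    def fetch_guest_token(base_url, user_agent):
        # Fetch the public search page and pull the numeric guest token out of the inline JS
        r = requests.get(base_url, headers = {'User-Agent': user_agent})
        match = re.search(r'document\.cookie = decodeURIComponent\("gt=(\d+);', r.text)
        if not match:
            raise RuntimeError('Unable to find guest token')
        return match.group(1)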
    94  -     def _get_feed_from_html(self, html, withMinPosition):
    95  -         soup = bs4.BeautifulSoup(html, 'lxml')
    96  -         feed = soup.find_all('li', 'js-stream-item')
    97  -         if withMinPosition:
    98  -             streamContainer = soup.find('div', 'stream-container')
    99  -             if not streamContainer or not streamContainer.has_attr('data-min-position'):
    100  -                 if soup.find('div', 'SearchEmptyTimeline'):
    101  -                     # No results found
    102  -                     minPosition = None
    103  -                 else:
    104  -                     # Unknown error condition
    105  -                     raise RuntimeError('Unable to find min-position')
    106  -             else:
    107  -                 minPosition = streamContainer['data-min-position']
    108  -         else:
    109  -             minPosition = None
    110  -         return feed, minPosition
     106 +     def _check_scroll_response(self, r):
     107 +         if r.status_code == 429:
     108 +             # Accept a 429 response as "valid" to prevent retries; handled explicitly in get_items
     109 +             return True, None
     110 +         if r.headers.get('content-type') != 'application/json;charset=utf-8':
     111 +             return False, f'content type is not JSON'
     112 +         if r.status_code != 200:
     113 +             return False, f'non-200 status code'
     114 +         return True, None
    111 115   
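[Not part of the commit] An illustration of the callback contract that _check_scroll_response appears to follow: the callback receives the response and returns an (ok, reason) pair, and the caller retries until ok is True. This is only a sketch of the pattern with an assumed helper name; snscrape's actual _get in snscrape.base may differ in its details:

    import requests

    def get_with_check(url, check, retries = 3, **kwargs):
        # Retry until the check callback accepts the response or the attempts run out
        reason = None
        for _ in range(retries):
            r = requests.get(url, **kwargs)
            ok, reason = check(r)
            if ok:
                return r
        raise RuntimeError(f'Request failed: {reason}')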
    112 116      def get_items(self):
    113  -         headers = {'User-Agent': f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.{random.randint(1, 3500)}.{random.randint(1, 160)} Safari/537.36'}
    114  - 
    115  -         # First page
    116  -         if self._maxPosition is None:
    117  -             logger.info(f'Retrieving search page for {self._query}')
    118  -             r = self._get('https://twitter.com/search', params = {'f': 'tweets', 'vertical': 'default', 'lang': 'en', 'q': self._query, 'src': 'spxr', 'qf': 'off'}, headers = headers)
    119  - 
    120  -             feed, maxPosition = self._get_feed_from_html(r.text, True)
    121  -             if not feed:
    122  -                 logger.warning(f'No results for {self._query}')
    123  -                 return
    124  -             yield from self._feed_to_items(feed)
    125  -         else:
    126  -             maxPosition = self._maxPosition
    127  - 
    128  -         if not maxPosition:
    129  -             return
    130  - 
     117 +         headers = {
     118 +             'User-Agent': self._userAgent,
     119 +             'Authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
     120 +             'Referer': self._baseUrl,
     121 +         }
     122 +         guestToken = None
     123 +         cursor = self._cursor
    131 124          while True:
    132  -             logger.info(f'Retrieving scroll page {maxPosition}')
    133  -             r = self._get('https://twitter.com/i/search/timeline',
    134  -                 params = {
    135  -                     'f': 'tweets',
    136  -                     'vertical': 'default',
    137  -                     'lang': 'en',
    138  -                     'q': self._query,
    139  -                     'include_available_features': '1',
    140  -                     'include_entities': '1',
    141  -                     'reset_error_state': 'false',
    142  -                     'src': 'spxr',
    143  -                     'qf': 'off',
    144  -                     'max_position': maxPosition,
    145  -                 },
    146  -                 headers = headers,
    147  -                 responseOkCallback = self._check_json_callback)
     125 +             if not guestToken:
     126 +                 guestToken = self._get_guest_token()
     127 +                 headers['x-guest-token'] = guestToken
    148 128   
    149  -             obj = json.loads(r.text)
    150  -             feed, _ = self._get_feed_from_html(obj['items_html'], False)
    151  -             if feed:
    152  -                 yield from self._feed_to_items(feed)
    153  -             if obj['min_position'] == maxPosition:
    154  -                 return
    155  -             maxPosition = obj['min_position']
     129 +             logger.info(f'Retrieving scroll page {cursor}')
     130 +             params = {
     131 +                 'include_profile_interstitial_type': '1',
     132 +                 'include_blocking': '1',
     133 +                 'include_blocked_by': '1',
     134 +                 'include_followed_by': '1',
     135 +                 'include_want_retweets': '1',
     136 +                 'include_mute_edge': '1',
     137 +                 'include_can_dm': '1',
     138 +                 'include_can_media_tag': '1',
     139 +                 'skip_status': '1',
     140 +                 'cards_platform': 'Web-12',
     141 +                 'include_cards': '1',
     142 +                 'include_composer_source': 'true',
     143 +                 'include_ext_alt_text': 'true',
     144 +                 'include_reply_count': '1',
     145 +                 'tweet_mode': 'extended',
     146 +                 'include_entities': 'true',
     147 +                 'include_user_entities': 'true',
     148 +                 'include_ext_media_color': 'true',
     149 +                 'include_ext_media_availability': 'true',
     150 +                 'send_error_codes': 'true',
     151 +                 'simple_quoted_tweets': 'true',
     152 +                 'q': self._query,
     153 +                 'tweet_search_mode': 'live',
     154 +                 'count': '100',
     155 +                 'query_source': 'spelling_expansion_revert_click',
     156 +             }
     157 +             if cursor:
     158 +                 params['cursor'] = cursor
     159 +                 params['pc'] = '1'
     160 +                 params['spelling_corrections'] = '1'
     161 +             params['ext'] = 'mediaStats%2CcameraMoment'
     162 +             r = self._get('https://api.twitter.com/2/search/adaptive.json', params = params, headers = headers, responseOkCallback = self._check_scroll_response)
     163 +             if r.status_code == 429:
     164 +                 guestToken = None
     165 +                 continue
     166 +             try:
     167 +                 obj = r.json()
     168 +             except json.JSONDecodeError as e:
     169 +                 logger.error(f'Received invalid JSON from Twitter: {e!s}')
     170 +                 raise RuntimeError('Received invalid JSON from Twitter') from e
     171 + 
     172 +             # No data format test, just a hard and loud crash if anything's wrong :-)
     173 +             newCursor = None
     174 +             for instruction in obj['timeline']['instructions']:
     175 +                 if 'addEntries' in instruction:
     176 +                     entries = instruction['addEntries']['entries']
     177 +                 elif 'replaceEntry' in instruction:
     178 +                     entries = [instruction['replaceEntry']['entry']]
     179 +                 else:
     180 +                     continue
     181 +                 for entry in entries:
     182 +                     if entry['entryId'].startswith('sq-I-t-'):
     183 +                         tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']]
     184 +                         tweetID = tweet['id']
     185 +                         content = tweet['full_text']
     186 +                         username = obj['globalObjects']['users'][tweet['user_id_str']]['screen_name']
     187 +                         date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc)
     188 +                         outlinks = [u['expanded_url'] for u in tweet['entities']['urls']]
     189 +                         tcooutlinks = [u['url'] for u in tweet['entities']['urls']]
     190 +                         url = f'https://twitter.com/{username}/status/{tweetID}'
     191 +                         yield Tweet(url, date, content, tweetID, username, outlinks, ' '.join(outlinks), tcooutlinks, ' '.join(tcooutlinks))
     192 +                     elif entry['entryId'] == 'sq-cursor-bottom':
     193 +                         newCursor = entry['content']['operation']['cursor']['value']
     194 +             if not newCursor or newCursor == cursor:
     195 +                 # End of pagination
     196 +                 break
     197 +             cursor = newCursor
    156 198   
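[Not part of the commit] A condensed sketch of how a single adaptive.json response is taken apart, following the same structure the loop above relies on (timeline.instructions, entries, globalObjects). The field names are copied from the code above, not from any documented API, and replaceEntry handling is omitted for brevity:

    import datetime

    def parse_page(obj):
        # Returns (tweets, bottom_cursor) for one search/adaptive.json response object
        tweets = []
        bottom_cursor = None
        for instruction in obj['timeline']['instructions']:
            for entry in instruction.get('addEntries', {}).get('entries', []):
                if entry['entryId'].startswith('sq-I-t-'):
                    tweet = obj['globalObjects']['tweets'][entry['content']['item']['content']['tweet']['id']]
                    date = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo = datetime.timezone.utc)
                    tweets.append((tweet['id'], date, tweet['full_text']))
                elif entry['entryId'] == 'sq-cursor-bottom':
                    bottom_cursor = entry['content']['operation']['cursor']['value']
        return tweets, bottom_cursor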
    157 199      @classmethod
    158 200      def setup_parser(cls, subparser):
    159  -         subparser.add_argument('--max-position', metavar = 'POSITION', dest = 'maxPosition')
     201 +         subparser.add_argument('--cursor', metavar = 'CURSOR')
    160 202          subparser.add_argument('query', help = 'A Twitter search string')
    161 203  
    162 204      @classmethod
    163 205      def from_args(cls, args):
    164  -         return cls(args.query, maxPosition = args.maxPosition, retries = args.retries)
     206 +         return cls(args.query, cursor = args.cursor, retries = args.retries)
    165 207   
    166 208   
    167 209  class TwitterUserScraper(TwitterSearchScraper):
    skipped 162 lines
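[Not part of the commit] A short usage sketch of the rewritten scraper from Python. The module path and class name are the ones in this file; the query and the item limit are arbitrary examples:

    import snscrape.modules.twitter as sntwitter

    # Iterate over live search results and stop after ten tweets
    for i, tweet in enumerate(sntwitter.TwitterSearchScraper('archiveteam').get_items()):
        print(tweet.date, tweet.username, tweet.url)
        if i >= 9:
            break

The optional cursor argument (exposed on the command line as --cursor) resumes paging from a previously returned sq-cursor-bottom value rather than starting from the newest results.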