| 1 | + | import os |
| 2 | + | import re |
| 3 | + | from collections import defaultdict |
| 4 | + | from dataclasses import dataclass |
| 5 | + | from enum import IntEnum |
| 6 | + | from typing import List, Set, Tuple |
| 7 | + | from zipfile import BadZipFile, ZipFile |
| 8 | + | |
| 9 | + | import easyargs |
| 10 | + | from colorama import Fore, Style |
| 11 | + | from jawa.classloader import ClassLoader |
| 12 | + | from jawa.constants import InterfaceMethodRef, MethodReference |
| 13 | + | from jawa.methods import Method |
| 14 | + | from tqdm import tqdm |
| 15 | + | |
| 16 | + | |
class JVMOpcodes(IntEnum):
    """Numeric opcodes of the JVM ``invoke*`` instructions."""

    INVOKEVIRTUAL = 0xB6
    INVOKESPECIAL = 0xB7
    INVOKESTATIC = 0xB8
    INVOKEINTERFACE = 0xB9
    INVOKEDYNAMIC = 0xBA
| 23 | + | |
| 24 | + | |
# Opcodes that callsites_in_method treats as call sites when it filters
# disassembled instructions.
INVOKE_OPCODES = set(
    (
        JVMOpcodes.INVOKEVIRTUAL,
        JVMOpcodes.INVOKESPECIAL,
        JVMOpcodes.INVOKESTATIC,
        JVMOpcodes.INVOKEINTERFACE,
        JVMOpcodes.INVOKEDYNAMIC,
    )
)
| 32 | + | |
| 33 | + | |
@dataclass(frozen=True)
class CallTarget:
    """Immutable, hashable description of a referenced method.

    Instances serve as dictionary keys and set members elsewhere in this
    module, which is why the dataclass is frozen.
    """

    # Name of the class that owns the method.
    class_name: str
    # Name of the method itself.
    method_name: str
    # Method descriptor string; set to "" when overloads are collapsed for display.
    method_type: str
| 39 | + | |
| 40 | + | |
def print_file_name(filename):
    """Write a header line naming the .jar file whose results follow."""
    header = f"\n\n\nProcessing .jar file:\n{filename}"
    tqdm.write(header)
| 43 | + | |
| 44 | + | |
def print_caller_results(class_name, calling_methods_by_target):
    """Print a table of the matched calls made from one class.

    Args:
        class_name: Name of the calling class, shown in the table heading.
        calling_methods_by_target: Mapping of ``CallTarget`` to the names of
            the methods in ``class_name`` that invoke that target.

    Nothing is printed when the mapping is empty.
    """
    if not calling_methods_by_target:
        return

    # Collapse method types so that overloads share a single row. The caller
    # names are unioned: keying a plain dict on the collapsed target would
    # keep only the callers of whichever overload happened to come last.
    callers_by_collapsed_target = defaultdict(set)
    for target, callers in calling_methods_by_target.items():
        collapsed = CallTarget(target.class_name, target.method_name, "")
        callers_by_collapsed_target[collapsed].update(callers)

    tqdm.write(f"\nClass: {Fore.GREEN}{class_name}{Style.RESET_ALL}")
    tqdm.write("Vulnerable call | Methods")
    tqdm.write("----------------+---------")
    for callee, callers in callers_by_collapsed_target.items():
        tqdm.write(
            Fore.RED
            + f"{callee.method_name:15}"
            + Style.RESET_ALL
            + " | "
            # Sorted so the row content does not depend on set ordering.
            + ", ".join(sorted(callers))
        )
| 64 | + | |
| 65 | + | |
def print_matching_classes(classes):
    """Print the given class names, highlighted, as one comma-separated line.

    Args:
        classes: Iterable of class-name strings.
    """
    tqdm.write("\nClasses found:")
    # Sorted so the report is stable from run to run even when the caller
    # supplies an unordered set.
    tqdm.write(
        ", ".join(Fore.GREEN + name + Style.RESET_ALL for name in sorted(classes))
    )
| 69 | + | |
| 70 | + | |
def jar_quickmatch(filename, match_string):
    """Cheaply pre-filter an archive by searching its class files for a byte string.

    Args:
        filename: Path of the archive to inspect.
        match_string: ``bytes`` to look for in the contents of each entry
            whose name ends in ``.class``.

    Returns:
        ``True`` if any ``.class`` entry contains ``match_string``; ``False``
        if none does, or if the archive cannot be opened.
    """
    try:
        with ZipFile(filename) as jarfile:
            # any() stops reading entries as soon as one of them matches.
            return any(
                match_string in jarfile.read(entry)
                for entry in jarfile.namelist()
                if entry.endswith(".class")
            )
    except (IOError, BadZipFile):
        tqdm.write(f"Could not open file: {filename}")
        # Unreadable archives are reported and excluded; always return a bool.
        return False
| 82 | + | |
| 83 | + | |
class XrefAnalysis:
    """Index of the method references made by the classes of one archive.

    Construction walks every class the loader exposes and records, per class,
    its methods and the method references found in its constants. The query
    methods then filter that index with the class and method patterns.
    """

    def __init__(self, filename, class_regex, method_regex, caller_block):
        """Compile the patterns and index the archive.

        Args:
            filename: Path handed to ``ClassLoader``.
            class_regex: Pattern applied (with ``match``) to class names.
            method_regex: Pattern applied (with ``match``) to method names.
            caller_block: Pattern for caller classes to hide; it is only
                compiled and stored here, for use by the reporting code.
        """
        self.class_regex_compiled = re.compile(class_regex)
        self.method_regex_compiled = re.compile(method_regex)
        self.caller_block_compiled = re.compile(caller_block)
        self.class_loader = ClassLoader(filename)
        self.methods, self.callers = self.traverse(self.class_loader)

    def _is_target(self, call_target):
        """Return whether both the class and method patterns match ``call_target``."""
        return bool(
            self.class_regex_compiled.match(call_target.class_name)
            and self.method_regex_compiled.match(call_target.method_name)
        )

    def get_matching_classes(self):
        """Return the set of class names in the archive matching the class pattern."""
        return {
            class_name
            for class_name in self.class_loader.classes
            if self.class_regex_compiled.match(class_name)
        }

    def get_calling_classes(self):
        """Return the set of classes that reference at least one matching target."""
        calling_classes = set()
        for callee, caller_set in self.callers.items():
            if self._is_target(callee):
                calling_classes |= caller_set
        return calling_classes

    @staticmethod
    def method_ref_to_call_target(method_ref):
        """Convert a method-reference constant into a ``CallTarget``.

        Returns ``None`` for a falsy argument or for any constant that is
        neither a ``MethodReference`` nor an ``InterfaceMethodRef``.
        """
        if method_ref and isinstance(method_ref, (MethodReference, InterfaceMethodRef)):
            return CallTarget(
                method_ref.class_.name.value,
                method_ref.name_and_type.name.value,
                method_ref.name_and_type.descriptor.value,
            )
        return None

    def traverse(self, classloader: ClassLoader):
        """Summarise every loadable class exposed by ``classloader``.

        Returns:
            A pair ``(methods, callers)``: ``methods`` maps a class name to
            the list of its methods, and ``callers`` maps a ``CallTarget`` to
            the set of class names whose constants reference it.
        """
        methods = {}
        callers = defaultdict(set)
        for class_name in classloader.classes:
            try:
                # Fetch once and reuse the result for the summary below.
                classfile = classloader[class_name]
            except IndexError:
                # Classes the loader cannot provide are skipped.
                continue
            class_callees, methods[class_name] = self.summarize_class(classfile)
            for call_target in class_callees:
                callers[call_target].add(class_name)
        return methods, callers

    def summarize_class(self, classfile) -> Tuple[Set[CallTarget], List[Method]]:
        """Return the call targets referenced by ``classfile`` and its methods."""
        class_callees: Set[CallTarget] = {
            call_target
            for call_target in map(self.method_ref_to_call_target, classfile.constants)
            if call_target
        }
        return class_callees, list(classfile.methods)

    def analyze_class(self, classname):
        """Find which methods of ``classname`` call a matching target.

        Returns:
            A ``defaultdict`` mapping each matching ``CallTarget`` to the set
            of method names in ``classname`` that contain a call to it. It is
            empty when the class makes no matching call.
        """
        # Call-site operand value -> names of the methods containing that call.
        methods_by_xref = defaultdict(set)
        for method in self.methods[classname]:
            # callsites_in_method may yield nothing for a method without code.
            for xref in callsites_in_method(method) or ():
                methods_by_xref[xref].add(method.name.value)

        calling_methods_by_target = defaultdict(set)
        if not methods_by_xref:
            return calling_methods_by_target

        # Fetch the constants once rather than once per call site.
        constants = self.class_loader[classname].constants
        for xref, method_names in methods_by_xref.items():
            call_target = self.method_ref_to_call_target(constants.get(xref))
            if call_target and self._is_target(call_target):
                calling_methods_by_target[call_target] |= method_names
        return calling_methods_by_target
| 174 | + | |
| 175 | + | |
def callsites_in_method(method: Method):
    """Collect the first operand of every invoke instruction in ``method``.

    Args:
        method: Method whose code is disassembled.

    Returns:
        The set of ``operands[0].value`` for each instruction whose opcode is
        in ``INVOKE_OPCODES``; an empty set when the method has no code.
    """
    if not method.code:
        # Always hand back a set so callers can iterate or union the result
        # without a separate None check.
        return set()
    return {
        op.operands[0].value
        for op in method.code.disassemble()
        if op.opcode in INVOKE_OPCODES
    }
| 181 | + | |
| 182 | + | |
def traverse_folder(root_dir, quickmatch_string, do_quickmatch):
    """List the files that should be scanned under ``root_dir``.

    Args:
        root_dir: Directory to walk recursively, or a single file path.
        quickmatch_string: Value forwarded to ``jar_quickmatch`` as the
            string to search for.
        do_quickmatch: When true, a ``.jar`` found while walking is kept only
            if ``jar_quickmatch`` accepts it.

    Returns:
        For a directory, the paths of the ``.jar`` files found (after the
        optional quick match); for a regular file, a one-element list holding
        that path unchanged; otherwise an empty list.
    """
    if os.path.isfile(root_dir):
        return [root_dir]
    if not os.path.isdir(root_dir):
        return []

    print(f"Walking {root_dir}...")
    jar_paths = (
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(root_dir)
        for name in names
        if name.endswith(".jar")
    )
    return [
        path
        for path in jar_paths
        if not do_quickmatch or jar_quickmatch(path, quickmatch_string)
    ]
| 199 | + | |
| 200 | + | |
def print_xrefs_analysis(xref_analysis, filename, caller_block):
    """Report every non-blocked class of an archive that makes a matching call.

    Args:
        xref_analysis: Object providing ``get_calling_classes()`` and
            ``analyze_class(classname)``.
        filename: Name shown in the header printed before the first result.
        caller_block: Compiled pattern; classes whose name it matches are skipped.

    The file header is printed once, and only if at least one class has
    something to report.
    """
    header_pending = True
    for classname in xref_analysis.get_calling_classes():
        if caller_block.match(classname):
            continue

        results = xref_analysis.analyze_class(classname)
        if not results:
            continue

        if header_pending:
            header_pending = False
            print_file_name(filename)
        print_caller_results(classname, results)
| 213 | + | |
| 214 | + | |
def print_class_existence(xref_analysis, filename):
    """Print the archive's matching classes, preceded by a file header.

    Args:
        xref_analysis: Object providing ``get_matching_classes()``.
        filename: Name shown in the header.

    Nothing is printed when no class matches.
    """
    found = xref_analysis.get_matching_classes()
    if not found:
        return
    print_file_name(filename)
    print_matching_classes(found)
| 220 | + | |
| 221 | + | |
@easyargs
def run_scanner(
    root_dir,
    # regex for class name filtering
    class_regex="(.*StringSubstitutor|.*StringLookup)",
    # regex for method name filtering (ignored when looking for existence of classes)
    method_regex="(lookup|replace|replaceIn)",
    # if caller class matches this regex, it will *not* be displayed
    caller_block=".*org/apache/commons/text",
    # checking for existence of this string in classes unless no_quickmatch
    quickmatch_string="StringSubstitutor",
    # not set - looking for calls to specified methods, set - looking for existence of classes
    class_existence=False,
    # when set, do not do quick match
    no_quickmatch=False,
):
    """Scan .jar files for calls to, or the presence of, the selected classes."""
    if not no_quickmatch:
        tqdm.write(f"Precondition grep filter: {quickmatch_string}")
    if class_existence:
        tqdm.write(f"Looking for presence of classes: {class_regex}")
    else:
        tqdm.write(
            f"Looking for calls to: class pattern {class_regex}, method name pattern: {method_regex}"
        )
    tqdm.write("Scanning folder for .jar files")

    files_to_scan = traverse_folder(
        root_dir, quickmatch_string.encode("utf-8"), not no_quickmatch
    )

    # When root_dir is itself a file, its path relative to root_dir is ".",
    # which would hide the file name in the report; only relativise paths
    # that were found inside a directory.
    root_is_dir = os.path.isdir(root_dir)

    for filename in tqdm(files_to_scan):
        display_name = os.path.relpath(filename, root_dir) if root_is_dir else filename
        try:
            xref_analysis = XrefAnalysis(
                filename, class_regex, method_regex, caller_block
            )
            if class_existence:
                print_class_existence(xref_analysis, display_name)
            else:
                print_xrefs_analysis(
                    xref_analysis,
                    display_name,
                    xref_analysis.caller_block_compiled,
                )
        except ValueError as e:
            # Keep scanning the remaining files, but surface the reason.
            tqdm.write(f"Parsing error in {filename}: {e}")
| 270 | + | |
# Start the scanner only when this file is executed as a script.
if __name__ == "__main__":
    run_scanner()
| 273 | + | |