Projects STRLCPY wifi_db Commits 36f286be
🤬
  • ■ ■ ■ ■ ■ ■
    database_utils.py
    skipped 16 lines
    17 17   return database
    18 18   
    19 19   
     20 +def create_database(database, verbose):
     21 + '''Function to create the tables in the database'''
     22 + script_path = os.path.dirname(os.path.abspath(__file__))
     23 + path = script_path+'/wifi_db_database.sql'
     24 + db_file = open(path, 'r')
     25 + views = db_file.read()
     26 + try:
     27 + cursor = database.cursor()
     28 + for statement in views.split(';'):
     29 + if statement:
     30 + cursor.execute(statement + ';')
     31 + database.commit()
     32 + if verbose:
     33 + print("Database created")
     34 + except sqlite3.IntegrityError as error:
     35 + print("create_database" + str(error))
     36 + db_file.close()
     37 + 
     38 + 
     39 + 
     40 +def create_views(database, verbose):
     41 + '''Function to create the Views in the database'''
     42 + script_path = os.path.dirname(os.path.abspath(__file__))
     43 + path = script_path+'/view.sql'
     44 + views_file = open(path, 'r')
     45 + views = views_file.read()
     46 + try:
     47 + cursor = database.cursor()
     48 + # cursor.executemany(views)
     49 + for statement in views.split(';'):
     50 + if statement:
     51 + cursor.execute(statement + ';')
     52 + database.commit()
     53 + if verbose:
     54 + print("Views created")
     55 + except sqlite3.IntegrityError as error:
     56 + print("create_views" + str(error))
     57 + views_file.close()
     58 + 
     59 + 
     60 + 
    20 61  def insertAP(cursor, verbose, bssid, essid, manuf, channel, freqmhz, carrier,
    21 62   encryption, packets_total, lat, lon, cloaked):
    22 63   ''''''
    skipped 259 lines
    282 323   return int(1)
    283 324   
    284 325   
    285  -def create_database(database, verbose):
    286  - '''Function to create the tables in the database'''
    287  - script_path = os.path.dirname(os.path.abspath(__file__))
    288  - path = script_path+'/wifi_db_database.sql'
    289  - views_file = open(path, 'r')
    290  - views = views_file.read()
    291  - try:
    292  - cursor = database.cursor()
    293  - for statement in views.split(';'):
    294  - if statement:
    295  - cursor.execute(statement + ';')
    296  - database.commit()
    297  - if verbose:
    298  - print("Database created")
    299  - except sqlite3.IntegrityError as error:
    300  - print("create_database" + str(error))
    301  - 
    302  - 
    303  -def create_views(database, verbose):
    304  - '''Function to create the Views in the database'''
    305  - script_path = os.path.dirname(os.path.abspath(__file__))
    306  - path = script_path+'/view.sql'
    307  - views_file = open(path, 'r')
    308  - views = views_file.read()
    309  - try:
    310  - cursor = database.cursor()
    311  - # cursor.executemany(views)
    312  - for statement in views.split(';'):
    313  - if statement:
    314  - cursor.execute(statement + ';')
    315  - database.commit()
    316  - if verbose:
    317  - print("Views created")
    318  - except sqlite3.IntegrityError as error:
    319  - print("create_views" + str(error))
    320  - 
    321  - 
    322 326  def set_hashcat(cursor, verbose, bssid, mac, file, hash):
    323 327   try:
    324 328   cursor.execute('''INSERT OR REPLACE INTO Handshake VALUES(?,?,?,?)''',
    skipped 4 lines
    329 333   return int(1)
    330 334   
    331 335   
    332  -def fake_lat(database, lat):
     336 +def set_fake_lat(cursor, verbose, lat):
    333 337   try:
    334  - cursor = database.cursor()
    335  - 
    336 338   sql = "UPDATE AP SET lat_t = " + lat
    337 339   cursor.execute(sql)
    338 340   sql = "UPDATE SeenAP SET lat = " + lat
    skipped 6 lines
    345 347   print("fake_lat" + str(error))
    346 348   
    347 349   
    348  -def fake_lon(database, lon):
     350 +def set_fake_lon(cursor, verbose, lon):
    349 351   try:
    350  - cursor = database.cursor()
    351  - 
    352 352   sql = "UPDATE AP SET lon_t = " + lon
    353 353   cursor.execute(sql)
    354 354   sql = "UPDATE SeenAP SET lon = " + lon
    skipped 6 lines
    361 361   print("fake_lon" + str(error))
    362 362   
    363 363   
    364  -# obfuscated the database AA:BB:CC:XX:XX:XX-DEFG
    365  -def obfuscatedDB(database):
     364 +# obfuscate the database AA:BB:CC:XX:XX:XX-DEFG; needs the database, not a cursor, in order to commit
     365 +def obfuscateDB(database, verbose ):
    366 366   # APs!
    367 367   try:
    368 368   # Get all APs
     369 + if verbose:
     370 + print("obfuscated APs")
    369 371   cursor = database.cursor()
    370 372   sql = "SELECT bssid from AP; "
    371 373   cursor.execute(sql)
    skipped 11 lines
    383 385   database.commit()
    384 386   
    385 387   database.commit()
     388 + 
    386 389   except sqlite3.IntegrityError as error:
    387  - print("obfuscatedDB" + str(error))
     390 + print("obfuscateDB" + str(error))
    388 391  
    389 392   # Clients!
    390 393   try:
    391  - # Get all APs
     394 + # Get all Clients
     395 + if verbose:
     396 + print("obfuscated clients")
    392 397   cursor = database.cursor()
    393 398   sql = "SELECT mac from Client; "
    394 399   cursor.execute(sql)
    skipped 10 lines
    405 410   database.commit()
    406 411   
    407 412   database.commit()
     413 + return int(0)
    408 414   except sqlite3.IntegrityError as error:
    409  - print("obfuscatedDB" + str(error))
    410  - 
     415 + print("obfuscateDB" + str(error))
     416 + return int(1)
    411 417   
    412 418  # exists = '11:22:33:44:55:77' in whitelist
    413  -def clear_whitelist(database, whitelist):
     419 +def clear_whitelist(database, verbose, whitelist):
    414 420   with open(whitelist) as f:
    415 421   whitelist = f.read().splitlines()
    416 422   cursor = database.cursor()
    skipped 27 lines
  • ■ ■ ■ ■ ■ ■
    unit_test.py
     1 +import os
     2 +import sqlite3
     3 +import unittest
     4 +from database_utils import *
     5 + 
     6 +class TestFunctions(unittest.TestCase):
     7 + 
     8 + def setUp(self):
     9 + self.verbose = False
     10 + self.database_name = 'test_database.db'
     11 + self.database = connect_database(self.database_name, self.verbose)
     12 + create_database(self.database, self.verbose)
     13 + create_views(self.database, self.verbose)
     14 + self.c = self.database.cursor()
     15 + self.bssid = "00:11:22:33:44:55"
     16 + self.mac = "55:44:33:22:11:00"
     17 + 
     18 + def tearDown(self):
     19 + self.database.close()
     20 + os.remove(self.database_name)
     21 + 
     22 + def test_connect_database(self):
     23 + self.assertIsNotNone(self.database)
     24 + 
     25 + def test_insertAP(self):
     26 + essid = "Test_AP"
     27 + manuf = "Test_Manufacturer"
     28 + channel = "6"
     29 + freqmhz = "2437"
     30 + carrier = "test"
     31 + encryption = "WPA2"
     32 + packets_total = "10"
     33 + lat = "37.7749"
     34 + lon = "-122.4194"
     35 + cloaked = False
     36 + # Insert new AP
     37 + result = insertAP(self.c, self.verbose, self.bssid, essid, manuf, channel, freqmhz, carrier, encryption, packets_total, lat, lon, cloaked)
     38 + self.assertEqual(result, 0)
     39 + # TODO Insert existing AP with new values
     40 + #manuf = "Updated_Manufacturer"
     41 + #result = insertAP(self.c, False, bssid, essid, manuf, channel, freqmhz, carrier, encryption, packets_total, lat, lon, cloaked)
     42 + #self.assertEqual(result, 0)
     43 + #self.c.execute("SELECT * FROM AP WHERE bssid=?", (bssid,))
     44 + #rows = self.c.fetchall()
     45 + #self.assertEqual(len(rows), 1)
     46 + #self.assertEqual(rows[0][3], "Updated_Manufacturer")
     47 +
     48 + def test_insertClients(self):
     49 + ssid = "Test_AP"
     50 + manuf = "Test_Manufacturer"
     51 + packets_total = "10"
     52 + power = "-70"
     53 + # Insert new client
     54 + result = insertClients(self.c, self.verbose, self.mac, ssid, manuf, packets_total, power, "Misc")
     55 + self.assertEqual(result, 0)
     56 + # TODO Insert existing client with new values
     57 + #manuf = "Updated_Manufacturer"
     58 + #result = insertClients(self.c, False, mac, ssid, manuf, packets_total, power, "Misc")
     59 + #self.assertEqual(result, 0)
     60 + #self.c.execute("SELECT * FROM CLIENT WHERE mac=?", (mac,))
     61 + #rows = self.c.fetchall()
     62 + #self.assertEqual(len(rows), 1)
     63 + #self.assertEqual(rows[0][2], "Updated_Manufacturer")
     64 + 
     65 + def test_insertWPS(self):
     66 + # Define WPS parameters
     67 + wlan_ssid = "Test_SSID"
     68 + wps_version = "1.0"
     69 + wps_device_name = "Test_Device"
     70 + wps_model_name = "Test_Model"
     71 + wps_model_number = "12345"
     72 + wps_config_methods = "1234"
     73 + wps_config_methods_keypad = True
     74 +
     75 + # Insert new WPS
     76 + result = insertWPS(self.c, self.verbose, self.bssid, wlan_ssid, wps_version, wps_device_name, wps_model_name, wps_model_number, wps_config_methods, wps_config_methods_keypad)
     77 + self.assertEqual(result, 0)
     78 +
     79 + # TODO Insert existing WPS with new values
     80 + #wps_device_name = "Updated_Device"
     81 + #result = insertWPS(self.c, self.verbose, bssid, wlan_ssid, wps_version, wps_device_name, wps_model_name, wps_model_number, wps_config_methods, wps_config_methods_keypad)
     82 + #self.assertEqual(result, 0)
     83 + #self.c.execute("SELECT * FROM WPS WHERE wlan_ssid=?", (wlan_ssid,))
     84 + #rows = self.c.fetchall()
     85 + #self.assertEqual(len(rows), 1)
     86 + #self.assertEqual(rows[0][3], "Updated_Device")
     87 + 
     88 + def test_insertConnected(self):
     89 + # add needed data
     90 + essid = "Test_AP"
     91 + manuf = "Test_Manufacturer"
     92 + channel = "6"
     93 + freqmhz = "2437"
     94 + carrier = "test"
     95 + encryption = "WPA2"
     96 + packets_total = "10"
     97 + lat = "37.7749"
     98 + lon = "-122.4194"
     99 + cloaked = False
     100 + # Insert new AP
     101 + result = insertAP(self.c, self.verbose, self.bssid, essid, manuf, channel, freqmhz, carrier, encryption, packets_total, lat, lon, cloaked)
     102 + self.assertEqual(result, 0)
     103 + 
     104 + ssid = ""
     105 + manuf = "Test_Manufacturer"
     106 + packets_total = "10"
     107 + power = "-70"
     108 + # Insert new client
     109 + result = insertClients(self.c, self.verbose, self.mac, ssid, manuf, packets_total, power, "Misc")
     110 + self.assertEqual(result, 0)
     111 + 
     112 + # Insert new connected device
     113 + result = insertConnected(self.c, self.verbose, self.bssid, self.mac)
     114 + self.assertEqual(result, 0)
     115 + 
     116 + 
     117 + def test_insertHandshake(self):
     118 + path = "file://path"
     119 + result = insertHandshake(self.c, self.verbose, self.bssid, self.mac, path)
     120 + self.assertEqual(result, 0)
     121 + 
     122 + self.c.execute("SELECT * FROM handshake WHERE bssid=?", (self.bssid,))
     123 + rows = self.c.fetchall()
     124 + self.assertEqual(len(rows), 1)
     125 + self.assertEqual(rows[0][2], path)
     126 + 
     127 + def test_insertIdentity(self):
     128 + identity = "DOMAIN\\username"
     129 + result = insertIdentity(self.c, self.verbose, self.bssid, self.mac, identity)
     130 + self.assertEqual(result, 0)
     131 + self.c.execute("SELECT * FROM Identity WHERE mac=?", (self.mac,))
     132 + row = self.c.fetchone()
     133 + self.assertEqual(row[2], identity)
     134 + 
     135 + def test_insertSeenClient(self):
     136 + # add needed data
     137 + ssid = ""
     138 + manuf = "Test_Manufacturer"
     139 + packets_total = "10"
     140 + power = "-70"
     141 + # Insert new client
     142 + result = insertClients(self.c, self.verbose, self.mac, ssid, manuf, packets_total, power, "Misc")
     143 + 
     144 + # Insert seenClient
     145 + station = "Test_Station"
     146 + time = "2022-02-23 10:00:00"
     147 + tool = "aircrack-ng"
     148 + power = -50
     149 + lat = "37.7749"
     150 + lon = "-122.4194"
     151 + alt = "10000"
     152 + result = insertSeenClient(self.c, self.verbose, self.mac, time, tool, power, lat, lon, alt)
     153 + self.assertEqual(result, 0)
     154 + self.c.execute("SELECT * FROM SeenClient WHERE mac=?", (self.mac,))
     155 + row = self.c.fetchone()
     156 + self.assertEqual(row[1], time)
     157 + self.assertEqual(row[2], tool)
     158 + self.assertEqual(row[3], power)
     159 + 
     160 + 
     161 + def test_insertSeenAP(self):
     162 + # add needed data
     163 + essid = "Test_AP"
     164 + manuf = "Test_Manufacturer"
     165 + channel = "6"
     166 + freqmhz = "2437"
     167 + carrier = "test"
     168 + encryption = "WPA2"
     169 + packets_total = "10"
     170 + lat = "37.7749"
     171 + lon = "-122.4194"
     172 + cloaked = False
     173 + # Insert new AP
     174 + result = insertAP(self.c, self.verbose, self.bssid, essid, manuf, channel, freqmhz, carrier, encryption, packets_total, lat, lon, cloaked)
     175 + self.assertEqual(result, 0)
     176 + 
     177 + # Insert SeenAP
     178 + time = "2032-02-23 10:00:00"
     179 + tool = "aircrack-ng"
     180 + signal_rsi = "-70"
     181 + lat = "37.7749"
     182 + lon = "-122.4194"
     183 + alt = "10000"
     184 + bsstimestamp = "2032-02-23 10:00:00"
     185 + result = insertSeenAP(self.c, self.verbose, self.bssid, time, tool, signal_rsi, lat, lon, alt, bsstimestamp)
     186 + self.assertEqual(result, 0)
     187 + self.c.execute("SELECT * FROM SeenAP WHERE bssid=?", (self.bssid,))
     188 + row = self.c.fetchone()
     189 + self.assertEqual(row[1], time)
     190 + self.assertEqual(row[2], tool)
     191 + 
     192 + 
     193 + 
     194 + def test_set_hashcat(self):
     195 + # add needed data
     196 + essid = "Test_AP"
     197 + manuf = "Test_Manufacturer"
     198 + channel = "6"
     199 + freqmhz = "2437"
     200 + carrier = "test"
     201 + encryption = "WPA2"
     202 + packets_total = "10"
     203 + lat = "37.7749"
     204 + lon = "-122.4194"
     205 + cloaked = False
     206 + # Insert new AP
     207 + result = insertAP(self.c, self.verbose, self.bssid, essid, manuf, channel, freqmhz, carrier, encryption, packets_total, lat, lon, cloaked)
     208 + self.assertEqual(result, 0)
     209 + 
     210 + ssid = ""
     211 + manuf = "Test_Manufacturer"
     212 + packets_total = "10"
     213 + power = "-70"
     214 + # Insert new client
     215 + result = insertClients(self.c, self.verbose, self.mac, ssid, manuf, packets_total, power, "Misc")
     216 + self.assertEqual(result, 0)
     217 + 
     218 + # insert Handshake
     219 + path = "file://path"
     220 + 
     221 + result = insertHandshake(self.c, self.verbose, self.bssid, self.mac, path)
     222 + self.assertEqual(result, 0)
     223 + 
     224 + # Insert hashcat HASH
     225 + path = "file://path"
     226 + test_hashcat = "aa:bb:cc:dd:ee:ff:11:22:33:44:55:66:77"
     227 + result = set_hashcat(self.c, self.verbose, self.bssid, self.mac, path, test_hashcat)
     228 + self.assertEqual(result, 0)
     229 + self.c.execute("SELECT * FROM handshake WHERE bssid=?", (self.bssid,))
     230 + rows = self.c.fetchall()
     231 + self.assertEqual(rows[0][2], path)
     232 + 
     233 + def test_obfuscateDB(self):
     234 + # add needed data
     235 + essid = "Test_AP"
     236 + manufAP = "Test_Manufacturer"
     237 + channel = "6"
     238 + freqmhz = "2437"
     239 + carrier = "test"
     240 + encryption = "WPA2"
     241 + packets_total = "10"
     242 + lat = "37.7749"
     243 + lon = "-122.4194"
     244 + cloaked = False
     245 + # Insert new AP
     246 + result = insertAP(self.c, self.verbose, self.bssid, essid, manufAP, channel, freqmhz, carrier, encryption, packets_total, lat, lon, cloaked)
     247 + self.assertEqual(result, 0)
     248 + 
     249 + ssid = "null_ssid"
     250 + manufClient = "Test_Manufacturer"
     251 + packets_total = "10"
     252 + power = "-70"
     253 + # Insert new client
     254 + result = insertClients(self.c, self.verbose, self.mac, ssid, manufClient, packets_total, power, "Misc")
     255 + self.assertEqual(result, 0)
     256 + 
     257 + # insert Handshake
     258 + path = "file://path"
     259 + 
     260 + result = insertHandshake(self.c, self.verbose, self.bssid, self.mac, path)
     261 + self.assertEqual(result, 0)
     262 + 
     263 + #obfuscateDB
     264 + result = obfuscateDB(self.database, self.verbose)
     265 + self.assertEqual(result, 0)
     266 + 
     267 + #self.c.execute("SELECT * FROM handshake WHERE bssid=?", (self.bssid,))
     268 + self.c.execute("SELECT * FROM AP WHERE ssid=?", (essid,))
     269 + rows = self.c.fetchall()
     270 + # Same ESSID but different BSSID
     271 + self.assertEqual(rows[0][1], essid)
     272 + self.assertEqual(rows[0][3], manufAP)
     273 + self.assertEqual(rows[0][4], int(channel))
     274 + self.assertNotEqual(rows[0][0], self.bssid)
     275 + 
     276 + self.c.execute("SELECT * FROM CLIENT WHERE ssid=?", (ssid,))
     277 + rows = self.c.fetchall()
     278 + self.assertEqual(len(rows), 1)
     279 + self.assertEqual(rows[0][1], ssid)
     280 + self.assertEqual(rows[0][2], manufClient)
     281 + self.assertEqual(rows[0][3], packets_total)
     282 + 
  • ■ ■ ■ ■ ■ ■
    wifi_db.py
    skipped 147 lines
    148 148   # Clear whitelist MACs
    149 149   script_path = os.path.dirname(os.path.abspath(__file__))
    150 150   database_utils.clear_whitelist(
    151  - database, script_path+'/whitelist.txt')
     151 + database, verbose, script_path+'/whitelist.txt')
    152 152   
    153 153   # if obfuscated
    154 154   if obfuscated:
    155 155   print("-o is enable, so obfuscate. This may take a while")
    156  - database_utils.obfuscatedDB(database)
     156 + database_utils.obfuscateDB(database, verbose)
    157 157   
    158 158   
    159 159  def process_capture(ouiMap, capture, database,
    skipped 41 lines
Please wait...
Page is in error, reload to recover