IW4MAdminDBParser/parse_db.py
2024-03-03 04:42:35 -05:00

840 lines
26 KiB
Python

'''
Title: parse_db.py
Created by: Ahrimdon (aided by GPT-4)
Description: This script parses key elements of IW4MAdmin's database into a single consolidated, easy to read database that allows for easy server administration and logging.
'''
import sqlite3
import argparse
import time
import json
import os
import re
def setup_argparser():
parser = argparse.ArgumentParser(description="Accurately parses IW4MAdmin's database into a consolidated, easy-to-read format.")
parser.add_argument('-s', '--sort', type=str, choices=['ASC', 'DESC'], default='DESC', help="Sort order for the query results (ASC or DESC).")
parser.add_argument('-p', '--path', type=str, default="Database.db", help="Path to the IW4MAdmin database file.")
parser.add_argument('-o', '--output', type=str, help="Output directory for the parsed database file.")
parser.add_argument('-t', '--time', action='store_true', help="Time the script's execution.")
parser.add_argument('-c', '--config', action='store_true', help="Use default config.json for configuration.")
return parser.parse_args()
def delete_existing_db(output_db_path):
"""Check for an existing parsed DB and delete it if found."""
if os.path.exists(output_db_path):
os.remove(output_db_path)
print(f"Existing parsed database at {output_db_path} has been removed.")
def load_or_create_config():
config_path = "config.json" # Always use config.json
default_config = {
"tables": {
"AuditLog": 1000,
"ClientConnectionHistory": 1000,
"ClientMessages": 1000,
"Clients": 1000,
"IPAddresses": 1000,
"InboxMessagesModified": 1000,
"Maps": 1000,
"Metadata": 1000,
"Penalties": 1000,
"PenaltyIdentifiers": 1000,
"Servers": 1000,
}
}
if not os.path.isfile(config_path):
with open(config_path, 'w') as config_file:
json.dump(default_config, config_file, indent=4)
print(f"Default configuration file created at {config_path}")
else:
with open(config_path) as config_file:
return json.load(config_file)
return default_config
def main():
args = setup_argparser()
if args.time:
total_start_time = time.time() # Start timing for the entire script
config = {}
if args.config:
config = load_or_create_config()
db_path = args.path
output_dir = args.output if args.output else '.' # Use current directory if no output directory is specified
output_db_path = os.path.join(output_dir, "Database_parsed.db")
# Ensure the output directory exists
if not os.path.isdir(output_dir):
print(f"Creating output directory at {output_dir}")
os.makedirs(output_dir, exist_ok=True)
# Delete existing parsed database if it exists
delete_existing_db(output_db_path)
# Check if the specified Database.db exists
if not os.path.isfile(db_path):
print(f"No database file ({db_path}) found. Please ensure the file exists.")
return # Exit the script if file not found
existing_conn = sqlite3.connect(db_path)
existing_cur = existing_conn.cursor()
new_conn = sqlite3.connect(output_db_path)
new_cur = new_conn.cursor()
# Define a mapping of table names to their processing functions
table_functions = {
'AuditLog': audit_log_table,
'ClientConnectionHistory': client_connection_history_table,
'ClientMessages': client_messages_table,
'Clients': clients_table,
'IPAddresses': ip_address_table,
'InboxMessagesModified': inbox_messages_modified_table,
'Maps': maps_table,
'Metadata': metadata_table,
'Penalties': penalties_table,
'PenaltyIdentifiers': penalty_identifiers_table,
'Servers': servers_table,
}
sort_order = args.sort
if args.time:
sqlite_start_time = time.time() # Start timing for SQLite operations
# Process each table according to the configuration
for table_name, func in table_functions.items():
limit = config.get('tables', {}).get(table_name)
if limit is not None:
print(f"Processing {table_name} with limit: {limit}")
func(existing_cur, new_cur, limit=limit, sort_order=sort_order) # Assuming each function can accept a limit parameter
else:
print(f"Processing {table_name} without limit")
func(existing_cur, new_cur, sort_order=sort_order)
# Commit and close
new_conn.commit() # Commit changes before vacuuming
if args.time:
sqlite_end_time = time.time() # End timing for SQLite operations
print(f"SQLite execution time: {sqlite_end_time - sqlite_start_time:.4f} seconds")
# Vacuum process timing
if args.time:
vacuum_start_time = time.time() # Start timing for vacuuming
new_conn.execute("VACUUM;")
new_conn.commit()
if args.time:
vacuum_end_time = time.time() # End timing for vacuuming
print(f"Vacuuming duration: {vacuum_end_time - vacuum_start_time:.4f} seconds")
existing_conn.close()
new_conn.close()
if args.time:
total_end_time = time.time() # End timing for the entire script
print(f"Total script execution time: {total_end_time - total_start_time:.4f} seconds")
def audit_log_table(existing_cur, new_cur, limit=None, sort_order='DESC'):
new_cur.execute("""
CREATE TABLE "AuditLog" (
"ChangeHistoryId" INTEGER NOT NULL,
"TypeOfChange" TEXT NOT NULL,
"Time" TEXT NOT NULL,
"Data" TEXT,
"Command" TEXT,
"Origin" TEXT,
"Target" TEXT,
CONSTRAINT "PK_AuditLog" PRIMARY KEY("ChangeHistoryId" AUTOINCREMENT)
)
""")
query = f"""
SELECT
EFChangeHistory.ChangeHistoryId,
EFChangeHistory.TypeOfChange,
EFChangeHistory.TimeChanged,
EFChangeHistory.Comment,
EFChangeHistory.CurrentValue,
EFChangeHistory.OriginEntityId,
EFChangeHistory.TargetEntityId
FROM
EFChangeHistory
ORDER BY ChangeHistoryId {sort_order}
"""
if limit:
query += f" LIMIT {limit}"
existing_cur.execute(query)
rows = existing_cur.fetchall()
client_name_map = {}
for row in rows:
origin_entity_id = row[5]
target_entity_id = row[6]
if origin_entity_id not in client_name_map:
existing_cur.execute("""
SELECT
EFAlias.Name
FROM
EFClients
JOIN EFAlias ON EFClients.CurrentAliasId = EFAlias.AliasId
WHERE
EFClients.ClientId = ?
""", (origin_entity_id,))
origin_name = existing_cur.fetchone()
if origin_name:
client_name_map[origin_entity_id] = origin_name[0].replace('^7', '')
else:
client_name_map[origin_entity_id] = 'Unknown'
if target_entity_id not in client_name_map:
if target_entity_id == 0:
client_name_map[target_entity_id] = None
else:
existing_cur.execute("""
SELECT
EFAlias.Name
FROM
EFClients
JOIN EFAlias ON EFClients.CurrentAliasId = EFAlias.AliasId
WHERE
EFClients.ClientId = ?
""", (target_entity_id,))
target_name = existing_cur.fetchone()
if target_name:
client_name_map[target_entity_id] = target_name[0].replace('^7', '')
else:
client_name_map[target_entity_id] = 'Unknown'
type_of_change_map = {0: "Console", 1: "Punishment", 2: "Client"}
type_of_change = type_of_change_map[row[1]]
new_row = (row[0], type_of_change, row[2], row[3], row[4], client_name_map[origin_entity_id], client_name_map[target_entity_id])
new_cur.execute("INSERT INTO \"AuditLog\" (ChangeHistoryId, TypeOfChange, Time, Data, Command, Origin, Target) VALUES (?, ?, ?, ?, ?, ?, ?)", new_row)
def client_connection_history_table(existing_cur, new_cur, limit=None, sort_order='DESC'):
new_cur.execute("""
CREATE TABLE "ClientConnectionHistory" (
"ConnectionId" INTEGER NOT NULL,
"Client" TEXT NOT NULL,
"ConnectionTime" TEXT NOT NULL,
"ConnectionType" TEXT NOT NULL,
"Server" TEXT,
CONSTRAINT "PK_ClientConnectionHistory" PRIMARY KEY("ConnectionId" AUTOINCREMENT)
)
""")
query = f"""
SELECT
EFClientConnectionHistory.ClientConnectionId,
EFClientConnectionHistory.ClientId,
EFClientConnectionHistory.CreatedDateTime,
EFClientConnectionHistory.ConnectionType,
EFClientConnectionHistory.ServerId
FROM
EFClientConnectionHistory
ORDER BY ClientConnectionId {sort_order}
"""
if limit:
query += f" LIMIT {limit}"
existing_cur.execute(query)
rows = existing_cur.fetchall()
for row in rows:
client_id = row[1]
server_id = row[4]
existing_cur.execute("""
SELECT
EFAlias.Name
FROM
EFClients
JOIN EFAlias ON EFClients.CurrentAliasId = EFAlias.AliasId
WHERE
EFClients.ClientId = ?
""", (client_id,))
client_name = existing_cur.fetchone()
if client_name:
client_name = client_name[0].replace('^7', '')
else:
client_name = 'Unknown'
existing_cur.execute("""
SELECT
EFServers.HostName
FROM
EFServers
WHERE
EFServers.ServerId = ?
""", (server_id,))
server_hostname = existing_cur.fetchone()
if server_hostname:
server_hostname = server_hostname[0]
else:
server_hostname = 'Unknown'
connection_type_map = {0: "Connect", 1: "Disconnect"}
connection_type = connection_type_map[row[3]]
new_row = (row[0], client_name, row[2], connection_type, server_hostname)
new_cur.execute("INSERT INTO ClientConnectionHistory (ConnectionId, Client, ConnectionTime, ConnectionType, Server) VALUES (?, ?, ?, ?, ?)", new_row)
def client_messages_table(existing_cur, new_cur, limit=None, sort_order='DESC'):
new_cur.execute("""
CREATE TABLE "ClientMessages" (
"MessageId" INTEGER NOT NULL,
"Client" TEXT NOT NULL,
"Message" TEXT NOT NULL,
"TimeSent" TEXT NOT NULL,
"Server" TEXT,
CONSTRAINT "PK_ClientMessages" PRIMARY KEY("MessageId" AUTOINCREMENT)
)
""")
query = f"""
SELECT
EFClientMessages.MessageId,
EFClientMessages.ClientId,
EFClientMessages.Message,
EFClientMessages.TimeSent,
EFClientMessages.ServerId
FROM
EFClientMessages
ORDER BY MessageId {sort_order}
"""
if limit:
query += f" LIMIT {limit}"
existing_cur.execute(query)
rows = existing_cur.fetchall()
for row in rows:
client_id = row[1]
server_id = row[4]
existing_cur.execute("""
SELECT
EFAlias.Name
FROM
EFClients
JOIN EFAlias ON EFClients.CurrentAliasId = EFAlias.AliasId
WHERE
EFClients.ClientId = ?
""", (client_id,))
client_name = existing_cur.fetchone()
if client_name:
client_name = client_name[0].replace('^7', '')
else:
client_name = 'Unknown'
existing_cur.execute("""
SELECT
EFServers.HostName
FROM
EFServers
WHERE
EFServers.ServerId = ?
""", (server_id,))
server_hostname = existing_cur.fetchone()
if server_hostname:
server_hostname = server_hostname[0]
else:
server_hostname = 'Unknown'
new_row = (row[0], client_name, row[2], row[3], server_hostname)
new_cur.execute("INSERT INTO ClientMessages (MessageId, Client, Message, TimeSent, Server) VALUES (?, ?, ?, ?, ?)", new_row)
def clients_table(existing_cur, new_cur, limit=None, sort_order='DESC'):
new_cur.execute("""
CREATE TABLE "Clients" (
"Connections" INTEGER NOT NULL,
"Name" TEXT NOT NULL,
"FirstConnection" TEXT NOT NULL,
"Game" TEXT NOT NULL,
"LastConnection" TEXT NOT NULL,
"Level" TEXT NOT NULL,
"Masked" INTEGER NOT NULL,
"TotalConnectionTime" INTEGER NOT NULL,
"IP" TEXT
)
""")
query = f"""
SELECT
EFClients.Connections,
EFClients.CurrentAliasId,
EFClients.FirstConnection,
EFClients.GameName,
EFClients.LastConnection,
EFClients.Level,
EFClients.Masked,
EFClients.TotalConnectionTime,
EFAlias.SearchableIPAddress
FROM
EFClients
JOIN
EFAlias ON EFClients.CurrentAliasId = EFAlias.AliasId
ORDER BY LastConnection {sort_order}
"""
if limit:
query += f" LIMIT {limit}"
existing_cur.execute(query)
rows = existing_cur.fetchall()
for row in rows:
connections = row[0]
current_alias_id = row[1]
first_connection = row[2]
game_name = row[3]
last_connection = row[4]
level = row[5]
masked = row[6]
total_connection_time = row[7]
ip_address = row[8]
existing_cur.execute("""
SELECT
EFAlias.Name
FROM
EFAlias
WHERE
EFAlias.AliasId = ?
""", (current_alias_id,))
client_name = existing_cur.fetchone()
if client_name:
client_name = client_name[0].replace('^7', '')
else:
client_name = 'Unknown'
level_map = {-1: "Banned", 0: "User", 1: "Trusted", 2: "Moderator", 3: "Administrator", 4: "Senior Administrator", 5: "Owner", 6: "Creator", 7: "Console"}
level = level_map.get(level, f"Unknown Level ({level})")
game_map = {5: "WaW", 6: "BO", 7: "BO2", 3: "MW3"}
game = game_map.get(game_name, f"Unknown Game ({game_name})")
new_row = (connections, client_name, first_connection, game, last_connection, level, masked, total_connection_time, ip_address)
new_cur.execute("INSERT INTO Clients (Connections, Name, FirstConnection, Game, LastConnection, Level, Masked, TotalConnectionTime, IP) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", new_row)
def ip_address_table(existing_cur, new_cur, limit=None, sort_order='DESC'):
def fetch_client_info(src_cur):
src_cur.execute("""
SELECT Name, SearchableIPAddress, DateAdded FROM EFAlias
""")
client_info = []
for row in src_cur.fetchall():
name = row[0].replace('^7', '')
client_info.append((name, row[1], row[2]))
return client_info
client_info = fetch_client_info(existing_cur)
new_cur.execute("""
CREATE TABLE IF NOT EXISTS "IPAddresses" (
Name TEXT NOT NULL,
SearchableIPAddress TEXT,
DateAdded TEXT NOT NULL
)
""")
new_cur.executemany("""
INSERT INTO "IPAddresses" (
Name, SearchableIPAddress, DateAdded
) VALUES (?, ?, ?)
""", client_info)
def inbox_messages_modified_table(existing_cur, new_cur, limit=None, sort_order='DESC'):
new_cur.execute("""
CREATE TABLE InboxMessagesModified (
InboxMessageId INTEGER PRIMARY KEY,
Created TEXT,
Origin TEXT,
Target TEXT,
ServerId INTEGER,
Message TEXT,
Read TEXT
)
""")
existing_cur.execute("SELECT * FROM InboxMessages")
inbox_messages = existing_cur.fetchall()
for msg in inbox_messages:
msg_id, created, _, source_client_id, dest_client_id, server_id, message, is_delivered = msg
for client_id in (source_client_id, dest_client_id):
existing_cur.execute("SELECT CurrentAliasId FROM EFClients WHERE ClientId = ?", (client_id,))
alias_id = existing_cur.fetchone()[0]
existing_cur.execute("SELECT Name FROM EFAlias WHERE AliasId = ?", (alias_id,))
name = existing_cur.fetchone()[0].replace('^7', '')
if client_id == source_client_id:
origin = name
else:
target = name
read = "Yes" if is_delivered == 1 else "No"
new_cur.execute("""
INSERT INTO InboxMessagesModified (InboxMessageId, Created, Origin, Target, ServerId, Message, Read)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (msg_id, created, origin, target, server_id, message, read))
def maps_table(existing_cur, new_cur, limit=None, sort_order='DESC'):
new_cur.execute("""
CREATE TABLE IF NOT EXISTS "Maps" (
"MapId" INTEGER NOT NULL,
"CreatedDateTime" TEXT NOT NULL,
"Name" TEXT NOT NULL,
"Game" TEXT NOT NULL,
CONSTRAINT "PK_Maps" PRIMARY KEY("MapId" AUTOINCREMENT)
)
""")
query = f"""
SELECT
MapId, CreatedDateTime, Name, Game
FROM
EFMaps
ORDER BY MapId {sort_order}
"""
if limit:
query += f" LIMIT {limit}"
existing_cur.execute(query)
rows = existing_cur.fetchall()
modified_rows = []
for row in rows:
game_map = {5: "WaW", 6: "BO", 7: "BO2", 3: "MW3"}
game = game_map.get(row[3], f"Unknown Game ({row[3]})")
modified_rows.append((row[0], row[1], row[2], game))
new_cur.executemany("""
INSERT INTO "Maps" (
MapId, CreatedDateTime, Name, Game
) VALUES (?, ?, ?, ?)
""", modified_rows)
def metadata_table(existing_cur, new_cur, limit=None, sort_order='DESC'):
new_cur.execute("""
CREATE TABLE "Metadata" (
"MetaId" INTEGER NOT NULL,
"Name" TEXT NOT NULL,
"Timestamp" TEXT NOT NULL,
"Note" TEXT NOT NULL,
"Value" TEXT NOT NULL
)
""")
query = f"""
SELECT
EFMeta.MetaId,
EFMeta.ClientId,
EFMeta.Created,
EFMeta.Key,
EFMeta.Value
FROM
EFMeta
ORDER BY MetaId {sort_order}
"""
if limit:
query += f" LIMIT {limit}"
existing_cur.execute(query)
rows = existing_cur.fetchall()
for row in rows:
meta_id = row[0]
client_id = row[1]
created = row[2]
key = row[3]
value = row[4]
existing_cur.execute("""
SELECT
EFClients.CurrentAliasId
FROM
EFClients
WHERE
EFClients.ClientId = ?
""", (client_id,))
current_alias_id = existing_cur.fetchone()
if current_alias_id:
current_alias_id = current_alias_id[0]
else:
current_alias_id = None
if current_alias_id:
existing_cur.execute("""
SELECT
EFAlias.Name
FROM
EFAlias
WHERE
EFAlias.AliasId = ?
""", (current_alias_id,))
client_name = existing_cur.fetchone()
if client_name:
client_name = client_name[0].replace('^7', '')
else:
client_name = 'Unknown'
else:
client_name = 'Unknown'
new_row = (meta_id, client_name, created, key, value)
new_cur.execute("INSERT INTO Metadata (MetaId, Name, Timestamp, Note, Value) VALUES (?, ?, ?, ?, ?)", new_row)
def penalties_table(existing_cur, new_cur, limit=None, sort_order='DESC'):
new_cur.execute("""
CREATE TABLE "Penalties" (
"PenaltyId" INTEGER NOT NULL,
"AutomatedOffense" TEXT NOT NULL,
"Expires" INTEGER, -- This line is modified to allow NULL values
"EvadedOffense" TEXT NOT NULL,
"Offender" TEXT NOT NULL,
"Offense" TEXT NOT NULL,
"Punisher" TEXT NOT NULL,
"Type" TEXT NOT NULL,
"Timestamp" INTEGER NOT NULL
)
""")
query = f"""
SELECT
EFPenalties.PenaltyId,
EFPenalties.AutomatedOffense,
EFPenalties.Expires,
EFPenalties.IsEvadedOffense,
EFPenalties.OffenderId,
EFPenalties.Offense,
EFPenalties.PunisherId,
EFPenalties.Type,
EFPenalties."When"
FROM
EFPenalties
ORDER BY PenaltyId {sort_order}
"""
if limit:
query += f" LIMIT {limit}"
existing_cur.execute(query)
rows = existing_cur.fetchall()
for row in rows:
penalty_id = row[0]
automated_offense = row[1]
expires = row[2]
evaded_offense = row[3]
offender_id = row[4]
offense = row[5]
punisher_id = row[6]
penalty_type = row[7]
timestamp = row[8]
existing_cur.execute("""
SELECT
EFAlias.Name
FROM
EFAlias
INNER JOIN
EFClients ON EFAlias.AliasId = EFClients.CurrentAliasId
WHERE
EFClients.ClientId = ?
""", (offender_id,))
offender_name = existing_cur.fetchone()
if offender_name:
offender_name = offender_name[0].replace('^7', '')
else:
offender_name = 'Unknown'
existing_cur.execute("""
SELECT
EFAlias.Name
FROM
EFAlias
INNER JOIN
EFClients ON EFAlias.AliasId = EFClients.CurrentAliasId
WHERE
EFClients.ClientId = ?
""", (punisher_id,))
punisher_name = existing_cur.fetchone()
if punisher_name:
punisher_name = punisher_name[0].replace('^7', '')
else:
punisher_name = 'Unknown'
type_map = {0: "Report", 1: "Warning", 2: "Flag", 3: "Kick", 4: "Temp Ban", 5: "Perm Ban", 6: "Unban", 8: "Unflag"}
penalty_type = type_map.get(penalty_type, f"Unknown Type ({penalty_type})")
automated_offense_patterns = [r"VPNs are not allowed",
r"Ping is too high!",
r"name is not allowed"]
for pattern in automated_offense_patterns:
if re.search(pattern, offense):
automated_offense = "Yes"
break
else:
automated_offense = "No"
evaded_offense = "Yes" if evaded_offense == 1 else "No"
expires = "Never" if expires is None else expires
new_cur.execute("""
INSERT INTO Penalties (
PenaltyId,
AutomatedOffense,
Expires,
EvadedOffense,
Offender,
Offense,
Punisher,
Type,
Timestamp
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (penalty_id, automated_offense, expires, evaded_offense, offender_name, offense, punisher_name, penalty_type, timestamp))
def penalty_identifiers_table(existing_cur, new_cur, limit=None, sort_order='DESC'):
new_cur.execute("""
CREATE TABLE "PenaltyIdentifiers" (
"PenaltyIdentifierId" INTEGER NOT NULL,
"PenaltyId" INTEGER NOT NULL,
"Created" TEXT NOT NULL,
"Client" TEXT NOT NULL
)
""")
query = f"""
SELECT
EFPenaltyIdentifiers.PenaltyIdentifierId,
EFPenaltyIdentifiers.PenaltyId,
EFPenaltyIdentifiers.CreatedDateTime,
EFPenaltyIdentifiers.NetworkId
FROM
EFPenaltyIdentifiers
ORDER BY PenaltyIdentifierId {sort_order}
"""
if limit:
query += f" LIMIT {limit}"
existing_cur.execute(query)
rows = existing_cur.fetchall()
for row in rows:
penalty_identifier_id = row[0]
penalty_id = row[1]
created = row[2]
network_id = row[3]
existing_cur.execute("""
SELECT
EFAlias.Name
FROM
EFAlias
INNER JOIN
EFClients ON EFAlias.AliasId = EFClients.CurrentAliasId
WHERE
EFClients.NetworkId = ?
""", (network_id,))
client_name = existing_cur.fetchone()
if client_name:
client_name = client_name[0].replace('^7', '')
else:
client_name = 'Unknown'
new_cur.execute("""
INSERT INTO PenaltyIdentifiers (
PenaltyIdentifierId,
PenaltyId,
Created,
Client
)
VALUES (?, ?, ?, ?)
""", (penalty_identifier_id, penalty_id, created, client_name))
def servers_table(existing_cur, new_cur, limit=None, sort_order='DESC'):
new_cur.execute("""
CREATE TABLE "Servers" (
"ServerId" INTEGER NOT NULL,
"Active" INTEGER NOT NULL,
"Port" INTEGER NOT NULL,
"Endpoint" TEXT NOT NULL,
"Game" TEXT NOT NULL,
"ServerName" TEXT NOT NULL,
"Password" TEXT NOT NULL
)
""")
query = f"""
SELECT ServerId, Active, Port, Endpoint, GameName, HostName, IsPasswordProtected
FROM EFServers
ORDER BY ServerId {sort_order}
"""
if limit:
query += f" LIMIT {limit}"
existing_cur.execute(query)
rows = existing_cur.fetchall()
game_mapping = {5: "WaW", 7: "BO2", 6: "BO", 3: "MW3"}
for row in rows:
server_id = row[0]
active = row[1]
port = row[2]
endpoint = row[3]
game_name = row[4]
server_name = row[5]
is_password_protected = row[6]
game_name = game_mapping.get(game_name, game_name)
password = "Yes" if is_password_protected == 1 else "No"
new_cur.execute("""
INSERT INTO Servers (
ServerId,
Active,
Port,
Endpoint,
Game,
ServerName,
Password
) VALUES (?, ?, ?, ?, ?, ?, ?)
""", (server_id, active, port, endpoint, game_name, server_name, password))
if __name__ == '__main__':
main()