# IW4MAdminDBParser/parse_db.py
# Repository viewer metadata at time of capture: 730 lines, 22 KiB, Python

'''
Title: parse_db.py
Created by: Ahrimdon (aided by GPT-4)
Description: This script parses key elements of IW4MAdmin's database into a single consolidated, easy-to-read database that simplifies server administration and logging.
'''
import sqlite3
import argparse
import time
import os
import re
def setup_argparser():
    """Declare the command-line options and parse the process arguments.

    Returns:
        argparse.Namespace: ``path`` (source database file, defaults to
        "Database.db"), ``output`` (target directory, None when omitted) and
        ``time`` (True when -t/--time was passed).
    """
    cli = argparse.ArgumentParser(
        description="Accurately parses IW4MAdmin's database into a consolidated, easy-to-read format."
    )
    # (flags, keyword settings) for every supported option, in display order.
    option_specs = (
        (('-p', '--path'),
         {'type': str, 'default': "Database.db",
          'help': "Path to the IW4MAdmin database file."}),
        (('-o', '--output'),
         {'type': str,
          'help': "Output directory for the parsed database file."}),
        (('-t', '--time'),
         {'action': 'store_true',
          'help': "Time the script's execution."}),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    return cli.parse_args()
def delete_existing_db(output_db_path):
    """Remove a previously generated parsed database, if one is present.

    Does nothing when no file exists at ``output_db_path``; otherwise the
    file is deleted and a confirmation line is printed.
    """
    already_there = os.path.exists(output_db_path)
    if not already_there:
        return
    os.remove(output_db_path)
    print(f"Existing parsed database at {output_db_path} has been removed.")
def main():
    """Parse the source database into ``Database_parsed.db``.

    Steps, in order:
      1. Read the command-line options.
      2. Verify the source database exists (nothing is touched otherwise).
      3. Create the output directory if needed and remove a stale parsed
         database.
      4. Run every table builder against a cursor on each database.
      5. Commit, close both connections and optionally report the run time.
    """
    args = setup_argparser()
    # perf_counter is monotonic, so the measurement cannot be skewed by
    # system clock adjustments during the run.
    start_time = time.perf_counter() if args.time else None

    db_path = args.path
    output_dir = args.output if args.output else '.'  # Use current directory if no output directory is specified
    output_db_path = os.path.join(output_dir, "Database_parsed.db")

    # Validate the source BEFORE touching the output location: a mistyped
    # --path must not destroy the previously parsed database.
    if not os.path.isfile(db_path):
        print(f"No database file ({db_path}) found. Please ensure the file exists.")
        return  # Exit the script if file not found

    # Refuse to run when the source *is* the output file, because the next
    # step would delete it.
    if os.path.exists(output_db_path) and os.path.samefile(db_path, output_db_path):
        print(f"The source database ({db_path}) is the output file itself. Please choose a different path or output directory.")
        return

    # Ensure the output directory exists
    if not os.path.isdir(output_dir):
        print(f"Creating output directory at {output_dir}")
        os.makedirs(output_dir, exist_ok=True)

    # Delete existing parsed database if it exists
    delete_existing_db(output_db_path)

    table_builders = (
        ip_address_table,
        audit_log_table,
        client_connection_table,
        messages_table,
        client_table,
        maps_table,
        meta_table,
        penalties_table,
        penalty_Identifiers_table,
        servers_table,
        inbox_messages_table,
    )

    existing_conn = sqlite3.connect(db_path)
    try:
        new_conn = sqlite3.connect(output_db_path)
        try:
            existing_cur = existing_conn.cursor()
            new_cur = new_conn.cursor()
            # Process each table
            for build_table in table_builders:
                build_table(existing_cur, new_cur)
            new_conn.commit()
        finally:
            # Always release the handles, even when a builder raises.
            new_conn.close()
    finally:
        existing_conn.close()

    if start_time is not None:
        elapsed = time.perf_counter() - start_time
        print(f"Script execution time: {elapsed:.4f} seconds")  # Print execution time
def ip_address_table(existing_cur, new_cur):
    """Copy every EFAlias row into the "IPAddresses" table of the new database.

    Args:
        existing_cur: cursor on the source database (reads EFAlias).
        new_cur: cursor on the parsed database (table is created if missing).

    Every '^7' sequence is removed from the name before it is stored.
    """
    existing_cur.execute("""
        SELECT Name, SearchableIPAddress, DateAdded FROM EFAlias
    """)
    alias_rows = [
        (alias_name.replace('^7', ''), ip_address, date_added)
        for alias_name, ip_address, date_added in existing_cur.fetchall()
    ]

    new_cur.execute("""
        CREATE TABLE IF NOT EXISTS "IPAddresses" (
            Name TEXT NOT NULL,
            SearchableIPAddress TEXT,
            DateAdded TEXT NOT NULL
        )
    """)
    new_cur.executemany("""
        INSERT INTO "IPAddresses" (
            Name, SearchableIPAddress, DateAdded
        ) VALUES (?, ?, ?)
    """, alias_rows)
def audit_log_table(existing_cur, new_cur):
    """Build the "AuditLog" table from EFChangeHistory.

    Args:
        existing_cur: cursor on the source database.
        new_cur: cursor on the parsed database.

    Per source row:
      * TypeOfChange is translated to a label; values outside the known set
        become "Unknown Type (<value>)" instead of raising KeyError.
      * OriginEntityId / TargetEntityId are replaced by the client's current
        alias name with '^7' removed, or 'Unknown' when no client matches.
      * A TargetEntityId of 0 is stored as NULL.
      * Comment is written to "Data" and CurrentValue to "Command".
    """
    type_of_change_map = {0: "Console", 1: "Punishment", 2: "Client"}
    client_name_map = {}

    def resolve_name(client_id):
        """Return the cached current alias name for client_id, or 'Unknown'."""
        if client_id not in client_name_map:
            existing_cur.execute("""
                SELECT
                    EFAlias.Name
                FROM
                    EFClients
                JOIN EFAlias ON EFClients.CurrentAliasId = EFAlias.AliasId
                WHERE
                    EFClients.ClientId = ?
            """, (client_id,))
            found = existing_cur.fetchone()
            client_name_map[client_id] = found[0].replace('^7', '') if found else 'Unknown'
        return client_name_map[client_id]

    new_cur.execute("""
        CREATE TABLE "AuditLog" (
            "ChangeHistoryId" INTEGER NOT NULL,
            "TypeOfChange" TEXT NOT NULL,
            "Time" TEXT NOT NULL,
            "Data" TEXT,
            "Command" TEXT,
            "Origin" TEXT,
            "Target" TEXT,
            CONSTRAINT "PK_AuditLog" PRIMARY KEY("ChangeHistoryId" AUTOINCREMENT)
        )
    """)

    existing_cur.execute("""
        SELECT
            EFChangeHistory.ChangeHistoryId,
            EFChangeHistory.TypeOfChange,
            EFChangeHistory.TimeChanged,
            EFChangeHistory.Comment,
            EFChangeHistory.CurrentValue,
            EFChangeHistory.OriginEntityId,
            EFChangeHistory.TargetEntityId
        FROM
            EFChangeHistory
    """)
    # Materialise first: resolve_name() reuses existing_cur for its lookups.
    rows = existing_cur.fetchall()

    audit_rows = []
    for change_id, change_type, time_changed, comment, current_value, origin_id, target_id in rows:
        origin_name = resolve_name(origin_id)
        # The "no target" marker is handled outside the cache so it can never
        # leak into the name reported for an origin with the same id.
        target_name = None if target_id == 0 else resolve_name(target_id)
        type_of_change = type_of_change_map.get(change_type, f"Unknown Type ({change_type})")
        audit_rows.append(
            (change_id, type_of_change, time_changed, comment, current_value, origin_name, target_name)
        )

    new_cur.executemany(
        "INSERT INTO \"AuditLog\" (ChangeHistoryId, TypeOfChange, Time, Data, Command, Origin, Target) VALUES (?, ?, ?, ?, ?, ?, ?)",
        audit_rows,
    )
def client_connection_table(existing_cur, new_cur):
    """Build the "ClientConnectionHistory" table from EFClientConnectionHistory.

    Args:
        existing_cur: cursor on the source database.
        new_cur: cursor on the parsed database.

    ClientId is replaced by the client's current alias name ('^7' removed)
    and ServerId by the server's HostName; either becomes 'Unknown' when no
    matching row exists. ConnectionType is translated to "Connect" /
    "Disconnect"; any other value becomes "Unknown Type (<value>)" instead of
    raising KeyError. Lookups are memoised so each client and server is
    queried once.
    """
    connection_type_map = {0: "Connect", 1: "Disconnect"}
    client_names = {}
    server_names = {}

    def client_name_for(client_id):
        """Return the cached current alias name for client_id, or 'Unknown'."""
        if client_id not in client_names:
            existing_cur.execute("""
                SELECT
                    EFAlias.Name
                FROM
                    EFClients
                JOIN EFAlias ON EFClients.CurrentAliasId = EFAlias.AliasId
                WHERE
                    EFClients.ClientId = ?
            """, (client_id,))
            found = existing_cur.fetchone()
            client_names[client_id] = found[0].replace('^7', '') if found else 'Unknown'
        return client_names[client_id]

    def server_name_for(server_id):
        """Return the cached HostName for server_id, or 'Unknown'."""
        if server_id not in server_names:
            existing_cur.execute("""
                SELECT
                    EFServers.HostName
                FROM
                    EFServers
                WHERE
                    EFServers.ServerId = ?
            """, (server_id,))
            found = existing_cur.fetchone()
            server_names[server_id] = found[0] if found else 'Unknown'
        return server_names[server_id]

    new_cur.execute("""
        CREATE TABLE "ClientConnectionHistory" (
            "ConnectionId" INTEGER NOT NULL,
            "Client" TEXT NOT NULL,
            "ConnectionTime" TEXT NOT NULL,
            "ConnectionType" TEXT NOT NULL,
            "Server" TEXT,
            CONSTRAINT "PK_ClientConnectionHistory" PRIMARY KEY("ConnectionId" AUTOINCREMENT)
        )
    """)

    existing_cur.execute("""
        SELECT
            EFClientConnectionHistory.ClientConnectionId,
            EFClientConnectionHistory.ClientId,
            EFClientConnectionHistory.CreatedDateTime,
            EFClientConnectionHistory.ConnectionType,
            EFClientConnectionHistory.ServerId
        FROM
            EFClientConnectionHistory
    """)
    # Materialise first: the lookup helpers reuse existing_cur.
    rows = existing_cur.fetchall()

    history_rows = [
        (
            connection_id,
            client_name_for(client_id),
            created,
            connection_type_map.get(connection_type, f"Unknown Type ({connection_type})"),
            server_name_for(server_id),
        )
        for connection_id, client_id, created, connection_type, server_id in rows
    ]

    new_cur.executemany(
        "INSERT INTO ClientConnectionHistory (ConnectionId, Client, ConnectionTime, ConnectionType, Server) VALUES (?, ?, ?, ?, ?)",
        history_rows,
    )
def messages_table(existing_cur, new_cur):
    """Build the "ClientMessages" table from EFClientMessages.

    Args:
        existing_cur: cursor on the source database.
        new_cur: cursor on the parsed database.

    ClientId is replaced by the client's current alias name ('^7' removed)
    and ServerId by the server's HostName; either becomes 'Unknown' when no
    matching row exists. Name and host lookups are memoised, so every client
    and server is queried once rather than once per message.
    """
    client_names = {}
    server_names = {}

    def client_name_for(client_id):
        """Return the cached current alias name for client_id, or 'Unknown'."""
        if client_id not in client_names:
            existing_cur.execute("""
                SELECT
                    EFAlias.Name
                FROM
                    EFClients
                JOIN EFAlias ON EFClients.CurrentAliasId = EFAlias.AliasId
                WHERE
                    EFClients.ClientId = ?
            """, (client_id,))
            found = existing_cur.fetchone()
            client_names[client_id] = found[0].replace('^7', '') if found else 'Unknown'
        return client_names[client_id]

    def server_name_for(server_id):
        """Return the cached HostName for server_id, or 'Unknown'."""
        if server_id not in server_names:
            existing_cur.execute("""
                SELECT
                    EFServers.HostName
                FROM
                    EFServers
                WHERE
                    EFServers.ServerId = ?
            """, (server_id,))
            found = existing_cur.fetchone()
            server_names[server_id] = found[0] if found else 'Unknown'
        return server_names[server_id]

    new_cur.execute("""
        CREATE TABLE "ClientMessages" (
            "MessageId" INTEGER NOT NULL,
            "Client" TEXT NOT NULL,
            "Message" TEXT NOT NULL,
            "TimeSent" TEXT NOT NULL,
            "Server" TEXT,
            CONSTRAINT "PK_ClientMessages" PRIMARY KEY("MessageId" AUTOINCREMENT)
        )
    """)

    existing_cur.execute("""
        SELECT
            EFClientMessages.MessageId,
            EFClientMessages.ClientId,
            EFClientMessages.Message,
            EFClientMessages.TimeSent,
            EFClientMessages.ServerId
        FROM
            EFClientMessages
    """)
    # Materialise first: the lookup helpers reuse existing_cur.
    rows = existing_cur.fetchall()

    message_rows = [
        (message_id, client_name_for(client_id), message, time_sent, server_name_for(server_id))
        for message_id, client_id, message, time_sent, server_id in rows
    ]

    new_cur.executemany(
        "INSERT INTO ClientMessages (MessageId, Client, Message, TimeSent, Server) VALUES (?, ?, ?, ?, ?)",
        message_rows,
    )
def client_table(existing_cur, new_cur):
    """Build the "Clients" table from EFClients joined with EFAlias.

    Args:
        existing_cur: cursor on the source database.
        new_cur: cursor on the parsed database.

    The alias name is read in the same join that supplies the IP address, so
    no per-client follow-up query is needed. Because the join is an inner
    join, clients whose CurrentAliasId has no EFAlias row are not emitted.
    '^7' is removed from the name. Level and GameName are translated to
    labels, with "Unknown Level (<value>)" / "Unknown Game (<value>)" for
    unmapped values.
    """
    level_map = {-1: "Banned", 0: "User", 1: "Trusted", 2: "Moderator", 3: "Administrator", 4: "Senior Administrator", 5: "Owner", 6: "Creator", 7: "Console"}
    game_map = {5: "WaW", 6: "BO", 7: "BO2", 3: "MW3"}

    new_cur.execute("""
        CREATE TABLE "Clients" (
            "Connections" INTEGER NOT NULL,
            "Name" TEXT NOT NULL,
            "FirstConnection" TEXT NOT NULL,
            "Game" TEXT NOT NULL,
            "LastConnection" TEXT NOT NULL,
            "Level" TEXT NOT NULL,
            "Masked" INTEGER NOT NULL,
            "TotalConnectionTime" INTEGER NOT NULL,
            "IP" TEXT
        )
    """)

    existing_cur.execute("""
        SELECT
            EFClients.Connections,
            EFAlias.Name,
            EFClients.FirstConnection,
            EFClients.GameName,
            EFClients.LastConnection,
            EFClients.Level,
            EFClients.Masked,
            EFClients.TotalConnectionTime,
            EFAlias.SearchableIPAddress
        FROM
            EFClients
        JOIN
            EFAlias ON EFClients.CurrentAliasId = EFAlias.AliasId
    """)

    client_rows = [
        (
            connections,
            alias_name.replace('^7', ''),
            first_connection,
            game_map.get(game_name, f"Unknown Game ({game_name})"),
            last_connection,
            level_map.get(level, f"Unknown Level ({level})"),
            masked,
            total_connection_time,
            ip_address,
        )
        for (connections, alias_name, first_connection, game_name, last_connection,
             level, masked, total_connection_time, ip_address) in existing_cur.fetchall()
    ]

    new_cur.executemany(
        "INSERT INTO Clients (Connections, Name, FirstConnection, Game, LastConnection, Level, Masked, TotalConnectionTime, IP) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        client_rows,
    )
def maps_table(existing_cur, new_cur):
    """Copy EFMaps into the "Maps" table, replacing the game code by a label.

    Args:
        existing_cur: cursor on the source database.
        new_cur: cursor on the parsed database (table is created if missing).

    Game codes without a label are written as "Unknown Game (<code>)".
    """
    new_cur.execute("""
        CREATE TABLE IF NOT EXISTS "Maps" (
            "MapId" INTEGER NOT NULL,
            "CreatedDateTime" TEXT NOT NULL,
            "Name" TEXT NOT NULL,
            "Game" TEXT NOT NULL,
            CONSTRAINT "PK_Maps" PRIMARY KEY("MapId" AUTOINCREMENT)
        )
    """)

    existing_cur.execute("""
        SELECT
            MapId, CreatedDateTime, Name, Game
        FROM
            EFMaps
    """)

    labels = {5: "WaW", 6: "BO", 7: "BO2", 3: "MW3"}
    translated = [
        (map_id, created, map_name, labels.get(game_code, f"Unknown Game ({game_code})"))
        for map_id, created, map_name, game_code in existing_cur.fetchall()
    ]

    new_cur.executemany("""
        INSERT INTO "Maps" (
            MapId, CreatedDateTime, Name, Game
        ) VALUES (?, ?, ?, ?)
    """, translated)
def meta_table(existing_cur, new_cur):
    """Build the "Metadata" table from EFMeta.

    Args:
        existing_cur: cursor on the source database.
        new_cur: cursor on the parsed database.

    ClientId is replaced by the client's current alias name ('^7' removed).
    The name is 'Unknown' when the client does not exist, when its
    CurrentAliasId is NULL or 0, or when that alias row is missing. Created
    is written to "Timestamp" and Key to "Note". Names are memoised per
    ClientId so each client costs at most two queries in total.
    """
    client_names = {}

    def client_name_for(client_id):
        """Return the cached current alias name for client_id, or 'Unknown'."""
        if client_id in client_names:
            return client_names[client_id]

        resolved = 'Unknown'
        existing_cur.execute("""
            SELECT
                EFClients.CurrentAliasId
            FROM
                EFClients
            WHERE
                EFClients.ClientId = ?
        """, (client_id,))
        client_row = existing_cur.fetchone()
        current_alias_id = client_row[0] if client_row else None
        # A falsy alias id (NULL or 0) is treated as "no alias".
        if current_alias_id:
            existing_cur.execute("""
                SELECT
                    EFAlias.Name
                FROM
                    EFAlias
                WHERE
                    EFAlias.AliasId = ?
            """, (current_alias_id,))
            alias_row = existing_cur.fetchone()
            if alias_row:
                resolved = alias_row[0].replace('^7', '')

        client_names[client_id] = resolved
        return resolved

    new_cur.execute("""
        CREATE TABLE "Metadata" (
            "MetaId" INTEGER NOT NULL,
            "Name" TEXT NOT NULL,
            "Timestamp" TEXT NOT NULL,
            "Note" TEXT NOT NULL,
            "Value" TEXT NOT NULL
        )
    """)

    existing_cur.execute("""
        SELECT
            EFMeta.MetaId,
            EFMeta.ClientId,
            EFMeta.Created,
            EFMeta.Key,
            EFMeta.Value
        FROM
            EFMeta
    """)
    # Materialise first: client_name_for() reuses existing_cur.
    rows = existing_cur.fetchall()

    meta_rows = [
        (meta_id, client_name_for(client_id), created, key, value)
        for meta_id, client_id, created, key, value in rows
    ]

    new_cur.executemany(
        "INSERT INTO Metadata (MetaId, Name, Timestamp, Note, Value) VALUES (?, ?, ?, ?, ?)",
        meta_rows,
    )
def penalties_table(existing_cur, new_cur):
    """Build the "Penalties" table from EFPenalties.

    Args:
        existing_cur: cursor on the source database.
        new_cur: cursor on the parsed database.

    Per source row:
      * OffenderId / PunisherId are replaced by the client's current alias
        name ('^7' removed), or 'Unknown' when no client matches. Names are
        memoised and shared between both roles.
      * Type is translated to a label, "Unknown Type (<value>)" otherwise.
      * "AutomatedOffense" is "Yes" when the offense text matches one of the
        known automated messages, else "No". The source column
        EFPenalties.AutomatedOffense is selected but its value is not used.
      * IsEvadedOffense becomes "Yes"/"No"; a NULL Expires becomes "Never".
    """
    type_map = {0: "Report", 1: "Warning", 2: "Flag", 3: "Kick", 4: "Temp Ban", 5: "Perm Ban", 6: "Unban", 8: "Unflag"}
    # Compiled once instead of being re-resolved for every penalty row.
    automated_offense_patterns = [
        re.compile(pattern)
        for pattern in (r"VPNs are not allowed",
                        r"Ping is too high!",
                        r"name is not allowed")
    ]
    client_names = {}

    def client_name_for(client_id):
        """Return the cached current alias name for client_id, or 'Unknown'."""
        if client_id not in client_names:
            existing_cur.execute("""
                SELECT
                    EFAlias.Name
                FROM
                    EFAlias
                INNER JOIN
                    EFClients ON EFAlias.AliasId = EFClients.CurrentAliasId
                WHERE
                    EFClients.ClientId = ?
            """, (client_id,))
            found = existing_cur.fetchone()
            client_names[client_id] = found[0].replace('^7', '') if found else 'Unknown'
        return client_names[client_id]

    new_cur.execute("""
        CREATE TABLE "Penalties" (
            "PenaltyId" INTEGER NOT NULL,
            "AutomatedOffense" TEXT NOT NULL,
            "Expires" INTEGER, -- This line is modified to allow NULL values
            "EvadedOffense" TEXT NOT NULL,
            "Offender" TEXT NOT NULL,
            "Offense" TEXT NOT NULL,
            "Punisher" TEXT NOT NULL,
            "Type" TEXT NOT NULL,
            "Timestamp" INTEGER NOT NULL
        )
    """)

    existing_cur.execute("""
        SELECT
            EFPenalties.PenaltyId,
            EFPenalties.AutomatedOffense,
            EFPenalties.Expires,
            EFPenalties.IsEvadedOffense,
            EFPenalties.OffenderId,
            EFPenalties.Offense,
            EFPenalties.PunisherId,
            EFPenalties.Type,
            EFPenalties."When"
        FROM
            EFPenalties
    """)
    # Materialise first: client_name_for() reuses existing_cur.
    rows = existing_cur.fetchall()

    penalty_rows = []
    for (penalty_id, _source_automated, expires, evaded_offense, offender_id,
         offense, punisher_id, penalty_type, timestamp) in rows:
        is_automated = any(pattern.search(offense) for pattern in automated_offense_patterns)
        penalty_rows.append((
            penalty_id,
            "Yes" if is_automated else "No",
            "Never" if expires is None else expires,
            "Yes" if evaded_offense == 1 else "No",
            client_name_for(offender_id),
            offense,
            client_name_for(punisher_id),
            type_map.get(penalty_type, f"Unknown Type ({penalty_type})"),
            timestamp,
        ))

    new_cur.executemany("""
        INSERT INTO Penalties (
            PenaltyId,
            AutomatedOffense,
            Expires,
            EvadedOffense,
            Offender,
            Offense,
            Punisher,
            Type,
            Timestamp
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, penalty_rows)
def penalty_Identifiers_table(existing_cur, new_cur):
    """Build the "PenaltyIdentifiers" table from EFPenaltyIdentifiers.

    Args:
        existing_cur: cursor on the source database.
        new_cur: cursor on the parsed database.

    NetworkId is replaced by the current alias name ('^7' removed) of the
    client carrying that NetworkId, or 'Unknown' when no client matches.
    Names are memoised per NetworkId so each one is looked up once.
    """
    names_by_network_id = {}

    def client_name_for(network_id):
        """Return the cached alias name for network_id, or 'Unknown'."""
        if network_id not in names_by_network_id:
            existing_cur.execute("""
                SELECT
                    EFAlias.Name
                FROM
                    EFAlias
                INNER JOIN
                    EFClients ON EFAlias.AliasId = EFClients.CurrentAliasId
                WHERE
                    EFClients.NetworkId = ?
            """, (network_id,))
            found = existing_cur.fetchone()
            names_by_network_id[network_id] = found[0].replace('^7', '') if found else 'Unknown'
        return names_by_network_id[network_id]

    new_cur.execute("""
        CREATE TABLE "PenaltyIdentifiers" (
            "PenaltyIdentifierId" INTEGER NOT NULL,
            "PenaltyId" INTEGER NOT NULL,
            "Created" TEXT NOT NULL,
            "Client" TEXT NOT NULL
        )
    """)

    existing_cur.execute("""
        SELECT
            EFPenaltyIdentifiers.PenaltyIdentifierId,
            EFPenaltyIdentifiers.PenaltyId,
            EFPenaltyIdentifiers.CreatedDateTime,
            EFPenaltyIdentifiers.NetworkId
        FROM
            EFPenaltyIdentifiers
    """)
    # Materialise first: client_name_for() reuses existing_cur.
    rows = existing_cur.fetchall()

    identifier_rows = [
        (penalty_identifier_id, penalty_id, created, client_name_for(network_id))
        for penalty_identifier_id, penalty_id, created, network_id in rows
    ]

    new_cur.executemany("""
        INSERT INTO PenaltyIdentifiers (
            PenaltyIdentifierId,
            PenaltyId,
            Created,
            Client
        )
        VALUES (?, ?, ?, ?)
    """, identifier_rows)
def servers_table(existing_cur, new_cur):
    """Build the "Servers" table from EFServers.

    Args:
        existing_cur: cursor on the source database.
        new_cur: cursor on the parsed database.

    GameName is translated to a label. Values without a label, including
    NULL, become "Unknown Game (<value>)". This matches the other tables in
    this script and always satisfies the NOT NULL "Game" column.
    IsPasswordProtected is written to "Password" as "Yes"/"No".
    """
    game_mapping = {5: "WaW", 7: "BO2", 6: "BO", 3: "MW3"}

    new_cur.execute("""
        CREATE TABLE "Servers" (
            "ServerId" INTEGER NOT NULL,
            "Active" INTEGER NOT NULL,
            "Port" INTEGER NOT NULL,
            "Endpoint" TEXT NOT NULL,
            "Game" TEXT NOT NULL,
            "ServerName" TEXT NOT NULL,
            "Password" TEXT NOT NULL
        )
    """)

    existing_cur.execute("""
        SELECT ServerId, Active, Port, Endpoint, GameName, HostName, IsPasswordProtected
        FROM EFServers
    """)

    server_rows = [
        (
            server_id,
            active,
            port,
            endpoint,
            game_mapping.get(game_name, f"Unknown Game ({game_name})"),
            server_name,
            "Yes" if is_password_protected == 1 else "No",
        )
        for server_id, active, port, endpoint, game_name, server_name, is_password_protected
        in existing_cur.fetchall()
    ]

    new_cur.executemany("""
        INSERT INTO Servers (
            ServerId,
            Active,
            Port,
            Endpoint,
            Game,
            ServerName,
            Password
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
    """, server_rows)
def inbox_messages_table(existing_cur, new_cur):
    """Build the "InboxMessagesModified" table from InboxMessages.

    Args:
        existing_cur: cursor on the source database.
        new_cur: cursor on the parsed database.

    The source and destination client ids are resolved independently to the
    client's current alias name ('^7' removed). A message a client sent to
    itself therefore gets the same name in both columns. An unknown client
    or missing alias yields 'Unknown' instead of aborting the run. The
    delivered flag is written to "Read" as "Yes"/"No".

    NOTE(review): the rows come from ``SELECT *`` and are unpacked by
    position, assuming eight columns in the order id, created, <ignored>,
    source client, destination client, server, message, delivered. Confirm
    against the actual InboxMessages schema.
    """
    name_cache = {}

    def client_name_for(client_id):
        """Return the cached current alias name for client_id, or 'Unknown'."""
        if client_id not in name_cache:
            resolved = 'Unknown'
            existing_cur.execute("SELECT CurrentAliasId FROM EFClients WHERE ClientId = ?", (client_id,))
            client_row = existing_cur.fetchone()
            if client_row is not None:
                existing_cur.execute("SELECT Name FROM EFAlias WHERE AliasId = ?", (client_row[0],))
                alias_row = existing_cur.fetchone()
                if alias_row is not None:
                    resolved = alias_row[0].replace('^7', '')
            name_cache[client_id] = resolved
        return name_cache[client_id]

    new_cur.execute("""
        CREATE TABLE InboxMessagesModified (
            InboxMessageId INTEGER PRIMARY KEY,
            Created TEXT,
            Origin TEXT,
            Target TEXT,
            ServerId INTEGER,
            Message TEXT,
            Read TEXT
        )
    """)

    existing_cur.execute("SELECT * FROM InboxMessages")
    # Materialise first: client_name_for() reuses existing_cur.
    inbox_messages = existing_cur.fetchall()

    inbox_rows = []
    for msg in inbox_messages:
        msg_id, created, _, source_client_id, dest_client_id, server_id, message, is_delivered = msg
        inbox_rows.append((
            msg_id,
            created,
            client_name_for(source_client_id),
            client_name_for(dest_client_id),
            server_id,
            message,
            "Yes" if is_delivered == 1 else "No",
        ))

    new_cur.executemany("""
        INSERT INTO InboxMessagesModified (InboxMessageId, Created, Origin, Target, ServerId, Message, Read)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, inbox_rows)
# Run the parser only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()