# Select Git revision
# db_repository.py
# Code owners
# Assign users and groups as approvers for specific file changes. Learn more.
# db_repository.py 33.94 KiB
import warnings
from contextlib import contextmanager
from time import sleep

import mysql.connector
from git import Commit
from mysql.connector import errorcode

from Data.Database.constants import MYSQL_CONFIG
from Data.Utils.utils import get_config_codes_list
class DBRepository:
"""Class that abstracts DB operations"""
initial_commits = [
# Linux
"1da177e4c3f41524e886b7f1b8a0c1fc7321cac2",
# Chromium
"09911bf300f1a419907a9412154760efd0b7abc3",
# Chromium
"a814a8d55429605fe6d7045045cd25b6bf624580",
]
def __init__(self):
self._open()
def save_cve(self, cve: dict | str, config_code: str):
"""
Saves a CVE to the database. If the CVE is a string, only the ID is saved.
:param cve: The CVE to save
:param config_code: config code where the CVE was found
"""
if cve is None:
return
if isinstance(cve, str):
self._save_cve_id(cve, config_code)
else:
self._save_cve_dict(cve, config_code)
def _save_cve_dict(self, cve: dict, config_code: str):
"""
Saves a CVE to the database.
:param cve: The CVE to save (use the CVE dict from cve-search)
:param config_code: config code where the CVE was found
"""
if cve is None:
return
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
try:
insert = (
"INSERT IGNORE INTO `cve` (`cve_id`, `cve_date_created`, `cve_date_last_modified`, `cve_cwe_id`, "
"`cve_cvss_score`, `cve_cvss_time`, `cve_cvss_vector`, `cve_summary`) "
"VALUES (%(cve_id)s, %(cve_created)s, %(cve_modified)s, %(cwe_id)s, %(cvss_score)s, %(cvss_time)s, "
"%(cvss_vector)s, %(cve_summary)s)"
)
insert_data = {
"cve_id": cve["id"],
"cve_created": cve.get("Published"),
"cve_modified": cve.get("Modified"),
"cvss_score": cve.get("cvss"),
"cvss_time": cve.get("cvss-time"),
"cvss_vector": cve.get("cvss-vector"),
"cve_summary": cve.get("summary"),
"cwe_id": cve.get("cwe"),
}
cursor.execute(insert, insert_data)
self._save_references(cursor, cve)
self._save_vul_product(cursor, cve)
self._save_config_code(cursor, cve["id"], config_code)
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(f"{cve['id']} could not be saved - {str(err)}... Rolling back")
self.cnx.rollback()
finally:
self.close()
def _save_cve_id(self, cve_id: str, config_code: str):
"""
Saves a CVE id to the database
:param cve_id: The CVE id to save
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
try:
insert = """INSERT IGNORE INTO `cve` (`cve_id`) VALUES (%(cve_id)s)"""
cursor.execute(insert, {"cve_id": cve_id})
self._save_config_code(cursor, cve_id, config_code)
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(f"{cve_id} could not be saved - {str(err)}... Rolling back")
self.cnx.rollback()
finally:
self.close()
def save_commit(self, commit: Commit | str | None, config_code: str):
"""
Saves a commit to the database. If the commit is a string, only the sha is saved.
:param commit: The commit to save
:param config_code: config code where the commit was found
"""
if commit is None:
return
if isinstance(commit, str):
self._save_com_sha(commit, config_code)
else:
self._save_commit(commit, config_code)
def _save_commit(self, commit: Commit, config_code: str):
"""
Saves a commit to the database
:param commit: The commit to save
:param config_code: config code where the commit was found
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
try:
insert = """INSERT IGNORE INTO `commit` (`com_sha`, `com_date_committed`, `com_author`, `com_message`, `com_config_code`)
VALUES (%(com_sha)s, %(com_date_committed)s, %(com_author)s, %(com_message)s, %(com_config_code)s)"""
cursor.execute(
insert,
{
"com_sha": commit.hexsha,
"com_date_committed": commit.committed_datetime,
"com_author": commit.author.name,
"com_message": commit.message,
"com_config_code": config_code,
},
)
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(f"{commit.hexsha} could not be saved - {str(err)}... Rolling back")
self.cnx.rollback()
finally:
self.close()
def _save_com_sha(self, com_sha: str, config_code: str):
"""
Saves a commit sha to the database. This is used if the commit is not known, but the sha is.
:param com_sha: The commit sha to save
:param config_code: config code where the commit sha belongs to
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
try:
insert = """INSERT IGNORE INTO `commit` (`com_sha`, `com_config_code`) VALUES (%(com_sha)s, %(com_config_code)s)"""
cursor.execute(insert, {"com_sha": com_sha, "com_config_code": config_code})
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(f"{com_sha} could not be saved - {str(err)}... Rolling back")
self.cnx.rollback()
finally:
self.close()
def get_all_vccs(self):
"""Returns all VCCs from the database"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
try:
query = "SELECT distinct vcc_sha FROM `link_fixing_commit_vcc` where vcc_sha is not null;"
cursor.execute(query)
results = cursor.fetchall()
return [r[0] for r in results]
except mysql.connector.Error as err:
warnings.warn(f"Could not get all VCCs - {str(err)}")
finally:
self.close()
def get_all_mappings(self):
"""Returns all mappings from the database"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor(dictionary=True)
try:
# Exclude non-processed ones
query = "SELECT * FROM `link_fixing_commit_vcc` WHERE fixing_config_code NOT LIKE '%/%' ORDER BY `cve_id`"
cursor.execute(query)
return cursor.fetchall()
except mysql.connector.Error as err:
warnings.warn(f"Could not get all mappings - {str(err)}")
finally:
self.close()
def get_mappings_with_dates(self):
"""Returns all mappings from the database"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor(dictionary=True)
try:
query = "SELECT * FROM commit where YEAR(com_date_committed) = 2020;"
cursor.execute(query)
return cursor.fetchall()
except mysql.connector.Error as err:
warnings.warn(f"Could not get all mappings - {str(err)}")
finally:
self.close()
def get_all_commits(self):
"""Returns all mappings from the database"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor(dictionary=True)
try:
query = "SELECT * FROM commit where com_date_committed is not null;"
cursor.execute(query)
return cursor.fetchall()
except mysql.connector.Error as err:
warnings.warn(f"Could not get all mappings - {str(err)}")
finally:
self.close()
def save_syzkaller_info(self, syzkaller_dict: dict):
"""
Saves syzkaller information to the database
:param syzkaller_dict: Includes all information about a syzkaller issue
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
insert_data = {
"syzkaller_id": syzkaller_dict["Id"],
"title": syzkaller_dict.get("Title"),
"repro": syzkaller_dict.get("Repro"),
"cause_bisect": syzkaller_dict.get("Cause bisect"),
"fix_bisect": syzkaller_dict.get("Fix bisect"),
"count": syzkaller_dict.get("Count"),
"last": syzkaller_dict.get("Last"),
"reported": syzkaller_dict.get("Reported"),
"patched": syzkaller_dict.get("Patched"),
"closed": syzkaller_dict.get("Closed"),
"patch": syzkaller_dict.get("Patch"),
"syzkaller_url": syzkaller_dict.get("Link"),
}
# Replace empty strings with None to save them as NULL in the database
for key, value in insert_data.items():
if value is not None and isinstance(value, str) and value.strip() == "":
insert_data[key] = None
try:
insert = (
"INSERT IGNORE INTO `syzkaller` (`syzkaller_id`, `title`, `repro`, `cause_bisect`, `fix_bisect`, "
"`count`, `last`, `reported`, `patched`, `closed`, `patch`, `syzkaller_url`) "
"VALUES (%(syzkaller_id)s, %(title)s, %(repro)s, %(cause_bisect)s, %(fix_bisect)s, "
"%(count)s, %(last)s, %(reported)s, %(patched)s, %(closed)s, %(patch)s, %(syzkaller_url)s)"
)
cursor.execute(insert, insert_data)
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(f"{syzkaller_dict['Id']} could not be saved - {str(err)}... Rolling back")
self.cnx.rollback()
finally:
self.close()
def save_syzkaller_bisect(self, syzkaller_dict: dict):
"""
Saves the bisection information of a syzkaller crash to the database
:param syzkaller_dict: The syzkaller crash dictionary
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
insert_data = {
"syzkaller_id": syzkaller_dict["Id"],
"cause_bisect_commit": syzkaller_dict.get("cause-bisection-commit"),
"fix_bisect_commit": syzkaller_dict.get("fix-bisection-commit"),
}
try:
insert = (
"INSERT INTO `syzkaller_bisect` (`syzkaller_id`, `cause_bisect_commit`, `fix_bisect_commit`) "
"VALUES (%(syzkaller_id)s, %(cause_bisect_commit)s, %(fix_bisect_commit)s)"
)
cursor.execute(insert, insert_data)
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(f"{syzkaller_dict['Id']} could not be saved - {str(err)}... Rolling back")
self.cnx.rollback()
finally:
self.close()
def get_all_oss_fuzz_bugs(self) -> list:
"""
Returns all OSS-Fuzz bugs from the database
:return: List of OSS-Fuzz bugs
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor(dictionary=True)
try:
query = "SELECT * FROM `oss_fuzz`"
cursor.execute(query)
return cursor.fetchall()
except mysql.connector.Error as err:
warnings.warn(f"Could not get all OSS-Fuzz bugs - {str(err)}")
finally:
self.close()
def get_all_oss_fuzz_bugs_with_relevant_projects(
self, search_regressed_range: bool, search_fixed_range: bool
) -> list:
"""Returns all OSS-Fuzz bugs from the database
Only search those bugs that are in the projects mentioned in the config.xml (not including slashes)
:param search_regressed_range: a flag whether to search for the regressed commit range
:param search_fixed_range: a flag whether to search for the fixed commit range
:return: List of OSS-Fuzz bugs
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor(dictionary=True)
config_codes = get_config_codes_list()
try:
query = (
"SELECT * FROM `oss_fuzz` WHERE `project`"
f"IN {str(tuple(config_codes))} "
f"{'AND `fixed_start_commit` IS NOT NULL ' if search_fixed_range else ''}"
f"{'AND `regressed_start_commit` IS NOT NULL' if search_regressed_range else ''}"
)
cursor.execute(query)
return cursor.fetchall()
except mysql.connector.Error as err:
warnings.warn(f"Could not get all OSS-Fuzz bugs - {str(err)}")
finally:
self.close()
def get_all_entries_already_having_oss_fuzz_id(self) -> list[int]:
"""Returns all entries from the database that already have an OSS-Fuzz id"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
try:
query = "SELECT `ossfuzz_id` FROM `link_fixing_commit_vcc` WHERE `ossfuzz_id` IS NOT NULL"
cursor.execute(query)
result = cursor.fetchall()
return [r[0] for r in result]
except mysql.connector.Error as err:
warnings.warn(f"Could not get all entries already having an OSS-Fuzz id - {str(err)}")
finally:
self.close()
def save_oss_fuzz_bug(self, bug_dict: dict):
"""
Saves an OSS-Fuzz bug to the database
:param bug_dict: The OSS-Fuzz bug dictionary
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
insert_data = self._prepare_oss_fuzz_data_for_database(bug_dict)
try:
insert = (
"INSERT INTO `oss_fuzz` (`local_id`, `summary`, `status`, `bug_type`, `project`, `labelRefs`, "
"`regression_range_url`, `fixed_range_url`, `crash_type`, `regressed_start_commit`, "
"`regressed_end_commit`, `fixed_start_commit`, `fixed_end_commit`, `regressed_repo_url`, "
"`fixed_repo_url`, `status_modified`, `last_modified`, `crash_state`) "
"VALUES (%(local_id)s, %(summary)s, %(status)s, %(bug_type)s, %(project)s, %(labelRefs)s, "
"%(regression_range_url)s, %(fixed_range_url)s, %(crash_type)s, %(regressed_start_commit)s, "
"%(regressed_end_commit)s, %(fixed_start_commit)s, %(fixed_end_commit)s, %(regressed_repo_url)s, "
"%(fixed_repo_url)s, %(status_modified)s, %(last_modified)s, %(crash_state)s)"
)
cursor.execute(insert, insert_data)
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(f"{bug_dict['localId']} could not be saved - {str(err)}... Rolling back")
self.cnx.rollback()
finally:
self.close()
def update_oss_fuzz_bug(self, bug_dict: dict):
"""
Updates an OSS-Fuzz bug in the database.
Assuming that the bug exists in the database.
:param bug_dict: The OSS-Fuzz bug dictionary
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
update_data = self._prepare_oss_fuzz_data_for_database(bug_dict)
try:
update = (
"UPDATE `oss_fuzz` SET `summary` = %(summary)s, `status` = %(status)s, `bug_type` = %(bug_type)s, "
"`project` = %(project)s, `labelRefs` = %(labelRefs)s, `regression_range_url` = %(regression_range_url)s, "
"`fixed_range_url` = %(fixed_range_url)s, `crash_type` = %(crash_type)s, "
"`regressed_start_commit` = %(regressed_start_commit)s, `regressed_end_commit` = %(regressed_end_commit)s, "
"`fixed_start_commit` = %(fixed_start_commit)s, `fixed_end_commit` = %(fixed_end_commit)s, "
"`regressed_repo_url` = %(regressed_repo_url)s, `fixed_repo_url` = %(fixed_repo_url)s, "
"`status_modified` = %(status_modified)s, `last_modified` = %(last_modified)s, "
"`crash_state` = %(crash_state)s "
"WHERE `local_id` = %(local_id)s"
)
cursor.execute(update, update_data)
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(f"{bug_dict['localId']} could not be updated - {str(err)}... Rolling back")
self.cnx.rollback()
finally:
self.close()
def save_vcc_fixing_commit(
self,
cve_id: str | None,
fix_sha: str | None,
vcc_sha: str | None,
config_code: str,
mapping_type: str,
syzkaller_id: str | None,
oss_fuzz_id: int | None,
):
"""
Saves the mapping between a VCC and a fixing commit for a CVE to the database
:param syzkaller_id: syzkaller id for which the mapping is saved
:param cve_id: CVE id for which the mapping is saved
:param fix_sha: fixing commit
:param vcc_sha: VCC
:param config_code: config code where the mapping was found
:param mapping_type: the type of the mapping (e.g. "TypeCommonID")
:param oss_fuzz_id: OSS-Fuzz id for which the mapping is saved
.. note::
If the fixing commit and the VCC is not set, the mapping will not be saved
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
if fix_sha is None and vcc_sha is None:
return
# VCC can be None if only the fixing commit is known and vice versa
insert_data = {
"fixing_sha": fix_sha,
"fixing_config_code": config_code,
"vcc_sha": vcc_sha,
"vcc_config_code": config_code,
"cve_id": cve_id,
"mapping_type": mapping_type,
"determined_by_heuristic": False,
"syzkaller_id": syzkaller_id,
"oss_fuzz_id": oss_fuzz_id,
}
if vcc_sha is None or mapping_type == "VulnerabilityHistoryProject_automatic":
# If the VCC is None, the mapping will be determined by using a heuristic
# If the mapping type is "VulnerabilityHistoryProject_automatic,"
# the mapping was determined by a heuristic
insert_data["determined_by_heuristic"] = True
try:
# Insert the mapping if it does not exist yet
insert = (
"INSERT IGNORE INTO `link_fixing_commit_vcc` (`fixing_sha`, `fixing_config_code`, `vcc_sha`, "
"`vcc_config_code`, `cve_id`, `mapping_type`, `determined_by_heuristic`, `syzkaller_id`, `ossfuzz_id`) "
"SELECT %(fixing_sha)s, %(fixing_config_code)s, %(vcc_sha)s, %(vcc_config_code)s, "
"%(cve_id)s, %(mapping_type)s, %(determined_by_heuristic)s, %(syzkaller_id)s, %(oss_fuzz_id)s "
"WHERE NOT EXISTS(SELECT 1 FROM `link_fixing_commit_vcc` "
"WHERE (`cve_id` IS NULL OR `cve_id` = %(cve_id)s) "
"AND (`ossfuzz_id` IS NULL OR `ossfuzz_id` = %(oss_fuzz_id)s) "
"AND `mapping_type` = %(mapping_type)s "
"AND `fixing_sha` = %(fixing_sha)s "
"AND `fixing_config_code` = %(fixing_config_code)s "
"AND `determined_by_heuristic` = %(determined_by_heuristic)s "
"AND (`syzkaller_id` IS NULL OR `syzkaller_id` = %(syzkaller_id)s) "
"AND (`vcc_sha` IS NULL OR `vcc_sha` = %(vcc_sha)s) "
"AND (`vcc_config_code` IS NULL OR `vcc_config_code` = %(vcc_config_code)s));"
)
cursor.execute(insert, insert_data)
self.cnx.commit()
except mysql.connector.Error as err:
if err.errno == errorcode.ER_LOCK_DEADLOCK:
# In case of a deadlock, wait 3 seconds and try again
try:
warnings.warn(f"Deadlock occurred - {str(err)}... Trying again")
sleep(5)
cursor.execute(insert, insert_data)
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(f"Deadlock could not be resolved - {str(err)}... Rolling back")
self.cnx.rollback()
else:
warnings.warn(
f"Mapping between vcc and fixing commit for {cve_id} could not be saved - {str(err)}... Rolling back"
)
self.cnx.rollback()
finally:
self.close()
def update_vcc_fixing_commit(
self,
mapping_id: int,
cve_id: str | None,
vcc_sha: str | None,
config_code: str,
confidence_value: bool,
heuristic_mapping: str | None,
):
"""
Updates the mapping between a VCC and a fixing commit for a CVE to the database
and adds the confidence value of the mapping as described in the paper by Wagner.
:param mapping_id: ID of the mapping.
:param heuristic_mapping: The mapping which is the result of the heuristic
:param cve_id: CVE id for which the mapping is saved
:param vcc_sha: VCC
:param config_code: config code where the mapping was found
:param confidence_value: confidence value of the mapping
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
try:
# Update the mapping if it exists
update = (
"UPDATE `link_fixing_commit_vcc` "
"SET `vcc_sha` = %(vcc_sha)s, `vcc_config_code` = %(vcc_config_code)s, "
"`confidence_value` = %(confidence_value)s, `heuristic_mapping` = %(heuristic_mapping)s "
"WHERE `mapping_id` = %(mapping_id)s;"
)
cursor.execute(
update,
{
"vcc_sha": vcc_sha,
"vcc_config_code": config_code,
"confidence_value": confidence_value,
"heuristic_mapping": heuristic_mapping,
"mapping_id": mapping_id,
},
)
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(
f"Mapping between vcc and fixing commit for {cve_id} could not be updated - {str(err)}... Rolling back"
)
self.cnx.rollback()
finally:
self.close()
def update_heuristic_mapping(
self, fixing_sha: str, fixing_config_code: str, heuristic_mapping: str | None, confidence_value: bool
):
"""
Updates the heuristic mapping for a fixing commit.
:param fixing_sha: Fixing commit
:param fixing_config_code: config code where the fixing commit was found
:param heuristic_mapping: Mapping which is the result of the heuristic.
:param confidence_value: Confidence value of the mapping
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
try:
update = (
"UPDATE `link_fixing_commit_vcc` "
"SET `confidence_value` = %(confidence_value)s, `heuristic_mapping` = %(heuristic_mapping)s "
"WHERE `fixing_sha` = %(fixing_sha)s AND `fixing_config_code` = %(fixing_config_code)s;"
)
cursor.execute(
update,
{
"fixing_sha": fixing_sha,
"fixing_config_code": fixing_config_code,
"confidence_value": confidence_value,
"heuristic_mapping": heuristic_mapping,
},
)
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(
f"Mapping between vcc and fixing commit for {fixing_sha} could not be updated - {str(err)}... Rolling back"
)
self.cnx.rollback()
finally:
self.close()
def update_vcc(self, fixing_sha: str, fixing_config_code, vcc_sha: str):
"""
Updates the VCC of a fixing commit.
:param fixing_sha: Fixing commit
:param fixing_config_code: config code where the fixing commit was found
:param vcc_sha: VCC
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
try:
update = (
"UPDATE `link_fixing_commit_vcc` "
"SET `vcc_sha` = %(vcc_sha)s "
"WHERE `fixing_sha` = %(fixing_sha)s AND `fixing_config_code` = %(fixing_config_code)s;"
)
cursor.execute(
update,
{
"fixing_sha": fixing_sha,
"fixing_config_code": fixing_config_code,
"vcc_sha": vcc_sha,
},
)
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(
f"Mapping between vcc and fixing commit for {fixing_sha} could not be updated - {str(err)}... Rolling back"
)
self.cnx.rollback()
finally:
self.close()
def get_all_syzkaller_ids(self) -> list:
"""
:return: a list of all syzkaller ids that are saved in the database
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
query = "SELECT `syzkaller_id` FROM `syzkaller`;"
cursor.execute(query)
result = [syzkaller_id[0] for syzkaller_id in cursor.fetchall()]
self.close()
return result
def get_all_syzkaller_ids_with_cve(self) -> list:
"""
Gets a list of cve_ids which are associated but not connected to a syzkaller id yet
:return: A list of all cve_ids, fixing_sha which are connected to a syzkaller id
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor(dictionary=True)
query = (
"SELECT GROUP_CONCAT(DISTINCT `cve_id`) AS `cve_ids`, "
"`fixing_sha`, GROUP_CONCAT(`mapping_type`) AS `mapping_types` "
"FROM `link_fixing_commit_vcc` GROUP BY `fixing_sha` "
"HAVING `mapping_types` LIKE '%Syzkaller%' "
"AND `cve_ids` IS NOT NULL;"
)
cursor.execute(query)
result = cursor.fetchall()
self.close()
return result
def save_cve_id_for_syzkaller_crash(self, cve_id: str, fixing_sha: str):
"""
Saves the cve_id for a syzkaller crash to the database by using the fixing commit of the crash
:param fixing_sha: fixing commit of the crash
:param cve_id: cve_id of the crash
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor(dictionary=True)
try:
query = (
"UPDATE `link_fixing_commit_vcc` SET `cve_id` = %(cve_id)s WHERE "
"`fixing_sha` = %(fixing_sha)s AND `mapping_type` = 'Syzkaller';"
)
cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
cursor.execute(query, {"cve_id": cve_id, "fixing_sha": fixing_sha})
cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(
f"Mapping between vcc and fixing commit for {cve_id} could not be updated - {str(err)}... Rolling back"
)
self.cnx.rollback()
finally:
self.close()
def get_mappings_without_vcc(self) -> list:
"""
:return: a list of commits that do not have a VCC for the given config code
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor(dictionary=True)
query = (
"SELECT DISTINCT `fixing_sha`, `fixing_config_code`, `cve_id` "
"FROM `link_fixing_commit_vcc` WHERE `heuristic_mapping` IS NULL;"
)
cursor.execute(query)
result = cursor.fetchall()
self.close()
return result
def remove_initial_commits(self):
"""
Removes the initial commits from the database.
A significant portion of commits in the data point to the Linux kernel's initial commit.
This initial commit comprises over 6 million changes across more than 17 thousand files,
rendering it unsuitable for use as a practical VCC.
Consequently, remove this commit from the database where it was utilized.
This also counts for the initial commit of the Chromium project that also comprises a lot of changes.
In general removed all initial commits which are defined in the ``initial_commits`` list.
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor()
# Get the initial commit of the Chromium project
initial_commits = tuple(i for i in self.initial_commits)
try:
# We have to disable foreign key checks to be able to update the table
cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
update_vcc = (
"UPDATE `link_fixing_commit_vcc` SET `vcc_sha` = NULL, `determined_by_heuristic` = TRUE WHERE"
f"{'`vcc_sha` = %s OR ' * (len(initial_commits) - 1)} `vcc_sha` = %s;"
)
cursor.execute(
update_vcc,
initial_commits,
)
cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
self.cnx.commit()
except mysql.connector.Error as err:
warnings.warn(f"Initial commit could not be removed - {str(err)}... Rolling back")
self.cnx.rollback()
finally:
self.close()
def get_all_entries_with_heuristic_mapping(self):
"""
Returns all entries from the database that have a heuristic mapping.
However, it only returns the entries that have not a VCC yet.
"""
if not self.cnx.is_connected():
self._open()
cursor = self.cnx.cursor(dictionary=True)
query = (
"SELECT * FROM `link_fixing_commit_vcc` WHERE `vcc_sha` IS NULL "
"AND JSON_LENGTH(`heuristic_mapping`) != 0;"
)
cursor.execute(query)
result = cursor.fetchall()
self.close()
return result
@staticmethod
def _save_references(cursor, cve: dict):
"""
Saves the references of a CVE, such as the URL to the CVE.
Replaces existing references with the new ones.
:param cursor: The cursor to use
:param cve: The CVE to save
"""
delete = """DELETE FROM `cve_references` WHERE `cve_id` = %(cve_id)s"""
cursor.execute(delete, {"cve_id": cve["id"]})
insert = """INSERT IGNORE INTO `cve_references` (`cve_id`, `reference`) VALUES (%s, %s)"""
data = list([(cve["id"], x) for x in cve["references"]]) # Create a list of tuples
cursor.executemany(insert, data)
@staticmethod
def _save_vul_product(cursor, cve):
"""
Saves the vulnerable products of a CVE.
Replaces existing vulnerable products with the new ones.
:param cursor: The cursor to use
:param cve: The CVE to save
"""
delete = """DELETE FROM `cve_product` WHERE `cve_id` = %(cve_id)s"""
cursor.execute(delete, {"cve_id": cve["id"]})
insert = """INSERT IGNORE INTO `cve_product` (`cve_id`, `product`) VALUES (%s, %s)"""
data = list([(cve["id"], x) for x in cve["vulnerable_product"]]) # Create a list of tuples
cursor.executemany(insert, data)
@staticmethod
def _save_config_code(cursor, cve_id: str, config_code: str):
"""
Saves the config code of a CVE if it is not already saved.
:param cursor: The cursor to use
:param cve_id: The CVE id, which identifies the config code
:param config_code: The config code to save; defines where the CVE was found
"""
insert = (
"""INSERT IGNORE INTO `cve_config_code` (`cve_id`, `config_code`) VALUES (%(cve_id)s, %(config_code)s)"""
)
cursor.execute(insert, {"cve_id": cve_id, "config_code": config_code})
@staticmethod
def _prepare_oss_fuzz_data_for_database(bug_dict: dict) -> dict:
"""
Prepares the data from the OSS-Fuzz API for the database
:param bug_dict: The bug dict from the OSS-Fuzz API
:return: The prepared bug dict
"""
return {
"local_id": bug_dict["localId"],
"summary": bug_dict.get("summary"),
"status": bug_dict.get("status"),
"bug_type": bug_dict.get("bugType"),
"project": bug_dict.get("project"),
"labelRefs": bug_dict.get("labelRefs"),
"regression_range_url": bug_dict.get("regression_range_url"),
"fixed_range_url": bug_dict.get("fixed_range_url"),
"crash_type": bug_dict.get("crash_type"),
"regressed_start_commit": bug_dict.get("regressed_start_commit"),
"regressed_end_commit": bug_dict.get("regressed_end_commit"),
"fixed_start_commit": bug_dict.get("fixed_start_commit"),
"fixed_end_commit": bug_dict.get("fixed_end_commit"),
"regressed_repo_url": bug_dict.get("regressed_repo_url"),
"fixed_repo_url": bug_dict.get("fixed_repo_url"),
"status_modified": bug_dict.get("statusModifiedTimestamp"),
"last_modified": bug_dict.get("modifiedTimestamp"),
"crash_state": bug_dict.get("crash_state"),
}
def close(self):
"""Closes the current cursor"""
self.cnx.close()
def _open(self):
"""Opens a new cursor for execution"""
self.cnx = mysql.connector.connect(auth_plugin="mysql_native_password", **MYSQL_CONFIG)