Source code for aigct.analyzer

import pandas as pd
import numpy as np
from scipy import stats
from pandas.api.types import is_string_dtype
from sklearn.metrics import roc_curve, roc_auc_score, precision_recall_curve, auc
from .model import VEAnalysisCalibrationResult, VEQueryCriteria, VEAnalysisResult
from .repository import (
    VariantEffectScoreRepository,
    VariantEffectLabelRepository,
    VariantEffectSourceRepository,
    VARIANT_PK_COLUMNS,
)
from .pd_util import filter_dataframe_by_list
from .query_util import validate_query_criteria

[docs] VARIANT_EFFECT_SCORE_COLS = ["SCORE_SOURCE"] + VARIANT_PK_COLUMNS + ["RANK_SCORE"]
[docs] class VEAnalyzer: def __init__( self, variant_effect_score_repo: VariantEffectScoreRepository, variant_effect_label_repo: VariantEffectLabelRepository, variant_effect_source_repo: VariantEffectSourceRepository, ):
[docs] self._variant_effect_score_repo = variant_effect_score_repo
[docs] self._variant_effect_label_repo = variant_effect_label_repo
[docs] self._variant_effect_source_repo = variant_effect_source_repo
[docs] def validate_user_ve_scores(self, user_ve_scores: pd.DataFrame): if ( user_ve_scores is not None and len(user_ve_scores) > 0 and not is_string_dtype(user_ve_scores["CHROMOSOME"]) ): raise Exception("CHROMOSOME column values must all be string type")
[docs] def get_analysis_scores_and_labels( self, task_code: str, user_ve_scores: pd.DataFrame = None, user_vep_name: str = "USER", column_name_map: dict = None, variant_effect_sources: list[str] = None, include_variant_effect_sources: bool = None, variant_query_criteria: VEQueryCriteria = None, vep_min_overlap_percent: float = 0, variant_vep_retention_percent: float = 0, ) -> pd.DataFrame: if vep_min_overlap_percent is None: vep_min_overlap_percent = 0 if variant_vep_retention_percent is None: variant_vep_retention_percent = 0 if user_ve_scores is not None and len(user_ve_scores) == 0: user_ve_scores = None if variant_effect_sources is not None and len(variant_effect_sources) == 0: variant_effect_sources = None # Get the full universe of variants for query criteria. The universe # is limited to those for which we have labels. variant_universe_pks_df = self._variant_effect_label_repo.get( task_code, variant_query_criteria )[VARIANT_PK_COLUMNS] # if user has specified variant scores then restrict user # variants to those in the universe(i.e. those for which we have # labels) and then reset the universe to the filtered user variant list if user_ve_scores is not None: if column_name_map is not None and len(column_name_map) > 0: user_ve_scores = user_ve_scores.rename(columns=column_name_map) self.validate_user_ve_scores(user_ve_scores) user_ve_scores = user_ve_scores.merge( variant_universe_pks_df, how="inner", on=VARIANT_PK_COLUMNS ) user_ve_scores = user_ve_scores.assign(SCORE_SOURCE=user_vep_name) variant_universe_pks_df = user_ve_scores[VARIANT_PK_COLUMNS] if ( user_ve_scores is not None and not include_variant_effect_sources and variant_effect_sources is None ): # We only do the analysis against the user scores. # We don't use any system veps. analysis_ve_scores_df = user_ve_scores[VARIANT_EFFECT_SCORE_COLS] analysis_labels_df = self._variant_effect_label_repo.get( task_code, VEQueryCriteria(variant_ids=user_ve_scores[VARIANT_PK_COLUMNS]), ) else: # We include system veps in the analysis # get the system vep scores based upon selection critera system_ve_scores_df = self._variant_effect_score_repo.get( task_code, variant_effect_sources, include_variant_effect_sources, variant_query_criteria, ) # First restrict the set of system vep scores to the variants # in the universe. Then compute how many variants there are for # each vep. Then we only keep the vep scores for veps where # the variant count is above the vep_min_overlap_count vep_min_overlap_count = ( len(variant_universe_pks_df) * vep_min_overlap_percent * 0.01 ) system_ve_scores_df = filter_dataframe_by_list( system_ve_scores_df, variant_universe_pks_df, VARIANT_PK_COLUMNS ) retained_veps = [] system_ve_scores_count_by_vep = system_ve_scores_df.groupby( "SCORE_SOURCE" ).size() retained_veps = system_ve_scores_count_by_vep[ system_ve_scores_count_by_vep >= vep_min_overlap_count ].index system_ve_scores_df = filter_dataframe_by_list( system_ve_scores_df, retained_veps, "SCORE_SOURCE" ) # Now for each variant we compute how many veps for which we have # scores. We then retain only those variants where the number of # veps is above variant_vep_retention_count variant_vep_retention_count = ( len(retained_veps) * variant_vep_retention_percent * 0.01 ) system_ve_scores_count_by_var = system_ve_scores_df.groupby( VARIANT_PK_COLUMNS ).size() retained_variants = system_ve_scores_count_by_var[ system_ve_scores_count_by_var >= variant_vep_retention_count ].reset_index() # If user specified scores append them to the system ones. Then # merge in the labels for all of the variants. if user_ve_scores is not None: analysis_ve_scores_df = pd.concat( [ system_ve_scores_df[VARIANT_EFFECT_SCORE_COLS], user_ve_scores[VARIANT_EFFECT_SCORE_COLS], ] ) else: analysis_ve_scores_df = system_ve_scores_df[VARIANT_EFFECT_SCORE_COLS] analysis_ve_scores_df = filter_dataframe_by_list( analysis_ve_scores_df, retained_variants, VARIANT_PK_COLUMNS ) analysis_labels_df = self._variant_effect_label_repo.get( task_code, VEQueryCriteria(variant_ids=retained_variants) ) analysis_ve_scores_labels_df = analysis_ve_scores_df.merge( analysis_labels_df, how="inner", on=VARIANT_PK_COLUMNS ) return analysis_ve_scores_labels_df
[docs] def _compute_pr( self, agg_level: str, grouped_ve_scores_labels, ) -> tuple[pd.DataFrame, pd.DataFrame]: score_sources = [] aucs = [] genes = [] precision_recall_score_sources = [] precision_recall_genes = [] precisions = [] recalls = [] thresholds_list = [] include_gene_metrics = agg_level == "vep_gene" for score_source_gene, scores_labels_df in grouped_ve_scores_labels: score_source = score_source_gene[0] score_sources.append(score_source) if include_gene_metrics: genes.append(score_source_gene[1]) prs, recs, thresholds = precision_recall_curve( scores_labels_df["BINARY_LABEL"], scores_labels_df["RANK_SCORE"], drop_intermediate=False, ) aucs.append(auc(recs, prs)) precision_recall_score_sources.extend([score_source] * len(thresholds)) if include_gene_metrics: precision_recall_genes.extend([score_source_gene[1]] * len(thresholds)) precisions.extend(prs[:-1]) recalls.extend(recs[:-1]) thresholds_list.extend(thresholds) pr_df = pd.DataFrame( {"SCORE_SOURCE": score_sources, "PR_AUC": aucs} ) pr_curve_coords_df = pd.DataFrame( { "SCORE_SOURCE": precision_recall_score_sources, "PRECISION": precisions, "RECALL": recalls, "THRESHOLD": thresholds_list, } ) if include_gene_metrics: pr_df["GENE_SYMBOL"] = genes pr_curve_coords_df["GENE_SYMBOL"] = precision_recall_genes return pr_df, pr_curve_coords_df
[docs] def _compute_roc( self, agg_level: str, grouped_ve_scores_labels, ) -> tuple[pd.DataFrame, pd.DataFrame]: score_sources = [] aucs = [] genes = [] fpr_tpr_score_sources = [] fpr_tpr_genes = [] false_positive_rates = [] true_positive_rates = [] thresholds_list = [] exceps = [] include_gene_metrics = agg_level == "vep_gene" for score_source_gene, scores_labels_df in grouped_ve_scores_labels: try: score_source = score_source_gene[0] score_sources.append(score_source) if include_gene_metrics: genes.append(score_source_gene[1]) if ( len(scores_labels_df) == sum( scores_labels_df["BINARY_LABEL"]) or sum(scores_labels_df["BINARY_LABEL"]) == 0 ): raise Exception( "Cannot compute roc metrics because all " "labels have same value" ) auc = roc_auc_score( scores_labels_df["BINARY_LABEL"], scores_labels_df[ "RANK_SCORE"] ) fpr, tpr, thresholds = roc_curve( scores_labels_df["BINARY_LABEL"], scores_labels_df[ "RANK_SCORE"] ) aucs.append(auc) fpr_tpr_score_sources.extend([score_source] * len(fpr)) if include_gene_metrics: fpr_tpr_genes.extend([score_source_gene[1]] * len(fpr)) false_positive_rates.extend(fpr) true_positive_rates.extend(tpr) thresholds_list.extend(thresholds) exceps.append(np.nan) except Exception as e: aucs.append(np.nan) exceps.append(str(e)) roc_df = pd.DataFrame( { "SCORE_SOURCE": score_sources, "ROC_AUC": aucs, "EXCEPTION": exceps, } ) roc_curve_coords_df = pd.DataFrame( { "SCORE_SOURCE": fpr_tpr_score_sources, "FALSE_POSITIVE_RATE": false_positive_rates, "TRUE_POSITIVE_RATE": true_positive_rates, "THRESHOLD": thresholds_list, } ) if include_gene_metrics: roc_df["GENE_SYMBOL"] = genes roc_curve_coords_df["GENE_SYMBOL"] = fpr_tpr_genes return roc_df, roc_curve_coords_df
@staticmethod
[docs] def _compute_general_metrics(group) -> pd.Series: return pd.Series( { "NUM_VARIANTS": len(group), "NUM_POSITIVE_LABELS": group["BINARY_LABEL"].sum(), "NUM_NEGATIVE_LABELS": (group["BINARY_LABEL"] ^ 1).sum(), } )
@staticmethod
[docs] def _compute_num_unique_variants(group) -> pd.Series: return pd.Series( { "NUM_UNIQUE_VARIANTS": group[VARIANT_PK_COLUMNS]. drop_duplicates().shape[0] })
[docs] def _add_info_to_metric_dataframes(self, *dfs): # -> list(pd.DataFrame): return_dfs = [] for df in dfs: if df is not None: df = df.merge( self._variant_effect_source_repo.get_all()[["CODE", "NAME"]], how="left", left_on="SCORE_SOURCE", right_on="CODE", ) df.loc[df["NAME"].isna(), "NAME"] = df.loc[ df["NAME"].isna(), "SCORE_SOURCE" ] df.rename(columns={"NAME": "SOURCE_NAME"}, inplace=True) df.drop(columns="CODE", inplace=True) return_dfs.append(df) return return_dfs
[docs] def _compute_mwu( self, agg_level: str, grouped_ve_scores_labels, ) -> pd.DataFrame: score_sources = [] neg_log10_mwu_pvals = [] genes = [] exceps = [] include_gene_metrics = agg_level == "vep_gene" for score_source_gene, scores_labels_df in grouped_ve_scores_labels: try: score_source = score_source_gene[0] score_sources.append(score_source) if include_gene_metrics: genes.append(score_source_gene[1]) positive_scores = np.array( scores_labels_df.query("BINARY_LABEL == 1")["RANK_SCORE"] ) negative_scores = np.array( scores_labels_df.query("BINARY_LABEL == 0")["RANK_SCORE"] ) if 0 in [len(positive_scores), len(negative_scores)]: raise Exception( "Cannot compute mann-whitney u values " + "because all labels have same value" ) neg_log10_mwu_pval = -np.log10( stats.mannwhitneyu(negative_scores, positive_scores).pvalue ) neg_log10_mwu_pvals.append(neg_log10_mwu_pval) exceps.append(np.nan) except Exception as e: neg_log10_mwu_pvals.append(np.nan) exceps.append(str(e)) mwu_df = pd.DataFrame( { "SCORE_SOURCE": score_sources, "NEG_LOG10_MWU_PVAL": neg_log10_mwu_pvals, "EXCEPTION": exceps, } ) if include_gene_metrics: mwu_df["GENE_SYMBOL"] = genes return mwu_df
[docs] def _remove_vep_genes_with_invalid_label_counts( self, ve_scores_labels_df: pd.DataFrame ) -> pd.DataFrame: """ Group by VEP and gene. Then filter out groups where all labels have the same value. For ROC, PR curves, and MWU tests, we need both positive and negative labels. This method removes combination where all labels are the same. Parameters ---------- ve_scores_labels_df : pd.DataFrame DataFrame containing scores and labels Returns ------- pd.DataFrame Filtered DataFrame with only valid VEP-gene combinations """ # Group by SCORE_SOURCE and GENE_SYMBOL grouped = ve_scores_labels_df.groupby(["SCORE_SOURCE", "GENE_SYMBOL"]) # Filter to keep only groups where both 0 and 1 labels exist valid_scores_labels_df = grouped.filter( lambda group: ( (group["BINARY_LABEL"].sum() > 0) and # Has at least one positive label (group["BINARY_LABEL"].sum() < len(group)) # Has at least one negative label ) ) return valid_scores_labels_df
[docs] def _compute_metrics( self, task_code: str, ve_scores_labels_df: pd.DataFrame, compute_gene_metrics: bool, metrics: list[str], list_variants: bool = False, ): grouped_ve_scores_labels = ve_scores_labels_df.groupby( ["SCORE_SOURCE"]) general_metrics_df = grouped_ve_scores_labels.apply( self._compute_general_metrics, include_groups=False ).reset_index() gene_general_metrics_df = None if compute_gene_metrics: grouped_ve_gene_scores_labels = ve_scores_labels_df.groupby( ["SCORE_SOURCE", "GENE_SYMBOL"]) gene_general_metrics_df = grouped_ve_gene_scores_labels.apply( self._compute_general_metrics, include_groups=False ).reset_index() roc_df = None roc_curve_coords_df = None pr_df = None pr_curve_coords_df = None mwu_df = None gene_roc_df = None gene_roc_curve_coords_df = None gene_pr_df = None gene_pr_curve_coords_df = None gene_mwu_df = None if "roc" in metrics: roc_df, roc_curve_coords_df = self._compute_roc( "vep", grouped_ve_scores_labels) if compute_gene_metrics: gene_roc_df, gene_roc_curve_coords_df = self._compute_roc( "vep_gene", grouped_ve_gene_scores_labels) if "pr" in metrics: pr_df, pr_curve_coords_df = self._compute_pr( "vep", grouped_ve_scores_labels) if compute_gene_metrics: gene_pr_df, gene_pr_curve_coords_df = self._compute_pr( "vep_gene", grouped_ve_gene_scores_labels) if "mwu" in metrics: mwu_df = self._compute_mwu("vep", grouped_ve_scores_labels) if compute_gene_metrics: gene_mwu_df = self._compute_mwu( "vep_gene", grouped_ve_gene_scores_labels) if list_variants: included_variants_df = ve_scores_labels_df[ ["SCORE_SOURCE"] + VARIANT_PK_COLUMNS ] else: included_variants_df = None metric_dataframes = self._add_info_to_metric_dataframes( general_metrics_df, roc_df, pr_df, mwu_df, gene_general_metrics_df, gene_roc_df, gene_pr_df, gene_mwu_df ) metric_dataframes.extend( [roc_curve_coords_df, pr_curve_coords_df, gene_roc_curve_coords_df, gene_pr_curve_coords_df, included_variants_df] ) return metric_dataframes
[docs] def _get_gene_unique_variant_counts( self, scores_and_labels_df: pd.DataFrame, num_top_genes: int ) -> pd.DataFrame: gene_unique_variant_counts_df = scores_and_labels_df.groupby( ["GENE_SYMBOL"]).apply( self._compute_num_unique_variants, include_groups=False ).reset_index() gene_unique_variant_counts_df = gene_unique_variant_counts_df.sort_values( by="NUM_UNIQUE_VARIANTS", ascending=False) if num_top_genes is not None: gene_unique_variant_counts_df = gene_unique_variant_counts_df.iloc[ :num_top_genes] return gene_unique_variant_counts_df
[docs] def _filter_scores_and_labels_by_genes( self, scores_and_labels_df: pd.DataFrame, gene_unique_variant_counts_df: pd.DataFrame ) -> pd.DataFrame: scores_and_labels_df = scores_and_labels_df.merge( gene_unique_variant_counts_df, how="inner", on="GENE_SYMBOL" ) return scores_and_labels_df
[docs] def compute_metrics( self, task_code: str, user_ve_scores: pd.DataFrame = None, user_vep_name: str = "USER", compute_gene_metrics: bool = False, num_top_genes: int = None, column_name_map: dict = None, variant_effect_sources: list[str] = None, include_variant_effect_sources: bool = True, variant_query_criteria: VEQueryCriteria = None, vep_min_overlap_percent: float = 0, variant_vep_retention_percent: float = 0, metrics: str | list[str] = ["roc", "pr", "mwu"], list_variants: bool = False, ) -> VEAnalysisResult: """ Generates performance metrics for an optional user supplied set of vep scores and for system supplied vep's. If the user doesn't provide vep scores, will only generate metrics for system veps. Returns an object containing all the metrics which can then be used to generate plots, reports, or csv data files. Parameters ---------- task_code : str Task code user_ve_scores : DataFrame, optional An optional dataframe of user variant effect prediction scores. Expected to have the following columns: GENOME_ASSEMBLY, CHROMOSOME, POSITION, REFERENCE_NUCLEOTIDE, ALTERNATE_NUCLEOTIDE, RANK_SCORE. The GENOME_ASSEMBLY must be hg38 in current release. RANK_SCORE is a numeric prediction score. It does not have to be standardized or normalized. user_vep_name : str, optional If user_ve_scores are provided, then this is the label to be used for them in the analysis output. compute_gene_metrics: bool Whether to compute vep/gene level metrics along with vep level metrics. It will increase the runtime of the analysis. vep/gene combinations where all the variants have the same label are removed from the analysis. vep/gene level metrics include the number of unique variants in each gene, the number of positive and negative labels in each gene, and the ROC AUC, Precision/ Recall AUC, and Mann-Whitney U p-value for each gene. The genes are ranked by the number of unique variants in the analysis in the gene. num_top_genes: int If compute_gene_metrics is True and this parameter is specified, only consider the top N genes by number of unique variants that satisfy the selection criteria indicated by the other parameters. column_name_map : dict, optional If the column names in user_ve_scores are not the expected names, then this maps the column names to the expected names. variant_effect_sources : list, optional If specified it would restrict the analysis to the system supplied vep's in this list. include_variant_effect_sources : bool, optional If variant_effect_sources is specified, indicates whether to limit the analysis to the system supplied vep's specified or to exclude the system supplied vep's specified. If variant_effect_sources is not specified a value of False indicates that all system variant effect sources should be excluded from the analysis. variant_query_criteria : VEQueryCriteria, optional See description of VEQueryCriteria in model package. Specifies criteria that would limit the set of variants to be included in the analysis. vep_min_overlap_percent : float In order for a system supplied vep to be included in the analysis the set of variants for which it has scores must overlap the variants in user_ve_scores by at least this percentage amount. If the user_ve_scores is not specified, then it must overlap the entire universe of variants in the system for which we have labels by at least this percentage amount. A value of 0 means that it can overlap by any amount to be included. variant_vep_retention_percent : float In order for a variant to be included in the analysis there must exist scores for the variant in at least this percent of the system vep's included in the analysis based on the value of vep_min_overlap_percent. For example, if a value of 50 is specified and 10 system vep's qualified for inclusion, then a variant must be in at least 5 veps to be included in the analysis. metrics : str or list[str] Specifies which metrics to compute. Can be a string indicating a single metric or a list of strings for multiple metrics. The metrics are: roc, pr, mwu. list_variants: bool Include the list of variants that were included in the analysis in the return result object. There is a separate list for the user variants as well as for each system vep. Returns ------- VEAnalysisResult Object containing computed metrics """ validate_query_criteria(variant_query_criteria) scores_and_labels_df = self.get_analysis_scores_and_labels( task_code, user_ve_scores, user_vep_name, column_name_map, variant_effect_sources, include_variant_effect_sources, variant_query_criteria, vep_min_overlap_percent, variant_vep_retention_percent, ) if not compute_gene_metrics: gene_unique_variant_counts_df = None else: scores_and_labels_df = \ self._remove_vep_genes_with_invalid_label_counts( scores_and_labels_df ) gene_unique_variant_counts_df = \ self._get_gene_unique_variant_counts( scores_and_labels_df, num_top_genes) if num_top_genes is not None: scores_and_labels_df = self._filter_scores_and_labels_by_genes( scores_and_labels_df, gene_unique_variant_counts_df) if type(metrics) is str: metrics = [metrics] ( general_metrics_df, roc_df, pr_df, mwu_df, gene_general_metrics_df, gene_roc_df, gene_pr_df, gene_mwu_df, roc_curve_coords_df, pr_curve_coords_df, gene_roc_curve_coords_df, gene_pr_curve_coords_df, included_variants_df, ) = self._compute_metrics( task_code, scores_and_labels_df, compute_gene_metrics, metrics, list_variants ) num_variants = len( scores_and_labels_df[VARIANT_PK_COLUMNS].drop_duplicates()) num_user_variants = None if user_ve_scores is None else len( user_ve_scores) return VEAnalysisResult( num_variants, num_user_variants, user_vep_name, general_metrics_df, roc_df, pr_df, mwu_df, gene_general_metrics_df, gene_roc_df, gene_pr_df, gene_mwu_df, roc_curve_coords_df, pr_curve_coords_df, gene_roc_curve_coords_df, gene_pr_curve_coords_df, included_variants_df, gene_unique_variant_counts_df )
[docs] def _compute_pr_calibration_curves( self, task_code: str, scores_and_labels_df: pd.DataFrame, ): """ Obsolute method. To be removed in future releases. """ positive_ve_scores_and_labels_df = scores_and_labels_df.query( "BINARY_LABEL == 1") grouped_ve_scores_labels = positive_ve_scores_and_labels_df.groupby( ["SCORE_SOURCE"]) pr_df, positive_pr_curve_coords_df = self._compute_pr( "vep", grouped_ve_scores_labels) negative_ve_scores_and_labels_df = scores_and_labels_df.query( "BINARY_LABEL == 0") grouped_ve_scores_labels = negative_ve_scores_and_labels_df.groupby( ["SCORE_SOURCE"]) pr_df, negative_pr_curve_coords_df = self._compute_pr( "vep", grouped_ve_scores_labels) return positive_pr_curve_coords_df, negative_pr_curve_coords_df
[docs] def _compute_metrics_vs_threshold( self, scores_and_labels_df: pd.DataFrame, ): grouped_ve_scores_labels = scores_and_labels_df.groupby( ["SCORE_SOURCE"]) pr_df, pr_curve_coords_df = self._compute_pr( "vep", grouped_ve_scores_labels) roc_df, roc_curve_coords_df = self._compute_roc( "vep", grouped_ve_scores_labels) f1_curve_coords_df = pd.DataFrame() f1_curve_coords_df["F1_SCORE"] = \ 2 * ((pr_curve_coords_df["PRECISION"] * pr_curve_coords_df["RECALL"]) / (pr_curve_coords_df["PRECISION"] + pr_curve_coords_df["RECALL"] + 1e-8)) f1_curve_coords_df["THRESHOLD"] = pr_curve_coords_df["THRESHOLD"] # f1_scores = 2 * (precision * recall) / (precision + recall + 1e-8) return pr_curve_coords_df, roc_curve_coords_df, f1_curve_coords_df
[docs] def compute_calibration_metrics( self, task_code: str, user_ve_scores: pd.DataFrame = None, user_vep_name: str = "USER", column_name_map: dict = None, variant_effect_source: str = None, pathogenic_fraction_bins: int = 30, variant_query_criteria: VEQueryCriteria = None, ) -> VEAnalysisCalibrationResult: """ Generates calibration related metrics for either a user supplied set of vep scores or for a system supplied vep. These metrics are used to determine how accurate a vep predicts pathogenic variants. Returns an object containing all the metrics which can then be used to generate plots, reports, or csv data files by calling the VEPlotter.plot_calibration_curves method. Parameters ---------- task_code : str Task code user_ve_scores : DataFrame, optional An optional dataframe of user variant effect prediction scores. Expected to have the following columns: GENOME_ASSEMBLY, CHROMOSOME, POSITION, REFERENCE_NUCLEOTIDE, ALTERNATE_NUCLEOTIDE, RANK_SCORE. The GENOME_ASSEMBLY must be hg38 in current release. RANK_SCORE is a numeric prediction score. It does not have to be standardized or normalized. If specified the analysis would be performed on these scores instead of a system supplied vep. user_vep_name : str, optional If user_ve_scores are provided, then this is the label to be used for them in the analysis output. column_name_map : dict, optional If the column names in user_ve_scores are not the expected names, then this maps the column names to the expected names. variant_effect_source : str, optional You specify either user_ve_scores or variant_effect_source. If specified it would perform the analysis on this system supplied vep. pathogenic_fraction_bins : int, optional The number of bins to use when grouping the scores into bins. It will split the variants into equal sized bins based on the score and then compute the mean score and pathogenic fraction for each bin. variant_query_criteria : VEQueryCriteria, optional See description of VEQueryCriteria in model package. Specifies criteria that would limit the set of variants to be included in the analysis. Returns ------- VEAnalysisCalibrationResult Object containing computed metrics """ if user_ve_scores is not None and len(user_ve_scores) > 0: if variant_effect_source is not None: raise Exception( "Cannot provide a variant_effect_source when user scores" "are provided" ) elif variant_effect_source is None: raise Exception( "Must provide a variant_effect_source when user scores" "are not provided" ) vep_name = variant_effect_source if variant_effect_source is not None \ else user_vep_name validate_query_criteria(variant_query_criteria) scores_and_labels_df = self.get_analysis_scores_and_labels( task_code, user_ve_scores, user_vep_name, column_name_map, variant_effect_sources=[variant_effect_source] if variant_effect_source is not None else None, include_variant_effect_sources=True if variant_effect_source is not None else None, variant_query_criteria=variant_query_criteria, ) binned_scores_df = \ self._group_scores_into_bins(scores_and_labels_df, pathogenic_fraction_bins) """ positive_pr_curve_coords_df, negative_pr_curve_coords_df = \ self._compute_pr_calibration_curves(task_code, scores_and_labels_df) """ pr_curve_coords_df, roc_curve_coords_df, \ f1_curve_coords_df = self._compute_metrics_vs_threshold( scores_and_labels_df) return VEAnalysisCalibrationResult( len(scores_and_labels_df), vep_name, pr_curve_coords_df, f1_curve_coords_df, binned_scores_df, scores_and_labels_df[VARIANT_PK_COLUMNS + ["BINARY_LABEL", "RANK_SCORE"]] )
[docs] def _group_scores_into_bins( self, scores_and_labels_df: pd.DataFrame, num_bins: int = 10 ) -> pd.DataFrame: """ Groups the scores and labels into equal width bins based on the score. Parameters ---------- scores_and_labels_df : pd.DataFrame DataFrame containing scores and labels num_bins : int, default 10 Number of bins to divide the score range into Returns ------- pd.DataFrame DataFrame with bins and pathogenic fractions """ # Create a copy to avoid modifying the original df = scores_and_labels_df.copy() # Create bins using quantile-based discretization df['SCORE_BIN'], bin_edges = pd.cut( df['RANK_SCORE'], num_bins, retbins=True, labels=False, duplicates='drop' ) bin_boundaries = [(bin_edges[i], bin_edges[i+1]) for i in range(len(bin_edges)-1)] # Create bin labels from bin edges bin_labels = [f"{boundary[0]:.2f}-{boundary[1]:.2f}" for boundary in bin_boundaries] # Compute statistics for each bin bin_stats = df.groupby('SCORE_BIN').agg( NUM_VARIANTS=('BINARY_LABEL', 'count'), NUM_POSITIVE_LABELS=('BINARY_LABEL', 'sum'), NUM_NEGATIVE_LABELS=('BINARY_LABEL', lambda x: (1 - x).sum()), MEAN_SCORE=('RANK_SCORE', 'mean') ).reset_index() # Label bins bin_stats['SCORE_RANGE'] = bin_stats['SCORE_BIN'].apply( lambda x: bin_labels[int(x)]) bin_stats['LEFT_BOUNDARY_EXCLUSIVE'] = bin_stats['SCORE_BIN'].apply( lambda x: bin_boundaries[int(x)][0]) bin_stats['RIGHT_BOUNDARY_INCLUSIVE'] = bin_stats['SCORE_BIN'].apply( lambda x: bin_boundaries[int(x)][1]) # Select and reorder columns result_df = bin_stats[[ 'SCORE_RANGE', 'LEFT_BOUNDARY_EXCLUSIVE', 'RIGHT_BOUNDARY_INCLUSIVE', 'MEAN_SCORE', 'NUM_VARIANTS', 'NUM_POSITIVE_LABELS', 'NUM_NEGATIVE_LABELS' ]] return result_df