"""
Classes and methods to query the variant repository.
This module provides an abstraction layer that sits on top of the
data access layer in the repository module.
It uses the repository module to access the raw data and includes
methods to optionally transform the data to make it more meaningful
or presentation ready to the caller.
"""
from .repository import (
VariantEffectLabelRepository,
VariantRepository,
VariantTaskRepository,
VariantEffectSourceRepository,
VariantEffectScoreRepository,
VariantFilterRepository,
VARIANT_PK_COLUMNS
)
from .model import (
VEQueryCriteria,
VariantFilter
)
from .query_util import validate_query_criteria
import pandas as pd
[docs]
def cleanup_variant_query_params(params: VEQueryCriteria):
    """
    Normalize a query criteria object in place and return it.

    Empty collections are replaced with None, gene symbols are wrapped
    in a DataFrame and unset include flags / operators get defaults.

    Parameters
    ----------
    params : VEQueryCriteria
        Criteria object to normalize. It is modified in place.
        The following attributes are read and possibly rewritten:
        gene_symbols, include_genes, variant_ids, include_variant_ids,
        column_name_map, allele_frequency_operator.

    Returns
    -------
    VEQueryCriteria
        The same object that was passed in.
    """
    # NOTE: emptiness is tested with len() rather than truthiness because
    # these attributes may hold DataFrames, whose truth value is ambiguous.
    if params.gene_symbols is not None:
        if len(params.gene_symbols) == 0:
            params.gene_symbols = None
        else:
            if not isinstance(params.gene_symbols, pd.DataFrame):
                # Wrap a plain sequence of symbols in a one column frame.
                params.gene_symbols = pd.DataFrame(
                    {"GENE_SYMBOL": params.gene_symbols})
            if params.include_genes is None:
                params.include_genes = True

    if params.variant_ids is not None:
        if len(params.variant_ids) == 0:
            params.variant_ids = None
        elif params.include_variant_ids is None:
            # Default the flag that belongs to variant_ids; the gene flag
            # must not be touched here.
            params.include_variant_ids = True

    if (params.column_name_map is not None
            and len(params.column_name_map) == 0):
        params.column_name_map = None

    if params.allele_frequency_operator is None:
        params.allele_frequency_operator = "="
    return params
[docs]
class VEBenchmarkQueryMgr:
    """
    Methods to query the variant repository.

    Every method delegates data access to one of the repository objects
    supplied to the constructor; some methods additionally aggregate
    the returned data frames.
    """

    def __init__(self,
                 variant_effect_label_repo: VariantEffectLabelRepository,
                 variant_repo: VariantRepository,
                 variant_task_repo: VariantTaskRepository,
                 variant_effect_source_repo: VariantEffectSourceRepository,
                 variant_effect_score_repo: VariantEffectScoreRepository,
                 variant_filter_repo: VariantFilterRepository
                 ):
        """Store the repositories used by the query methods."""
        self._variant_effect_label_repo = variant_effect_label_repo
        self._variant_repo = variant_repo
        self._variant_task_repo = variant_task_repo
        self._variant_effect_source_repo = variant_effect_source_repo
        self._variant_effect_score_repo = variant_effect_score_repo
        self._variant_filter_repo = variant_filter_repo

    def get_tasks(self) -> pd.DataFrame:
        """Get all tasks"""
        return self._variant_task_repo.get_all()

    def get_all_variants(self) -> pd.DataFrame:
        """Get all variants from the variant repository."""
        return self._variant_repo.get_all()

    def get_variants(self, qry: VEQueryCriteria) -> pd.DataFrame:
        """
        Fetch variants based on query criteria.

        Parameters
        ----------
        qry : VEQueryCriteria
            See description of VEQueryCriteria in model package.
            Specifies criteria that would limit the set of variants
            to be retrieved. The filter_names attribute is ignored.

        Returns
        -------
        DataFrame
        """
        validate_query_criteria(qry)
        return self._variant_repo.get(qry)

    def get_variant_effect_sources(
            self, task_code: str = None) -> pd.DataFrame:
        """
        Get variant effect sources.

        Parameters
        ----------
        task_code : str, optional
            If None all sources are returned, otherwise only the
            sources the repository returns for this task.

        Returns
        -------
        DataFrame
        """
        if task_code is None:
            return self._variant_effect_source_repo.get_all()
        return self._variant_effect_source_repo.get_by_task(task_code)

    @staticmethod
    def _compute_variant_counts(group) -> pd.Series:
        """
        Summarize one group of labelled variants.

        Parameters
        ----------
        group : DataFrame
            Rows of a single group. Must contain the columns
            BINARY_LABEL and GENE_SYMBOL.

        Returns
        -------
        Series
            NUM_VARIANTS, NUM_POSITIVE_LABELS, NUM_NEGATIVE_LABELS and
            NUM_GENES for the group.
        """
        return pd.Series(
            {"NUM_VARIANTS": len(group),
             "NUM_POSITIVE_LABELS": group["BINARY_LABEL"].sum(),
             # XOR with 1 flips each label before summing.
             "NUM_NEGATIVE_LABELS": (group["BINARY_LABEL"] ^ 1).sum(),
             "NUM_GENES": group["GENE_SYMBOL"].nunique()
             })

    def get_variant_effect_source_stats(
            self, task_code: str, variant_effect_sources=None,
            include_variant_effect_sources: bool = True,
            qry: VEQueryCriteria = None) -> pd.DataFrame:
        """
        Get all variant effect sources for a task along with the
        number of variants, number of positive labels,
        number of negative labels, number of genes for each source.

        Parameters
        ----------
        task_code : str
        variant_effect_sources : list, optional
            If specified it would restrict the results based on
            system supplied vep's in this list.
        include_variant_effect_sources : bool, optional
            If variant_effect_source is specified, indicates whether to
            limit the results to sources in variant_effect_sources or
            not in variant_effect_sources.
        qry : VEQueryCriteria, optional
            See description of VEQueryCriteria in model package.
            Specifies criteria that would limit the set of variants
            to be retrieved.

        Returns
        -------
        DataFrame
            One row per SCORE_SOURCE with the counts produced by
            _compute_variant_counts.
        """
        validate_query_criteria(qry)
        variant_labels = self._variant_effect_label_repo.get(
            task_code, qry)
        scores = self._variant_effect_score_repo.get(
            task_code, variant_effect_sources,
            include_variant_effect_sources, qry)
        # Only variants that have both a score and a label are counted.
        scores_labels = scores.merge(variant_labels, how="inner",
                                     on=VARIANT_PK_COLUMNS)
        grouped_scores = scores_labels.groupby("SCORE_SOURCE")
        return grouped_scores.apply(
            self._compute_variant_counts,
            include_groups=False).reset_index()

    def get_all_variant_effect_source_stats(self) -> pd.DataFrame:
        """
        Returns one row per task with the number of distinct score
        sources available for that task.

        Returns
        -------
        DataFrame
            Columns TASK_NAME, TASK_CODE and NUM_SCORE_SOURCES. The
            frame is empty (but has these columns) when there are no
            tasks.
        """
        columns = ["TASK_NAME", "TASK_CODE", "NUM_SCORE_SOURCES"]
        tasks_df = self._variant_task_repo.get_all()
        # Collect plain rows and build the frame once; concatenating
        # onto a growing frame inside the loop is quadratic.
        rows = []
        for task in tasks_df.itertuples():
            scores_df = self._variant_effect_score_repo.get_all_by_task(
                task.CODE)
            rows.append({
                "TASK_NAME": task.NAME,
                "TASK_CODE": task.CODE,
                "NUM_SCORE_SOURCES": scores_df["SCORE_SOURCE"].nunique(),
            })
        return pd.DataFrame(rows, columns=columns)

    def get_all_task_variant_effect_label_stats(self) -> pd.DataFrame:
        """
        Returns one row per task with number of variants,
        number of positive labels, number of negative labels,
        number of genes.

        Returns
        -------
        DataFrame
        """
        labels = self._variant_effect_label_repo.get_all_for_all_tasks()
        grouped_labels = labels.groupby(["TASK_CODE", "TASK_NAME"])
        return grouped_labels.apply(
            self._compute_variant_counts,
            include_groups=False).reset_index()

    def get_variant_effect_scores(self, task_code: str,
                                  variant_effect_sources=None,
                                  include_variant_effect_sources: bool = True,
                                  qry: VEQueryCriteria = None) -> pd.DataFrame:
        """
        Fetches variant effect scores for variant effect sources.

        Parameters
        ----------
        task_code : str
            task code
        variant_effect_sources : list, optional
            If specified it would restrict the results based on
            system supplied vep's in this list.
        include_variant_effect_sources : bool, optional
            If variant_effect_source is specified, indicates whether to
            limit the results to sources in variant_effect_sources or
            not in variant_effect_sources.
        qry : VEQueryCriteria, optional
            See description of VEQueryCriteria in model package.
            Specifies criteria that would limit the set of variants
            to be retrieved.

        Returns
        -------
        DataFrame
        """
        validate_query_criteria(qry)
        return self._variant_effect_score_repo.get(
            task_code,
            variant_effect_sources,
            include_variant_effect_sources,
            qry)

    def get_variants_by_task(self, task_code: str,
                             qry: VEQueryCriteria = None
                             ) -> pd.DataFrame:
        """
        Fetches variants by task. The optional parameters are
        filter criteria used to limit the set of variants returned.

        Parameters
        ----------
        task_code : str
        qry : VEQueryCriteria, optional
            See description of VEQueryCriteria in model package.
            Specifies criteria that would limit the set of variants
            to be retrieved.

        Returns
        -------
        DataFrame
        """
        validate_query_criteria(qry)
        return self._variant_effect_label_repo.get(task_code,
                                                   qry)

    def get_variant_distribution(self, task_code: str,
                                 by: str = "gene",
                                 qry: VEQueryCriteria = None
                                 ) -> pd.DataFrame:
        """
        Fetches the distribution of variants by gene or chromosome.
        For each gene/chromosome lists the number of positive and
        negative labels.

        Parameters
        ----------
        task_code : str
            Task code
        by : str
            Values are gene or chromosome. Specifies the type of
            distribution to return. Any value other than "chromosome"
            groups by gene.
        qry : VEQueryCriteria, optional
            See description of VEQueryCriteria in model package.
            Specifies criteria that would limit the set of variants
            to be retrieved.

        Returns
        -------
        DataFrame
            One row per CHROMOSOME or GENE_SYMBOL with the columns
            NUM_POSITIVE_LABELS and NUM_NEGATIVE_LABELS.
        """
        validate_query_criteria(qry)
        label_df = self._variant_effect_label_repo.get(task_code, qry)[
            ['CHROMOSOME', 'GENE_SYMBOL', 'BINARY_LABEL']]
        # Column-wise computation on a new frame: avoids writing into a
        # slice of the repository's frame and avoids a per-row apply.
        # The XOR mirrors _compute_variant_counts.
        label_df = label_df.assign(
            POSITIVE_LABEL=label_df['BINARY_LABEL'],
            NEGATIVE_LABEL=label_df['BINARY_LABEL'] ^ 1
        )
        group_column = 'CHROMOSOME' if by == 'chromosome' else 'GENE_SYMBOL'
        return label_df.groupby(group_column).agg(
            NUM_POSITIVE_LABELS=pd.NamedAgg(column='POSITIVE_LABEL',
                                            aggfunc='sum'),
            NUM_NEGATIVE_LABELS=pd.NamedAgg(column='NEGATIVE_LABEL',
                                            aggfunc='sum')
        ).reset_index()

    def get_variant_filter(
            self, task_code: str, filter_name: str) -> VariantFilter:
        """
        Return a variant filter for a task by name.

        Parameters
        ----------
        task_code : str
        filter_name : str

        Returns
        -------
        VariantFilter
            Object containing list of genes/variant id's included in the
            filter. See description of the object.
        """
        return self._variant_filter_repo.get_by_task_filter_name(
            task_code, filter_name
        )

    def get_all_variant_filters(self, task_code: str) ->\
            dict[str, pd.DataFrame]:
        """
        Return basic descriptive information about all variant filters
        for a task.

        Parameters
        ----------
        task_code : str

        Returns
        -------
        dict[str, pd.DataFrame]
            A dictionary of 3 data frames with the following keys:
            filter_df - Data frame of filters containing CODE, NAME,
            DESCRIPTION, etc.
            filter_gene_df - Data frame of genes associated with each
            filter
            filter_variant_df - Data frame of variants associated with
            each filter
        """
        return self._variant_filter_repo.get_by_task(task_code)