Source code for aigct.repository

"""
Data access layer methods for accessing variant repository.
Classes here provide an encapsulation layer to hide the internal
details of the repository structure.
"""

import os
import pandas as pd
import dask.dataframe as dd
from .util import ParameterizedSingleton
import threading
from dataclasses import dataclass, field
from .pd_util import (
    filter_dataframe_by_list,
    build_dataframe_where_clause,
    merge_by_chunks
)
from .model import VariantFilter, VEQueryCriteria

[docs] MERGE_CHUNK_SIZE = 500000
[docs] TASK_SUBFOLDER = { "CANCER": "CANCER" }
[docs] DATA_FOLDER = "data"
[docs] TASK_FOLDERS = [os.path.join(DATA_FOLDER, task) for task in ["CANCER", "ADRD", "CHD", "DDD"]]
@dataclass
[docs] class TableDef:
[docs] folder: str
[docs] file_name: str
[docs] pk_columns: list[str]
[docs] non_pk_columns: list[str]
[docs] columns: list[str] = field(init=False),
[docs] full_file_name: str = field(init=False)
[docs] def __post_init__(self): self.columns = self.pk_columns + self.non_pk_columns self.full_file_name = os.path.join(self.folder, self.file_name)
[docs] VARIANT_PK_COLUMNS = [ "GENOME_ASSEMBLY", "CHROMOSOME", "POSITION", "REFERENCE_NUCLEOTIDE", "ALTERNATE_NUCLEOTIDE" ]
[docs] VARIANT_NON_PK_COLUMNS = [ "PRIOR_GENOME_ASSEMBLY", "PRIOR_CHROMOSOME", "PRIOR_POSITION", "PRIOR_PRIOR_GENOME_ASSEMBLY", "PRIOR_PRIOR_CHROMOSOME", "PRIOR_PRIOR_POSITION", "REFERENCE_AMINO_ACID", "ALTERNATE_AMINO_ACID", "AMINO_ACID_POSITION", "RS_DBSNP", "GENE_SYMBOL", "ENSEMBL_GENE_ID", "ENSEMBL_TRANSCRIPT_ID", "ENSEMBL_PROTEIN_ID", "ALLELE_FREQUENCY_SOURCE", "ALLELE_FREQUENCY" ]
[docs] VARIANT_TABLE_DEF = TableDef(DATA_FOLDER, "variant.csv", VARIANT_PK_COLUMNS, VARIANT_NON_PK_COLUMNS)
[docs] VARIANT_LABEL_NON_PK_COLUMNS = [ "LABEL_SOURCE", "RAW_LABEL", "BINARY_LABEL" ]
[docs] VARIANT_EFFECT_LABEL_TABLE_DEF = TableDef(DATA_FOLDER, "variant_effect_label.csv", VARIANT_PK_COLUMNS, VARIANT_LABEL_NON_PK_COLUMNS)
[docs] VARIANT_EFFECT_SCORE_PK_COLUMNS = VARIANT_PK_COLUMNS + ["SCORE_SOURCE"]
[docs] VARIANT_EFFECT_SCORE_NON_PK_COLUMNS = [ "RAW_SCORE", "RANK_SCORE" ]
[docs] VARIANT_EFFECT_SCORE_TABLE_DEF = TableDef(DATA_FOLDER, "variant_effect_score.csv", VARIANT_EFFECT_SCORE_PK_COLUMNS, VARIANT_EFFECT_SCORE_NON_PK_COLUMNS)
[docs] VARIANT_TASK_TABLE_DEF = TableDef(DATA_FOLDER, "variant_task.csv", ["CODE"], ["NAME", "DESCRIPTION"])
[docs] VARIANT_EFFECT_SOURCE_TABLE_DEF =\ TableDef(DATA_FOLDER, "variant_effect_source.csv", ["CODE"], ["NAME", "SOURCE_TYPE", "DESCRIPTION"])
[docs] VARIANT_DATA_SOURCE_TABLE_DEF =\ TableDef(DATA_FOLDER, "variant_data_source.csv", ["CODE"], ["NAME", "DESCRIPTION"])
[docs] VARIANT_FILTER_TABLE_DEF =\ TableDef(DATA_FOLDER, "variant_filter.csv", ["CODE"], ["NAME", "DESCRIPTION", "INCLUDE_GENES", "INCLUDE_VARIANTS"])
[docs] VARIANT_FILTER_GENE_TABLE_DEF =\ TableDef(DATA_FOLDER, "variant_filter_gene.csv", ["FILTER_CODE", "GENE_SYMBOL"], [])
[docs] VARIANT_FILTER_VARIANT_TABLE_DEF =\ TableDef(DATA_FOLDER, "variant_filter_variant.csv", ["FILTER_CODE"] + VARIANT_PK_COLUMNS, [])
[docs] TABLE_DEFS = { "VARIANT_TASK": VARIANT_TASK_TABLE_DEF, "VARIANT_EFFECT_SOURCE": VARIANT_EFFECT_SOURCE_TABLE_DEF, "VARIANT": VARIANT_TABLE_DEF, "VARIANT_EFFECT_LABEL": VARIANT_EFFECT_LABEL_TABLE_DEF, "VARIANT_DATA_SOURCE": VARIANT_DATA_SOURCE_TABLE_DEF, "VARIANT_EFFECT_SCORE": VARIANT_EFFECT_SCORE_TABLE_DEF, "VARIANT_FILTER": VARIANT_FILTER_TABLE_DEF, "VARIANT_FILTER_GENE": VARIANT_FILTER_GENE_TABLE_DEF, "VARIANT_FILTER_VARIANT": VARIANT_FILTER_VARIANT_TABLE_DEF, }
[docs] def read_repo_csv(file: str) -> pd.DataFrame: return pd.read_csv(file, dtype={"CHROMOSOME": str, "BINARY_LABEL": int})
[docs] class RepoSessionContext: def __init__(self, data_folder_root: str, table_defs: dict[str, TableDef]):
[docs] self._data_folder_root = data_folder_root
[docs] self._table_defs = table_defs
@property
[docs] def data_folder_root(self): return self._data_folder_root
[docs] def table_def(self, table_name: str): return self._table_defs[table_name]
[docs] def table_file(self, table_name: str, task: str = None): if task: return os.path.join(self._data_folder_root, DATA_FOLDER, task, self._table_defs[table_name].file_name) else: return os.path.join(self._data_folder_root, DATA_FOLDER, self._table_defs[table_name].file_name)
[docs] class VariantEffectLabelCache(ParameterizedSingleton): """ Caches the variant csv file in a dataframe. Implements the singleton pattern to ensure there is only one instance of the cached dataframe. We use an _init_once method rather than the normal __init__ method as required by the ParameterizedSingleton class. """
[docs] def _init_once(self, data_folder_root: str): self._data_folder_root = data_folder_root self._lock = threading.Lock() self._cache = dict()
[docs] def get_data_frame(self, task_code: str): if task_code not in self._cache: with self._lock: if task_code not in self._cache: self._cache[task_code] = read_repo_csv( os.path.join(self._data_folder_root, DATA_FOLDER, task_code, VARIANT_EFFECT_LABEL_TABLE_DEF.file_name)) return self._cache[task_code]
[docs] class DataCache(ParameterizedSingleton): """ Caches a repository csv file in a dataframe. Implements the singleton pattern to ensure there is only one instance of the cached dataframe. We use an _init_once method rather than the normal __init__ method as required by the ParameterizedSingleton class. """
[docs] def _init_once(self, data_folder_root: str, table_def: TableDef): self._data_folder_root = data_folder_root self._table_def = table_def self._lock = threading.Lock() self._data_frame = None
@property
[docs] def data_frame(self): if self._data_frame is None: with self._lock: if self._data_frame is None: self._data_frame = read_repo_csv( os.path.join(self._data_folder_root, self._table_def.full_file_name)) return self._data_frame
[docs] class TaskBasedDataCache(ParameterizedSingleton): """ Caches a repository csv file in a dataframe. Maintains a separate cache for each task in a dict. Implements the singleton pattern to ensure there is only one instance of the cached dataframe. We use an _init_once method rather than the normal __init__ method as required by the ParameterizedSingleton class. """
[docs] def _init_once(self, data_folder_root: str, table_def: TableDef, disable_cache: bool = False): self._data_folder_root = data_folder_root self._table_def = table_def self._cache = dict() self._lock = threading.Lock() self._disable_cache = disable_cache
[docs] def get_data_frame(self, task_code: str): if task_code not in self._cache: with self._lock: if task_code not in self._cache: data_frame = read_repo_csv( os.path.join(self._data_folder_root, DATA_FOLDER, task_code, self._table_def.file_name)) if self._disable_cache: return data_frame self._cache[task_code] = data_frame return self._cache[task_code]
[docs] class TaskDataCache(DataCache): """ Caches the variant csv file in a dataframe. Implements the singleton pattern to ensure there is only one instance of the cached dataframe. We use an _init_once method rather than the normal __init__ method as required by the ParameterizedSingleton class. """
[docs] def _init_once(self, data_folder_root: str): super()._init_once(data_folder_root, VARIANT_TASK_TABLE_DEF)
[docs] class VariantEffectScoreCache(TaskBasedDataCache): """ Caches the variant csv file in a dataframe. Implements the singleton pattern to ensure there is only one instance of the cached dataframe. We use an _init_once method rather than the normal __init__ method as required by the ParameterizedSingleton class. """
[docs] def _init_once(self, data_folder_root: str, disable_cache: bool = False): super()._init_once(data_folder_root, VARIANT_EFFECT_SCORE_TABLE_DEF, disable_cache)
[docs] class VariantCache(DataCache): """ Caches the variant csv file in a dataframe. Implements the singleton pattern to ensure there is only one instance of the cached dataframe. We use an _init_once method rather than the normal __init__ method as required by the ParameterizedSingleton class. """
[docs] def _init_once(self, data_folder_root: str): super()._init_once(data_folder_root, VARIANT_TABLE_DEF)
[docs] class VariantTaskCache(DataCache): """ Caches the variant csv file in a dataframe. Implements the singleton pattern to ensure there is only one instance of the cached dataframe. We use an _init_once method rather than the normal __init__ method as required by the ParameterizedSingleton class. """
[docs] def _init_once(self, data_folder_root: str): super()._init_once(data_folder_root, VARIANT_TASK_TABLE_DEF)
[docs] class VariantEffectSourceCache(DataCache):
[docs] def _init_once(self, data_folder_root: str): super()._init_once(data_folder_root, VARIANT_EFFECT_SOURCE_TABLE_DEF)
[docs] class VariantFilterCache(ParameterizedSingleton):
[docs] def _init_once(self, data_folder_root: str): self._data_folder_root = data_folder_root self._lock = threading.Lock() self._cache = dict()
[docs] def get_data_frames(self, task_code: str) -> dict: if task_code not in self._cache: with self._lock: if task_code not in self._cache: folder = os.path.join(self._data_folder_root, DATA_FOLDER, task_code) cache_dict = dict() cache_dict["filter_df"] = read_repo_csv( os.path.join(folder, VARIANT_FILTER_TABLE_DEF.file_name)) cache_dict["filter_gene_df"] = read_repo_csv( os.path.join(folder, VARIANT_FILTER_GENE_TABLE_DEF.file_name)) cache_dict["filter_variant_df"] = read_repo_csv( os.path.join( folder, VARIANT_FILTER_VARIANT_TABLE_DEF.file_name)) self._cache[task_code] = cache_dict return self._cache[task_code]
[docs] class VariantEffectSourceRepository: def __init__(self, session_context: RepoSessionContext, variant_effect_score_repo):
[docs] self._cache = VariantEffectSourceCache( session_context.data_folder_root)
[docs] self._variant_effect_score_repo = variant_effect_score_repo
[docs] def get_all(self) -> pd.DataFrame: return self._cache.data_frame.copy(deep=True)
[docs] def get_by_task(self, task_code: str) -> pd.DataFrame: score_sources = self._variant_effect_score_repo.get_all_by_task( task_code)['SCORE_SOURCE'].unique() source_df = self._cache.data_frame.copy(deep=True) return source_df[source_df['CODE'].isin(score_sources)]
[docs] def get_by_code(self, codes: list[str]) -> pd.DataFrame: return filter_dataframe_by_list( self._cache.data_frame, codes, VARIANT_EFFECT_SOURCE_TABLE_DEF.pk_columns)
[docs] class VariantTaskRepository: def __init__(self, session_context: RepoSessionContext):
[docs] self._cache = VariantTaskCache(session_context.data_folder_root)
[docs] def get_all(self) -> pd.DataFrame: return self._cache.data_frame.copy(deep=True)
[docs] class VariantFilterRepository: def __init__(self, session_context: RepoSessionContext):
[docs] self._cache = VariantFilterCache(session_context.data_folder_root)
[docs] def get_by_task(self, task_code: str) -> dict[str, pd.DataFrame]: return self._cache.get_data_frames(task_code)
[docs] def get_by_task_filter_name( self, task_code: str, filter_name: str) -> VariantFilter: filter_dfs = self._cache.get_data_frames(task_code) filter = filter_dfs["filter_df"].query(f"NAME == '{filter_name}'") if len(filter) == 0: return None filter = filter.iloc[0] filter_code = filter["CODE"] filter_genes = filter_dfs["filter_gene_df"].query( f"FILTER_CODE == '{filter_code}'") filter_variants = filter_dfs["filter_variant_df"].query( f"FILTER_CODE == '{filter_code}'") if len(filter_genes) == 0 and len(filter_variants) == 0: return None return VariantFilter(filter, filter_genes, filter_variants)
[docs] def get_by_task_filter_names( self, task_code: str, filter_names: list[str]) -> list[ VariantFilter]: variant_filters = [] for filter_name in filter_names: variant_filter = self.get_by_task_filter_name(task_code, filter_name) if variant_filter is None: raise Exception(f"Invalid filter name: {filter_name}") variant_filters.append(variant_filter) return variant_filters
[docs] def query_by_filter(query_df: pd.DataFrame, filter: pd.Series, filter_gene_df: pd.DataFrame, filter_variant_df: pd.DataFrame) -> pd.DataFrame: if len(filter_gene_df) > 0: query_df = filter_dataframe_by_list(query_df, filter_gene_df, ['GENE_SYMBOL'], None, filter["INCLUDE_GENES"] == 'Y') if len(filter_variant_df) > 0: query_df = filter_dataframe_by_list(query_df, filter_variant_df, VARIANT_PK_COLUMNS, None, filter["INCLUDE_VARIANTS"] == 'Y') return query_df
[docs] def query_by_filters(query_df: pd.DataFrame, filters: list[VariantFilter] ) -> pd.DataFrame: results = [] for filter in filters: result = query_by_filter(query_df, filter.filter, filter.filter_genes, filter.filter_variants) results.append(result) return pd.concat(results).drop_duplicates()
[docs] class VariantRepository: def __init__(self, session_context: RepoSessionContext):
[docs] self._cache = VariantCache(session_context.data_folder_root)
[docs] def get_all(self) -> pd.DataFrame: return self._cache.data_frame.copy(deep=True)
[docs] def get(self, qry: VEQueryCriteria) -> pd.DataFrame: """ Fetches variants. The optional parameters are filter criteria used to limit the set of variants returned. """ where_clause = build_dataframe_where_clause( {"ALLELE_FREQUENCY": [qry.allele_frequency_operator, qry.allele_frequency]}) variant_df = self._cache.data_frame if where_clause != "": variant_df = variant_df.query(where_clause) if qry.gene_symbols is not None: variant_df = filter_dataframe_by_list(variant_df, qry.gene_symbols, ['GENE_SYMBOL'], in_list=qry.include_genes) if qry.variant_ids is not None: variant_df = filter_dataframe_by_list(variant_df, qry.variant_ids, VARIANT_PK_COLUMNS, qry.column_name_map, qry.include_variant_ids) return variant_df
[docs] class VariantEffectLabelRepository: def __init__(self, session_context: RepoSessionContext, variant_task_repo: VariantTaskRepository, variant_repo: VariantRepository, filter_repo: VariantFilterRepository):
[docs] self._cache = VariantEffectLabelCache(session_context.data_folder_root)
[docs] self._task_repo = variant_task_repo
[docs] self._filter_repo = filter_repo
[docs] self._variant_repo = variant_repo
[docs] def get_all_by_task(self, task_code: str) -> pd.DataFrame: label_df = self._cache.get_data_frame(task_code) variant_df = self._variant_repo.get( VEQueryCriteria(variant_ids=label_df[VARIANT_PK_COLUMNS])) return label_df.merge(variant_df, on=VARIANT_PK_COLUMNS, how="inner")
[docs] def get_all_for_all_tasks(self) -> pd.DataFrame: tasks_df = self._task_repo.get_all() labels_dfs = [] for row in tasks_df.itertuples(): labels_df = self.get_all_by_task(row.CODE) labels_df["TASK_CODE"] = row.CODE labels_df["TASK_NAME"] = row.NAME labels_dfs.append(labels_df) return pd.concat(labels_dfs)
[docs] def get(self, task_code: str, qry: VEQueryCriteria = None) -> pd.DataFrame: """ Fetches variant effect labels. """ label_df = self._cache.get_data_frame(task_code) if qry is not None: if qry.variant_ids is not None and len(qry.variant_ids) > 0: label_df = filter_dataframe_by_list( label_df, qry.variant_ids, VARIANT_PK_COLUMNS, in_list=qry.include_variant_ids) variant_df = self._variant_repo.get(qry) else: variant_df = self._variant_repo.get_all() merge_df = label_df.merge(variant_df, how="inner", on=VARIANT_PK_COLUMNS) if qry is not None and qry.filter_names is not None: if isinstance(qry.filter_names, str): filter_names = [qry.filter_names] else: filter_names = qry.filter_names filters = self._filter_repo.get_by_task_filter_names( task_code, filter_names) merge_df = query_by_filters(merge_df, filters) return merge_df
[docs] class VariantEffectScoreRepository: def __init__(self, session_context: RepoSessionContext, task_repo: VariantTaskRepository, variant_repo: VariantRepository, filter_repo: VariantFilterRepository):
[docs] self._cache = VariantEffectScoreCache(session_context.data_folder_root, disable_cache=True)
[docs] self._task_repo = task_repo
[docs] self._filter_repo = filter_repo
[docs] self._variant_repo = variant_repo
[docs] def get_all_by_task(self, task_code: str) -> pd.DataFrame: score_df = self._cache.get_data_frame(task_code) query_criteria = VEQueryCriteria(variant_ids=score_df[ VARIANT_PK_COLUMNS].drop_duplicates(), include_variant_ids=True) variant_df = self._variant_repo.get(query_criteria) return merge_by_chunks(score_df, variant_df, chunk_size=MERGE_CHUNK_SIZE, on=VARIANT_PK_COLUMNS, how="inner")
[docs] def get_all_by_task_slim(self, task_code: str) -> pd.DataFrame: score_df = self._cache.get_data_frame(task_code) return score_df
[docs] def get(self, task_code: str, variant_effect_sources: list[str] | str = None, include_variant_effect_sources: bool = True, qry: VEQueryCriteria = None) -> pd.DataFrame: score_df = self._cache.get_data_frame(task_code) if (variant_effect_sources is not None and len(variant_effect_sources) > 0): score_df = filter_dataframe_by_list(score_df, variant_effect_sources, "SCORE_SOURCE", None, include_variant_effect_sources) if qry is None: return score_df if qry.variant_ids is not None: score_df = filter_dataframe_by_list(score_df, qry.variant_ids, VARIANT_PK_COLUMNS, qry.column_name_map, qry.include_variant_ids) variant_df = self._variant_repo.get(qry) merge_df = merge_by_chunks(score_df, variant_df, chunk_size=MERGE_CHUNK_SIZE, how="inner", on=VARIANT_PK_COLUMNS) if qry.filter_names is not None: if isinstance(qry.filter_names, str): filter_names = [qry.filter_names] else: filter_names = qry.filter_names filters = self._filter_repo.get_by_task_filter_names( task_code, filter_names) merge_df = query_by_filters(merge_df, filters) return merge_df[VARIANT_EFFECT_SCORE_TABLE_DEF.columns]
[docs] def get_all_for_all_tasks(self) -> pd.DataFrame: tasks_df = self._task_repo.get_all() scores_dfs = [] for row in tasks_df.itertuples(): scores_df = self.get_all_by_task_slim(row.CODE) scores_df["TASK_CODE"] = row.CODE scores_df["TASK_NAME"] = row.NAME scores_dfs.append(scores_df) return pd.concat(scores_dfs)