Source code for trackcell.io.read_data

"""
Data reading functions for TrackCell package.

This module provides functions for reading spatial transcriptomics data,
particularly from SpaceRanger output (both bin-level and cell segmentation data).
"""

import scanpy as sc
import geopandas as gpd
import pandas as pd
from shapely import wkt, geometry
import numpy as np
import json
import imageio.v3 as iio
from pathlib import Path
from typing import Optional, Union
import ast
import warnings

[docs] def read_hd_bin( datapath: Union[str, Path], sample: Optional[str] = None, binsize: int = 16, matrix_file_h5: str = "filtered_feature_bc_matrix.h5", matrix_file_dir: str = "filtered_feature_bc_matrix", tissue_positions_file: str = "spatial/tissue_positions.parquet", hires_image_file: str = "spatial/tissue_hires_image.png", lowres_image_file: str = "spatial/tissue_lowres_image.png", scalefactors_file: str = "spatial/scalefactors_json.json" ) -> sc.AnnData: """ Read 10X HD SpaceRanger bin-level output (2um/8um/16um) and create an AnnData object with spatial information. This function reads the output from SpaceRanger pipeline and creates an AnnData object that includes spatial coordinates, tissue images, and scalefactors for bin-level data. Parameters ---------- datapath : str or Path Path to the SpaceRanger output directory containing bin-level outputs. sample : str, optional Sample name. If None, will be inferred from the path. binsize : int, default 16 Bin size in micrometers. Common values are 2, 8, or 16. This information will be stored in adata.uns["spatial"][sample]["binsize"]. matrix_file_h5 : str, default "filtered_feature_bc_matrix.h5" Name of the H5 matrix file. Will be tried first. matrix_file_dir : str, default "filtered_feature_bc_matrix" Name of the matrix directory. Will be used if H5 file is not available. tissue_positions_file : str, default "spatial/tissue_positions.parquet" Path to tissue positions file (parquet or csv format). hires_image_file : str, default "spatial/tissue_hires_image.png" Path to the high-resolution tissue image relative to datapath. lowres_image_file : str, default "spatial/tissue_lowres_image.png" Path to the low-resolution tissue image relative to datapath. scalefactors_file : str, default "spatial/scalefactors_json.json" Name of the scalefactors JSON file. Returns ------- sc.AnnData AnnData object containing: - Expression matrix in .X - Cell metadata in .obs - Gene metadata in .var - Spatial coordinates in .obsm["spatial"] - Tissue images in .uns["spatial"][sample]["images"] - Scalefactors in .uns["spatial"][sample]["scalefactors"] - Bin size in .uns["spatial"][sample]["binsize"] Examples -------- >>> import trackcell.io as tcio >>> adata = tcio.read_hd_bin("SpaceRanger4.0/Case1/outs", sample="Case1") >>> print(adata) AnnData object with n_obs × n_vars = 10000 × 2000 obs: 'barcode' obsm: 'spatial' uns: 'spatial' Notes ----- This function expects the SpaceRanger output to have the following structure: - filtered_feature_bc_matrix.h5 or filtered_feature_bc_matrix/: Expression matrix - spatial/tissue_positions.parquet or spatial/tissue_positions.csv: Spatial coordinates - spatial/tissue_hires_image.png: High-resolution tissue image - spatial/tissue_lowres_image.png: Low-resolution tissue image - spatial/scalefactors_json.json: Image scaling factors """ # Convert to Path object for easier handling datapath = Path(datapath).resolve() # If sample is not provided, try to infer from path if sample is None: sample = datapath.parent.parent.name if datapath.name == "outs" else datapath.name # Read expression matrix - try H5 first, then directory h5_path = datapath / matrix_file_h5 matrix_dir_path = datapath / matrix_file_dir if h5_path.exists(): try: adata = sc.read_10x_h5(h5_path) except Exception as e: print(f"Warning: Could not read H5 file {h5_path}: {e}") print(f"Trying to read from directory {matrix_dir_path}") if matrix_dir_path.exists(): adata = sc.read_10x_mtx(matrix_dir_path) else: raise FileNotFoundError(f"Neither H5 file nor matrix directory found in {datapath}") elif matrix_dir_path.exists(): adata = sc.read_10x_mtx(matrix_dir_path) else: raise FileNotFoundError(f"Neither {h5_path} nor {matrix_dir_path} found") # Load the Spatial Coordinates tissue_pos_path = datapath / tissue_positions_file if not tissue_pos_path.exists(): # Try CSV format as fallback tissue_pos_path_csv = datapath / tissue_positions_file.replace('.parquet', '.csv') if tissue_pos_path_csv.exists(): tissue_pos_path = tissue_pos_path_csv else: raise FileNotFoundError(f"Tissue positions file not found: {tissue_pos_path} or {tissue_pos_path_csv}") # Read tissue positions file try: if tissue_pos_path.suffix == '.parquet': df_tissue_positions = pd.read_parquet(tissue_pos_path) else: # Try CSV with different separators try: df_tissue_positions = pd.read_csv(tissue_pos_path, sep=',') except Exception: try: df_tissue_positions = pd.read_csv(tissue_pos_path, sep='\t') except Exception: df_tissue_positions = pd.read_csv(tissue_pos_path, sep=None, engine='python') except Exception as e: raise ValueError(f"Could not read tissue positions file {tissue_pos_path}: {e}") # Set the index of the dataframe to the barcodes if 'barcode' in df_tissue_positions.columns: df_tissue_positions = df_tissue_positions.set_index('barcode') elif df_tissue_positions.index.name == 'barcode' or 'barcode' in str(df_tissue_positions.index.name): # Already indexed by barcode pass else: # Try to use first column as barcode if it's not already indexed if len(df_tissue_positions.columns) > 0: df_tissue_positions = df_tissue_positions.set_index(df_tissue_positions.columns[0]) # Adding the tissue positions to the metadata adata.obs = pd.merge(adata.obs, df_tissue_positions, left_index=True, right_index=True, how='left') # Extract spatial coordinates # Try different possible column names for spatial coordinates coord_cols = None for col_pair in [ ["pxl_col_in_fullres", "pxl_row_in_fullres"], ["pxl_col", "pxl_row"], ["x", "y"], ["array_col", "array_row"] ]: if all(col in adata.obs.columns for col in col_pair): coord_cols = col_pair break if coord_cols is None: raise ValueError( "Could not find spatial coordinate columns. " "Expected one of: ['pxl_col_in_fullres', 'pxl_row_in_fullres'], " "['pxl_col', 'pxl_row'], ['x', 'y'], or ['array_col', 'array_row']" ) adata.obsm['spatial'] = adata.obs[coord_cols].values # Read tissue images try: hires_img = iio.imread(datapath / hires_image_file) lowres_img = iio.imread(datapath / lowres_image_file) except FileNotFoundError as e: print(f"Warning: Could not load tissue images: {e}") hires_img = None lowres_img = None # Initialize spatial metadata adata.uns["spatial"] = {} adata.uns["spatial"][sample] = {} # Load scalefactors try: with open(datapath / scalefactors_file, 'r', encoding='utf-8') as file: scalefactor = json.load(file) except FileNotFoundError as e: print(f"Warning: Could not load scalefactors: {e}") scalefactor = {} # Store images and scalefactors if hires_img is not None and lowres_img is not None: adata.uns["spatial"][sample]["images"] = { "hires": hires_img, "lowres": lowres_img } adata.uns["spatial"][sample]["scalefactors"] = scalefactor adata.uns["spatial"][sample]["binsize"] = binsize return adata
[docs] def read_hd_cellseg( datapath: Union[str, Path], sample: Optional[str] = None, cell_segmentations_file: str = "graphclust_annotated_cell_segmentations.geojson", matrix_file: str = "filtered_feature_cell_matrix.h5", hires_image_file: str = "spatial/tissue_hires_image.png", lowres_image_file: str = "spatial/tissue_lowres_image.png", scalefactors_file: str = "spatial/scalefactors_json.json" ) -> sc.AnnData: """ Read 10X HD SpaceRanger cell segmentation output and create an AnnData object with spatial information. This function reads the output from SpaceRanger pipeline and creates an AnnData object that includes spatial coordinates, cell segmentations, and tissue images. Parameters ---------- datapath : str or Path Path to the SpaceRanger output directory containing segmented outputs. sample : str, optional Sample name. If None, will be inferred from the path. cell_segmentations_file : str, default "graphclust_annotated_cell_segmentations.geojson" Name of the cell segmentations file. matrix_file : str, default "filtered_feature_cell_matrix.h5" Name of the filtered feature-cell matrix file. hires_image_file : str, default "spatial/tissue_hires_image.png" Path to the high-resolution tissue image relative to datapath. lowres_image_file : str, default "spatial/tissue_lowres_image.png" Path to the low-resolution tissue image relative to datapath. scalefactors_file : str, default "spatial/scalefactors_json.json" Name of the scalefactors JSON file. Returns ------- sc.AnnData AnnData object containing: - Expression matrix in .X - Cell metadata in .obs - Gene metadata in .var - Spatial coordinates in .obsm["spatial"] - Tissue images in .uns["spatial"][sample]["images"] - Scalefactors in .uns["spatial"][sample]["scalefactors"] - Cell geometries in .uns["spatial"][sample]["geometries"] (GeoDataFrame) - Cell geometries in .obs["geometry"] (WKT strings for serialization) Examples -------- >>> import trackcell.io as tcio >>> adata = tcio.read_hd_cellseg("SpaceRanger4.0/Case1/outs/segmented_outputs", sample="Case1") >>> print(adata) AnnData object with n_obs × n_vars = 1000 × 2000 obs: 'cellid' obsm: 'spatial' uns: 'spatial' Notes ----- This function expects the SpaceRanger output to have the following structure: - graphclust_annotated_cell_segmentations.geojson: Cell segmentation polygons - filtered_feature_cell_matrix.h5: Expression matrix - spatial/tissue_hires_image.png: High-resolution tissue image - spatial/tissue_lowres_image.png: Low-resolution tissue image - spatial/scalefactors_json.json: Image scaling factors """ # Convert to Path object for easier handling datapath = Path(datapath).resolve() # If sample is not provided, try to infer from path if sample is None: sample = datapath.parent.parent.name # Read cell segmentations # Try the specified file first, then try common alternative names if not found seg_file_path = datapath / cell_segmentations_file if not seg_file_path.exists(): # Try common alternative filenames alternative_names = [ "cell_segmentations.geojson", "cell_segmentations_annotated.geojson", "annotated_cell_segmentations.geojson" ] found_file = None for alt_name in alternative_names: alt_path = datapath / alt_name if alt_path.exists(): found_file = alt_name warnings.warn( f"Specified file '{cell_segmentations_file}' not found. " f"Using alternative file '{alt_name}' instead." ) break if found_file is None: raise FileNotFoundError( f"Cell segmentations file not found: {seg_file_path}\n" f"Also tried: {alternative_names}\n" f"Please specify the correct filename using the `cell_segmentations_file` parameter." ) seg_file_path = datapath / found_file gdf_seg = gpd.read_file(seg_file_path) df = pd.DataFrame(gdf_seg) # Create cellid in the format expected by SpaceRanger df['cellid'] = df['cell_id'].apply(lambda x: f"cellid_{str(x).zfill(9)}-1") # Read expression matrix adata = sc.read_10x_h5(datapath / matrix_file) # Align cell segmentations with expression data # Filter adata to only include cells that have segmentations adata = adata[adata.obs_names.isin(df['cellid']),:] # Filter and reorder df to match adata.obs_names order # Keep cellid as a column throughout (reset_index converts index back to column) df = df.set_index("cellid").loc[adata.obs_names] # Handle case where reset_index creates 'index' column instead of 'cellid' # This can happen if the index name was lost during operations """ if "cellid" not in df.columns: if "index" in df.columns: # Rename 'index' to 'cellid' if it exists df = df.rename(columns={"index": "cellid"}) else: raise ValueError( f"Unexpected: cellid is not a column after reset_index(). " f"Index name: {df.index.name}, Columns: {list(df.columns)}" ) """ # Convert geometry strings to shapely objects if needed if isinstance(df["geometry"].iloc[0], str): df["geometry"] = df["geometry"].apply(wkt.loads) # Extract centroid coordinates df["x"] = df["geometry"].apply(lambda poly: poly.centroid.x) df["y"] = df["geometry"].apply(lambda poly: poly.centroid.y) # Store spatial coordinates adata.obsm["spatial"] = np.array(df[["x", "y"]]) if 'classification' in df.columns: if isinstance(df['classification'].iloc[0], str): classifications = df['classification'].apply(ast.literal_eval) else: classifications = df['classification'] adata.obs['classification'] = [i['name'] for i in classifications] adata.uns['classification_colors'] = convert_classification_to_color_dict(df, 'classification') # Read tissue images try: hires_img = iio.imread(datapath / hires_image_file) lowres_img = iio.imread(datapath / lowres_image_file) except FileNotFoundError as e: print(f"Warning: Could not load tissue images: {e}") hires_img = None lowres_img = None # Initialize spatial metadata adata.uns["spatial"] = {} adata.uns["spatial"][sample] = {} # Load scalefactors try: with open(datapath / scalefactors_file, 'r', encoding='utf-8') as file: scalefactor = json.load(file) except FileNotFoundError as e: print(f"Warning: Could not load scalefactors: {e}") scalefactor = {} # Add spot_diameter_fullres if missing (required by scanpy's sc.pl.spatial) # For cell segmentation data, this is not a real spot diameter but needed for plotting # # Note: This is only used when spot_size parameter is NOT provided to sc.pl.spatial() # - If spot_size is provided: scanpy uses the provided value directly # - If spot_size is None: scanpy reads from scalefactors["spot_diameter_fullres"] # # Setting this ensures compatibility when users don't specify spot_size parameter if "spot_diameter_fullres" not in scalefactor: # Use a reasonable default for cell segmentation data # Typical cell diameter in pixels (scaled to fullres) # If we have fiducial_diameter_fullres, we can estimate based on that if "fiducial_diameter_fullres" in scalefactor: # Estimate spot diameter as a fraction of fiducial diameter # Fiducial markers are typically much larger than cells/spots estimated_spot_diameter = scalefactor["fiducial_diameter_fullres"] / 40.0 else: # Default to 20 pixels for cell segmentation (cells are smaller than spots) estimated_spot_diameter = 20.0 scalefactor["spot_diameter_fullres"] = estimated_spot_diameter # Store images and scalefactors if hires_img is not None and lowres_img is not None: adata.uns["spatial"][sample]["images"] = { "hires": hires_img, "lowres": lowres_img } adata.uns["spatial"][sample]["scalefactors"] = scalefactor # Store geometries: GeoDataFrame for fast access and WKT strings for serialization # Create GeoDataFrame indexed by cellid for easy lookup # cellid should already be a column from the reset_index() above gdf_indexed = gpd.GeoDataFrame(df[["geometry"]], geometry="geometry") #gdf_indexed = gdf_indexed.set_index(df.index) adata.uns["spatial"][sample]["geometries"] = gdf_indexed # Also store WKT strings in obs for serialization compatibility adata.obs["geometry"] = df["geometry"].apply(lambda g: wkt.dumps(g) if g is not None else None) return adata
def convert_classification_to_color_dict(df, classification_col='classification'): """ Convert classification information in DataFrame column to color dictionary. Parameters ---------- df : pandas.DataFrame DataFrame containing classification information. classification_col : str, default 'classification' Column name containing classification information. Returns ------- dict Dictionary mapping classification names to hexadecimal color codes. """ # Ensure data is in dictionary format (convert to dictionary if it's a string) classifications = df[classification_col] # Get unique classifications unique_classes = classifications.explode().unique() # Create color dictionary color_dict = {} for cls in unique_classes: if isinstance(cls, dict): # Ensure it's in dictionary format name = cls['name'] rgb = cls['color'] # Convert RGB list to hexadecimal color code hex_color = '#{:02x}{:02x}{:02x}'.format(*rgb) color_dict[name] = hex_color return color_dict
[docs] def sync_geometries_after_subset(adata, sample: Optional[str] = None): """ Synchronize geometries in adata.uns["spatial"][sample]["geometries"] after subsetting the AnnData object. When you subset an AnnData object (e.g., adata[mask] or adata[adata.obs['col'] == 'value']), the adata.obs["geometry"] column is automatically subset, but the GeoDataFrame in adata.uns["spatial"][sample]["geometries"] is not automatically updated. This function ensures that the geometries GeoDataFrame matches the subsetted cells. Parameters ---------- adata : sc.AnnData AnnData object that has been subsetted. sample : str, optional Sample name. If None, will use the first available sample in adata.uns["spatial"]. Returns ------- sc.AnnData The same AnnData object with synchronized geometries (modified in place). Examples -------- >>> import trackcell.io as tcio >>> adata = tcio.read_hd_cellseg("path/to/data", sample="sample1") >>> # Subset the data >>> adata_subset = adata[adata.obs['classification'] == 'Cluster-1'].copy() >>> # Synchronize geometries >>> tcio.sync_geometries_after_subset(adata_subset, sample="sample1") >>> # Now adata_subset.uns["spatial"]["sample1"]["geometries"] only contains geometries for subsetted cells """ if "spatial" not in adata.uns: warnings.warn( "No spatial information found in adata.uns['spatial']. " "This function is intended for data loaded with read_hd_cellseg()." ) return adata # Determine sample name if sample is None: available_samples = list(adata.uns["spatial"].keys()) if len(available_samples) == 0: warnings.warn("No samples found in adata.uns['spatial'].") return adata sample = available_samples[0] if len(available_samples) > 1: warnings.warn( f"Multiple samples found: {available_samples}. " f"Using '{sample}'. Specify `sample` explicitly to use a different one." ) if sample not in adata.uns["spatial"]: warnings.warn(f"Sample '{sample}' not found in adata.uns['spatial'].") return adata spatial_info = adata.uns["spatial"][sample] # Check if geometries exist if "geometries" not in spatial_info: warnings.warn( f"No geometries found in adata.uns['spatial']['{sample}']['geometries']. " "This function is intended for data loaded with read_hd_cellseg()." ) return adata geometries = spatial_info["geometries"] # Get the current cell IDs in the subsetted adata current_cell_ids = set(adata.obs_names) # Filter geometries to only include cells in the subset # Check if geometries is indexed by cell IDs if isinstance(geometries, gpd.GeoDataFrame): # Filter geometries based on current cell IDs # The geometries GeoDataFrame should be indexed by cell IDs (matching adata.obs_names) # This is how read_hd_cellseg stores them try: # Try to filter by index - this should work if geometries are indexed by cell IDs geometries_subset = geometries.loc[geometries.index.isin(current_cell_ids)] except (KeyError, IndexError) as e: # If index-based filtering fails, geometries might not be properly indexed # Try to match by intersecting indices common_indices = geometries.index.intersection(current_cell_ids) if len(common_indices) > 0: geometries_subset = geometries.loc[common_indices] else: warnings.warn( f"Could not filter geometries by index. " f"Geometries may not be properly indexed by cell IDs. " f"Error: {e}" ) return adata # Update the geometries in place spatial_info["geometries"] = geometries_subset # Note: adata.obs["geometry"] is automatically subset when adata is subset, # so we don't need to manually update it here else: warnings.warn( f"Unexpected geometry format in adata.uns['spatial']['{sample}']['geometries']. " "Expected GeoDataFrame." ) return adata