Source code for trackcell.io.convert_annohdcell

"""
Convert annohdcell bin2cell output to trackcell format.

This module provides functions to convert 2μm bin-level h5ad files from annohdcell
(with cell assignment labels) into trackcell-compatible h5ad files with cell-level
data and polygon geometries for spatial visualization.
"""

import scanpy as sc
import geopandas as gpd
import pandas as pd
import numpy as np
from shapely.geometry import Polygon, MultiPoint
from shapely import wkt
from scipy.sparse import csr_matrix, issparse
from typing import Optional, Union
from pathlib import Path
import warnings
from multiprocessing import Pool, cpu_count
from functools import partial


def bins_to_cell_polygon(bin_coords: np.ndarray, bin_size_um: float = 2.0,
                         microns_per_pixel: Optional[float] = None) -> Polygon:
    """
    Create a polygon geometry from bin coordinates using convex hull.

    Parameters
    ----------
    bin_coords : np.ndarray
        Array of shape (n_bins, 2) containing [x, y] pixel coordinates of bins
    bin_size_um : float, default 2.0
        Size of each bin in micrometers
    microns_per_pixel : float, optional
        Conversion factor from micrometers to pixels. If provided, will expand
        the polygon slightly to account for bin size.

    Returns
    -------
    Polygon
        Shapely Polygon representing the cell boundary
    """
    if len(bin_coords) == 0:
        raise ValueError("Cannot create polygon from empty bin coordinates")

    if len(bin_coords) == 1:
        # Single bin - create a small square around it
        x, y = bin_coords[0]
        if microns_per_pixel is not None:
            half_size = (bin_size_um / microns_per_pixel) / 2
        else:
            half_size = 1.0  # Default pixel size
        return Polygon([
            (x - half_size, y - half_size),
            (x + half_size, y - half_size),
            (x + half_size, y + half_size),
            (x - half_size, y + half_size)
        ])

    # Multiple bins - use convex hull
    points = MultiPoint(bin_coords)
    polygon = points.convex_hull

    # Optionally buffer the polygon slightly to account for bin size
    if microns_per_pixel is not None and isinstance(polygon, Polygon):
        buffer_pixels = (bin_size_um / microns_per_pixel) / 2
        polygon = polygon.buffer(buffer_pixels)

    return polygon


def _create_polygon_for_cell(args):
    """
    Helper function for parallel polygon creation.

    Parameters
    ----------
    args : tuple
        (cell_id, bin_indices, spatial_coords, bin_size_um, microns_per_pixel)

    Returns
    -------
    tuple
        (cell_id, polygon)
    """
    cell_id, bin_indices, spatial_coords, bin_size_um, microns_per_pixel = args
    bin_coords = spatial_coords[bin_indices]
    polygon = bins_to_cell_polygon(bin_coords, bin_size_um, microns_per_pixel)
    return cell_id, polygon


[docs] def convert_annohdcell_to_trackcell( bin_h5ad_path: Union[str, Path], output_h5ad_path: Optional[Union[str, Path]] = None, sample: Optional[str] = None, labels_key: str = "labels_joint", bin_size_um: float = 2.0, create_polygons: bool = True, buffer_polygons: bool = True ) -> sc.AnnData: """ Convert annohdcell 2μm bin h5ad to trackcell-compatible cell-level h5ad. This function reads a bin-level h5ad file from annohdcell (with cell assignment labels) and converts it to a cell-level h5ad file compatible with trackcell visualization tools. It aggregates bin counts into cells and creates polygon geometries for each cell. Parameters ---------- bin_h5ad_path : str or Path Path to the input 2μm bin h5ad file (e.g., b2c_2um.h5ad from annohdcell) output_h5ad_path : str or Path, optional Path to save the output h5ad file. If None, will not save to disk. sample : str, optional Sample name for spatial metadata. If None, inferred from filename. labels_key : str, default "labels_joint" Column name in .obs containing cell assignment labels (0 = unassigned) bin_size_um : float, default 2.0 Size of each bin in micrometers create_polygons : bool, default True Whether to create polygon geometries from bin coordinates buffer_polygons : bool, default True Whether to buffer polygons slightly to account for bin size Returns ------- sc.AnnData Cell-level AnnData object with: - .X: Summed expression counts per cell - .obs: Cell metadata including "geometry" (WKT strings) - .obsm["spatial"]: Cell centroid coordinates - .uns["spatial"][sample]["geometries"]: GeoDataFrame with cell polygons - .uns["spatial"][sample]["images"]: Tissue images (if present in input) - .uns["spatial"][sample]["scalefactors"]: Scale factors Examples -------- >>> import trackcell.io as tcio >>> adata = tcio.convert_annohdcell_to_trackcell( ... "b2c_2um.h5ad", ... output_h5ad_path="trackcell_format.h5ad", ... sample="sample1" ... ) >>> print(adata) >>> # Now can use trackcell visualization functions >>> import trackcell.pl as tcpl >>> tcpl.spatial_cell(adata, sample="sample1") Notes ----- - Input h5ad must have .obs[labels_key] with integer cell labels (0 = unassigned) - Input h5ad must have .obsm["spatial"] with bin coordinates - Bins with label 0 are excluded (unassigned bins) - Gene counts are summed across all bins in each cell - Cell coordinates are the mean of constituent bin coordinates - Polygon geometries are created using convex hull of bin coordinates """ # Read input h5ad bin_h5ad_path = Path(bin_h5ad_path) print(f"Reading bin-level h5ad from {bin_h5ad_path}") adata_bin = sc.read_h5ad(bin_h5ad_path) # Infer sample name if not provided if sample is None: sample = bin_h5ad_path.stem.replace("_2um", "").replace("b2c_", "") # Check required fields if labels_key not in adata_bin.obs.columns: raise ValueError(f"Column '{labels_key}' not found in .obs. Available columns: {adata_bin.obs.columns.tolist()}") if "spatial" not in adata_bin.obsm: raise ValueError("'spatial' not found in .obsm. Cannot extract bin coordinates.") # Filter out unassigned bins (label == 0) assigned_mask = adata_bin.obs[labels_key] != 0 n_unassigned = (~assigned_mask).sum() print(f"Filtering out {n_unassigned} unassigned bins (label=0)") adata_bin = adata_bin[assigned_mask].copy() # Get unique cell labels cell_labels = adata_bin.obs[labels_key].unique() cell_labels = np.sort(cell_labels) n_cells = len(cell_labels) print(f"Found {n_cells} cells") # Get microns_per_pixel from scalefactors if available microns_per_pixel = None if "spatial" in adata_bin.uns: for key in adata_bin.uns["spatial"]: if "scalefactors" in adata_bin.uns["spatial"][key]: microns_per_pixel = adata_bin.uns["spatial"][key]["scalefactors"].get("microns_per_pixel") if microns_per_pixel is not None: print(f"Using microns_per_pixel = {microns_per_pixel} from input h5ad") break # Create cell-to-bin mapping matrix print("Creating cell-to-bin mapping...") bin_labels = adata_bin.obs[labels_key].values label_to_idx = {label: idx for idx, label in enumerate(cell_labels)} cell_indices = np.array([label_to_idx[label] for label in bin_labels]) # Build sparse matrix: rows = cells, cols = bins n_bins = adata_bin.n_obs cell_to_bin = csr_matrix( (np.ones(n_bins), (cell_indices, np.arange(n_bins))), shape=(n_cells, n_bins) ) # Aggregate expression matrix print("Aggregating expression counts...") if issparse(adata_bin.X): X_cell = cell_to_bin.dot(adata_bin.X) else: X_cell = cell_to_bin.dot(adata_bin.X) # Aggregate spatial coordinates (mean) spatial_bin = adata_bin.obsm["spatial"] spatial_cell = cell_to_bin.dot(spatial_bin) / cell_to_bin.sum(axis=1).A # Count bins per cell bin_counts = np.array(cell_to_bin.sum(axis=1)).flatten() # Create cell-level AnnData print("Creating cell-level AnnData...") adata_cell = sc.AnnData( X=X_cell, obs=pd.DataFrame({ "cellid": [f"cellid_{int(label):09d}-1" for label in cell_labels], "object_id": cell_labels, "bin_count": bin_counts }), var=adata_bin.var.copy() ) adata_cell.obs_names = adata_cell.obs["cellid"] # Add spatial coordinates adata_cell.obsm["spatial"] = spatial_cell # Create polygon geometries if create_polygons: print("Creating polygon geometries...") geometries = [] geometry_wkt = [] for label in cell_labels: # Get bins for this cell bin_mask = bin_labels == label bin_coords = spatial_bin[bin_mask] # Create polygon try: if buffer_polygons and microns_per_pixel is not None: polygon = bins_to_cell_polygon(bin_coords, bin_size_um, microns_per_pixel) else: polygon = bins_to_cell_polygon(bin_coords, bin_size_um, None) geometries.append(polygon) geometry_wkt.append(polygon.wkt) except Exception as e: warnings.warn(f"Failed to create polygon for cell {label}: {e}") # Create a point as fallback centroid = bin_coords.mean(axis=0) point = Polygon([(centroid[0], centroid[1])] * 3) # Degenerate polygon geometries.append(point) geometry_wkt.append(point.wkt) # Add WKT strings to .obs adata_cell.obs["geometry"] = geometry_wkt # Create GeoDataFrame for .uns gdf = gpd.GeoDataFrame( geometry=geometries, index=adata_cell.obs_names ) # Initialize .uns["spatial"] structure if "spatial" not in adata_cell.uns: adata_cell.uns["spatial"] = {} if sample not in adata_cell.uns["spatial"]: adata_cell.uns["spatial"][sample] = {} adata_cell.uns["spatial"][sample]["geometries"] = gdf print(f"Created {len(geometries)} polygon geometries") # Copy spatial metadata from input if available if "spatial" in adata_bin.uns: if "spatial" not in adata_cell.uns: adata_cell.uns["spatial"] = {} # Copy from first available sample in input for input_sample in adata_bin.uns["spatial"]: input_spatial = adata_bin.uns["spatial"][input_sample] if sample not in adata_cell.uns["spatial"]: adata_cell.uns["spatial"][sample] = {} # Copy images if "images" in input_spatial: adata_cell.uns["spatial"][sample]["images"] = input_spatial["images"].copy() # Copy scalefactors if "scalefactors" in input_spatial: adata_cell.uns["spatial"][sample]["scalefactors"] = input_spatial["scalefactors"].copy() # Update spot_diameter_fullres to reflect cell size if "spot_diameter_fullres" in adata_cell.uns["spatial"][sample]["scalefactors"]: mean_bin_count = bin_counts.mean() diameter_scale = np.sqrt(mean_bin_count) original_diameter = adata_cell.uns["spatial"][sample]["scalefactors"]["spot_diameter_fullres"] adata_cell.uns["spatial"][sample]["scalefactors"]["spot_diameter_fullres"] = original_diameter * diameter_scale print(f"Scaled spot_diameter_fullres by {diameter_scale:.2f}x (mean bin_count = {mean_bin_count:.1f})") # Copy binsize if present if "binsize" in input_spatial: adata_cell.uns["spatial"][sample]["binsize"] = None # Cell-level, not bin-level break # Only copy from first sample # Calculate QC metrics print("Calculating QC metrics...") adata_cell.var_names_make_unique() sc.pp.calculate_qc_metrics(adata_cell, inplace=True) # Save if output path provided if output_h5ad_path is not None: output_h5ad_path = Path(output_h5ad_path) print(f"Saving to {output_h5ad_path}") adata_cell.write_h5ad(output_h5ad_path) print("Conversion complete!") print(f"Output: {adata_cell.n_obs} cells × {adata_cell.n_vars} genes") return adata_cell
[docs] def add_geometries_to_annohdcell_output( bin_h5ad_path: Union[str, Path], cell_h5ad_path: Union[str, Path], output_h5ad_path: Optional[Union[str, Path]] = None, sample: Optional[str] = None, labels_key: str = "labels_joint", bin_size_um: float = 2.0, buffer_polygons: bool = True, n_jobs: int = -1 ) -> sc.AnnData: """ Add polygon geometries to annohdcell's final cell h5ad using bin-level data. This function reads both the 2μm bin h5ad (with cell labels) and the final cell-level h5ad from annohdcell, then adds polygon geometries to the cell h5ad by creating convex hulls from the constituent bins of each cell. Parameters ---------- bin_h5ad_path : str or Path Path to the 2μm bin h5ad file (e.g., b2c_2um.h5ad) with cell labels cell_h5ad_path : str or Path Path to the final cell h5ad file (e.g., b2c_cell.h5ad) from annohdcell output_h5ad_path : str or Path, optional Path to save the output h5ad file. If None, will not save to disk. sample : str, optional Sample name for spatial metadata. If None, inferred from filename. labels_key : str, default "labels_joint" Column name in bin h5ad .obs containing cell assignment labels bin_size_um : float, default 2.0 Size of each bin in micrometers buffer_polygons : bool, default True Whether to buffer polygons slightly to account for bin size n_jobs : int, default -1 Number of parallel workers for polygon creation. -1 uses all available CPU cores, 1 disables parallelization. Returns ------- sc.AnnData Cell-level AnnData object (copy of input cell h5ad) with added: - .obs["geometry"]: WKT strings representing cell boundaries - .uns["spatial"][sample]["geometries"]: GeoDataFrame with cell polygons Examples -------- >>> import trackcell.io as tcio >>> adata = tcio.add_geometries_to_annohdcell_output( ... bin_h5ad_path="b2c_2um.h5ad", ... cell_h5ad_path="b2c_cell.h5ad", ... output_h5ad_path="b2c_cell_with_geom.h5ad", ... sample="sample1" ... ) >>> # Now can use trackcell visualization >>> import trackcell.pl as tcpl >>> tcpl.spatial_cell(adata, sample="sample1") Notes ----- - The cell h5ad must have .obs["object_id"] matching labels in bin h5ad - Bin h5ad must have .obs[labels_key] with integer cell labels - Bin h5ad must have .obsm["spatial"] with bin coordinates - The function preserves all data from the input cell h5ad - Only adds geometry information, does not modify counts or other data """ # Read both h5ad files bin_h5ad_path = Path(bin_h5ad_path) cell_h5ad_path = Path(cell_h5ad_path) print(f"Reading bin-level h5ad from {bin_h5ad_path}") adata_bin = sc.read_h5ad(bin_h5ad_path) print(f"Reading cell-level h5ad from {cell_h5ad_path}") adata_cell = sc.read_h5ad(cell_h5ad_path) # Infer sample name if not provided if sample is None: sample = cell_h5ad_path.stem.replace("_cell", "").replace("b2c_", "") # Check required fields in bin h5ad if labels_key not in adata_bin.obs.columns: raise ValueError(f"Column '{labels_key}' not found in bin h5ad .obs. Available: {adata_bin.obs.columns.tolist()}") if "spatial" not in adata_bin.obsm: raise ValueError("'spatial' not found in bin h5ad .obsm") # Check required fields in cell h5ad if "object_id" not in adata_cell.obs.columns: raise ValueError("Column 'object_id' not found in cell h5ad .obs. This should be the cell label ID.") # Get microns_per_pixel from scalefactors microns_per_pixel = None if "spatial" in adata_bin.uns: for key in adata_bin.uns["spatial"]: if "scalefactors" in adata_bin.uns["spatial"][key]: microns_per_pixel = adata_bin.uns["spatial"][key]["scalefactors"].get("microns_per_pixel") if microns_per_pixel is not None: print(f"Using microns_per_pixel = {microns_per_pixel}") break # Filter out unassigned bins (label == 0) assigned_mask = adata_bin.obs[labels_key] != 0 adata_bin_filtered = adata_bin[assigned_mask].copy() # Get bin labels and coordinates bin_labels = adata_bin_filtered.obs[labels_key].values spatial_bin = adata_bin_filtered.obsm["spatial"] # Get cell labels from cell h5ad cell_labels = adata_cell.obs["object_id"].values n_cells = len(cell_labels) print(f"Processing {n_cells} cells") # Create polygon geometries for each cell print("Creating polygon geometries from bin coordinates...") # Build a mapping from cell_label to bin indices for vectorized processing print("Building cell-to-bin mapping...") cell_to_bins = {} for bin_idx, label in enumerate(bin_labels): if label not in cell_to_bins: cell_to_bins[label] = [] cell_to_bins[label].append(bin_idx) # Convert lists to numpy arrays for faster indexing for label in cell_to_bins: cell_to_bins[label] = np.array(cell_to_bins[label], dtype=np.int32) # Prepare arguments for parallel processing args_list = [] cell_label_to_obs_idx = {} # Map cell label to obs index for idx, label in enumerate(cell_labels): cell_label_to_obs_idx[label] = idx if label not in cell_to_bins: warnings.warn(f"Cell {label} has no bins assigned, skipping geometry creation") continue bin_indices = cell_to_bins[label] mp = microns_per_pixel if buffer_polygons else None args_list.append((label, bin_indices, spatial_bin, bin_size_um, mp)) print(f"Processing {len(args_list)} cells with bins...") # Process in parallel or serial depending on n_jobs if n_jobs == 1: # Serial processing results = [] for args in args_list: results.append(_create_polygon_for_cell(args)) else: # Parallel processing actual_jobs = min(n_jobs if n_jobs > 0 else cpu_count(), len(args_list)) print(f"Using {actual_jobs} parallel workers") with Pool(processes=actual_jobs) as pool: results = pool.map(_create_polygon_for_cell, args_list) print(f"Created {len(results)} polygon geometries") # Unpack results - directly store Shapely objects, no WKT conversion geometries = [] cell_ids_with_geom = [] geometry_wkt = [] for cell_label, polygon in results: obs_idx = cell_label_to_obs_idx[cell_label] cell_ids_with_geom.append(adata_cell.obs_names[obs_idx]) geometries.append(polygon) geometry_wkt.append(polygon.wkt) # Only convert to WKT for .obs storage # Add WKT strings to .obs # Initialize with None for all cells adata_cell.obs["geometry"] = None # Fill in geometries for cells that have them for cell_id, wkt_str in zip(cell_ids_with_geom, geometry_wkt): adata_cell.obs.loc[cell_id, "geometry"] = wkt_str # Create GeoDataFrame directly from Shapely objects (no WKT round-trip) gdf = gpd.GeoDataFrame( geometry=geometries, index=cell_ids_with_geom ) # Initialize .uns["spatial"] structure if needed if "spatial" not in adata_cell.uns: adata_cell.uns["spatial"] = {} if sample not in adata_cell.uns["spatial"]: adata_cell.uns["spatial"][sample] = {} # Add geometries to .uns adata_cell.uns["spatial"][sample]["geometries"] = gdf # Copy spatial metadata from bin h5ad if not already present in cell h5ad if "spatial" in adata_bin.uns: for input_sample in adata_bin.uns["spatial"]: input_spatial = adata_bin.uns["spatial"][input_sample] # Copy images if not present if "images" in input_spatial and "images" not in adata_cell.uns["spatial"][sample]: adata_cell.uns["spatial"][sample]["images"] = input_spatial["images"].copy() print("Copied tissue images from bin h5ad") # Copy scalefactors if not present if "scalefactors" in input_spatial and "scalefactors" not in adata_cell.uns["spatial"][sample]: adata_cell.uns["spatial"][sample]["scalefactors"] = input_spatial["scalefactors"].copy() print("Copied scalefactors from bin h5ad") break # Save if output path provided if output_h5ad_path is not None: output_h5ad_path = Path(output_h5ad_path) print(f"Saving to {output_h5ad_path}") # Convert GeoDataFrame to regular DataFrame with WKT strings for h5ad serialization gdf_backup = adata_cell.uns["spatial"][sample]["geometries"] adata_cell.uns["spatial"][sample]["geometries"] = pd.DataFrame( {"geometry": gdf_backup.geometry.apply(lambda g: g.wkt)}, index=gdf_backup.index ) adata_cell.write_h5ad(output_h5ad_path) # Restore GeoDataFrame in memory adata_cell.uns["spatial"][sample]["geometries"] = gdf_backup print("Geometry addition complete!") print(f"Output: {adata_cell.n_obs} cells × {adata_cell.n_vars} genes with geometries") return adata_cell
[docs] def restore_geometries(adata: sc.AnnData, sample: Optional[str] = None) -> sc.AnnData: """ Restore GeoDataFrame from WKT strings after reading h5ad file. When h5ad files are saved with geometries, the GeoDataFrame is converted to a regular DataFrame with WKT strings for serialization. This function converts them back to a GeoDataFrame for spatial visualization. Parameters ---------- adata : sc.AnnData AnnData object read from h5ad file sample : str, optional Sample name in adata.uns["spatial"]. If None, processes all samples. Returns ------- sc.AnnData AnnData object with GeoDataFrame restored in uns["spatial"][sample]["geometries"] Examples -------- >>> import scanpy as sc >>> import trackcell as tcl >>> adata = sc.read_h5ad("cell_with_geom.h5ad") >>> adata = tcl.io.restore_geometries(adata) >>> # Now adata.uns["spatial"][sample]["geometries"] is a GeoDataFrame """ if "spatial" not in adata.uns: warnings.warn("No spatial data found in adata.uns") return adata samples = [sample] if sample is not None else list(adata.uns["spatial"].keys()) for s in samples: if s not in adata.uns["spatial"]: warnings.warn(f"Sample '{s}' not found in adata.uns['spatial']") continue if "geometries" not in adata.uns["spatial"][s]: continue geom_data = adata.uns["spatial"][s]["geometries"] # Check if it's already a GeoDataFrame if isinstance(geom_data, gpd.GeoDataFrame): continue # Convert DataFrame with WKT strings to GeoDataFrame if isinstance(geom_data, pd.DataFrame) and "geometry" in geom_data.columns: geometries = geom_data["geometry"].apply(wkt.loads) gdf = gpd.GeoDataFrame( geometry=geometries, index=geom_data.index ) adata.uns["spatial"][s]["geometries"] = gdf print(f"Restored {len(gdf)} geometries for sample '{s}'") return adata