# load required packages
from __future__ import annotations
import os
import platform
import subprocess
import sys
import tempfile
import zipfile
import requests
if platform.system() == "Windows":
vipsbin = r"c:\vips-dev-8.15\bin\vips-dev-8.15\bin"
vips_file_path = os.path.join(vipsbin, "vips.exe")
# Check if VIPS is installed
if not os.path.exists(vips_file_path):
# VIPS is not installed, download and extract it
url = "https://github.com/libvips/build-win64-mxe/releases/download/v8.15.2/vips-dev-w64-all-8.15.2.zip"
zip_file_path = "vips-dev-w64-all-8.15.2.zip"
response = requests.get(url, stream=True)
if response.status_code == 200:
with open(zip_file_path, "wb") as f:
f.write(response.raw.read())
# Extract the zip file
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(vipsbin)
else:
print("Error downloading the file.")
# Install pyvips
subprocess.check_call([sys.executable, "-m", "pip", "install", "pyvips"])
# Add vipsbin to the DLL search path or PATH environment variable
add_dll_dir = getattr(os, "add_dll_directory", None)
os.environ["PATH"] = os.pathsep.join((vipsbin, os.environ["PATH"]))
import argparse
import pathlib
import pickle
import time
from builtins import range
from itertools import combinations
from multiprocessing import Pool
from typing import TYPE_CHECKING
import anndata
import concave_hull
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import panel as pn
import scipy.stats as st
import skimage
import skimage.color
import skimage.exposure
import skimage.filters.rank
import skimage.io as io
import skimage.morphology
import skimage.transform
import statsmodels.api as sm
import tissuumaps.jupyter as tj
import torch
from concave_hull import concave_hull_indexes
from joblib import Parallel, delayed
from pyFlowSOM import map_data_to_nodes, som
from scipy import stats
from scipy.spatial import Delaunay, KDTree, distance
from scipy.spatial.distance import cdist
from scipy.stats import pearsonr, spearmanr
from skimage.io import imsave
from skimage.segmentation import find_boundaries
from sklearn.cluster import HDBSCAN, MiniBatchKMeans
from sklearn.cross_decomposition import CCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, f1_score, pairwise_distances
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
from sklearn.svm import SVC
from tqdm import tqdm
from yellowbrick.cluster import KElbowVisualizer
if TYPE_CHECKING:
from anndata import AnnData
from ..helperfunctions._general import *
from ..plotting._general import catplot
try:
from torch_geometric.data import ClusterData, ClusterLoader, Data, InMemoryDataset
except ImportError:
pass
try:
import cupy as cp
import rapids_singlecell as rsc
from cupyx.scipy.sparse import csc_matrix as csc_matrix_gpu
from cupyx.scipy.sparse import csr_matrix as csr_matrix_gpu
from cupyx.scipy.sparse import isspmatrix_csc as isspmatrix_csc_gpu
from cupyx.scipy.sparse import isspmatrix_csr as isspmatrix_csr_gpu
from scanpy.get import _get_obs_rep, _set_obs_rep
from scipy.sparse import isspmatrix_csc as isspmatrix_csc_cpu
from scipy.sparse import isspmatrix_csr as isspmatrix_csr_cpu
except ImportError:
pass
# Tools
############################################################
def tl_calculate_neigh_combs(w, l, n_num, threshold=0.85, per_keep_thres=0.85):
"""
Calculate neighborhood combinations based on a threshold.
Parameters
----------
w : DataFrame
DataFrame containing the data.
l : list
List of column names to be used.
n_num : int
Number of neighborhoods or k chosen for the neighborhoods.
threshold : float, optional
Threshold for neighborhood combinations, by default 0.85.
per_keep_thres : float, optional
Percent to keep threshold or percent of neighborhoods that fall above a certain threshold, by default 0.85.
Returns
-------
tuple
A tuple containing:
- simps: Series of neighborhood combinations.
- simp_freqs: Series of frequency counts of the combinations.
- simp_sums: Series of cumulative sums of the frequency counts.
"""
w.loc[:, l]
# need to normalize by number of neighborhoods or k chosen for the neighborhoods
xm = w.loc[:, l].values / n_num
# Get the neighborhood combinations based on the threshold
simps = hf_get_thresh_simps(xm, threshold)
simp_freqs = simps.value_counts(normalize=True)
simp_sums = np.cumsum(simp_freqs)
# See the percent to keep threshold or percent of neigbhorhoods that fall above a certain threshold
test_sums_thres = simp_sums[simp_sums < per_keep_thres]
test_len = len(test_sums_thres)
per_values_above = simp_sums[test_len] - simp_sums[test_len - 1]
print(test_len, per_values_above)
w["combination"] = [tuple(l[a] for a in s) for s in simps]
w["combination_num"] = [tuple(a for a in s) for s in simps]
# this shows what proportion (y) of the total cells are assigned to the top x combinations
plt.figure(figsize=(7, 3))
plt.plot(simp_sums.values)
plt.title(
"proportion (y) of the total cells are assigned to the top x combinations"
)
plt.show()
# this shows what proportion (y) of the total cells are assigned to the top x combinations
plt.figure(figsize=(7, 3))
plt.plot(test_sums_thres.values)
plt.title(
"proportion (y) of the total cells are assigned to the top x combinations - thresholded"
)
plt.show()
# plt.xticks(range(0,350,35),range(0,350,35),rotation = 90,fontsize = 10)
return (simps, simp_freqs, simp_sums)
def tl_build_graph_CN_comb_map(simp_freqs, thresh_freq=0.001):
"""
Build a directed graph for the CN combination map.
Parameters
----------
simp_freqs : pandas.Series
A series containing the frequencies of simplices.
thresh_freq : float, optional
The threshold frequency to filter simplices, by default 0.001.
Returns
-------
tuple
A tuple containing:
- g : networkx.DiGraph
The directed graph with edges representing the CN combination map.
- tops : list
A list of the top 20 simplices sorted by frequency.
- e0 : str
The last simplex in the outer loop.
- e1 : str
The last simplex in the inner loop.
"""
g = nx.DiGraph()
# selected_simps = simp_sums[simp_sums<=thresh_cumulative].index.values
selected_simps = simp_freqs[simp_freqs >= thresh_freq].index.values
selected_simps
"""
this builds the graph for the CN combination map
"""
for e0 in selected_simps:
for e1 in selected_simps:
if (set(list(e0)) < set(list(e1))) and (len(e1) == len(e0) + 1):
g.add_edge(e0, e1)
tops = (
simp_freqs[simp_freqs >= thresh_freq]
.sort_values(ascending=False)
.index.values.tolist()[:20]
)
return (g, tops, e0, e1)
[docs]
def clustering(
adata,
clustering="leiden",
marker_list=None,
resolution=1,
n_neighbors=10,
reclustering=False,
key_added=None,
key_filter=None,
subset_cluster=None,
seed=42,
fs_xdim=10,
fs_ydim=10,
fs_rlen=10, # FlowSOM parameters
**cluster_kwargs,
):
"""
Perform clustering on the given annotated data matrix.
Parameters
----------
adata : AnnData
The annotated data matrix of shape n_obs x n_vars. Rows correspond
to cells and columns to stained markers.
clustering : str, optional
The clustering algorithm to use. Options are "leiden" or "louvain". Defaults to "leiden".
marker_list : list, optional
A list of markers for clustering. Defaults to None.
resolution : int, optional
The resolution for the clustering algorithm. Defaults to 1.
n_neighbors : int, optional
The number of neighbors to use for the neighbors graph. Defaults to 10.
reclustering : bool, optional
Whether to recluster the data. Defaults to False.
key_added : str, optional
The key name to add to the adata object. Defaults to None.
key_filter : str, optional
The key name to filter the adata object. Defaults to None.
subset_cluster : list, optional
The list of clusters to subset. Defaults to None.
seed : int, optional
Seed for random state. Default is 42.
fs_xdim : int, optional
X dimension for FlowSOM. Default is 10.
fs_ydim : int, optional
Y dimension for FlowSOM. Default is 10.
fs_rlen : int, optional
Rlen for FlowSOM. Default is 10.
**cluster_kwargs : dict
Additional keyword arguments for the clustering function.
Returns
-------
AnnData
The annotated data matrix with the clustering results added.
"""
if clustering not in ["leiden", "louvain", "leiden_gpu", "flowSOM"]:
print(
"Invalid clustering options. Please select from leiden, louvain, leiden_gpu or flowSOM!"
)
print("For GPU accelerated leiden clustering, please use leiden_gpu")
sys.exit()
# test if rapids_singlecell is available
if clustering == "leiden_gpu":
try:
import cudf
import cuml
import cupy
import rapids_singlecell as rsc
except ImportError:
print("Please install rapids_singlecell to use leiden_gpu!")
print("install_gpu_leiden(CUDA = your cuda version as string)")
print("For example: sp.tl.install_gpu_leiden(CUDA = '12')")
print("THIS FUNCTION DOES NOT WORK ON MacOS OR WINDOWS")
print("using leiden instead of leiden_gpu")
clustering = "leiden"
if key_added is None:
key_added = clustering + "_" + str(resolution)
if key_filter is not None:
if subset_cluster is None:
print("Please provide subset_cluster!")
sys.exit()
else:
adata_tmp = adata
adata = adata[adata.obs[key_filter].isin(subset_cluster)]
# input a list of markers for clustering
# reconstruct the anndata
if marker_list is not None:
if len(list(set(marker_list) - set(adata.var_names))) > 0:
print("Marker list not all in adata var_names! Using intersection instead!")
marker_list = list(set(marker_list) & set(adata.var_names))
print("New marker_list: " + " ".join(marker_list))
if key_filter is None:
adata_tmp = adata
adata = adata[:, marker_list]
# Compute the neighborhood relations of single cells the range 2 to 100 and usually 10
if reclustering:
if clustering == "leiden_gpu":
print("Clustering on GPU")
anndata_to_GPU(adata) # moves `.X` to the GPU
rsc.tl.leiden(
adata,
resolution=resolution,
key_added=key_added,
random_state=seed,
**cluster_kwargs,
)
anndata_to_CPU(adata) # moves `.X` to the CPU
else:
print("Clustering")
if clustering == "leiden":
sc.tl.leiden(
adata,
resolution=resolution,
key_added=key_added,
random_state=seed,
**cluster_kwargs,
)
else:
if clustering == "louvain":
print("Louvain clustering")
sc.tl.louvain(
adata,
resolution=resolution,
key_added=key_added,
random_state=seed,
**cluster_kwargs,
)
else:
print("FlowSOM clustering")
adata_df = pd.DataFrame(
adata.X, index=adata.obs.index, columns=adata.var.index
)
# df to numpy array
som_input_arr = adata_df.to_numpy()
# train the SOM
node_output = som(
som_input_arr,
xdim=fs_xdim,
ydim=fs_ydim,
rlen=fs_rlen,
seed=seed,
)
# use trained SOM to assign clusters to each observation in your data
clusters, dists = map_data_to_nodes(node_output, som_input_arr)
clusters = pd.Categorical(clusters)
# add cluster to adata
adata.obs[key_added] = clusters
else:
if clustering == "leiden_gpu":
anndata_to_GPU(adata) # moves `.X` to the GPU
print("Computing neighbors and UMAP on GPU")
rsc.pp.neighbors(adata, n_neighbors=n_neighbors)
# UMAP computation
rsc.tl.umap(adata)
print("Clustering on GPU")
# Perform leiden clustering - improved version of louvain clustering
rsc.tl.leiden(
adata, resolution=resolution, key_added=key_added, random_state=seed
)
anndata_to_CPU(adata) # moves `.X` to the CPU
else:
print("Computing neighbors and UMAP")
print("- neighbors")
sc.pp.neighbors(adata, n_neighbors=n_neighbors)
# UMAP computation
print("- UMAP")
sc.tl.umap(adata)
print("Clustering")
# Perform leiden clustering - improved version of louvain clustering
if clustering == "leiden":
print("Leiden clustering")
sc.tl.leiden(
adata,
resolution=resolution,
key_added=key_added,
random_state=seed,
**cluster_kwargs,
)
else:
if clustering == "louvain":
print("Louvain clustering")
sc.tl.louvain(
adata,
resolution=resolution,
key_added=key_added,
random_state=seed,
**cluster_kwargs,
)
else:
print("FlowSOM clustering")
adata_df = pd.DataFrame(
adata.X, index=adata.obs.index, columns=adata.var.index
)
# df to numpy array
som_input_arr = adata_df.to_numpy()
# train the SOM
node_output = som(
som_input_arr,
xdim=fs_xdim,
ydim=fs_ydim,
rlen=fs_rlen,
seed=seed,
)
# use trained SOM to assign clusters to each observation in your data
clusters, dists = map_data_to_nodes(node_output, som_input_arr)
# make clusters a string
clusters = clusters.astype(str)
clusters = pd.Categorical(clusters)
# add cluster to adata
adata.obs[key_added] = clusters
if key_filter is None:
if marker_list is None:
return adata
else:
adata_tmp.obs[key_added] = adata.obs[key_added].values
# append other data
adata_tmp.obsm = adata.obsm
adata_tmp.obsp = adata.obsp
adata_tmp.uns = adata.uns
if key_filter is not None:
original_df = adata_tmp.obs
donor_df = adata.obs
donor_df_cols = donor_df.loc[:, donor_df.columns != key_added].columns.tolist()
# Perform the merge operation
merged_df = pd.merge(
original_df,
donor_df,
left_on=donor_df_cols,
right_on=donor_df_cols,
how="left",
)
# Fill NA/NaN values in 'key_added' using the values from 'key_filter'
merged_df[key_filter] = merged_df[key_filter].astype(str)
merged_df[key_added] = merged_df[key_added].astype(str)
merged_df.replace("nan", np.nan, inplace=True)
merged_df[key_added].fillna(merged_df[key_filter], inplace=True)
merged_df[key_filter] = merged_df[key_filter].astype("category")
merged_df[key_added] = merged_df[key_added].astype("category")
merged_df.index = merged_df.index.astype(str)
# assign df as obs for adata_tmp
adata_tmp.obs = merged_df
return adata_tmp
[docs]
def neighborhood_analysis(
adata,
unique_region,
cluster_col,
X="x",
Y="y",
k=35,
n_neighborhoods=30,
elbow=False,
metric="distortion",
):
"""
Compute for Cellular neighborhoods (CNs).
Parameters
----------
adata : AnnData
Annotated data matrix.
unique_region : str
Each region is one independent CODEX image.
cluster_col : str
Columns to compute CNs on, typically 'celltype'.
X : str, optional
X coordinate column name, by default "x".
Y : str, optional
Y coordinate column name, by default "y".
k : int, optional
Number of neighbors to compute, by default 35.
n_neighborhoods : int, optional
Number of neighborhoods one ends up with, by default 30.
elbow : bool, optional
Whether to test for optimal number of clusters and visualize as elbow plot or not, by default False. If set to True, the function will test 1 to n_neighborhoods and plot the distortion score in an elbow plot to assist the user in finding the optimal number of clusters.
metric : str, optional
The metric to use when calculating distance between instances in a feature array, by default "distortion". Other options include "silhouette" and "calinski_harabasz".
Returns
-------
AnnData
Annotated data matrix with updated neighborhood information.
Notes
-----
The function performs the following steps:
1. Extracts relevant columns from the input AnnData object.
2. Computes dummy variables for the cluster column.
3. Groups data by the unique region and computes neighborhoods.
4. Optionally performs k-means clustering and visualizes the elbow plot if `elbow` is set to True.
5. Updates the input AnnData object with neighborhood labels and centroids.
"""
df = pd.DataFrame(adata.obs[[X, Y, cluster_col, unique_region]])
cells = pd.concat([df, pd.get_dummies(df[cluster_col])], axis=1)
sum_cols = cells[cluster_col].unique()
values = cells[sum_cols].values
neighborhood_name = "CN" + "_k" + str(k) + "_n" + str(n_neighborhoods)
centroids_name = "Centroid" + "_k" + str(k) + "_n" + str(n_neighborhoods)
n_neighbors = k
cells[unique_region] = cells[unique_region].astype("str")
cells["cellid"] = cells.index.values
cells.reset_index(inplace=True)
keep_cols = [X, Y, unique_region, cluster_col]
# Get each region
tissue_group = cells[[X, Y, unique_region]].groupby(unique_region)
exps = list(cells[unique_region].unique())
tissue_chunks = [
(time.time(), exps.index(t), t, a)
for t, indices in tissue_group.groups.items()
for a in np.array_split(indices, 1)
]
tissues = [
hf_get_windows(job, n_neighbors, exps=exps, tissue_group=tissue_group, X=X, Y=Y)
for job in tissue_chunks
]
# Loop over k to compute neighborhoods
out_dict = {}
for neighbors, job in zip(tissues, tissue_chunks):
chunk = np.arange(len(neighbors)) # indices
tissue_name = job[2]
indices = job[3]
window = (
values[neighbors[chunk, :k].flatten()]
.reshape(len(chunk), k, len(sum_cols))
.sum(axis=1)
)
out_dict[(tissue_name, k)] = (window.astype(np.float16), indices)
windows = {}
window = pd.concat(
[
pd.DataFrame(
out_dict[(exp, k)][0],
index=out_dict[(exp, k)][1].astype(int),
columns=sum_cols,
)
for exp in exps
],
axis=0,
)
window = window.loc[cells.index.values]
window = pd.concat([cells[keep_cols], window], axis=1)
windows[k] = window
# Fill in based on above
k_centroids = {}
# producing what to plot
windows2 = windows[k]
windows2[cluster_col] = cells[cluster_col]
if elbow != True:
km = MiniBatchKMeans(n_clusters=n_neighborhoods, random_state=0)
labels = km.fit_predict(windows2[sum_cols].values)
k_centroids[str(k)] = km.cluster_centers_
adata.obs[neighborhood_name] = labels
adata.uns[centroids_name] = k_centroids
else:
km = MiniBatchKMeans(random_state=0)
X = windows2[sum_cols].values
labels = km.fit_predict(X)
k_centroids[str(k)] = km.cluster_centers_
adata.obs[neighborhood_name] = labels
adata.uns[centroids_name] = k_centroids
visualizer = KElbowVisualizer(
km, k=(n_neighborhoods), timings=False, metric=metric
)
visualizer.fit(X) # Fit the data to the visualizer
visualizer.show() # Finalize and render the figure
return adata
def build_cn_map(
adata,
cn_col,
unique_region,
palette=None,
k=75,
X="x",
Y="y",
threshold=0.85,
per_keep_thres=0.85,
sub_list=None,
sub_col=None,
rand_seed=1,
):
"""
Generate a cellular neighborhood (CN) map.
Parameters
----------
adata : AnnData
Annotated data matrix.
cn_col : str
Column name for cellular neighborhood.
unique_region : str
Unique region identifier.
palette : dict, optional
Color palette for the CN map, by default None.
k : int, optional
Number of neighbors to compute, by default 75.
X : str, optional
X coordinate column name, by default "x".
Y : str, optional
Y coordinate column name, by default "y".
threshold : float, optional
Threshold for neighborhood computation, by default 0.85.
per_keep_thres : float, optional
Threshold for keeping percentage, by default 0.85.
sub_list : list, optional
List of sub regions, by default None.
sub_col : str, optional
Column name for sub regions, by default None.
rand_seed : int, optional
Random seed for color generation, by default 1.
Returns
-------
dict
Dictionary containing the graph, top nodes, edges and simplicial frequencies.
"""
ks = [k]
cells_df = pd.DataFrame(adata.obs)
cells_df = cells_df[[X, Y, unique_region, cn_col]]
cells_df.reset_index(inplace=True)
sum_cols = cells_df[cn_col].unique()
keep_cols = cells_df.columns
cn_colors = hf_generate_random_colors(
len(adata.obs[cn_col].unique()), rand_seed=rand_seed
)
if palette is None:
if cn_col + "_colors" not in adata.uns.keys():
palette = dict(zip(np.sort(adata.obs[cn_col].unique()), cn_colors))
adata.uns[cn_col + "_colors"] = cn_colors
else:
palette = dict(
zip(np.sort(adata.obs[cn_col].unique()), adata.uns[cn_col + "_colors"])
)
Neigh = Neighborhoods(
cells_df,
ks,
cn_col,
sum_cols,
keep_cols,
X,
Y,
reg=unique_region,
add_dummies=True,
)
windows = Neigh.k_windows()
w = windows[k]
if sub_list:
# convert sub_list to list if only str is provided
if isinstance(sub_list, str):
sub_list = [sub_list]
w = w[w[sub_col].isin(sub_list)]
l = list(palette.keys())
simps, simp_freqs, simp_sums = tl_calculate_neigh_combs(
w, l, k, threshold=threshold, per_keep_thres=per_keep_thres # color palette
)
g, tops, e0, e1 = tl_build_graph_CN_comb_map(simp_freqs)
return {
"g": g,
"tops": tops,
"e0": e0,
"e1": e1,
"simp_freqs": simp_freqs,
"w": w,
"l": l,
"k": k,
"threshold": threshold,
}
def tl_format_for_squidpy(adata, x_col, y_col):
"""
Format an AnnData object for use with Squidpy.
Parameters
----------
adata : AnnData
Annotated data matrix.
x_col : str
Column name for x spatial coordinates.
y_col : str
Column name for y spatial coordinates.
Returns
-------
AnnData
Annotated data matrix formatted for Squidpy, with spatial data in the 'obsm' attribute.
"""
# Validate input types
if not isinstance(adata, ad.AnnData):
raise TypeError("adata must be an AnnData object")
if not isinstance(x_col, str) or not isinstance(y_col, str):
raise TypeError("x_col and y_col must be strings")
# Check if the columns exist in the 'obs' metadata
if x_col not in adata.obs.columns or y_col not in adata.obs.columns:
raise ValueError(f"Columns {x_col} and/or {y_col} not found in adata.obs")
# Extract the count data from your original AnnData object
counts = adata.X
# Extract the spatial coordinates from the 'obs' metadata
spatial_coordinates = adata.obs[[x_col, y_col]].values
# Ensure spatial coordinates are numeric
if not np.issubdtype(spatial_coordinates.dtype, np.number):
raise ValueError("Spatial coordinates must be numeric")
# Create a new AnnData object with the expected format
new_adata = ad.AnnData(counts, obsm={"spatial": spatial_coordinates})
return new_adata
def calculate_triangulation_distances(df_input, id, x_pos, y_pos, cell_type, region):
"""
Calculate distances between cells using Delaunay triangulation.
Parameters
----------
df_input : pandas.DataFrame
Input dataframe containing cell information.
id : str
Column name for cell id.
x_pos : str
Column name for x position of cells.
y_pos : str
Column name for y position of cells.
cell_type : str
Column name for cell type annotations.
region : str
Column name for region.
Returns
-------
pandas.DataFrame
Annotated result dataframe with calculated distances and additional information.
"""
# Perform Delaunay triangulation
points = df_input[[x_pos, y_pos]].values
tri = Delaunay(points)
indices = tri.simplices
# Get interactions going both directions
edges = set()
for simplex in indices:
for i in range(3):
for j in range(i + 1, 3):
edges.add(tuple(sorted([simplex[i], simplex[j]])))
edges = np.array(list(edges))
# Create dataframe from edges
rdelaun_result = pd.DataFrame(edges, columns=["ind1", "ind2"])
rdelaun_result[["x1", "y1"]] = df_input.iloc[rdelaun_result["ind1"]][
[x_pos, y_pos]
].values
rdelaun_result[["x2", "y2"]] = df_input.iloc[rdelaun_result["ind2"]][
[x_pos, y_pos]
].values
# Annotate results with cell type and region information
df_input["XYcellID"] = (
df_input[x_pos].astype(str) + "_" + df_input[y_pos].astype(str)
)
rdelaun_result["cell1ID"] = (
rdelaun_result["x1"].astype(str) + "_" + rdelaun_result["y1"].astype(str)
)
rdelaun_result["cell2ID"] = (
rdelaun_result["x2"].astype(str) + "_" + rdelaun_result["y2"].astype(str)
)
annotated_result = pd.merge(
rdelaun_result, df_input, left_on="cell1ID", right_on="XYcellID"
)
annotated_result = annotated_result.rename(
columns={cell_type: "celltype1", id: "celltype1_index"}
)
annotated_result = annotated_result.drop(columns=[x_pos, y_pos, region, "XYcellID"])
annotated_result = pd.merge(
annotated_result,
df_input,
left_on="cell2ID",
right_on="XYcellID",
suffixes=(".x", ".y"),
)
annotated_result = annotated_result.rename(
columns={cell_type: "celltype2", id: "celltype2_index"}
)
annotated_result = annotated_result.drop(columns=[x_pos, y_pos, "XYcellID"])
# Calculate distance
annotated_result["distance"] = np.sqrt(
(annotated_result["x2"] - annotated_result["x1"]) ** 2
+ (annotated_result["y2"] - annotated_result["y1"]) ** 2
)
# Ensure symmetry by adding reversed pairs
reversed_pairs = annotated_result.copy()
reversed_pairs = reversed_pairs.rename(
columns={
"celltype1_index": "celltype2_index",
"celltype1": "celltype2",
"celltype1_X": "celltype2_X",
"celltype1_Y": "celltype2_Y",
"celltype2_index": "celltype1_index",
"celltype2": "celltype1",
"celltype2_X": "celltype1_X",
"celltype2_Y": "celltype1_Y",
}
)
annotated_result = pd.concat([annotated_result, reversed_pairs])
# Reorder columns
annotated_result = annotated_result[
[
region,
"celltype1_index",
"celltype1",
"x1",
"y1",
"celltype2_index",
"celltype2",
"x2",
"y2",
"distance",
]
]
annotated_result.columns = [
region,
"celltype1_index",
"celltype1",
"celltype1_X",
"celltype1_Y",
"celltype2_index",
"celltype2",
"celltype2_X",
"celltype2_Y",
"distance",
]
return annotated_result
# Define the process_region function at the top level
def process_region(df, unique_region, id, x_pos, y_pos, cell_type, region):
"""
Process a specific region of a dataframe, calculating triangulation distances.
Parameters
----------
df : pandas.DataFrame
Input dataframe containing cell information.
unique_region : str
Unique region identifier.
id : str
Column name for cell id.
x_pos : str
Column name for x position of cells.
y_pos : str
Column name for y position of cells.
cell_type : str
Column name for cell type.
region : str
Column name for region.
Returns
-------
pandas.DataFrame
Result dataframe with calculated distances and additional information for the specified region.
"""
subset = df[df[region] == unique_region].copy()
subset["uniqueID"] = (
subset[id].astype(str)
+ "-"
+ subset[x_pos].astype(str)
+ "-"
+ subset[y_pos].astype(str)
)
subset["XYcellID"] = subset[x_pos].astype(str) + "_" + subset[y_pos].astype(str)
result = calculate_triangulation_distances(
df_input=subset,
id=id,
x_pos=x_pos,
y_pos=y_pos,
cell_type=cell_type,
region=region,
)
return result
def get_triangulation_distances(
df_input, id, x_pos, y_pos, cell_type, region, num_cores=None, correct_dtype=True
):
"""
Calculate triangulation distances for each unique region in the input dataframe.
Parameters
----------
df_input : pandas.DataFrame
Input dataframe containing cell information.
id : str
Column name for cell id.
x_pos : str
Column name for x position of cells.
y_pos : str
Column name for y position of cells.
cell_type : str
Column name for cell type.
region : str
Column name for region.
num_cores : int, optional
Number of cores to use for parallel processing. If None, defaults to half of available cores.
correct_dtype : bool, optional
If True, corrects the data type of the cell_type and region columns to string.
Returns
-------
pandas.DataFrame
Result dataframe with calculated distances and additional information for each unique region.
"""
if correct_dtype == True:
# change columns to pandas string
df_input[cell_type] = df_input[cell_type].astype(str)
df_input[region] = df_input[region].astype(str)
# Check if x_pos and y_pos are integers, and if not, convert them
if not issubclass(df_input[x_pos].dtype.type, np.integer):
print("This function expects integer values for xy coordinates.")
print(
x_pos
+ " and "
+ y_pos
+ " will be changed to integer. Please check the generated output!"
)
df_input[x_pos] = df_input[x_pos].astype(int).values
df_input[y_pos] = df_input[y_pos].astype(int).values
# Get unique regions
unique_regions = df_input[region].unique()
# Select only necessary columns
df_input = df_input.loc[:, [id, x_pos, y_pos, cell_type, region]]
# Set up parallelization
if num_cores is None:
num_cores = os.cpu_count() // 2 # default to using half of available cores
# Parallel processing using joblib
results = Parallel(n_jobs=num_cores)(
delayed(process_region)(df_input, reg, id, x_pos, y_pos, cell_type, region)
for reg in unique_regions
)
triangulation_distances = pd.concat(results)
return triangulation_distances
def shuffle_annotations(df_input, cell_type, region, permutation):
"""
Shuffle annotations within each unique region in the input dataframe.
Parameters
----------
df_input : pandas.DataFrame
Input dataframe containing cell information.
cell_type : str
Column name for cell type annotations.
region : str
Column name for region.
permutation : int
Seed for the random number generator.
Returns
-------
pandas.DataFrame
Result dataframe with shuffled annotations for each unique region.
"""
# Set the seed for reproducibility
np.random.seed(permutation + 1234)
# Create a copy to avoid modifying the original dataframe
df_shuffled = df_input.copy()
# Shuffle annotations within each region
for region_name in df_shuffled[region].unique():
region_mask = df_shuffled[region] == region_name
shuffled_values = df_shuffled.loc[region_mask, cell_type].sample(frac=1).values
df_shuffled.loc[region_mask, "random_annotations"] = shuffled_values
return df_shuffled
def tl_iterate_tri_distances(
df_input, id, x_pos, y_pos, cell_type, region, num_cores=None, num_iterations=1000
):
"""
Iterate over triangulation distances for each unique region in the input dataframe.
Parameters
----------
df_input : pandas.DataFrame
Input dataframe containing cell information.
id : str
Column name for cell id.
x_pos : str
Column name for x position of cells.
y_pos : str
Column name for y position of cells.
cell_type : str
Column name for cell type.
region : str
Column name for region.
num_cores : int, optional
Number of cores to use for parallel processing. If None, defaults to half of available cores.
num_iterations : int, optional
Number of iterations to perform. Defaults to 1000.
Returns
-------
pandas.DataFrame
Result dataframe with iterative triangulation distances for each unique region.
"""
unique_regions = df_input[region].unique()
# Use only the necessary columns
df_input = df_input[[id, x_pos, y_pos, cell_type, region]]
if num_cores is None:
num_cores = os.cpu_count() // 2 # Default to using half of available cores
# Define a helper function to process each region and iteration
def process_iteration(region_name, iteration):
# Filter by region
subset = df_input[df_input[region] == region_name].copy()
# Create unique IDs
subset.loc[:, "uniqueID"] = (
subset[id].astype(str)
+ "-"
+ subset[x_pos].astype(str)
+ "-"
+ subset[y_pos].astype(str)
)
subset.loc[:, "XYcellID"] = (
subset[x_pos].astype(str) + "_" + subset[y_pos].astype(str)
)
# Shuffle annotations
shuffled = shuffle_annotations(subset, cell_type, region, iteration)
# Get triangulation distances
results = get_triangulation_distances(
df_input=shuffled,
id=id,
x_pos=x_pos,
y_pos=y_pos,
cell_type="random_annotations",
region=region,
num_cores=num_cores,
correct_dtype=False,
)
# Summarize results
per_cell_summary = (
results.groupby(["celltype1_index", "celltype1", "celltype2"])
.distance.mean()
.reset_index(name="per_cell_mean_dist")
)
per_celltype_summary = (
per_cell_summary.groupby(["celltype1", "celltype2"])
.per_cell_mean_dist.mean()
.reset_index(name="mean_dist")
)
per_celltype_summary[region] = region_name
per_celltype_summary["iteration"] = iteration
return per_celltype_summary
# TODO: remove nans valid here a good idea (attempt to fix windows unpickle issue)?
unique_regions = [r for r in unique_regions if r != np.nan]
# Parallel processing for each region and iteration
results = Parallel(n_jobs=num_cores)(
delayed(process_iteration)(region_name, iteration)
for region_name in unique_regions
for iteration in range(1, num_iterations + 1)
)
# Combine all results
iterative_triangulation_distances = pd.concat(results, ignore_index=True)
# iterative_triangulation_distances = iterative_triangulation_distances.dropna()
return iterative_triangulation_distances
# def tl_iterate_tri_distances_ad(
# adata,
# id,
# x_pos,
# y_pos,
# cell_type,
# region,
# num_cores=None,
# num_iterations=1000,
# key_name=None,
# correct_dtype=True,
# ):
# """
# Iterate over triangulation distances for each unique region in the input AnnData object
# Parameters
# ----------
# adata : anndata.AnnData
# Annotated data matrix.
# id : str
# Column name for cell id.
# x_pos : str
# Column name for x position of cells.
# y_pos : str
# Column name for y position of cells.
# cell_type : str
# Column name for cell type.
# region : str
# Column name for region.
# num_cores : int, optional
# Number of cores to use for parallel processing. If None, defaults to half of available cores.
# num_iterations : int, optional
# Number of iterations to perform. Defaults to 1000.
# key_name : str, optional
# Key name to use when saving the result to the AnnData object. If None, defaults to "iTriDist_" + str(num_iterations).
# correct_dtype : bool, optional
# If True, corrects the data type of the cell type and region columns to string. Defaults to True.
# Returns
# -------
# pandas.DataFrame
# Result dataframe with iterative triangulation distances for each unique region.
# """
# df_input = pd.DataFrame(adata.obs)
# df_input[id] = df_input.index
# if correct_dtype == True:
# # change columns to pandas string
# df_input[cell_type] = df_input[cell_type].astype(str)
# df_input[region] = df_input[region].astype(str)
# # Check if x_pos and y_pos are integers, and if not, convert them
# if not issubclass(df_input[x_pos].dtype.type, np.integer):
# print("This function expects integer values for xy coordinates.")
# print("Class will be changed to integer. Please check the generated output!")
# df_input[x_pos] = df_input[x_pos].astype(int).values
# df_input[y_pos] = df_input[y_pos].astype(int).values
# unique_regions = df_input[region].unique()
# # Use only the necessary columns
# df_input = df_input.loc[:, [id, x_pos, y_pos, cell_type, region]]
# if num_cores is None:
# num_cores = os.cpu_count() // 2 # Default to using half of available cores
# Define a helper function to process each region and iteration
# def process_iteration(region_name, iteration):
# # Filter by region
# subset = df_input.loc[df_input[region] == region_name, :].copy()
# subset.loc[:, "uniqueID"] = (
# subset[id].astype(str)
# + "-"
# + subset[x_pos].astype(str)
# + "-"
# + subset[y_pos].astype(str)
# )
# subset.loc[:, "XYcellID"] = (
# subset[x_pos].astype(str) + "_" + subset[y_pos].astype(str)
# )
# Shuffle annotations
# shuffled = shuffle_annotations(subset, cell_type, region, iteration)
# Get triangulation distances
# results = get_triangulation_distances(
# df_input=shuffled,
# id=id,
# x_pos=x_pos,
# y_pos=y_pos,
# cell_type="random_annotations",
# region=region,
# num_cores=num_cores,
# correct_dtype=False,
# )
# # Summarize results
# per_cell_summary = (
# results.groupby(["celltype1_index", "celltype1", "celltype2"])
# .distance.mean()
# .reset_index(name="per_cell_mean_dist")
# )
# # Ensure symmetry by aggregating distances in both directions
# per_cell_summary_reversed = per_cell_summary.rename(
# columns={"celltype1": "celltype2", "celltype2": "celltype1"}
# )
# per_cell_summary_combined = pd.concat([per_cell_summary, per_cell_summary_reversed])#
# per_celltype_summary = (
# per_cell_summary_combined.groupby(["celltype1", "celltype2"])
# .per_cell_mean_dist.mean()
# .reset_index(name="mean_dist")
# )
# per_celltype_summary[region] = region_name
# per_celltype_summary["iteration"] = iteration#
# return per_celltype_summary
# Parallel processing for each region and iteration
# results = Parallel(n_jobs=num_cores)(
# delayed(process_iteration)(region_name, iteration)
# for region_name in unique_regions
# for iteration in range(1, num_iterations + 1)
# )
# Combine all results
# iterative_triangulation_distances = pd.concat(results, ignore_index=True)
# append result to adata
# if key_name is None:
# key_name = "iTriDist_" + str(num_iterations)
# adata.uns[key_name] = iterative_triangulation_distances
# print("Save iterative triangulation distance output to anndata.uns " + key_name)
# return iterative_triangulation_distances
def add_missing_columns(
triangulation_distances, metadata, shared_column="unique_region"
):
"""
Add missing columns from metadata to triangulation_distances dataframe.
Parameters
----------
triangulation_distances : pandas.DataFrame
DataFrame containing triangulation distances.
metadata : pandas.DataFrame
DataFrame containing metadata.
shared_column : str, optional
Column name that is shared between the two dataframes. Defaults to "unique_region".
Returns
-------
pandas.DataFrame
Updated triangulation_distances dataframe with missing columns added.
"""
# Find the difference in columns
missing_columns = set(metadata.columns) - set(triangulation_distances.columns)
# Add missing columns to triangulation_distances with NaN values
for column in missing_columns:
triangulation_distances[column] = pd.NA
# Create a mapping from unique_region to tissue in metadata
region_to_tissue = pd.Series(
metadata[column].values, index=metadata["unique_region"]
).to_dict()
# Apply this mapping to the triangulation_distances dataframe to create/update the tissue column
triangulation_distances[column] = triangulation_distances["unique_region"].map(
region_to_tissue
)
# Handle regions with no corresponding tissue in the metadata by filling in a default value
triangulation_distances[column].fillna("Unknown", inplace=True)
return triangulation_distances
# Calculate p-values and log fold differences
def calculate_pvalue(row):
"""
Calculate the p-value using the Mann-Whitney U test.
Parameters
----------
row : pandas.Series
A row of data containing 'expected' and 'observed' values.
Returns
-------
float
The calculated p-value. Returns np.nan if there is insufficient data to perform the test.
"""
# function body here
try:
return st.mannwhitneyu(
row["expected"], row["observed"], alternative="two-sided"
).pvalue
except ValueError: # This handles cases with insufficient data
return np.nan
[docs]
def identify_interactions(
adata,
cellid,
x_pos,
y_pos,
cell_type,
region,
comparison,
min_observed=10,
distance_threshold=128,
num_cores=None,
num_iterations=1000,
key_name=None,
correct_dtype=False,
aggregate_per_cell=True,
):
"""
Identify interactions between cell types based on their spatial distances.
Parameters
----------
adata : AnnData
Annotated data matrix.
id : str
Identifier for cells.
x_pos : str
Column name for x position of cells.
y_pos : str
Column name for y position of cells.
cell_type : str
Column name for cell type.
region : str
Column name for region.
comparison : str
Column name for comparison.
iTriDist_keyname : str, optional
Key name for iterative triangulation distances, by default None
triDist_keyname : str, optional
Key name for triangulation distances, by default None
min_observed : int, optional
Minimum number of observed distances, by default 10
distance_threshold : int, optional
Threshold for distance, by default 128
num_cores : int, optional
Number of cores to use for computation, by default None
num_iterations : int, optional
Number of iterations for computation, by default 1000
key_name : str, optional
Key name for output, by default None
correct_dtype : bool, optional
Whether to correct data type or not, by default False
Returns
-------
DataFrame
DataFrame with p-values and logfold changes for interactions.
"""
df_input = pd.DataFrame(adata.obs)
if cellid in df_input.columns:
df_input.index = df_input[cellid]
else:
print(cellid + " is not in the adata.obs, use index as cellid instead!")
df_input[cellid] = df_input.index
# change columns to pandas string
df_input[cell_type] = df_input[cell_type].astype(str)
df_input[region] = df_input[region].astype(str)
print("Computing for observed distances between cell types!")
triangulation_distances = get_triangulation_distances(
df_input=df_input,
id=cellid,
x_pos=x_pos,
y_pos=y_pos,
cell_type=cell_type,
region=region,
num_cores=num_cores,
correct_dtype=correct_dtype,
)
if key_name is None:
triDist_keyname = "triDist"
adata.uns[triDist_keyname] = triangulation_distances
print("Save triangulation distances output to anndata.uns " + triDist_keyname)
print("Permuting data labels to obtain the randomly distributed distances!")
print("this step can take awhile")
iterative_triangulation_distances = tl_iterate_tri_distances(
df_input=df_input,
id=cellid,
x_pos=x_pos,
y_pos=y_pos,
cell_type=cell_type,
region=region,
num_cores=num_cores,
num_iterations=num_iterations,
)
metadata = df_input.loc[:, ["unique_region", comparison]].copy()
# Reformat observed dataset
triangulation_distances_long = add_missing_columns(
triangulation_distances, metadata, shared_column=region
)
if aggregate_per_cell == True:
observed_distances = (
triangulation_distances_long.query("distance <= @distance_threshold")
.groupby(["celltype1_index", "celltype1", "celltype2", comparison, region])
.agg(mean_per_cell=("distance", "mean"))
.reset_index()
.groupby(["celltype1", "celltype2", comparison])
.agg(
observed=("mean_per_cell", list),
observed_mean=("mean_per_cell", "mean"),
)
.reset_index()
)
else:
observed_distances = (
triangulation_distances_long.query("distance <= @distance_threshold")
.groupby(
[
"celltype1_index",
"celltype2_index",
"celltype1",
"celltype2",
comparison,
region,
]
)
.agg(mean_per_cell=("distance", "mean"))
.reset_index()
.groupby(["celltype1", "celltype2", comparison])
.agg(
observed=("mean_per_cell", list),
observed_mean=("mean_per_cell", "mean"),
)
.reset_index()
)
# Reformat expected dataset
iterated_triangulation_distances_long = add_missing_columns(
iterative_triangulation_distances, metadata, shared_column=region
)
expected_distances = (
iterated_triangulation_distances_long.query("mean_dist <= @distance_threshold")
.groupby(["celltype1", "celltype2", comparison])
.agg(expected=("mean_dist", list), expected_mean=("mean_dist", "mean"))
.reset_index()
)
# Drop comparisons with low numbers of observations
observed_distances["keep"] = observed_distances["observed"].apply(
lambda x: len(x) > min_observed
)
observed_distances = observed_distances[observed_distances["keep"]]
expected_distances["keep"] = expected_distances["expected"].apply(
lambda x: len(x) > min_observed
)
expected_distances = expected_distances[expected_distances["keep"]]
# concatenate observed and expected distances
distance_pvals = expected_distances.merge(
observed_distances, on=["celltype1", "celltype2", comparison], how="left"
)
distance_pvals = expected_distances.merge(
observed_distances, on=["celltype1", "celltype2", comparison], how="left"
)
distance_pvals["pvalue"] = distance_pvals.apply(calculate_pvalue, axis=1)
distance_pvals["logfold_group"] = np.log2(
distance_pvals["observed_mean"] / distance_pvals["expected_mean"]
)
distance_pvals["interaction"] = (
distance_pvals["celltype1"] + " --> " + distance_pvals["celltype2"]
)
# drop na from distance_pvals
# distance_pvals = distance_pvals.dropna()
# append result to adata
# create dictionary for the results
triangulation_distances_dict = {
"distance_pvals": distance_pvals,
"triangulation_distances_observed": iterated_triangulation_distances_long,
"triangulation_distances_iterated": triangulation_distances_long,
}
return distance_pvals, triangulation_distances_dict
def adata_cell_percentages(adata, column_percentage="cell_type"):
"""
Calculate the percentage of each cell type in an AnnData object.
Parameters:
adata (AnnData): An AnnData object containing single-cell data.
column_percentage (str): The column name in adata.obs that contains cell type information. Default is 'cell_type'.
Returns:
DataFrame: A pandas DataFrame with two columns: the specified column name and 'percentage', representing the percentage of each cell type.
"""
# Assuming 'adata' is an AnnData object and 'cell_type' is the column with cell type information
cell_type_counts = adata.obs[column_percentage].value_counts()
total_cells = len(adata)
cell_type_percentages = (cell_type_counts / total_cells) * 100
# Convert to DataFrame for better readability
cell_type_percentages_df = pd.DataFrame(
{
column_percentage: cell_type_counts.index,
"percentage": cell_type_percentages.values,
}
)
return cell_type_percentages_df
[docs]
def filter_interactions(
distance_pvals, pvalue=0.05, logfold_group_abs=0.1, comparison="condition"
):
"""
Filters interactions based on p-value, logfold change, and other conditions.
Parameters
----------
distance_pvals : pandas.DataFrame
DataFrame containing p-values, logfold changes, and interactions for each comparison.
pvalue : float, optional
The maximum p-value to consider for significance. Defaults to 0.05.
logfold_group_abs : float, optional
The minimum absolute logfold change to consider for significance. Defaults to 0.1.
comparison : str, optional
The comparison condition to filter by. Defaults to "condition".
Returns
-------
dist_table : pandas.DataFrame
DataFrame containing logfold changes sorted into two columns by the comparison condition.
distance_pvals_sig_sub : pandas.DataFrame
Subset of the original DataFrame containing only significant interactions based on the specified conditions.
"""
# calculate absolute logfold difference
distance_pvals["logfold_group_abs"] = distance_pvals["logfold_group"].abs()
# Creating pairs
distance_pvals["pairs"] = (
distance_pvals["celltype1"] + "_" + distance_pvals["celltype2"]
)
# Filter significant p-values and other specified conditions
distance_pvals_sig = distance_pvals[
(distance_pvals["pvalue"] < pvalue)
& (distance_pvals["celltype1"] != distance_pvals["celltype2"])
& (~distance_pvals["observed_mean"].isna())
& (distance_pvals["logfold_group_abs"] > logfold_group_abs)
]
# Assuming distance_pvals_interesting2 is a pandas DataFrame with the same structure as the R dataframe.
# pair_to = distance_pvals_sig["interaction"].unique()
pairs = distance_pvals_sig["pairs"].unique()
# Filtering data
data = distance_pvals[~distance_pvals["interaction"].isna()]
# Subsetting data
distance_pvals_sig_sub = data[data["pairs"].isin(pairs)]
distance_pvals_sig_sub_reduced = distance_pvals_sig_sub.loc[
:, [comparison, "logfold_group", "pairs"]
].copy()
# set pairs as index
distance_pvals_sig_sub_reduced = distance_pvals_sig_sub_reduced.set_index("pairs")
# sort logfold_group into two columns by tissue
dist_table = distance_pvals_sig_sub_reduced.pivot(
columns=comparison, values="logfold_group"
)
dist_table.dropna(inplace=True)
return dist_table, distance_pvals_sig_sub
[docs]
def remove_rare_cell_types(
adata, distance_pvals, cell_type_column="cell_type", min_cell_type_percentage=1
):
"""
Remove cell types with a percentage lower than the specified threshold from the distance_pvals DataFrame.
Parameters
----------
adata : AnnData
Annotated data matrix.
distance_pvals : DataFrame
DataFrame containing distance p-values with columns 'celltype1' and 'celltype2'.
cell_type_column : str, optional
Column name in adata containing cell type information, by default "cell_type".
min_cell_type_percentage : float, optional
Minimum percentage threshold for cell types to be retained, by default 1.
Returns
-------
DataFrame
Filtered distance_pvals DataFrame with rare cell types removed.
"""
cell_type_percentages_df = adata_cell_percentages(
adata, column_percentage=cell_type_column
)
# Identify cell types with less than the specified percentage of the total cells
rare_cell_types = cell_type_percentages_df[
cell_type_percentages_df["percentage"] < min_cell_type_percentage
][cell_type_column].values
# Print the names of the cell types with less than the specified percentage of the total cells
print(
"Cell types that belong to less than "
+ str(min_cell_type_percentage)
+ "% of total cells:"
)
print(rare_cell_types)
# Remove rows from distance_pvals that contain rare cell types in column celltype1 or celltype2
distance_pvals = distance_pvals[
~distance_pvals["celltype1"].isin(rare_cell_types)
& ~distance_pvals["celltype2"].isin(rare_cell_types)
]
return distance_pvals
# Function for patch identification
## Adjust clustering parameter to get the desired number of clusters
def apply_dbscan_clustering(df, min_cluster_size=10):
"""
Apply DBSCAN clustering to a dataframe and update the cluster labels in the original dataframe.
Parameters
----------
df : pandas.DataFrame
The dataframe to be clustered.
min_cluster_size : int, optional
The number of samples in a neighborhood for a point to be considered as a core point, by default 10
Returns
-------
None
"""
# Initialize a new column for cluster labels
df["cluster"] = -1
# Apply DBSCAN clustering
hdbscan = HDBSCAN(
min_samples=None,
min_cluster_size=min_cluster_size,
cluster_selection_epsilon=0.0,
max_cluster_size=None,
metric="euclidean",
alpha=1.0,
cluster_selection_method="eom",
allow_single_cluster=True,
)
labels = hdbscan.fit_predict(df[["x", "y"]])
# Number of clusters in labels, ignoring noise if present.
n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
n_noise_ = list(labels).count(-1)
print("Estimated number of clusters: %d" % n_clusters_)
print("Estimated number of noise points: %d" % n_noise_)
# Update the cluster labels in the original dataframe
df.loc[df.index, "cluster"] = labels
def identify_points_in_proximity(
df,
full_df,
identification_column,
cluster_column="cluster",
x_column="x",
y_column="y",
radius=200,
edge_neighbours=3,
plot=True,
concave_hull_length_threshold=50,
):
"""
Identify points in proximity within clusters and generate result and outline DataFrames.
Parameters
----------
df : pandas.DataFrame
DataFrame containing the points to be processed.
full_df : pandas.DataFrame
Full DataFrame containing all points.
identification_column : str
Column name used for identification.
cluster_column : str, optional
Column name for cluster labels, by default "cluster".
x_column : str, optional
Column name for x-coordinates, by default "x".
y_column : str, optional
Column name for y-coordinates, by default "y".
radius : int, optional
Radius for proximity search, by default 200.
edge_neighbours : int, optional
Number of edge neighbours, by default 3.
plot : bool, optional
Whether to plot the results, by default True.
concave_hull_length_threshold : int, optional
Threshold for concave hull length, by default 50.
Returns
-------
result : pandas.DataFrame
DataFrame containing the result points.
outlines : pandas.DataFrame
DataFrame containing the outline points.
"""
nbrs, unique_clusters = precompute(
df, x_column, y_column, full_df, identification_column, edge_neighbours
)
num_processes = max(
1, os.cpu_count() - 1
) # Use all available CPUs minus 2, but at least 1
with Pool(processes=num_processes) as pool:
results = pool.starmap(
process_cluster,
[
(
(
df,
cluster,
cluster_column,
x_column,
y_column,
concave_hull_length_threshold,
edge_neighbours,
full_df,
radius,
plot,
identification_column,
),
nbrs,
unique_clusters,
)
for cluster in set(df[cluster_column]) - {-1}
],
)
# Unpack the results
result_list, outline_list = zip(*results)
# Concatenate the list of DataFrames into a single result DataFrame
if len(result_list) > 0:
result = pd.concat(result_list)
else:
result = pd.DataFrame(columns=["x", "y", "patch_id", identification_column])
if len(outline_list) > 0:
outlines = pd.concat(outline_list)
else:
outlines = pd.DataFrame(columns=["x", "y", "patch_id", identification_column])
return result, outlines
# Precompute nearest neighbors model and unique clusters
def precompute(df, x_column, y_column, full_df, identification_column, edge_neighbours):
"""
Precompute nearest neighbors and unique clusters.
Parameters
----------
df : pandas.DataFrame
DataFrame containing the points to be processed.
x_column : str
Column name for x-coordinates.
y_column : str
Column name for y-coordinates.
full_df : pandas.DataFrame
Full DataFrame containing all points.
identification_column : str
Column name used for identification.
edge_neighbours : int
Number of edge neighbours.
Returns
-------
nbrs : sklearn.neighbors.NearestNeighbors
Fitted NearestNeighbors model.
unique_clusters : numpy.ndarray
Array of unique cluster identifiers.
"""
nbrs = NearestNeighbors(n_neighbors=edge_neighbours).fit(df[[x_column, y_column]])
unique_clusters = full_df[identification_column].unique()
return nbrs, unique_clusters
def process_cluster(args, nbrs, unique_clusters):
(
df,
cluster,
cluster_column,
x_column,
y_column,
concave_hull_length_threshold,
edge_neighbours,
full_df,
radius,
plot,
identification_column,
) = args
"""
Process a single cluster to identify points in proximity and generate hull points.
Parameters
----------
args : tuple
Tuple containing the following elements:
- df : pandas.DataFrame
DataFrame containing the points to be processed.
- cluster : int
Cluster identifier.
- cluster_column : str
Column name for cluster labels.
- x_column : str
Column name for x-coordinates.
- y_column : str
Column name for y-coordinates.
- concave_hull_length_threshold : int
Threshold for concave hull length.
- edge_neighbours : int
Number of edge neighbours.
- full_df : pandas.DataFrame
Full DataFrame containing all points.
- radius : int
Radius for proximity search.
- plot : bool
Whether to plot the results.
- identification_column : str
Column name used for identification.
nbrs : sklearn.neighbors.NearestNeighbors
Fitted NearestNeighbors model.
unique_clusters : numpy.ndarray
Array of unique cluster identifiers.
Returns
-------
prox_points : pandas.DataFrame
DataFrame containing points within the proximity of the cluster.
hull_nearest_neighbors : pandas.DataFrame
DataFrame containing the nearest neighbors of the hull points.
"""
# Filter DataFrame for the current cluster
subset = df.loc[df[cluster_column] == cluster]
points = subset[[x_column, y_column]].values
# Compute concave hull indexes
idxes = concave_hull_indexes(
points[:, :2],
length_threshold=concave_hull_length_threshold,
)
# Get hull points from the DataFrame
hull_points = pd.DataFrame(points[idxes], columns=["x", "y"])
# Find nearest neighbors of hull points in the original DataFrame
distances, indices = nbrs.kneighbors(hull_points[["x", "y"]])
hull_nearest_neighbors = df.iloc[indices.flatten()]
# DataFrame to store points within the circle but from a different cluster
all_in_circle_diff_cluster = []
# Extract hull points coordinates
hull_coords = hull_nearest_neighbors[["x", "y"]].values
# Calculate distances from all points in full_df to all hull points
distances = cdist(full_df[["x", "y"]].values, hull_coords)
# Identify points within the circle for each hull point
in_circle = distances <= radius
# Identify points from a different cluster for each hull point
diff_cluster = (
full_df[identification_column].values[:, np.newaxis]
!= hull_nearest_neighbors[identification_column].values
)
# Combine the conditions
in_circle_diff_cluster = in_circle & diff_cluster
# Collect all points within the circle but from a different cluster
all_in_circle_diff_cluster = full_df[np.any(in_circle_diff_cluster, axis=1)]
# Plot the points with a different shape if plot is True
if plot:
plt.scatter(
all_in_circle_diff_cluster["x"],
all_in_circle_diff_cluster["y"],
facecolors="none",
edgecolors="#DC0000B2",
marker="*",
s=100,
zorder=5,
label="Cell within proximity",
)
# Remove duplicates from the final DataFrame
all_in_circle_diff_cluster = all_in_circle_diff_cluster.drop_duplicates()
# Plot selected points in yellow and draw circles around them if plot is True
if plot:
plt.scatter(
hull_nearest_neighbors["x"],
hull_nearest_neighbors["y"],
color="#3C5488B2",
label="Boarder cells",
s=100,
edgecolor="black",
zorder=6,
)
for _, row in hull_nearest_neighbors.iterrows():
circle = plt.Circle(
(row["x"], row["y"]),
radius,
color="#3C5488B2",
fill=False,
linestyle="--",
alpha=0.5,
)
plt.gca().add_patch(circle)
# Set plot labels and title
plt.xlabel("X")
plt.ylabel("Y")
plt.title(f"Cells within {radius} radius")
plt.grid(False)
plt.axis("equal")
# Place the legend outside the plot
handles, labels = plt.gca().get_legend_handles_labels()
by_label = dict(zip(labels, handles))
plt.legend(
by_label.values(),
by_label.keys(),
loc="center left",
bbox_to_anchor=(1, 0.5),
)
plt.tight_layout()
# set figure size
plt.gcf().set_size_inches(15, 5)
plt.show()
# Remove duplicates from the final DataFrame
prox_points = all_in_circle_diff_cluster.drop_duplicates()
# Add a 'patch_id' column to identify the cluster
prox_points["patch_id"] = cluster
return prox_points, hull_nearest_neighbors
# This function analyzes what is in proximity of a selected group (CN, Celltype, etc...).
[docs]
def patch_proximity_analysis(
adata,
region_column,
patch_column,
group,
min_cluster_size=80,
x_column="x",
y_column="y",
radius=128,
edge_neighbours=3,
plot=True,
savefig=False,
output_dir="./",
output_fname="",
key_name="ppa_result",
plot_color="#6a3d9a",
):
"""
Performs a proximity analysis on patches of a given group within each region of a dataset.
Parameters:
adata (AnnData): The annotated data matrix of shape n_obs x n_vars. Rows correspond to cells and columns to genes.
region_column (str): The name of the column in the DataFrame that contains the region information.
patch_column (str): The name of the column in the DataFrame that contains the patch information.
group (str): The group to perform the proximity analysis on.
min_cluster_size (int, optional): The minimum number of samples required to form a dense region. Default is 80.
x_column (str, optional): The name of the column in the DataFrame that contains the x-coordinate. Default is 'x'.
y_column (str, optional): The name of the column in the DataFrame that contains the y-coordinate. Default is 'y'.
radius (int, optional): The radius within which to identify points in proximity. Default is 128.
edge_neighbours (int, optional): The number of edge neighbours to consider. Default is 3.
plot (bool, optional): Whether to plot the patches. Default is True.
savefig (bool, optional): Whether to save the figure. Default is False.
output_dir (str, optional): The directory to save the figure in. Default is "./".
output_fname (str, optional): The filename to save the figure as. Default is "".
key_name (str, optional): The key name to store the results in the AnnData object. Default is 'ppa_result'.
Returns:
final_results (DataFrame): A DataFrame containing the results of the proximity analysis.
outlines_results (DataFrame): A DataFrame containing the outlines of the patches.
"""
df = adata.obs
for col in df.select_dtypes(["category"]).columns:
df[col] = df[col].astype(str)
# list to store results for each region
region_results = []
outlines = []
for region in df[region_column].unique():
df_region = df[df[region_column] == region].copy()
df_community = df_region[df_region[patch_column] == group].copy()
if df_community.shape[0] < min_cluster_size:
print(f"No {group} in {region}")
continue
else:
apply_dbscan_clustering(df_community, min_cluster_size=min_cluster_size)
# plot patches
if plot:
df_filtered = df_community[df_community["cluster"] != -1]
fig, ax = plt.subplots(figsize=(10, 10))
ax.scatter(df_filtered["x"], df_filtered["y"], c=plot_color, alpha=0.5)
ax.set_title(f"HDBSCAN Clusters for {region}_{group}")
ax.set_xlabel(x_column)
ax.set_ylabel(y_column)
ax.grid(False)
ax.axis("equal")
if savefig:
fig.savefig(
output_dir
+ output_fname
+ "_"
+ str(region)
+ "_patch_proximity.pdf",
bbox_inches="tight",
)
else:
plt.show()
results, hull_nearest_neighbors = identify_points_in_proximity(
df=df_community,
full_df=df_region,
cluster_column="cluster",
identification_column=patch_column,
x_column=x_column,
y_column=y_column,
radius=radius,
edge_neighbours=edge_neighbours,
plot=plot,
)
# add hull_nearest_neighbors to list
outlines.append(hull_nearest_neighbors)
print(f"Finished {region}_{group}")
# append to region_results
region_results.append(results)
# Concatenate all results into a single DataFrame
final_results = pd.concat(region_results)
outlines_results = pd.concat(outlines)
# generate new column named unique_patch_ID that combines the region, group and patch ID
final_results["unique_patch_ID"] = (
final_results[region_column]
+ "_"
+ final_results[patch_column]
+ "_"
+ "patch_no_"
+ final_results["patch_id"].astype(str)
)
adata.uns[key_name] = final_results
return final_results, outlines_results
def stellar_get_edge_index(
pos, distance_thres, max_memory_usage=1.6e10, chunk_size=1000
):
"""
Constructs edge indexes in one region based on pairwise distances and a distance threshold.
Parameters:
pos (array-like): An array-like object of shape (n_samples, n_features) representing the positions.
distance_thres (float): The distance threshold. Pairs of positions with distances less than this threshold will be considered as edges.
max_memory_usage (float): The maximum memory usage in bytes before switching to chunk processing.
chunk_size (int): The size of the chunks to process at a time.
Returns:
edge_list (list): A list of lists where each inner list contains two indices representing an edge.
"""
n_samples = pos.shape[0]
estimated_memory_usage = (
n_samples * n_samples * 8
) # Estimate memory usage for the distance matrix (float64)
if estimated_memory_usage > max_memory_usage:
print("Processing will be done in chunks to save memory.")
edge_list = []
for i in tqdm(range(0, n_samples, chunk_size), desc="Processing chunks"):
pos_chunk = pos[i : i + chunk_size]
dists_chunk = pairwise_distances(pos_chunk, pos)
dists_mask_chunk = dists_chunk < distance_thres
np.fill_diagonal(dists_mask_chunk[:, i : i + chunk_size], 0)
chunk_edge_list = np.transpose(np.nonzero(dists_mask_chunk)).tolist()
chunk_edge_list = [[i + edge[0], edge[1]] for edge in chunk_edge_list]
edge_list.extend(chunk_edge_list)
else:
dists = pairwise_distances(pos)
dists_mask = dists < distance_thres
np.fill_diagonal(dists_mask, 0)
edge_list = np.transpose(np.nonzero(dists_mask)).tolist()
return edge_list
[docs]
def adata_stellar(
adata_train,
adata_unannotated,
celltype_col="coarse_anno3",
x_col="x",
y_col="y",
sample_rate=0.5,
distance_thres=50,
epochs=50,
key_added="stellar_pred",
STELLAR_path="",
):
"""
Applies the STELLAR algorithm to the given annotated and unannotated data.
Parameters:
adata_train (AnnData): The annotated data.
adata_unannotated (AnnData): The unannotated data.
celltype_col (str, optional): The column name for cell types in the annotated data. Defaults to 'coarse_anno3'.
x_col (str, optional): The column name for x coordinates in the data. Defaults to 'x'.
y_col (str, optional): The column name for y coordinates in the data. Defaults to 'y'.
sample_rate (float, optional): The rate at which to sample the training data. Defaults to 0.5.
distance_thres (int, optional): The distance threshold for constructing edge indexes. Defaults to 50.
key_added (str, optional): The key to be added to the unannotated data's obs dataframe for the predicted results. Defaults to 'stellar_pred'.
Returns:
adata (AnnData): The unannotated data with the added key for the predicted results.
"""
print(
"Please consider to cite the following paper when using STELLAR: Brbić, M., Cao, K., Hickey, J.W. et al. Annotation of spatially resolved single-cell data with STELLAR. Nat Methods 19, 1411–1418 (2022). https://doi.org/10.1038/s41592-022-01651-8"
)
sys.path.append(str(STELLAR_path))
from datasets import GraphDataset
from STELLAR import STELLAR
from utils import prepare_save_dir
parser = argparse.ArgumentParser(description="STELLAR")
parser.add_argument(
"--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
)
parser.add_argument("--name", type=str, default="STELLAR")
parser.add_argument("--epochs", type=int, default=50)
parser.add_argument("--lr", type=float, default=1e-3)
parser.add_argument("--wd", type=float, default=5e-2)
parser.add_argument("--input-dim", type=int, default=26)
parser.add_argument("--num-heads", type=int, default=13)
parser.add_argument("--num-seed-class", type=int, default=3)
parser.add_argument("--sample-rate", type=float, default=0.5)
parser.add_argument(
"-b", "--batch-size", default=1, type=int, metavar="N", help="mini-batch size"
)
parser.add_argument("--distance_thres", default=50, type=int)
parser.add_argument("--savedir", type=str, default="./")
args = parser.parse_args(args=[])
args.cuda = torch.cuda.is_available()
args.device = torch.device("cuda" if args.cuda else "cpu")
args.epochs = 50
# prepare input data
print("Preparing input data")
train_df = adata_train.to_df()
# add to train_df
positions_celltype = adata_train.obs[[x_col, y_col, celltype_col]]
train_df = pd.concat([train_df, positions_celltype], axis=1)
train_df = train_df.sample(n=round(sample_rate * len(train_df)), random_state=1)
train_X = train_df.iloc[:, 0:-3].values
test_X = adata_unannotated.to_df().values
train_y = train_df[celltype_col].str.lower()
train_y
labeled_pos = train_df.iloc[
:, -3:-1
].values # x,y coordinates, indexes depend on specific datasets
unlabeled_pos = adata_unannotated.obs[[x_col, y_col]].values
cell_types = np.sort(list(set(train_y))).tolist()
cell_types
cell_type_dict = {}
inverse_dict = {}
for i, cell_type in enumerate(cell_types):
cell_type_dict[cell_type] = i
inverse_dict[i] = cell_type
train_y = np.array([cell_type_dict[x] for x in train_y])
labeled_edges = stellar_get_edge_index(labeled_pos, distance_thres=distance_thres)
unlabeled_edges = stellar_get_edge_index(
unlabeled_pos, distance_thres=distance_thres
)
# build dataset
print("Building dataset")
dataset = GraphDataset(train_X, train_y, test_X, labeled_edges, unlabeled_edges)
# run stellar
print("Running STELLAR")
stellar = STELLAR(args, dataset)
stellar.train()
_, results = stellar.pred()
results = results.astype("object")
for i in range(len(results)):
if results[i] in inverse_dict.keys():
results[i] = inverse_dict[results[i]]
adata_unannotated.obs[key_added] = pd.Categorical(results)
# make stellar_pred a string
adata_unannotated.obs["stellar_pred"] = adata_unannotated.obs[
"stellar_pred"
].astype(str)
return adata_unannotated
[docs]
def ml_train(
adata_train,
label,
test_size=0.33,
random_state=0,
model="svm",
nan_policy_y="raise",
showfig=True,
figsize=(10, 8),
):
"""
Train a svm model on the provided data.
Parameters
----------
adata_train : AnnData
The training data as an AnnData object.
label : str
The label to predict.
test_size : float, optional
The proportion of the dataset to include in the test split, by default 0.33.
random_state : int, optional
The seed used by the random number generator, by default 0.
model : str, optional
The type of model to train, by default "svm".
nan_policy_y : str, optional
How to handle NaNs in the label, by default "raise". Can be either 'omit' or 'raise'.
showfig : bool, optional
Whether to show the confusion matrix as a heatmap, by default True.
Returns
-------
SVC
The trained Support Vector Classifier model.
Raises
------
ValueError
If `nan_policy_y` is not 'omit' or 'raise'.
"""
X = pd.DataFrame(adata_train.X)
y = adata_train.obs[label].values
if nan_policy_y == "omit":
y_msk = ~y.isna()
X = X[y_msk]
y = y[y_msk]
elif nan_policy_y == "raise":
pass
else:
raise ValueError("nan_policy_y must be either 'omit' or 'raise'")
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
print(y.unique().sort_values())
print("Training now!")
svc = SVC(kernel="linear", probability=True)
svc.fit(X_train, y_train)
pred = []
y_prob = svc.predict_proba(X_test)
y_prob = pd.DataFrame(y_prob)
y_prob.columns = svc.classes_
svm_label = y_prob.idxmax(axis=1, skipna=True)
target_names = svc.classes_
print("Evaluating now!")
svm_eval = classification_report(
y_true=y_test, y_pred=svm_label, target_names=target_names, output_dict=True
)
if showfig:
plt.figure(figsize=figsize)
sns.heatmap(pd.DataFrame(svm_eval).iloc[:-1, :].T, annot=True)
plt.show()
return svc
[docs]
def ml_predict(adata_val, svc, save_name="svm_pred", return_prob_mat=False):
"""
Predict labels for a given dataset using a trained Support Vector Classifier (SVC) model.
Parameters
----------
adata_val : AnnData
The validation data as an AnnData object.
svc : SVC
The trained Support Vector Classifier model.
save_name : str, optional
The name under which the predictions will be saved in the AnnData object, by default "svm_pred".
return_prob_mat : bool, optional
Whether to return the probability matrix, by default False.
Returns
-------
DataFrame or None
If `return_prob_mat` is True, returns a DataFrame with the probability matrix. Otherwise, returns None.
"""
print("Classifying!")
X_val = pd.DataFrame(adata_val.X)
y_prob_val = svc.predict_proba(X_val)
y_prob_val = pd.DataFrame(y_prob_val)
y_prob_val.columns = svc.classes_
svm_label_val = y_prob_val.idxmax(axis=1, skipna=True)
svm_label_val.index = X_val.index
print("Saving cell type labels to adata!")
adata_val.obs[save_name] = svm_label_val.values
if return_prob_mat:
print("Returning probability matrix!")
y_prob_val.columns = svc.classes_
svm_label_val = y_prob_val.idxmax(axis=1, skipna=True)
return svm_label_val
def masks_to_outlines_scikit_image(masks):
"""get outlines of masks as a 0-1 array
Parameters
----------------
masks: int, 2D or 3D array
size [Ly x Lx] or [Lz x Ly x Lx], 0=NO masks; 1,2,...=mask labels
Returns
----------------
outlines: 2D or 3D array
size [Ly x Lx] or [Lz x Ly x Lx], True pixels are outlines
"""
if masks.ndim > 3 or masks.ndim < 2:
raise ValueError(
"masks_to_outlines takes 2D or 3D array, not %dD array" % masks.ndim
)
if masks.ndim == 3:
outlines = np.zeros(masks.shape, bool)
for i in range(masks.shape[0]):
outlines[i] = find_boundaries(masks[i], mode="inner")
return outlines
else:
return find_boundaries(masks, mode="inner")
def download_file_tm(url, save_path):
"""
Download a file from a given URL and save it to a specified path.
Parameters
----------
url : str
The URL of the file to download.
save_path : str
The local path where the downloaded file will be saved.
Raises
------
requests.exceptions.HTTPError
If the HTTP request returned an unsuccessful status code.
"""
response = requests.get(url)
response.raise_for_status() # Check if the request was successful
with open(save_path, "wb") as file:
file.write(response.content)
def check_download_tm_plugins():
"""
Check and download the TissUUmaps plugins if they are not already present.
This function checks if the required TissUUmaps plugins are present in the
appropriate directory within the active Conda environment. If any plugins
are missing, they are downloaded from the specified URLs.
Raises
------
EnvironmentError
If the Conda environment is not activated.
"""
urls = [
"https://tissuumaps.github.io/TissUUmaps/plugins/latest/ClassQC.js",
"https://tissuumaps.github.io/TissUUmaps/plugins/latest/Plot_Histogram.js",
"https://tissuumaps.github.io/TissUUmaps/plugins/latest/Points2Regions.js",
"https://tissuumaps.github.io/TissUUmaps/plugins/latest/Spot_Inspector.js",
"https://tissuumaps.github.io/TissUUmaps/plugins/latest/Feature_Space.js",
]
conda_env_path = os.getenv("CONDA_PREFIX")
if not conda_env_path:
raise EnvironmentError("Conda environment is not activated.")
python_version = f"python{sys.version_info.major}.{sys.version_info.minor}"
save_directory = os.path.join(
conda_env_path, "lib", python_version, "site-packages", "tissuumaps", "plugins"
)
if not os.path.exists(save_directory):
save_directory_option = os.path.join(
conda_env_path, "lib", "site-packages", "tissuumaps", "plugins"
)
for url in urls:
file_name = os.path.basename(url)
save_path = os.path.join(save_directory_option, file_name)
if not os.path.exists(save_path):
download_file_tm(url, save_path)
print(f"Plug-in downloaded and saved to {save_path}")
else:
for url in urls:
file_name = os.path.basename(url)
save_path = os.path.join(save_directory, file_name)
if not os.path.exists(save_path):
download_file_tm(url, save_path)
print(f"Plug-in downloaded and saved to {save_path}")
[docs]
def tm_viewer(
adata,
images_pickle_path,
directory=None,
region_column="unique_region",
region="",
xSelector="x",
ySelector="y",
color_by="cell_type",
keep_list=None,
include_masks=True,
open_viewer=True,
add_UMAP=True,
use_jpg_compression=False,
):
"""
Prepare and visualize spatial transcriptomics data using TissUUmaps.
Parameters
----------
adata : AnnData
Annotated data matrix.
images_pickle_path : str
Path to the pickle file containing images and masks.
directory : str, optional
Directory to save the output files. If None, a temporary directory will be created.
region_column : str, optional
Column name in `adata.obs` that specifies the region, by default "unique_region".
region : str, optional
Specific region to process, by default "".
xSelector : str, optional
Column name for x coordinates, by default "x".
ySelector : str, optional
Column name for y coordinates, by default "y".
color_by : str, optional
Column name for coloring the points, by default "celltype_fine".
keep_list : list, optional
List of columns to keep from `adata.obs`, by default None.
include_masks : bool, optional
Whether to include masks in the output, by default True.
open_viewer : bool, optional
Whether to open the TissUUmaps viewer, by default True.
add_UMAP : bool, optional
Whether to add UMAP coordinates to the output, by default True.
use_jpg_compression : bool, optional
Whether to use JPEG compression for saving images, by default False.
Returns
-------
list
List of paths to the saved image files.
list
List of paths to the saved CSV files.
"""
print(
"Please consider to cite the following paper when using TissUUmaps: TissUUmaps 3: Improvements in interactive visualization, exploration, and quality assessment of large-scale spatial omics data - Pielawski, Nicolas et al. 2023 - Heliyon, Volume 9, Issue 5, e15306"
)
check_download_tm_plugins()
segmented_matrix = adata.obs
with open(images_pickle_path, "rb") as f:
seg_output = pickle.load(f)
image_dict = seg_output["image_dict"]
masks = seg_output["masks"]
if keep_list is None:
keep_list = [region_column, xSelector, ySelector, color_by]
print("Preparing TissUUmaps input...")
if directory is None:
directory = tempfile.mkdtemp()
cache_dir = pathlib.Path(directory) / region
cache_dir.mkdir(parents=True, exist_ok=True)
# only keep columns in keep_list
segmented_matrix = segmented_matrix[keep_list]
if add_UMAP:
# add UMAP coordinates to segmented_matrix
segmented_matrix["UMAP_1"] = adata.obsm["X_umap"][:, 0]
segmented_matrix["UMAP_2"] = adata.obsm["X_umap"][:, 1]
csv_paths = []
# separate matrix by region and save every region as single csv file
region_matrix = segmented_matrix.loc[segmented_matrix[region_column] == region]
region_matrix.to_csv(cache_dir / (region + ".csv"))
csv_paths.append(cache_dir / (region + ".csv"))
# generate subdirectory for images
image_dir = cache_dir / "images"
image_dir.mkdir(parents=True, exist_ok=True)
image_list = []
# save every image as tif file in image directory from image_dict. name by key in image_dict
if use_jpg_compression == True:
print("Using jpg compression")
for key, image in image_dict.items():
if use_jpg_compression == True:
file_path = os.path.join(image_dir, f"{key}.jpg")
imsave(file_path, image, quality=100)
else:
file_path = os.path.join(image_dir, f"{key}.tif")
imsave(file_path, image, check_contrast=False)
image_list.append(file_path)
if include_masks:
# select first item from image_dict as reference image
reference_image = list(image_dict.values())[0]
# make reference image black by setting all values to 0
reference_image = np.zeros_like(reference_image)
# make the reference image rgb. Add empty channels
if len(reference_image.shape) == 2:
reference_image = np.expand_dims(reference_image, axis=-1)
reference_image = np.repeat(reference_image, 3, axis=-1)
# remove last dimension from masks
masks_3d = np.squeeze(masks)
outlines = masks_to_outlines_scikit_image(masks_3d)
reference_image[outlines] = [255, 0, 0]
file_path = os.path.join(image_dir, "masks.jpg")
# save black pixel as transparent
reference_image = reference_image.astype(np.uint8)
imsave(file_path, reference_image)
image_list.append(file_path)
if open_viewer:
print("Opening TissUUmaps viewer...")
tj.loaddata(
images=image_list,
csvFiles=[str(p) for p in csv_paths],
xSelector=xSelector,
ySelector=ySelector,
keySelector=color_by,
nameSelector=color_by,
colorSelector=color_by,
piechartSelector=None,
shapeSelector=None,
scaleSelector=None,
fixedShape=None,
scaleFactor=1,
colormap=None,
compositeMode="source-over",
boundingBox=None,
port=5100,
host="localhost",
height=900,
tmapFilename=region + "_project",
plugins=[
"Plot_Histogram",
"Points2Regions",
"Spot_Inspector",
"Feature_Space",
"ClassQC",
],
)
return image_list, csv_paths
[docs]
def tm_viewer_catplot(
adata,
directory=None,
region_column="unique_region",
x="x",
y="y",
color_by="cell_type",
open_viewer=True,
add_UMAP=False,
keep_list=None,
):
"""
Generate and visualize categorical plots using TissUUmaps.
Parameters
----------
adata : AnnData
Annotated data matrix.
directory : str, optional
Directory to save the output CSV files. If None, a temporary directory is created.
region_column : str, optional
Column name in `adata.obs` that contains region information. Default is "unique_region".
x : str, optional
Column name in `adata.obs` to be used for x-axis. Default is "x".
y : str, optional
Column name in `adata.obs` to be used for y-axis. Default is "y".
color_by : str, optional
Column name in `adata.obs` to be used for coloring the points. Default is "cell_type".
open_viewer : bool, optional
Whether to open the TissUUmaps viewer after generating the CSV files. Default is True.
add_UMAP : bool, optional
Whether to add UMAP coordinates to the output data. Default is False.
keep_list : list of str, optional
List of columns to keep from `adata.obs`. If None, defaults to [region_column, x, y, color_by].
Returns
-------
list of str
List of paths to the generated CSV files.
"""
check_download_tm_plugins()
segmented_matrix = adata.obs
if keep_list is None:
keep_list = [region_column, x, y, color_by]
print("Preparing TissUUmaps input...")
if directory is None:
print(
"Creating temporary directory... If you want to save the files, please specify a directory."
)
directory = tempfile.mkdtemp()
if not os.path.exists(directory):
os.makedirs(directory)
# only keep columns in keep_list
segmented_matrix = segmented_matrix[keep_list]
if add_UMAP:
# add UMAP coordinates to segmented_matrix
segmented_matrix["UMAP_1"] = adata.obsm["X_umap"][:, 0]
segmented_matrix["UMAP_2"] = adata.obsm["X_umap"][:, 1]
csv_paths = []
# separate matrix by region and save every region as single csv file
unique_regions = segmented_matrix[region_column].unique()
for region in unique_regions:
region_matrix = segmented_matrix.loc[segmented_matrix[region_column] == region]
region_csv_path = os.path.join(directory, region + ".csv")
region_matrix.to_csv(region_csv_path)
csv_paths.append(region_csv_path)
if open_viewer:
print("Opening TissUUmaps viewer...")
tj.loaddata(
images=[],
csvFiles=[str(p) for p in csv_paths],
xSelector=x,
ySelector=y,
keySelector=color_by,
nameSelector=color_by,
colorSelector=color_by,
piechartSelector=None,
shapeSelector=None,
scaleSelector=None,
fixedShape=None,
scaleFactor=1,
colormap=None,
compositeMode="source-over",
boundingBox=None,
port=5100,
host="localhost",
height=900,
tmapFilename="project",
plugins=[
"Plot_Histogram",
"Points2Regions",
"Spot_Inspector",
"Feature_Space",
"ClassQC",
],
)
return csv_paths
def install_gpu_leiden(CUDA="12"):
"""
Install the necessary packages for GPU-accelerated Leiden clustering.
Parameters
----------
CUDA : str, optional
The version of CUDA to use for the installation. Options are '11' and '12'. Default is '12'.
Returns
-------
None
Notes
-----
This function runs a series of pip install commands to install the necessary packages. The specific packages and versions installed depend on the CUDA
version. The function prints the output and any errors from each command.
"""
if platform.system() != "Linux":
print("This feature is currently only supported on Linux.")
else:
print("installing rapids_singlecell")
# Define the commands to run
if CUDA == "11":
commands = [
"pip install rapids-singlecell==0.9.5",
"pip install --extra-index-url=https://pypi.nvidia.com cudf-cu11==24.2.* dask-cudf-cu11==24.2.* cuml-cu11==24.2.* cugraph-cu11==24.2.* cuspatial-cu11==24.2.* cuproj-cu11==24.2.* cuxfilter-cu11==24.2.* cucim-cu11==24.2.* pylibraft-cu11==24.2.* raft-dask-cu11==24.2.*",
"pip install protobuf==3.20",
]
else:
commands = [
"pip install rapids-singlecell==0.9.5",
"pip install --extra-index-url=https://pypi.nvidia.com cudf-cu12==24.2.* dask-cudf-cu12==24.2.* cuml-cu12==24.2.* cugraph-cu12==24.2.* cuspatial-cu12==24.2.* cuproj-cu12==24.2.* cuxfilter-cu12==24.2.* cucim-cu12==24.2.* pylibraft-cu12==24.2.* raft-dask-cu12==24.2.*",
"pip install protobuf==3.20",
]
# Run each command
for command in commands:
process = subprocess.Popen(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()
# Print the output and error, if any
if stdout:
print(f"Output:\n{stdout.decode()}")
if stderr:
print(f"Error:\n{stderr.decode()}")
def anndata_to_GPU(
adata: AnnData,
layer: str | None = None,
convert_all: bool = False,
) -> AnnData:
"""
Transfers matrices and arrays to the GPU
Parameters
----------
adata
AnnData object
layer
Layer to use as input instead of `X`. If `None`, `X` is used.
convert_all
If True, move all supported arrays and matrices on the GPU
Returns
-------
Returns an updated copy with data on GPU
"""
adata_gpu = adata.copy()
if convert_all:
anndata_to_GPU(adata_gpu)
if adata_gpu.layers:
for key in adata_gpu.layers.keys():
anndata_to_GPU(adata_gpu, layer=key)
else:
X = _get_obs_rep(adata_gpu, layer=layer)
if isspmatrix_csr_cpu(X):
X = csr_matrix_gpu(X)
elif isspmatrix_csc_cpu(X):
X = csc_matrix_gpu(X)
elif isinstance(X, np.ndarray):
# Convert to CuPy array only when necessary for GPU computations
X_gpu = cp.asarray(X)
X = X_gpu
else:
error = layer if layer else "X"
warnings.warn(f"{error} not supported for GPU conversion", Warning)
_set_obs_rep(adata_gpu, X, layer=layer)
return adata_gpu
def anndata_to_CPU(
adata: AnnData,
layer: str | None = None,
convert_all: bool = False,
copy: bool = False,
) -> AnnData | None:
"""
Transfers matrices and arrays from the GPU
Parameters
----------
adata
AnnData object
layer
Layer to use as input instead of `X`. If `None`, `X` is used.
convert_all
If True, move all GPU based arrays and matrices to the host memory
copy
Whether to return a copy or update `adata`.
Returns
-------
Updates `adata` inplace or returns an updated copy
"""
if copy:
adata = adata.copy()
if convert_all:
anndata_to_CPU(adata)
if adata.layers:
for key in adata.layers.keys():
anndata_to_CPU(adata, layer=key)
else:
X = _get_obs_rep(adata, layer=layer)
if isspmatrix_csr_gpu(X):
X = X.get()
elif isspmatrix_csc_gpu(X):
X = X.get()
elif isinstance(X, cp.ndarray):
X = X.get()
else:
pass
_set_obs_rep(adata, X, layer=layer)
if copy:
return adata
def install_stellar(CUDA=12):
if CUDA == 12:
subprocess.run(["pip", "install", "torch"], check=True)
subprocess.run(["pip", "install", "torch_geometric"], check=True)
subprocess.run(
[
"pip",
"install",
"pyg_lib",
"torch_scatter",
"torch_sparse",
"torch_cluster",
"torch_spline_conv",
"-f",
"https://data.pyg.org/whl/torch-2.3.0+cu121.html",
],
check=True,
)
elif CUDA == 11.8:
subprocess.run(
[
"pip3",
"install",
"torch",
"--index-url",
"https://download.pytorch.org/whl/cu118",
],
check=True,
)
subprocess.run(["pip", "install", "torch_geometric"], check=True)
subprocess.run(
[
"pip",
"install",
"pyg_lib",
"torch_scatter",
"torch_sparse",
"torch_cluster",
"torch_spline_conv",
"-f",
"https://data.pyg.org/whl/torch-2.3.0+cu118.html",
],
check=True,
)
else:
print("Please choose between CUDA 12 or 11.8")
print(
"If neither is working for you check the installation guide at: https://pytorch.org/get-started/locally/ and https://pytorch-geometric.readthedocs.io/en/latest/install/installation.html"
)
[docs]
def launch_interactive_clustering(adata=None, output_dir=None):
"""
Launch an interactive clustering application for single-cell data analysis.
Parameters
----------
adata : AnnData, optional
An AnnData object containing single-cell data. If provided, the data will be loaded automatically.
output_dir : str, optional
The directory where the annotated AnnData object will be saved. Required if `adata` is provided.
Returns
-------
main_layout : panel.layout.Row
The main layout of the interactive clustering application.
Raises
------
ValueError
If `adata` is provided but `output_dir` is not specified, or if `output_dir` is not a string.
"""
warnings.filterwarnings("ignore")
pn.extension("deckgl", design="bootstrap", theme="default", template="bootstrap")
pn.state.template.config.raw_css.append(
"""
#main {
padding: 0;
}"""
)
# check if output_dir is provided if adata is provided
if adata is not None and not output_dir:
raise ValueError(
"Please provide an output directory to save the annotated AnnData object."
)
# exit the function if output_dir is not provided
return
else:
# check if output_dir is a string
if output_dir and not isinstance(output_dir, str):
raise ValueError("output_dir must be a string.")
# check if output directory exists and create if not:
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
# Define the app
def create_clustering_app():
# Callback to load data
def load_data(event=None):
if adata is not None:
adata_container["adata"] = adata
marker_list_input.options = list(adata.var_names)
output_area.object = "**AnnData object loaded successfully.**"
return
if not input_path.value or not os.path.isfile(input_path.value):
output_area.object = "**Please enter a valid AnnData file path.**"
return
loaded_adata = sc.read_h5ad(input_path.value)
adata_container["adata"] = loaded_adata
marker_list_input.options = list(loaded_adata.var_names)
output_area.object = "**AnnData file loaded successfully.**"
# Callback to run clustering
def run_clustering(event):
adata = adata_container.get("adata", None)
if adata is None:
output_area.object = "**Please load an AnnData file first.**"
return
marker_list = (
list(marker_list_input.value) if marker_list_input.value else None
)
key_added = (
key_added_input.value
if key_added_input.value
else clustering_method.value + "_" + str(resolution.value)
)
# Start loading indicator
loading_indicator.active = True
output_area.object = "**Clustering in progress...**"
# Run clustering
try:
adata = clustering(
adata,
clustering=clustering_method.value,
marker_list=marker_list,
resolution=resolution.value,
n_neighbors=n_neighbors.value,
reclustering=reclustering.value,
key_added=key_added,
key_filter=None,
subset_cluster=None,
seed=42,
fs_xdim=fs_xdim.value,
fs_ydim=fs_ydim.value,
fs_rlen=fs_rlen.value,
)
adata_container["adata"] = adata
output_area.object = "**Clustering completed.**"
# Automatically generate visualization
key_to_visualize = key_added
tabs = []
sc.pl.umap(adata, color=[key_to_visualize], show=False)
umap_fig = plt.gcf()
plt.close()
tabs.append(("UMAP", pn.pane.Matplotlib(umap_fig, dpi=100)))
if marker_list:
sc.pl.dotplot(
adata,
marker_list,
groupby=key_to_visualize,
dendrogram=True,
show=False,
)
dotplot_fig = plt.gcf()
plt.close()
tabs.append(("Dotplot", pn.pane.Matplotlib(dotplot_fig, dpi=100)))
# Generate histogram plot
cluster_counts = adata.obs[key_to_visualize].value_counts()
cluster_counts.sort_index(inplace=True)
cluster_counts.plot(kind="bar")
plt.xlabel("Cluster")
plt.ylabel("Number of Cells")
plt.title(f"Cluster Counts for {key_to_visualize}")
hist_fig = plt.gcf()
plt.close()
tabs.append(("Histogram", pn.pane.Matplotlib(hist_fig, dpi=100)))
# Add new tabs to visualization area
for name, pane in tabs:
visualization_area.append((name, pane))
# Update cluster annotations
clusters = adata.obs[key_to_visualize].unique().astype(str)
annotations_df = pd.DataFrame(
{"Cluster": clusters, "Annotation": [""] * len(clusters)}
)
cluster_annotation.value = annotations_df
except Exception as e:
output_area.object = f"**Error during clustering: {e}**"
finally:
# Stop loading indicator
loading_indicator.active = False
# Callback to run subclustering
def run_subclustering(event):
adata = adata_container.get("adata", None)
if adata is None:
output_area.object = "**Please run clustering first.**"
return
if not subcluster_key.value or not subcluster_values.value:
output_area.object = "**Please provide subcluster key and values.**"
return
clusters = [c.strip() for c in subcluster_values.value.split(",")]
key_added = subcluster_key.value + "_subcluster"
# Start loading indicator for subclustering
loading_indicator_subcluster.active = True
output_area.object = "**Subclustering in progress...**"
try:
sc.tl.leiden(
adata,
seed=seed.value,
restrict_to=(subcluster_key.value, clusters),
resolution=subcluster_resolution.value,
key_added=key_added,
)
adata_container["adata"] = adata
output_area.object = "**Subclustering completed.**"
# Update visualization
tabs = []
sc.pl.umap(adata, color=[key_added], show=False)
umap_fig = plt.gcf()
plt.close()
tabs.append(("UMAP_Sub", pn.pane.Matplotlib(umap_fig, dpi=100)))
marker_list = (
list(marker_list_input.value) if marker_list_input.value else None
)
if marker_list:
sc.pl.dotplot(
adata,
marker_list,
groupby=key_added,
dendrogram=True,
show=False,
)
dotplot_fig = plt.gcf()
plt.close()
tabs.append(
("Dotplot_Sub", pn.pane.Matplotlib(dotplot_fig, dpi=100))
)
# Generate histogram plot
cluster_counts = adata.obs[key_added].value_counts()
cluster_counts.sort_index(inplace=True)
cluster_counts.plot(kind="bar")
plt.xlabel("Subcluster")
plt.ylabel("Number of Cells")
plt.title(f"Subcluster Counts for {key_added}")
hist_fig = plt.gcf()
plt.close()
tabs.append(("Histogram_Sub", pn.pane.Matplotlib(hist_fig, dpi=100)))
# Add new tabs to visualization area
for name, pane in tabs:
visualization_area.append((name, pane))
# Update cluster annotations
clusters = adata.obs[key_added].unique().astype(str)
annotations_df = pd.DataFrame(
{"Cluster": clusters, "Annotation": [""] * len(clusters)}
)
cluster_annotation.value = annotations_df
except Exception as e:
output_area.object = f"**Error during subclustering: {e}**"
finally:
# Stop loading indicator for subclustering
loading_indicator_subcluster.active = False
# Callback to save annotations
def save_annotations(event):
adata = adata_container.get("adata", None)
if adata is None:
output_area.object = "**No AnnData object to annotate.**"
return
annotation_dict = dict(
zip(
cluster_annotation.value["Cluster"],
cluster_annotation.value["Annotation"],
)
)
key_to_annotate = (
key_added_input.value
if key_added_input.value
else clustering_method.value + "_" + str(resolution.value)
)
adata.obs["cell_type"] = (
adata.obs[key_to_annotate]
.astype(str)
.map(annotation_dict)
.astype("category")
)
output_area.object = "**Annotations saved to AnnData object.**"
def save_adata(event):
adata = adata_container.get("adata", None)
if adata is None:
output_area.object = "**No AnnData object to save.**"
return
if not output_dir_widget.value:
output_area.object = "**Please specify an output directory.**"
return
os.makedirs(output_dir_widget.value, exist_ok=True)
output_filepath = os.path.join(
output_dir_widget.value, "adata_annotated.h5ad"
)
adata.write(output_filepath)
output_area.object = f"**AnnData saved to {output_filepath}.**"
# Callback to run spatial visualization
def run_spatial_visualization(event):
adata = adata_container.get("adata", None)
if adata is None:
output_area.object = "**Please load an AnnData file first.**"
return
try:
catplot(
adata,
color=spatial_color.value,
unique_region=spatial_unique_region.value,
X=spatial_x.value,
Y=spatial_y.value,
n_columns=spatial_n_columns.value,
palette=spatial_palette.value,
savefig=spatial_savefig.value,
output_fname=spatial_output_fname.value,
output_dir=output_dir_widget.value,
figsize=spatial_figsize.value,
size=spatial_size.value,
)
spatial_fig = plt.gcf()
plt.close()
# Add new tab to visualization area
visualization_area.append(
("Spatial Visualization", pn.pane.Matplotlib(spatial_fig, dpi=100))
)
output_area.object = "**Spatial visualization completed.**"
except Exception as e:
output_area.object = f"**Error during spatial visualization: {e}**"
# File paths
input_path = pn.widgets.TextInput(
name="AnnData File Path", placeholder="Enter path to .h5ad file"
)
output_dir_widget = pn.widgets.TextInput(
name="Output Directory",
placeholder="Enter output directory path",
value=output_dir if output_dir else "",
)
load_data_button = pn.widgets.Button(name="Load Data", button_type="primary")
# Clustering parameters
clustering_method = pn.widgets.Select(
name="Clustering Method",
options=["leiden", "louvain", "flowSOM", "leiden_gpu"],
)
resolution = pn.widgets.FloatInput(name="Resolution", value=1.0)
n_neighbors = pn.widgets.IntInput(name="Number of Neighbors", value=10)
reclustering = pn.widgets.Checkbox(name="Reclustering", value=False)
seed = pn.widgets.IntInput(name="Random Seed", value=42)
key_added_input = pn.widgets.TextInput(
name="Key Added", placeholder="Enter key to add to AnnData.obs", value=""
)
marker_list_input = pn.widgets.MultiChoice(
name="Marker List", options=[], width=950
)
# Subclustering parameters
subcluster_key = pn.widgets.TextInput(
name="Subcluster Key",
placeholder='Enter key to filter on (e.g., "leiden_1")',
)
subcluster_values = pn.widgets.TextInput(
name="Subcluster Values",
placeholder="Enter clusters to subset (comma-separated)",
)
subcluster_resolution = pn.widgets.FloatInput(
name="Subcluster Resolution", value=0.3
)
subcluster_button = pn.widgets.Button(
name="Run Subclustering", button_type="primary"
)
# Cluster annotation
cluster_annotation = pn.widgets.DataFrame(
pd.DataFrame(columns=["Cluster", "Annotation"]),
name="Cluster Annotations",
autosize_mode="fit_columns",
)
save_annotations_button = pn.widgets.Button(
name="Save Annotations", button_type="success"
)
fs_xdim = pn.widgets.IntInput(name="FlowSOM xdim", value=10)
fs_ydim = pn.widgets.IntInput(name="FlowSOM ydim", value=10)
fs_rlen = pn.widgets.IntInput(name="FlowSOM rlen", value=10)
# Buttons
run_clustering_button = pn.widgets.Button(
name="Run Clustering", button_type="primary"
)
save_adata_button = pn.widgets.Button(
name="Save AnnData", button_type="success"
)
# Loading indicators
loading_indicator = pn.widgets.Progress(
name="Clustering Progress", active=False, bar_color="primary"
)
loading_indicator_subcluster = pn.widgets.Progress(
name="Subclustering Progress", active=False, bar_color="primary"
)
# Output areas
output_area = pn.pane.Markdown()
visualization_area = pn.Tabs() # Changed to pn.Tabs to hold multiple plots
# Global variable to hold the AnnData object
adata_container = {}
# Spatial visualization parameters
spatial_color = pn.widgets.TextInput(
name="Color By Column",
placeholder="Enter group column name (e.g., cell_type_coarse)",
)
spatial_unique_region = pn.widgets.TextInput(
name="Unique Region Column", value="unique_region"
)
spatial_x = pn.widgets.TextInput(name="X Coordinate Column", value="x")
spatial_y = pn.widgets.TextInput(name="Y Coordinate Column", value="y")
spatial_n_columns = pn.widgets.IntInput(name="Number of Columns", value=2)
spatial_palette = pn.widgets.TextInput(name="Color Palette", value="tab20")
spatial_figsize = pn.widgets.FloatInput(name="Figure Size", value=17)
spatial_size = pn.widgets.FloatInput(name="Point Size", value=20)
spatial_savefig = pn.widgets.Checkbox(name="Save Figure", value=False)
spatial_output_fname = pn.widgets.TextInput(
name="Output Filename", placeholder="Enter output filename"
)
run_spatial_visualization_button = pn.widgets.Button(
name="Run Spatial Visualization", button_type="primary"
)
# Link callbacks
load_data_button.on_click(load_data)
run_clustering_button.on_click(run_clustering)
subcluster_button.on_click(run_subclustering)
save_annotations_button.on_click(save_annotations)
save_adata_button.on_click(save_adata)
run_spatial_visualization_button.on_click(run_spatial_visualization)
# Clustering Tab Layout
clustering_tab = pn.Column(
pn.pane.Markdown("### Load Data"),
(
pn.Row(input_path, output_dir_widget, load_data_button)
if adata is None
else pn.pane.Markdown("AnnData object loaded.")
),
pn.layout.Divider(),
pn.pane.Markdown("### Clustering Parameters"),
pn.Row(clustering_method, resolution, n_neighbors),
pn.Row(seed, reclustering),
pn.Row(fs_xdim, fs_ydim, fs_rlen),
key_added_input,
marker_list_input,
pn.layout.Divider(),
pn.Row(run_clustering_button, loading_indicator),
output_area,
)
# Subclustering Tab Layout
subclustering_tab = pn.Column(
pn.pane.Markdown("### Subclustering Parameters"),
pn.Row(subcluster_key, subcluster_values, subcluster_resolution),
pn.layout.Divider(),
pn.Row(subcluster_button, loading_indicator_subcluster),
output_area,
)
# Annotation Tab Layout
annotation_tab = pn.Column(
pn.pane.Markdown("### Cluster Annotation"),
cluster_annotation,
pn.layout.Divider(),
save_annotations_button,
output_area,
)
# Save Tab Layout
save_tab = pn.Column(
pn.pane.Markdown("### Save Data"), save_adata_button, output_area
)
# Spatial Visualization Tab Layout
spatial_visualization_tab = pn.Column(
pn.pane.Markdown("### Spatial Visualization Parameters"),
pn.Row(spatial_color, spatial_palette),
pn.Row(spatial_unique_region, spatial_n_columns),
pn.Row(spatial_x, spatial_y),
pn.Row(spatial_figsize, spatial_size),
pn.layout.Divider(),
pn.Row(spatial_savefig, spatial_output_fname),
pn.layout.Divider(),
pn.Row(run_spatial_visualization_button),
output_area,
)
# Assemble Tabs
tabs = pn.Tabs(
("Clustering", clustering_tab),
("Subclustering", subclustering_tab),
("Annotation", annotation_tab),
("Spatial Visualization", spatial_visualization_tab),
("Save", save_tab),
)
# Main Layout with Visualization Area
main_layout = pn.Row(tabs, visualization_area, sizing_mode="stretch_both")
# Automatically load data if adata is provided
if adata is not None:
load_data()
return main_layout
# Run the app
main_layout = create_clustering_app()
main_layout.servable(title="SPACEc Clustering App")
return main_layout