# load required packages
from __future__ import annotations
import os
import platform
import subprocess
import sys
import zipfile
import requests
if platform.system() == "Windows":
vipsbin = r"c:\vips-dev-8.15\bin\vips-dev-8.15\bin"
vips_file_path = os.path.join(vipsbin, "vips.exe")
# Check if VIPS is installed
if not os.path.exists(vips_file_path):
# VIPS is not installed, download and extract it
url = "https://github.com/libvips/build-win64-mxe/releases/download/v8.15.2/vips-dev-w64-all-8.15.2.zip"
zip_file_path = "vips-dev-w64-all-8.15.2.zip"
response = requests.get(url, stream=True)
if response.status_code == 200:
with open(zip_file_path, "wb") as f:
f.write(response.raw.read())
# Extract the zip file
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(vipsbin)
else:
print("Error downloading the file.")
# Install pyvips
subprocess.check_call([sys.executable, "-m", "pip", "install", "pyvips"])
# Add vipsbin to the DLL search path or PATH environment variable
add_dll_dir = getattr(os, "add_dll_directory", None)
os.environ["PATH"] = os.pathsep.join((vipsbin, os.environ["PATH"]))
import argparse
import pathlib
import pickle
import time
from builtins import range
from itertools import combinations
from multiprocessing import Pool
from typing import TYPE_CHECKING
import anndata
import concave_hull
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import scipy.stats as st
import skimage
import skimage.color
import skimage.exposure
import skimage.filters.rank
import skimage.io as io
import skimage.morphology
import skimage.transform
import statsmodels.api as sm
import tissuumaps.jupyter as tj
import torch
from concave_hull import concave_hull_indexes
from joblib import Parallel, delayed
from pyFlowSOM import map_data_to_nodes, som
from scipy import stats
from scipy.spatial import Delaunay, KDTree, distance
from scipy.spatial.distance import cdist
from scipy.stats import pearsonr, spearmanr
from skimage.io import imsave
from skimage.segmentation import find_boundaries
from sklearn.cluster import HDBSCAN, MiniBatchKMeans
from sklearn.cross_decomposition import CCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, f1_score, pairwise_distances
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
from sklearn.svm import SVC
from tqdm import tqdm
from yellowbrick.cluster import KElbowVisualizer
if TYPE_CHECKING:
from anndata import AnnData
from ..helperfunctions._general import *
try:
from torch_geometric.data import ClusterData, ClusterLoader, Data, InMemoryDataset
except ImportError:
pass
try:
import cupy as cp
import rapids_singlecell as rsc
from cupyx.scipy.sparse import csc_matrix as csc_matrix_gpu
from cupyx.scipy.sparse import csr_matrix as csr_matrix_gpu
from cupyx.scipy.sparse import isspmatrix_csc as isspmatrix_csc_gpu
from cupyx.scipy.sparse import isspmatrix_csr as isspmatrix_csr_gpu
from scanpy.get import _get_obs_rep, _set_obs_rep
from scipy.sparse import isspmatrix_csc as isspmatrix_csc_cpu
from scipy.sparse import isspmatrix_csr as isspmatrix_csr_cpu
except ImportError:
pass
# Tools
############################################################
############
"""
The function tl_cell_types_de performs differential enrichment analysis for various cell subsets between different neighborhoods using linear regression.
It takes in several inputs such as cell type frequencies, neighborhood numbers, and patient information.
The function first normalizes overall cell type frequencies and then neighborhood-specific cell type frequencies. Next, a linear regression model is fitted to find the coefficients and p-values for the group coefficient.
Finally, the function returns a dataframe with the coefficients and p-values for each cell subset. The p-values can be corrected for multiple testing after the function has been executed.
"""
def tl_cell_types_de(
ct_freq, all_freqs, neighborhood_num, nbs, patients, group, cells, cells1
):
# data prep
# normalized overall cell type frequencies
X_cts = hf_normalize(
ct_freq.reset_index().set_index("patients").loc[patients, cells]
)
# normalized neighborhood specific cell type frequencies
df_list = []
for nb in nbs:
cond_nb = (
all_freqs.loc[all_freqs[neighborhood_num] == nb, cells1]
.rename({col: col + "_" + str(nb) for col in cells}, axis=1)
.set_index("patients")
)
df_list.append(hf_normalize(cond_nb))
X_cond_nb = pd.concat(df_list, axis=1).loc[patients]
# differential enrichment for all cell subsets
changes = {}
# nbs =[0, 2, 3, 4, 6, 7, 8, 9]
for col in cells:
for nb in nbs:
# build a design matrix with a constant, group 0 or 1 and the overall frequencies
X = pd.concat(
[
X_cts[col],
group.astype("int"),
pd.Series(np.ones(len(group)), index=group.index.values),
],
axis=1,
).values
if col + "_%d" % nb in X_cond_nb.columns:
# set the neighborhood specific ct freqs as the outcome
Y = X_cond_nb[col + "_%d" % nb].values
X = X[~pd.isna(Y)]
Y = Y[~pd.isna(Y)]
# fit a linear regression model
results = sm.OLS(Y, X).fit()
# find the params and pvalues for the group coefficient
changes[(col, nb)] = (results.pvalues[1], results.params[1])
# make a dataframe with coeffs and pvalues
dat = pd.DataFrame(changes).loc[1].unstack()
dat = (
pd.DataFrame(np.nan_to_num(dat.values), index=dat.index, columns=dat.columns)
.T.sort_index(ascending=True)
.loc[:, X_cts.columns]
)
pvals = (
(pd.DataFrame(changes).loc[0].unstack())
.T.sort_index(ascending=True)
.loc[:, X_cts.columns]
)
# this is where you should correct pvalues for multiple testing
return dat, pvals
#########
def tl_Create_neighborhoods(
df, n_num, cluster_col, X, Y, regions, sum_cols=None, keep_cols=None, ks=[20]
):
if sum_cols == None:
sum_cols = df[cluster_col].unique()
if keep_cols == None:
keep_cols = df.columns.values.tolist()
Neigh = Neighborhoods(
df, ks, cluster_col, sum_cols, keep_cols, X, Y, regions, add_dummies=True
)
windows = Neigh.k_windows()
return (windows, sum_cols)
######
def tl_Chose_window_size(
windows, n_num, n_neighborhoods, sum_cols, n2_name="neigh_ofneigh"
):
# Choose the windows size to continue with
w = windows[n_num]
k_centroids = {}
km = MiniBatchKMeans(n_clusters=n_neighborhoods, random_state=0)
labels = km.fit_predict(w[sum_cols].values)
k_centroids[n_num] = km.cluster_centers_
w[n2_name] = labels
return (w, k_centroids)
#######
def tl_calculate_neigh_combs(w, l, n_num, threshold=0.85, per_keep_thres=0.85):
w.loc[:, l]
# need to normalize by number of neighborhoods or k chosen for the neighborhoods
xm = w.loc[:, l].values / n_num
# Get the neighborhood combinations based on the threshold
simps = hf_get_thresh_simps(xm, threshold)
simp_freqs = simps.value_counts(normalize=True)
simp_sums = np.cumsum(simp_freqs)
# See the percent to keep threshold or percent of neigbhorhoods that fall above a certain threshold
test_sums_thres = simp_sums[simp_sums < per_keep_thres]
test_len = len(test_sums_thres)
per_values_above = simp_sums[test_len] - simp_sums[test_len - 1]
print(test_len, per_values_above)
w["combination"] = [tuple(l[a] for a in s) for s in simps]
w["combination_num"] = [tuple(a for a in s) for s in simps]
# this shows what proportion (y) of the total cells are assigned to the top x combinations
plt.figure(figsize=(7, 3))
plt.plot(simp_sums.values)
plt.title(
"proportion (y) of the total cells are assigned to the top x combinations"
)
plt.show()
# this shows what proportion (y) of the total cells are assigned to the top x combinations
plt.figure(figsize=(7, 3))
plt.plot(test_sums_thres.values)
plt.title(
"proportion (y) of the total cells are assigned to the top x combinations - thresholded"
)
plt.show()
# plt.xticks(range(0,350,35),range(0,350,35),rotation = 90,fontsize = 10)
return (simps, simp_freqs, simp_sums)
#######
def tl_build_graph_CN_comb_map(simp_freqs, thresh_freq=0.001):
g = nx.DiGraph()
# selected_simps = simp_sums[simp_sums<=thresh_cumulative].index.values
selected_simps = simp_freqs[simp_freqs >= thresh_freq].index.values
selected_simps
"""
this builds the graph for the CN combination map
"""
for e0 in selected_simps:
for e1 in selected_simps:
if (set(list(e0)) < set(list(e1))) and (len(e1) == len(e0) + 1):
g.add_edge(e0, e1)
tops = (
simp_freqs[simp_freqs >= thresh_freq]
.sort_values(ascending=False)
.index.values.tolist()[:20]
)
return (g, tops, e0, e1)
#######
def tl_spatial_context_stats(
n_num,
patient_ID_component1,
patient_ID_component2,
windows,
total_per_thres=0.9,
comb_per_thres=0.005,
tissue_column="Block type",
subset_list=["Resection"],
plot_order=["Resection", "Biopsy"],
pal_tis={"Resection": "blue", "Biopsy": "orange"},
subset_list_tissue1=["Resection"],
subset_list_tissue2=["Biopsy"],
):
data_compare = windows[n_num]
# Prepare IDs this could for example be the combination of patient ID and tissue type. Apart from that, the function assigns a number to each name from the neighborhood column
data_compare = prepare_neighborhood_df(
data_compare,
neighborhood_column=None,
patient_ID_component1=patient_ID_component1,
patient_ID_component2=patient_ID_component2,
) # this is a helper function
data_compare["donor_tis"].unique()
simp_df_tissue1 = hf_simp_rep(
data=data_compare,
patient_col="donor_tis",
tissue_column=tissue_column,
subset_list_tissue=subset_list_tissue1,
ttl_per_thres=total_per_thres,
comb_per_thres=comb_per_thres,
thres_num=1,
)
print(simp_df_tissue1)
simp_df_tissue2 = hf_simp_rep(
data=data_compare,
patient_col="donor_tis",
tissue_column=tissue_column,
subset_list_tissue=subset_list_tissue2,
ttl_per_thres=total_per_thres,
comb_per_thres=comb_per_thres,
thres_num=1,
)
print(simp_df_tissue2)
##### Compare the organization at high level to see if differences in combinations - more or less structured/compartmentalized
data_simp = [simp_df_tissue1, simp_df_tissue2]
df_num_count = pl_comb_num_freq(data_list=data_simp)
print(df_num_count)
return (simp_df_tissue1, simp_df_tissue2)
###########
def tl_xycorr(df, sample_col, y_rows, x_columns, X_pix, Y_pix):
# Make a copy for xy correction
df_XYcorr = df.copy()
df_XYcorr["Xcorr"] = 0
df_XYcorr["Ycorr"] = 0
for sample in df_XYcorr[sample_col].unique():
df_sub = df_XYcorr.loc[df_XYcorr[sample_col] == sample]
region_num = df_sub.region.max().astype(int)
# first value of tuple is y and second is x
d = list(product(range(0, y_rows, 1), range(0, x_columns, 1)))
e = list(range(1, region_num + 1, 1))
dict_corr = {}
dict_corr = dict(zip(e, d))
# Adding the pixels with the dictionary
for x in range(1, region_num + 1, 1):
df_XYcorr["Xcorr"].loc[
(df_XYcorr["region"] == x) & (df_XYcorr[sample_col] == sample)
] = (
df_XYcorr["x"].loc[
(df_XYcorr["region"] == x) & (df_XYcorr[sample_col] == sample)
]
+ dict_corr[x][1] * X_pix
)
for x in range(1, region_num + 1, 1):
df_XYcorr["Ycorr"].loc[
(df_XYcorr["region"] == x) & (df_XYcorr[sample_col] == sample)
] = (
df_XYcorr["y"].loc[
(df_XYcorr["region"] == x) & (df_XYcorr[sample_col] == sample)
]
+ dict_corr[x][0] * Y_pix
)
return df_XYcorr
###############
def tl_get_distances(df, cell_list, cell_type_col):
names = cell_list
cls = {}
for i, cname in enumerate(names):
cls[i] = df[["x", "y"]][df[cell_type_col] == cname].to_numpy()
cls[i] = cls[i][~np.isnan(cls[i]).any(axis=1), :]
dists = {}
for i in range(5):
for j in range(0, i):
dists[(j, i)] = cdist(cls[j], cls[i])
dists[(i, j)] = dists[(j, i)]
return cls, dists
###############
# clustering
[docs]
def clustering(
adata,
clustering="leiden",
marker_list=None,
resolution=1,
n_neighbors=10,
reclustering=False,
key_added=None,
key_filter=None,
subset_cluster=None,
seed=42,
fs_xdim=10,
fs_ydim=10,
fs_rlen=10, # FlowSOM parameters
**cluster_kwargs,
):
"""
Perform clustering on the given annotated data matrix.
Parameters
----------
adata : AnnData
The annotated data matrix of shape n_obs x n_vars. Rows correspond
to cells and columns to stained markers.
clustering : str, optional
The clustering algorithm to use. Options are "leiden" or "louvain". Defaults to "leiden".
marker_list : list, optional
A list of markers for clustering. Defaults to None.
resolution : int, optional
The resolution for the clustering algorithm. Defaults to 1.
n_neighbors : int, optional
The number of neighbors to use for the neighbors graph. Defaults to 10.
reclustering : bool, optional
Whether to recluster the data. Defaults to False.
key_added : str, optional
The key name to add to the adata object. Defaults to None.
key_filter : str, optional
The key name to filter the adata object. Defaults to None.
subset_cluster : list, optional
The list of clusters to subset. Defaults to None.
seed : int, optional
Seed for random state. Default is 42.
fs_xdim : int, optional
X dimension for FlowSOM. Default is 10.
fs_ydim : int, optional
Y dimension for FlowSOM. Default is 10.
fs_rlen : int, optional
Rlen for FlowSOM. Default is 10.
**cluster_kwargs : dict
Additional keyword arguments for the clustering function.
Returns
-------
AnnData
The annotated data matrix with the clustering results added.
"""
if clustering not in ["leiden", "louvain", "leiden_gpu", "flowSOM"]:
print(
"Invalid clustering options. Please select from leiden, louvain, leiden_gpu or flowSOM!"
)
print("For GPU accelerated leiden clustering, please use leiden_gpu")
sys.exit()
# test if rapids_singlecell is available
if clustering == "leiden_gpu":
try:
import cudf
import cuml
import cupy
import rapids_singlecell as rsc
except ImportError:
print("Please install rapids_singlecell to use leiden_gpu!")
print("install_gpu_leiden(CUDA = your cuda version as string)")
print("For example: sp.tl.install_gpu_leiden(CUDA = '12')")
print("THIS FUNCTION DOES NOT WORK ON MacOS OR WINDOWS")
print("using leiden instead of leiden_gpu")
clustering = "leiden"
if key_added is None:
key_added = clustering + "_" + str(resolution)
if key_filter is not None:
if subset_cluster is None:
print("Please provide subset_cluster!")
sys.exit()
else:
adata_tmp = adata
adata = adata[adata.obs[key_filter].isin(subset_cluster)]
# input a list of markers for clustering
# reconstruct the anndata
if marker_list is not None:
if len(list(set(marker_list) - set(adata.var_names))) > 0:
print("Marker list not all in adata var_names! Using intersection instead!")
marker_list = list(set(marker_list) & set(adata.var_names))
print("New marker_list: " + " ".join(marker_list))
if key_filter is None:
adata_tmp = adata
adata = adata[:, marker_list]
# Compute the neighborhood relations of single cells the range 2 to 100 and usually 10
if reclustering:
if clustering == "leiden_gpu":
print("Clustering on GPU")
anndata_to_GPU(adata) # moves `.X` to the GPU
rsc.tl.leiden(
adata,
resolution=resolution,
key_added=key_added,
random_state=seed,
**cluster_kwargs,
)
anndata_to_CPU(adata) # moves `.X` to the CPU
else:
print("Clustering")
if clustering == "leiden":
sc.tl.leiden(
adata,
resolution=resolution,
key_added=key_added,
random_state=seed,
**cluster_kwargs,
)
else:
if clustering == "louvain":
print("Louvain clustering")
sc.tl.louvain(
adata,
resolution=resolution,
key_added=key_added,
random_state=seed,
**cluster_kwargs,
)
else:
print("FlowSOM clustering")
adata_df = pd.DataFrame(
adata.X, index=adata.obs.index, columns=adata.var.index
)
# df to numpy array
som_input_arr = adata_df.to_numpy()
# train the SOM
node_output = som(
som_input_arr,
xdim=fs_xdim,
ydim=fs_ydim,
rlen=fs_rlen,
seed=seed,
)
# use trained SOM to assign clusters to each observation in your data
clusters, dists = map_data_to_nodes(node_output, som_input_arr)
clusters = pd.Categorical(clusters)
# add cluster to adata
adata.obs[key_added] = clusters
else:
if clustering == "leiden_gpu":
anndata_to_GPU(adata) # moves `.X` to the GPU
print("Computing neighbors and UMAP on GPU")
rsc.pp.neighbors(adata, n_neighbors=n_neighbors)
# UMAP computation
rsc.tl.umap(adata)
print("Clustering on GPU")
# Perform leiden clustering - improved version of louvain clustering
rsc.tl.leiden(
adata, resolution=resolution, key_added=key_added, random_state=seed
)
anndata_to_CPU(adata) # moves `.X` to the CPU
else:
print("Computing neighbors and UMAP")
sc.pp.neighbors(adata, n_neighbors=n_neighbors)
# UMAP computation
sc.tl.umap(adata)
print("Clustering")
# Perform leiden clustering - improved version of louvain clustering
if clustering == "leiden":
print("Leiden clustering")
sc.tl.leiden(
adata,
resolution=resolution,
key_added=key_added,
random_state=seed,
**cluster_kwargs,
)
else:
if clustering == "louvain":
print("Louvain clustering")
sc.tl.louvain(
adata,
resolution=resolution,
key_added=key_added,
random_state=seed,
**cluster_kwargs,
)
else:
print("FlowSOM clustering")
adata_df = pd.DataFrame(
adata.X, index=adata.obs.index, columns=adata.var.index
)
# df to numpy array
som_input_arr = adata_df.to_numpy()
# train the SOM
node_output = som(
som_input_arr,
xdim=fs_xdim,
ydim=fs_ydim,
rlen=fs_rlen,
seed=seed,
)
# use trained SOM to assign clusters to each observation in your data
clusters, dists = map_data_to_nodes(node_output, som_input_arr)
# make clusters a string
clusters = clusters.astype(str)
clusters = pd.Categorical(clusters)
# add cluster to adata
adata.obs[key_added] = clusters
if key_filter is None:
if marker_list is None:
return adata
else:
adata_tmp.obs[key_added] = adata.obs[key_added].values
# append other data
adata_tmp.obsm = adata.obsm
adata_tmp.obsp = adata.obsp
adata_tmp.uns = adata.uns
if key_filter is not None:
original_df = adata_tmp.obs
donor_df = adata.obs
donor_df_cols = donor_df.loc[:, donor_df.columns != key_added].columns.tolist()
# Perform the merge operation
merged_df = pd.merge(
original_df,
donor_df,
left_on=donor_df_cols,
right_on=donor_df_cols,
how="left",
)
# Fill NA/NaN values in 'key_added' using the values from 'key_filter'
merged_df[key_filter] = merged_df[key_filter].astype(str)
merged_df[key_added] = merged_df[key_added].astype(str)
merged_df.replace("nan", np.nan, inplace=True)
merged_df[key_added].fillna(merged_df[key_filter], inplace=True)
merged_df[key_filter] = merged_df[key_filter].astype("category")
merged_df[key_added] = merged_df[key_added].astype("category")
merged_df.index = merged_df.index.astype(str)
# assign df as obs for adata_tmp
adata_tmp.obs = merged_df
return adata_tmp
###############
# Patch analysis
def tl_generate_voronoi_plots(
df,
output_path,
grouping_col="Community",
tissue_col="tissue",
region_col="unique_region",
x_col="x",
y_col="y",
):
"""
Generate Voronoi plots for unique combinations of tissue and region.
Parameters:
df (pandas.DataFrame): Input DataFrame containing the data.
output_path (str): Output path to save the plots.
grouping_col (str): Column that contains group label that is used to color the voronoi diagrams
tissue_col (str): Column that contains tissue labels
region_col (str): Column that contains region labels
x_col (str): Column that contains x coordinates
y_col (str): Column that contains y coordinates
Returns:
None
"""
unique_tissues = df[tissue_col].unique()
unique_regions = df[region_col].unique()
combinations = list(itertools.product(unique_tissues, unique_regions))
for tissue, region in combinations:
subset_df = df[(df[tissue_col] == tissue) & (df[region_col] == region)]
sorted_df = subset_df.sort_values(grouping_col)
unique_values = sorted_df[grouping_col].unique()
specific_output = os.path.join(output_path, tissue)
os.makedirs(specific_output, exist_ok=True)
specific_output = os.path.join(specific_output, region)
os.makedirs(specific_output, exist_ok=True)
for group in unique_values:
start = time.time()
output_filename = group + "_plot.png"
output_path2 = os.path.join(specific_output, output_filename)
color_dict = {}
for value in unique_values:
color_dict[value] = "black"
color_dict[group] = "white"
X = sorted_df[x_col]
Y = sorted_df[y_col]
np.random.seed(1234)
points = np.c_[X, Y]
vor = Voronoi(points)
regions, vertices = hf_voronoi_finite_polygons_2d(vor)
groups = sorted_df[grouping_col].values
fig, ax = plt.subplots()
ax.set_ylim(0, max(Y))
ax.set_xlim(0, max(X))
ax.axis("off")
for i, region in tqdm(
enumerate(regions), total=len(regions), desc="Processing regions"
):
group = groups[i]
color = color_dict.get(group, "gray")
polygon = vertices[region]
ax.fill(*zip(*polygon), color=color)
ax.plot(points[:, 0], points[:, 1], "o", color="black", zorder=1, alpha=0)
fig.set_size_inches(9.41, 9.07 * 1.02718006795017)
fig.savefig(
output_path2, bbox_inches="tight", pad_inches=0, dpi=129.0809327846365
)
plt.close(fig)
end = time.time()
print(end - start)
def tl_generate_masks_from_images(
image_folder, mask_output, image_type=".tif", filter_size=5, threshold_value=10
):
"""
Generate binary masks from CODEX images.
Parameters:
image_folder (str): Directory that contains the images that are used to generate the masks
mask_output (str): Directory to store the generated masks
image_type (str): File type of image. By default ".tif"
filter_size (num): Size for filter disk during mask generation
threshold_value (num): Threshold value for binary mask generation
Returns:
None
"""
folders_list = hf_list_folders(image_folder)
print(folders_list)
for folder in tqdm(folders_list, desc="Processing folders"):
direc = image_folder + "/" + folder
print(direc)
filelist = os.listdir(direc)
filelist = [f for f in filelist if f.endswith(image_type)]
print(filelist)
output_dir = mask_output + folder
os.makedirs(output_dir, exist_ok=True)
for f in tqdm(filelist, desc="Processing files"):
path = os.path.join(direc, f)
print(path)
tl_generate_mask(
path=path,
output_dir=output_dir,
filename="/" + f,
filter_size=filter_size,
threshold_value=threshold_value,
)
def tl_generate_info_dataframe(
df,
voronoi_output,
mask_output,
filter_list=None,
info_cols=["tissue", "donor", "unique_region", "region", "array"],
):
"""
Generate a filtered DataFrame based on specific columns and values.
Parameters:
df (pandas.DataFrame): Input DataFrame.
voronoi_output (str): Path to the Voronoi output directory.
mask_output (str): Path to the mask output directory.
info_cols (list): columns to extract from input df
filter_list (list, optional): List of values to filter.
Returns:
pandas.DataFrame: Filtered DataFrame.
"""
df_info = df[info_cols].drop_duplicates()
df_info["folder_names"] = df_info["array"]
df_info["region"] = df_info["region"].astype(int)
df_info["region_long"] = ["reg00" + str(region) for region in df_info["region"]]
df_info["voronoi_path"] = (
voronoi_output + df_info["tissue"] + "/" + df_info["unique_region"]
)
df_info["mask_path"] = mask_output + df_info["folder_names"] + "/"
if filter_list != None:
# remove unwanted folders
df_info = df_info[~df_info["folder_names"].isin(filter_list)]
else:
print("no filter used")
return df_info
###
def tl_process_files(voronoi_path, mask_path, region):
"""
Process files based on the provided paths and region.
Parameters:
voronoi_path (str): Path to the Voronoi files.
mask_path (str): Path to the mask files.
region (str): Region identifier.
Returns:
None
"""
png_files_list = hf_get_png_files(voronoi_path)
tiff_file_path = hf_find_tiff_file(mask_path, region)
if tiff_file_path:
print(f"Matching TIFF file found: {tiff_file_path}")
else:
print("No matching TIFF file found.")
for f in tqdm(png_files_list, desc="Processing files"):
print(f)
tl_apply_mask(f, tiff_file_path, f + "_cut.png")
###
def tl_process_data(df_info, output_dir_csv):
"""
Process data based on the information provided in the DataFrame.
Parameters:
df_info (pandas.DataFrame): DataFrame containing the information.
output_dir_csv (str): Output directory for CSV results.
Returns:
pandas.DataFrame: Concatenated DataFrame of results.
list: List of contours.
"""
DF_list = []
contour_list = []
for index, row in df_info.iterrows():
voronoi_path = row["voronoi_path"]
mask_path = row["mask_path"]
region = row["region_long"]
donor = row["donor"]
unique_region = row["unique_region"]
png_files_list = hf_get_png_files(voronoi_path)
png_files_list = [
filename for filename in png_files_list if not filename.endswith("cut.png")
]
tiff_file_path = hf_find_tiff_file(mask_path, region)
if tiff_file_path:
print(f"Matching TIFF file found: {tiff_file_path}")
else:
print("No matching TIFF file found.")
for f in png_files_list:
print(f)
g = f + "_cut" + ".png"
print(g)
tl_apply_mask(f, tiff_file_path, g)
output_dir_csv_tmp = output_dir_csv + "/" + donor + "_" + unique_region
os.makedirs(output_dir_csv_tmp, exist_ok=True)
image_dir = output_dir_csv + "/" + donor + "_" + unique_region
os.makedirs(image_dir, exist_ok=True)
print(f"Path created: {image_dir}")
image_dir = os.path.join(image_dir, os.path.basename(os.path.normpath(g)))
path = g
df, contour = tl_analyze_image(
path,
invert=False,
output_dir=image_dir,
)
df["group"] = hf_extract_filename(g)
df["unique_region"] = unique_region
DF_list.append(df)
contour_list.append(contour)
results_df = pd.concat(DF_list)
contour_list_results_df = pd.concat(DF_list)
results_df.to_csv(os.path.join(output_dir_csv, "results.csv"))
return results_df, contour_list
###
def tl_analyze_image(
path,
output_dir,
invert=False,
properties_list=[
"label",
"centroid",
"area",
"perimeter",
"solidity",
"coords",
"axis_minor_length",
"axis_major_length",
"orientation",
"slice",
],
):
"""
Analyze an image by performing connected component analysis on patches and storing their information.
The function applies image processing techniques such as Gaussian smoothing, thresholding, and connected component
labeling to identify and analyze patches within the image. It extracts region properties of these patches,
calculates their circularity, and stores the coordinates of their contour. The resulting information is saved
in a DataFrame along with a visualization plot.
Parameters:
path (str): Path to the input image.
output_dir (str): Directory to save the output plot.
invert (bool, optional): Flag indicating whether to invert the image (default is False).
properties_list: (list of str): Define properties to be measured (see SciKit Image), by default "label", "centroid", "area", "perimeter", "solidity", "coords", "axis_minor_length", "axis_major_length", "orientation", "slice"
Returns:
tuple: A tuple containing the DataFrame with region properties, including patch contour coordinates, and
the list of contour coordinates for each patch.
"""
image = skimage.io.imread(path)
if image.ndim == 2:
print("2D array")
else:
image = image[:, :, 0]
if invert:
print(
"The original background color was white. The image was inverted for further analysis."
)
# image = 255 - image
else:
print("no inversion")
smooth = skimage.filters.gaussian(image, sigma=1.5)
thresh = smooth > skimage.filters.threshold_otsu(smooth)
blobs_labels = skimage.measure.label(thresh, background=0)
properties = skimage.measure.regionprops(blobs_labels)
props_table = skimage.measure.regionprops_table(
blobs_labels,
properties=(properties_list),
)
prop_df = pd.DataFrame(props_table)
prop_df["circularity"] = (4 * np.pi * prop_df["area"]) / (prop_df["perimeter"] ** 2)
# Store the contour of each patch in the DataFrame
contour_list = []
for index in range(1, blobs_labels.max()):
label_i = properties[index].label
contour = skimage.measure.find_contours(blobs_labels == label_i, 0.5)[0]
contour_list.append(contour)
contour_list_df = pd.DataFrame({"contours": contour_list})
prop_df = pd.concat([prop_df, contour_list_df], axis=1)
plt.figure(figsize=(9, 3.5))
plt.subplot(1, 2, 1)
plt.imshow(thresh, cmap="gray")
plt.axis("off")
plt.subplot(1, 2, 2)
plt.imshow(blobs_labels, cmap="nipy_spectral")
plt.axis("off")
plt.tight_layout()
plt.savefig(output_dir)
plt.close()
return prop_df, contour_list
###
def tl_apply_mask(image_path, mask_path, output_path):
"""
Apply a mask to an image and save the resulting masked image.
Parameters:
image_path (str): Path to the input image.
mask_path (str): Path to the mask image.
output_path (str): Path to save the masked image.
Returns:
None
"""
# Load the image and the mask
image = io.imread(image_path)
mask = io.imread(mask_path, as_gray=True)
mask = np.flip(mask, axis=0)
width = 941
height = 907
image = skimage.transform.resize(image, (height, width))
if image.ndim == 2:
print("2D array")
else:
image = image[:, :, :3]
# Convert to grayscale
image = skimage.color.rgb2gray(image)
# Convert to 8-bit
image = skimage.img_as_ubyte(image)
print("Image shape:", image.shape)
print("Mask shape:", mask.shape)
# Ensure the mask is binary
mask = mask > 0
# Apply the mask to the image
masked_image = image.copy()
masked_image[~mask] = 0
# Check if the image has an alpha channel (transparency)
if masked_image.ndim == 2:
print("2D array")
else:
masked_image = masked_image[:, :, :3]
# Save the masked image
io.imsave(output_path, skimage.img_as_ubyte(masked_image))
###
def tl_generate_mask(
path, output_dir, filename="mask.png", filter_size=5, threshold_value=5
):
"""
Generate a mask from a maximum projection of an input image.
Parameters:
path (str): Path to the input image.
output_dir (str): Directory to save the generated mask and quality control plot.
filename (str, optional): Name of the generated mask file (default is "mask.png").
filter_size (int, optional): Size of the filter disk used for image processing (default is 5).
threshold_value (int, optional): Threshold value for binary conversion (default is 5).
Returns:
None
"""
# Load the image
image = io.imread(path)
# Perform Z projection using Maximum Intensity
z_projection = np.max(image, axis=0)
# Resize the image
width = 941
height = 907
resized_image = skimage.transform.resize(
z_projection, (height, width, 3), preserve_range=True
)
print("Resized image shape:", resized_image.shape)
# Remove alpha channel if present
if resized_image.shape[-1] == 4:
resized_image = resized_image[:, :, :3]
# Convert to grayscale
gray_image = skimage.color.rgb2gray(resized_image)
# Assuming gray_image has pixel values outside the range [0, 1]
# Normalize the pixel values to the range [0, 1]
gray_image_normalized = (gray_image - gray_image.min()) / (
gray_image.max() - gray_image.min()
)
# Convert to 8-bit
gray_image_8bit = skimage.img_as_ubyte(gray_image_normalized)
# Apply maximum filter
max_filtered = skimage.filters.rank.maximum(
gray_image_8bit, skimage.morphology.disk(filter_size)
)
# Apply minimum filter
min_filtered = skimage.filters.rank.minimum(
max_filtered, skimage.morphology.disk(filter_size)
)
# Apply median filter
median_filtered = skimage.filters.rank.median(
min_filtered, skimage.morphology.disk(filter_size)
)
# Manual Thresholding
binary = median_filtered > threshold_value
# Convert to mask
mask = skimage.morphology.closing(binary, skimage.morphology.square(3))
# Plotting
fig, axes = plt.subplots(2, 3, figsize=(12, 6))
axes[0, 0].imshow(gray_image, cmap="gray")
axes[0, 0].set_title("Grayscale Image")
axes[0, 1].imshow(gray_image_8bit, cmap="gray")
axes[0, 1].set_title("8-bit Image")
axes[0, 2].imshow(max_filtered, cmap="gray")
axes[0, 2].set_title("Maximum Filtered")
axes[1, 0].imshow(min_filtered, cmap="gray")
axes[1, 0].set_title("Minimum Filtered")
axes[1, 1].imshow(median_filtered, cmap="gray")
axes[1, 1].set_title("Median Filtered")
axes[1, 2].imshow(mask, cmap="gray")
axes[1, 2].set_title("Mask")
for ax in axes.ravel():
ax.axis("off")
plt.tight_layout()
fig.savefig(output_dir + filename + "_QC_plot.png", dpi=300, format="png")
plt.show()
# Save the result
io.imsave(output_dir + filename, mask)
#####
def tl_test_clustering_resolutions(
adata, clustering="leiden", n_neighbors=10, resolutions=[1]
):
"""
Test different resolutions for reclustering using Louvain or Leiden algorithm.
Parameters:
adata (AnnData): Anndata object containing the data.
clustering (str, optional): Clustering algorithm to use (default is 'leiden').
n_neighbors (int, optional): Number of nearest neighbors (default is 10).
resolutions (list, optional): List of resolutions to test (default is [1]).
Returns:
None
"""
for res in tqdm(resolutions, desc="Testing resolutions"):
if "leiden" in clustering:
clustering(
adata,
clustering="leiden",
n_neighbors=n_neighbors,
res=res,
reclustering=True,
)
else:
clustering(
adata,
clustering="louvain",
n_neighbors=n_neighbors,
res=res,
reclustering=True,
)
sc.pl.umap(adata, color=f"{clustering}_{res}", legend_loc="on data")
[docs]
def neighborhood_analysis(
adata,
unique_region,
cluster_col,
X="x",
Y="y",
k=35,
n_neighborhoods=30,
elbow=False,
metric="distortion",
):
"""
Compute for Cellular neighborhoods (CNs).
Parameters
----------
adata : AnnData
Annotated data matrix.
unique_region : str
Each region is one independent CODEX image.
cluster_col : str
Columns to compute CNs on, typically 'celltype'.
X : str, optional
X coordinate column name, by default "x".
Y : str, optional
Y coordinate column name, by default "y".
k : int, optional
Number of neighbors to compute, by default 35.
n_neighborhoods : int, optional
Number of neighborhoods one ends up with, by default 30.
elbow : bool, optional
Whether to test for optimal number of clusters and visulize as elbow plot or not, by default False. If set to true the funktion will test 1 to n_neighborhoods and plots the distortion score in an elbow plot to assist the user in finding the optimal number of clusters.
metric : str, optional
The metric to use when calculating distance between instances in a feature array, by default "distortion".
Returns
-------
AnnData
Annotated data matrix with updated neighborhood information.
"""
df = pd.DataFrame(adata.obs[[X, Y, cluster_col, unique_region]])
cells = pd.concat([df, pd.get_dummies(df[cluster_col])], axis=1)
sum_cols = cells[cluster_col].unique()
values = cells[sum_cols].values
neighborhood_name = "CN" + "_k" + str(k) + "_n" + str(n_neighborhoods)
centroids_name = "Centroid" + "_k" + str(k) + "_n" + str(n_neighborhoods)
n_neighbors = k
cells[unique_region] = cells[unique_region].astype("str")
cells["cellid"] = cells.index.values
cells.reset_index(inplace=True)
keep_cols = [X, Y, unique_region, cluster_col]
# Get each region
tissue_group = cells[[X, Y, unique_region]].groupby(unique_region)
exps = list(cells[unique_region].unique())
tissue_chunks = [
(time.time(), exps.index(t), t, a)
for t, indices in tissue_group.groups.items()
for a in np.array_split(indices, 1)
]
tissues = [
hf_get_windows(job, n_neighbors, exps=exps, tissue_group=tissue_group, X=X, Y=Y)
for job in tissue_chunks
]
# Loop over k to compute neighborhoods
out_dict = {}
for neighbors, job in zip(tissues, tissue_chunks):
chunk = np.arange(len(neighbors)) # indices
tissue_name = job[2]
indices = job[3]
window = (
values[neighbors[chunk, :k].flatten()]
.reshape(len(chunk), k, len(sum_cols))
.sum(axis=1)
)
out_dict[(tissue_name, k)] = (window.astype(np.float16), indices)
windows = {}
window = pd.concat(
[
pd.DataFrame(
out_dict[(exp, k)][0],
index=out_dict[(exp, k)][1].astype(int),
columns=sum_cols,
)
for exp in exps
],
axis=0,
)
window = window.loc[cells.index.values]
window = pd.concat([cells[keep_cols], window], axis=1)
windows[k] = window
# Fill in based on above
k_centroids = {}
# producing what to plot
windows2 = windows[k]
windows2[cluster_col] = cells[cluster_col]
if elbow != True:
km = MiniBatchKMeans(n_clusters=n_neighborhoods, random_state=0)
labels = km.fit_predict(windows2[sum_cols].values)
k_centroids[str(k)] = km.cluster_centers_
adata.obs[neighborhood_name] = labels
adata.uns[centroids_name] = k_centroids
else:
km = MiniBatchKMeans(random_state=0)
X = windows2[sum_cols].values
labels = km.fit_predict(X)
k_centroids[str(k)] = km.cluster_centers_
adata.obs[neighborhood_name] = labels
adata.uns[centroids_name] = k_centroids
visualizer = KElbowVisualizer(
km, k=(n_neighborhoods), timings=False, metric=metric
)
visualizer.fit(X) # Fit the data to the visualizer
visualizer.show() # Finalize and render the figure
return adata
def build_cn_map(
adata,
cn_col,
unique_region,
palette=None,
k=75,
X="x",
Y="y",
threshold=0.85,
per_keep_thres=0.85,
sub_list=None,
sub_col=None,
rand_seed=1,
):
"""
Generate a cellular neighborhood (CN) map.
Parameters
----------
adata : AnnData
Annotated data matrix.
cn_col : str
Column name for cellular neighborhood.
unique_region : str
Unique region identifier.
palette : dict, optional
Color palette for the CN map, by default None.
k : int, optional
Number of neighbors to compute, by default 75.
X : str, optional
X coordinate column name, by default "x".
Y : str, optional
Y coordinate column name, by default "y".
threshold : float, optional
Threshold for neighborhood computation, by default 0.85.
per_keep_thres : float, optional
Threshold for keeping percentage, by default 0.85.
sub_list : list, optional
List of sub regions, by default None.
sub_col : str, optional
Column name for sub regions, by default None.
rand_seed : int, optional
Random seed for color generation, by default 1.
Returns
-------
dict
Dictionary containing the graph, top nodes, edges and simplicial frequencies.
"""
ks = [k]
cells_df = pd.DataFrame(adata.obs)
cells_df = cells_df[[X, Y, unique_region, cn_col]]
cells_df.reset_index(inplace=True)
sum_cols = cells_df[cn_col].unique()
keep_cols = cells_df.columns
cn_colors = hf_generate_random_colors(
len(adata.obs[cn_col].unique()), rand_seed=rand_seed
)
if palette is None:
if cn_col + "_colors" not in adata.uns.keys():
palette = dict(zip(np.sort(adata.obs[cn_col].unique()), cn_colors))
adata.uns[cn_col + "_colors"] = cn_colors
else:
palette = dict(
zip(np.sort(adata.obs[cn_col].unique()), adata.uns[cn_col + "_colors"])
)
Neigh = Neighborhoods(
cells_df,
ks,
cn_col,
sum_cols,
keep_cols,
X,
Y,
reg=unique_region,
add_dummies=True,
)
windows = Neigh.k_windows()
w = windows[k]
if sub_list:
# convert sub_list to list if only str is provided
if isinstance(sub_list, str):
sub_list = [sub_list]
w = w[w[sub_col].isin(sub_list)]
l = list(palette.keys())
simps, simp_freqs, simp_sums = tl_calculate_neigh_combs(
w, l, k, threshold=threshold, per_keep_thres=per_keep_thres # color palette
)
g, tops, e0, e1 = tl_build_graph_CN_comb_map(simp_freqs)
return {
"g": g,
"tops": tops,
"e0": e0,
"e1": e1,
"simp_freqs": simp_freqs,
"w": w,
"l": l,
"k": k,
"threshold": threshold,
}
def tl_format_for_squidpy(adata, x_col, y_col):
"""
Format an AnnData object for use with Squidpy.
Parameters
----------
adata : AnnData
Annotated data matrix.
x_col : str
Column name for x spatial coordinates.
y_col : str
Column name for y spatial coordinates.
Returns
-------
AnnData
Annotated data matrix formatted for Squidpy, with spatial data in the 'obsm' attribute.
"""
# Extract the count data from your original AnnData object
counts = adata.X
# Extract the spatial coordinates from the 'obs' metadata
spatial_coordinates = adata.obs[[x_col, y_col]].values
# Create a new AnnData object with the expected format
new_adata = ad.AnnData(counts, obsm={"spatial": spatial_coordinates})
return new_adata
def tl_corr_cell_ad(
adata, per_categ, grouping_col, rep, sub_column, normed=True, sub_list2=None
):
"""
Perform correlation analysis on a pandas DataFrame and plot correlation scatter plots.
Parameters
----------
data : pandas DataFrame
The input DataFrame.
per_categ : str
The categorical column in the DataFrame to be used.
grouping_col : str
The grouping column in the DataFrame.
rep : str
The replicate column in the DataFrame.
sub_column : str
The subcategory column in the DataFrame.
normed : bool, optional
If the percentage should be normalized. Default is True.
sub_list2 : list, optional
A list of subcategories to be considered. Default is None.
Returns
-------
cmat : pandas DataFrame
The correlation matrix DataFrame.
cc : pandas DataFrame
The DataFrame after pivoting and formatting for correlation function.
"""
data = adata.obs
cmat, cc = tl_corr_cell(
data,
per_categ,
grouping_col=grouping_col,
rep=rep,
sub_column=sub_column,
normed=normed,
sub_list2=sub_list2,
)
return cmat, cc
def calculate_triangulation_distances(df_input, id, x_pos, y_pos, cell_type, region):
"""
Calculate distances between cells using Delaunay triangulation.
Parameters
----------
df_input : pandas.DataFrame
Input dataframe containing cell information.
id : str
Column name for cell id.
x_pos : str
Column name for x position of cells.
y_pos : str
Column name for y position of cells.
cell_type : str
Column name for cell type annotations.
region : str
Column name for region.
Returns
-------
pandas.DataFrame
Annotated result dataframe with calculated distances and additional information.
"""
# Perform Delaunay triangulation
points = df_input[[x_pos, y_pos]].values
tri = Delaunay(points)
indices = tri.simplices
# Get interactions going both directions
edges = set()
for simplex in indices:
for i in range(3):
for j in range(i + 1, 3):
edges.add(tuple(sorted([simplex[i], simplex[j]])))
edges = np.array(list(edges))
# Create dataframe from edges
rdelaun_result = pd.DataFrame(edges, columns=["ind1", "ind2"])
rdelaun_result[["x1", "y1"]] = df_input.iloc[rdelaun_result["ind1"]][
[x_pos, y_pos]
].values
rdelaun_result[["x2", "y2"]] = df_input.iloc[rdelaun_result["ind2"]][
[x_pos, y_pos]
].values
# Annotate results with cell type and region information
df_input["XYcellID"] = (
df_input[x_pos].astype(str) + "_" + df_input[y_pos].astype(str)
)
rdelaun_result["cell1ID"] = (
rdelaun_result["x1"].astype(str) + "_" + rdelaun_result["y1"].astype(str)
)
rdelaun_result["cell2ID"] = (
rdelaun_result["x2"].astype(str) + "_" + rdelaun_result["y2"].astype(str)
)
annotated_result = pd.merge(
rdelaun_result, df_input, left_on="cell1ID", right_on="XYcellID"
)
annotated_result = annotated_result.rename(
columns={cell_type: "celltype1", id: "celltype1_index"}
)
annotated_result = annotated_result.drop(columns=[x_pos, y_pos, region, "XYcellID"])
annotated_result = pd.merge(
annotated_result,
df_input,
left_on="cell2ID",
right_on="XYcellID",
suffixes=(".x", ".y"),
)
annotated_result = annotated_result.rename(
columns={cell_type: "celltype2", id: "celltype2_index"}
)
annotated_result = annotated_result.drop(columns=[x_pos, y_pos, "XYcellID"])
# Calculate distance
annotated_result["distance"] = np.sqrt(
(annotated_result["x2"] - annotated_result["x1"]) ** 2
+ (annotated_result["y2"] - annotated_result["y1"]) ** 2
)
# Reorder columns
annotated_result = annotated_result[
[
region,
"celltype1_index",
"celltype1",
"x1",
"y1",
"celltype2_index",
"celltype2",
"x2",
"y2",
"distance",
]
]
annotated_result.columns = [
region,
"celltype1_index",
"celltype1",
"celltype1_X",
"celltype1_Y",
"celltype2_index",
"celltype2",
"celltype2_X",
"celltype2_Y",
"distance",
]
return annotated_result
# Define the process_region function at the top level
def process_region(df, unique_region, id, x_pos, y_pos, cell_type, region):
"""
Process a specific region of a dataframe, calculating triangulation distances.
Parameters
----------
df : pandas.DataFrame
Input dataframe containing cell information.
unique_region : str
Unique region identifier.
id : str
Column name for cell id.
x_pos : str
Column name for x position of cells.
y_pos : str
Column name for y position of cells.
cell_type : str
Column name for cell type.
region : str
Column name for region.
Returns
-------
pandas.DataFrame
Result dataframe with calculated distances and additional information for the specified region.
"""
subset = df[df[region] == unique_region].copy()
subset["uniqueID"] = (
subset[id].astype(str)
+ "-"
+ subset[x_pos].astype(str)
+ "-"
+ subset[y_pos].astype(str)
)
subset["XYcellID"] = subset[x_pos].astype(str) + "_" + subset[y_pos].astype(str)
result = calculate_triangulation_distances(
df_input=subset,
id=id,
x_pos=x_pos,
y_pos=y_pos,
cell_type=cell_type,
region=region,
)
return result
def get_triangulation_distances(
df_input, id, x_pos, y_pos, cell_type, region, num_cores=None, correct_dtype=True
):
"""
Calculate triangulation distances for each unique region in the input dataframe.
Parameters
----------
df_input : pandas.DataFrame
Input dataframe containing cell information.
id : str
Column name for cell id.
x_pos : str
Column name for x position of cells.
y_pos : str
Column name for y position of cells.
cell_type : str
Column name for cell type.
region : str
Column name for region.
num_cores : int, optional
Number of cores to use for parallel processing. If None, defaults to half of available cores.
correct_dtype : bool, optional
If True, corrects the data type of the cell_type and region columns to string.
Returns
-------
pandas.DataFrame
Result dataframe with calculated distances and additional information for each unique region.
"""
if correct_dtype == True:
# change columns to pandas string
df_input[cell_type] = df_input[cell_type].astype(str)
df_input[region] = df_input[region].astype(str)
# Check if x_pos and y_pos are integers, and if not, convert them
if not issubclass(df_input[x_pos].dtype.type, np.integer):
print("This function expects integer values for xy coordinates.")
print(
x_pos
+ " and "
+ y_pos
+ " will be changed to integer. Please check the generated output!"
)
df_input[x_pos] = df_input[x_pos].astype(int).values
df_input[y_pos] = df_input[y_pos].astype(int).values
# Get unique regions
unique_regions = df_input[region].unique()
# Select only necessary columns
df_input = df_input.loc[:, [id, x_pos, y_pos, cell_type, region]]
# Set up parallelization
if num_cores is None:
num_cores = os.cpu_count() // 2 # default to using half of available cores
# Parallel processing using joblib
results = Parallel(n_jobs=num_cores)(
delayed(process_region)(df_input, reg, id, x_pos, y_pos, cell_type, region)
for reg in unique_regions
)
triangulation_distances = pd.concat(results)
return triangulation_distances
def shuffle_annotations(df_input, cell_type, region, permutation):
"""
Shuffle annotations within each unique region in the input dataframe.
Parameters
----------
df_input : pandas.DataFrame
Input dataframe containing cell information.
cell_type : str
Column name for cell type annotations.
region : str
Column name for region.
permutation : int
Seed for the random number generator.
Returns
-------
pandas.DataFrame
Result dataframe with shuffled annotations for each unique region.
"""
# Set the seed for reproducibility
np.random.seed(permutation + 1234)
# Create a copy to avoid modifying the original dataframe
df_shuffled = df_input.copy()
# Shuffle annotations within each region
for region_name in df_shuffled[region].unique():
region_mask = df_shuffled[region] == region_name
shuffled_values = df_shuffled.loc[region_mask, cell_type].sample(frac=1).values
df_shuffled.loc[region_mask, "random_annotations"] = shuffled_values
return df_shuffled
def tl_iterate_tri_distances(
df_input, id, x_pos, y_pos, cell_type, region, num_cores=None, num_iterations=1000
):
"""
Iterate over triangulation distances for each unique region in the input dataframe.
Parameters
----------
df_input : pandas.DataFrame
Input dataframe containing cell information.
id : str
Column name for cell id.
x_pos : str
Column name for x position of cells.
y_pos : str
Column name for y position of cells.
cell_type : str
Column name for cell type.
region : str
Column name for region.
num_cores : int, optional
Number of cores to use for parallel processing. If None, defaults to half of available cores.
num_iterations : int, optional
Number of iterations to perform. Defaults to 1000.
Returns
-------
pandas.DataFrame
Result dataframe with iterative triangulation distances for each unique region.
"""
unique_regions = df_input[region].unique()
# Use only the necessary columns
df_input = df_input[[id, x_pos, y_pos, cell_type, region]]
if num_cores is None:
num_cores = os.cpu_count() // 2 # Default to using half of available cores
# Define a helper function to process each region and iteration
def process_iteration(region_name, iteration):
# Filter by region
subset = df_input[df_input[region] == region_name].copy()
# Create unique IDs
subset.loc[:, "uniqueID"] = (
subset[id].astype(str)
+ "-"
+ subset[x_pos].astype(str)
+ "-"
+ subset[y_pos].astype(str)
)
subset.loc[:, "XYcellID"] = (
subset[x_pos].astype(str) + "_" + subset[y_pos].astype(str)
)
# Shuffle annotations
shuffled = shuffle_annotations(subset, cell_type, region, iteration)
# Get triangulation distances
results = get_triangulation_distances(
df_input=shuffled,
id=id,
x_pos=x_pos,
y_pos=y_pos,
cell_type="random_annotations",
region=region,
num_cores=num_cores,
correct_dtype=False,
)
# Summarize results
per_cell_summary = (
results.groupby(["celltype1_index", "celltype1", "celltype2"])
.distance.mean()
.reset_index(name="per_cell_mean_dist")
)
per_celltype_summary = (
per_cell_summary.groupby(["celltype1", "celltype2"])
.per_cell_mean_dist.mean()
.reset_index(name="mean_dist")
)
per_celltype_summary[region] = region_name
per_celltype_summary["iteration"] = iteration
return per_celltype_summary
# TODO: remove nans valid here a good idea (attempt to fix windows unpickle issue)?
unique_regions = [r for r in unique_regions if r != np.nan]
# Parallel processing for each region and iteration
results = Parallel(n_jobs=num_cores)(
delayed(process_iteration)(region_name, iteration)
for region_name in unique_regions
for iteration in range(1, num_iterations + 1)
)
# Combine all results
iterative_triangulation_distances = pd.concat(results, ignore_index=True)
# iterative_triangulation_distances = iterative_triangulation_distances.dropna()
return iterative_triangulation_distances
def tl_iterate_tri_distances_ad(
adata,
id,
x_pos,
y_pos,
cell_type,
region,
num_cores=None,
num_iterations=1000,
key_name=None,
correct_dtype=True,
):
"""
Iterate over triangulation distances for each unique region in the input AnnData object.
Parameters
----------
adata : anndata.AnnData
Annotated data matrix.
id : str
Column name for cell id.
x_pos : str
Column name for x position of cells.
y_pos : str
Column name for y position of cells.
cell_type : str
Column name for cell type.
region : str
Column name for region.
num_cores : int, optional
Number of cores to use for parallel processing. If None, defaults to half of available cores.
num_iterations : int, optional
Number of iterations to perform. Defaults to 1000.
key_name : str, optional
Key name to use when saving the result to the AnnData object. If None, defaults to "iTriDist_" + str(num_iterations).
correct_dtype : bool, optional
If True, corrects the data type of the cell type and region columns to string. Defaults to True.
Returns
-------
pandas.DataFrame
Result dataframe with iterative triangulation distances for each unique region.
"""
df_input = pd.DataFrame(adata.obs)
df_input[id] = df_input.index
if correct_dtype == True:
# change columns to pandas string
df_input[cell_type] = df_input[cell_type].astype(str)
df_input[region] = df_input[region].astype(str)
# Check if x_pos and y_pos are integers, and if not, convert them
if not issubclass(df_input[x_pos].dtype.type, np.integer):
print("This function expects integer values for xy coordinates.")
print("Class will be changed to integer. Please check the generated output!")
df_input[x_pos] = df_input[x_pos].astype(int).values
df_input[y_pos] = df_input[y_pos].astype(int).values
unique_regions = df_input[region].unique()
# Use only the necessary columns
df_input = df_input.loc[:, [id, x_pos, y_pos, cell_type, region]]
if num_cores is None:
num_cores = os.cpu_count() // 2 # Default to using half of available cores
# Define a helper function to process each region and iteration
def process_iteration(region_name, iteration):
# Filter by region
subset = df_input.loc[df_input[region] == region_name, :].copy()
subset.loc[:, "uniqueID"] = (
subset[id].astype(str)
+ "-"
+ subset[x_pos].astype(str)
+ "-"
+ subset[y_pos].astype(str)
)
subset.loc[:, "XYcellID"] = (
subset[x_pos].astype(str) + "_" + subset[y_pos].astype(str)
)
# Shuffle annotations
shuffled = shuffle_annotations(subset, cell_type, region, iteration)
# Get triangulation distances
results = get_triangulation_distances(
df_input=shuffled,
id=id,
x_pos=x_pos,
y_pos=y_pos,
cell_type="random_annotations",
region=region,
num_cores=num_cores,
correct_dtype=False,
)
# Summarize results
per_cell_summary = (
results.groupby(["celltype1_index", "celltype1", "celltype2"])
.distance.mean()
.reset_index(name="per_cell_mean_dist")
)
per_celltype_summary = (
per_cell_summary.groupby(["celltype1", "celltype2"])
.per_cell_mean_dist.mean()
.reset_index(name="mean_dist")
)
per_celltype_summary[region] = region_name
per_celltype_summary["iteration"] = iteration
return per_celltype_summary
# Parallel processing for each region and iteration
results = Parallel(n_jobs=num_cores)(
delayed(process_iteration)(region_name, iteration)
for region_name in unique_regions
for iteration in range(1, num_iterations + 1)
)
# Combine all results
iterative_triangulation_distances = pd.concat(results, ignore_index=True)
# append result to adata
if key_name is None:
key_name = "iTriDist_" + str(num_iterations)
adata.uns[key_name] = iterative_triangulation_distances
print("Save iterative triangulation distance output to anndata.uns " + key_name)
return iterative_triangulation_distances
def add_missing_columns(
triangulation_distances, metadata, shared_column="unique_region"
):
"""
Add missing columns from metadata to triangulation_distances dataframe.
Parameters
----------
triangulation_distances : pandas.DataFrame
DataFrame containing triangulation distances.
metadata : pandas.DataFrame
DataFrame containing metadata.
shared_column : str, optional
Column name that is shared between the two dataframes. Defaults to "unique_region".
Returns
-------
pandas.DataFrame
Updated triangulation_distances dataframe with missing columns added.
"""
# Find the difference in columns
missing_columns = set(metadata.columns) - set(triangulation_distances.columns)
# Add missing columns to triangulation_distances with NaN values
for column in missing_columns:
triangulation_distances[column] = pd.NA
# Create a mapping from unique_region to tissue in metadata
region_to_tissue = pd.Series(
metadata[column].values, index=metadata["unique_region"]
).to_dict()
# Apply this mapping to the triangulation_distances dataframe to create/update the tissue column
triangulation_distances[column] = triangulation_distances["unique_region"].map(
region_to_tissue
)
# Handle regions with no corresponding tissue in the metadata by filling in a default value
triangulation_distances[column].fillna("Unknown", inplace=True)
return triangulation_distances
# Calculate p-values and log fold differences
def calculate_pvalue(row):
"""
Calculate the p-value using the Mann-Whitney U test.
Parameters
----------
row : pandas.Series
A row of data containing 'expected' and 'observed' values.
Returns
-------
float
The calculated p-value. Returns np.nan if there is insufficient data to perform the test.
"""
# function body here
try:
return st.mannwhitneyu(
row["expected"], row["observed"], alternative="two-sided"
).pvalue
except ValueError: # This handles cases with insufficient data
return np.nan
[docs]
def identify_interactions(
adata,
cellid,
x_pos,
y_pos,
cell_type,
region,
comparison,
iTriDist_keyname=None,
triDist_keyname=None,
min_observed=10,
distance_threshold=128,
num_cores=None,
num_iterations=1000,
key_name=None,
correct_dtype=False,
):
"""
Identify interactions between cell types based on their spatial distances.
Parameters
----------
adata : AnnData
Annotated data matrix.
id : str
Identifier for cells.
x_pos : str
Column name for x position of cells.
y_pos : str
Column name for y position of cells.
cell_type : str
Column name for cell type.
region : str
Column name for region.
comparison : str
Column name for comparison.
iTriDist_keyname : str, optional
Key name for iterative triangulation distances, by default None
triDist_keyname : str, optional
Key name for triangulation distances, by default None
min_observed : int, optional
Minimum number of observed distances, by default 10
distance_threshold : int, optional
Threshold for distance, by default 128
num_cores : int, optional
Number of cores to use for computation, by default None
num_iterations : int, optional
Number of iterations for computation, by default 1000
key_name : str, optional
Key name for output, by default None
correct_dtype : bool, optional
Whether to correct data type or not, by default False
Returns
-------
DataFrame
DataFrame with p-values and logfold changes for interactions.
"""
df_input = pd.DataFrame(adata.obs)
if cellid in df_input.columns:
df_input.index = df_input[cellid]
else:
print(cellid + " is not in the adata.obs, use index as cellid instead!")
df_input[cellid] = df_input.index
# change columns to pandas string
df_input[cell_type] = df_input[cell_type].astype(str)
df_input[region] = df_input[region].astype(str)
print("Computing for observed distances between cell types!")
triangulation_distances = get_triangulation_distances(
df_input=df_input,
id=cellid,
x_pos=x_pos,
y_pos=y_pos,
cell_type=cell_type,
region=region,
num_cores=num_cores,
correct_dtype=correct_dtype,
)
if key_name is None:
triDist_keyname = "triDist"
adata.uns["triDist_keyname"] = triangulation_distances
print("Save triangulation distances output to anndata.uns " + triDist_keyname)
print("Permuting data labels to obtain the randomly distributed distances!")
print("this step can take awhile")
iterative_triangulation_distances = tl_iterate_tri_distances(
df_input=df_input,
id=cellid,
x_pos=x_pos,
y_pos=y_pos,
cell_type=cell_type,
region=region,
num_cores=num_cores,
num_iterations=num_iterations,
)
# append result to adata
if triDist_keyname is None:
triDist_keyname = "iTriDist_" + str(num_iterations)
adata.uns[triDist_keyname] = iterative_triangulation_distances
print(
"Save iterative triangulation distance output to anndata.uns " + triDist_keyname
)
metadata = df_input.loc[:, ["unique_region", comparison]].copy()
# Reformat observed dataset
triangulation_distances_long = add_missing_columns(
triangulation_distances, metadata, shared_column=region
)
observed_distances = (
triangulation_distances_long.query("distance <= @distance_threshold")
.groupby(["celltype1_index", "celltype1", "celltype2", comparison, region])
.agg(mean_per_cell=("distance", "mean"))
.reset_index()
.groupby(["celltype1", "celltype2", comparison])
.agg(observed=("mean_per_cell", list), observed_mean=("mean_per_cell", "mean"))
.reset_index()
)
# Reformat expected dataset
iterated_triangulation_distances_long = add_missing_columns(
iterative_triangulation_distances, metadata, shared_column=region
)
expected_distances = (
iterated_triangulation_distances_long.query("mean_dist <= @distance_threshold")
.groupby(["celltype1", "celltype2", comparison])
.agg(expected=("mean_dist", list), expected_mean=("mean_dist", "mean"))
.reset_index()
)
# Drop comparisons with low numbers of observations
observed_distances["keep"] = observed_distances["observed"].apply(
lambda x: len(x) > min_observed
)
observed_distances = observed_distances[observed_distances["keep"]]
expected_distances["keep"] = expected_distances["expected"].apply(
lambda x: len(x) > min_observed
)
expected_distances = expected_distances[expected_distances["keep"]]
# concatenate observed and expected distances
distance_pvals = expected_distances.merge(
observed_distances, on=["celltype1", "celltype2", comparison], how="left"
)
distance_pvals = expected_distances.merge(
observed_distances, on=["celltype1", "celltype2", comparison], how="left"
)
distance_pvals["pvalue"] = distance_pvals.apply(calculate_pvalue, axis=1)
distance_pvals["logfold_group"] = np.log2(
distance_pvals["observed_mean"] / distance_pvals["expected_mean"]
)
distance_pvals["interaction"] = (
distance_pvals["celltype1"] + " --> " + distance_pvals["celltype2"]
)
# drop na from distance_pvals
# distance_pvals = distance_pvals.dropna()
return distance_pvals
[docs]
def filter_interactions(
distance_pvals, pvalue=0.05, logfold_group_abs=0.1, comparison="condition"
):
"""
Filters interactions based on p-value, logfold change, and other conditions.
Parameters
----------
distance_pvals : pandas.DataFrame
DataFrame containing p-values, logfold changes, and interactions for each comparison.
pvalue : float, optional
The maximum p-value to consider for significance. Defaults to 0.05.
logfold_group_abs : float, optional
The minimum absolute logfold change to consider for significance. Defaults to 0.1.
comparison : str, optional
The comparison condition to filter by. Defaults to "condition".
Returns
-------
dist_table : pandas.DataFrame
DataFrame containing logfold changes sorted into two columns by the comparison condition.
distance_pvals_sig_sub : pandas.DataFrame
Subset of the original DataFrame containing only significant interactions based on the specified conditions.
"""
# calculate absolute logfold difference
distance_pvals["logfold_group_abs"] = distance_pvals["logfold_group"].abs()
# Creating pairs
distance_pvals["pairs"] = (
distance_pvals["celltype1"] + "_" + distance_pvals["celltype2"]
)
# Filter significant p-values and other specified conditions
distance_pvals_sig = distance_pvals[
(distance_pvals["pvalue"] < pvalue)
& (distance_pvals["celltype1"] != distance_pvals["celltype2"])
& (~distance_pvals["observed_mean"].isna())
& (distance_pvals["logfold_group_abs"] > logfold_group_abs)
]
# Assuming distance_pvals_interesting2 is a pandas DataFrame with the same structure as the R dataframe.
# pair_to = distance_pvals_sig["interaction"].unique()
pairs = distance_pvals_sig["pairs"].unique()
# Filtering data
data = distance_pvals[~distance_pvals["interaction"].isna()]
# Subsetting data
distance_pvals_sig_sub = data[data["pairs"].isin(pairs)]
distance_pvals_sig_sub_reduced = distance_pvals_sig_sub.loc[
:, [comparison, "logfold_group", "pairs"]
].copy()
# set pairs as index
distance_pvals_sig_sub_reduced = distance_pvals_sig_sub_reduced.set_index("pairs")
# sort logfold_group into two columns by tissue
dist_table = distance_pvals_sig_sub_reduced.pivot(
columns=comparison, values="logfold_group"
)
dist_table.dropna(inplace=True)
return dist_table, distance_pvals_sig_sub
# Function for patch identification
## Adjust clustering parameter to get the desired number of clusters
def apply_dbscan_clustering(df, min_cluster_size=10):
"""
Apply DBSCAN clustering to a dataframe and update the cluster labels in the original dataframe.
Parameters
----------
df : pandas.DataFrame
The dataframe to be clustered.
min_cluster_size : int, optional
The number of samples in a neighborhood for a point to be considered as a core point, by default 10
Returns
-------
None
"""
# Initialize a new column for cluster labels
df["cluster"] = -1
# Apply DBSCAN clustering
hdbscan = HDBSCAN(
min_samples=None,
min_cluster_size=min_cluster_size,
cluster_selection_epsilon=0.0,
max_cluster_size=None,
metric="euclidean",
alpha=1.0,
cluster_selection_method="eom",
allow_single_cluster=False,
)
labels = hdbscan.fit_predict(df[["x", "y"]])
# Number of clusters in labels, ignoring noise if present.
n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
n_noise_ = list(labels).count(-1)
print("Estimated number of clusters: %d" % n_clusters_)
print("Estimated number of noise points: %d" % n_noise_)
# Update the cluster labels in the original dataframe
df.loc[df.index, "cluster"] = labels
# plot points and identify points in radius of selected points
def plot_selected_neighbors_with_shapes(
full_df,
selected_df,
target_df,
radius,
plot=True,
identification_column="community",
):
# Get unique clusters from the full DataFrame
unique_clusters = full_df[identification_column].unique()
# DataFrame to store points within the circle but from a different cluster
all_in_circle_diff_cluster = []
# Loop through selected points
for _, row in selected_df.iterrows():
# Calculate distances from each point in the target DataFrame to the selected point
distances = np.linalg.norm(
target_df[["x", "y"]].values - np.array([row["x"], row["y"]]), axis=1
)
# Identify points within the circle and from a different cluster
in_circle = distances <= radius
diff_cluster = target_df[identification_column] != row[identification_column]
in_circle_diff_cluster = target_df[in_circle & diff_cluster]
# Append the result to the list
all_in_circle_diff_cluster.append(in_circle_diff_cluster)
# Plot the points with a different shape if plot is True
if plot:
plt.scatter(
in_circle_diff_cluster["x"],
in_circle_diff_cluster["y"],
facecolors="none",
edgecolors="#DC0000B2",
marker="*",
s=100,
zorder=5,
label="Cell within proximity",
)
# Concatenate the list of DataFrames into a single result DataFrame
all_in_circle_diff_cluster = pd.concat(
all_in_circle_diff_cluster, ignore_index=True
)
# Plot selected points in yellow and draw circles around them if plot is True
if plot:
plt.scatter(
selected_df["x"],
selected_df["y"],
color="#3C5488B2",
label="Boarder cells",
s=100,
edgecolor="black",
zorder=6,
)
for _, row in selected_df.iterrows():
circle = plt.Circle(
(row["x"], row["y"]),
radius,
color="#3C5488B2",
fill=False,
linestyle="--",
alpha=0.5,
)
plt.gca().add_patch(circle)
# Set plot labels and title
plt.xlabel("X")
plt.ylabel("Y")
plt.title(f"Cells within {radius} radius")
plt.grid(False)
plt.axis("equal")
# Place the legend outside the plot
handles, labels = plt.gca().get_legend_handles_labels()
by_label = dict(zip(labels, handles))
plt.legend(
by_label.values(),
by_label.keys(),
loc="center left",
bbox_to_anchor=(1, 0.5),
)
plt.tight_layout()
# set figure size
plt.gcf().set_size_inches(15, 5)
plt.show()
# Remove duplicates from the final DataFrame
all_in_circle_diff_cluster = all_in_circle_diff_cluster.drop_duplicates()
return all_in_circle_diff_cluster
def process_cluster(args):
(
df,
cluster,
cluster_column,
x_column,
y_column,
concave_hull_length_threshold,
edge_neighbours,
full_df,
radius,
plot,
identification_column,
) = args
# Filter DataFrame for the current cluster
subset = df.loc[df[cluster_column] == cluster]
points = subset[[x_column, y_column]].values
# Compute concave hull indexes
idxes = concave_hull_indexes(
points[:, :2],
length_threshold=concave_hull_length_threshold,
)
# Get hull points from the DataFrame
hull_points = pd.DataFrame(points[idxes], columns=["x", "y"])
# Find nearest neighbors of hull points in the original DataFrame
nbrs = NearestNeighbors(n_neighbors=edge_neighbours).fit(df[[x_column, y_column]])
distances, indices = nbrs.kneighbors(hull_points[["x", "y"]])
hull_nearest_neighbors = df.iloc[indices.flatten()]
# Plot selected neighbors and get the DataFrame with different clusters in the circle
prox_points = plot_selected_neighbors_with_shapes(
full_df=full_df,
selected_df=hull_nearest_neighbors,
target_df=full_df,
radius=radius,
plot=plot,
identification_column=identification_column,
)
# Add a 'patch_id' column to identify the cluster
prox_points["patch_id"] = cluster
return prox_points, hull_nearest_neighbors
def identify_points_in_proximity(
df,
full_df,
identification_column,
cluster_column="cluster",
x_column="x",
y_column="y",
radius=200,
edge_neighbours=3,
plot=True,
concave_hull_length_threshold=50,
):
num_processes = max(
1, os.cpu_count() - 2
) # Use all available CPUs minus 2, but at least 1
with Pool(processes=num_processes) as pool:
results = pool.map(
process_cluster,
[
(
df,
cluster,
cluster_column,
x_column,
y_column,
concave_hull_length_threshold,
edge_neighbours,
full_df,
radius,
plot,
identification_column,
)
for cluster in set(df[cluster_column]) - {-1}
],
)
# Unpack the results
result_list, outline_list = zip(*results)
# Concatenate the list of DataFrames into a single result DataFrame
if len(result_list) > 0:
result = pd.concat(result_list)
else:
result = pd.DataFrame(columns=["x", "y", "patch_id", identification_column])
if len(outline_list) > 0:
outlines = pd.concat(outline_list)
else:
outlines = pd.DataFrame(columns=["x", "y", "patch_id", identification_column])
return result, outlines
# This function analyzes what is in proximity of a selected group (CN, Celltype, etc...).
[docs]
def patch_proximity_analysis(
adata,
region_column,
patch_column,
group,
min_cluster_size=80,
x_column="x",
y_column="y",
radius=128,
edge_neighbours=3,
plot=True,
savefig=False,
output_dir="./",
output_fname="",
key_name="ppa_result",
plot_color="#6a3d9a",
):
"""
Performs a proximity analysis on patches of a given group within each region of a dataset.
Parameters:
adata (AnnData): The annotated data matrix of shape n_obs x n_vars. Rows correspond to cells and columns to genes.
region_column (str): The name of the column in the DataFrame that contains the region information.
patch_column (str): The name of the column in the DataFrame that contains the patch information.
group (str): The group to perform the proximity analysis on.
min_cluster_size (int, optional): The minimum number of samples required to form a dense region. Default is 80.
x_column (str, optional): The name of the column in the DataFrame that contains the x-coordinate. Default is 'x'.
y_column (str, optional): The name of the column in the DataFrame that contains the y-coordinate. Default is 'y'.
radius (int, optional): The radius within which to identify points in proximity. Default is 128.
edge_neighbours (int, optional): The number of edge neighbours to consider. Default is 3.
plot (bool, optional): Whether to plot the patches. Default is True.
savefig (bool, optional): Whether to save the figure. Default is False.
output_dir (str, optional): The directory to save the figure in. Default is "./".
output_fname (str, optional): The filename to save the figure as. Default is "".
key_name (str, optional): The key name to store the results in the AnnData object. Default is 'ppa_result'.
Returns:
final_results (DataFrame): A DataFrame containing the results of the proximity analysis.
outlines_results (DataFrame): A DataFrame containing the outlines of the patches.
"""
df = adata.obs
for col in df.select_dtypes(["category"]).columns:
df[col] = df[col].astype(str)
# list to store results for each region
region_results = []
outlines = []
for region in df[region_column].unique():
df_region = df[df[region_column] == region].copy()
df_community = df_region[df_region[patch_column] == group].copy()
if df_community.shape[0] < min_cluster_size:
print(f"No {group} in {region}")
continue
else:
apply_dbscan_clustering(df_community, min_cluster_size=min_cluster_size)
# plot patches
if plot:
df_filtered = df_community[df_community["cluster"] != -1]
fig, ax = plt.subplots(figsize=(10, 10))
ax.scatter(df_filtered["x"], df_filtered["y"], c=plot_color, alpha=0.5)
ax.set_title(f"HDBSCAN Clusters for {region}_{group}")
ax.set_xlabel(x_column)
ax.set_ylabel(y_column)
ax.grid(False)
ax.axis("equal")
if savefig:
fig.savefig(
output_dir
+ output_fname
+ "_"
+ str(region)
+ "_patch_proximity.pdf",
bbox_inches="tight",
)
else:
plt.show()
results, hull_nearest_neighbors = identify_points_in_proximity(
df=df_community,
full_df=df_region,
cluster_column="cluster",
identification_column=patch_column,
x_column=x_column,
y_column=y_column,
radius=radius,
edge_neighbours=edge_neighbours,
plot=plot,
)
# add hull_nearest_neighbors to list
outlines.append(hull_nearest_neighbors)
print(f"Finished {region}_{group}")
# append to region_results
region_results.append(results)
# Concatenate all results into a single DataFrame
final_results = pd.concat(region_results)
outlines_results = pd.concat(outlines)
# generate new column named unique_patch_ID that combines the region, group and patch ID
final_results["unique_patch_ID"] = (
final_results[region_column]
+ "_"
+ final_results[patch_column]
+ "_"
+ "patch_no_"
+ final_results["patch_id"].astype(str)
)
adata.uns[key_name] = final_results
return final_results, outlines_results
def stellar_get_tonsilbe_edge_index(pos, distance_thres):
"""
Constructs edge indexes in one region based on pairwise distances and a distance threshold.
Parameters:
pos (array-like): An array-like object of shape (n_samples, n_features) representing the positions.
distance_thres (float): The distance threshold. Pairs of positions with distances less than this threshold will be considered as edges.
Returns:
edge_list (list): A list of lists where each inner list contains two indices representing an edge.
"""
# construct edge indexes in one region
edge_list = []
dists = pairwise_distances(pos)
dists_mask = dists < distance_thres
np.fill_diagonal(dists_mask, 0)
edge_list = np.transpose(np.nonzero(dists_mask)).tolist()
return edge_list
[docs]
def adata_stellar(
adata_train,
adata_unannotated,
celltype_col="coarse_anno3",
x_col="x",
y_col="y",
sample_rate=0.5,
distance_thres=50,
epochs=50,
key_added="stellar_pred",
STELLAR_path="",
):
"""
Applies the STELLAR algorithm to the given annotated and unannotated data.
Parameters:
adata_train (AnnData): The annotated data.
adata_unannotated (AnnData): The unannotated data.
celltype_col (str, optional): The column name for cell types in the annotated data. Defaults to 'coarse_anno3'.
x_col (str, optional): The column name for x coordinates in the data. Defaults to 'x'.
y_col (str, optional): The column name for y coordinates in the data. Defaults to 'y'.
sample_rate (float, optional): The rate at which to sample the training data. Defaults to 0.5.
distance_thres (int, optional): The distance threshold for constructing edge indexes. Defaults to 50.
key_added (str, optional): The key to be added to the unannotated data's obs dataframe for the predicted results. Defaults to 'stellar_pred'.
Returns:
adata (AnnData): The unannotated data with the added key for the predicted results.
"""
sys.path.append(str(STELLAR_path))
from datasets import GraphDataset
from STELLAR import STELLAR
from utils import prepare_save_dir
parser = argparse.ArgumentParser(description="STELLAR")
parser.add_argument(
"--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
)
parser.add_argument("--name", type=str, default="STELLAR")
parser.add_argument("--epochs", type=int, default=50)
parser.add_argument("--lr", type=float, default=1e-3)
parser.add_argument("--wd", type=float, default=5e-2)
parser.add_argument("--input-dim", type=int, default=26)
parser.add_argument("--num-heads", type=int, default=13)
parser.add_argument("--num-seed-class", type=int, default=3)
parser.add_argument("--sample-rate", type=float, default=0.5)
parser.add_argument(
"-b", "--batch-size", default=1, type=int, metavar="N", help="mini-batch size"
)
parser.add_argument("--distance_thres", default=50, type=int)
parser.add_argument("--savedir", type=str, default="./")
args = parser.parse_args(args=[])
args.cuda = torch.cuda.is_available()
args.device = torch.device("cuda" if args.cuda else "cpu")
args.epochs = 50
# prepare input data
print("Preparing input data")
train_df = adata_train.to_df()
# add to train_df
positions_celltype = adata_train.obs[[x_col, y_col, celltype_col]]
train_df = pd.concat([train_df, positions_celltype], axis=1)
train_df = train_df.sample(n=round(sample_rate * len(train_df)), random_state=1)
train_X = train_df.iloc[:, 0:-3].values
test_X = adata_unannotated.to_df().values
train_y = train_df[celltype_col].str.lower()
train_y
labeled_pos = train_df.iloc[
:, -3:-1
].values # x,y coordinates, indexes depend on specific datasets
unlabeled_pos = adata_unannotated.obs[[x_col, y_col]].values
cell_types = np.sort(list(set(train_y))).tolist()
cell_types
cell_type_dict = {}
inverse_dict = {}
for i, cell_type in enumerate(cell_types):
cell_type_dict[cell_type] = i
inverse_dict[i] = cell_type
train_y = np.array([cell_type_dict[x] for x in train_y])
labeled_edges = stellar_get_tonsilbe_edge_index(
labeled_pos, distance_thres=distance_thres
)
unlabeled_edges = stellar_get_tonsilbe_edge_index(
unlabeled_pos, distance_thres=distance_thres
)
# build dataset
print("Building dataset")
dataset = GraphDataset(train_X, train_y, test_X, labeled_edges, unlabeled_edges)
# run stellar
print("Running STELLAR")
stellar = STELLAR(args, dataset)
stellar.train()
_, results = stellar.pred()
results = results.astype("object")
for i in range(len(results)):
if results[i] in inverse_dict.keys():
results[i] = inverse_dict[results[i]]
adata_unannotated.obs[key_added] = pd.Categorical(results)
# make stellar_pred a string
adata_unannotated.obs["stellar_pred"] = adata_unannotated.obs[
"stellar_pred"
].astype(str)
return adata_unannotated
[docs]
def ml_train(
adata_train,
label,
test_size=0.33,
random_state=0,
model="svm",
nan_policy_y="raise",
showfig=True,
figsize=(10, 8),
):
"""
Train a svm model on the provided data.
Parameters
----------
adata_train : AnnData
The training data as an AnnData object.
label : str
The label to predict.
test_size : float, optional
The proportion of the dataset to include in the test split, by default 0.33.
random_state : int, optional
The seed used by the random number generator, by default 0.
model : str, optional
The type of model to train, by default "svm".
nan_policy_y : str, optional
How to handle NaNs in the label, by default "raise". Can be either 'omit' or 'raise'.
showfig : bool, optional
Whether to show the confusion matrix as a heatmap, by default True.
Returns
-------
SVC
The trained Support Vector Classifier model.
Raises
------
ValueError
If `nan_policy_y` is not 'omit' or 'raise'.
"""
X = pd.DataFrame(adata_train.X)
y = adata_train.obs[label].values
if nan_policy_y == "omit":
y_msk = ~y.isna()
X = X[y_msk]
y = y[y_msk]
elif nan_policy_y == "raise":
pass
else:
raise ValueError("nan_policy_y must be either 'omit' or 'raise'")
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
print(y.unique().sort_values())
print("Training now!")
svc = SVC(kernel="linear", probability=True)
svc.fit(X_train, y_train)
pred = []
y_prob = svc.predict_proba(X_test)
y_prob = pd.DataFrame(y_prob)
y_prob.columns = svc.classes_
svm_label = y_prob.idxmax(axis=1, skipna=True)
target_names = svc.classes_
print("Evaluating now!")
svm_eval = classification_report(
y_true=y_test, y_pred=svm_label, target_names=target_names, output_dict=True
)
if showfig:
plt.figure(figsize=figsize)
sns.heatmap(pd.DataFrame(svm_eval).iloc[:-1, :].T, annot=True)
plt.show()
return svc
[docs]
def ml_predict(adata_val, svc, save_name="svm_pred", return_prob_mat=False):
"""
Predict labels for a given dataset using a trained Support Vector Classifier (SVC) model.
Parameters
----------
adata_val : AnnData
The validation data as an AnnData object.
svc : SVC
The trained Support Vector Classifier model.
save_name : str, optional
The name under which the predictions will be saved in the AnnData object, by default "svm_pred".
return_prob_mat : bool, optional
Whether to return the probability matrix, by default False.
Returns
-------
DataFrame or None
If `return_prob_mat` is True, returns a DataFrame with the probability matrix. Otherwise, returns None.
"""
print("Classifying!")
X_val = pd.DataFrame(adata_val.X)
y_prob_val = svc.predict_proba(X_val)
y_prob_val = pd.DataFrame(y_prob_val)
y_prob_val.columns = svc.classes_
svm_label_val = y_prob_val.idxmax(axis=1, skipna=True)
svm_label_val.index = X_val.index
print("Saving cell type labels to adata!")
adata_val.obs[save_name] = svm_label_val.values
if return_prob_mat:
print("Returning probability matrix!")
y_prob_val.columns = svc.classes_
svm_label_val = y_prob_val.idxmax(axis=1, skipna=True)
return svm_label_val
class ImageProcessor:
"""
A class used to process images and compute channel means and sums.
...
Attributes
----------
flatmasks : ndarray
2D numpy array containing masks for each cell.
Methods
-------
update_adjacency_value(adjacency_matrix, original, neighbor):
Updates the adjacency matrix based on the original and neighbor values.
update_adjacency_matrix(plane_mask_flattened, width, height, adjacency_matrix, index):
Updates the adjacency matrix based on the flattened plane mask.
compute_channel_means_sums_compensated(image):
Computes the channel means and sums for each cell and compensates them.
"""
def __init__(self, flatmasks):
"""
Constructs all the necessary attributes for the ImageProcessor object.
Parameters
----------
flatmasks : ndarray
2D numpy array containing masks for each cell.
"""
self.flatmasks = flatmasks
def update_adjacency_value(self, adjacency_matrix, original, neighbor):
# This function is copied from CellSeg
"""
Updates the adjacency matrix based on the original and neighbor values.
Parameters
----------
adjacency_matrix : ndarray
2D numpy array representing the adjacency matrix.
original : int
Original value.
neighbor : int
Neighbor value.
Returns
-------
bool
True if the original and neighbor values are different and not zero, False otherwise.
"""
border = False
if original != 0 and original != neighbor:
border = True
if neighbor != 0:
adjacency_matrix[int(original - 1), int(neighbor - 1)] += 1
return border
def update_adjacency_matrix(
self, plane_mask_flattened, width, height, adjacency_matrix, index
):
# This function is copied from CellSeg
"""
Updates the adjacency matrix based on the flattened plane mask.
Parameters
----------
plane_mask_flattened : ndarray
1D numpy array representing the flattened plane mask.
width : int
Width of the plane mask.
height : int
Height of the plane mask.
adjacency_matrix : ndarray
2D numpy array representing the adjacency matrix.
index : int
Index of the current cell in the flattened plane mask.
"""
mod_value_width = index % width
origin_mask = plane_mask_flattened[index]
left, right, up, down = False, False, False, False
if mod_value_width != 0:
left = self.update_adjacency_value(
adjacency_matrix, origin_mask, plane_mask_flattened[index - 1]
)
if mod_value_width != width - 1:
right = self.update_adjacency_value(
adjacency_matrix, origin_mask, plane_mask_flattened[index + 1]
)
if index >= width:
up = self.update_adjacency_value(
adjacency_matrix, origin_mask, plane_mask_flattened[index - width]
)
if index <= len(plane_mask_flattened) - 1 - width:
down = self.update_adjacency_value(
adjacency_matrix, origin_mask, plane_mask_flattened[index + width]
)
if left or right or up or down:
adjacency_matrix[int(origin_mask - 1), int(origin_mask - 1)] += 1
def compute_channel_means_sums_compensated(self, image):
# This function is copied from CellSeg but modified to solve the least squares problem with torch instead of numpy
"""
Computes the channel means and sums for each cell and compensates them.
Parameters
----------
image : ndarray
3D numpy array representing the image.
Returns
-------
compensated_means : ndarray
2D numpy array representing the compensated means for each cell.
means : ndarray
2D numpy array representing the means for each cell.
channel_counts : ndarray
1D numpy array representing the counts for each cell.
"""
height, width, n_channels = image.shape
mask_height, mask_width = self.flatmasks.shape
n_masks = len(np.unique(self.flatmasks)) - 1
channel_sums = np.zeros((n_masks, n_channels))
channel_counts = np.zeros((n_masks, n_channels))
if n_masks == 0:
return channel_sums, channel_sums, channel_counts
squashed_image = np.reshape(image, (height * width, n_channels))
# masklocs = np.nonzero(self.flatmasks)
# plane_mask = np.zeros((mask_height, mask_width), dtype = np.uint32)
# plane_mask[masklocs[0], masklocs[1]] = masklocs[2] + 1
# plane_mask = plane_mask.flatten()
plane_mask = self.flatmasks.flatten()
adjacency_matrix = np.zeros((n_masks, n_masks))
for i in range(len(plane_mask)):
self.update_adjacency_matrix(
plane_mask, mask_width, mask_height, adjacency_matrix, i
)
mask_val = plane_mask[i] - 1
if mask_val != -1:
channel_sums[mask_val.astype(np.int32)] += squashed_image[i]
channel_counts[mask_val.astype(np.int32)] += 1
# Normalize adjacency matrix
for i in range(n_masks):
adjacency_matrix[i] = adjacency_matrix[i] / (
max(adjacency_matrix[i, i], 1) * 2
)
adjacency_matrix[i, i] = 1
means = np.true_divide(
channel_sums,
channel_counts,
out=np.zeros_like(channel_sums, dtype="float"),
where=channel_counts != 0,
)
# Convert your numpy arrays to PyTorch tensors
adjacency_matrix_torch = torch.from_numpy(adjacency_matrix)
means_torch = torch.from_numpy(means)
# Solve the least squares problem
results_torch = torch.linalg.lstsq(adjacency_matrix_torch, means_torch).solution
# Convert the result back to a numpy array if needed
# Convert the result back to a numpy array if needed
results = results_torch.numpy()
compensated_means = np.maximum(results, np.zeros(results.shape))
return compensated_means, means, channel_counts[:, 0]
def compensate_cell_matrix(df, image_dict, masks, overwrite=True):
"""
Compensate cell matrix by computing channel means and sums.
Parameters
----------
df : DataFrame
The DataFrame to which the compensated means will be added.
image_dict : dict
Dictionary containing images for each channel.
masks : ndarray
3D numpy array containing masks for each cell.
overwrite : bool, optional
If True, overwrite existing columns in df. If False, add new columns to df. Default is True.
Returns
-------
DataFrame
The DataFrame with added compensated means.
Notes
-----
The function computes the channel means and sums for each cell, compensates them, and adds them to the DataFrame.
The compensated means are added to the DataFrame with column names from the keys of the image_dict.
If overwrite is True, existing columns in the DataFrame are overwritten. If overwrite is False, new columns are added to the DataFrame.
"""
masks = masks.squeeze()
image_list = [image_dict[channel_name] for channel_name in image_dict.keys()]
# Stack the 2D numpy arrays along the third dimension to create a 3D numpy array
image = np.stack(image_list, axis=-1)
# Now you can use `image` as the input for the function
processor = ImageProcessor(masks)
(
compensated_means,
means,
channel_counts,
) = processor.compute_channel_means_sums_compensated(image)
# Get the keys
keys = list(image_dict.keys())
# Cycle over the keys
for i in range(len(keys)):
# Add the compensated_means to the DataFrame with column names from keys
if overwrite == True:
df[keys[i]] = compensated_means[:, i]
else:
df[keys[i] + "_compensated"] = compensated_means[:, i]
return df
def masks_to_outlines_scikit_image(masks):
"""get outlines of masks as a 0-1 array
Parameters
----------------
masks: int, 2D or 3D array
size [Ly x Lx] or [Lz x Ly x Lx], 0=NO masks; 1,2,...=mask labels
Returns
----------------
outlines: 2D or 3D array
size [Ly x Lx] or [Lz x Ly x Lx], True pixels are outlines
"""
if masks.ndim > 3 or masks.ndim < 2:
raise ValueError(
"masks_to_outlines takes 2D or 3D array, not %dD array" % masks.ndim
)
if masks.ndim == 3:
outlines = np.zeros(masks.shape, bool)
for i in range(masks.shape[0]):
outlines[i] = find_boundaries(masks[i], mode="inner")
return outlines
else:
return find_boundaries(masks, mode="inner")
[docs]
def tm_viewer(
adata,
images_pickle_path,
directory,
region_column="unique_region",
region="",
xSelector="x",
ySelector="y",
color_by="celltype_fine",
keep_list=None,
include_masks=True,
open_viewer=True,
add_UMAP=True,
):
segmented_matrix = adata.obs
with open(images_pickle_path, "rb") as f:
seg_output = pickle.load(f)
image_dict = seg_output["image_dict"]
masks = seg_output["masks"]
if keep_list == None:
keep_list = [region_column, xSelector, ySelector, color_by]
print("Preparing TissUUmaps input...")
cache_dir = pathlib.Path(directory) / region
cache_dir.mkdir(parents=True, exist_ok=True)
# only keep columns in keep_list
segmented_matrix = segmented_matrix[keep_list]
if add_UMAP == True:
# add UMAP coordinates to segmented_matrix
segmented_matrix["UMAP_1"] = adata.obsm["X_umap"][:, 0]
segmented_matrix["UMAP_2"] = adata.obsm["X_umap"][:, 1]
csv_paths = []
# separate matrix by region and save every region as single csv file
region_matrix = segmented_matrix.loc[segmented_matrix[region_column] == region]
region_matrix.to_csv(cache_dir / (region + ".csv"))
csv_paths.append(cache_dir / (region + ".csv"))
# generate subdirectory for images
image_dir = cache_dir / "images"
image_dir.mkdir(parents=True, exist_ok=True)
image_list = []
# save every image as tif file in image directory from image_dict. name by key in image_dict
for key, image in image_dict.items():
file_path = os.path.join(image_dir, f"{key}.tif")
imsave(file_path, image, check_contrast=False)
image_list.append(file_path)
if include_masks == True:
# select first item from image_dict as reference image
reference_image = list(image_dict.values())[0]
# make reference image black by setting all values to 0
reference_image = np.zeros_like(reference_image)
# make the reference image rgb. Add empty channels
if len(reference_image.shape) == 2:
reference_image = np.expand_dims(reference_image, axis=-1)
reference_image = np.repeat(reference_image, 3, axis=-1)
# remove last dimension from masks
masks_3d = np.squeeze(masks)
outlines = masks_to_outlines_scikit_image(masks_3d)
reference_image[outlines == True] = [255, 0, 0]
file_path = os.path.join(image_dir, "masks.jpg")
# save black pixel as transparent
reference_image = reference_image.astype(np.uint8)
imsave(file_path, reference_image)
image_list.append(file_path)
if open_viewer == True:
print("Opening TissUUmaps viewer...")
tj.loaddata(
images=image_list,
csvFiles=[str(p) for p in csv_paths],
xSelector=xSelector,
ySelector=ySelector,
keySelector=color_by,
nameSelector=color_by,
colorSelector=color_by,
piechartSelector=None,
shapeSelector=None,
scaleSelector=None,
fixedShape=None,
scaleFactor=1,
colormap=None,
compositeMode="source-over",
boundingBox=None,
port=5100,
host="localhost",
height=900,
tmapFilename=region + "_project",
plugins=[
"Plot_Histogram",
"Points2Regions",
"Spot_Inspector",
"Feature_Space",
"ClassQC",
],
)
return image_list, csv_paths
def install_gpu_leiden(CUDA="12"):
"""
Install the necessary packages for GPU-accelerated Leiden clustering.
Parameters
----------
CUDA : str, optional
The version of CUDA to use for the installation. Options are '11' and '12'. Default is '12'.
Returns
-------
None
Notes
-----
This function runs a series of pip install commands to install the necessary packages. The specific packages and versions installed depend on the CUDA
version. The function prints the output and any errors from each command.
"""
if platform.system() != "Linux":
print("This feature is currently only supported on Linux.")
else:
print("installing rapids_singlecell")
# Define the commands to run
if CUDA == "11":
commands = [
"pip install rapids-singlecell==0.9.5",
"pip install --extra-index-url=https://pypi.nvidia.com cudf-cu11==24.2.* dask-cudf-cu11==24.2.* cuml-cu11==24.2.* cugraph-cu11==24.2.* cuspatial-cu11==24.2.* cuproj-cu11==24.2.* cuxfilter-cu11==24.2.* cucim-cu11==24.2.* pylibraft-cu11==24.2.* raft-dask-cu11==24.2.*",
"pip install protobuf==3.20",
]
else:
commands = [
"pip install rapids-singlecell==0.9.5",
"pip install --extra-index-url=https://pypi.nvidia.com cudf-cu12==24.2.* dask-cudf-cu12==24.2.* cuml-cu12==24.2.* cugraph-cu12==24.2.* cuspatial-cu12==24.2.* cuproj-cu12==24.2.* cuxfilter-cu12==24.2.* cucim-cu12==24.2.* pylibraft-cu12==24.2.* raft-dask-cu12==24.2.*",
"pip install protobuf==3.20",
]
# Run each command
for command in commands:
process = subprocess.Popen(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()
# Print the output and error, if any
if stdout:
print(f"Output:\n{stdout.decode()}")
if stderr:
print(f"Error:\n{stderr.decode()}")
def anndata_to_GPU(
adata: AnnData,
layer: str | None = None,
convert_all: bool = False,
) -> AnnData:
"""
Transfers matrices and arrays to the GPU
Parameters
----------
adata
AnnData object
layer
Layer to use as input instead of `X`. If `None`, `X` is used.
convert_all
If True, move all supported arrays and matrices on the GPU
Returns
-------
Returns an updated copy with data on GPU
"""
adata_gpu = adata.copy()
if convert_all:
anndata_to_GPU(adata_gpu)
if adata_gpu.layers:
for key in adata_gpu.layers.keys():
anndata_to_GPU(adata_gpu, layer=key)
else:
X = _get_obs_rep(adata_gpu, layer=layer)
if isspmatrix_csr_cpu(X):
X = csr_matrix_gpu(X)
elif isspmatrix_csc_cpu(X):
X = csc_matrix_gpu(X)
elif isinstance(X, np.ndarray):
# Convert to CuPy array only when necessary for GPU computations
X_gpu = cp.asarray(X)
X = X_gpu
else:
error = layer if layer else "X"
warnings.warn(f"{error} not supported for GPU conversion", Warning)
_set_obs_rep(adata_gpu, X, layer=layer)
return adata_gpu
def anndata_to_CPU(
adata: AnnData,
layer: str | None = None,
convert_all: bool = False,
copy: bool = False,
) -> AnnData | None:
"""
Transfers matrices and arrays from the GPU
Parameters
----------
adata
AnnData object
layer
Layer to use as input instead of `X`. If `None`, `X` is used.
convert_all
If True, move all GPU based arrays and matrices to the host memory
copy
Whether to return a copy or update `adata`.
Returns
-------
Updates `adata` inplace or returns an updated copy
"""
if copy:
adata = adata.copy()
if convert_all:
anndata_to_CPU(adata)
if adata.layers:
for key in adata.layers.keys():
anndata_to_CPU(adata, layer=key)
else:
X = _get_obs_rep(adata, layer=layer)
if isspmatrix_csr_gpu(X):
X = X.get()
elif isspmatrix_csc_gpu(X):
X = X.get()
elif isinstance(X, cp.ndarray):
X = X.get()
else:
pass
_set_obs_rep(adata, X, layer=layer)
if copy:
return adata
def install_stellar(CUDA=12):
if CUDA == 12:
subprocess.run(["pip3", "install", "torch"], check=True)
subprocess.run(["pip", "install", "torch_geometric"], check=True)
subprocess.run(
[
"pip",
"install",
"pyg_lib",
"torch_scatter",
"torch_sparse",
"torch_cluster",
"torch_spline_conv",
"-f",
"https://data.pyg.org/whl/torch-2.3.0+cu121.html",
],
check=True,
)
elif CUDA == 11.8:
subprocess.run(
[
"pip3",
"install",
"torch",
"--index-url",
"https://download.pytorch.org/whl/cu118",
],
check=True,
)
subprocess.run(["pip", "install", "torch_geometric"], check=True)
subprocess.run(
[
"pip",
"install",
"pyg_lib",
"torch_scatter",
"torch_sparse",
"torch_cluster",
"torch_spline_conv",
"-f",
"https://data.pyg.org/whl/torch-2.3.0+cu118.html",
],
check=True,
)
else:
print("Please choose between CUDA 12 or 11.8")
print(
"If neither is working for you check the installation guide at: https://pytorch.org/get-started/locally/ and https://pytorch-geometric.readthedocs.io/en/latest/install/installation.html"
)