# load required packages
import os
import random
import time
import warnings
from functools import reduce
from typing import TYPE_CHECKING
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scanpy as sc
import scipy as sp
import seaborn as sns
import tensorflow as tf
import tifffile as tiff
from cellpose.core import use_gpu
from scipy.spatial.distance import cdist
from scipy.stats import pearsonr
from sklearn.cross_decomposition import CCA
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm
# sns.set_style("ticks")
# helper functions
############################################################
def hf_generate_random_colors(n, rand_seed=0):
# from random import randint
random.seed(rand_seed)
color = []
for i in range(n):
color.append("#%06X" % random.randint(0, 0xFFFFFF))
return color
#########
def hf_assign_colors(names, colors):
# Printing original keys-value lists
print("Original key list is : " + str(names))
print("Original value list is : " + str(colors))
# using naive method
# to convert lists to dictionary
res = {}
for key in names:
for value in colors:
res[key] = value
colors.remove(value)
break
# Printing resultant dictionary
print("Resultant dictionary is : " + str(res))
l = list(res.keys())
SMALL_SIZE = 14
MEDIUM_SIZE = 16
BIGGER_SIZE = 18
# Settings for graph
plt.rc("font", size=SMALL_SIZE) # controls default text sizes
plt.rc("axes", titlesize=SMALL_SIZE) # fontsize of the axes title
plt.rc("axes", labelsize=MEDIUM_SIZE) # fontsize of the x and y labels
plt.rc("xtick", labelsize=SMALL_SIZE) # fontsize of the tick labels
plt.rc("ytick", labelsize=SMALL_SIZE) # fontsize of the tick labels
plt.rc("legend", fontsize=SMALL_SIZE) # legend fontsize
plt.rc("figure", titlesize=BIGGER_SIZE) # fontsize of the figure title
sns.palplot(res.values())
plt.xticks(range(len(res)), res.keys(), rotation=30, ha="right")
# plt.savefig(save_path+'color_legend.png', dpi=300)
return res
#########
def hf_per_only(data, grouping, replicate, sub_col, sub_list, per_cat, norm=True):
# Find Percentage of cell type
if norm == True:
test1 = data.loc[
data[sub_col].isin(sub_list)
] # filters df for values by values in sub_list which are in the sub_col column
immune_list = list(
test1[per_cat].unique()
) # stores unique values for the per_cat column
else:
test1 = data.copy()
immune_list = list(data.loc[data[sub_col].isin(sub_list)][per_cat].unique())
test1[per_cat] = test1[per_cat].astype("category")
test_freq = test1.groupby([grouping, replicate]).apply(
lambda x: x[per_cat].value_counts(normalize=True, sort=False) * 100
) # group data by grouping variable and replicates, than applies the lambda function to count the frequency of each category in the per_cat column and normalizes by dividing by the total count.
test_freq.columns = test_freq.columns.astype(str)
test_freq.reset_index(inplace=True)
immune_list.extend(
[grouping, replicate]
) # adds grouping and replicate column to immune_list
test_freq1 = test_freq[immune_list] # subsets test_freq by immune_list
melt_per_plot = pd.melt(
test_freq1, id_vars=[grouping, replicate]
) # ,value_vars=immune_list) #converts columns specified in id_vars into rows
melt_per_plot.rename(
columns={"value": "percentage"}, inplace=True
) # rename value to percentage
return melt_per_plot # returns a df which contains the group_column followed by the replicate column and the per category column, and a column specifying the percentage
# Example: percentage CD4+ TCs in unique region E08 assigned to community xxx
########
def hf_normalize(X):
arr = np.array(X.fillna(0).values)
return pd.DataFrame(
np.log2(1e-3 + arr / arr.sum(axis=1, keepdims=True)),
index=X.index.values,
columns=X.columns,
).fillna(0)
########
def hf_cell_types_de_helper(
df,
ID_component1,
ID_component2,
neighborhood_col,
group_col,
group_dict,
cell_type_col,
):
# read data
cells2 = df
cells2.reset_index(inplace=True, drop=True)
cells2
# generate unique ID
cells2["donor_tis"] = cells2[ID_component1] + "_" + cells2[ID_component2]
# This code is creating a dictionary called neigh_num that maps each unique value
# in the Neighborhood column of a pandas DataFrame cells2 to a unique integer index
# starting from 0.
neigh_num = {
list(cells2[neighborhood_col].unique())[i]: i
for i in range(len(cells2[neighborhood_col].unique()))
}
cells2["neigh_num"] = cells2[neighborhood_col].map(neigh_num)
cells2["neigh_num"].unique()
"""
This Python code is performing the following data transformation operations on a pandas DataFrame named cells2:
The first three lines of code create a dictionary called treatment_dict that maps two specific strings, 'SB' and 'CL', to the integers 0 and 1, respectively. Then, the map() method is used to create a new column called group, where each value in the tissue column is replaced with its corresponding integer value from the treatment_dict dictionary.
The fourth to seventh lines of code create a new dictionary called pat_dict that maps each unique value in the donor_tis column of the cells2 DataFrame to a unique integer index starting from 0. The for loop loops through the range object and assigns each integer to the corresponding unique value in the donor_tis column, creating a dictionary that maps each unique value to a unique integer index.
The last two lines of code create a new column called patients in the cells2 DataFrame, where each value in the donor_tis column is replaced with its corresponding integer index from the pat_dict dictionary. This code assigns these integer indices to each patient in the donor_tis column. The unique() method is used to return an array of unique values in the patients column to verify that each unique value in the donor_tis column has been mapped to a unique integer index in the patients column.
Overall, the code is converting categorical data in the tissue and donor_tis columns to numerical data in the group and patients columns, respectively, which could be useful for certain types of analysis.
"""
# Code treatment/group with number
cells2["group"] = cells2[group_col].map(group_dict)
cells2["group"].unique()
pat_dict = {}
for i in range(len(list(cells2["donor_tis"].unique()))):
pat_dict[list(cells2["donor_tis"].unique())[i]] = i
pat_dict
cells2["patients"] = cells2["donor_tis"].map(pat_dict)
cells2["patients"].unique()
# drop duplicates
pat_gp = cells2[["patients", "group"]].drop_duplicates()
pat_to_gp = {a: b for a, b in pat_gp.values}
# get cell type (ct) frequences per patient
ct_freq1 = cells2.groupby(["patients"]).apply(
lambda x: x[cell_type_col].value_counts(normalize=True, sort=False) * 100
)
# ct_freq = ct_freq1.to_frame()
ct_freq = ct_freq1.unstack().fillna(0)
ct_freq.reset_index(inplace=True)
ct_freq.rename(
columns={"level_1": "cell_type", "Cell Type": "Percentage"}, inplace=True
)
ct_freq
# Get frequences for every neighborhood per patient
all_freqs1 = cells2.groupby(["patients", "neigh_num"]).apply(
lambda x: x[cell_type_col].value_counts(normalize=True, sort=False) * 100
)
# all_freqs = all_freqs1.to_frame()
all_freqs = all_freqs1.unstack().fillna(0)
all_freqs.reset_index(inplace=True)
all_freqs.rename(
columns={"level_2": "cell_type", cell_type_col: "Percentage"}, inplace=True
)
all_freqs
return (cells2, ct_freq, all_freqs, pat_to_gp, neigh_num)
##########
def hf_get_pathcells(query_database, query_dict_list):
"""
Return set of cells that match query_dict path.
"""
out = []
if type(query_dict_list) == dict:
query_dict_list = [query_dict_list]
for query_dict in query_dict_list:
qd = query_database
for k, v in query_dict.items():
if type(v) != list:
v = [v]
qd = qd[qd[k].isin(v)]
out += [qd]
if len(query_database) == 1:
return out[0]
return out
# annotated
"""
def hf_get_pathcells(query_database: Union[Dict, List[Dict]], query_dict_list: List[Dict]) -> Union[Dict, List[Dict]]:
#Return set of cells that match query_dict path.
out: List[Dict] = [] # initialize an empty list to store results
if type(query_dict_list) == dict: # if query_dict_list is a dictionary, convert it into a list
query_dict_list = [query_dict_list]
for query_dict in query_dict_list: # loop through each dictionary in query_dict_list
qd = query_database # initialize a reference to query_database
for k,v in query_dict.items(): # loop through each key-value pair in the current dictionary
if type(v)!=list: # if the value is not a list, convert it into a list
v = [v]
qd = qd[qd[k].isin(v)] # filter the rows of qd based on the key-value pair
out+=[qd] # append the resulting qd to the out list
if len(query_database)==1: # if query_database contains only one row, return the first item in out
return out[0]
return out # otherwise, return the entire out list
"""
class Neighborhoods(object):
def __init__(
self,
cells,
ks,
cluster_col,
sum_cols,
keep_cols,
X="X:X",
Y="Y:Y",
reg="Exp",
add_dummies=True,
):
self.cells_nodumz = cells
self.X = X
self.Y = Y
self.reg = reg
self.keep_cols = keep_cols
self.sum_cols = sum_cols
self.ks = ks
self.cluster_col = cluster_col
self.n_neighbors = max(ks)
self.exps = list(self.cells_nodumz[self.reg].unique())
self.bool_add_dummies = add_dummies
def add_dummies(self):
c = self.cells_nodumz
dumz = pd.get_dummies(c[self.cluster_col])
keep = c[self.keep_cols]
self.cells = pd.concat([keep, dumz], axis=1)
def get_tissue_chunks(self):
self.tissue_group = self.cells[[self.X, self.Y, self.reg]].groupby(self.reg)
tissue_chunks = [
(time.time(), self.exps.index(t), t, a)
for t, indices in self.tissue_group.groups.items()
for a in np.array_split(indices, 1)
]
return tissue_chunks
def make_windows(self, job):
start_time, idx, tissue_name, indices = job
job_start = time.time()
print(
"Starting:", str(idx + 1) + "/" + str(len(self.exps)), ": " + self.exps[idx]
)
tissue = self.tissue_group.get_group(tissue_name)
to_fit = tissue.loc[indices][[self.X, self.Y]].values
fit = NearestNeighbors(n_neighbors=self.n_neighbors + 1).fit(
tissue[[self.X, self.Y]].values
)
m = fit.kneighbors(to_fit)
m = m[0][:, 1:], m[1][:, 1:]
# sort_neighbors
args = m[0].argsort(axis=1)
add = np.arange(m[1].shape[0]) * m[1].shape[1]
sorted_indices = m[1].flatten()[args + add[:, None]]
neighbors = tissue.index.values[sorted_indices]
end_time = time.time()
print(
"Finishing:",
str(idx + 1) + "/" + str(len(self.exps)),
": " + self.exps[idx],
end_time - job_start,
end_time - start_time,
)
return neighbors.astype(np.int32)
def k_windows(self):
if self.bool_add_dummies:
self.add_dummies()
else:
self.cells = self.cells_nodumz
sum_cols = list(self.sum_cols)
for col in sum_cols:
if col in self.keep_cols:
self.cells[col + "_sum"] = self.cells[col]
self.sum_cols.remove(col)
self.sum_cols += [col + "_sum"]
values = self.cells[self.sum_cols].values
tissue_chunks = self.get_tissue_chunks()
tissues = [self.make_windows(job) for job in tissue_chunks]
out_dict = {}
for k in self.ks:
for neighbors, job in zip(tissues, tissue_chunks):
chunk = np.arange(len(neighbors)) # indices
tissue_name = job[2]
indices = job[3]
window = (
values[neighbors[chunk, :k].flatten()]
.reshape(len(chunk), k, len(self.sum_cols))
.sum(axis=1)
)
out_dict[(tissue_name, k)] = (window.astype(np.float16), indices)
windows = {}
for k in self.ks:
window = pd.concat(
[
pd.DataFrame(
out_dict[(exp, k)][0],
index=out_dict[(exp, k)][1].astype(int),
columns=self.sum_cols,
)
for exp in self.exps
],
axis=0,
)
window = window.loc[self.cells.index.values]
window = pd.concat([self.cells[self.keep_cols], window], axis=1)
windows[k] = window
return windows
##########
# Define a Python function named `hf_get_windows` that takes two arguments: `job` and `n_neighbors`.
def hf_get_windows(job, n_neighbors, exps, tissue_group, X, Y):
# Unpack the tuple `job` into four variables: `start_time`, `idx`, `tissue_name`, and `indices`.
start_time, idx, tissue_name, indices = job
# Record the time at which the function starts.
job_start = time.time()
# Print a message indicating the start of the function execution, including the current job index and the corresponding experiment name.
print("Starting:", str(idx + 1) + "/" + str(len(exps)), ": " + exps[idx])
# Retrieve the subset of data that corresponds to the given `tissue_name`.
tissue = tissue_group.get_group(tissue_name)
# Select only the `X` and `Y` columns of the data corresponding to the given `indices`.
to_fit = tissue.loc[indices][[X, Y]].values
# Fit a model with the data that corresponds to the `X` and `Y` columns of the given `tissue`.
fit = NearestNeighbors(n_neighbors=n_neighbors).fit(tissue[[X, Y]].values)
# Find the indices of the `n_neighbors` nearest neighbors of the `to_fit` data points.
# The `m` variable contains the distances and indices of these neighbors.
m = fit.kneighbors(to_fit)
# Sort the `m[1]` array along each row, and store the resulting indices in the `args` variable.
args = m[0].argsort(axis=1)
# Create the `add` variable to offset the indices based on the number of rows in `m[1]`, and store the sorted indices in the `sorted_indices` variable.
add = np.arange(m[1].shape[0]) * m[1].shape[1]
sorted_indices = m[1].flatten()[args + add[:, None]]
# Create the `neighbors` variable by selecting the indices from the `tissue` data frame that correspond to the sorted indices.
neighbors = tissue.index.values[sorted_indices]
# Record the time at which the function ends.
end_time = time.time()
# Print a message indicating the end of the function execution, including the current job index and the corresponding experiment name, and the time it took to execute the function.
print(
"Finishing:",
str(idx + 1) + "/" + str(len(exps)),
": " + exps[idx],
end_time - job_start,
end_time - start_time,
)
# Return the `neighbors` array as an array of 32-bit integers.
return neighbors.astype(np.int32)
###################
def hf_index_rank(a, axis):
"""
returns the index of every index in the sorted array
haven't tested on ndarray yet
"""
arg = np.argsort(a, axis)
return np.arange(a.shape[axis])[np.argsort(arg, axis)]
def hf_znormalize(raw_cells, grouper, markers, clip=(-7, 7), dropinf=True):
not_inf = raw_cells[np.isinf(raw_cells[markers].values).sum(axis=1) == 0]
if not_inf.shape[0] != raw_cells.shape[0]:
print("removing cells with inf values", raw_cells.shape, not_inf.shape)
not_na = not_inf[not_inf[markers].isnull().sum(axis=1) == 0]
if not_na.shape[0] != not_inf.shape[0]:
print("removing cells with nan values", not_inf.shape, not_na.shape)
znorm = not_na.groupby(grouper).apply(
lambda x: (
(x[markers] - x[markers].mean(axis=0)) / x[markers].std(axis=0)
).clip(clip[0], clip[1])
)
Z = not_na.drop(markers, 1).merge(znorm, left_index=True, right_index=True)
return Z
def hf_fast_divisive_cluster(X, num_clusters, metric="cosine", prints=True):
# optimized divisive_cluster. Faster because doesn't recompute distance matrix to centroids at
# each iteration
centroids = np.zeros((num_clusters, X.shape[1])) # fill with cluster centroids
dists = np.zeros((X.shape[0], num_clusters)) # fill with dist matrix
avg_seed = X.mean(axis=0, keepdims=True)
d = cdist(X, avg_seed, metric=metric)
dists[:, 0] = d[:, 0]
c1 = d.argmax()
centroids[0] = X[c1]
for x in range(1, num_clusters):
if x % 10 == 0:
print(x, "clusters")
d = cdist(X, centroids[x - 1][None, :], metric=metric)
dists[:, x] = d[:, 0]
allocs = dists[:, : x + 1].argmin(axis=1)
next_centroid = dists[np.arange(len(dists)), allocs].argmax()
centroids[x] = X[next_centroid]
return centroids, allocs
def hf_alloc_cells(X, centroids, metric="cosine"):
dists = cdist(X, centroids, metric=metric)
allocs = dists.argmin(axis=1)
return allocs
###########
def hf_get_sum_cols(cell_cuts, panel):
arr = np.where(cell_cuts[:, 0] == panel)[0]
return slice(arr[0], arr[-1] + 1)
###############
def hf_get_thresh_simps(x, thresh):
sorts = np.argsort(-x, axis=1)
x_sorted = -np.sort(-x, axis=1)
cumsums = np.cumsum(x_sorted, axis=1)
thresh_simps = pd.Series(
[
tuple(sorted(sorts[i, : (1 + j)]))
for i, j in enumerate(np.argmax(cumsums > thresh, axis=1))
]
)
return thresh_simps
###############
def hf_prepare_neighborhood_df(
cells_df, patient_ID_component1, patient_ID_component2, neighborhood_column=None
):
# Spacer for output
print("")
# Combine two columns to form unique ID which will be stored as patients column
cells_df["patients"] = (
cells_df[patient_ID_component1] + "_" + cells_df[patient_ID_component2]
)
print("You assigned following identifiers to the column 'patients':")
print(cells_df["patients"].unique())
# Spacer for output
print("")
if neighborhood_column:
# Assign numbers to neighborhoods
neigh_num = {
list(cells_df[neighborhood_column].unique())[i]: i
for i in range(len(cells_df[neighborhood_column].unique()))
}
cells_df["neigh_num"] = cells_df[neighborhood_column].map(neigh_num)
print(
"You assigned following numbers to the column 'neigh_num'. Each number represents one neighborhood:"
)
print(cells_df["neigh_num"].unique())
cells_df["neigh_num"] = cells_df["neigh_num"].astype("category")
cells_df["patients"] = cells_df["patients"].astype("category")
return cells_df
def hf_prepare_neighborhood_df2(
cells_df, patient_ID_component1, patient_ID_component2, neighborhood_column=None
):
# Spacer for output
print("")
# Combine two columns to form unique ID which will be stored as patients column
cells_df["unique_ID"] = (
cells_df[patient_ID_component1] + "_" + cells_df[patient_ID_component2]
)
print("You assigned following identifiers to the column 'unique_ID':")
print(cells_df["unique_ID"].unique())
# Spacer for output
print("")
if neighborhood_column:
# Assign numbers to neighborhoods
neigh_num = {
list(cells_df[neighborhood_column].unique())[i]: i
for i in range(len(cells_df[neighborhood_column].unique()))
}
cells_df["neigh_num"] = cells_df[neighborhood_column].map(neigh_num)
print(
"You assigned following numbers to the column 'neigh_num'. Each number represents one neighborhood:"
)
print(cells_df["neigh_num"].unique())
cells_df["neigh_num"] = cells_df["neigh_num"].astype("category")
cells_df["unique_ID"] = cells_df["unique_ID"].astype("category")
return cells_df
############
def hf_cor_subset(cor_mat, threshold, cell_type):
pairs = hf_get_top_abs_correlations(cor_mat, thresh=threshold)
piar1 = pairs.loc[pairs["col1"] == cell_type]
piar2 = pairs.loc[pairs["col2"] == cell_type]
piar = pd.concat([piar1, piar2])
pair_list = list(set(list(piar["col1"].unique()) + list(piar["col2"].unique())))
return pair_list, piar, pairs
#############
# correlation analysis
def hf_get_redundant_pairs(df):
"""Get diagonal and lower triangular pairs of correlation matrix"""
pairs_to_drop = set()
cols = df.columns
for i in range(0, df.shape[1]):
for j in range(0, i + 1):
pairs_to_drop.add((cols[i], cols[j]))
return pairs_to_drop
##############
# Spatial context analysis
"""
this is the code that finds the minimal combination of CNs
required to make up a threshold percentage of assignments in a window
combinations are stored as a sorted tuple
"""
def hf_simp_rep(
data,
patient_col,
tissue_column,
subset_list_tissue,
ttl_per_thres,
comb_per_thres,
thres_num=3,
):
# Choose the windows size to continue with
if tissue_column != None:
w2 = data.loc[data[tissue_column].isin(subset_list_tissue)]
print("tissue_column true")
else:
w2 = data.copy()
print("tissue_column false")
simp_list = []
for patient in list(w2[patient_col].unique()):
w = w2.loc[w2[patient_col] == patient]
xm = w.loc[:, l].values / n_num
# Get the neighborhood combinations based on the threshold
simps = hf_get_thresh_simps(xm, ttl_per_thres)
simp_freqs = simps.value_counts(normalize=True)
sf = simp_freqs.to_frame()
sf.rename(columns={0: patient}, inplace=True)
sf.reset_index(inplace=True)
sf.rename(columns={"index": "merge"}, inplace=True)
simp_list.append(sf)
# simp_sums = np.cumsum(simp_freqs)
# thresh_cumulative = .95
# selected_simps = simp_sums[simp_sums<=thresh_cumulative].index.values
# selected_simps = simp_freqs[simp_freqs>=comb_per_thres].index.values
simp_df = reduce(
lambda left, right: pd.merge(left, right, on=["merge"], how="outer"), simp_list
)
# simp_df = pd.concat(simp_list, axis=0)
# simp_df.index = simp_df.index.to_series()
simp_df.fillna(0, inplace=True)
simp_df.set_index("merge", inplace=True)
simp_out = simp_df.loc[simp_df.gt(0).sum(axis=1).ge(thres_num)]
return simp_out
################
def hf_get_top_abs_correlations(df, thresh=0.5):
au_corr = df.corr().unstack()
labels_to_drop = hf_get_redundant_pairs(df)
au_corr = au_corr.drop(labels=labels_to_drop).sort_values(ascending=False)
cc = au_corr.to_frame()
cc.index.rename(["col1", "col2"], inplace=True)
cc.reset_index(inplace=True)
cc.rename(columns={0: "value"}, inplace=True)
gt_pair = cc.loc[cc["value"].abs().gt(thresh)]
return gt_pair
################
# help to convert dataframe after denoising into anndata
[docs]
def make_anndata(
df_nn, # data frame coming out from denoising
col_sum, # this is the column index that has the last protein feature
nonFuncAb_list, # inspect which markers work, and drop the ones that did not work from the clustering step
):
"""
Convert a denoised DataFrame into anndata format.
Parameters:
df_nn (pandas.DataFrame): Denoised data frame.
col_sum (int): Column index of the last protein feature.
nonFuncAb_list (list): List of markers that did not work in the clustering step.
Returns:
AnnData: Anndata object containing the converted data.
"""
adata = sc.AnnData(X=df_nn.iloc[:, : col_sum + 1].drop(columns=nonFuncAb_list))
adata.obs = df_nn.iloc[:, col_sum + 1 :]
return adata
################
def hf_split_channels(input_path, output_folder, channel_names_file=None):
"""
Split channels of a TIFF image and save them as separate files.
Parameters:
input_path (str): Path to the input TIFF image.
output_folder (str): Path to the output folder where the channels will be saved.
channel_names_file (str, optional): Path to the file containing channel names.
If None, numbers will be used as channel names.
Raises:
FileNotFoundError: If the input_path or channel_names_file (if provided) is not found.
Returns:
None
"""
# Load the large TIFF image
print("loading image file...")
image = tiff.imread(input_path)
print("Done")
print("splitting channels...")
if channel_names_file is not None:
# Read the channel names from the text file
with open(channel_names_file, "r") as f:
channel_names = f.read().splitlines()
else:
# Generate channel names as numbers
channel_names = [str(i) for i in range(image.shape[0])]
print("Done")
# Split the channels
channels = np.split(image, image.shape[0], axis=0)
# Save each channel as a TIFF file with the corresponding channel name
for i, channel in tqdm(
enumerate(channels), total=len(channels), desc="Saving channels"
):
channel = np.squeeze(channel) # Remove single-dimensional entries
output_filename = f"{output_folder}/{channel_names[i]}.tif"
tiff.imsave(output_filename, channel)
print("Channels saved successfully!")
##############
# Patch analysis
def hf_voronoi_finite_polygons_2d(vor, radius=None):
"""
Reconstruct infinite voronoi regions in a 2D diagram to finite
regions.
Parameters
----------
vor : Voronoi
Input diagram
radius : float, optional
Distance to 'points at infinity'.
Returns
-------
regions : list of tuples
Indices of vertices in each revised Voronoi regions.
vertices : list of tuples
Coordinates for revised Voronoi vertices. Same as coordinates
of input vertices, with 'points at infinity' appended to the
end.
"""
if vor.points.shape[1] != 2:
raise ValueError("Requires 2D input")
new_regions = []
new_vertices = vor.vertices.tolist()
center = vor.points.mean(axis=0)
if radius is None:
radius = vor.points.ptp().max()
# Construct a map containing all ridges for a given point
all_ridges = {}
for (p1, p2), (v1, v2) in zip(vor.ridge_points, vor.ridge_vertices):
all_ridges.setdefault(p1, []).append((p2, v1, v2))
all_ridges.setdefault(p2, []).append((p1, v1, v2))
# Reconstruct infinite regions
for p1, region in enumerate(vor.point_region):
vertices = vor.regions[region]
if all(v >= 0 for v in vertices):
# finite region
new_regions.append(vertices)
continue
# reconstruct a non-finite region
ridges = all_ridges[p1]
new_region = [v for v in vertices if v >= 0]
for p2, v1, v2 in ridges:
if v2 < 0:
v1, v2 = v2, v1
if v1 >= 0:
# finite ridge: already in the region
continue
# Compute the missing endpoint of an infinite ridge
t = vor.points[p2] - vor.points[p1] # tangent
t /= np.linalg.norm(t)
n = np.array([-t[1], t[0]]) # normal
midpoint = vor.points[[p1, p2]].mean(axis=0)
direction = np.sign(np.dot(midpoint - center, n)) * n
far_point = vor.vertices[v2] + direction * radius * 10
new_region.append(len(new_vertices))
new_vertices.append(far_point.tolist())
# sort region counterclockwise
vs = np.asarray([new_vertices[v] for v in new_region])
c = vs.mean(axis=0)
angles = np.arctan2(vs[:, 1] - c[1], vs[:, 0] - c[0])
new_region = np.array(new_region)[np.argsort(angles)]
# finish
new_regions.append(new_region.tolist())
return new_regions, np.asarray(new_vertices)
###
def hf_list_folders(directory):
"""
Retrieve a list of folders in a given directory.
Parameters:
directory (str): Path to the directory.
Returns:
list: List of folder names within the specified directory.
"""
folders = []
for item in os.listdir(directory):
item_path = os.path.join(directory, item)
if os.path.isdir(item_path):
folders.append(item)
return folders
###
def hf_process_dataframe(df):
"""
Extract information from a pandas DataFrame containing file paths and IDs.
Parameters:
df (pandas.DataFrame): The input DataFrame containing file paths and IDs.
Returns:
tuple: A tuple containing the voronoi_path, mask_path, and region for the first row.
"""
for index, row in df.iterrows():
voronoi_path = row["voronoi_path"]
mask_path = row["mask_path"]
region = row["region_long"]
# Process the voronoi_path and mask_path variables as needed
# Here, we are printing them for demonstration purposes
print(f"Voronoi Path: {voronoi_path}")
print(f"Mask Path: {mask_path}")
return voronoi_path, mask_path, region
###
def hf_get_png_files(directory):
"""
Get a list of PNG files in a given directory.
Parameters:
directory (str): Path to the directory.
Returns:
list: List of PNG file paths within the specified directory.
"""
png_files = []
for filename in os.listdir(directory):
if filename.endswith(".png"):
png_files.append(os.path.join(directory, filename))
return png_files
###
def hf_find_tiff_file(directory, prefix):
"""
Find a TIFF file in a given directory with a specified prefix.
Parameters:
directory (str): Path to the directory.
prefix (str): Prefix of the TIFF file name.
Returns:
str or None: Path to the found TIFF file, or None if no matching file is found.
"""
for filename in os.listdir(directory):
if filename.endswith(".tif") and filename.startswith(prefix):
return os.path.join(directory, filename)
return None
###
def hf_extract_filename(filepath):
"""
Extract the filename from a given filepath.
Parameters:
filepath (str): The input filepath.
Returns:
str: The extracted filename without the extension.
"""
filename = os.path.basename(filepath) # Extracts the last element of the path
filename = filename.replace(
"_plot.png_cut.png", ""
) # Removes the ".png_cut.png" extension
return filename
###
def hf_get_tif_filepaths(directory):
"""
Recursively searches the specified directory and its subdirectories for TIFF files (.tif) and returns a list of their file paths.
Args:
directory (str): The directory path to search for TIFF files.
Returns:
list: A list of file paths for all TIFF files found in the directory and its subdirectories.
"""
tif_filepaths = []
for root, dirs, files in os.walk(directory):
for file in files:
if file.endswith(".tif"):
tif_filepaths.append(os.path.join(root, file))
return tif_filepaths
################ CCA
def hf_prepare_cca(df, neighborhood_column, subsets=None):
neigh_num = {
list(df[neighborhood_column].unique())[i]: i
for i in range(len(df[neighborhood_column].unique()))
}
df["neigh_num"] = df[neighborhood_column].map(neigh_num)
df["neigh_num"] = df["neigh_num"].astype("category")
cca = CCA(n_components=1, max_iter=5000)
func = pearsonr
# select which neighborhoods and functional subsets
cns = list(df["neigh_num"].unique())
# print(cns)
# log (1e-3 + neighborhood specific cell type frequency) of functional subsets ('nsctf')
if subsets is not None:
nsctf = np.log(
1e-3
+ df.groupby(["unique_region", "neigh_num"])[subsets]
.mean()
.reset_index()
.set_index(["neigh_num", "unique_region"])
)
# print(nsctf)
else:
nsctf = np.log(
1e-3
+ df.groupby(["unique_region", "neigh_num"])
.mean()
.reset_index()
.set_index(["neigh_num", "unique_region"])
)
# print(nsctf)
cca = CCA(n_components=1, max_iter=5000)
func = pearsonr
nsctf = nsctf.fillna(1e-3)
return df, cns, nsctf, cca, func, neigh_num
def invert_dictionary(dictionary):
inverted_dict = {value: key for key, value in dictionary.items()}
return inverted_dict
def hf_replace_names(color_dict, name_dict):
color_dict = hf_invert_dictionary(color_dict)
for color, name in color_dict.items():
if name in name_dict:
color_dict[color] = name_dict[name]
color_dict = hf_invert_dictionary(color_dict)
return color_dict
def hf_annotate_cor_plot(x, y, **kws):
data = kws["data"]
r, p = sp.stats.pearsonr(data[x], data[y])
ax = plt.gca()
ax.text(0.5, 0.8, f"r={r:.2f}, p={p:.2g}", transform=ax.transAxes, fontsize=14)
def is_dark(color):
"""
Determines if a color is dark based on its RGB values.
Parameters:
color (str): The color to check. This can be any valid color string accepted by mcolors.to_rgb().
Returns:
bool: True if the color is dark, False otherwise. A color is considered dark if its brightness is less than 0.5.
"""
r, g, b = mcolors.to_rgb(color)
brightness = (r * 299 + g * 587 + b * 114) / 1000
return brightness < 0.5
[docs]
def check_for_gpu(tensorflow=False, torch=True):
"""
Check if a GPU is available for use by TensorFlow and PyTorch.
This function checks if a GPU is available for use by TensorFlow and PyTorch.
It prints a message indicating whether a GPU is available for each library,
and returns a boolean indicating whether a GPU is available.
Returns
-------
bool
True if
a GPU is available for PyTorch
and `pytorch=True` (default),
a GPU is available for tensorflow
and `tensorflow=True`,
a GPU is available for PyTorch AND tensorflow
and `pytorch=True` and `tensorflow=True`
otherwise it returns `False`.
"""
gpu_tf = False
if tf.config.list_physical_devices("GPU"):
print("GPU is available to Tensorflow")
gpu_tf = True
else:
print("GPU is not available to Tensorflow")
gpu_tf = False
gpu_torch = use_gpu()
if gpu_torch:
print("GPU is available to Pytorch")
else:
print("GPU is not available to Pytorch")
if torch and tensorflow:
return gpu_torch and gpu_tf
elif torch:
return gpu_torch
elif tensorflow:
return gpu_tf
else:
return False