# load required packages
import glob
import os
import sys
from itertools import product
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scanpy as sc
import seaborn as sns
from scipy.stats import norm, zscore
# read the data frame output from the segmentation functions
[docs]
def read_segdf(
segfile_list,
seg_method,
region_list=None, # optional information #please make sure the length of each list matches
meta_list=None, # optional information
):
"""
Read the data frame output from the segmentation functions.
Parameters
----------
segfile_list : list
List of segmented csv files to be read.
seg_method : str
The segmentation method used.
region_list : list, optional
List of regions, by default None. Please make sure the length of each list matches.
meta_list : list, optional
List of metadata, by default None. Please make sure the length of each list matches.
Returns
-------
df : pandas.DataFrame
The concatenated DataFrame from all the segmentation files.
Raises
------
SystemExit
If the length of region_list or meta_list does not match with segfile_list.
"""
if region_list is not None:
if len(region_list) != len(segfile_list):
sys.exit("length of each list does not match!")
elif meta_list is not None:
if len(meta_list) != len(segfile_list):
sys.exit("length of each list does not match!")
df = pd.DataFrame()
# concat old dataframes
for i in range(len(segfile_list)):
tmp = pd.read_csv(segfile_list[i], index_col=0)
tmp["region_num"] = str(i)
if region_list is not None:
tmp["unique_region"] = str(region_list[i])
if meta_list is not None:
tmp["condition"] = str(meta_list[i])
df = pd.concat([df, tmp], axis=0)
if seg_method == "cellseg":
# See resultant dataframe
df.columns = df.columns.str.split(":").str[-1].tolist()
df = df.reset_index().rename(columns={"index": "first_index"})
df.columns = df.columns.str.split(":").str[-1].tolist()
df.rename(columns={"size": "area"}, inplace=True)
return df
[docs]
def filter_data(
df,
nuc_thres=1,
size_thres=1,
nuc_marker="DAPI",
cell_size="area",
region_column="region_num",
color_by=None,
palette="Paired",
alpha=0.8,
size=0.4, # dot style
log_scale=False,
plot=False,
):
"""
Filter data based on nuclear threshold and size threshold, and visualize the data before and after filtering.
Parameters
----------
df : pandas.DataFrame
The DataFrame to be filtered.
nuc_thres : int, optional
The nuclear threshold, by default 1.
size_thres : int, optional
The size threshold, by default 1.
nuc_marker : str, optional
The nuclear marker, by default "DAPI".
cell_size : str, optional
The cell size, by default "area".
region_column : str, optional
The region column, by default "region_num".
color_by : str, optional
The column to color by, by default None.
palette : str, optional
The color palette, by default "Paired".
alpha : float, optional
The alpha for the scatter plot, by default 0.8.
size : float, optional
The size for the scatter plot, by default 0.4.
log_scale : bool, optional
Whether to use log scale for the scatter plot, by default False.
Returns
-------
df_nuc : pandas.DataFrame
The filtered DataFrame.
"""
if color_by == None:
color_by = region_column
df_nuc = df[(df[nuc_marker] > nuc_thres) * df[cell_size] > size_thres]
per_keep = len(df_nuc) / len(df)
# Create a figure with two subplots
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(15, 5))
# Boxplot
# sns.boxplot(data=df.loc[:, [cell_size, nuc_marker]], orient="h", ax=ax1)
# ax1.set_title("Cell size and nuclear marker intensity")
if plot:
print("Plotting scatterplots now, if your data is large, it will take awhile!")
sns.scatterplot(
x=df[nuc_marker],
y=df[cell_size],
hue=df[color_by],
palette=palette,
alpha=alpha,
ax=ax2,
legend=False,
)
if log_scale == True:
ax2.set_xscale("log")
ax2.set_yscale("log")
ax2.axhline(size_thres, color="k", linestyle="dashed", linewidth=1)
ax2.axvline(nuc_thres, color="k", linestyle="dashed", linewidth=1)
ax2.legend(bbox_to_anchor=(1.2, 1), loc="upper left", borderaxespad=0)
ax2.set_title("Before filtering")
# Plot 2
sns.scatterplot(
x=df_nuc[nuc_marker],
y=df_nuc[cell_size],
hue=df_nuc[color_by],
palette=palette,
alpha=alpha,
ax=ax3,
)
if log_scale == True:
ax3.set_xscale("log")
ax3.set_yscale("log")
ax3.axhline(size_thres, color="k", linestyle="dashed", linewidth=1)
ax3.axvline(nuc_thres, color="k", linestyle="dashed", linewidth=1)
ax3.legend(bbox_to_anchor=(1.2, 1), loc="upper left", borderaxespad=0)
ax3.set_title("After filtering")
plt.show()
# show plot
plt.tight_layout()
plt.show()
# print the percentage of cells that are kept
print("Percentage of cells kept: ", per_keep * 100, "%")
return df_nuc
# Only useful for "classic CODEX" where samples are covered by multiple regions
# Could also be used for montages of multiple samples (tiles arraged in grid)
def xycorr(data, y_rows, x_columns, X_pix, Y_pix):
"""
Corrects the x and y coordinates of the data for "classic CODEX" where samples are covered by multiple regions.
This function could also be used for montages of multiple samples (tiles arranged in a grid).
Parameters
----------
data : DataFrame
The input data to be corrected.
y_rows : int
The number of rows in the y direction.
x_columns : int
The number of columns in the x direction.
X_pix : int
The number of pixels in the x direction.
Y_pix : int
The number of pixels in the y direction.
Returns
-------
DataFrame
The corrected data with added 'Xcorr' and 'Ycorr' columns representing the corrected x and y coordinates respectively.
"""
# Make a copy for xy correction
df_XYcorr = data.copy()
df_XYcorr["Xcorr"] = 0
df_XYcorr["Ycorr"] = 0
dict_test = dict(enumerate(df_XYcorr.region_num.unique()))
dict_map = {v: k + 1 for k, v in dict_test.items()}
df_XYcorr["regloop"] = df_XYcorr["region_num"].map(dict_map)
region_num = df_XYcorr.regloop.max()
# first value of tuple is y and second is x
d = list(product(range(0, y_rows, 1), range(0, x_columns, 1)))
e = list(range(1, region_num + 1, 1))
dict_corr = {}
dict_corr = dict(zip(e, d))
# Adding the pixels with the dictionary
for reg_num in list(df_XYcorr["regloop"].unique()):
df_XYcorr["Xcorr"].loc[df_XYcorr["regloop"] == reg_num] = (
df_XYcorr["x"].loc[df_XYcorr["regloop"] == reg_num]
+ dict_corr[reg_num][1] * X_pix
)
for reg_num in list(df_XYcorr["regloop"].unique()):
df_XYcorr["Ycorr"].loc[df_XYcorr["regloop"] == reg_num] = (
df_XYcorr["y"].loc[df_XYcorr["regloop"] == reg_num]
+ dict_corr[reg_num][0] * Y_pix
)
df_XYcorr.drop(columns=["regloop"], inplace=True)
return df_XYcorr
# Get rid of noisy cells from dataset
[docs]
def remove_noise(df, col_num, z_sum_thres, z_count_thres):
"""
Removes noisy cells from the dataset based on the given thresholds.
Parameters
----------
df : DataFrame
The input data from which noisy cells are to be removed.
col_num : int
The column number up to which the operation is performed.
z_sum_thres : float
The threshold for the sum of z-scores. Cells with a sum of z-scores greater than this threshold are considered noisy.
z_count_thres : int
The threshold for the count of z-scores. Cells with a count of z-scores greater than this threshold are considered noisy.
Returns
-------
df_want : DataFrame
The cleaned data with noisy cells removed.
cc : DataFrame
The data of the noisy cells that were removed from the original data.
"""
df_z_1_copy = df.copy()
df_z_1_copy["Count"] = df_z_1_copy.iloc[:, : col_num + 1].ge(0).sum(axis=1)
df_z_1_copy["z_sum"] = df_z_1_copy.iloc[:, : col_num + 1].sum(axis=1)
cc = df_z_1_copy[
(df_z_1_copy["z_sum"] > z_sum_thres) | (df_z_1_copy["Count"] > z_count_thres)
]
df_want = df_z_1_copy[
~((df_z_1_copy["z_sum"] > z_sum_thres) | (df_z_1_copy["Count"] > z_count_thres))
]
percent_removed = np.round(
1 - (df_want.shape[0] / df_z_1_copy.shape[0]), decimals=3
)
print(str(percent_removed * 100) + "% cells are removed.")
df_want.drop(columns=["Count", "z_sum"], inplace=True)
df_want.reset_index(inplace=True, drop=True)
return df_want, cc
class ImageProcessor:
"""
A class used to process images and compute channel means and sums.
...
Attributes
----------
flatmasks : ndarray
2D numpy array containing masks for each cell.
Methods
-------
update_adjacency_value(adjacency_matrix, original, neighbor):
Updates the adjacency matrix based on the original and neighbor values.
update_adjacency_matrix(plane_mask_flattened, width, height, adjacency_matrix, index):
Updates the adjacency matrix based on the flattened plane mask.
compute_channel_means_sums_compensated(image):
Computes the channel means and sums for each cell and compensates them.
"""
def __init__(self, flatmasks):
"""
Constructs all the necessary attributes for the ImageProcessor object.
Parameters
----------
flatmasks : ndarray
2D numpy array containing masks for each cell.
"""
self.flatmasks = flatmasks
def update_adjacency_value(self, adjacency_matrix, original, neighbor):
# This function is copied from CellSeg
"""
Updates the adjacency matrix based on the original and neighbor values.
Parameters
----------
adjacency_matrix : ndarray
2D numpy array representing the adjacency matrix.
original : int
Original value.
neighbor : int
Neighbor value.
Returns
-------
bool
True if the original and neighbor values are different and not zero, False otherwise.
"""
border = False
if original != 0 and original != neighbor:
border = True
if neighbor != 0:
adjacency_matrix[int(original - 1), int(neighbor - 1)] += 1
return border
def update_adjacency_matrix(
self, plane_mask_flattened, width, height, adjacency_matrix, index
):
# This function is copied from CellSeg
"""
Updates the adjacency matrix based on the flattened plane mask.
Parameters
----------
plane_mask_flattened : ndarray
1D numpy array representing the flattened plane mask.
width : int
Width of the plane mask.
height : int
Height of the plane mask.
adjacency_matrix : ndarray
2D numpy array representing the adjacency matrix.
index : int
Index of the current cell in the flattened plane mask.
"""
mod_value_width = index % width
origin_mask = plane_mask_flattened[index]
left, right, up, down = False, False, False, False
if mod_value_width != 0:
left = self.update_adjacency_value(
adjacency_matrix, origin_mask, plane_mask_flattened[index - 1]
)
if mod_value_width != width - 1:
right = self.update_adjacency_value(
adjacency_matrix, origin_mask, plane_mask_flattened[index + 1]
)
if index >= width:
up = self.update_adjacency_value(
adjacency_matrix, origin_mask, plane_mask_flattened[index - width]
)
if index <= len(plane_mask_flattened) - 1 - width:
down = self.update_adjacency_value(
adjacency_matrix, origin_mask, plane_mask_flattened[index + width]
)
if left or right or up or down:
adjacency_matrix[int(origin_mask - 1), int(origin_mask - 1)] += 1
def compute_channel_means_sums_compensated(self, image):
# This function is copied from CellSeg but modified to solve the least squares problem with torch instead of numpy
"""
Computes the channel means and sums for each cell and compensates them.
Parameters
----------
image : ndarray
3D numpy array representing the image.
Returns
-------
compensated_means : ndarray
2D numpy array representing the compensated means for each cell.
means : ndarray
2D numpy array representing the means for each cell.
channel_counts : ndarray
1D numpy array representing the counts for each cell.
"""
height, width, n_channels = image.shape
mask_height, mask_width = self.flatmasks.shape
n_masks = len(np.unique(self.flatmasks)) - 1
channel_sums = np.zeros((n_masks, n_channels))
channel_counts = np.zeros((n_masks, n_channels))
if n_masks == 0:
return channel_sums, channel_sums, channel_counts
squashed_image = np.reshape(image, (height * width, n_channels))
# masklocs = np.nonzero(self.flatmasks)
# plane_mask = np.zeros((mask_height, mask_width), dtype = np.uint32)
# plane_mask[masklocs[0], masklocs[1]] = masklocs[2] + 1
# plane_mask = plane_mask.flatten()
plane_mask = self.flatmasks.flatten()
adjacency_matrix = np.zeros((n_masks, n_masks))
for i in range(len(plane_mask)):
self.update_adjacency_matrix(
plane_mask, mask_width, mask_height, adjacency_matrix, i
)
mask_val = plane_mask[i] - 1
if mask_val != -1:
channel_sums[mask_val.astype(np.int32)] += squashed_image[i]
channel_counts[mask_val.astype(np.int32)] += 1
# Normalize adjacency matrix
for i in range(n_masks):
adjacency_matrix[i] = adjacency_matrix[i] / (
max(adjacency_matrix[i, i], 1) * 2
)
adjacency_matrix[i, i] = 1
means = np.true_divide(
channel_sums,
channel_counts,
out=np.zeros_like(channel_sums, dtype="float"),
where=channel_counts != 0,
)
# Convert your numpy arrays to PyTorch tensors
adjacency_matrix_torch = torch.from_numpy(adjacency_matrix)
means_torch = torch.from_numpy(means)
# Solve the least squares problem
results_torch = torch.linalg.lstsq(adjacency_matrix_torch, means_torch).solution
# Convert the result back to a numpy array if needed
# Convert the result back to a numpy array if needed
results = results_torch.numpy()
compensated_means = np.maximum(results, np.zeros(results.shape))
return compensated_means, means, channel_counts[:, 0]
[docs]
def compensate_cell_matrix(df, image_dict, masks, overwrite=True):
"""
Compensate cell matrix by computing channel means and sums.
Parameters
----------
df : DataFrame
The DataFrame to which the compensated means will be added.
image_dict : dict
Dictionary containing images for each channel.
masks : ndarray
3D numpy array containing masks for each cell.
overwrite : bool, optional
If True, overwrite existing columns in df. If False, add new columns to df. Default is True.
Returns
-------
DataFrame
The DataFrame with added compensated means.
Notes
-----
The function computes the channel means and sums for each cell, compensates them, and adds them to the DataFrame.
The compensated means are added to the DataFrame with column names from the keys of the image_dict.
If overwrite is True, existing columns in the DataFrame are overwritten. If overwrite is False, new columns are added to the DataFrame.
"""
print(
"This function uses code from Lee, M.Y., Bedia, J.S., Bhate, S.S. et al. CellSeg: a robust, pre-trained nucleus segmentation and pixel quantification software for highly multiplexed fluorescence images. BMC Bioinformatics 23, 46 (2022) please consider citing the original work."
)
masks = masks.squeeze()
image_list = [image_dict[channel_name] for channel_name in image_dict.keys()]
# Stack the 2D numpy arrays along the third dimension to create a 3D numpy array
image = np.stack(image_list, axis=-1)
# Now you can use `image` as the input for the function
processor = ImageProcessor(masks)
(
compensated_means,
means,
channel_counts,
) = processor.compute_channel_means_sums_compensated(image)
# Get the keys
keys = list(image_dict.keys())
# Cycle over the keys
for i in range(len(keys)):
# Add the compensated_means to the DataFrame with column names from keys
if overwrite == True:
df[keys[i]] = compensated_means[:, i]
else:
df[keys[i] + "_compensated"] = compensated_means[:, i]
return df