Source code for pykanto.signal.cluster

# ─── DESCRIPTION ──────────────────────────────────────────────────────────────

"""Perform dimensionality reduction and clustering."""

# ─── DEPENDENCIES ─────────────────────────────────────────────────────────────

from __future__ import annotations

import itertools
import pickle
import warnings
from typing import TYPE_CHECKING, Any, Dict, List, Tuple

import numpy as np
import pandas as pd
import psutil
import ray
import umap
from hdbscan import HDBSCAN
from umap import UMAP

from pykanto.signal.spectrogram import pad_spectrogram
from pykanto.utils.compute import (
    calc_chunks,
    flatten_list,
    get_chunks,
    print_parallel_info,
    to_iterator,
    with_pbar,
)

if TYPE_CHECKING:
    from pykanto.dataset import KantoData

try:
    from cuml import UMAP as cumlUMAP
except ImportError:
    warnings.warn(
        "The cuML library is not available: defaulting to "
        "slower CPU UMAP implementation",
        ImportWarning,
    )
    _has_cuml = False
else:
    _has_cuml = True


# ──── FUNCTIONS ───────────────────────────────────────────────────────────────


def umap_reduce(
    data: np.ndarray,
    n_neighbors: int = 15,
    n_components: int = 2,
    min_dist: float = 0.1,
    random_state: int | None = None,
    verbose: bool = False,
    **kwargs_umap,
) -> Tuple[np.ndarray, umap.UMAP]:
    """
    Uniform Manifold Approximation and Projection.
    Uses the GPU-accelerated cuML implementation if it is available, otherwise
    the 'regular' CPU umap implementation. See the documentation of either for
    valid kwargs.

    Args:
        data (array-like, shape = (n_samples, n_features)): Data to reduce.
        n_neighbors (int, optional): See UMAP docs. Defaults to 15.
        n_components (int, optional): See UMAP docs. Defaults to 2.
        min_dist (float, optional): See UMAP docs. Defaults to 0.1.
        random_state (int, optional): See UMAP docs. Defaults to None.
        verbose (bool, optional): Whether to warn when falling back to the
            CPU implementation. Defaults to False.
        kwargs_umap: Extra named arguments passed to umap.UMAP or
            cuml.umap.UMAP.

    Returns:
        Tuple[np.ndarray, umap.UMAP]: Embedding coordinates and UMAP reducer.
    """
    kwargs_umap.setdefault("n_neighbors", n_neighbors)
    kwargs_umap.setdefault("n_components", n_components)
    kwargs_umap.setdefault("min_dist", min_dist)
    kwargs_umap.setdefault("random_state", random_state)

    if _has_cuml:
        reducer = cumlUMAP(**kwargs_umap)
        embedding = reducer.fit_transform(data)
    else:
        if verbose:
            warnings.warn(
                "The cuML library is not available: defaulting to "
                "slower CPU UMAP implementation."
            )
        reducer = UMAP(**kwargs_umap)
        embedding = reducer.fit_transform(data)

    return embedding, reducer
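

# Illustrative sketch (not part of the original module): embed a random
# feature matrix with `umap_reduce`. The array `X` is made up; any
# (n_samples, n_features) float array works, and extra keyword arguments are
# forwarded to umap.UMAP or cuml.UMAP.
def _example_umap_reduce() -> None:
    rng = np.random.default_rng(42)
    X = rng.random((200, 64))  # e.g. 200 flattened spectrograms
    embedding, reducer = umap_reduce(
        X, n_neighbors=10, n_components=2, min_dist=0.2, random_state=42
    )
    print(embedding.shape)  # -> (200, 2)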


def hdbscan_cluster(
    embedding: np.ndarray,
    min_cluster_size: int = 5,
    min_samples: None | int = None,
    **kwargs_hdbscan,
) -> HDBSCAN:
    """
    Perform HDBSCAN clustering from a vector array or distance matrix.
    Convenience wrapper. See the `HDBSCAN* docs
    <https://hdbscan.readthedocs.io/en/latest/parameter_selection.html>`_.

    Args:
        embedding (np.ndarray): Data to cluster. See the hdbscan documentation
            for more.
        min_cluster_size (int, optional): Minimum number of samples to
            consider a cluster. Defaults to 5.
        min_samples (int, optional): Controls how 'conservative' clustering
            is; larger values mean more points are declared as noise.
            Defaults to None.
        kwargs_hdbscan: Extra named arguments passed to HDBSCAN.

    Returns:
        HDBSCAN: Fitted HDBSCAN object. Labels are at `self.labels_`.
    """
    min_cluster_size = kwargs_hdbscan.setdefault(
        "min_cluster_size", min_cluster_size
    )
    kwargs_hdbscan.setdefault("min_samples", min_samples)
    kwargs_hdbscan.setdefault("cluster_selection_method", "eom")

    if min_cluster_size < 2:
        warnings.warn("`min_cluster_size` too small, setting it to 2")
        kwargs_hdbscan["min_cluster_size"] = 2

    clusterer = HDBSCAN(**kwargs_hdbscan)
    clusterer.fit(embedding)
    return clusterer
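

# Illustrative sketch (not part of the original module): cluster a small 2D
# embedding with `hdbscan_cluster`. The two Gaussian blobs are synthetic; in
# practice the embedding would come from `umap_reduce`. Noise points, if any,
# are labelled -1.
def _example_hdbscan_cluster() -> None:
    rng = np.random.default_rng(0)
    blob_a = rng.normal(loc=0.0, scale=0.3, size=(50, 2))
    blob_b = rng.normal(loc=5.0, scale=0.3, size=(50, 2))
    embedding = np.vstack([blob_a, blob_b])
    clusterer = hdbscan_cluster(embedding, min_cluster_size=10)
    print(set(clusterer.labels_))  # e.g. {0, 1}, plus -1 for any noise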


def reduce_and_cluster(
    dataset: KantoData,
    ID: str,
    song_level: bool = False,
    min_sample: int = 10,
    kwargs_umap: Dict[str, Any] = {},
    kwargs_hdbscan: Dict[str, Any] = {},
) -> pd.DataFrame | None:
    """
    Reduce the dimensionality of the units (or average units) of a single
    individual ID with UMAP and cluster the embedding with HDBSCAN.

    Args:
        dataset (KantoData): Data to be used.
        ID (str): Grouping factor.
        song_level (bool, optional): Whether to use the average of all units
            in each vocalisation instead of all units. Defaults to False.
        min_sample (int, optional): Minimum number of vocalisations or units.
            Defaults to 10.
        kwargs_umap (dict): Dictionary of UMAP parameters.
        kwargs_hdbscan (dict): Dictionary of HDBSCAN parameters.

    Returns:
        pd.DataFrame | None: Dataframe with columns ['vocalisation_key', 'ID',
        'idx', 'umap_x', 'umap_y', 'auto_class'], or None if the sample size
        is too small.
    """
    # Retrieve units or averaged units for one individual
    units = pickle.load(
        open(
            dataset.files.query("ID==@ID")[
                "average_units" if song_level else "units"
            ][0],
            "rb",
        )
    )
    units_keys = (
        [key for key in units]
        if song_level
        else [
            key + f"_{i}"
            for key, vals in units.items()
            for i in range(len(vals))
        ]
    )

    # Check if sample size is sufficient
    if len(units_keys) < min_sample:
        warnings.warn(f"Insufficient sample size for {ID}. Returning None.")
        return None

    # Pad spectrograms to a common length if necessary
    if song_level:
        if len({unit.shape[1] for unit in units.values()}) != 1:
            max_frames = max([unit.shape[1] for unit in units.values()])
            units = {
                key: pad_spectrogram(spec, max_frames)
                for key, spec in units.items()
            }
    else:
        if len({unit.shape[1] for ls in units.values() for unit in ls}) != 1:
            max_frames = max(
                [unit.shape[1] for ls in units.values() for unit in ls]
            )
            units = {
                key: [pad_spectrogram(spec, max_frames) for spec in ls]
                for key, ls in units.items()
            }

    # Flatten units
    if song_level:
        flat_units = [unit.flatten() for unit in units.values()]
    else:
        unitkeys = list(
            itertools.chain.from_iterable(
                [[key] * len(value) for key, value in units.items()]
            )
        )
        flat_units = np.array(
            [unit.flatten() for ls in units.values() for unit in ls]
        )

    # Run UMAP
    embedding, _ = umap_reduce(flat_units, **kwargs_umap)

    # Cluster using HDBSCAN
    clusterer = hdbscan_cluster(embedding, **kwargs_hdbscan)

    # Put together in a dataframe
    cluster_df = pd.DataFrame(units_keys, columns=["index"])
    cluster_df.set_index("index", inplace=True)
    if song_level is False:
        cluster_df["vocalisation_key"] = unitkeys
    cluster_df["ID"] = ID
    cluster_df["idx"] = [idx[-1] for idx in cluster_df.index]
    cluster_df["umap_x"] = embedding[:, 0]
    cluster_df["umap_y"] = embedding[:, 1]
    cluster_df["auto_class"] = list(clusterer.labels_)
    cluster_df["auto_class"] = cluster_df["auto_class"].astype(str)
    return cluster_df
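

# Illustrative sketch (not part of the original module): run the reduction
# and clustering for a single bird. Assumes `dataset` is an existing
# KantoData object whose units (or average units) have already been computed
# and saved to disk; the ID string below is hypothetical.
def _example_reduce_and_cluster(dataset: KantoData) -> pd.DataFrame | None:
    return reduce_and_cluster(
        dataset,
        ID="example_bird_ID",
        song_level=dataset.parameters.song_level,
        min_sample=10,
        kwargs_umap={"n_neighbors": 15, "min_dist": 0.1},
        kwargs_hdbscan={"min_cluster_size": 5},
    )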


def reduce_and_cluster_parallel(
    dataset: KantoData,
    kwargs_umap: dict = {},
    kwargs_hdbscan: dict = {},
    min_sample: int = 10,
    num_cpus: float | None = None,
) -> pd.DataFrame | None:
    """
    Parallel implementation of
    :func:`~pykanto.signal.cluster.reduce_and_cluster`. See that function for
    argument descriptions and the structure of the returned dataframe.
    """
    song_level = dataset.parameters.song_level
    IDS = set(dataset.files["ID"])

    # Calculate and make chunks
    n = len(IDS)
    if not n:
        raise KeyError(
            "No sound file keys were passed to reduce_and_cluster."
        )
    chunk_info = calc_chunks(
        n, n_workers=num_cpus, verbose=dataset.parameters.verbose
    )
    chunk_length, n_chunks = chunk_info[3], chunk_info[2]
    chunks = get_chunks(list(IDS), chunk_length)
    print_parallel_info(n, "individual IDs", n_chunks, chunk_length)

    # Copy dataset to local object store
    dataset_ref = ray.put(dataset)

    # Distribute with ray
    @ray.remote(
        num_cpus=num_cpus,
        num_gpus=1 / psutil.cpu_count() if _has_cuml else 0,
    )  # type: ignore
    def _reduce_and_cluster_r(
        dataset: KantoData,
        IDS: List[str],
        song_level: bool = False,
        min_sample: int = 10,
        kwargs_umap=kwargs_umap,
        kwargs_hdbscan=kwargs_hdbscan,
    ) -> List[pd.DataFrame | None]:
        return [
            reduce_and_cluster(
                dataset,
                ID,
                song_level=song_level,
                min_sample=min_sample,
                kwargs_umap=kwargs_umap,
                kwargs_hdbscan=kwargs_hdbscan,
            )
            for ID in IDS
        ]

    obj_ids = [
        _reduce_and_cluster_r.remote(
            dataset_ref,
            i,
            song_level=song_level,
            min_sample=min_sample,
            kwargs_umap=kwargs_umap,
            kwargs_hdbscan=kwargs_hdbscan,
        )
        for i in chunks
    ]
    pbar = {
        "desc": "Projecting and clustering vocalisations",
        "total": n_chunks,
    }
    dfls = [obj_id for obj_id in with_pbar(to_iterator(obj_ids), **pbar)]

    # Minimally check output
    try:
        df = pd.concat(flatten_list(dfls))
        return df
    except ValueError as e:
        if str(e) == "All objects passed were None":
            raise TypeError(
                f"{str(e)}. Possible reasons include:"
                "\n1: Sample sizes were insufficient for each and "
                "every individual in the dataset."
                "\n2: Inadequate parametrisation."
            )
        raise
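

# Illustrative sketch (not part of the original module): project and cluster
# every ID in a dataset in parallel, then return the combined dataframe.
# Assumes `dataset` is an existing KantoData object with units already
# segmented and saved, and that ray can be initialised on this machine.
def _example_reduce_and_cluster_parallel(
    dataset: KantoData,
) -> pd.DataFrame | None:
    ray.init(ignore_reinit_error=True)  # no-op if ray is already running
    df = reduce_and_cluster_parallel(
        dataset,
        kwargs_umap={"min_dist": 0.25},
        kwargs_hdbscan={"min_cluster_size": 5},
        min_sample=10,
    )
    return df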