# -*- coding: utf-8 -*-
"""
Miscellaneous utilities.

"""


from __future__ import division, print_function, absolute_import
from collections import OrderedDict
from anhima.compat import range


# third party dependencies
import numpy as np
import pandas


def block_take2d(dataset, row_indices, col_indices=None, block_size=None):
    """Select rows and optionally columns from a Numpy array or HDF5
    dataset with 2 or more dimensions.

    Parameters
    ----------
    dataset : array_like or HDF5 dataset
        The input dataset.
    row_indices : sequence of ints
        The indices of the selected rows. N.B., will be sorted in
        ascending order.
    col_indices : sequence of ints, optional
        The indices of the selected columns. If not provided, all columns
        will be returned.
    block_size : int, optional
        The size (in number of rows) of the block of data to process at a
        time.

    Returns
    -------
    out : ndarray
        An array containing the selected rows and columns.

    See Also
    --------
    anhima.util.block_compress2d, anhima.h5.take2d_pointsel

    Notes
    -----
    This function is mainly a work-around for the fact that fancy indexing
    via h5py is currently slow, and fancy indexing along more than one axis
    is not supported. The function works by reading the entire dataset in
    blocks of `block_size` rows, and processing each block in memory using
    numpy.

    """

    # N.B., make sure row_indices are sorted
    row_indices = np.asarray(row_indices)
    row_indices.sort()

    # how many rows are we selecting?
    n_rows_in = dataset.shape[0]
    n_rows_out = len(row_indices)

    # how many columns are we selecting?
    n_cols_in = dataset.shape[1]
    if col_indices is not None:
        n_cols_out = len(col_indices)
    else:
        n_cols_out = n_cols_in

    # setup output array
    out_shape = (n_rows_out, n_cols_out) + dataset.shape[2:]
    out = np.empty(out_shape, dtype=dataset.dtype)

    # determine block size
    if block_size is None:
        if hasattr(dataset, 'chunks') and dataset.chunks is not None:
            # use dataset chunk height
            block_size = dataset.chunks[0]
        else:
            # use arbitrary number
            block_size = 1000

    # iterate block-wise
    offset = 0
    for block_start in range(0, n_rows_in, block_size):
        block_stop = min(block_start+block_size, n_rows_in)

        # how many indices to process in this block?
        i = np.searchsorted(row_indices, block_start)
        j = np.searchsorted(row_indices, block_stop)
        n = j-i
        ridx = row_indices[i:j]

        # only do anything if there are indices for this block
        if n:

            # load data for this block
            a = dataset[block_start:block_stop]

            # take rows
            b = np.take(a, ridx-block_start, axis=0)

            # take columns; N.B., test against None so that an empty or
            # multi-element index array is not evaluated for truth
            if col_indices is not None:
                b = np.take(b, col_indices, axis=1)

            # store output
            out[offset:offset+n, ...] = b

            # keep track of offset
            offset += n

    return out
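

# Example usage of `block_take2d`: a minimal sketch added for illustration,
# not part of the original module. The in-memory numpy array stands in for
# an HDF5 dataset.
def _example_block_take2d():
    a = np.arange(24).reshape(6, 4)
    # select rows 0, 2 and 5 and columns 1 and 3, processing 2 rows at a time
    out = block_take2d(a, [0, 2, 5], col_indices=[1, 3], block_size=2)
    # out == array([[ 1,  3],
    #               [ 9, 11],
    #               [21, 23]])
    return out

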
def block_compress2d(dataset, row_condition, col_condition=None,
                     block_size=None):
    """Select rows and optionally columns from a Numpy array or HDF5
    dataset with 2 or more dimensions, where rows and columns are selected
    via boolean conditions.

    Parameters
    ----------
    dataset : array_like or HDF5 dataset
        The input dataset.
    row_condition : array_like, bool
        A boolean array indicating the selected rows.
    col_condition : array_like, bool, optional
        A boolean array indicating the selected columns. If not provided,
        all columns will be returned.
    block_size : int, optional
        The size (in number of rows) of the block of data to process at a
        time.

    Returns
    -------
    out : ndarray
        An array containing the selected rows and columns.

    See Also
    --------
    anhima.util.block_take2d, anhima.h5.take2d_pointsel

    Notes
    -----
    This function is mainly a work-around for the fact that fancy indexing
    via h5py is currently slow, and fancy indexing along more than one axis
    is not supported. The function works by reading the entire dataset in
    blocks of `block_size` rows, and processing each block in memory using
    numpy.

    """

    # convert boolean conditions into index arrays and delegate
    row_indices = np.nonzero(row_condition)[0]
    col_indices = np.nonzero(col_condition)[0] \
        if col_condition is not None else None
    return block_take2d(dataset, row_indices, col_indices,
                        block_size=block_size)
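

# Example usage of `block_compress2d`: a minimal sketch added for
# illustration, not part of the original module. Boolean masks replace the
# index arrays used with `block_take2d`.
def _example_block_compress2d():
    a = np.arange(24).reshape(6, 4)
    row_condition = np.array([True, False, True, False, False, True])
    col_condition = np.array([False, True, False, True])
    out = block_compress2d(a, row_condition, col_condition, block_size=2)
    # equivalent to a[row_condition][:, col_condition]
    # out == array([[ 1,  3],
    #               [ 9, 11],
    #               [21, 23]])
    return out

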
def block_apply(f, dataset, block_size=None, out=None):
    """Apply function `f` to `dataset` split along the first axis into
    contiguous slices of `block_size`. The result should be equivalent to
    calling ``f(dataset)`` directly, but may require less total memory,
    especially if `dataset` is an HDF5 dataset.

    Parameters
    ----------
    f : function
        The function to apply.
    dataset : array_like or HDF5 dataset
        The input dataset.
    block_size : int, optional
        The size (in number of items along the first axis) of the blocks
        passed to `f`.
    out : array_like or HDF5 dataset, optional
        If given, used to store the output.

    Returns
    -------
    out : ndarray
        The result of applying `f` to `dataset` blockwise.

    """

    # determine block size
    if block_size is None:
        if hasattr(dataset, 'chunks') and dataset.chunks is not None:
            # use dataset chunk size along slice axis
            block_size = dataset.chunks[0]
        else:
            # use arbitrary number
            block_size = 1000

    # determine total size along slice axis
    dim_size = dataset.shape[0]

    # iterate over blocks
    for block_start in range(0, dim_size, block_size):
        block_stop = min(block_start + block_size, dim_size)

        # load input block
        x = dataset[block_start:block_stop, ...]

        # compute output block
        y = f(x)

        if out is None:
            # initialise output array
            out_shape = list(y.shape)
            out_shape[0] = dim_size
            out = np.empty(out_shape, y.dtype)

        # store output block
        out[block_start:block_stop, ...] = y

    return out
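

# Example usage of `block_apply`: a minimal sketch added for illustration,
# not part of the original module.
def _example_block_apply():
    a = np.arange(24).reshape(6, 4)
    # sum over the second axis, processing 2 rows at a time; the result is
    # equivalent to a.sum(axis=1)
    out = block_apply(lambda block: block.sum(axis=1), a, block_size=2)
    # out == array([ 6, 22, 38, 54, 70, 86])
    return out

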
def state_transitions(x, states):
    """Find state transitions.

    Parameters
    ----------
    x : array_like
        One-dimensional array of state values.
    states : set-like
        Set of states to consider (other state values are ignored).

    Returns
    -------
    switch_points : ndarray
        Two-dimensional array of switch points, where each row stores a
        pair of indices corresponding to the indices on either side of a
        switch.
    transitions : ndarray
        Two-dimensional array of transitions, where each row stores a pair
        of state values corresponding to the states on either side of a
        switch.

    """

    # check inputs
    x = np.asarray(x)
    assert x.ndim == 1

    # setup output variables
    switch_points = list()
    transitions = list()

    # utility variables
    prv = None
    prv_idx = None

    # iterate over state values
    for cur_idx, cur in enumerate(x):
        if cur not in states:
            # ignore uninformative states
            continue
        if prv is not None and cur != prv:
            # record a state transition; if prv is None this is the first
            # informative state and there is nothing to record yet
            switch = prv_idx, cur_idx
            switch_points.append(switch)
            transition = prv, cur
            transitions.append(transition)
        # advance
        prv = cur
        prv_idx = cur_idx

    return np.array(switch_points), np.array(transitions)


def tabulate_state_transitions(x, states, pos):
    """Tabulate state transitions, annotating each switch with positions.

    Parameters
    ----------
    x : array_like
        One-dimensional array of state values.
    states : set-like
        Set of states to consider (other state values are ignored).
    pos : array_like
        One-dimensional array of positions corresponding to the items in
        `x`.

    Returns
    -------
    df : pandas.DataFrame
        A table with one row per switch and columns 'lidx', 'ridx', 'lpos',
        'rpos', 'lval' and 'rval', giving the indices, positions and state
        values on either side of each switch.

    """

    # check inputs
    x = np.asarray(x)
    assert x.ndim == 1
    pos = np.asarray(pos)
    assert pos.ndim == 1
    assert x.size == pos.size

    switch_points, transitions = state_transitions(x, states)
    switch_positions = np.take(pos, switch_points)
    data = [('lidx', switch_points[:, 0]),
            ('ridx', switch_points[:, 1]),
            ('lpos', switch_positions[:, 0]),
            ('rpos', switch_positions[:, 1]),
            ('lval', transitions[:, 0]),
            ('rval', transitions[:, 1])]
    # use an ordered mapping so column order is preserved
    # (pandas.DataFrame.from_items was removed in pandas 1.0)
    df = pandas.DataFrame.from_dict(OrderedDict(data))
    return df
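

# Example usage of `state_transitions` and `tabulate_state_transitions`:
# a minimal sketch added for illustration, not part of the original module.
# Here the value 0 is treated as a missing/uninformative state and ignored.
def _example_state_transitions():
    x = np.array([1, 1, 0, 2, 2, 1])
    pos = np.array([100, 200, 300, 400, 500, 600])
    switch_points, transitions = state_transitions(x, states={1, 2})
    # switch_points == array([[1, 3],
    #                         [4, 5]])
    # transitions == array([[1, 2],
    #                       [2, 1]])
    df = tabulate_state_transitions(x, states={1, 2}, pos=pos)
    # df has one row per switch with columns lidx, ridx, lpos, rpos, lval,
    # rval; e.g., the first row is (1, 3, 200, 400, 1, 2)
    return switch_points, transitions, df

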
def tabulate_state_blocks(x, states, pos):
    """Tabulate the blocks of consecutive state values delimited by the
    transitions found in `x`, including the marginal blocks at either end
    of the array.

    Parameters
    ----------
    x : array_like
        One-dimensional array of state values.
    states : set-like
        Set of states to consider (other state values are ignored).
    pos : array_like
        One-dimensional array of positions corresponding to the items in
        `x`.

    Returns
    -------
    df : pandas.DataFrame
        A table with one row per block, giving minimal and maximal bounds
        on the start and stop of each block (as both indices and
        positions), the block's state, the number of items observed in
        that state ('support'), minimal and maximal block sizes and
        lengths, and whether the block is marginal (i.e., truncated by the
        start or end of `x`).

    """

    # check inputs
    x = np.asarray(x)
    assert x.ndim == 1
    n = x.size
    pos = np.asarray(pos)
    assert pos.ndim == 1
    assert pos.size == n

    blocks = list()
    df_switches = tabulate_state_transitions(x, states, pos)

    for i, switch in enumerate(df_switches.values):
        lidx, ridx, lpos, rpos, lval, rval = switch
        block_stop_min_idx = lidx
        block_stop_max_idx = ridx
        block_stop_min_pos = lpos
        block_stop_max_pos = rpos
        block_state = lval
        if i == 0:
            # special case the first switch
            block_start_min_idx = 0
            block_start_max_idx = 0
            block_start_min_pos = pos[0]
            block_start_max_pos = pos[0]
            block_is_marginal = True
            block_size_min = block_stop_min_idx
            block_size_max = block_stop_max_idx - 1
        else:
            previous_switch = df_switches.iloc[i-1]
            block_start_min_idx = previous_switch.lidx
            block_start_max_idx = previous_switch.ridx
            block_start_min_pos = previous_switch.lpos
            block_start_max_pos = previous_switch.rpos
            block_is_marginal = False
            block_size_min = block_stop_min_idx - block_start_max_idx + 1
            block_size_max = block_stop_max_idx - block_start_min_idx - 1
        y = x[block_start_max_idx:block_stop_min_idx+1]
        block_support = np.count_nonzero(y == block_state)
        block_length_min = block_stop_min_pos - block_start_max_pos
        block_length_max = block_stop_max_pos - block_start_min_pos
        block = (block_start_min_idx, block_start_max_idx,
                 block_stop_min_idx, block_stop_max_idx,
                 block_start_min_pos, block_start_max_pos,
                 block_stop_min_pos, block_stop_max_pos,
                 block_state, block_support,
                 block_size_min, block_size_max,
                 block_length_min, block_length_max,
                 block_is_marginal)
        blocks.append(block)

    # special case the last block
    previous_switch = df_switches.iloc[-1]
    block_start_min_idx = previous_switch.lidx
    block_start_max_idx = previous_switch.ridx
    block_start_min_pos = previous_switch.lpos
    block_start_max_pos = previous_switch.rpos
    block_is_marginal = True
    block_stop_min_idx = n-1
    block_stop_max_idx = n-1
    block_stop_min_pos = pos[-1]
    block_stop_max_pos = pos[-1]
    block_state = previous_switch.rval
    y = x[block_start_max_idx:block_stop_min_idx+1]
    block_support = np.count_nonzero(y == block_state)
    block_size_min = block_stop_min_idx - block_start_max_idx + 1
    block_size_max = block_stop_max_idx - block_start_min_idx
    block_length_min = block_stop_min_pos - block_start_max_pos
    block_length_max = block_stop_max_pos - block_start_min_pos
    block = (block_start_min_idx, block_start_max_idx,
             block_stop_min_idx, block_stop_max_idx,
             block_start_min_pos, block_start_max_pos,
             block_stop_min_pos, block_stop_max_pos,
             block_state, block_support,
             block_size_min, block_size_max,
             block_length_min, block_length_max,
             block_is_marginal)
    blocks.append(block)

    columns = ('start_min_idx', 'start_max_idx',
               'stop_min_idx', 'stop_max_idx',
               'start_min_pos', 'start_max_pos',
               'stop_min_pos', 'stop_max_pos',
               'state', 'support',
               'size_min', 'size_max',
               'length_min', 'length_max',
               'is_marginal')
    df = pandas.DataFrame.from_records(blocks, columns=columns)
    return df
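

# Example usage of `tabulate_state_blocks`: a minimal sketch added for
# illustration, not part of the original module.
def _example_tabulate_state_blocks():
    x = np.array([1, 1, 0, 2, 2, 1])
    pos = np.array([100, 200, 300, 400, 500, 600])
    df = tabulate_state_blocks(x, states={1, 2}, pos=pos)
    # three blocks result: state 1, then state 2, then state 1 again; the
    # first and last blocks are marginal because they are truncated by the
    # ends of x
    return df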