"""
Utilities for locating samples and variants.
See also the examples at:
- http://nbviewer.ipython.org/github/alimanfoo/anhima/blob/master/examples/loc.ipynb
""" # noqa
from __future__ import division, print_function, unicode_literals, \
absolute_import
# standard library dependencies
import random
# third party dependencies
import numpy as np
import numexpr
import matplotlib.pyplot as plt
import scipy.stats
[docs]def view_sample(a, selection, all_samples=None):
"""View a single column from the array `a` corresponding to a selected
sample.
Parameters
----------
a : array_like
An array with 2 or more dimensions, where the second dimension
corresponds to samples.
selection : int or object
A sample identifier or column index.
all_samples : sequence, optional
A sequence (e.g., list) of sample identifiers corresponding to the
second dimension of `a`, used to map `selection` to a column index. If
not given, assume `selection` is a column index.
Returns
-------
b : ndarray
An array obtained from `a` by taking the column corresponding to the
selected sample.
"""
# check a is an array of 2 or more dimensions
a = np.asarray(a)
assert a.ndim > 1
if all_samples is None:
# assume selection is an index
index = selection
else:
# check length of samples dimension is as expected
assert a.shape[1] == len(all_samples)
# make sure it's a list
all_samples = list(all_samples)
# check selection is in all_samples
assert selection in all_samples
# determine indices for selected samples
index = all_samples.index(selection)
# view a column from the array
b = a[:, index, ...]
return b
[docs]def take_samples(a, selection, all_samples=None):
"""Extract columns from the array `a` corresponding to selected samples.
Parameters
----------
a : array_like
An array with 2 or more dimensions, where the second dimension
corresponds to samples.
selection : sequence of ints or objects
A sequence of sample identifiers or column indices.
all_samples : sequence, optional
A sequence (e.g., list) of sample identifiers corresponding to the
second dimension of `a`, used to map `selection` to column indices. If
not given, assume `selection` is a sequence of column indices.
Returns
-------
b : ndarray
An array obtained from `a` by taking columns corresponding to the
selected samples.
"""
# check a is an array of 2 or more dimensions
a = np.asarray(a)
assert a.ndim > 1
if all_samples is None:
# assume selection is column indices
indices = selection
else:
# check length of samples dimension is as expected
assert a.shape[1] == len(all_samples)
# make sure it's a list
all_samples = list(all_samples)
# check selections are in all_samples
assert all([s in all_samples for s in selection])
# determine indices for selected samples
indices = [all_samples.index(s) for s in selection]
# take columns from the array
b = np.take(a, indices, axis=1)
return b
[docs]def query_variants(expression, variants):
"""Evaluate `expression` with respect to the given `variants`.
Parameters
----------
expression : string
The query expression to apply. The expression will be evaluated by
:mod:`numexpr` against the provided `variants`.
variants : dict-like
The variables to include in scope for the expression evaluation.
Returns
-------
result : ndarray
The result of evaluating `expression` against `variants`.
"""
result = numexpr.evaluate(expression, local_dict=variants)
return result
[docs]def compress_variants(a, condition):
"""Extract rows from the array `a` corresponding to a boolean `condition`.
Parameters
----------
a : array_like
An array to extract rows from (e.g., genotypes).
condition : array_like, bool
A 1-D boolean array of the same length as the first dimension of `a`.
Returns
-------
b : ndarray
An array obtained from `a` by taking rows corresponding to the
selected variants.
See Also
--------
take_variants, numpy.compress
"""
# check dimensions and sizes
a = np.asarray(a)
condition = np.asarray(condition)
assert a.ndim >= 1
assert condition.ndim == 1
assert a.shape[0] == condition.shape[0]
# compress rows from the input array
b = np.compress(condition, a, axis=0)
return b
[docs]def take_variants(a, indices, mode='raise'):
"""Extract rows from the array `a` corresponding to `indices`.
Parameters
----------
a : array_like
An array to extract rows from (e.g., genotypes).
indices : sequence of integers
The variant indices to extract.
mode : {'raise', 'wrap', 'clip'}, optional
Specifies how out-of-bounds indices will behave.
Returns
-------
b : ndarray
An array obtained from `a` by taking rows corresponding to the
selected variants.
See Also
--------
compress_variants, numpy.take
"""
# check dimensions and sizes
a = np.asarray(a)
assert a.ndim >= 1
# take rows from the input array
b = np.take(a, indices, axis=0, mode=mode)
return b
[docs]def locate_position(pos, p):
"""Locate the index of coordinate `p` within sorted array of genomic
positions `pos`.
Parameters
----------
pos : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig, with no duplicates.
p : int
The position to locate.
Returns
-------
index : int or None
The index of `p` in `pos` if present, else None.
See Also
--------
locate_positions, locate_interval, locate_intervals
"""
# check inputs
pos = np.asarray(pos)
# find position
index = np.searchsorted(pos, p)
print(p, index)
if index < pos.size and pos[index] == p:
return index
else:
return None
[docs]def view_position(a, pos, p):
"""View a slice along the first dimension of `a` corresponding to a
genome position.
Parameters
----------
a : array_like
The array to extract from.
pos : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig, with no duplicates.
p : int
The position to locate.
Returns
-------
b : ndarray
A view of `a` obtained by slicing along the first dimension.
See Also
--------
locate_position
"""
# normalise inputs
a = np.asarray(a)
# determine region slice
index = locate_position(pos, p)
if index is not None:
return a[index, ...]
else:
return None
[docs]def locate_interval(pos, start_position=0, stop_position=None):
"""Locate the start and stop indices within the `pos` array that include all
positions within the `start_position` and `stop_position` range.
Parameters
----------
pos : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig.
start_position : int
Start position of interval.
stop_position : int
Stop position of interval
Returns
-------
loc : slice
A slice object with the start and stop indices that include all
positions within the interval.
See Also
--------
locate_position, locate_positions, locate_intervals
"""
# check inputs
pos = np.asarray(pos)
# locate start and stop indices
start_index = np.searchsorted(pos, start_position)
stop_index = np.searchsorted(pos, stop_position, side=b'right') \
if stop_position is not None else None
loc = slice(start_index, stop_index)
return loc
[docs]def view_interval(a, pos, start_position, stop_position):
"""View a contiguous slice along the first dimension of `a`
corresponding to a genome interval defined by `start_position` and
`stop_position`.
Parameters
----------
a : array_like
The array to extract from.
pos : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig.
start_position : int
Start position of interval.
stop_position : int
Stop position of interval
Returns
-------
b : ndarray
A view of `a` obtained by slicing along the first dimension.
See Also
--------
locate_interval
"""
# normalise inputs
a = np.asarray(a)
# determine region slice
loc = locate_interval(pos, start_position, stop_position)
return a[loc, ...]
[docs]def locate_positions(pos1, pos2):
"""Find the intersection of two sets of positions.
Parameters
----------
pos1, pos2 : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig, with no duplicates.
Returns
-------
cond1 : ndarray, bool
An array of the same length as `pos1` where an element is True if the
corresponding item in `pos1` is also found in `pos2`.
cond2 : ndarray, bool
An array of the same length as `pos2` where an element is True if the
corresponding item in `pos2` is also found in `pos1`.
See Also
--------
locate_position, locate_interval, locate_intervals
"""
# check inputs
pos1 = np.asarray(pos1)
pos2 = np.asarray(pos2)
# find intersection
cond1 = np.in1d(pos1, pos2, assume_unique=True)
cond2 = np.in1d(pos2, pos1, assume_unique=True)
return cond1, cond2
[docs]def locate_intervals(pos, start_positions, stop_positions):
"""Locate items within the `pos` array that fall within any of the
intervals given by `start_positions` and `stop_positions`.
Parameters
----------
pos : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig.
start_positions : array_like, int
Start positions of intervals.
stop_positions : array_like, int
Stop positions of intervals
Returns
-------
cond1 : ndarray, bool
An array of the same length as `pos` where an element is True if the
corresponding item in `pos` is also found in any of the intervals.
cond2 : ndarray, bool
An array of the same length as the number of intervals, where an
element is True if the corresponding interval contains one or more
positions in `pos`.
See Also
--------
locate_position, locate_positions, locate_interval
"""
# check inputs
pos = np.asarray(pos)
assert pos.ndim == 1
start_positions = np.asarray(start_positions)
stop_positions = np.asarray(stop_positions)
assert start_positions.ndim == stop_positions.ndim == 1
assert start_positions.shape[0] == stop_positions.shape[0]
# find indices of start and stop positions in pos
start_indices = np.searchsorted(pos, start_positions)
stop_indices = np.searchsorted(pos, stop_positions, side=b'right')
# find intervals overlapping at least one position
cond2 = start_indices < stop_indices
# find positions within at least one interval
cond1 = np.zeros_like(pos, dtype=np.bool)
for i, j in zip(start_indices[cond2], stop_indices[cond2]):
cond1[i:j] = True
return cond1, cond2
[docs]def plot_variant_locator(pos, step=1, ax=None, start_position=None,
stop_position=None, flip=False, line_args=None):
"""
Plot lines indicating the physical genome location of variants. By
default the top x axis is in variant index space, and the bottom x axis
is in genome position space.
Parameters
----------
pos : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig.
step : int, optional
Plot a line for every `step` variants.
ax : axes, optional
The axes on which to draw. If not provided, a new figure will be
created.
start_position : int, optional
The start position for the region over which to work.
stop_position : int, optional
The stop position for the region over which to work.
flip : bool, optional
Flip the plot upside down.
line_args : dict-like
Additional keyword arguments passed through to `plt.Line2D`.
Returns
-------
ax : axes
The axes on which the plot was drawn
"""
# set up axes
if ax is None:
fig = plt.figure(figsize=(7, 1))
ax = fig.add_subplot(111)
# determine x axis limits
if start_position is None:
start_position = np.min(pos)
if stop_position is None:
stop_position = np.max(pos)
ax.set_xlim(start_position, stop_position)
# plot the lines
if line_args is None:
line_args = dict()
line_args.setdefault('linewidth', .5)
n_variants = len(pos)
for i, p in enumerate(pos[::step]):
xfrom = p
xto = (
start_position +
((i * step / n_variants) * (stop_position-start_position))
)
l = plt.Line2D([xfrom, xto], [0, 1], **line_args)
ax.add_line(l)
# invert?
if flip:
ax.invert_yaxis()
ax.xaxis.tick_top()
else:
ax.xaxis.tick_bottom()
# tidy up
ax.set_yticks([])
for l in 'left', 'right':
ax.spines[l].set_visible(False)
return ax
[docs]def windowed_variant_counts(pos, window_size, start_position=None,
stop_position=None):
"""Count variants in non-overlapping windows over the genome.
Parameters
----------
pos : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig.
window_size : int
The size in base-pairs of the windows.
start_position : int, optional
The start position for the region over which to work.
stop_position : int, optional
The stop position for the region over which to work.
Returns
-------
counts : ndarray, int
The number of variants in each window.
bin_edges : ndarray, int
The edge positions of each window. Note that this has length
``len(counts)+1``. To determine bin centers use
``(bin_edges[:-1] + bin_edges[1:]) / 2``. To determine bin widths use
``np.diff(bin_edges)``.
See Also
--------
windowed_variant_counts_plot, windowed_variant_density
"""
# determine bins
if stop_position is None:
stop_position = np.max(pos)
if start_position is None:
start_position = np.min(pos)
bin_edges = np.append(
np.arange(start_position, stop_position, window_size), stop_position
)
# make a histogram of positions
counts, _ = np.histogram(pos, bins=bin_edges)
return counts, bin_edges
[docs]def plot_windowed_variant_counts(pos, window_size, start_position=None,
stop_position=None,
ax=None, plot_kwargs=None):
"""Plot windowed variant counts.
Parameters
----------
Parameters
----------
pos : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig.
window_size : int
The size in base-pairs of the windows.
start_position : int, optional
The start position for the region over which to work.
stop_position : int, optional
The stop position for the region over which to work.
ax : axes, optional
The axes on which to draw. If not provided, a new figure will be
created.
plot_kwargs : dict-like
Additional keyword arguments passed through to `plt.plot`.
Returns
-------
ax : axes
The axes on which the plot was drawn.
See Also
--------
windowed_variant_counts, windowed_variant_density_plot
"""
# set up axes
if ax is None:
fig = plt.figure(figsize=(7, 2))
ax = fig.add_subplot(111)
# count variants
y, bin_edges = windowed_variant_counts(pos, window_size,
start_position=start_position,
stop_position=stop_position)
# calculate bin centers
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
# plot data
if plot_kwargs is None:
plot_kwargs = dict()
plot_kwargs.setdefault('linestyle', '-')
plot_kwargs.setdefault('marker', None)
ax.plot(bin_centers, y, **plot_kwargs)
# tidy up
ax.set_ylim(bottom=0)
ax.set_xlabel('position')
ax.set_ylabel('count')
if start_position is None:
start_position = np.min(pos)
if stop_position is None:
stop_position = np.max(pos)
ax.set_xlim(start_position, stop_position)
return ax
[docs]def windowed_variant_density(pos, window_size, start_position=None,
stop_position=None):
"""Calculate per-base-pair density of variants in non-overlapping windows
over the genome.
Parameters
----------
pos : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig.
window_size : int
The size in base-pairs of the windows.
start_position : int, optional
The start position for the region over which to work.
stop_position : int, optional
The stop position for the region over which to work.
Returns
-------
density : ndarray, int
The density of variants in each window.
bin_edges : ndarray, int
The edge positions of each window. Note that this has length
``len(density)+1``. To determine bin centers use
``(bin_edges[:-1] + bin_edges[1:]) / 2``. To determine bin widths use
``np.diff(bin_edges)``.
See Also
--------
windowed_variant_density_plot, windowed_variant_counts
"""
# count variants in windows
counts, bin_edges = windowed_variant_counts(pos, window_size,
start_position=start_position,
stop_position=stop_position)
bin_widths = np.diff(bin_edges)
# convert to per-base-pair density
density = counts / bin_widths
return density, bin_edges
[docs]def plot_windowed_variant_density(pos, window_size, start_position=None,
stop_position=None,
ax=None, plot_kwargs=None):
"""Plot windowed variant density.
Parameters
----------
Parameters
----------
pos : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig.
window_size : int
The size in base-pairs of the windows.
start_position : int, optional
The start position for the region over which to work.
stop_position : int, optional
The stop position for the region over which to work.
ax : axes, optional
The axes on which to draw. If not provided, a new figure will be
created.
plot_kwargs : dict-like
Additional keyword arguments passed through to `plt.plot`.
Returns
-------
ax : axes
The axes on which the plot was drawn.
See Also
--------
windowed_variant_density, windowed_variant_counts_plot
"""
# set up axes
if ax is None:
fig = plt.figure(figsize=(7, 2))
ax = fig.add_subplot(111)
# count variants
y, bin_edges = windowed_variant_density(pos, window_size,
start_position=start_position,
stop_position=stop_position)
# calculate bin centers
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
# plot data
if plot_kwargs is None:
plot_kwargs = dict()
plot_kwargs.setdefault('linestyle', '-')
plot_kwargs.setdefault('marker', None)
ax.plot(bin_centers, y, **plot_kwargs)
# tidy up
ax.set_ylim(bottom=0)
ax.set_xlabel('position')
ax.set_ylabel('density')
if start_position is None:
start_position = np.min(pos)
if stop_position is None:
stop_position = np.max(pos)
ax.set_xlim(start_position, stop_position)
return ax
[docs]def windowed_statistic(pos, values, window_size,
start_position=None,
stop_position=None,
statistic='mean'):
"""Calculate a statistic for `values` binned in non-overlapping windows
over the genome.
Parameters
----------
pos : array_like
A sorted 1-dimensional array of genomic positions from a single
chromosome/contig.
values : array_like
A 1-D array of the same length as `pos`.
window_size : int
The size in base-pairs of the windows.
start_position : int, optional
The start position for the region over which to work.
stop_position : int, optional
The stop position for the region over which to work.
statistic : string or function
The function to apply to values in each bin.
Returns
-------
stats : ndarray
The values of the statistic within each bin.
bin_edges : ndarray
The edge positions of each window. Note that this has length
``len(stats)+1``. To determine bin centers use
``(bin_edges[:-1] + bin_edges[1:]) / 2``. To determine bin widths use
``np.diff(bin_edges)``.
"""
# determine bins
if stop_position is None:
stop_position = np.max(pos)
if start_position is None:
start_position = np.min(pos)
bin_edges = np.append(
np.arange(start_position, stop_position, window_size), stop_position
)
# compute binned statistic
stats, _, _ = scipy.stats.binned_statistic(pos, values=values,
statistic=statistic,
bins=bin_edges)
return stats, bin_edges
[docs]def evenly_downsample_variants(a, k):
"""Evenly downsample an array along the first dimension to length `k` (or
as near as possible), assuming the first dimension corresponds to variants.
Parameters
----------
a : array_like
The array to downsample.
k : int
The target number of variants.
Returns
-------
b : array_like
A downsampled view of `a`.
"""
# normalise inputs
a = np.asarray(a)
# determine length of first dimension
n_variants = a.shape[0]
# determine step
step = max(1, int(n_variants/k))
# take slice
b = a[::step, ...]
return b
[docs]def randomly_downsample_variants(a, k):
"""Evenly downsample an array along the first dimension to length `k`,
assuming the first dimension corresponds to variants.
Parameters
----------
a : array_like
The array to downsample.
k : int
The k number of variants.
Returns
-------
b : array_like
A downsampled copy of `a`.
"""
# normalise inputs
a = np.asarray(a)
# determine length of first dimension
n_variants = a.shape[0]
# sample indices
indices = sorted(random.sample(range(n_variants), k))
# apply selection
b = np.take(a, indices, axis=0)
return b