Source code for runmacs.spec.io.envifile

# -*- coding: utf-8 -*-
# Filename: envifile.py

#########################################################################
#
# envifile.py - This file is part of the Munich Aerosol Cloud Scanner package.
#
# Copyright (C) 2013 Tobias Kölling
#
# runMACS is free software; you can redistribute it and/
# or modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# Spectral Python is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software; if not, write to
#
# Free Software Foundation, Inc.
# 59 Temple Place, Suite 330
# Boston, MA 02111-1307
# USA
#
#########################################################################
#
# Send comments to:
# Tobias Kölling, tobias.koelling@physik.uni-muenchen.de
#

"""
envifile
========

This modules loads general envifiles via mmap, to get good random access performance.
"""

import numpy as np
from runmacs.spec.io.envi import dtype_map, read_envi_header

dtype_dict = dict(dtype_map)

[docs]class EnviFile(np.memmap): """ Represents an bil, bip or bsq envi-file as numpy array. Load a file, by passing the filename (without .hdr or .raw) to the constructor. After loading, the object behaves like a standary numpy array:: data = EnviFile('basefilename') The dimensions are ordered as follows:: data[lines, samples, bands] """ def __new__(cls, filename=None, suffix='raw'): if filename is not None: envi_header = read_envi_header(filename + '.hdr') try: bands = int(envi_header['bands']) #spectral resolution except KeyError: bands = 1 lines = int(envi_header['lines']) #temporal resolution samples = int(envi_header['samples']) #spatial resolution try: interleave = envi_header['interleave'] except KeyError: interleave = 'bil' # to comply with spectral python, transpose to bip shape if interleave == 'bil': shape = (lines, bands, samples) transpose = (0,2,1) elif interleave == 'bip': shape = (lines, samples, bands) transpose = None elif interleave == 'bsq': shape = (bands, lines, samples) transpose = (1,2,0) else: raise ValueError('undefined interleave scheme "%s"'%interleave) try: headerskip = int(envi_header['header offset']) except KeyError: headerskip = 0 if len(suffix) > 0: fn = filename + '.' + suffix else: fn = filename obj = np.memmap.__new__(cls, fn, dtype=dtype_dict[envi_header['data type']], mode='r', shape=shape, offset=headerskip) if transpose is not None: obj = obj.transpose(transpose) obj.envi_header = envi_header obj.envi_bands = bands obj.envi_lines = lines obj.envi_samples = samples #obj.envi_fps = float(envi_header['fps']) obj.envi_cumulative = float(envi_header.get('cumulative', 0)) obj.suffix = suffix obj.interleave = interleave else: obj = np.memmap.__new__(cls) return obj def __array_finalize__(self, obj): if obj is None: return # we are either view casting or creating from template, so copy additional infos # todo: check if it is wise to change dimensional information self.envi_header = getattr(obj, 'envi_header', None) self.envi_bands = getattr(obj, 'envi_bands', None) self.envi_lines = getattr(obj, 'envi_lines', None) self.envi_samples = getattr(obj, 'envi_samples', None) #self.envi_fps = getattr(obj, 'envi_fps', None) self.suffix = getattr(obj, 'suffix', None) self.interleave = getattr(obj, 'interleave', None)
[docs]def E(stats): """ compute average of cumulative EnviFile """ return stats[0,...]/stats.envi_cumulative
[docs]def std(stats): """ compute standard deviation of cumulative EnviFile """ return ((stats[1,...]/stats.envi_cumulative) - E(stats)**2)**.5
[docs]class EnviFileRaw(object): """ This class allows to read an ENVI file as raw bytes with some meta information, the idea is to provide a sequential an serializable interface to support parallel workers which look at the whole file. """ def __init__(self, filename, suffix='raw'): envi_header = read_envi_header(filename + '.hdr') self.envi_header = envi_header try: self.bands = int(envi_header['bands']) #spectral resolution except KeyError: self.bands = 1 self.lines = int(envi_header['lines']) #temporal resolution self.samples = int(envi_header['samples']) #spatial resolution try: self.interleave = envi_header['interleave'] except KeyError: self.interleave = 'bil' #self.fps = float(envi_header['fps']) self.dtype = np.dtype(dtype_dict[envi_header['data type']]) if len(suffix) > 0: self.datafile = open(filename + '.' + suffix) else: self.datafile = open(filename) self.currentLine = 0 self.linesize = self.dtype.itemsize * self.samples * self.bands if self.interleave == 'bil': self.lineshape = (self.bands, self.samples) elif self.interleave == 'bip': self.lineshape = (self.samples, self.bands) def read_line(self, lineno): if self.interleave not in ('bil', 'bip'): raise ValueError('cannot read continous line from "%s" interleaving'%(self.interleave,)) if lineno >= self.lines or lineno < 0: raise IndexError('lineno %d is out of range (0 ... %d)'%(lineno, self.lines-1)) if lineno != self.currentLine: self.datafile.seek(lineno * self.linesize) self.currentLine = lineno + 1 #+1 is because of the next read request return self.datafile.read(self.linesize) def close(self): self.datafile.close()