Source code for ogrppha

#!/usr/bin/env python
"""Group pha files using optimal binning based on instrument resolution"""

import argparse
import os

import numpy as np
from astropy.io import fits
from scipy.ndimage import gaussian_filter1d


[docs] def do_ogrppha(args): """core of ogrppha Parameters ---------- args: return of argparse.ArgumentParser.parse_args """ # process input # spec_file = args.spec_file out_file = args.out_file min_snr = args.snr min_counts = args.counts osample_fac = max(args.osample_fac, 1) use_formula = args.use_formula write_chan = args.write_chan # ------------------------------------------- # # Read the response and background file names # with fits.open(spec_file) as filep: src_exposure = filep['SPECTRUM'].header['EXPOSURE'] try: src_c = np.array(filep['SPECTRUM'].data.field('COUNTS'), np.double) except KeyError: src_c = np.array(filep['SPECTRUM'].data.field('RATE'), np.double) * src_exposure try: src_bsc = filep['SPECTRUM'].header['BACKSCAL'] except KeyError: src_bsc = 1.0 # this will fail if there is no RESPFILE rsp_file = filep['SPECTRUM'].header['RESPFILE'] if rsp_file.lower() == 'none': raise ValueError(f'Response file "{rsp_file}" does not exist') try: bgd_file = filep['SPECTRUM'].header['BACKFILE'] except KeyError: bgd_file = None # ------------------------------------------- # # --------------------- # # get background counts # bgd_c = np.zeros_like(src_c) if not bgd_file in [None, 'none', 'NONE']: with fits.open(bgd_file) as filep: bgd_exposure = filep['SPECTRUM'].header['EXPOSURE'] try: bgd_c = np.array(filep['SPECTRUM'].data.field('COUNTS'), np.double) except KeyError: bgd_c = np.array(filep['SPECTRUM'].data.field('RATE'), np.double) * bgd_exposure try: bgd_bsc = filep['SPECTRUM'].header['BACKSCAL'] except KeyError: bgd_bsc = 1.0 bgd_c *= (src_bsc/bgd_bsc) * (src_exposure/bgd_exposure) # incase bgd_c is extreme, cap it cap = -20 iextr = (src_c - bgd_c) < cap bgd_c[iextr] = src_c[iextr] - cap # --------------------- # # ------------------------- # # energy-channel conversion # with fits.open(rsp_file) as filep: edata = filep['EBOUNDS'].data chan = edata.field(0) energy = (edata.field(1)+edata.field(2)) / 2. try: matrix = filep['MATRIX'].data except KeyError: matrix = filep['SPECRESP MATRIX'].data nchan = len(chan) nen = len(matrix) men = np.array([(mat[1]+mat[0])/2 for mat in matrix]) # ------------------------- # # --------------------- # # loop through channels # ibin = np.zeros(nchan) - 1 ich, ibin[0] = 0, 1 smooth = nen * 1. / nchan while ich < nchan: # get the response curve at ich # ien = np.argmin(np.abs(men - energy[ich])) istart, ilen = matrix[ien][3], matrix[ien][4] if not isinstance(istart, (list, np.ndarray)): istart, ilen = [istart], [ilen] if len(istart) == 0 or len(ilen) == 0: ich += 1 continue #iarr = np.concatenate([np.arange(i1, i1+i2) for i1,i2 in zip(istart, ilen)]) rarr = matrix[ien][5] rarr = gaussian_filter1d(rarr, smooth) # work out fwhm at ich # if not np.allclose(rarr, 0) and nchan > 100: imax = np.argmax(rarr) # limits of fwhm in energy grid units ic1 = np.argmin(np.abs(rarr[:imax]-rarr[imax]/2.)) if imax !=0 else 0 ic2 = np.argmin(np.abs(rarr[imax:]-rarr[imax]/2.)) + imax width = ic2 - ic1 else: #width = nchan - ich width = 1 # get oversampling factor if we use_formula is requested # if use_formula: width = np.max([1, width]) ind = range(ich, ich+width) if ind[-1] >= nchan: ind = range(ich, nchan) counts = np.max([sum(src_c[ind] - bgd_c[ind]), 1e-10] ) counts *= 1./width # per resolution element xarr = np.log(counts*(1 + 0.20 *np.log(width))) osample_fac = 1. if xarr<2.119 else (0.08+7./xarr + 1.8/xarr**2)/(1+5.9/xarr) osample_fac = 1. / osample_fac # ----- # # bin width in channel units using oversample_fac width = np.int64(np.round(np.max([1, width*1./osample_fac]))) ind = range(ich, np.min([ich+width, nchan]) ) # increase width until snr > min_snr # if min_snr: snr = sum(src_c[ind] - bgd_c[ind]) while snr < 1 and ich+width<=nchan: width += 1 ind = range(ich, min(ich+width, nchan) ) snr = sum(src_c[ind] - bgd_c[ind]) if snr > 0: snr /= np.sqrt(sum(src_c[ind] + bgd_c[ind])) while (snr < min_snr) and (ich+width < nchan): ind = range(ich, ich+width) if ind[-1] >= nchan: ind = range(ich, nchan) break snr = sum(src_c[ind] - bgd_c[ind]) if snr <= 0: width += 1 continue snr /= np.sqrt(sum(src_c[ind] + bgd_c[ind])) width += 1 # do we have a min_counts requirement? # if min_counts: counts = sum(src_c[ind] - bgd_c[ind]) while (counts < min_counts) and (ich+width < nchan): ind = range(ich, ich+width) if ind[-1] >= nchan: ind = range(ich, nchan) break counts = sum(src_c[ind] - bgd_c[ind]) width += 1 ich += len(ind) if ich < nchan: ibin[ich] = 1 # --------------------- # # ------------------ # # write the grouping # if write_chan: # write an ascii file and use grppha # ichan = np.arange(nchan)[ibin==1] if ichan[-1] < nchan-1: ichan = np.append(ichan, nchan) bins = [] for ich in range(len(ichan)-1): bins.append([ichan[ich], ichan[ich+1]-1, ichan[ich+1]-ichan[ich]]) txt = '\n'.join([f'{xval[0]} {xval[1]} {xval[2]}' for xval in bins]) with open('tmp_chans.dat', 'w', encoding='utf-8') as filep: filep.write(txt) if os.path.exists(out_file): os.system(f'rm {out_file}') os.system(f'grppha {spec_file} {out_file} "group tmp_chans.dat&exit"') else: # modify the file with pyfits # if os.path.exists(out_file): os.system(f'rm {out_file}') os.system(f'grppha {spec_file} {out_file} "group min 20&exit" &> /dev/null') with fits.open(out_file) as filep: hdu = filep['SPECTRUM'] orig_cols = hdu.columns orig_cols['GROUPING'].array = np.array(ibin, np.int64) cols = fits.ColDefs(orig_cols) tbl = fits.BinTableHDU.from_columns(cols) hdu.header.update(tbl.header.copy()) tbl.header = hdu.header.copy() grp = fits.HDUList([filep[0],tbl]) grp.writeto(out_file, overwrite=True) print(f'Grouped file {out_file} written sucessfully')
# ------------------ #
[docs] def main(): """Group pha files using optimal binning based on the instrument resoluions. We use the formula in Kaastra+Bleeker 16, with the addition of a signal to noise constraint. The input spectrum is assumed to have the RESPFILE and BACKFILE keywords. """ parser = argparse.ArgumentParser(description=main.__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("spec_file" , metavar="spec_file", type=str, help="The name of the input spectrum file.") parser.add_argument("out_file" , metavar="out_file", type=str, help="The name of the output spectrum file.") parser.add_argument('-s', '--snr', metavar='min_snr', type=float, default=3, help='The minimum signal to noise ration per bin. -1 to ignore') parser.add_argument('-c', '--counts', metavar='min_counts', type=float, default=-1, help='The minimum counts per bin. -1 to ignore') parser.add_argument('-f', '--osample_fac', metavar='osample_fac', type=float, default=3, help='The oversampling factor in units of the local FWHM') parser.add_argument("--use_formula", action='store_true', default=False, help=('Calculate the oversampling factor using formula 36-37 ' 'in Kaastra & Bleeker 2016')) parser.add_argument("--write_chan", action='store_true', default=False, help=('Write grouping file, then use standard grppha.' 'This requires heasoft and ngroups < 100')) do_ogrppha(parser.parse_args())
if __name__ == '__main__': main()