""" This script is a combined workflow for merge/slope calculation/offset modelling using canopy height/new coordinates generation Meant for CHTC processing """ import argparse import glob import os import gdal import numpy as np import statsmodels.api as sm from skimage import measure def npz_merge(image, npz_fold, output_name): """ Merging npz results back to the source image dimension Args: image: image directory and name npz_fold: folder containing all npz files output_name: output name for the merged result Returns: """ source = gdal.Open(image) img_base = os.path.basename(image) offset_array = np.ones((source.RasterYSize, source.RasterXSize, 3)) * -99 # npz_fold = "%s/Downloads/cheese_npz/" % home npzs = glob.glob("%s/%s*.npz" % (npz_fold, img_base)) for npz in npzs: start, end = [int(x) for x in os.path.splitext(npz)[0].split('_')[-2:]] temp = np.load(npz)['a'] offset_array[start:end, :, :] = temp # output_name = "%s/%s" % (output_dir, img_base.replace("g.tif", "offset.tif")) datatype = gdal.GDT_Float32 # Set the output raster transform and projection properties driver = gdal.GetDriverByName("GTIFF") tiff = driver.Create(output_name, source.RasterXSize, source.RasterYSize, 3, datatype) tiff.SetGeoTransform(source.GetGeoTransform()) tiff.SetProjection(source.GetProjection()) # Write bands to file for band in range(3): tiff.GetRasterBand(band + 1).WriteArray(offset_array[:, :, band]) tiff.GetRasterBand(band + 1).SetNoDataValue(-99) del tiff, driver def slope_cal(image): """ Calculate the slope for the offset result Args: image: directory and filename for the offset image (output from npz_merge) Returns: """ for i, d in enumerate(['y', 'x']): output_filename = image.replace(".tif", "_slope_%s.tif" % d) os.system("gdaldem slope %s %s -b %s" % (image, output_filename, i + 1)) def export_new_coords(src_img, image, CanopyHeight): """ model offsets based on the pixel location and canopy height. Generate new coords for each pixel. Args: src_img: directory and filename for the original green band image image:directory and filename for the offset image. CanopyHeight: directory and filename for canopy height image Returns: write the new coords image to the same location """ def offset_label(off, labels, r): """ mask out the invalid offset values based on connectivity labels note: for this function to be compatible with jit, use off.flatten(), labels.flatten(), etc. Args: off: np array The offset matrx labels: np array. Connectivity label matrix for the offset r: the offset correlation coefficient. Same dimension as offset Returns: mased offset data """ for label in np.unique(labels): class_mask = labels == label if class_mask.sum() < 300: off[class_mask] = np.nan if r[class_mask].mean() < 20: off[class_mask] = np.nan return off slopex = gdal.Open("%s_slope_%s.tif" % (image[:-4], 'x')) slopex = slopex.GetRasterBand(1).ReadAsArray() slopey = gdal.Open("%s_slope_%s.tif" % (image[:-4], 'y')) slopey = slopey.GetRasterBand(1).ReadAsArray() offset = gdal.Open(image) # Canopy Height CH = gdal.Open(CanopyHeight) CH = CH.GetRasterBand(1).ReadAsArray() #CH = CH[0:-1, 0:-1] source = gdal.Open(src_img) off_y = offset.GetRasterBand(1).ReadAsArray().astype(float) off_x = offset.GetRasterBand(2).ReadAsArray().astype(float) r = offset.GetRasterBand(3).ReadAsArray().astype(float) off_x[off_x == -99] = np.nan off_x[slopex > 1] = np.nan labels_x = measure.label(off_x, connectivity=1) off_x = offset_label(off_x, labels_x, r) off_y[off_y == -99] = np.nan off_y[slopey > 1] = np.nan labels_y = measure.label(off_y, connectivity=1) off_y = offset_label(off_y, labels_y, r) mask_y = ~np.isnan(off_y) mask_x = ~np.isnan(off_x) Y_idx = np.where(mask_y) # using row, col, and canopy height as predictors X = np.column_stack([Y_idx[0], Y_idx[1], CH[mask_y]]) y = off_y[mask_y] model_y = sm.OLS(y, sm.add_constant(X)).fit() x_idx = np.where(mask_x) y = off_x[mask_x] # using row, col, and canopy height as predictors X = np.column_stack([x_idx[0], x_idx[1], CH[mask_x]]) model_x = sm.OLS(y, sm.add_constant(X)).fit() # Make prediction using the model: row, col = np.indices(CH.shape) i, a, b, c = model_y.params y_offset = i + a * row + b * col + c * CH i, a, b, c = model_x.params x_offset = i + a * row + b * col + c * CH # Obtain the map coordinate for the raw image: GT = source.GetGeoTransform() Xgeo = GT[0] + col * GT[1] + row * GT[2] Ygeo = GT[3] + col * GT[4] + row * GT[5] Xgeo += x_offset Ygeo -= y_offset # Write to disk: # Set the output raster transform and projection properties output_name = image.replace('offset.tif', 'newCoordinate') driver = gdal.GetDriverByName("GTIFF") tiff = driver.Create(output_name, source.RasterXSize, source.RasterYSize, 2, gdal.GDT_Float32) tiff.SetGeoTransform(source.GetGeoTransform()) tiff.SetProjection(source.GetProjection()) # Write bands to file tiff.GetRasterBand(1).WriteArray(Xgeo) tiff.GetRasterBand(2).WriteArray(Ygeo) del tiff, driver def main(): # argument parsing: parser = argparse.ArgumentParser(description='offset npz merging and post processing') parser.add_argument('-img', help='directory and filename for the original img', required=True, type=str) parser.add_argument('-npz_fold', help='directory for the offset npzs', required=True, type=str) parser.add_argument('-out_fold', help='directory for the output results', required=True, type=str) parser.add_argument('-CHimg', help='directory and filename for the canopy height img', required=True, type=str) args = parser.parse_args() # merge the npz: img_base = os.path.basename(args.img) output_name = args.out_fold + img_base.replace("g.tif", "offset.tif") npz_merge(args.img, args.npz_fold, output_name) # calculate slope: image = output_name slope_cal(image) # construct new coords: export_new_coords(args.img, image, args.CHimg) if __name__ == "__main__": main()