diff --git a/kafka/inference/broadbandSAIL_tools.py b/kafka/inference/broadbandSAIL_tools.py new file mode 100644 index 0000000..447fc91 --- /dev/null +++ b/kafka/inference/broadbandSAIL_tools.py @@ -0,0 +1,97 @@ +import logging +import gdal +import numpy as np +import os + +from .utils import block_diag +from .kf_tools import propagate_single_parameter + + +def tip_prior_values(): + """The JRC-TIP prior in a convenient function which is fun for the whole + family. Note that the effective LAI is here defined in transformed space + where TLAI = exp(-0.5*LAIe). + + Returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + # broadly TLAI 0->7 for 1sigma + sigma = np.array([0.12, 0.7, 0.0959, 0.15, 1.5, 0.2, 0.5]) + x0 = np.array([0.17, 1.0, 0.1, 0.7, 2.0, 0.18, np.exp(-0.5*1.5)]) + # The individual covariance matrix + little_p = np.diag(sigma**2).astype(np.float32) + little_p[5, 2] = 0.8862*0.0959*0.2 + little_p[2, 5] = 0.8862*0.0959*0.2 + + inv_p = np.linalg.inv(little_p) + return x0, little_p, inv_p + + +class JRCPrior(object): + """Dummpy 2.7/3.6 prior class following the same interface as 3.6 only + version.""" + + def __init__(self, parameter_list, state_mask): + """It makes sense to have the list of parameters and state mask + defined at this point, as they won't change during processing.""" + self.mean, self.covar, self.inv_covar = self._tip_prior() + self.parameter_list = parameter_list + if isinstance(state_mask, (np.ndarray, np.generic) ): + self.state_mask = state_mask + else: + self.state_mask = self._read_mask(state_mask) + + def _read_mask(self, fname): + """Tries to read the mask as a GDAL dataset""" + if not os.path.exists(fname): + raise IOError("State mask is neither an array or a file that exists!") + g = gdal.Open(fname) + if g is None: + raise IOError("{:s} can't be opened with GDAL!".format(fname)) + mask = g.ReadAsArray() + return mask + + def _tip_prior(self): + """The JRC-TIP prior in a convenient function 
which is fun for the whole + family. Note that the effective LAI is here defined in transformed space + where TLAI = exp(-0.5*LAIe). + + Returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + mean, c_prior, c_inv_prior = tip_prior_values() + self.mean = mean + self.covar = c_prior + self.inv_covar = c_inv_prior + return mean, c_prior, c_inv_prior + + def process_prior ( self, time, inv_cov=True): + # Presumably, self._inference_prior has some method to retrieve + # a bunch of files for a given date... + n_pixels = self.state_mask.sum() + x0 = np.array([self.mean for i in range(n_pixels)]).flatten() + if inv_cov: + inv_covar_list = [self.inv_covar for m in range(n_pixels)] + inv_covar = block_diag(inv_covar_list, dtype=np.float32) + return x0, inv_covar + else: + covar_list = [self.covar for m in range(n_pixels)] + covar = block_diag(covar_list, dtype=np.float32) + return x0, covar + + +def propagate_LAI_broadbandSAIL(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + prior=None, state_propagator=None, date=None): + """Propagate a single parameter and + set the rest of the parameter propagations to the prior. 
+ This should be used with a prior for the remaining parameters""" + nparameters = 7 + lai_position = 6 + x_prior, c_prior, c_inv_prior = tip_prior_values() + return propagate_single_parameter(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + nparameters, lai_position, + x_prior, c_inv_prior) diff --git a/kafka/inference/kf_tools.py b/kafka/inference/kf_tools.py index 127685e..46ed9cb 100644 --- a/kafka/inference/kf_tools.py +++ b/kafka/inference/kf_tools.py @@ -16,59 +16,47 @@ class NoHessianMethod(Exception): def __init__(self, message): self.message = message -def band_selecta(band): - if band == 0: - return np.array([0, 1, 6, 2]) - else: - return np.array([3, 4, 6, 5]) - -def hessian_correction_pixel(gp, x0, C_obs_inv, innovation, band, nparams): - selecta = band_selecta(band) - ddH = gp.hessian(np.atleast_2d(x0[selecta])) - big_ddH = np.zeros((nparams, nparams)) - for i, ii in enumerate(selecta): - for j, jj in enumerate(selecta): - big_ddH[ii, jj] = ddH.squeeze()[i, j] - big_hessian_corr = big_ddH*C_obs_inv*innovation - return big_hessian_corr +def hessian_correction_pixel(hessian, C_obs_inv, innovation): + hessian_corr = hessian*C_obs_inv*innovation + return hessian_corr -def hessian_correction(gp, x0, R_mat, innovation, mask, state_mask, band, +def hessian_correction(hessian, R_mat, innovation, mask, state_mask, nparams): """Calculates higher order Hessian correction for the likelihood term. Needs the GP, the Observational uncertainty, the mask....""" - if not hasattr(gp, "hessian"): + if hessian is None: # The observation operator does not provide a Hessian method. We just # return 0, meaning no Hessian correction. return 0. 
C_obs_inv = R_mat.diagonal()[state_mask.flatten()] mask = mask[state_mask].flatten() + little_hess = [] - for i, (innov, C, m) in enumerate(zip(innovation, C_obs_inv, mask)): + for i, (innov, C, m, hess) in enumerate(zip(innovation, C_obs_inv, mask, hessian)): if not m: # Pixel is masked hessian_corr = np.zeros((nparams, nparams)) else: - # Get state for current pixel - x0_pixel = x0.squeeze()[(nparams*i):(nparams*(i + 1))] # Calculate the Hessian correction for this pixel - hessian_corr = m * hessian_correction_pixel(gp, x0_pixel, C, - innov, band, nparams) + hessian_corr = m * hessian_correction_pixel(hess, C, innov) little_hess.append(hessian_corr) - hessian_corr = block_diag(little_hess) + + hessian_corr = block_diag(little_hess) return hessian_corr -def hessian_correction_multiband(gp, x0, R_mats, innovations, masks, state_mask, n_bands, - nparams): +def hessian_correction_multiband(hessians, R_mats, innovations, + masks, state_mask, n_bands, nparams): """ Non linear correction for the Hessian of the cost function. This handles multiple bands. """ little_hess_cor = [] - for R, innovation, mask, band in zip(R_mats, innovations, masks, range(n_bands)): - little_hess_cor.append(hessian_correction(gp, x0, R, innovation, mask, state_mask, band, - nparams)) - hessian_corr = sum(little_hess_cor) #block_diag(little_hess_cor) + for R, hessian, innovation, mask in zip( + R_mats, hessians, innovations, masks): + little_hess_cor.append(hessian_correction(hessian, R, innovation, + mask, state_mask, nparams)) + hessian_corr = sum(little_hess_cor) return hessian_corr @@ -96,43 +84,6 @@ def blend_prior(prior_mean, prior_cov_inverse, x_forecast, P_forecast_inverse): return x_combined, combined_cov_inv -def tip_prior(): - """The JRC-TIP prior in a convenient function which is fun for the whole - family. Note that the effective LAI is here defined in transformed space - where TLAI = exp(-0.5*LAIe).
- - Returns - ------- - The mean prior vector, covariance and inverse covariance matrices.""" - # broadly TLAI 0->7 for 1sigma - sigma = np.array([0.12, 0.7, 0.0959, 0.15, 1.5, 0.2, 0.5]) - x0 = np.array([0.17, 1.0, 0.1, 0.7, 2.0, 0.18, np.exp(-0.5*1.5)]) - # The individual covariance matrix - little_p = np.diag(sigma**2).astype(np.float32) - little_p[5, 2] = 0.8862*0.0959*0.2 - little_p[2, 5] = 0.8862*0.0959*0.2 - - inv_p = np.linalg.inv(little_p) - return x0, little_p, inv_p - -def tip_prior_noLAI(prior): - n_pixels = prior['n_pixels'] - mean, prior_cov_inverse = tip_prior(prior) - - -def tip_prior_full(prior): - # This is yet to be properly defined. For now it will create the TIP prior and - # prior just contains the size of the array - this function will be replaced with - # the real code when we know what the priors look like. - x_prior, c_prior, c_inv_prior = tip_prior() - n_pixels = prior['n_pixels'] - mean = np.array([x_prior for i in range(n_pixels)]).flatten() - c_inv_prior_mat = [c_inv_prior for n in range(n_pixels)] - prior_cov_inverse=block_diag(c_inv_prior_mat, dtype=np.float32) - - return mean, prior_cov_inverse - - def propagate_and_blend_prior(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, prior=None, state_propagator=None, date=None): @@ -240,10 +191,11 @@ def propagate_information_filter_SLOW(x_analysis, P_analysis, P_analysis_inverse S= P_analysis_inverse.dot(Q_matrix) A = (sp.eye(n) + S).tocsc() P_forecast_inverse = spl.spsolve(A, P_analysis_inverse) - logging.info("DOne with propagation") + logging.info("Done with propagation") return x_forecast, None, P_forecast_inverse + def propagate_information_filter_approx_SLOW(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, prior=None, state_propagator=None, date=None): @@ -289,35 +241,40 @@ def propagate_information_filter_approx_SLOW(x_analysis, P_analysis, P_analysis_ return x_forecast, None, P_forecast_inverse -def propagate_information_filter_LAI(x_analysis, 
P_analysis, - P_analysis_inverse, - M_matrix, Q_matrix, - prior=None, state_propagator=None, date=None): - +def propagate_single_parameter(x_analysis, P_analysis, P_analysis_inverse, + M_matrix, Q_matrix, n_param, location, + x_prior, c_inv_prior): + """ Propagate a single parameter and + set the rest of the parameter propagations to the prior. + This should be used with a prior for the remaining parameters""" x_forecast = M_matrix.dot(x_analysis) - x_prior, c_prior, c_inv_prior = tip_prior() - n_pixels = len(x_analysis)//7 - x0 = np.array([x_prior for i in range(n_pixels)]).flatten() - x0[6::7] = x_forecast[6::7] # Update LAI - lai_post_cov = P_analysis_inverse.diagonal()[6::7] - lai_Q = Q_matrix.diagonal()[6::7] + n_pixels = len(x_analysis)//n_param + x0 = np.tile(x_prior, n_pixels) + x0[location::n_param] = x_forecast[location::n_param] # Update LAI + lai_post_cov = P_analysis_inverse.diagonal()[location::n_param] + lai_Q = Q_matrix.diagonal()[location::n_param] c_inv_prior_mat = [] - for n in range(n_pixels): + for cov, Q in zip(lai_post_cov, lai_Q): # inflate uncertainty - lai_inv_cov = 1.0/((1.0/lai_post_cov[n])+lai_Q[n]) + lai_inv_cov = 1.0/((1.0/cov)+Q) little_P_forecast_inverse = c_inv_prior.copy() - little_P_forecast_inverse[6, 6] = lai_inv_cov + little_P_forecast_inverse[location, location] = lai_inv_cov c_inv_prior_mat.append(little_P_forecast_inverse) P_forecast_inverse=block_diag(c_inv_prior_mat, dtype=np.float32) return x0, None, P_forecast_inverse + + def no_propagation(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, prior=None, state_propagator=None, date=None): - """No propagation. In this case, we return the original prior. As the + """ + THIS PROPAGATOR SHOULD NOT BE USED ANY MORE. It is better to set + the state_propagator to None and to use the Prior explicitly. + + THIS IS ONLY SUITABLE FOR BROADBAND SAIL uses TIP prior + No propagation. In this case, we return the original prior.
As the information filter behaviour is the standard behaviour in KaFKA, we only return the inverse covariance matrix. **NOTE** the input parameters are there to comply with the API, but are **UNUSED**. diff --git a/kafka/inference/narrowbandSAIL_tools.py b/kafka/inference/narrowbandSAIL_tools.py new file mode 100644 index 0000000..e2e3608 --- /dev/null +++ b/kafka/inference/narrowbandSAIL_tools.py @@ -0,0 +1,82 @@ +import logging +import gdal +import numpy as np +import os + +from .utils import block_diag +from .kf_tools import propagate_single_parameter + +def sail_prior_values(): + """ + :returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + #parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', + # 'lai', 'ala', 'bsoil', 'psoil'] + mean = np.array([2.1, np.exp(-60. / 100.), + np.exp(-7.0 / 100.), 0.1, + np.exp(-50 * 0.0176), np.exp(-100. * 0.002), + np.exp(-4. / 2.), 70. / 90., 0.5, 0.9]) + sigma = np.array([0.01, 0.2, + 0.01, 0.05, + 0.01, 0.01, + 0.50, 0.1, 0.1, 0.1]) + + covar = np.diag(sigma ** 2).astype(np.float32) + inv_covar = np.diag(1. 
/ sigma ** 2).astype(np.float32) + return mean, covar, inv_covar + +class SAILPrior(object): + def __init__(self, parameter_list, state_mask): + self.parameter_list = parameter_list + if isinstance(state_mask, (np.ndarray, np.generic)): + self.state_mask = state_mask + else: + self.state_mask = self._read_mask(state_mask) + mean, c_prior, c_inv_prior = sail_prior_values() + self.mean = mean + self.covar = c_prior + self.inv_covar = c_inv_prior + + + def _read_mask(self, fname): + """Tries to read the mask as a GDAL dataset""" + if not os.path.exists(fname): + raise IOError("State mask is neither an array or a file that exists!") + g = gdal.Open(fname) + if g is None: + raise IOError("{:s} can't be opened with GDAL!".format(fname)) + mask = g.ReadAsArray() + return mask + + def process_prior ( self, time, inv_cov=True): + # Presumably, self._inference_prior has some method to retrieve + # a bunch of files for a given date... + n_pixels = self.state_mask.sum() + x0 = np.array([self.mean for i in range(n_pixels)]).flatten() + if inv_cov: + inv_covar_list = [self.inv_covar for m in range(n_pixels)] + inv_covar = block_diag(inv_covar_list, dtype=np.float32) + return x0, inv_covar + else: + covar_list = [self.covar for m in range(n_pixels)] + covar = block_diag(covar_list, dtype=np.float32) + return x0, covar + + +def propagate_LAI_narrowbandSAIL(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + prior=None, state_propagator=None, date=None): + ''' Propagate a single parameter and + set the rest of the parameter propagations to zero + This should be used with a prior for the remaining parameters''' + nparameters = 10 + lai_position = 6 + + x_prior, c_prior, c_inv_prior = sail_prior_values() + return propagate_single_parameter(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + nparameters, lai_position, + x_prior, c_inv_prior) diff --git a/kafka/inference/utils.py b/kafka/inference/utils.py index 1a43e4b..c7bd5c4 100644 --- 
a/kafka/inference/utils.py +++ b/kafka/inference/utils.py @@ -125,7 +125,8 @@ def create_linear_observation_operator(obs_op, n_params, metadata, def create_nonlinear_observation_operator(n_params, emulator, metadata, - mask, state_mask, x_forecast, band): + mask, state_mask, x_forecast, + band, calc_hess=False): """Using an emulator of the nonlinear model around `x_forecast`. This case is quite special, as I'm focusing on a BHR SAIL version (or the JRC TIP), which have spectral parameters @@ -171,12 +172,25 @@ def create_nonlinear_observation_operator(n_params, emulator, metadata, LOG.info("\tDone!") - return (H0, H_matrix.tocsr()) + if calc_hess: + ddH = emulator.hessian(x0[mask[state_mask]]) + hess = np.zeros((n_times, n_params, n_params)) + for n, (lil_hess, m) in enumerate(zip(ddH, mask[state_mask].flatten())): + if m: + big_hess = np.zeros((n_params, n_params)) + for i, ii in enumerate(state_mapper): + for j, jj in enumerate(state_mapper): + big_hess[ii, jj] = lil_hess.squeeze()[i, j] + hess[n,...] = big_hess + + return (H0, H_matrix.tocsr(), hess) if calc_hess else (H0, H_matrix.tocsr()) + def create_prosail_observation_operator(n_params, emulator, metadata, - mask, state_mask, x_forecast, band): + mask, state_mask, x_forecast, + band, calc_hess=False): """Using an emulator of the nonlinear model around `x_forecast`. This case is quite special, as I'm focusing on a BHR SAIL version (or the JRC TIP), which have spectral parameters @@ -213,7 +227,18 @@ def create_prosail_observation_operator(n_params, emulator, metadata, LOG.info("\tDone!") - return (H0, H_matrix.tocsr()) + if calc_hess: + hess = np.zeros((n_times, n_params, n_params), + dtype=np.float32) + hess_ = emulator.hessian(x0[mask[state_mask]]) + + n = 0 + for i, m in enumerate(mask[state_mask].flatten()): + if m: + hess[i, ...] 
= hess_[n] + n += 1 + + return (H0, H_matrix.tocsr(), hess) if calc_hess else (H0, H_matrix.tocsr()) @@ -340,7 +365,7 @@ def spsolve2(a, b): a_lu = spl.splu(a.tocsc()) # LU decomposition for sparse a out = sp.lil_matrix((a.shape[1], b.shape[1]), dtype=np.float32) b_csc = b.tocsc() - for j in xrange(b.shape[1]): + for j in range(b.shape[1]): bb = np.array(b_csc[j, :].todense()).squeeze() out[j, j] = a_lu.solve(bb)[j] return out.tocsr() diff --git a/kafka/input_output/observations.py b/kafka/input_output/observations.py index cb6022e..8e291f6 100644 --- a/kafka/input_output/observations.py +++ b/kafka/input_output/observations.py @@ -325,7 +325,7 @@ def get_band_data(self, the_date, band_no): class KafkaOutput(object): """A very simple class to output the state.""" def __init__(self, parameter_list, geotransform, projection, folder, - fmt="GTiff"): + fmt="GTiff", state_folder:str=None): """The inference engine works on tiles, so we get the tilewidth (we assume the tiles are square), the GDAL-friendly geotransform and projection, as well as the destination directory and the @@ -333,6 +333,7 @@ def __init__(self, parameter_list, geotransform, projection, folder, self.geotransform = geotransform self.projection = projection self.folder = folder + self.state_folder = state_folder self.fmt = fmt self.parameter_list = parameter_list @@ -352,6 +353,7 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, A = np.zeros(state_mask.shape, dtype=np.float32) A[state_mask] = x_analysis[ii::n_params] dst_ds.GetRasterBand(1).WriteArray(A) + for ii, param in enumerate(self.parameter_list): fname = os.path.join(self.folder, "%s_%s_unc.tif" % (param, timestep.strftime("A%Y%j"))) @@ -366,7 +368,20 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, A[state_mask] = 1./np.sqrt(P_analysis_inv.diagonal()[ii::n_params]) dst_ds.GetRasterBand(1).WriteArray(A) + if not self.state_folder == None and os.path.exists( self.state_folder ): + # Dump to disk 
P_analysis_inv as sparse matrix in npz + fname = os.path.join(self.state_folder, "P_analysis_inv_%s.npz" % + ( timestep.strftime("A%Y%j") ) ) + sp.save_npz( fname, P_analysis_inv ) + + fname = os.path.join(self.state_folder, "P_analysis_%s.npz" % + ( timestep.strftime("A%Y%j") ) ) + np.savez( fname, P_analysis ) + # Dump as well the whole x_analysis in a single vector + fname = os.path.join(self.state_folder, "x_analysis_%s.npz" % + ( timestep.strftime("A%Y%j") ) ) + np.savez( fname, x_analysis ) if __name__ == "__main__": emulator = "../SAIL_emulator_both_500trainingsamples.pkl" diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index f4d1ce0..c9d9e05 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -37,7 +37,6 @@ from .inference import create_linear_observation_operator from .inference import create_nonlinear_observation_operator from .inference import iterate_time_grid -from .inference import propagate_information_filter_LAI # eg from .inference import hessian_correction from .inference import hessian_correction_multiband from .inference.kf_tools import propagate_and_blend_prior @@ -64,7 +63,7 @@ class LinearKalman (object): rather grotty "0-th" order models!""" def __init__(self, observations, output, state_mask, create_observation_operator, parameters_list, - state_propagation=propagate_information_filter_LAI, + state_propagation=None, linear=True, diagnostics=True, prior=None): """The class creator takes (i) an observations object, (ii) an output writer object, (iii) the state mask (a boolean 2D array indicating which @@ -184,8 +183,8 @@ def run(self, time_grid, x_forecast, P_forecast, P_forecast_inverse, x_forecast, P_forecast, P_forecast_inverse = self.advance( x_analysis, P_analysis, P_analysis_inverse, self.trajectory_model, self.trajectory_uncertainty) - is_first = False + if len(locate_times) == 0: # Just advance the time x_analysis = x_forecast @@ -257,14 +256,22 @@ def do_all_bands(self, timestep, current_data, x_forecast, P_forecast, # 
Also extract single band information from nice package # this allows us to use the same interface as current # Deferring processing to a new solver method in solvers.py - - H_matrix_= self._create_observation_operator(self.n_params, + + try: + H_matrix_= self._create_observation_operator(self.n_params, data.emulator, data.metadata, data.mask, self.state_mask, x_prev, - band) + band, + calc_hess = False) + except ValueError as e: + if (sum(data.mask[self.state_mask])== 0): + LOG.error("All observations masked out") + raise + + H_matrix.append(H_matrix_) Y.append(data.observations) MASK.append(data.mask) @@ -305,12 +312,27 @@ def do_all_bands(self, timestep, current_data, x_forecast, P_forecast, # Once we have converged... # Correct hessian for higher order terms #split_points = [m.sum( ) for m in MASK] + HESSIAN = [] INNOVATIONS = np.split(innovations, n_bands) - P_correction = hessian_correction_multiband(data.emulator, x_analysis, + for band, data in enumerate(current_data): + # calculate the hessian for the solution + _,_,hessian= self._create_observation_operator(self.n_params, + data.emulator, + data.metadata, + data.mask, + self.state_mask, + x_analysis, + band, + calc_hess = True) + HESSIAN.append(hessian) + P_correction = hessian_correction_multiband(HESSIAN, UNC, INNOVATIONS, MASK, self.state_mask, n_bands, self.n_params) P_analysis_inverse = P_analysis_inverse - P_correction + # Rarely, this returns a small negative value. For now set to nan. + # May require further investigation in the future + P_analysis_inverse[P_analysis_inverse<0] = np.nan # Done with this observation, move along... 
@@ -407,8 +429,17 @@ def assimilate_band(self, band, timestep, x_forecast, P_forecast, data.uncertainty, innovations, data.mask, self.state_mask, band, self.n_params) + # UPDATE HESSIAN WITH HIGHER ORDER CONTRIBUTION P_analysis_inverse = P_analysis_inverse - P_correction - # P_analysis_inverse = UPDATE HESSIAN WITH HIGHER ORDER CONTRIBUTION + # Rarely, this returns a small negative value. For now set to nan. + # May require further investigation in the future + negative_values = P_analysis_inverse<0 + if any(negative_values): + P_analysis_inverse[negative_values] = np.nan + LOG.warning("{} negative values in inverse covariance matrix".format( + sum(negative_values))) + + import matplotlib.pyplot as plt M = self.state_mask*1. M[self.state_mask] = x_analysis[6::7] diff --git a/kafka_test_Py36.py b/kafka_test_Py36.py index a1d86f1..1c4e889 100644 --- a/kafka_test_Py36.py +++ b/kafka_test_Py36.py @@ -1,17 +1,16 @@ #!/usr/bin/env python import logging -logging.basicConfig( - level=logging.getLevelName(logging.DEBUG), +logging.basicConfig( + level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - filename="the_log.log") + filename="logfiles/MODIS_demo.log") import os from datetime import datetime, timedelta import numpy as np import gdal import osr -import scipy.sparse as sp # from multiply.inference-engine blah blah blah try: @@ -19,18 +18,17 @@ except ImportError: pass -import kafka from kafka.input_output import BHRObservations, KafkaOutput from kafka import LinearKalman -from kafka.inference import block_diag -from kafka.inference import propagate_information_filter_LAI -from kafka.inference import no_propagation +from kafka.inference.broadbandSAIL_tools import propagate_LAI_broadbandSAIL from kafka.inference import create_nonlinear_observation_operator +from kafka.inference.broadbandSAIL_tools import JRCPrior # Probably should be imported from somewhere else, but I can't see # where from ATM... 
No biggy + def reproject_image(source_img, target_img, dstSRSs=None): """Reprojects/Warps an image to fit exactly another image. Additionally, you can set the destination SRS if you want @@ -54,73 +52,6 @@ def reproject_image(source_img, target_img, dstSRSs=None): dstSRS=dstSRS) return g - - -###class DummyInferencePrior(_WrappingInferencePrior): - ###""" - ###This class is merely a dummy. - ###""" - - ###def process_prior(self, parameters: List[str], time: Union[str, datetime], state_grid: np.array, - - -class JRCPrior(object): - """Dummpy 2.7/3.6 prior class following the same interface as 3.6 only - version.""" - - def __init__ (self, parameter_list, state_mask): - """It makes sense to have the list of parameters and state mask - defined at this point, as they won't change during processing.""" - self.mean, self.covar, self.inv_covar = self._tip_prior() - self.parameter_list = parameter_list - if isinstance(state_mask, (np.ndarray, np.generic) ): - self.state_mask = state_mask - else: - self.state_mask = self._read_mask(state_mask) - - def _read_mask(self, fname): - """Tries to read the mask as a GDAL dataset""" - if not os.path.exists(fname): - raise IOError("State mask is neither an array or a file that exists!") - g = gdal.Open(fname) - if g is None: - raise IOError("{:s} can't be opened with GDAL!".format(fname)) - mask = g.ReadAsArray() - return mask - - def _tip_prior(self): - """The JRC-TIP prior in a convenient function which is fun for the whole - family. Note that the effective LAI is here defined in transformed space - where TLAI = exp(-0.5*LAIe). 
- - Returns - ------- - The mean prior vector, covariance and inverse covariance matrices.""" - # broadly TLAI 0->7 for 1sigma - sigma = np.array([0.12, 0.7, 0.0959, 0.15, 1.5, 0.2, 0.5]) - x0 = np.array([0.17, 1.0, 0.1, 0.7, 2.0, 0.18, np.exp(-0.5*2.)]) - # The individual covariance matrix - little_p = np.diag(sigma**2).astype(np.float32) - little_p[5, 2] = 0.8862*0.0959*0.2 - little_p[2, 5] = 0.8862*0.0959*0.2 - - inv_p = np.linalg.inv(little_p) - return x0, little_p, inv_p - - def process_prior ( self, time, inv_cov=True): - # Presumably, self._inference_prior has some method to retrieve - # a bunch of files for a given date... - n_pixels = self.state_mask.sum() - x0 = np.array([self.mean for i in range(n_pixels)]).flatten() - if inv_cov: - inv_covar_list = [self.inv_covar for m in range(n_pixels)] - inv_covar = block_diag(inv_covar_list, dtype=np.float32) - return x0, inv_covar - else: - covar_list = [self.covar for m in xrange(n_pixels)] - covar = block_diag(covar_list, dtype=np.float32) - return x0, covar - class KafkaOutputMemory(object): """A very simple class to output the state.""" def __init__(self, parameter_list): @@ -134,57 +65,101 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, self.output[timestep] = solution +def mkdir_p(path): + try: + os.makedirs(path) + except OSError: + if not os.path.isdir(path): + raise + + if __name__ == "__main__": - + + # Set up logging + Log = logging.getLogger(__name__+".kafka_test_x.py") + + runname = 'Bondville_0-05' #Used in output directory as a unique identifier + + # To run without propagation set propagator to None and set a + # prior in LinearKalman.
+ # If propagator is set to propagate_LAI_broadbandSAIL then the + # prior in LinearKalman must be set to None because this propagator + # includes a prior + propagator = propagate_LAI_broadbandSAIL parameter_list = ["w_vis", "x_vis", "a_vis", - "w_nir", "x_nir", "a_nir", "TeLAI"] - + "w_nir", "x_nir", "a_nir", "TeLAI"] + + ## parameters for Bondville data. tile = "h11v04" - start_time = "2006185" - + start_time = "2006001" + time_grid_start = datetime(2006, 1, 1) + num_days = 366 + mcd43a1_dir="/data/MODIS/h11v04/MCD43" + ## Bondville chip + masklim = ((2200, 2400), (450, 700)) # Bondville, h11v04 + + ## Parameters for Spanish tile + #tile = "h17v05" # Spain + #start_time = "2017001" # Spain + #time_grid_start = datetime(2017, 1, 1) + #num_days = 366 + #mcd43a1_dir="/data/MODIS/h17v05/MCD43" + ## chips in h17v05 Spain, select one + #masklim = ((650, 730), (1180, 1280)) # Arros, rice + #masklim = ((900,940), (1300,1340)) = True # Alcornocales + #masklim = ((640,700), (1400,1500)) = True # Campinha + + + + path = "/home/npounder/output/kafka/demo/MODIS/kafkaout_{}".format(runname) + if not os.path.exists(path): + mkdir_p(path) + emulator = "./SAIL_emulator_both_500trainingsamples.pkl" - - if os.path.exists("/storage/ucfajlg/Ujia/MCD43/"): - mcd43a1_dir = "/storage/ucfajlg/Ujia/MCD43/" - else: - mcd43a1_dir="/data/MODIS/h11v04/MCD43" - ####tilewidth = 75 - ###n_pixels = tilewidth*tilewidth + + mask = np.zeros((2400,2400),dtype=np.bool8) - #mask[900:940, 1300:1340] = True # Alcornocales - #mask[640:700, 1400:1500] = True # Campinha - #mask[650:730, 1180:1280] = True # Arros - mask[ 2200:2395, 450:700 ] = True # Bondville, h11v04 + mask[masklim[0][0]:masklim[0][1], + masklim[1][0]:masklim[1][1]] = True + + bhr_data = BHRObservations(emulator, tile, mcd43a1_dir, start_time, + end_time=None, mcd43a2_dir=None, period=8) - bhr_data = BHRObservations(emulator, tile, mcd43a1_dir, start_time, - end_time=None, mcd43a2_dir=None) + Log.info("propagator = {}".format(propagator)) 
+ Log.info("tile = {}".format(tile)) + Log.info("start_time = {}".format(start_time)) + Log.info("mask = {}".format(masklim)) projection, geotransform = bhr_data.define_output() - output = KafkaOutput(parameter_list, geotransform, projection, "/tmp/") + output = KafkaOutput(parameter_list, geotransform, projection, path) + # If using a separate prior then this is passed to LinearKalman + # Otherwise this is just used to set the starting state vector. the_prior = JRCPrior(parameter_list, mask) - - kf = LinearKalman(bhr_data, output, mask, + + # prior = None when using propagate_LAI_broadbandSAIL + kf = LinearKalman(bhr_data, output, mask, create_nonlinear_observation_operator,parameter_list, - state_propagation=None, - prior=the_prior, + state_propagation=propagator, + prior=None, linear=False) # Get starting state... We can request the prior object for this x_forecast, P_forecast_inv = the_prior.process_prior(None) - + + # Inflation amount for propagation Q = np.zeros_like(x_forecast) - Q[6::7] = 0.025 - + Q[6::7] = 0.05 + kf.set_trajectory_model() kf.set_trajectory_uncertainty(Q) - - base = datetime(2006,7,4) - num_days = 180 + + # This determines the time grid of the retrieved state parameters time_grid = [] - for x in range( 0, num_days, 16): - time_grid.append( base + timedelta(days = x) ) + for x in range( 0, num_days, 8): + time_grid.append(time_grid_start + timedelta(days=x)) - kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) - + kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) \ No newline at end of file diff --git a/kafka_test_S2.py b/kafka_test_S2.py index 6f7c7b9..01b106e 100644 --- a/kafka_test_S2.py +++ b/kafka_test_S2.py @@ -4,7 +4,8 @@ logging.basicConfig( level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - filename="the_log.log") + filename="logfiles/demo_Campinha_2017_Q0-05.log") + import os from datetime import datetime, timedelta import numpy as np 
@@ -28,14 +29,13 @@ import kafka from kafka.input_output import Sentinel2Observations, KafkaOutput from kafka import LinearKalman -from kafka.inference import block_diag -from kafka.inference import propagate_information_filter_LAI -from kafka.inference import no_propagation +from kafka.inference.narrowbandSAIL_tools import propagate_LAI_narrowbandSAIL from kafka.inference import create_prosail_observation_operator - +from kafka.inference.narrowbandSAIL_tools import SAILPrior # Probably should be imported from somewhere else, but I can't see # where from ATM... No biggy + def reproject_image(source_img, target_img, dstSRSs=None): """Reprojects/Warps an image to fit exactly another image. Additionally, you can set the destination SRS if you want @@ -59,82 +59,6 @@ def reproject_image(source_img, target_img, dstSRSs=None): dstSRS=dstSRS) return g - - -###class DummyInferencePrior(_WrappingInferencePrior): - ###""" - ###This class is merely a dummy. - ###""" - - ###def process_prior(self, parameters: List[str], time: Union[str, datetime], state_grid: np.array, - -class SAILPrior(object): - def __init__ (self, parameter_list, state_mask): - self.parameter_list = parameter_list - if isinstance(state_mask, (np.ndarray, np.generic) ): - self.state_mask = state_mask - else: - self.state_mask = self._read_mask(state_mask) - #parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', - # 'lai', 'ala', 'bsoil', 'psoil'] - #self.mean = np.array([1.19, np.exp(-14.4/100.), - #np.exp(-4.0/100.), 0.1, - #np.exp(-50*0.68), np.exp(-100./21.0), - #np.exp(-3.97/2.),70./90., 0.5, 0.9]) - #sigma = np.array([0.69, 0.016, - #0.0086, 0.1, - #1.71e-2, 0.017, - #0.20, 0.5, 0.5, 0.5]) - self.mean = np.array([2.1, np.exp(-60./100.), - np.exp(-7.0/100.), 0.1, - np.exp(-50*0.0176), np.exp(-100.*0.002), - np.exp(-4./2.), 70./90., 0.5, 0.9]) - sigma = np.array([0.01, 0.2, - 0.01, 0.05, - 0.01, 0.01, - 0.50, 0.1, 0.1, 0.1]) - - self.covar = np.diag(sigma**2).astype(np.float32) - self.inv_covar = 
np.diag(1./sigma**2).astype(np.float32) - #self.inv_covar[3,3]=0 - ########self.mean = - ########self.variance = - ########lai_m2_m2: [3.1733 1.7940] - - ########cab_ug_cm2: [14.4369 1.8284] - - ########car_u_cm2: [3.9650 0.8948] - - ########cdm_g_cm2: [20.9675 6.4302] - - ########cw_cm: [0.6781 0.6749] - - ########N_: [1.1937 0.6902] - - def _read_mask(self, fname): - """Tries to read the mask as a GDAL dataset""" - if not os.path.exists(fname): - raise IOError("State mask is neither an array or a file that exists!") - g = gdal.Open(fname) - if g is None: - raise IOError("{:s} can't be opened with GDAL!".format(fname)) - mask = g.ReadAsArray() - return mask - - def process_prior ( self, time, inv_cov=True): - # Presumably, self._inference_prior has some method to retrieve - # a bunch of files for a given date... - n_pixels = self.state_mask.sum() - x0 = np.array([self.mean for i in range(n_pixels)]).flatten() - if inv_cov: - inv_covar_list = [self.inv_covar for m in range(n_pixels)] - inv_covar = block_diag(inv_covar_list, dtype=np.float32) - return x0, inv_covar - else: - covar_list = [self.covar for m in range(n_pixels)] - covar = block_diag(covar_list, dtype=np.float32) - return x0, covar - class KafkaOutputMemory(object): """A very simple class to output the state.""" def __init__(self, parameter_list): @@ -147,55 +71,106 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, solution[param] = x_analysis[ii::7] self.output[timestep] = solution +def mkdir_p(path): + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(path): + pass + else: + raise if __name__ == "__main__": - + + # Set up logging + Log = logging.getLogger(__name__+".kafka_test_x.py") + + + runname = 'Campinha_2017_Q0-05' #Used in output directory as a unique identifier + + # To run without propagation set propagator to None and set a + # prior in LinearKalman. 
+ # If propagator is set to propagate_LAI_narrowbandSAIL then the + # prior in LinearKalman must be set to None because this propagator + # includes a prior + propagator = propagate_LAI_narrowbandSAIL + parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', 'lai', 'ala', 'bsoil', 'psoil'] - - start_time = "2017001" - + + #start_time = "2017001" + #time_grid_start = datetime(2017, 1, 1) + start_time = "2017049" + time_grid_start = datetime(2017, 2, 18) + num_days = 366 + + Log.info("propagator = {}".format(propagator)) + Log.info("start_time = {}".format(start_time)) + + path = "/tmp/kafka/demo/S2/kafkaout_{}".format(runname) + if not os.path.exists(path): + mkdir_p(path) + #emulator_folder = "/home/ucfafyi/DATA/Multiply/emus/sail/" - emulator_folder = "/home/glopez/Multiply/src/py36/emus/sail" - + #emulator_folder = "/home/glopez/Multiply/src/py36/emus/sail" + emulator_folder = "/data/archive/emulators/s2_prosail" + #data_folder = "/data/nemesis/S2_data/30/S/WJ/" - data_folder = "/data/001_planet_sentinel_study/sentinel/30/S/WJ" + #data_folder = "/data/001_planet_sentinel_study/sentinel/30/S/WJ" + data_folder = "/data/S2_L2/30/S/WJ/" state_mask = "./Barrax_pivots.tif" - + + s2_observations = Sentinel2Observations(data_folder, - emulator_folder, + emulator_folder, state_mask) projection, geotransform = s2_observations.define_output() output = KafkaOutput(parameter_list, geotransform, - projection, "/tmp/") + projection, path) + # If using a separate prior then this is passed to LinearKalman + # Otherwise this is just used to set the starting state vector. 
the_prior = SAILPrior(parameter_list, state_mask) g = gdal.Open(state_mask) mask = g.ReadAsArray().astype(np.bool) + # prior = None when using propagate_LAI_narrowbandSAIL kf = LinearKalman(s2_observations, output, mask, create_prosail_observation_operator, parameter_list, - state_propagation=None, - prior=the_prior, + state_propagation=propagator, + prior=None,#the_prior, linear=False) - # Get starting state... We can request the prior object for this - x_forecast, P_forecast_inv = the_prior.process_prior(None) - + # Check if there's a prior from a previous timestep + P_inv_fname = "P_analysis_inv_%s.npz" % time_grid_start.strftime("A%Y%j") + P_inv_fname = os.path.join( path, P_inv_fname ) + + x_fname = "x_analysis_%s.npz" % time_grid_start.strftime("A%Y%j") + x_fname = os.path.join( path, x_fname ) + + if os.path.exists( P_inv_fname ) and os.path.exists( x_fname ): + # Load stored matrices... + x_forecast = np.load( x_fname )['arr_0'] + P_forecast_inv = sp.load_npz( P_inv_fname ) + else: + # Get starting state... We can request the prior object for this + x_forecast, P_forecast_inv = the_prior.process_prior(None) + + # Inflation amount for propagation Q = np.zeros_like(x_forecast) - + Q[6::10] = 0.05 + kf.set_trajectory_model() kf.set_trajectory_uncertainty(Q) - - base = datetime(2017,7,3) - num_days = 10 - time_grid = list((base + timedelta(days=x) + + # This determines the time grid of the retrieved state parameters + time_grid = list((time_grid_start + timedelta(days=x) for x in range(0, num_days, 2))) kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) - +