Source code for GAMCR.dataset.dataset

import numpy as np
from scipy.interpolate import BSpline
import matplotlib.pyplot as plt
import pickle
from .datagam import DataGAM
from pygam import LinearGAM, s, f
from collections import defaultdict
import scipy as sp
import scipy
import pandas as pd
from copy import deepcopy
import os

[docs] class Dataset(): """ A class used to preprocess, load and save data and models ... Attributes ---------- max_lag : int maximum lag time consider for the transfer functions features : dic dictionary of the different features used in the model n_splines : int number of splines considered for a GAM lam : positive float regularization parameter related to the smoothing penalty in the GAM Methods ------- load_model(path_model, lam=None) Load the model saved in the file with path: path_model save_model_parameters(save_folder, name='', add_dic={}) Save the model parameters save_batch_common_GAM(allGISID, save_folder, ntest=0, nstart=0, nfiles=40) Preprocess and save the data. It makes sure to use the same knots for the GAMs for the different sites. save_batch( save_folder, datafile, nstart=0, nfiles=100) Preprocess and save the data. get_fluxes(datafile, nstart=0, ntest=0, size_time_window=None) Load the data and get the PET, precipitation, dates and streamflow time series. This method is called by the 'save_batch' type methods. get_design(pet, x, y, dates) Compute the design matrix of the GAMs. This method is called by the 'save_batch' type methods. get_GAMdesign(X, J) Compute the matrix that is used in the convolution to get the streamflow values. load_data(save_folder, max_files=100, test_mode=False) Load the data that has been already proprecessed and saved using one of the 'save_batch' type methods. compute_spline_basis(show_splines = False) Compute the basis functions (B-splines) one which we decompose the transfer functions. """ def __init__(self, max_lag=24*30*2, features={}, n_splines=10, lam=10): self.m = max_lag self.compute_spline_basis() self.L = self.basis_splines.shape[0] if not('p' in features): features['p'] = [24*15,24*30,24*30*6,24*30*12,24*30*16] if not('pet' in features): features['pet'] = [24*15,24*30,24*30*2,24*30*6] self.features = features self.init = max([np.max(self.features['p']), np.max(self.features['pet'])]) self.gam = DataGAM(self.L, n_splines=n_splines, lam=lam)
[docs] def load_model(self, path_model, lam=None): """Load the model saved in the file with path: path_model Parameters ---------- path_model : str The location of the file where the model has been saved lam : positive float, optional Regularization parameter for the smoothing penalty used when fitting the GAM """ with open(path_model, 'rb') as handle: params = pickle.load(handle) if lam is None: lam = params['gam']['lam'] self.m = params['m'] self.compute_spline_basis() self.L = self.basis_splines.shape[0] self.init = params['init'] self.features = params['features'] self.feature_names = params['feature_names'] self.gam = DataGAM(self.L, n_splines=params['gam']['n_splines'], lam=lam) self.gam.init_gam_from_knots(params['gam']['edge_knots_'], params['gam']['m_features'], coeffs=params['gam'].get('coeffs', None))
[docs] def save_model_parameters(self, save_folder, name='', add_dic={}): params = add_dic params.update({'m':self.m, 'init':self.init, 'features':self.features, 'feature_names':self.feature_names, 'gam': self.gam.get_params() }) if name=='': with open('{0}/params.pkl'.format(save_folder), 'wb') as handle: pickle.dump(params, handle, protocol=pickle.HIGHEST_PROTOCOL) else: with open('{0}/{1}.pkl'.format(save_folder, name), 'wb') as handle: pickle.dump(params, handle, protocol=pickle.HIGHEST_PROTOCOL)
[docs] def save_batch_common_GAM(self, allGISID, save_folder, ntest=0, nstart=0, nfiles=40): """Preprocess and save the data for all sites in allGISID. It makes sure to use the same knots for the GAMs for the different sites. This was needed in our paper when trying to fit a model predicting model coefficients from catchment's features. Otherwise, coefficients learned at different sites would not correspond to the same quantities. Parameters ---------- allGISID : list List of the names of the sites save_folder : str Path where we can find a folder '{site}' for all '{site}' in allGISID. In this folder, the .txt file with the data at the corresponding site should be saved. ntest : int, optional Number of more recent time points that should be discarded. nstart : int, optional Number of older time points that should be discarded. nfiles : int, optional The preprocessed data will be splitted and saved in different files (to potentially speed up the loading process of the data if only some fraction of the total dataset is needed). This integer specifies the number of files used to split the data. """ init = self.init m = self.m common_edge_knots = None for GISID in allGISID: import os path_root = os.path.join(save_folder, str(GISID)) path = os.path.join(path_root, 'data/') if not os.path.exists(path): os.makedirs(path) pathfile = os.path.join(path_root, 'data_{0}.txt'.format(GISID)) pet_train, x_train, y_train, dates_train, timeyear_train, pet_test, x_test, y_test, dates_test, timeyear_test = self.get_fluxes(pathfile, ntest=ntest, nstart=nstart) X = self.get_design(pet_train, x_train, y_train, timeyear_train) self.gam.init_gam_from_design(X) if common_edge_knots is None: common_edge_knots = deepcopy(self.gam.edge_knots_) else: for i, l in enumerate(common_edge_knots): common_edge_knots[i][0] = min([self.gam.edge_knots_[i][0], l[0]]) common_edge_knots[i][1] = max([self.gam.edge_knots_[i][1], l[1]]) self.save_model_parameters(path, name='params_local') np.save(os.path.join(path,'X_temp.npy'), X) np.save(os.path.join(path,'y_temp.npy'), y_train) #np.save(save_folder+'indexes_temp.npy'.format(l), np.array([i for i in range()])) np.save(os.path.join(path,'timeyear_temp.npy'), np.array(timeyear_train)) np.save(os.path.join(path,'dates_temp.npy'), np.array(dates_train)) np.save(os.path.join(path,'J_temp.npy'), np.array(x_train)) self.gam.init_gam_from_knots(common_edge_knots, self.gam.m_features) for GISID in allGISID: path_root = os.path.join(save_folder, str(GISID)) path = os.path.join(path_root, 'data/') X = np.load(os.path.join(path,'X_temp.npy')) J = np.load(os.path.join(path,'J_temp.npy')) timeyear = np.load(os.path.join(path,'timeyear_temp.npy')) dates = np.load(os.path.join(path,'dates_temp.npy'), allow_pickle=True) y = np.load(os.path.join(path,'y_temp.npy')) # os.remove(path+'X_temp.npy') # os.remove(path+'timeyear_temp.npy') # os.remove(path+'dates_temp.npy') # os.remove(path+'y_temp.npy') # os.remove(path+'J_temp.npy') matJ = self.get_GAMdesign(X, J) X = X[m+init:,:] Y = y[m+init:] dates = dates[m+init:] timeyear = timeyear[m+init:] nt = matJ.shape[0] for l in range(nfiles-1): low, up = l*(nt//nfiles), (l+1)*(nt//nfiles) if (l==(nfiles-2)): up = nt np.save(os.path.join(path,'matJ_{0}.npy'.format(l)), matJ[low:up,:,:]) np.save(os.path.join(path,'X_{0}.npy'.format(l)), X[low:up,:]) np.save(os.path.join(path,'y_{0}.npy'.format(l)), Y[low:up]) np.save(os.path.join(path,'indexes_{0}.npy'.format(l)), np.array([i for i in range(low,up)])) np.save(os.path.join(path,'timeyear_{0}.npy'.format(l)), np.array(timeyear[low:up])) np.save(os.path.join(path,'dates_{0}.npy'.format(l)), np.array(dates[low:up])) self.save_model_parameters(path)
[docs] def save_batch(self, save_folder, datafile, nstart=0, nfiles=100): """Preprocess and save the data for all sites in allGISID. Parameters ---------- save_folder : str Path where we can find a folder '{site}' for the site under study. In this folder, the .txt file with the data at the corresponding site should be saved. nstart : int, optional Number of older time points that should be discarded. nfiles : int, optional The preprocessed data will be splitted and saved in different files (to potentially speed up the loading process of the data if only some fraction of the total dataset is needed). This integer specifies the number of files used to split the data. """ nfiles += 1 init = self.init m = self.m import os if not os.path.exists(save_folder): os.makedirs(save_folder) pet_train, x_train, y_train, dates_train, timeyear_train, pet_test, x_test, y_test, dates_test, timeyear_test = self.get_fluxes(datafile, ntest=0, nstart=nstart) X = self.get_design(pet_train, x_train, y_train, timeyear_train) self.gam.init_gam_from_design(X) matJ = self.get_GAMdesign(X, x_train) X = X[m+init:,:] Y = y_train[m+init:] dates = dates_train[m+init:] timeyear = timeyear_train[m+init:] nt = matJ.shape[0] for l in range(nfiles-1): low, up = l*(nt//nfiles), (l+1)*(nt//nfiles) if (l==(nfiles-2)): up = nt np.save(os.path.join(save_folder,'matJ_{0}.npy'.format(l)), matJ[low:up,:,:]) np.save(os.path.join(save_folder,'X_{0}.npy'.format(l)), X[low:up,:]) np.save(os.path.join(save_folder,'y_{0}.npy'.format(l)), Y[low:up]) np.save(os.path.join(save_folder,'indexes_{0}.npy'.format(l)), np.array([i for i in range(low,up)])) np.save(os.path.join(save_folder,'timeyear_{0}.npy'.format(l)), np.array(timeyear[low:up])) np.save(os.path.join(save_folder,'dates_{0}.npy'.format(l)), np.array(dates[low:up])) self.save_model_parameters(save_folder)
[docs] def get_fluxes(self, datafile, nstart=0, ntest=0, size_time_window=None): """Load the data and get the PET, precipitation, dates and streamflow time series. This method is called by the 'save_batch' type methods. Parameters ---------- datafile : str Path of the .txt file with the data at the corresponding site. nstart : int, optional Number of older time points that should be discarded. """ df = pd.read_csv(datafile) df = df.fillna(0) x = df['p'].to_numpy() y = df['q'].to_numpy() pet = df['pet'].to_numpy() m = self.m init = self.init if size_time_window is None: size_time_window = len(y) ntrain = init+size_time_window dates = df['date'].to_numpy() timeyear = df['timeyear'].to_numpy() n = len(dates) if ntest is None: ntest = init+size_time_window y_test = y[nstart+ntrain:min(nstart+ntrain+ntest,n)] x_test = x[nstart+ntrain:min(nstart+ntrain+ntest,n)] pet_test = pet[nstart+ntrain:min(nstart+ntrain+ntest,n)] dates_test = dates[nstart+ntrain:min(nstart+ntrain+ntest,n)] timeyear_test = timeyear[nstart+ntrain:min(nstart+ntrain+ntest,n)] y_train = y[nstart:nstart+ntrain] x_train = x[nstart:nstart+ntrain] pet_train = pet[nstart:nstart+ntrain] dates_train = dates[nstart:nstart+ntrain] timeyear_train = timeyear[nstart:nstart+ntrain] return pet_train, x_train, y_train, dates_train, timeyear_train, pet_test, x_test, y_test, dates_test, timeyear_test
[docs] def get_design(self, pet, x, y, dates): """Compute the matrix that is used in the convolution to get the streamflow values. Parameters ---------- pet : array Potential evapotranspiration. x : array Precipitation time series. y : array Streamflow time series. dates : array Time series with dates. """ m = self.m init = self.init nfeat = self.basis_splines.shape[0] nb_features = len(self.features['pet']) + len(self.features['p']) ages_max_x = self.features['p'] ages_max_pet = self.features['pet'] nfeatures_out_pet_p = 2 self.feature_names = ['precipitation intensity', 'last 2 hours precipitation intensity'] if self.features.get('date', False): nfeatures_out_pet_p += 1 self.feature_names.append('date') if self.features.get('timeyear', False): nfeatures_out_pet_p += 2 self.feature_names.append('cos yeartime') self.feature_names.append('sin yeartime') for el in ages_max_x: self.feature_names.append('Weighted avg precipitation over the last {0} hours'.format(el)) for el in ages_max_pet: self.feature_names.append('Weighted avg pet over the last {0} hours'.format(el)) nb_features += nfeatures_out_pet_p Nt = len(x)-1 X = np.zeros((Nt,nb_features)) J = x for i in range(init,Nt): t = int(i)+1 arg_perio = (dates[i]-int(dates[i]))*2*np.pi for j in range(nb_features): if j<=1: X[i,j] = np.sum(J[t-j-1:t]) elif self.feature_names[j]=='date': X[i,j] = dates[i]-dates[0] elif self.feature_names[j]=='cos yeartime': X[i,j] = np.cos(arg_perio) elif self.feature_names[j]=='sin yeartime': X[i,j] = np.sin(arg_perio) elif (j>=nfeatures_out_pet_p) and (j-nfeatures_out_pet_p<len(ages_max_x)): ls = np.cumsum(np.ones(ages_max_x[j-nfeatures_out_pet_p])) weights = np.flip( np.exp(-4*ls/ls[-1]) ) weights /= np.sum(weights) X[i,j] = np.sum(J[t-ages_max_x[j-nfeatures_out_pet_p]:t]*weights) else: jp = j-nfeatures_out_pet_p-len(ages_max_x) ls = np.cumsum(np.ones(ages_max_pet[jp])) weights = np.flip( np.exp(-4*ls/ls[-1]) ) weights /= np.sum(weights) X[i,j] = np.sum(pet[t-ages_max_pet[jp]:t]*weights) return X
[docs] def get_GAMdesign(self, X, J): """Compute the matrix that is used in the convolution to get the streamflow values. Parameters ---------- X : array Design matrix of the GAM compute from the method 'get_design'. X has dimension: number of timepoints x number of features J : array Precipitation time series. """ m = self.m init = self.init nfeat = self.basis_splines.shape[0] Nt = len(J)-1 A = self.gam._modelmat(X) matJ = np.zeros((Nt-m-init,nfeat,A.shape[1])) for i in range(m+init,Nt): vec = np.flip(J[i-m+1:i+1]) mat = np.flip(A[i-m+1:i+1,:], axis=0) # A: nt x nft basis: L x matJ[i-m-init,:,:] = np.sum(vec[None,:,None] * self.basis_splines[:,:,None] * mat[None,:,:], axis=1) return matJ
[docs] def load_data(self, save_folder, max_files=100, test_mode=False): """Load the data that has been already proprecessed and saved using one of the 'save_batch' type methods. Parameters ---------- save_folder : str Path of the folder where the data is stored. max_files : int Total number of files among the ones saved by the 'save_batch' type method to load. test_mode : bool If False, the oldest data will be loaded. Otherwise, the more recent one is loaded. """ id_sub_files = np.sort(np.array([name[5:-4] for name in os.listdir(save_folder) if ('matJ' in name)]).astype(int)) if test_mode: id_files_2_load = id_sub_files[-max_files:] else: id_files_2_load = id_sub_files[:max_files] for ite, id_file in enumerate(id_files_2_load): if ite==0: X = np.load(os.path.join(save_folder,'X_{0}.npy'.format(id_file))) matJ = np.load(os.path.join(save_folder,'matJ_{0}.npy'.format(id_file))) y = np.load(os.path.join(save_folder,'y_{0}.npy'.format(id_file))) timeyear = np.load(os.path.join(save_folder,'timeyear_{0}.npy'.format(id_file))) dates = np.load(os.path.join(save_folder,'dates_{0}.npy'.format(id_file)), allow_pickle=True) else: Xtemp = np.load(os.path.join(save_folder,'X_{0}.npy'.format(id_file))) matJtemp = np.load(os.path.join(save_folder,'matJ_{0}.npy'.format(id_file))) ytemp = np.load(os.path.join(save_folder,'y_{0}.npy'.format(id_file))) timeyeartemp = np.load(os.path.join(save_folder,'timeyear_{0}.npy'.format(id_file))) datestemp = np.load(os.path.join(save_folder,'dates_{0}.npy'.format(id_file)), allow_pickle=True) X = np.concatenate((X,Xtemp), axis=0) matJ = np.concatenate((matJ,matJtemp), axis=0) timeyear = np.concatenate((timeyear,timeyeartemp), axis=0) dates = np.concatenate((dates,datestemp), axis=0) y = np.concatenate((y,ytemp), axis=0) return X, matJ, y, timeyear, dates
[docs] def compute_spline_basis(self, show_splines = False): """Compute the basis functions (B-splines) one which we decompose the transfer functions. Parameters ---------- show_splines : bool, optional If True, a figure showing the basis functions will be produced. """ m = self.m knots_ref = [] val = 0 step = 1 while val<=m: knots_ref.append(val) val += step step *= 2 nk = len(knots_ref) degree = 2 n = nk+degree+1 knots = np.concatenate((min(knots_ref)*np.ones(degree),knots_ref,max(knots_ref)*np.ones(degree))) c = np.zeros(n) # Generate B-spline basis functions basis_functions = [] evaluation_points = np.linspace(min(knots), max(knots), m) basis_values = [] for i in range(n): c[i] = 1 basis = BSpline(knots, c, degree) basis_values.append(basis(evaluation_points)) c[i] = 0 basis_values = np.array(basis_values)[1:-3,:] if show_splines: # Plot the basis functions for i, basis_values_i in enumerate(basis_values): plt.plot(evaluation_points, basis_values_i, label=f'Basis {i + 1}') plt.title('B-spline Basis Functions') plt.xlabel('x') plt.ylabel('Basis Function Value') plt.show() self.basis_splines = basis_values self.knots_splines = knots_ref