# Source code for pottok

# -*- coding: utf-8 -*-
# ================================================================
#              _   _        _
#             | | | |      | |
#  _ __   ___ | |_| |_ ___ | | __
# | '_ \ / _ \| __| __/ _ \| |/ /
# | |_) | (_) | |_| || (_) |   <
# | .__/ \___/ \__|\__\___/|_|\_\
# | |
# |_|
# ================================================================
# @author: Olivia Bernardoff, Nicolas Karasiak, Yousra Hamrouni & David Sheeren
# @git: https://github.com/obernardoff/pottok/
# ================================================================
"""
The :mod:`pottok` module gathers available classes and function for `pottok`.
"""

from . import datasets

# general libraries


import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.metrics import accuracy_score, mean_squared_error
from sklearn.preprocessing import StandardScaler  # centrer-réduire
from itertools import product
import ot
import museotoolbox as mtb
import gdal
import pickle

__version__ = "0.1-rc2"


class OptimalTransportGridSearch:
    """
    Optimal transport with hyper-parameter grid search, suitable for validation.

    Parameters
    ----------
    transport_function : class of ot.da, optional (default=ot.da.MappingTransport)
        Optimal-transport class from ``ot.da`` (e.g. ot.da.SinkhornTransport).
    params : dict, optional (default=None)
        Parameters of the optimal transport function. Any value given as a
        list or array is treated as a grid to search over.
    verbose : boolean, optional (default=True)
        Gives informations about the object.
    """

    def __init__(self,
                 transport_function=ot.da.MappingTransport,
                 params=None,
                 verbose=True):
        # store user inputs
        self.transport_function = transport_function
        self.params_ot = params
        self.verbose = verbose

    def _convert_to_float64(self, array):
        """
        If array is integer, convert to float64. In any case, return array!
        """
        if np.issubdtype(array.dtype, np.integer):
            array = array.astype(np.float64)
        return array

    def preprocessing(self,
                      Xs,
                      ys=None,
                      Xt=None,
                      yt=None,
                      group_s=None,
                      group_t=None,
                      scaler=False):
        """
        Store the input parameters in the object and scale them if asked.

        Parameters
        ----------
        Xs : array_like, shape (n_source_samples, n_features)
            Source domain array.
        ys : array_like, shape (n_source_samples,)
            Label source array (1d).
        Xt : array_like, shape (n_target_samples, n_features)
            Target domain array.
        yt : array_like, shape (n_target_samples,)
            Label target array (1d).
        group_s : array_like, optional
            Polygon group of each source label (1d).
        group_t : array_like, optional
            Polygon group of each target label (1d).
        scaler : scale class or False (default=False)
            The class used to scale Xs and Xt (e.g. StandardScaler).
        """
        self._share_args(Xs=self._convert_to_float64(Xs),
                         ys=ys,
                         Xt=self._convert_to_float64(Xt),
                         yt=yt,
                         group_s=group_s,
                         group_t=group_t,
                         scaler=scaler)
        self._prefit(Xs, Xt)

    def fit_circular(self, metrics=mean_squared_error, greater_is_better=False):
        """
        Learn domain adaptation model with circular tuning (fitting).

        Parameters
        ----------
        metrics : function, optional (default=mean_squared_error)
            Need a function that takes two arrays as parameters.
        greater_is_better : bool, optional (default=False)
            If mean_squared_error, the lower is the better. Else, if overall
            accuracy for example, greater_is_better is True.

        Returns
        -------
        transport_model : object
            The output model fitted.
        """
        self.metrics = metrics
        self.greater_is_better = greater_is_better
        # (removed leftover debug `print(self.Xs)`)
        if self._is_grid_search():
            self._find_best_parameters_circular(
                self.Xs, ys=self.ys, Xt=self.Xt, yt=self.yt)
        else:
            # FIX: `**(self.params_ot or {})` — the original `**self.params_ot`
            # raised TypeError when no parameters were given
            self.transport_model = self.transport_function(
                **(self.params_ot or {}))
            self.transport_model.fit(
                self.Xs, ys=self.ys, Xt=self.Xt, yt=self.yt)
        return self.transport_model

    def fit_crossed(self,
                    cv_ai=StratifiedKFold(n_splits=2, shuffle=True,
                                          random_state=21),
                    cv_ot=StratifiedKFold(n_splits=2, shuffle=True,
                                          random_state=42),
                    classifier=RandomForestClassifier(random_state=42),
                    parameters=dict(n_estimators=[100]),
                    yt_use=True,
                    Xt_valid=None,
                    Xt_test=None,
                    yt_valid=None,
                    yt_test=None):
        """
        Learn domain adaptation model with crossed tuning (fitting).

        Parameters
        ----------
        cv_ai : cross-validation function
            cv used for the classifier learning. Allowed function from
            museotoolbox as scikit-learn.
        cv_ot : cross-validation function
            cv used for the train/test split. Allowed function from
            museotoolbox as scikit-learn.
        classifier : training algorithm (default=RandomForestClassifier)
        parameters : dict
            Parameter grid for the classifier GridSearchCV.
        yt_use : bool, optional (default=True)
            Whether target labels are given to the transport fit.
        Xt_valid, Xt_test, yt_valid, yt_test : array_like, optional
            Pre-computed target validation/test split; when None the split
            is drawn with ``cv_ot``.

        Returns
        -------
        transport_model : object
            The output model fitted.
        """
        self._share_args(cv_ot=cv_ot,
                         cv_ai=cv_ai,
                         classifier=classifier,
                         parameters=parameters,
                         yt_use=yt_use)
        if Xt_valid is not None:
            self._share_args(Xt_valid=Xt_valid,
                             Xt_test=Xt_test,
                             yt_valid=yt_valid,
                             yt_test=yt_test)
        else:
            if self.group_s is None:
                Xt_valid, Xt_test, yt_valid, yt_test = \
                    mtb.cross_validation.train_test_split(
                        cv=cv_ot, X=self.Xt, y=self.yt)
                self._share_args(Xt_valid=Xt_valid,
                                 Xt_test=Xt_test,
                                 yt_valid=yt_valid,
                                 yt_test=yt_test)
            else:
                # FIX: split with cv_ot (was cv_ai) — cv_ot is documented as
                # the split CV and the group-less branch above uses it
                Xt_valid, Xt_test, yt_valid, yt_test, \
                    groupt_valid, groupt_test = \
                    mtb.cross_validation.train_test_split(
                        cv=cv_ot, X=self.Xt, y=self.yt, groups=self.group_t)
                self._share_args(Xt_valid=Xt_valid,
                                 Xt_test=Xt_test,
                                 yt_valid=yt_valid,
                                 yt_test=yt_test,
                                 groupt_valid=groupt_valid,
                                 groupt_test=groupt_test)
        # classifier grid search with the user parameters
        self._model = GridSearchCV(classifier, parameters, cv=cv_ai)
        # FIX: parenthesised the condition — the original
        # `A and B or C` parsed as `(A and B) or C`, so EMDTransport always
        # took this branch and silently ignored user-supplied parameters
        if self.params_ot is None and self.transport_function in (
                ot.da.SinkhornTransport, ot.da.EMDTransport):
            self.transport_model = self.transport_function()
            self.transport_model.fit(self.Xs, Xt=self.Xt)
        elif self._is_grid_search():
            self._find_best_parameters_crossed(
                self.Xs, ys=self.ys, Xt=self.Xt, yt=self.yt,
                group_val=self.group_s)
        else:
            # FIX: forward the (scalar) user parameters, as fit_circular
            # does — the original dropped them here
            self.transport_model = self.transport_function(
                **(self.params_ot or {}))
            self.transport_model.fit(self.Xs, ys=self.ys,
                                     Xt=self.Xt, yt=self.yt)
        return self.transport_model

    def predict_transfer(self, data):
        """
        Predict model using domain adaptation.

        Parameters
        ----------
        data : arr
            Vector to transfer.

        Returns
        -------
        data or (data_non_scale, data) : arr
            Transferred vector; when a scaler is used, both the unscaled
            and the scaled versions are returned.
        """
        if self.scaler is not False:
            data = self.Xs_scaler.transform(data)
        data = self.transport_model.transform(Xs=data)
        if self.scaler is not False:
            data_non_scale = self.Xs_scaler.inverse_transform(data)
            return data_non_scale, data  # non scale, scale
        else:
            return data  # non scale

    def valid_fit_crossed(self, Xs_transform):
        """
        OA comparison before and after OT with Xt_valid/Xt_test.

        Parameters
        ----------
        Xs_transform : array_like, shape (n_source_samples, n_features)
            Source domain array transformed after OT.
        """
        # before transport
        self._model.fit(self.Xs, self.ys, self.group_s)
        y_pred_non_transport = self._model.predict(self.Xt_test)
        oa_non_transport = accuracy_score(
            self.yt_test, y_pred_non_transport)
        print("Cross-validation (valid)")
        print("OA before transport :", round(oa_non_transport, 3))
        # after transport
        self._model.fit(Xs_transform, self.ys, self.group_s)
        y_pred_transport = self._model.predict(self.Xt_test)
        oa_transport = accuracy_score(self.yt_test, y_pred_transport)
        print("OA after transport", round(oa_transport, 3))
        print("There is a difference of",
              round(oa_transport - oa_non_transport, 4),
              "after transport")

    def assess_transport(self, Xs_transform, record=False, path=None):
        """
        OA comparison before and after OT on the whole image.

        Parameters
        ----------
        Xs_transform : array_like, shape (n_source_samples, n_features)
            Source domain array transformed after OT.
        record : bool, optional (default=False)
            If True, pickle the fitted classifier to ``path``.
        path : str, optional
            Destination file for the pickled classifier (used when
            ``record`` is True).

        Returns
        -------
        y_pred_non_transport : array_like
            yt prediction before transport.
        y_pred_transport : array_like
            yt prediction after transport.
        """
        # before transport
        self._model.fit(self.Xs, self.ys, self.group_s)
        y_pred_non_transport = self._model.predict(self.Xt)
        oa_non_transport = accuracy_score(self.yt, y_pred_non_transport)
        print("All image")
        print("OA before transport", round(oa_non_transport, 3))
        # after transport
        self._model.fit(Xs_transform, self.ys, self.group_s)
        if record:
            # FIX: context manager — the original left the file handle open
            with open(path, 'wb') as model_file:
                pickle.dump(self._model, model_file)
            print('Learning model has been record')
        y_pred_transport = self._model.predict(self.Xt)
        oa_transport = accuracy_score(self.yt, y_pred_transport)
        print(
            "OA after transport",
            round(oa_transport, 3),
            "on all image")
        print(
            "There is a difference of",
            round(oa_transport - oa_non_transport, 4),
            "after transport (on all image)")
        return y_pred_non_transport, y_pred_transport

    def assess_transport_circular(self,
                                  Xs_transform,
                                  group_s=None,
                                  group_t=None,
                                  cv_ai=StratifiedKFold(n_splits=2,
                                                        shuffle=True,
                                                        random_state=21),
                                  classifier=RandomForestClassifier(
                                      random_state=42),
                                  parameters=dict(n_estimators=[100]),
                                  yt=None,
                                  ys=None):
        """
        OA comparison before and after OT (circular workflow).

        Parameters
        ----------
        Xs_transform : array_like, shape (n_source_samples, n_features)
            Source domain array transformed after OT.
        group_s, group_t : array_like, optional
            Polygon group of each source/target label (1d).
        cv_ai : cross-validation function
            cv used for the classifier learning.
        classifier : training algorithm (default=RandomForestClassifier)
        parameters : dict
            Parameter grid for the classifier GridSearchCV.
        yt, ys : array_like, optional
            Target/source labels (1d).

        Returns
        -------
        y_pred_non_transport : array_like
            yt prediction before transport.
        y_pred_transport : array_like
            yt prediction after transport.
        """
        self.group_s = group_s
        self.group_t = group_t
        self.yt = yt
        self.ys = ys
        # before transport
        self._model = GridSearchCV(classifier, parameters, cv=cv_ai)
        self._model.fit(self.Xs, self.ys, self.group_s)
        y_pred_non_transport = self._model.predict(self.Xt)
        oa_non_transport = accuracy_score(self.yt, y_pred_non_transport)
        print("OA before transport", round(oa_non_transport, 3))
        # after transport
        self._model.fit(Xs_transform, self.ys, self.group_s)
        y_pred_transport = self._model.predict(self.Xt)
        oa_transport = accuracy_score(self.yt, y_pred_transport)
        print(
            "OA after transport",
            round(oa_transport, 3),
            "sur toute l'image")
        print(
            "There is a difference of",
            round(oa_transport - oa_non_transport, 4),
            "after transport (on all image)")
        return y_pred_non_transport, y_pred_transport

    def _share_args(self, **params):
        """
        Store each keyword argument as an attribute of the object.

        The special key 'scaler' also sets the ``_need_scale`` flag
        (False when scaler is False, True otherwise).
        """
        for key, value in params.items():
            if key == 'scaler':
                self._need_scale = value is not False
            self.__dict__[key] = value

    def _prefit(self, Xs, Xt):
        """
        Scale Xs and Xt (when a scaler was requested) and store them.

        Parameters
        ----------
        Xs : array_like, shape (n_source_samples, n_features)
            Source domain array.
        Xt : array_like, shape (n_target_samples, n_features)
            Target domain array.
        """
        if self.verbose:
            print('Learning Optimal Transport with ' +
                  str(self.transport_function.__name__) +
                  ' algorithm.')
        if self._need_scale:
            # keep the fitted scalers for later (inverse-)transforms
            self.Xs_scaler = self._to_scale(Xs, self.scaler)
            self.Xs = self.Xs_scaler.transform(Xs)
            self.Xt_scaler = self._to_scale(Xt, self.scaler)
            self.Xt = self.Xt_scaler.transform(Xt)
            print("Xs and Xt are scaled")
        else:
            print("Xs and Xt are not scaled")

    def _to_scale(self, data, method):
        """
        Fit a scaler of class ``method`` on ``data``.

        Parameters
        ----------
        data : array_like
            Data the scaler is fitted on.
        method : class
            Scaler class (e.g. StandardScaler).

        Returns
        -------
        scaler : fitted scaler on data
        """
        scaler = method()
        # after .fit the scaler holds its learned statistics
        scaler.fit(data)
        return scaler

    def _is_grid_search(self):
        """
        Return True when at least one OT parameter is a list/array grid.

        Side effect: stores the grid keys in ``self.param_grid`` and copies
        ``self.params_ot`` so the search never mutates the user's dict.
        """
        if self.params_ot is not None:
            self.param_grid = [
                key for key in self.params_ot
                if isinstance(self.params_ot.get(key), (list, np.ndarray))]
            self.params_ot = self.params_ot.copy()
        else:
            self.param_grid = False
        return bool(self.param_grid)

    def _generate_params_from_grid_search(self):
        """
        Yield every combination of the grid-search hyper-parameters,
        merged into ``self.params_ot``.
        """
        self.param_grids = []
        hyper_param = {key: self.params_ot[key] for key in self.param_grid}
        items = sorted(hyper_param.items())
        keys, values = zip(*items)
        for combination in product(*values):
            self.params_ot.update(dict(zip(keys, combination)))
            # FIX: append a copy — the original appended the same mutated
            # dict reference, so param_grids ended up all identical
            self.param_grids.append(self.params_ot.copy())
            yield self.params_ot

    def _find_best_parameters_crossed(self, Xs, ys, Xt, yt, group_val):
        """
        Find the best parameters of the transport function (crossed method).

        Parameters
        ----------
        Xs : array_like, shape (n_source_samples, n_features)
            Source domain array.
        ys : array_like, shape (n_source_samples,)
            Label source array (1d).
        Xt : array_like, shape (n_target_samples, n_features)
            Target domain array.
        yt : array_like, shape (n_target_samples,)
            Label target array (1d).
        group_val : array_like, shape (n_source_samples,)
            Polygon group of each label (1d).
        """
        self.best_score = None
        # try every hyper-parameter combination
        for gridOT in self._generate_params_from_grid_search():
            print(gridOT)
            # candidate transport model for this combination
            transport_model_tmp = self.transport_function(**gridOT)
            # fit the transport (label usage depends on the algorithm)
            if self.transport_function in (ot.da.SinkhornTransport,
                                           ot.da.EMDTransport):
                transport_model_tmp.fit(Xs=Xs, Xt=Xt)
            elif not self.yt_use:
                transport_model_tmp.fit(Xs=Xs, Xt=Xt, ys=ys)
            else:
                transport_model_tmp.fit(Xs=Xs, ys=ys, Xt=Xt, yt=yt)
            # transport the source samples
            Xs_transform = transport_model_tmp.transform(Xs=Xs)
            # learn the classifier on the transported source samples
            self._model.fit(Xs_transform, ys, group_val)
            if self.verbose:
                print('Crossed validation OA : ' +
                      str(self._model.best_score_))
                print('Best parameter : ' + str(self._model.best_params_))
            # evaluate on the held-out target validation set
            yt_pred_valid = self._model.predict(self.Xt_valid)
            oa_transport = accuracy_score(self.yt_valid, yt_pred_valid)
            print("OA after transport", oa_transport)
            print("-------------------------------------------------")
            # keep the best combination
            if self.best_score is None or oa_transport > self.best_score:
                self.best_score = oa_transport
                self.best_params = gridOT.copy()
                # keep the fitted model: avoids refitting at the end of
                # fit_crossed
                self.transport_model = transport_model_tmp
        if self.verbose:
            # FIX: removed the duplicated 'Best grid' print
            print('Best grid is ' + str(self.best_params))
            print('Best score is ' + str(self.best_score))

    def _find_best_parameters_circular(self, Xs, ys, Xt, yt):
        """
        Find the best parameters of the transport function (circular method).

        Parameters
        ----------
        Xs : array_like, shape (n_source_samples, n_features)
            Source domain array.
        ys : array_like, shape (n_source_samples,)
            Label source array (1d).
        Xt : array_like, shape (n_target_samples, n_features)
            Target domain array.
        yt : array_like, shape (n_target_samples,)
            Label target array (1d).
        """
        self.best_score = None
        for gridOT in self._generate_params_from_grid_search():
            transport_model_tmp = self.transport_function(**gridOT)
            transport_model_tmp.fit(Xs=Xs, ys=ys, Xt=Xt, yt=yt)
            # round-trip: transport the target back towards the source
            transp_Xs = transport_model_tmp.inverse_transform(
                Xs=Xt, ys=yt, Xt=Xs, yt=ys)
            # FIX: score with the user-chosen metric — the original
            # hard-coded mean_squared_error and ignored `self.metrics`
            current_score = self.metrics(Xs, transp_Xs)
            if self.verbose:
                print(
                    '{} is : {}'.format(self.metrics.__name__, current_score))
            if self.best_score is None:
                need_update_best_score = True
            elif self.greater_is_better:
                # if greater is better, current score needs to be higher
                need_update_best_score = self.best_score < current_score
            else:
                # if greater is not better, current score needs to be lower
                need_update_best_score = self.best_score > current_score
            if need_update_best_score:
                self.best_score = current_score
                # remember the winning parameters and the fitted model
                self.best_params = gridOT.copy()
                self.transport_model = transport_model_tmp
        if self.verbose:
            print('Best grid is ' + str(self.best_params))
            print('Best score is ' + str(self.best_score))

    def save_model(self, path):
        """
        Save model 'myModel.npz' to be loaded later via
        `SuperLearner.load_model(path)`.

        Parameters
        ----------
        path : str
            If path ends with npz, perfect, else '.npz' is appended.

        Returns
        -------
        path : str
            Path and filename with npz extension.
        """
        if not path.endswith('npz'):
            path += '.npz'
        np.savez_compressed(path, SL=self.__dict__)
        return path

    def load_model(self, path):
        """
        Load model previously saved with `SuperLearner.save_model(path)`.

        Parameters
        ----------
        path : str
            If path ends with npz, perfect, else '.npz' is appended.
        """
        if not path.endswith('npz'):
            path += '.npz'
        model = np.load(path, allow_pickle=True)
        self.__dict__.update(model['SL'].tolist())
class RasterOptimalTransport(OptimalTransportGridSearch):
    # Optimal-transport grid search working directly on raster images
    # (gdal-supported) and vector label files, via museotoolbox.

    def __init__(self,
                 transport_function=ot.da.MappingTransport,
                 params=None,
                 verbose=True):
        """
        Initialize Python Optimal Transport for raster processing.
        """
        super().__init__(transport_function, params, verbose)

    def preprocessing(self,
                      image_source=None,
                      image_target=None,
                      vector_source=None,
                      vector_target=None,
                      label_source=None,
                      label_target=None,
                      group_source=None,
                      group_target=None,
                      scaler=StandardScaler):
        """
        Scale the whole image (if asked) and store the input parameters
        in the object.

        Parameters
        ----------
        image_source : str
            source image (gdal supported raster) -> path + file name
        image_target : str
            target image (gdal supported raster) -> path + file name
        vector_source : str
            labels (gdal supported vector) -> path + file name
        vector_target : str
            labels (gdal supported vector) -> path + file name
        label_source : str
            name of the label column in vector file (source)
        label_target : str
            name of the label column in vector file (target)
        group_source : str
            name of the group column of each polygon in vector file (source)
        group_target : str
            name of the group column of each polygon in vector file (target)
        scaler : scale function (default=StandardScaler)
            The function used to scale source and target image
        """
        self._share_args(image_source=image_source,
                         image_target=image_target,
                         vector_source=vector_source,
                         vector_target=vector_target,
                         label_source=label_source,
                         label_target=label_target,
                         group_source=group_source,
                         group_target=group_target,
                         scaler=scaler)
        if self.group_source is None:
            # extract samples and labels only (no polygon groups)
            Xs, ys = mtb.processing.extract_ROI(
                self.image_source,
                self.vector_source,
                self.label_source)  # X source, y source
            Xt, yt = mtb.processing.extract_ROI(
                self.image_target,
                self.vector_target,
                self.label_target)  # X target, y target
            group_s, group_t = None, None
        else:
            # extract samples, labels and the polygon group of each sample
            Xs, ys, group_s = mtb.processing.extract_ROI(
                self.image_source,
                self.vector_source,
                self.label_source,
                self.group_source)  # X source, y source, groups
            Xt, yt, group_t = mtb.processing.extract_ROI(
                self.image_target,
                self.vector_target,
                self.label_target,
                self.group_target)  # X target, y target, groups
        self._share_args(Xs=self._convert_to_float64(Xs),
                         Xt=self._convert_to_float64(Xt),
                         ys=ys,
                         yt=yt,
                         group_s=group_s,
                         group_t=group_t)
        # full images as 2d (pixels x bands) arrays
        source_array = mtb.processing.RasterMath(
            image_source, return_3d=False, verbose=False).get_image_as_array()
        target_array = mtb.processing.RasterMath(
            image_target, return_3d=False, verbose=False).get_image_as_array()
        if self._need_scale:
            # keep the raw samples, fit the scalers on the full images,
            # then scale the samples with those image-level scalers
            self.Xs_non_scaled = Xs
            self.Xt_non_scaled = Xt
            self._prefit_image(source_array, target_array)
            self.Xs = self.source_scaler.transform(self.Xs_non_scaled)
            self.Xt = self.target_scaler.transform(self.Xt_non_scaled)
            print("Image is scaled")
        else:
            self.source = source_array.astype(np.float64)
            self.target = target_array.astype(np.float64)
            print("Image is not scaled")

    def predict_transfer(self, data):
        """
        Predict model using domain adaptation.

        Parameters
        ----------
        data : arr
            Vector to transfer.

        Returns
        -------
        transport : arr
            Transferred vector; when a scaler is used, both the unscaled
            and the scaled versions are returned.
        """
        # NOTE(review): unlike the parent class, `data` is NOT scaled before
        # the transform here — presumably callers pass already-scaled pixels
        # (e.g. from the scaled image arrays); confirm against callers.
        data = self.transport_model.transform(data)
        if self.scaler is not False:
            data_non_scale = self.source_scaler.inverse_transform(data)
            return data_non_scale, data  # non scale, scale
        else:
            return data

    def _prefit_image(self, source, target):
        """
        Scale the full source and target image arrays.

        Keeps the fitted scalers (``source_scaler``, ``target_scaler``)
        so samples and predictions can be (inverse-)transformed later.

        Parameters
        ----------
        source : array_like, shape (n_pixels, n_features)
            Source image as a 2d array.
        target : array_like, shape (n_pixels, n_features)
            Target image as a 2d array.
        """
        if self._need_scale:
            # keep the fitted scalers for later (inverse-)transforms
            self.source_scaler = self._to_scale(source, self.scaler)
            self.source = self.source_scaler.transform(source)
            self.target_scaler = self._to_scale(target, self.scaler)
            self.target = self.target_scaler.transform(target)
            print("source and target are scaled")
        else:
            print("source and target are not scaled")