diff --git a/doc/build/html/_sources/pages/HBR_NormativeModel_FCONdata_Tutorial.rst.txt b/doc/build/html/_sources/pages/HBR_NormativeModel_FCONdata_Tutorial.rst.txt index c40108fb..4cc9d3a0 100644 --- a/doc/build/html/_sources/pages/HBR_NormativeModel_FCONdata_Tutorial.rst.txt +++ b/doc/build/html/_sources/pages/HBR_NormativeModel_FCONdata_Tutorial.rst.txt @@ -23,7 +23,8 @@ Step 0: Install necessary libraries & grab data files .. code:: ipython3 - ! pip install pcntoolkit==0.20 + ! pip uninstall -y Theano-PyMC # conflicts with Theano on some environments + ! pip install pcntoolkit==0.22 For this tutorial we will use data from the `Functional Connectom Project FCON1000 `__ to create a @@ -169,49 +170,21 @@ then displayed. idxte = fcon_te['site'] == s print(i,s, sum(idx), sum(idxte)) - # Uncomment the following lines if you want to keep a defined version of the sets - # fcon_tr.to_csv('/Users/andmar/data/sairut/data/fcon1000_tr.csv') - # fcon_te.to_csv('/Users/andmar/data/sairut/data/fcon1000_te.csv') - # icbm_tr.to_csv('/Users/andmar/data/sairut/data/fcon1000_icbm_tr.csv') - # icbm_te.to_csv('/Users/andmar/data/sairut/data/fcon1000_icbm_te.csv') + fcon_tr.to_csv(processing_dir + '/fcon1000_tr.csv') + fcon_te.to_csv(processing_dir + '/fcon1000_te.csv') + icbm_tr.to_csv(processing_dir + '/fcon1000_icbm_tr.csv') + icbm_te.to_csv(processing_dir + '/fcon1000_icbm_te.csv') +Otherwise you can uncomment the following lines and just load these pre defined subsets: -.. parsed-literal:: - - sample size check - 0 AnnArbor_a 10 14 - 1 AnnArbor_b 11 21 - 2 Atlanta 11 17 - 3 Baltimore 10 13 - 4 Bangor 9 11 - 5 Beijing_Zang 108 90 - 6 Berlin_Margulies 13 13 - 7 Cambridge_Buckner 94 104 - 8 Cleveland 15 16 - 9 Leiden_2180 6 6 - 10 Leiden_2200 11 8 - 11 Milwaukee_b 29 17 - 12 Munchen 6 9 - 13 NewYork_a 48 35 - 14 NewYork_a_ADHD 17 8 - 15 Newark 13 6 - 16 Oulu 55 47 - 17 Oxford 9 13 - 18 PaloAlto 7 10 - 19 Pittsburgh 1 2 - 20 Queensland 6 13 - 21 SaintLouis 12 19 - - -Otherwise you can just load these pre defined subsets: .. 
code:: ipython3 # Optional - fcon_tr = pd.read_csv('https://raw.githubusercontent.com/predictive-clinical-neuroscience/PCNtoolkit-demo/main/data/fcon1000_tr.csv') - fcon_te = pd.read_csv('https://raw.githubusercontent.com/predictive-clinical-neuroscience/PCNtoolkit-demo/main/data/fcon1000_te.csv') - icbm_tr = pd.read_csv('https://raw.githubusercontent.com/predictive-clinical-neuroscience/PCNtoolkit-demo/main/data/fcon1000_icbm_tr.csv') - icbm_te = pd.read_csv('https://raw.githubusercontent.com/predictive-clinical-neuroscience/PCNtoolkit-demo/main/data/fcon1000_icbm_te.csv') + #fcon_tr = pd.read_csv('https://raw.githubusercontent.com/predictive-clinical-neuroscience/PCNtoolkit-demo/main/data/fcon1000_tr.csv') + #fcon_te = pd.read_csv('https://raw.githubusercontent.com/predictive-clinical-neuroscience/PCNtoolkit-demo/main/data/fcon1000_te.csv') + #icbm_tr = pd.read_csv('https://raw.githubusercontent.com/predictive-clinical-neuroscience/PCNtoolkit-demo/main/data/fcon1000_icbm_tr.csv') + #icbm_te = pd.read_csv('https://raw.githubusercontent.com/predictive-clinical-neuroscience/PCNtoolkit-demo/main/data/fcon1000_icbm_te.csv') Step 2: Configure HBR inputs: covariates, measures and batch effects -------------------------------------------------------------------- diff --git a/pcntoolkit/dataio/fileio.py b/pcntoolkit/dataio/fileio.py index f4b85aff..d48a3810 100644 --- a/pcntoolkit/dataio/fileio.py +++ b/pcntoolkit/dataio/fileio.py @@ -377,7 +377,7 @@ def load(filename, mask=None, text=False, vol=True): if file_type(filename) == 'cifti': x = load_cifti(filename, vol=vol) elif file_type(filename) == 'nifti': - x = load_nifti(filename, mask) + x = load_nifti(filename, mask, vol=vol) elif text or file_type(filename) == 'text': x = load_ascii(filename) elif file_type(filename) == 'binary': diff --git a/pcntoolkit/model/bayesreg.py b/pcntoolkit/model/bayesreg.py index 0e14c7a9..c353081d 100755 --- a/pcntoolkit/model/bayesreg.py +++ b/pcntoolkit/model/bayesreg.py @@ -387,8 +387,8 @@ def estimate(self, hyp0, X, y, **kwargs): Xv = kwargs.get('var_covariates', None) # options for l-bfgs-b - l = kwargs.get('l', 0.1) - epsilon = kwargs.get('epsilon', 0.1) + l = float(kwargs.get('l', 0.1)) + epsilon = float(kwargs.get('epsilon', 0.1)) norm = kwargs.get('norm', 'l2') if optimizer.lower() == 'cg': # conjugate gradients diff --git a/pcntoolkit/model/gp.py b/pcntoolkit/model/gp.py index e7af393b..b2fb1b75 100644 --- a/pcntoolkit/model/gp.py +++ b/pcntoolkit/model/gp.py @@ -12,7 +12,7 @@ try: # Run as a package if installed - from pcntoolkit.utils import squared_dist + from pcntoolkit.util.utils import squared_dist except ImportError: pass diff --git a/pcntoolkit/model/hbr.py b/pcntoolkit/model/hbr.py index 908155bf..52e13fb2 100644 --- a/pcntoolkit/model/hbr.py +++ b/pcntoolkit/model/hbr.py @@ -17,7 +17,7 @@ from scipy import stats import bspline from bspline import splinelab - +from pcntoolkit.util.utils import cartesian_product def bspline_fit(X, order, nknots): @@ -133,10 +133,12 @@ def hbr(X, y, batch_effects, batch_effects_size, configs, trace=None): all_idx.append(np.int16(np.unique(batch_effects[:,i]))) be_idx = list(product(*all_idx)) - X = theano.shared(X) - y = theano.shared(y) with pm.Model() as model: + + X = pm.Data('X', X) + y = pm.Data('y', y) + # Priors if trace is not None: # Used for transferring the priors mu_prior_intercept = from_posterior('mu_prior_intercept', @@ -320,7 +322,8 @@ def hbr(X, y, batch_effects, batch_effects_size, configs, trace=None): else: alpha = 0 - y_like = 
pm.SkewNormal('y_like', mu=y_hat, sigma=sigma_y, alpha=alpha, observed=y) + y_like = pm.SkewNormal('y_like', mu=y_hat, sigma=sigma_y, alpha=alpha, + observed=y) return model @@ -336,8 +339,6 @@ def nn_hbr(X, y, batch_effects, batch_effects_size, configs, trace=None): all_idx.append(np.int16(np.unique(batch_effects[:,i]))) be_idx = list(product(*all_idx)) - X = theano.shared(X) - y = theano.shared(y) # Initialize random weights between each layer for the mu: init_1 = pm.floatX(np.random.randn(feature_num, n_hidden) * np.sqrt(1/feature_num)) @@ -361,6 +362,10 @@ def nn_hbr(X, y, batch_effects, batch_effects_size, configs, trace=None): std_init_2_noise = pm.floatX(np.random.rand(n_hidden, n_hidden)) with pm.Model() as model: + + X = pm.Data('X', X) + y = pm.Data('y', y) + if trace is not None: # Used when estimating/predicting on a new site weights_in_1_grp = from_posterior('w_in_1_grp', trace['w_in_1_grp'], distribution='normal', freedom=configs['freedom']) @@ -650,6 +655,9 @@ def estimate(self, X, y, batch_effects): target_accept=self.configs['target_accept'], init=self.configs['init'], n_init=50000, cores=self.configs['cores']) + # Filling the theano shared variables with dummy zero variables to ensure the privacy. + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'polynomial': X = create_poly_basis(X, self.configs['order']) with hbr(X, y, batch_effects, self.batch_effects_size, @@ -660,6 +668,8 @@ def estimate(self, X, y, batch_effects): target_accept=self.configs['target_accept'], init=self.configs['init'], n_init=50000, cores=self.configs['cores']) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'bspline': self.bsp = bspline_fit(X, self.configs['order'], self.configs['nknots']) X = bspline_transform(X, self.bsp) @@ -671,6 +681,8 @@ def estimate(self, X, y, batch_effects): target_accept=self.configs['target_accept'], init=self.configs['init'], n_init=50000, cores=self.configs['cores']) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'nn': with nn_hbr(X, y, batch_effects, self.batch_effects_size, self.configs): @@ -680,6 +692,7 @@ def estimate(self, X, y, batch_effects): target_accept=self.configs['target_accept'], init=self.configs['init'], n_init=50000, cores=self.configs['cores']) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) return self.trace @@ -697,17 +710,21 @@ def predict(self, X, batch_effects, pred = 'single'): if self.model_type == 'linear': with hbr(X, y, batch_effects, self.batch_effects_size, self.configs): ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) elif self.model_type == 'polynomial': X = create_poly_basis(X, self.configs['order']) with hbr(X, y, batch_effects, self.batch_effects_size, self.configs): ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) elif self.model_type == 'bspline': X = bspline_transform(X, self.bsp) with hbr(X, y, batch_effects, self.batch_effects_size, self.configs): ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) elif self.model_type == 'nn': with nn_hbr(X, y, batch_effects, self.batch_effects_size, self.configs): ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 
'y':np.zeros([10,1])}) pred_mean = ppc['y_like'].mean(axis=0) pred_var = ppc['y_like'].var(axis=0) @@ -715,7 +732,7 @@ def predict(self, X, batch_effects, pred = 'single'): return pred_mean, pred_var - def estimate_on_new_site(self, X, y, batch_effects): + def adapt(self, X, y, batch_effects): """ Function to adapt the model """ if len(X.shape)==1: @@ -739,6 +756,8 @@ def estimate_on_new_site(self, X, y, batch_effects): target_accept=self.configs['target_accept'], init=self.configs['init'], n_init=50000, cores=self.configs['cores']) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'polynomial': X = create_poly_basis(X, self.configs['order']) with hbr(X, y, batch_effects, self.batch_effects_size, @@ -749,6 +768,8 @@ def estimate_on_new_site(self, X, y, batch_effects): target_accept=self.configs['target_accept'], init=self.configs['init'], n_init=50000, cores=self.configs['cores']) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + if self.model_type == 'bspline': X = bspline_transform(X, self.bsp) with hbr(X, y, batch_effects, self.batch_effects_size, @@ -759,6 +780,8 @@ def estimate_on_new_site(self, X, y, batch_effects): target_accept=self.configs['target_accept'], init=self.configs['init'], n_init=50000, cores=self.configs['cores']) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'nn': with nn_hbr(X, y, batch_effects, self.batch_effects_size, self.configs, trace = self.trace): @@ -768,6 +791,7 @@ def estimate_on_new_site(self, X, y, batch_effects): target_accept=self.configs['target_accept'], init=self.configs['init'], n_init=50000, cores=self.configs['cores']) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) return self.trace @@ -783,19 +807,34 @@ def predict_on_new_site(self, X, batch_effects): samples = self.configs['n_samples'] y = np.zeros([X.shape[0],1]) if self.model_type == 'linear': - with hbr(X, y, batch_effects, self.batch_effects_size, self.configs, trace = self.trace): - ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) + with hbr(X, y, batch_effects, self.batch_effects_size, self.configs, + trace = self.trace): + ppc = pm.sample_posterior_predictive(self.trace, samples=samples, + progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'polynomial': X = create_poly_basis(X, self.configs['order']) - with hbr(X, y, batch_effects, self.batch_effects_size, self.configs, trace = self.trace): - ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) + with hbr(X, y, batch_effects, self.batch_effects_size, self.configs, + trace = self.trace): + ppc = pm.sample_posterior_predictive(self.trace, samples=samples, + progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'bspline': X = bspline_transform(X, self.bsp) - with hbr(X, y, batch_effects, self.batch_effects_size, self.configs, trace = self.trace): - ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) + with hbr(X, y, batch_effects, self.batch_effects_size, self.configs, + trace = self.trace): + ppc = pm.sample_posterior_predictive(self.trace, samples=samples, + progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'nn': - with nn_hbr(X, y, batch_effects, self.batch_effects_size, self.configs, trace = self.trace): - ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) 
+ with nn_hbr(X, y, batch_effects, self.batch_effects_size, self.configs, + trace = self.trace): + ppc = pm.sample_posterior_predictive(self.trace, samples=samples, + progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) pred_mean = ppc['y_like'].mean(axis=0) pred_var = ppc['y_like'].var(axis=0) @@ -814,18 +853,29 @@ def generate(self, X, batch_effects, samples): y = np.zeros([X.shape[0],1]) if self.model_type == 'linear': with hbr(X, y, batch_effects, self.batch_effects_size, self.configs): - ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) + ppc = pm.sample_posterior_predictive(self.trace, samples=samples, + progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'polynomial': X = create_poly_basis(X, self.configs['order']) with hbr(X, y, batch_effects, self.batch_effects_size, self.configs): - ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) + ppc = pm.sample_posterior_predictive(self.trace, samples=samples, + progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'bspline': X = bspline_transform(X, self.bsp) with hbr(X, y, batch_effects, self.batch_effects_size, self.configs): - ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) + ppc = pm.sample_posterior_predictive(self.trace, samples=samples, + progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'nn': with nn_hbr(X, y, batch_effects, self.batch_effects_size, self.configs): - ppc = pm.sample_posterior_predictive(self.trace, samples=samples, progressbar=True) + ppc = pm.sample_posterior_predictive(self.trace, samples=samples, + progressbar=True) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) generated_samples = np.reshape(ppc['y_like'].squeeze().T, [X.shape[0]*samples, 1]) X = np.repeat(X, samples) @@ -857,21 +907,47 @@ def sample_prior_predictive(self, X, batch_effects, samples, trace=None): with hbr(X, y, batch_effects, self.batch_effects_size, self.configs, trace): ppc = pm.sample_prior_predictive(samples=samples) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'polynomial': X = create_poly_basis(X, self.configs['order']) with hbr(X, y, batch_effects, self.batch_effects_size, self.configs, trace): ppc = pm.sample_prior_predictive(samples=samples) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'bspline': self.bsp = bspline_fit(X, self.configs['order'], self.configs['nknots']) X = bspline_transform(X, self.bsp) with hbr(X, y, batch_effects, self.batch_effects_size, self.configs, trace): ppc = pm.sample_prior_predictive(samples=samples) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) + elif self.model_type == 'nn': with nn_hbr(X, y, batch_effects, self.batch_effects_size, self.configs, trace): ppc = pm.sample_prior_predictive(samples=samples) + pm.set_data({'X':np.zeros([10,2]), 'y':np.zeros([10,1])}) return ppc + + + def create_dummy_inputs(self, covariate_ranges = [[0.1,0.9,0.01]]): + + arrays = [] + for i in range(len(covariate_ranges)): + arrays.append(np.arange(covariate_ranges[i][0],covariate_ranges[i][1], + covariate_ranges[i][2])) + X = cartesian_product(arrays) + X_dummy = np.concatenate([X for i in range(np.prod(self.batch_effects_size))]) + + arrays = [] + for i in range(self.batch_effects_num): + arrays.append(np.arange(0, self.batch_effects_size[i])) 
+ batch_effects = cartesian_product(arrays) + + batch_effects_dummy = np.repeat(batch_effects, X.shape[0], axis=0) + + return X_dummy, batch_effects_dummy \ No newline at end of file diff --git a/pcntoolkit/normative.py b/pcntoolkit/normative.py index b2960422..8f8a0207 100755 --- a/pcntoolkit/normative.py +++ b/pcntoolkit/normative.py @@ -41,7 +41,7 @@ from dataio import fileio from util.utils import compute_pearsonr, CustomCV, explained_var, compute_MSLL - from util.utils import scaler + from util.utils import scaler, get_package_versions from normative_model.norm_utils import norm_init PICKLE_PROTOCOL = configs.PICKLE_PROTOCOL @@ -333,6 +333,9 @@ def estimate(covfile, respfile, **kwargs): if savemodel and not os.path.isdir('Models'): os.mkdir('Models') + # which output metrics to compute + metrics = ['Rho', 'RMSE', 'SMSE', 'EXPV', 'MSLL','NLL', 'BIC'] + # load data print("Processing data in " + respfile) X = fileio.load(covfile) @@ -394,12 +397,15 @@ def estimate(covfile, respfile, **kwargs): scaler_resp = [] scaler_cov = [] mean_resp = [] # this is just for computing MSLL - std_resp = [] # this is just for computing MSLL + std_resp = [] # this is just for computing MSLL if warp is not None: Ywarp = np.zeros_like(Yhat) - mean_resp_warp = [np.zeros(Y.shape[1]) for s in range(splits.n_splits)] - std_resp_warp = [np.zeros(Y.shape[1]) for s in range(splits.n_splits)] + + # for warping we need to compute metrics separately for each fold + results_folds = dict() + for m in metrics: + results_folds[m]= np.zeros((Nmod, cvfolds)) for idx in enumerate(splits.split(X)): @@ -436,9 +442,9 @@ def estimate(covfile, respfile, **kwargs): fileio.save(be[ts,:], 'be_kfold_ts_tempfile.pkl') kwargs['trbefile'] = 'be_kfold_tr_tempfile.pkl' kwargs['tsbefile'] = 'be_kfold_ts_tempfile.pkl' - + # estimate the models for all subjects - for i in range(0, len(nz)): + for i in range(0, len(nz)): print("Estimating model ", i+1, "of", len(nz)) nm = norm_init(Xz_tr, Yz_tr[:, i], alg=alg, **kwargs) @@ -463,17 +469,28 @@ def estimate(covfile, respfile, **kwargs): nlZ[nz[i], fold] = nm.neg_log_lik if (run_cv or testresp is not None): - # warp the labels? 
- # TODO: Warping for scaled data if warp is not None: + # TODO: Warping for scaled data warp_param = nm.blr.hyp[1:nm.blr.warp.get_n_params()+1] Ywarp[ts, nz[i]] = nm.blr.warp.f(Y[ts, nz[i]], warp_param) Ytest = Ywarp[ts, nz[i]] # Save warped mean of the training data (for MSLL) yw = nm.blr.warp.f(Y[tr, nz[i]], warp_param) - mean_resp_warp[fold][i] = np.mean(yw) - std_resp_warp[fold][i] = np.std(yw) + + # create arrays for evaluation + Yhati = Yhat[ts, nz[i]] + Yhati = Yhati[:, np.newaxis] + S2i = S2[ts, nz[i]] + S2i = S2i[:, np.newaxis] + + # evaluate and save results + mf = evaluate(Ytest[:, np.newaxis], Yhati, S2=S2i, + mY=np.mean(yw), sY=np.std(yw), + nlZ=nm.neg_log_lik, nm=nm, Xz_tr=Xz_tr, + alg=alg, metrics = metrics) + for k in metrics: + results_folds[k][nz[i]][fold] = mf[k] else: Ytest = Y[ts, nz[i]] @@ -517,16 +534,14 @@ results = evaluate(Y[testids, :], Yhat[testids, :], S2=S2[testids, :], mY=mean_resp[0], sY=std_resp[0], nlZ=nlZ, nm=nm, Xz_tr=Xz_tr, alg=alg, - metrics = ['Rho', 'RMSE', 'SMSE', 'EXPV', - 'MSLL', 'NLL', 'BIC']) + metrics = metrics) else: - results = evaluate(Ywarp[testids, :], Yhat[testids, :], - S2=S2[testids, :], mY=mean_resp_warp[0], - sY=std_resp_warp[0], nlZ=nlZ, nm=nm, Xz_tr=Xz_tr, - alg=alg, metrics = ['Rho', 'RMSE', 'SMSE', - 'EXPV', 'MSLL', - 'NLL', 'BIC']) - + # for warped data we just aggregate across folds + results = dict() + for m in ['Rho', 'RMSE', 'SMSE', 'EXPV', 'MSLL']: + results[m] = np.mean(results_folds[m], axis=1) + results['NLL'] = results_folds['NLL'] + results['BIC'] = results_folds['BIC'] # Set writing options if saveoutput: @@ -649,6 +664,8 @@ def predict(covfile, respfile, maskfile=None, **kwargs): :param outputsuffix: Text string to add to the output filenames :param batch_size: batch size (for use with normative_parallel) :param job_id: batch id + :param fold: which cross-validation fold to use (default = 0) + :param models: list of model IDs to predict (if not specified all are computed) All outputs are written to disk in the same format as the input. These are: @@ -666,6 +683,8 @@ inputsuffix = kwargs.pop('inputsuffix', 'estimate') inputsuffix = "_" + inputsuffix.replace("_", "") alg = kwargs.pop('alg') + fold = kwargs.pop('fold',0) + models = kwargs.pop('models', None) if respfile is not None and not os.path.exists(respfile): print("Response file does not exist. 
Only returning predictions") @@ -702,8 +721,12 @@ def predict(covfile, respfile, maskfile=None, **kwargs): X = X[:, np.newaxis] sample_num = X.shape[0] - feature_num = len(glob.glob(os.path.join(model_path, 'NM_*' + inputsuffix + - '.pkl'))) + if models is not None: + feature_num = len(models) + else: + feature_num = len(glob.glob(os.path.join(model_path, 'NM_'+ str(fold) + + '_*' + inputsuffix + '.pkl'))) + models = range(feature_num) Yhat = np.zeros([sample_num, feature_num]) S2 = np.zeros([sample_num, feature_num]) @@ -715,11 +738,11 @@ def predict(covfile, respfile, maskfile=None, **kwargs): Xz = X # estimate the models for all subjects - for i in range(feature_num): + for i, m in enumerate(models): print("Prediction by model ", i+1, "of", feature_num) nm = norm_init(Xz) - nm = nm.load(os.path.join(model_path, 'NM_' + str(0) + '_' + - str(i) + inputsuffix + '.pkl')) + nm = nm.load(os.path.join(model_path, 'NM_' + str(fold) + '_' + + str(m) + inputsuffix + '.pkl')) if (alg!='hbr' or nm.configs['transferred']==False): yhat, s2 = nm.predict(Xz, **kwargs) else: @@ -744,6 +767,11 @@ def predict(covfile, respfile, maskfile=None, **kwargs): else: Y, maskvol = load_response_vars(respfile, maskfile) + Y = Y[:, m] + if meta_data: + mY = mY[m] + sY = sY[m] + if len(Y.shape) == 1: Y = Y[:, np.newaxis] @@ -944,23 +972,46 @@ def transfer(covfile, respfile, testcov=None, testresp=None, maskfile=None, def extend(covfile, respfile, maskfile=None, **kwargs): + ''' + This function extends an existing HBR model with data from new sites/scanners. + + Basic usage:: + + extend(covfile, respfile [extra_arguments]) + + where the variables are defined below. + + :param covfile: covariates for new data + :param respfile: response variables for new data + :param maskfile: mask used to apply to the data (nifti only) + :param model_path: Directory containing the normative model and metadata + :param trbefile: file address to batch effects file for new data + :param batch_size: batch size (for use with normative_parallel) + :param job_id: batch id + :param output_path: the path for saving the the extended model + :param informative_prior: a flag to decide whether to use the initial model + prior or learn it from scrach (default is False). + :param generation_factor: the number of samples generated for each combination + of covariates and batch effects. Default is 10. + + + All outputs are written to disk in the same format as the input. 
+ + ''' + alg = kwargs.pop('alg') if alg != 'hbr': print('Model extention is only possible for HBR models.') return elif (not 'model_path' in list(kwargs.keys())) or \ (not 'output_path' in list(kwargs.keys())) or \ - (not 'trbefile' in list(kwargs.keys())) or \ - (not 'dummycovfile' in list(kwargs.keys()))or \ - (not 'dummybefile' in list(kwargs.keys())): + (not 'trbefile' in list(kwargs.keys())): print('InputError: Some mandatory arguments are missing.') return else: model_path = kwargs.pop('model_path') output_path = kwargs.pop('output_path') trbefile = kwargs.pop('trbefile') - dummycovfile = kwargs.pop('dummycovfile') - dummybefile = kwargs.pop('dummybefile') outputsuffix = kwargs.pop('outputsuffix', 'extend') outputsuffix = "_" + outputsuffix.replace("_", "") @@ -994,15 +1045,11 @@ def extend(covfile, respfile, maskfile=None, **kwargs): X = fileio.load(covfile) Y, maskvol = load_response_vars(respfile, maskfile) batch_effects_train = fileio.load(trbefile) - X_dummy = fileio.load(dummycovfile) - batch_effects_dummy = fileio.load(dummybefile) if len(Y.shape) == 1: Y = Y[:, np.newaxis] if len(X.shape) == 1: X = X[:, np.newaxis] - if len(X_dummy.shape) == 1: - X_dummy = X_dummy[:, np.newaxis] feature_num = Y.shape[1] # estimate the models for all subjects @@ -1019,8 +1066,8 @@ def extend(covfile, respfile, maskfile=None, **kwargs): nm = nm.load(os.path.join(model_path, 'NM_0_' + str(i) + inputsuffix +'.pkl')) - nm = nm.extend(X, Y[:,i:i+1], batch_effects_train, X_dummy, - batch_effects_dummy, samples=generation_factor, + nm = nm.extend(X, Y[:,i:i+1], batch_effects_train, + samples=generation_factor, informative_prior=informative_prior) if batch_size is not None: @@ -1031,6 +1078,223 @@ def extend(covfile, respfile, maskfile=None, **kwargs): else: nm.save(os.path.join(output_path, 'NM_0_' + str(i) + outputsuffix + '.pkl')) + + + +def tune(covfile, respfile, maskfile=None, **kwargs): + + ''' + This function tunes an existing HBR model with real data. + + Basic usage:: + + tune(covfile, respfile [extra_arguments]) + + where the variables are defined below. + + :param covfile: covariates for new data + :param respfile: response variables for new data + :param maskfile: mask used to apply to the data (nifti only) + :param model_path: Directory containing the normative model and metadata + :param trbefile: file address to batch effects file for new data + :param batch_size: batch size (for use with normative_parallel) + :param job_id: batch id + :param output_path: the path for saving the the extended model + :param informative_prior: a flag to decide whether to use the initial model + prior or learn it from scrach (default is False). + :param generation_factor: the number of samples generated for each combination + of covariates and batch effects. Default is 10. + + + All outputs are written to disk in the same format as the input. 
+ + ''' + + alg = kwargs.pop('alg') + if alg != 'hbr': + print('Model tuning is only possible for HBR models.') + return + elif (not 'model_path' in list(kwargs.keys())) or \ + (not 'output_path' in list(kwargs.keys())) or \ + (not 'trbefile' in list(kwargs.keys())): + print('InputError: Some mandatory arguments are missing.') + return + else: + model_path = kwargs.pop('model_path') + output_path = kwargs.pop('output_path') + trbefile = kwargs.pop('trbefile') + + outputsuffix = kwargs.pop('outputsuffix', 'tuned') + outputsuffix = "_" + outputsuffix.replace("_", "") + inputsuffix = kwargs.pop('inputsuffix', 'estimate') + inputsuffix = "_" + inputsuffix.replace("_", "") + informative_prior = kwargs.pop('informative_prior', 'False') == 'True' + generation_factor = int(kwargs.pop('generation_factor', '10')) + job_id = kwargs.pop('job_id', None) + batch_size = kwargs.pop('batch_size', None) + if batch_size is not None: + batch_size = int(batch_size) + job_id = int(job_id) - 1 + + if not os.path.isdir(model_path): + print('Models directory does not exist!') + return + else: + if os.path.exists(os.path.join(model_path, 'meta_data.md')): + with open(os.path.join(model_path, 'meta_data.md'), 'rb') as file: + meta_data = pickle.load(file) + if (meta_data['inscaler'] != 'None' or + meta_data['outscaler'] != 'None'): + print('Model tuning on scaled data is not possible!') + return + + if not os.path.isdir(output_path): + os.mkdir(output_path) + + # load data + print("Loading data ...") + X = fileio.load(covfile) + Y, maskvol = load_response_vars(respfile, maskfile) + batch_effects_train = fileio.load(trbefile) + + if len(Y.shape) == 1: + Y = Y[:, np.newaxis] + if len(X.shape) == 1: + X = X[:, np.newaxis] + feature_num = Y.shape[1] + + # estimate the models for all subjects + for i in range(feature_num): + + nm = norm_init(X) + if batch_size is not None: # when using normative_parallel + print("Tuning model ", job_id*batch_size+i) + nm = nm.load(os.path.join(model_path, 'NM_0_' + + str(job_id*batch_size+i) + inputsuffix + + '.pkl')) + else: + print("Tuning model ", i+1, "of", feature_num) + nm = nm.load(os.path.join(model_path, 'NM_0_' + str(i) + + inputsuffix +'.pkl')) + + nm = nm.tune(X, Y[:,i:i+1], batch_effects_train, + samples=generation_factor, + informative_prior=informative_prior) + + if batch_size is not None: + nm.save(os.path.join(output_path, 'NM_0_' + + str(job_id*batch_size+i) + outputsuffix + '.pkl')) + nm.save(os.path.join('Models', 'NM_0_' + + str(i) + outputsuffix + '.pkl')) + else: + nm.save(os.path.join(output_path, 'NM_0_' + + str(i) + outputsuffix + '.pkl')) + + +def merge(covfile=None, respfile=None, **kwargs): + + ''' + This function merges two existing HBR models into a single model. + + Basic usage:: + + merge(model_path1, model_path2 [extra_arguments]) + + where the variables are defined below. + + :param covfile: Not required. Always set to None. + :param respfile: Not required. Always set to None. + :param model_path1: Directory containing the normative model and metadata + of the first model. + :param model_path2: Directory containing the normative model and metadata + of the second model. + :param batch_size: batch size (for use with normative_parallel) + :param job_id: batch id + :param output_path: the path for saving the merged model + :param generation_factor: the number of samples generated for each combination + of covariates and batch effects. Default is 10. + + + All outputs are written to disk in the same format as the input. 
+ + ''' + + alg = kwargs.pop('alg') + if alg != 'hbr': + print('Merging models is only possible for HBR models.') + return + elif (not 'model_path1' in list(kwargs.keys())) or \ + (not 'model_path2' in list(kwargs.keys())) or \ + (not 'output_path' in list(kwargs.keys())): + print('InputError: Some mandatory arguments are missing.') + return + else: + model_path1 = kwargs.pop('model_path1') + model_path2 = kwargs.pop('model_path2') + output_path = kwargs.pop('output_path') + + outputsuffix = kwargs.pop('outputsuffix', 'merge') + outputsuffix = "_" + outputsuffix.replace("_", "") + inputsuffix = kwargs.pop('inputsuffix', 'estimate') + inputsuffix = "_" + inputsuffix.replace("_", "") + generation_factor = int(kwargs.pop('generation_factor', '10')) + job_id = kwargs.pop('job_id', None) + batch_size = kwargs.pop('batch_size', None) + if batch_size is not None: + batch_size = int(batch_size) + job_id = int(job_id) - 1 + + if (not os.path.isdir(model_path1)) or (not os.path.isdir(model_path2)): + print('Models directory does not exist!') + return + else: + if batch_size is None: + with open(os.path.join(model_path1, 'meta_data.md'), 'rb') as file: + meta_data1 = pickle.load(file) + with open(os.path.join(model_path2, 'meta_data.md'), 'rb') as file: + meta_data2 = pickle.load(file) + if meta_data1['valid_voxels'].shape[0] != meta_data2['valid_voxels'].shape[0]: + print('Two models are trained on different features!') + return + else: + feature_num = meta_data1['valid_voxels'].shape[0] + else: + feature_num = batch_size + + + if not os.path.isdir(output_path): + os.mkdir(output_path) + + # mergeing the models + for i in range(feature_num): + + nm1 = norm_init(np.random.rand(100,10)) + nm2 = norm_init(np.random.rand(100,10)) + if batch_size is not None: # when using nirmative_parallel + print("Merging model ", job_id*batch_size+i) + nm1 = nm1.load(os.path.join(model_path1, 'NM_0_' + + str(job_id*batch_size+i) + inputsuffix + + '.pkl')) + nm2 = nm2.load(os.path.join(model_path2, 'NM_0_' + + str(job_id*batch_size+i) + inputsuffix + + '.pkl')) + else: + print("Merging model ", i+1, "of", feature_num) + nm1 = nm1.load(os.path.join(model_path1, 'NM_0_' + str(i) + + inputsuffix +'.pkl')) + nm2 = nm1.load(os.path.join(model_path2, 'NM_0_' + str(i) + + inputsuffix +'.pkl')) + + nm_merged = nm1.merge(nm2, samples=generation_factor) + + if batch_size is not None: + nm_merged.save(os.path.join(output_path, 'NM_0_' + + str(job_id*batch_size+i) + outputsuffix + '.pkl')) + nm_merged.save(os.path.join('Models', 'NM_0_' + + str(i) + outputsuffix + '.pkl')) + else: + nm_merged.save(os.path.join(output_path, 'NM_0_' + + str(i) + outputsuffix + '.pkl')) def main(*args): diff --git a/pcntoolkit/normative_model/norm_gpr.py b/pcntoolkit/normative_model/norm_gpr.py index 280adb25..a74cc95b 100644 --- a/pcntoolkit/normative_model/norm_gpr.py +++ b/pcntoolkit/normative_model/norm_gpr.py @@ -6,8 +6,8 @@ import numpy as np try: # run as a package if installed - from pcntoolkit.gp import GPR, CovSum - from pcntoolkit.gp.normative_model.normbase import NormBase + from pcntoolkit.model.gp import GPR, CovSum + from pcntoolkit.normative_model.norm_base import NormBase except ImportError: pass diff --git a/pcntoolkit/normative_model/norm_hbr.py b/pcntoolkit/normative_model/norm_hbr.py index ad60a508..2391fae0 100644 --- a/pcntoolkit/normative_model/norm_hbr.py +++ b/pcntoolkit/normative_model/norm_hbr.py @@ -123,7 +123,7 @@ def predict(self, Xs, X=None, Y=None, **kwargs): def estimate_on_new_sites(self, X, y, batch_effects): - 
self.hbr.estimate_on_new_site(X, y, batch_effects) + self.hbr.adapt(X, y, batch_effects) self.configs['transferred'] = True return self @@ -134,13 +134,19 @@ def predict_on_new_sites(self, X, batch_effects): return yhat, s2 - def extend(self, X, y, batch_effects, X_dummy, batch_effects_dummy, - samples=10, informative_prior=False): + def extend(self, X, y, batch_effects, X_dummy_ranges= [[0.1,0.9,0.01]], + merge_batch_dim=0, samples=10, informative_prior=False): + + X_dummy, batch_effects_dummy = self.hbr.create_dummy_inputs(X_dummy_ranges) X_dummy, batch_effects_dummy, Y_dummy = self.hbr.generate(X_dummy, batch_effects_dummy, samples) + + batch_effects[:, merge_batch_dim] = batch_effects[:, merge_batch_dim] + \ + np.max(batch_effects_dummy[:, merge_batch_dim]) + 1 + if informative_prior: - self.hbr.estimate_on_new_sites(np.concatenate((X_dummy, X)), + self.hbr.adapt(np.concatenate((X_dummy, X)), np.concatenate((Y_dummy, y)), np.concatenate((batch_effects_dummy, batch_effects))) else: @@ -151,6 +157,54 @@ def extend(self, X, y, batch_effects, X_dummy, batch_effects_dummy, return self + def tune(self, X, y, batch_effects, X_dummy_ranges= [[0.1,0.9,0.01]], + merge_batch_dim=0, samples=10, informative_prior=False): + + tune_ids = list(np.unique(batch_effects[:, merge_batch_dim])) + + X_dummy, batch_effects_dummy = self.hbr.create_dummy_inputs(X_dummy_ranges) + + for idx in tune_ids: + X_dummy = X_dummy[batch_effects_dummy[:, merge_batch_dim] != idx, :] + batch_effects_dummy = batch_effects_dummy[batch_effects_dummy[:, merge_batch_dim] != idx, :] + + X_dummy, batch_effects_dummy, Y_dummy = self.hbr.generate(X_dummy, + batch_effects_dummy, samples) + + if informative_prior: + self.hbr.adapt(np.concatenate((X_dummy, X)), + np.concatenate((Y_dummy, y)), + np.concatenate((batch_effects_dummy, batch_effects))) + else: + self.hbr.estimate(np.concatenate((X_dummy, X)), + np.concatenate((Y_dummy, y)), + np.concatenate((batch_effects_dummy, batch_effects))) + + return self + + + def merge(self, nm, X_dummy_ranges= [[0.1,0.9,0.01]], merge_batch_dim=0, + samples=10): + + X_dummy1, batch_effects_dummy1 = self.hbr.create_dummy_inputs(X_dummy_ranges) + X_dummy2, batch_effects_dummy2 = nm.hbr.create_dummy_inputs(X_dummy_ranges) + + X_dummy1, batch_effects_dummy1, Y_dummy1 = self.hbr.generate(X_dummy1, + batch_effects_dummy1, samples) + X_dummy2, batch_effects_dummy2, Y_dummy2 = nm.hbr.generate(X_dummy2, + batch_effects_dummy2, samples) + + batch_effects_dummy2[:, merge_batch_dim] = batch_effects_dummy2[:, merge_batch_dim] + \ + np.max(batch_effects_dummy1[:, merge_batch_dim]) + 1 + + self.hbr.estimate(np.concatenate((X_dummy1, X_dummy2)), + np.concatenate((Y_dummy1, Y_dummy2)), + np.concatenate((batch_effects_dummy1, + batch_effects_dummy2))) + + return self + + def generate(self, X, batch_effects, samples=10): X, batch_effects, generated_samples = self.hbr.generate(X, batch_effects, diff --git a/pcntoolkit/normative_model/norm_np.py b/pcntoolkit/normative_model/norm_np.py index 29834d31..7b632522 100644 --- a/pcntoolkit/normative_model/norm_np.py +++ b/pcntoolkit/normative_model/norm_np.py @@ -21,7 +21,7 @@ try: # run as a package if installed from pcntoolkit.normative_model.normbase import NormBase - from pcntoolkit.NPR import NPR, np_loss + from pcntoolkit.model.NPR import NPR, np_loss except ImportError: pass diff --git a/pcntoolkit/normative_model/norm_rfa.py b/pcntoolkit/normative_model/norm_rfa.py index 275ba4f1..f60e731a 100644 --- a/pcntoolkit/normative_model/norm_rfa.py +++ 
b/pcntoolkit/normative_model/norm_rfa.py @@ -6,8 +6,8 @@ import numpy as np try: # run as a package if installed - from pcntoolkit.normative_model.normbase import NormBase - from pcntoolkit.rfa import GPRRFA + from pcntoolkit.normative_model.norm_base import NormBase + from pcntoolkit.model.rfa import GPRRFA except ImportError: pass diff --git a/pcntoolkit/normative_model/norm_utils.py b/pcntoolkit/normative_model/norm_utils.py index 30cc3a16..48e79980 100644 --- a/pcntoolkit/normative_model/norm_utils.py +++ b/pcntoolkit/normative_model/norm_utils.py @@ -1,8 +1,15 @@ -from norm_blr import NormBLR -from norm_gpr import NormGPR -from norm_rfa import NormRFA -from norm_hbr import NormHBR -from norm_np import NormNP +try: # run as a package if installed + from pcntoolkit.normative_model.norm_blr import NormBLR + from pcntoolkit.normative_model.norm_gpr import NormGPR + from pcntoolkit.normative_model.norm_rfa import NormRFA + from pcntoolkit.normative_model.norm_hbr import NormHBR + from pcntoolkit.normative_model.norm_np import NormNP +except: + from norm_blr import NormBLR + from norm_gpr import NormGPR + from norm_rfa import NormRFA + from norm_hbr import NormHBR + from norm_np import NormNP def norm_init(X, y=None, theta=None, alg='gpr', **kwargs): if alg == 'gpr': diff --git a/pcntoolkit/normative_parallel.py b/pcntoolkit/normative_parallel.py index 1207a025..1d8f5358 100755 --- a/pcntoolkit/normative_parallel.py +++ b/pcntoolkit/normative_parallel.py @@ -27,14 +27,17 @@ import shutil import pickle import fileinput +import time import numpy as np import pandas as pd -from subprocess import call +from subprocess import call, check_output + try: import pcntoolkit as ptk import pcntoolkit.dataio.fileio as fileio from pcntoolkit import configs + from pcntoolkit.util.utils import yes_or_no ptkpath = ptk.__path__[0] except ImportError: pass @@ -43,6 +46,7 @@ sys.path.append(ptkpath) import dataio.fileio as fileio import configs + from util.utils import yes_or_no PICKLE_PROTOCOL = configs.PICKLE_PROTOCOL @@ -58,6 +62,7 @@ def execute_nm(processing_dir, duration, normative_path=None, func='estimate', + interactive=False, **kwargs): ''' Execute parallel normative models @@ -93,6 +98,7 @@ def execute_nm(processing_dir, cv_folds = kwargs.get('cv_folds', None) testcovfile_path = kwargs.get('testcovfile_path', None) testrespfile_path= kwargs.get('testrespfile_path', None) + outputsuffix = kwargs.get('outputsuffix', 'estimate') cluster_spec = kwargs.pop('cluster_spec', 'torque') log_path = kwargs.pop('log_path', None) binary = kwargs.pop('binary', False) @@ -114,8 +120,8 @@ def execute_nm(processing_dir, file_extentions = '.txt' kwargs.update({'batch_size':str(batch_size)}) + job_ids = [] for n in range(1, number_of_batches+1): - print(n) kwargs.update({'job_id':str(n)}) if testrespfile_path is not None: if cv_folds is not None: @@ -143,10 +149,11 @@ def execute_nm(processing_dir, batch_respfile_path, func=func, **kwargs) - qsub_nm(job_path=batch_job_path, + job_id = qsub_nm(job_path=batch_job_path, log_path=log_path, memory=memory, duration=duration) + job_ids.append(job_id) elif cluster_spec == 'sbatch': # update the response file kwargs.update({'testrespfile_path': \ @@ -186,10 +193,11 @@ def execute_nm(processing_dir, batch_respfile_path, func=func, **kwargs) - qsub_nm(job_path=batch_job_path, + job_id = qsub_nm(job_path=batch_job_path, log_path=log_path, memory=memory, duration=duration) + job_ids.append(job_id) elif cluster_spec == 'sbatch': sbatchwrap_nm(batch_processing_dir, python_path, @@ 
-227,10 +235,11 @@ def execute_nm(processing_dir, batch_respfile_path, func=func, **kwargs) - qsub_nm(job_path=batch_job_path, + job_id = qsub_nm(job_path=batch_job_path, log_path=log_path, memory=memory, duration=duration) + job_ids.append(job_id) elif cluster_spec == 'sbatch': sbatchwrap_nm(batch_processing_dir, python_path, @@ -251,10 +260,59 @@ def execute_nm(processing_dir, qsub_nm(processing_dir=batch_processing_dir) # ] + if interactive: + + check_jobs(job_ids, delay=60) + + success = False + while (not success): + success = collect_nm(processing_dir, + job_name, + func=func, + collect=False, + binary=binary, + batch_size=batch_size, + outputsuffix=outputsuffix) + if success: + break + else: + if interactive == 'query': + response = yes_or_no('Rerun the failed jobs?') + if response: + rerun_nm(processing_dir, log_path=log_path, memory=memory, + duration=duration, binary=binary, + interactive=interactive) + else: + success = True + else: + print('Reruning the failed jobs ...') + rerun_nm(processing_dir, log_path=log_path, memory=memory, + duration=duration, binary=binary, + interactive=interactive) + + if interactive == 'query': + response = yes_or_no('Collect the results?') + if response: + success = collect_nm(processing_dir, + job_name, + func=func, + collect=True, + binary=binary, + batch_size=batch_size, + outputsuffix=outputsuffix) + else: + print('Collecting the results ...') + success = collect_nm(processing_dir, + job_name, + func=func, + collect=True, + binary=binary, + batch_size=batch_size, + outputsuffix=outputsuffix) + """routines that are environment independent""" - def split_nm(processing_dir, respfile_path, batch_size, @@ -409,7 +467,7 @@ def collect_nm(processing_dir, count = 0 batch_fail = [] - if func != 'fit': + if (func!='fit' and func!='extend' and func!='merge' and func!='tune'): file_example = [] # TODO: Collect_nm only depends on yhat, thus does not work when no # prediction is made (when test cov is not specified). @@ -658,7 +716,7 @@ def collect_nm(processing_dir, file_extentions) del bic_dfs - if func != 'predict' and func != 'extend': + if (func!='predict' and func!='extend' and func!='merge' and func!='tune'): if not os.path.isdir(processing_dir + 'Models') and \ os.path.exists(os.path.join(batches[0], 'Models')): os.mkdir(processing_dir + 'Models') @@ -685,8 +743,8 @@ def collect_nm(processing_dir, if meta_data['outscaler'] in ['standardize', 'minmax', 'robminmax']: Y_scalers.append(meta_data['scaler_resp']) - meta_data['mean_resp'] = np.squeeze(np.stack(mY)) - meta_data['std_resp'] = np.squeeze(np.stack(sY)) + meta_data['mean_resp'] = np.squeeze(np.column_stack(mY)) + meta_data['std_resp'] = np.squeeze(np.column_stack(sY)) meta_data['scaler_cov'] = X_scalers meta_data['scaler_resp'] = Y_scalers @@ -728,9 +786,9 @@ def collect_nm(processing_dir, file_extentions) if not batch_fail: - return 1 + return True else: - return 0 + return False def delete_nm(processing_dir, binary=False): @@ -799,7 +857,6 @@ def bashwrap_nm(processing_dir, testrespfile_path = kwargs.pop('testrespfile_path', None) alg = kwargs.pop('alg', None) configparam = kwargs.pop('configparam', None) - standardize = kwargs.pop('standardize', True) # change to processing dir os.chdir(processing_dir) @@ -857,10 +914,12 @@ def qsub_nm(job_path, log_path, memory, duration): + '''This function submits a job.sh scipt to the torque custer using the qsub command. Basic usage:: + qsub_nm(job_path, log_path, memory, duration) :param job_path: Full path to the job.sh file. 
@@ -882,17 +941,20 @@ def qsub_nm(job_path, duration + ' -o ' + log_path + ' -e ' + log_path] # submits job to cluster - call(qsub_call, shell=True) + #call(qsub_call, shell=True) + job_id = check_output(qsub_call, shell=True).decode(sys.stdout.encoding).replace("\n", "") + + return job_id def rerun_nm(processing_dir, log_path, memory, duration, - binary=False): + binary=False, + interactive=False): '''This function reruns all failed batched in processing_dir after collect_nm has identified the failed batches. - - Basic usage:: + Basic usage:: rerun_nm(processing_dir, log_path, memory, duration) @@ -902,6 +964,9 @@ written by (primarily) T Wolfers, (adapted) SM Kia, (adapted) S Rutherford. ''' + + job_ids = [] + if binary: file_extentions = '.pkl' @@ -911,10 +976,11 @@ for n in range(0, shape[0]): jobpath = failed_batches[n, 0] print(jobpath) - qsub_nm(job_path=jobpath, + job_id = qsub_nm(job_path=jobpath, log_path=log_path, memory=memory, duration=duration) + job_ids.append(job_id) else: file_extentions = '.txt' failed_batches = fileio.load_pd(processing_dir + @@ -923,10 +989,15 @@ for n in range(0, shape[0]): jobpath = failed_batches.iloc[n, 0] print(jobpath) - qsub_nm(job_path=jobpath, + job_id = qsub_nm(job_path=jobpath, log_path=log_path, memory=memory, duration=duration) + job_ids.append(job_id) + + if interactive: + check_jobs(job_ids, delay=60) + # COPY the rotines above here and aadapt those to your cluster # bashwarp_nm; qsub_nm; rerun_nm @@ -972,7 +1043,6 @@ def sbatchwrap_nm(processing_dir, testrespfile_path = kwargs.pop('testrespfile_path', None) alg = kwargs.pop('alg', None) configparam = kwargs.pop('configparam', None) - standardize = kwargs.pop('standardize', True) # change to processing dir os.chdir(processing_dir) @@ -1126,4 +1196,81 @@ def rerun_nm(processing_dir, with fileinput.FileInput(jobpath, inplace=True) as file: for line in file: print(line.replace(memory, new_memory), end='') - sbatch_nm(jobpath, log_path) + sbatch_nm(jobpath, + log_path) + + +def retrieve_jobs(): + """ + A utility function to retrieve task status from the outputs of qstat. + + :return: a dictionary of jobs. + + """ + + output = check_output('qstat', shell=True).decode(sys.stdout.encoding) + output = output.split('\n') + jobs = dict() + for line in output[2:-1]: + (Job_ID, Job_Name, User, Wall_Time, Status, Queue) = line.split() + jobs[Job_ID] = dict() + jobs[Job_ID]['name'] = Job_Name + jobs[Job_ID]['walltime'] = Wall_Time + jobs[Job_ID]['status'] = Status + + return jobs + + +def check_job_status(jobs): + """ + A utility function to count the tasks with different statuses. + + :param jobs: List of job ids. + :return: returns the number of tasks that are queued, running, completed, + or in another state. + + """ + running_jobs = retrieve_jobs() + + r = 0 + c = 0 + q = 0 + u = 0 + for job in jobs: + try: + if running_jobs[job]['status'] == 'C': + c += 1 + elif running_jobs[job]['status'] == 'Q': + q += 1 + elif running_jobs[job]['status'] == 'R': + r += 1 + else: + u += 1 + except: # probably the job finished in the meantime. + c += 1 + continue + + print('Total Jobs:%d, Queued:%d, Running:%d, Completed:%d, Unknown:%d' + %(len(jobs), q, r, c, u)) + return q,r,c,u + + +def check_jobs(jobs, delay=60): + """ + A utility function for checking the status of submitted jobs. + + :param jobs: list of job ids. + :param delay: the delay (in seconds) between two consecutive checks, + defaults to 60. 
+ + """ + + n = len(jobs) + + while(True): + q,r,c,u = check_job_status(jobs) + if c == n: + print('All jobs are completed!') + break + time.sleep(delay) + diff --git a/pcntoolkit/util/utils.py b/pcntoolkit/util/utils.py index dd2f1e2d..b2656cd3 100644 --- a/pcntoolkit/util/utils.py +++ b/pcntoolkit/util/utils.py @@ -281,7 +281,7 @@ def calibration_descriptives(x): """ compute statistics useful to assess the calibration of normative models, including skew and kurtosis of the distribution, plus their standard - deviation and standar errors + deviation and standard errors (separately for each column in x) Basic usage:: stats = calibration_descriptives(Z) @@ -294,11 +294,11 @@ """ n = np.shape(x)[0] - m1 = np.mean(x) + m1 = np.mean(x,axis=0) m2 = sum((x-m1)**2) m3 = sum((x-m1)**3) m4 = sum((x-m1)**4) - s1 = np.std(x) + s1 = np.std(x,axis=0) skew = n*m3/(n-1)/(n-2)/s1**3 sdskew = np.sqrt( 6*n*(n-1) / ((n-2)*(n+1)*(n+3)) ) kurtosis = (n*(n+1)*m4 - 3*m2**2*(n-1)) / ((n-1)*(n-2)*(n-3)*s1**4) @@ -1365,3 +1365,47 @@ def anomaly_detection_auc(abn_p, labels, n_permutation=None): print('Feature %d of %d is done: p_value=%f' %(i,n_permutation,p_values[i])) return aucs, p_values + + +def cartesian_product(arrays): + + """ + This is a utility function for creating dummy data (covariates). It computes + the cartesian product of N 1D arrays. + + Example: + a = cartesian_product([np.arange(0,5), np.arange(6,10)]) + + :param arrays: a list of N input 1D numpy arrays with sizes d1,d2,...,dN + :return: A d1*d2*...*dN by N matrix of the cartesian product of the N arrays. + + """ + + la = len(arrays) + dtype = np.result_type(arrays[0]) + arr = np.empty([len(a) for a in arrays] + [la], dtype=dtype) + for i, a in enumerate(np.ix_(*arrays)): + arr[...,i] = a + + return arr.reshape(-1, la) + + +def yes_or_no(question): + + """ + Utility function for getting a yes/no answer from the user. + + :param question: String for user query. + + :return: Boolean of True for 'yes' and False for 'no'. + + + """ + + while "the answer is invalid": + reply = str(input(question+' (y/n): ')).lower().strip() + if reply[:1] == 'y': + return True + if reply[:1] == 'n': + return False + \ No newline at end of file diff --git a/setup.py b/setup.py index 7738ff9d..a7d9a2f9 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import setup, find_packages setup(name='pcntoolkit', - version='0.21', + version='0.22', description='Predictive Clinical Neuroscience toolkit', url='http://github.com/amarquand/PCNtoolkit', author='Andre Marquand',
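Usage note (not part of the patch): the sketch below shows how the new extend() and merge() entry points added to pcntoolkit/normative.py might be called once this change lands. All file and directory names, and the batch-effects file, are hypothetical placeholders; the keyword arguments mirror the ones parsed in the diff above.

    from pcntoolkit.normative import extend, merge

    # Extend a previously estimated HBR model with data from a new site.
    # Dummy covariates and batch effects are now generated internally via
    # create_dummy_inputs(), so dummycovfile/dummybefile are no longer passed.
    extend('cov_newsite.txt', 'resp_newsite.txt',
           alg='hbr',
           model_path='Models',                  # directory holding NM_0_*_estimate.pkl
           output_path='Models_extended',
           trbefile='batch_effects_newsite.txt',
           informative_prior='True',             # parsed as a string, per the diff
           generation_factor='10')

    # Merge two independently estimated HBR models into a single model.
    merge(covfile=None, respfile=None,
          alg='hbr',
          model_path1='Models_site1',
          model_path2='Models_site2',
          output_path='Models_merged')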