Source code for pySIMsalabim.utils.general

"""Functions for general use"""
######### Package Imports #########################################################################

import os, subprocess, uuid, shutil, time
import pandas as pd
from subprocess import run, PIPE
from pySIMsalabim.utils.device_parameters import *

######### Function Definitions ####################################################################

[docs] def fatal_error_message(errorcode): """When a 'standard Pascal' fatal error occurs, add the standard error message Parameters ---------- errorcode : int the error code """ message = '' if errorcode == 106: message = 'Invalid numeric format: Reported when a non-numeric value is read from a text file' elif errorcode == 200: message = 'Division by zero: The application attempted to divide a number by zero.' elif errorcode == 201: message = 'Range check error.' elif errorcode == 202: message = 'Stack overflow error: This error is only reported when stack checking is enabled.' elif errorcode == 205: message = 'Floating point overflow.' elif errorcode == 206: message = 'Floating point underflow.' else: message = 'A fatal error occured.' return message
[docs] def error_message(errorcode): """When a 'standard Pascal' fatal error occurs, add the standard error message to be used with parallel which does not read the Parameters ---------- errorcode : int the error code Returns ------- str the error message """ message = '' if errorcode >= 90 and errorcode < 100: message = 'Error '+str(errorcode) +': ' if errorcode == 90: message += 'Device parameter file corrupted.' elif errorcode == 91: message += 'Invalid input. Please check your input files for either incorrect layer definition (see SIMsalabim docs), wrong physics, or voltage in tVG_file too large).' elif errorcode == 92: message += 'Invalid input from command line.' elif errorcode == 93: message += 'Numerical failure.' elif errorcode == 94: message += 'Failed to converge, halt (FailureMode = 0).' elif errorcode == 95: message += ' Failed to converge at least 1 point, not halt (FailureMode != 0).' elif errorcode == 96: message += 'Missing input file.' elif errorcode == 97: message += 'Runtime exceeds limit set by timeout.' elif errorcode == 99: message += 'Programming error (i.e. not due to the user!).' elif errorcode > 100: message = 'Fatal error '+str(errorcode) +': ' if errorcode == 106: message += 'Invalid numeric format: Reported when a non-numeric value is read from a text file.' elif errorcode == 200: message += 'Division by zero: The application attempted to divide a number by zero.' elif errorcode == 201: message += 'Range check error.' elif errorcode == 202: message += 'Stack overflow error: This error is only reported when stack checking is enabled.' elif errorcode == 205: message += 'Floating point overflow.' elif errorcode == 206: message += 'Floating point underflow.' elif errorcode == 666: message += 'Multiple simulations failed with different error codes during parallel execution.' else: message = 'Unknown error code '+str(errorcode) + ' occurred.' elif errorcode == 3: message = 'Warning-like exit.' else: message = 'Unknown error code '+str(errorcode) + ' occurred.' return message
[docs] def construct_cmd(sim_type, cmd_pars): """Construct a single string to use as command to run a SIMsalabim executable Parameters ---------- sim_type : string Which program to run: simss or zimt cmd_pars : List List with parameters to add to the simss/zimt cmd line. Each parameter is a dict with par,val keys. Note: when relevant the first entry must be the deviceparameters file with a key: dev_par_file Returns ------- string Constructed string to run as cmd """ # Start with the executable name cmd_line = './' + sim_type # if system is 'Windows' use the .exe extension if os.name == 'nt': cmd_line = sim_type + '.exe' # Check whether a device parameters file has been defined for i in cmd_pars: # When specified, the device parameter file must be placed first, as is required by SIMsalabim if i['par'] == 'dev_par_file': if os.name == 'nt': # Add the device parameters file as " -dev_par_file "dev_par_file" args_single = ' "' + i['val']+ '"' else: args_single = ' ''' + i['val']+ ' ''' cmd_line = cmd_line + args_single # After the dev_par_file key had been found once, stop the loop. If more than one dev_par_file is specified, the rest are ignored. break # Add the parameters for i in cmd_pars: if i['par'] != 'dev_par_file': # Add each parameter as " -par_name par_value" if os.name == 'nt': #add "" to the parameter name if on Windows args_single = ' -"' + i['par'] + '" "' + i['val'] + '"' else: args_single = ' -' +i['par'] + ' ' + i['val'] cmd_line = cmd_line + args_single return cmd_line
[docs] def run_simulation(sim_type, cmd_pars, session_path, run_mode = False, verbose = False): """Run the SIMsalabim simulation executable with the chosen device parameters. Return the complete result object of the process accompanied by a message with information, in case of both success and failure. Parameters ---------- sim_type : string Which type of simulation to run: simss or zimt cmd_pars : List List with parameters to add to the simss/zimt cmd line. Each parameter is a dict with par,val keys. Note: when relevant the first entry must be the deviceparameters file with a key: dev_par_file session_path : string File path of the simss or zimt executable run_mode : boolean True if function is called as part of The Shell, False when called directly. Prevents using streamlit components outside of The Shell. verbose : boolean True if the console output of the simulation should be printed to the console Returns ------- int Return code of the simulation process, 0 for success, other values for errors. string Return message to display on the UI, for both success and failed """ # Construct the command to run the executable cmd_line = construct_cmd(sim_type, cmd_pars) if run_mode: # Run the simulation in a shell environment if os.name == 'nt': result = run(cmd_line, cwd=session_path,stdout=PIPE, check=False, shell=True) else: result = run([cmd_line], cwd=session_path, stdout=PIPE, check=False, shell=True) # Check the results of the process using the returncodes and console output if result.returncode != 0 and result.returncode != 95 and result.returncode != 3: # SIMsalabim raised an error, stop the program and return the error message on the UI. startMessage = False message = '' result_decode = result.stdout.decode('utf-8') if result.returncode >= 100: # A fatal (numerical) error occurred. Return errorcode and a standard error message. message = fatal_error_message(result.returncode) else: # Simsalabim raised an error. Read the console output for the details / error messaging. # All linetypes after 'Program will be terminated, press enter.' are considered part of the error message for line_console in result_decode.split('\n'): if startMessage is True: # The actual error message. Since the error message can be multi-line, append each line. message = message + line_console + '\n' if 'Program will be terminated.' in line_console: # Last 'regular' line of the console output. The next line is from the error message. startMessage = True # Show the message as an error on the screen. Do not continue to the simulation results page. message = 'Simulation raised an error with Errorcode: ' + str(result.returncode) + '\n\n' + message else: # SIMsalabim simulation succeeded if result.returncode == 95: # In case of errorcode 95, failures during the simulations were encountered but the simulation did not halt. Show 'error' messages on the UI. startMessage = False message = '' result_decode = result.stdout.decode('utf-8') for line_console in result_decode.split('\n'): if startMessage is True: # The actual error message. Since the error message can be multi-line, append each line. message = message + line_console + '\n' if 'Program will be terminated.' in line_console: # Last 'regular' line of the console output. The next line is from the error message. startMessage = True # Show the message as a success on the screen message = 'Simulation completed but raised errorcode: ' + str(result.returncode) + '\n\n' + 'The simulation finished but at least 1 point did not converge. \n\n' + message elif result.returncode == 3: # Special case, should not occur in the web version. # When the program exits as a success but no simulation has been run, e.g. in the case of the autotidy functionality. message = 'Action completed' else: # Simulation completed as expected. message = 'Simulation complete. Output can be found in the Simulation results.' else: # if verbose: if os.name == 'nt': result = run(cmd_line, cwd=session_path,stdout=PIPE, check=False, shell=True) else: result = run([cmd_line], cwd=session_path, stdout=PIPE, check=False, shell=True) if verbose: startMessage = False message = '' result_decode = result.stdout.decode('utf-8') for line_console in result_decode.split('\n'): if startMessage is True: # The actual error message. Since the error message can be multi-line, append each line. message = message + line_console + '\n' if 'Program will be terminated.' in line_console: # Last 'regular' line of the console output. The next line is from the error message. startMessage = True print(message) message = '' result = result.returncode return result, message
[docs] def run_simulation_filesafe(sim_type, cmd_pars, session_path, run_mode = False, verbose = False, **kwargs): """Run the SIMsalabim simulation executable with the chosen device parameters. Return the complete result object of the process accompanied by a message with information, in case of both success and failure. Parameters ---------- sim_type : string Which type of simulation to run: simss or zimt cmd_pars : List List with parameters to add to the simss/zimt cmd line. Each parameter is a dict with par,val keys. Note: when relevant the first entry must be the deviceparameters file with a key: dev_par_file session_path : string File path of the simss or zimt executable run_mode : boolean True if function is called as part of The Shell, False when called directly. Prevents using streamlit components outside of The Shell. verbose : boolean True if the console output of the simulation should be printed to the console **kwargs : dict Additional keyword arguments, not used in this function but can be used to pass additional parameters to the function. Returns ------- result : int Return code of the simulation process, 0 for success, other values for errors. string Return message to display on the UI, for both success and failed """ max_wait_time = kwargs.get('max_wait_time', 100) # seconds # Create a temp folder to store the simulation results ID = str(uuid.uuid4()) tmp_folder = os.path.join(session_path, 'tmp_'+ID) if not os.path.exists(tmp_folder): os.makedirs(tmp_folder) # get file setup from cmd_pars device_parameters = None for cmd_par in cmd_pars: if cmd_par['par'] == 'dev_par_file': device_parameters = cmd_par['val'] break if device_parameters is None: raise ValueError('Device parameters file not found in the command parameters list.') # Copy the device parameters file to the temp folder make_thread_safe_file_copy(os.path.join(session_path, device_parameters), tmp_folder) # copy the executable to the temp folder if os.name == 'nt': shutil.copy(os.path.join(session_path, sim_type+'.exe'), tmp_folder) else: shutil.copy(os.path.join(session_path, sim_type), tmp_folder) device_parameters = os.path.basename(device_parameters) # while True: # copy the file to the temp folder try: dev_par, layers = load_device_parameters(session_path, device_parameters, run_mode = False) break except: pass time.sleep(0.002) if os.name != 'nt': try: dev_par, layers = load_device_parameters(session_path, device_parameters, run_mode = False) except Exception as e: raise ValueError('Error loading device parameters check that all the input files are in the right directory. \n Error: {}'.format(e)) else: warning_timeout = kwargs.get('warning_timeout', 10) exit_timeout = kwargs.get('exit_timeout', 60) t_wait = 0 while True: # need this to be thread safe try: dev_par, layers = load_device_parameters(session_path, device_parameters, run_mode = False) break except Exception as e: time.sleep(0.002) t_wait = t_wait + 0.002 if t_wait > warning_timeout: print('Warning: SIMsalabim is not responding, please check that all the input files are in the right directory') if t_wait > exit_timeout: raise ValueError('Error loading device parameters check that all the input files are in the right directory. \n Error: {}'.format(e)) # check for new layers in the cmd_pars newlayers = [] idx_layers = [] for cmd_par in cmd_pars: if cmd_par['par'].startswith('l') and cmd_par['par'][1:].isdigit(): newlayers.append(cmd_par) idx_layers.append(int(cmd_par['par'][1:])) # update layer files with cmd_pars if len(newlayers) > 0: for layer in layers: for cmd_par in cmd_pars: if cmd_par['par'] == layer[1]: layer[2] = cmd_par['val'] break # check if we need to add new layers newlayers = [x for _, x in sorted(zip(idx_layers, newlayers))] idx_layers = sorted(idx_layers) if len(idx_layers) > 0: if max(idx_layers) > len(layers)-1: # means we need to add new layers # check that all the layers between len(layers)-& and max(idx_layers) are in the idx_layers list for i in range(len(layers),max(idx_layers)+1): if i not in idx_layers: raise ValueError('Missing layer definition for layer '+str(i)+' in the command parameters list.') # add the new layers for i, layer in zip(idx_layers, newlayers): layers.append(['par','l'+str(i),layer['val']],'parameter file for layer '+str(i)) # update dev_par layer files with cmd_pars dev_par_keys = list(dev_par.keys()) for i, section in enumerate(dev_par[dev_par_keys[0]]): if section[0].lower() == 'layers': # update section with layers[1:] dev_par[dev_par_keys[0]][i][1:] = layers[1:] # move all layer files to the temp folder for layer in layers: make_thread_safe_file_copy(os.path.join(session_path, layer[2]), tmp_folder) res = store_file_names(dev_par, sim_type, device_parameters, layers, run_mode = False) layer_files = res[0] optical_files = res[1] # make absolute paths layer_files = [os.path.abspath(os.path.join(session_path, f)) for f in layer_files] optical_files = [os.path.abspath(os.path.join(session_path, f)) for f in optical_files] optical_file_basenames = [os.path.basename(f) for f in optical_files] traps_int_files = res[2] traps_bulk_files = res[3] traps_int_files = [os.path.abspath(os.path.join(session_path, f)) for f in traps_int_files] traps_bulk_files = [os.path.abspath(os.path.join(session_path, f)) for f in traps_bulk_files] traps_int_file_basenames = [os.path.basename(f) for f in traps_int_files] traps_bulk_file_basenames = [os.path.basename(f) for f in traps_bulk_files] ExpJV_file = None tVGFile = None tJFile = None JVFile = None scParsFile = None if sim_type == 'simss': if res[4].lower() == 'None'.lower(): ExpJV_file = None else: ExpJV_file = res[4] JVFile = res[7] scParsFile = res[8] elif sim_type == 'zimt': tVGFile = res[4] tJFile = res[7] else : raise ValueError('Simulation type not recognized.') varFile = res[5] logFile = res[6] # Copy the files to the temporary folder for file in layer_files + optical_files + traps_int_files + traps_bulk_files + [ExpJV_file] + [tVGFile]: if file is not None and os.path.isfile(file): make_thread_safe_file_copy(file, tmp_folder) # update temp folder files with basename if file in layer_files: make_basename_input_files(os.path.join(tmp_folder, os.path.basename(file))) input_files = get_inputFile_from_cmd_pars(sim_type, cmd_pars) input_files = [os.path.abspath(os.path.join(session_path, f['val'])) for f in input_files] input_files_basenames = [os.path.basename(f) for f in input_files] for file in input_files: # print(file) # if file already in the tmp_folder, remove it and copy the new one while True: try: if os.path.isfile(os.path.join(tmp_folder, os.path.basename(file))): os.remove(os.path.join(tmp_folder, os.path.basename(file))) break except: pass # sleep random time to prevent high CPU usage smaller than 5ms time.sleep(random.uniform(0.002,0.005)) # if os.path.isfile(os.path.join(tmp_folder, os.path.basename(file))): # os.remove(os.path.join(tmp_folder, os.path.basename(file))) make_thread_safe_file_copy(os.path.join(session_path, file), tmp_folder) # set basename for device parameters make_basename_input_files(os.path.join(tmp_folder, os.path.basename(device_parameters))) # Construct the command to run the executable cmd_pars = make_basename_file_cmd_pars(cmd_pars) cmd_line = construct_cmd(sim_type, cmd_pars) # Run the simulation if os.name == 'nt': result = run(cmd_line, cwd=tmp_folder, stdout=PIPE, check=False, shell=True) else: result = run([cmd_line], cwd=tmp_folder, stdout=PIPE, check=False, shell=True) # Check the results of the process using the returncodes and console output if result.returncode != 0 and result.returncode != 95 and result.returncode != 3: # SIMsalabim raised an error, stop the program and return the error message on the UI. startMessage = False message = '' result_decode = result.stdout.decode('utf-8') if result.returncode >= 100: # A fatal (numerical) error occurred. Return errorcode and a standard error message. message = fatal_error_message(result.returncode) else: # Simsalabim raised an error. Read the console output for the details / error messaging. # All linetypes after 'Program will be terminated, press enter.' are considered part of the error message for line_console in result_decode.split('\n'): if startMessage is True: # The actual error message. Since the error message can be multi-line, append each line. message = message + line_console + '\n' if 'Program will be terminated.' in line_console: # Last 'regular' line of the console output. The next line is from the error message. startMessage = True # Show the message as an error on the screen. Do not continue to the simulation results page. message = 'Simulation raised an error with Errorcode: ' + str(result.returncode) + '\n\n' + message else: # SIMsalabim simulation succeeded if result.returncode == 95: # In case of errorcode 95, failures during the simulations were encountered but the simulation did not halt. Show 'error' messages on the UI. startMessage = False message = '' result_decode = result.stdout.decode('utf-8') for line_console in result_decode.split('\n'): if startMessage is True: # The actual error message. Since the error message can be multi-line, append each line. message = message + line_console + '\n' if 'Program will be terminated.' in line_console: # Last 'regular' line of the console output. The next line is from the error message. startMessage = True # Show the message as a success on the screen message = 'Simulation completed but raised errorcode: ' + str(result.returncode) + '\n\n' + 'The simulation finished but at least 1 point did not converge. \n\n' + message elif result.returncode == 3: # Special case, should not occur in the web version. # When the program exits as a success but no simulation has been run, e.g. in the case of the autotidy functionality. message = 'Action completed' else: # Simulation completed as expected. message = 'Simulation complete. Output can be found in the Simulation results.' # Move output files to the original folder if sim_type.lower() == 'simss': if os.path.isfile(os.path.join(tmp_folder, varFile)): # shutil.move(os.path.join(tmp_folder, varFile), os.path.join(session_path, varFile)) make_thread_safe_file_copy(os.path.join(tmp_folder, varFile), session_path) if os.path.isfile(os.path.join(tmp_folder, logFile)): # shutil.move(os.path.join(tmp_folder, logFile), os.path.join(session_path, logFile)) make_thread_safe_file_copy(os.path.join(tmp_folder, logFile), session_path) if os.path.isfile(os.path.join(tmp_folder, JVFile)): # shutil.move(os.path.join(tmp_folder, JVFile), session_path) make_thread_safe_file_copy(os.path.join(tmp_folder, JVFile), session_path) if os.path.isfile(os.path.join(tmp_folder, scParsFile)): # shutil.move(os.path.join(tmp_folder, scParsFile), os.path.join(session_path, scParsFile)) make_thread_safe_file_copy(os.path.join(tmp_folder, scParsFile), session_path) elif sim_type.lower() == 'zimt': if os.path.isfile(os.path.join(tmp_folder, varFile)): # shutil.move(os.path.join(tmp_folder, varFile), os.path.join(session_path, varFile)) make_thread_safe_file_copy(os.path.join(tmp_folder, varFile), session_path) if os.path.isfile(os.path.join(tmp_folder, logFile)): # shutil.move(os.path.join(tmp_folder, logFile), os.path.join(session_path, logFile)) make_thread_safe_file_copy(os.path.join(tmp_folder, logFile), session_path) if os.path.isfile(os.path.join(tmp_folder, tVGFile)): # shutil.move(os.path.join(tmp_folder, tVGFile), os.path.join(session_path, tVGFile)) make_thread_safe_file_copy(os.path.join(tmp_folder, tVGFile), session_path) if os.path.isfile(os.path.join(tmp_folder, tJFile)): # shutil.move(os.path.join(tmp_folder, tJFile), os.path.join(session_path, tJFile)) make_thread_safe_file_copy(os.path.join(tmp_folder, tJFile), session_path) result = result.returncode return result, message
[docs] def make_thread_safe_file_copy(file, destination): """Copy a file to a temp folder, and wait until the file is not in use anymore. Parameters ---------- file : string File path of the file to copy destination : string File path of the destination folder """ # check temp folder exists if not create it if not os.path.exists(destination): os.makedirs(destination) # tries to open the file, if it is in use it will wait until it is not while True: # copy the file to the temp folder try: shutil.copy(file, destination) break except: pass # add 2ms delay to prevent high CPU usage time.sleep(0.002)