"""Functions for general use"""
######### Package Imports #########################################################################
import os, zipfile, subprocess, uuid, shutil, threading, queue
import pandas as pd
from subprocess import run, PIPE
from functools import partial
from threading import Thread
from pySIMsalabim.utils.general import *
from pySIMsalabim.utils.device_parameters import *
######### Function Definitions ####################################################################
def parallel_error_message(errorcode):
"""When a 'standard Pascal' fatal error occurs, add the standard error message to be used with parallel which does not read the
Parameters
----------
errorcode : int
the error code
Returns
-------
str
the error message
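Examples
--------
Typical use is to translate an exit value collected from the GNU Parallel job log:
>>> parallel_error_message(97)
'Error 97: Runtime exceeds limit set by timeout.'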
"""
message = ''
if errorcode >= 90 and errorcode < 100:
message = 'Error '+str(errorcode) +': '
if errorcode == 90:
message += 'Device parameter file corrupted.'
elif errorcode == 91:
message += 'Invalid input. Please check your input files for an incorrect layer definition (see the SIMsalabim docs), wrong physics, or a voltage in the tVG_file that is too large.'
elif errorcode == 92:
message += 'Invalid input from command line.'
elif errorcode == 93:
message += 'Numerical failure.'
elif errorcode == 94:
message += 'Failed to converge, halt (FailureMode = 0).'
elif errorcode == 95:
message += 'Failed to converge for at least 1 point, but did not halt (FailureMode != 0).'
elif errorcode == 96:
message += 'Missing input file.'
elif errorcode == 97:
message += 'Runtime exceeds limit set by timeout.'
elif errorcode == 99:
message += 'Programming error (i.e. not due to the user!).'
elif errorcode > 100:
message = 'Fatal error '+str(errorcode) +': '
if errorcode == 106:
message += 'Invalid numeric format: Reported when a non-numeric value is read from a text file.'
elif errorcode == 200:
message += 'Division by zero: The application attempted to divide a number by zero.'
elif errorcode == 201:
message += 'Range check error.'
elif errorcode == 202:
message += 'Stack overflow error: This error is only reported when stack checking is enabled.'
elif errorcode == 205:
message += 'Floating point overflow.'
elif errorcode == 206:
message += 'Floating point underflow.'
else:
message = 'Unknown error code '+str(errorcode) + ' occurred.'
else:
message = 'Unknown error code '+str(errorcode) + ' occurred.'
return message
def run_simulation_parallel(sim_type, cmd_pars_list, session_path, max_jobs = max(1,os.cpu_count()-1), verbose=False, **kwargs):
"""Run the SIMsalabim simulation executable with the chosen device parameters.
Select the correct function to run the simulation in parallel based on the operating system.
Parameters
----------
sim_type : string
Which type of simulation to run: simss or zimt
cmd_pars_list : List
List of lists with parameters to add to the simss/zimt cmd line. Each parameter is a dict with par,val keys.
Note: when relevant the first entry must be the deviceparameters file with a key: dev_par_file
session_path : string
Path to the folder containing the simss or zimt executable
max_jobs : int
Maximum number of parallel jobs to run. Default is the number of CPU cores - 1
verbose : bool
If True, print the output of the simulation to the console
**kwargs : dict
Additional keyword arguments. 'force_multithreading' (bool, default False) forces the threaded implementation on Linux instead of GNU Parallel.
Returns
-------
List
List of return codes (GNU Parallel path) or of (CompletedProcess, message) tuples (threaded path), one entry per simulation.
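Examples
--------
A minimal sketch of a parameter sweep; the device-parameter file name, the 'Gfrac' parameter and the executable folder are illustrative placeholders:
>>> cmd_pars_list = [[{'par': 'dev_par_file', 'val': 'simulation_setup.txt'},
...                   {'par': 'Gfrac', 'val': str(g)}] for g in (0.2, 0.6, 1.0)]
>>> results = run_simulation_parallel('simss', cmd_pars_list, '/path/to/SimSS', max_jobs=2)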
"""
force_multithreading = kwargs.get('force_multithreading', False)
if os.name == 'nt':
# Windows
result_list = run_simulation_multithreaded_windows(sim_type, cmd_pars_list, session_path, max_jobs, verbose)
else:
# Linux
if shutil.which('parallel') is not None and not force_multithreading:
result, msg_list, return_code_list = run_simulation_GNU_parallel(sim_type, cmd_pars_list, session_path, max_jobs, verbose)
result_list = return_code_list
else:
result_list = run_simulation_multithreaded_linux(sim_type, cmd_pars_list, session_path, max_jobs, verbose)
return result_list
def run_simulation_GNU_parallel(sim_type, cmd_pars_list, session_path, max_jobs = max(1,os.cpu_count()-1),verbose=False):
"""Run the SIMsalabim simulation executable with the chosen device parameters.
The simulation is run in parallel using the GNU Parallel program. (https://www.gnu.org/software/parallel/).
If this command is used please cite:
Tange, O. (2021, August 22). GNU Parallel 20210822 ('Kabul').
Zenodo. https://doi.org/10.5281/zenodo.5233953
To install GNU Parallel on Linux (it is not available on Windows):
sudo apt-get update
sudo apt-get install parallel
or try to run the install_GNU_parallel_Linux() function from the install module.
import pySIMsalabim.install as install
install.install_GNU_parallel_Linux()
Return the complete result object of the process accompanied by a message with information,
in case of both success and failure.
Parameters
----------
sim_type : string
Which type of simulation to run: simss or zimt
cmd_pars_list : List
List of lists with parameters to add to the simss/zimt cmd line. Each parameter is a dict with par,val keys.
Note: when relevant the first entry must be the deviceparameters file with a key: dev_par_file
session_path : string
Path to the folder containing the simss or zimt executable
max_jobs : int
Maximum number of parallel jobs to run. Default is the number of CPU cores - 1
Returns
-------
CompletedProcess
Output object with the returncode and console output of the GNU Parallel call
List
List of messages, one per simulation
List
List of return codes, one per simulation
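Examples
--------
Sketch of a direct call (only meaningful on Linux with GNU Parallel installed); the file names and the 'Vmax' parameter are illustrative placeholders:
>>> cmd_pars_list = [[{'par': 'dev_par_file', 'val': 'simulation_setup.txt'},
...                   {'par': 'Vmax', 'val': str(v)}] for v in (1.0, 1.2)]
>>> result, msg_list, return_codes = run_simulation_GNU_parallel('simss', cmd_pars_list, '/path/to/SimSS')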
"""
# Construct the command to run the executable
cmd_line_list = []
for cmd_pars in cmd_pars_list:
cmd_line = construct_cmd(sim_type, cmd_pars)
cmd_line_list.append(cmd_line)
# Construct the file and command to run the GNU parallel
uuid_str = str(uuid.uuid4())
filename = 'Str4Parallel_'+uuid_str+'.txt'
log_file = os.path.join(session_path,'logjob_'+uuid_str+ '.dat')
with open(os.path.join(session_path,filename),'w') as tempfilepar:
for cmd_line in cmd_line_list:
tempfilepar.write(cmd_line+'\n')
cmd_parallel = 'parallel --joblog '+ log_file +' --jobs '+str(int(max_jobs))+' -a '+os.path.join(session_path,filename)
result = run([cmd_parallel], cwd=session_path,stdout=PIPE, check=False, shell=True)
msg_list,return_code_list = [],[]
# if result.returncode != 0:
log = pd.read_csv(log_file, sep='\t',usecols=['Exitval'],on_bad_lines='skip')
# check if all jobs have been completed successfully, i.e. all exitvals are 0, 95 or 3
if not all(val in [0, 95, 3] for val in log['Exitval']):
for idx, val in enumerate(log['Exitval']):
message = ''
if val != 0 and val != 95 and val != 3:
if val >= 90 :
# Show the message as an error on the screen. Do not continue to the simulation results page.
msg_list.append('Simulation raised an error with Errorcode: ' + str(val) + '\n\n' + parallel_error_message(val))
else:
msg_list.append(parallel_error_message(val))
else:
if val == 95:
# In case of errorcode 95, failures during the simulations were encountered but the simulation did not halt. Show 'error' messages on the UI.
msg_list.append('Simulation completed but raised errorcode: ' + str(val) + '\n\n' + 'The simulation finished but at least 1 point did not converge.')
elif val == 3:
# Special case, should not occur in the web version.
# When the program exits as a success but no simulation has been run, e.g. in the case of the autotidy functionality.
msg_list.append('Action completed')
else:
# Simulation completed as expected.
msg_list.append('Simulation completed.')
return_code_list = log['Exitval'].tolist()
# remove the temporary files
os.remove(os.path.join(session_path,filename))
os.remove(log_file)
return result, msg_list, return_code_list
def run_simulation_multithreaded_windows(sim_type,cmd_pars_list,session_path,max_jobs=max(1,os.cpu_count()-1),verbose=False):
"""Runs simulations in parallel on max_jobs number of threads.
This procedure should work on Windows and Linux but it is not as efficient as run_parallel_simu on Linux.
Yet, it is the only way to run simulations in parallel on Windows in a thread safe way and making sure that two thread do not try to write to the same file at the same time.
This is achieved by running the simulation in a temporary folder with a copy of all the necessary input files and then moving the output files to the original folder.
Parameters
----------
sim_type : string
Which type of simulation to run: simss or zimt
cmd_pars_list : List
List of lists with parameters to add to the simss/zimt cmd line. Each parameter is a dict with par,val keys.
Note: when relevant the first entry must be the deviceparameters file with a key: dev_par_file
session_path : string
Path to the folder containing the simss or zimt executable
max_jobs : int
Maximum number of parallel jobs to run. Default is the number of CPU cores - 1
verbose : bool
If True, print the output of the simulation to the console
Returns
-------
List
List of (CompletedProcess, message) tuples with the returncode and console output of each simulation
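Examples
--------
Sketch of running two ZimT transients in parallel; the file names and the 'tVGFile' parameter are illustrative placeholders:
>>> cmd_pars_list = [[{'par': 'dev_par_file', 'val': 'simulation_setup.txt'},
...                   {'par': 'tVGFile', 'val': 'tVG_' + str(i) + '.txt'}] for i in range(2)]
>>> results = run_simulation_multithreaded_windows('zimt', cmd_pars_list, '/path/to/ZimT')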
"""
lock = threading.Lock() # Create a lock
semaphore = threading.Semaphore(max_jobs) # Create a semaphore
# Create tmp folder
rnd_ID = str(uuid.uuid4())
tmp_folder = os.path.join(session_path, 'tmp'+rnd_ID) # Generate a unique ID for the temporary folder to make sure there are no conflicts with other processes
if os.path.isdir(tmp_folder):
shutil.rmtree(tmp_folder)
os.mkdir(tmp_folder)
else:
os.mkdir(tmp_folder)
tmp_folder_lst = [tmp_folder] * len(cmd_pars_list)
# Create a queue to communicate with the worker threads
q = queue.Queue()
# Start worker threads
threads = []
for i in range(len(cmd_pars_list)):
t = CustomThread(target=partial(worker_windows,q=q,lock=lock,tmp_folder=tmp_folder_lst[i],semaphore=semaphore,verbose=verbose))
t.start()
threads.append(t)
# Add tasks to the queue
for code_name, cmd_pars, path in zip([sim_type] * len(cmd_pars_list), cmd_pars_list, tmp_folder_lst):
q.put((code_name, cmd_pars, path, session_path))
# Wait for all tasks to be finished
q.join()
# Stop workers
for i in range(len(cmd_pars_list)):
q.put(None)
result_list = []
for t in threads:
result_list.append(t.join())
# Clean up
shutil.rmtree(tmp_folder)
return result_list
def worker_windows(q,lock,tmp_folder,semaphore,verbose=False):
"""Worker function that runs the simulation in a temporary folder and moves the output files to the original folder.
Parameters
----------
q : queue
Queue to communicate with the main thread
lock : threading.Lock
Lock to prevent multiple threads from writing to the same file at the same time
tmp_folder : string
Temporary folder to run the simulation in
semaphore : threading.Semaphore
Semaphore to limit the number of parallel jobs
verbose : bool
If True, print the output of the simulation to the console
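Examples
--------
This worker is normally driven by run_simulation_multithreaded_windows; each queue item is a (sim_type, cmd_pars, tmp_folder, session_path) tuple and a None sentinel stops the worker (paths and file names below are placeholders):
>>> q.put(('simss', [{'par': 'dev_par_file', 'val': 'simulation_setup.txt'}],
...        '/path/to/tmp_folder', '/path/to/SimSS'))
>>> q.put(None)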
"""
while True:
# Get task from the queue
task = q.get()
if task is None:
break
# Unpack task
sim_type, cmd_pars, path, session_path = task
device_parameters = None
for cmd_par in cmd_pars:
if cmd_par['par'] == 'dev_par_file':
device_parameters = cmd_par['val']
break
if device_parameters is None:
raise ValueError('Device parameters file not found in the command parameters list.')
# Acquire semaphore
semaphore.acquire()
# create a temporary folder for the simulation
ID = str(uuid.uuid4()) # Generate a unique ID for the temporary folder
tmp_folder = os.path.join(tmp_folder, ID)
if not os.path.isdir(tmp_folder):
os.mkdir(tmp_folder)
# Copy all necessary files to the temporary folder
lock.acquire()
# copy the executable to the temporary folder
if os.name == 'nt':
shutil.copy(os.path.join(session_path, sim_type + '.exe'), tmp_folder)
else:
shutil.copy(os.path.join(session_path, sim_type), tmp_folder)
dev_par, layers = load_device_parameters(session_path, device_parameters, run_mode = False)
# check for new layers in the cmd_pars
newlayers = []
idx_layers = []
for cmd_par in cmd_pars:
if cmd_par['par'].startswith('l') and cmd_par['par'][1:].isdigit():
newlayers.append(cmd_par)
idx_layers.append(int(cmd_par['par'][1:]))
# update layer files with cmd_pars
if len(newlayers) > 0:
for layer in layers:
for cmd_par in cmd_pars:
if cmd_par['par'] == layer[1]:
layer[2] = cmd_par['val']
break
# check if we need to add new layers
newlayers = [x for _, x in sorted(zip(idx_layers, newlayers))]
idx_layers = sorted(idx_layers)
if len(idx_layers) > 0:
if max(idx_layers) > len(layers)-1: # means we need to add new layers
# check that every layer index between len(layers) and max(idx_layers) is present in the idx_layers list
for i in range(len(layers),max(idx_layers)+1):
if i not in idx_layers:
raise ValueError('Missing layer definition for layer '+str(i)+' in the command parameters list.')
# add the new layers
for i, layer in zip(idx_layers, newlayers):
layers.append(['par', 'l'+str(i), layer['val'], 'parameter file for layer '+str(i)])
# update dev_par layer files with cmd_pars
dev_par_keys = list(dev_par.keys())
for i, section in enumerate(dev_par[dev_par_keys[0]]):
if section[0].lower() == 'layers':
# update section with layers[1:]
dev_par[dev_par_keys[0]][i][1:] = layers[1:]
# move all layers to the temporary folder
for layer in layers:
shutil.copy(os.path.join(session_path, layer[2]), tmp_folder)
res = store_file_names(dev_par, sim_type, device_parameters, layers, run_mode = False)
layer_files = res[0]
optical_files = res[1]
# make absolute paths
layer_files = [os.path.abspath(os.path.join(session_path, f)) for f in layer_files]
optical_files = [os.path.abspath(os.path.join(session_path, f)) for f in optical_files]
optical_file_basenames = [os.path.basename(f) for f in optical_files]
traps_int_files = res[2]
traps_bulk_files = res[3]
traps_int_files = [os.path.abspath(os.path.join(session_path, f)) for f in traps_int_files]
traps_bulk_files = [os.path.abspath(os.path.join(session_path, f)) for f in traps_bulk_files]
traps_int_file_basenames = [os.path.basename(f) for f in traps_int_files]
traps_bulk_file_basenames = [os.path.basename(f) for f in traps_bulk_files]
ExpJV_file = None
tVGFile = None
tJFile = None
JVFile = None
scParsFile = None
if sim_type == 'simss':
if res[4].lower() == 'none':
ExpJV_file = None
else:
ExpJV_file = res[4]
JVFile = res[7]
scParsFile = res[8]
else:
tVGFile = res[4]
tJFile = res[7]
varFile = res[5]
logFile = res[6]
# Copy the files to the temporary folder
for file in layer_files + optical_files + traps_int_files + traps_bulk_files + [ExpJV_file] + [tVGFile]:
if file is not None and os.path.isfile(file):
shutil.copy(file, tmp_folder)
# update temp folder files with basename
if file in layer_files:
make_basename_input_files(os.path.join(tmp_folder, os.path.basename(file)))
# move files from cmd_pars to the temporary folder and overwrite if necessary
input_files = get_inputFile_from_cmd_pars(sim_type, cmd_pars)
input_files = [os.path.abspath(os.path.join(session_path, f['val'])) for f in input_files]
input_files_basenames = [os.path.basename(f) for f in input_files]
for file in input_files:
# if file already in the tmp_folder, remove it and copy the new one
if os.path.isfile(os.path.join(tmp_folder, os.path.basename(file))):
os.remove(os.path.join(tmp_folder, os.path.basename(file)))
shutil.copy(os.path.join(session_path, file), tmp_folder)
# set basename for device parameters
make_basename_input_files(os.path.join(tmp_folder, os.path.basename(device_parameters)))
# check the input files for the new layers
# for layer in newlayers:
# input_files2 = get_inputFile_from_layer(layer,session_path)
# for file in input_files2:
# # if file does not exist in the tmp_folder, copy it. Otherwise, copy it if it is not in the input_files list
# if not os.path.isfile(os.path.join(tmp_folder, os.path.basename(file))):
# shutil.copy(os.path.join(session_path, file), tmp_folder)
# else:
# if not os.path.basename(file) in input_files_basenames:
# shutil.copy(os.path.join(session_path, file), tmp_folder)
lock.release()
# Construct the command to run the executable
cmd_pars = make_basename_file_cmd_pars(cmd_pars)
cmd_line = construct_cmd(sim_type, cmd_pars)
# Run the simulation
result = run(cmd_line, cwd=tmp_folder, stdout=PIPE, check=False, shell=True)
# Check the results of the process using the returncodes and console output
if result.returncode != 0 and result.returncode != 95 and result.returncode != 3:
# SIMsalabim raised an error, stop the program and return the error message on the UI.
startMessage = False
message = ''
result_decode = result.stdout.decode('utf-8')
if result.returncode >= 100:
# A fatal (numerical) error occurred. Return errorcode and a standard error message.
message = fatal_error_message(result.returncode)
else:
# Simsalabim raised an error. Read the console output for the details / error messaging.
# All linetypes after 'Program will be terminated, press enter.' are considered part of the error message
for line_console in result_decode.split('\n'):
if startMessage is True:
# The actual error message. Since the error message can be multi-line, append each line.
message = message + line_console + '\n'
if 'Program will be terminated.' in line_console:
# Last 'regular' line of the console output. The next line is from the error message.
startMessage = True
# Show the message as an error on the screen. Do not continue to the simulation results page.
message = 'Simulation raised an error with Errorcode: ' + str(result.returncode) + '\n\n' + message
else:
# SIMsalabim simulation succeeded
if result.returncode == 95:
# In case of errorcode 95, failures during the simulations were encountered but the simulation did not halt. Show 'error' messages on the UI.
startMessage = False
message = ''
result_decode = result.stdout.decode('utf-8')
for line_console in result_decode.split('\n'):
if startMessage is True:
# The actual error message. Since the error message can be multi-line, append each line.
message = message + line_console + '\n'
if 'Program will be terminated.' in line_console:
# Last 'regular' line of the console output. The next line is from the error message.
startMessage = True
# Show the message as a success on the screen
message = 'Simulation completed but raised errorcode: ' + str(result.returncode) + '\n\n' + 'The simulation finished but at least 1 point did not converge. \n\n' + message
elif result.returncode == 3:
# Special case, should not occur in the web version.
# When the program exits as a success but no simulation has been run, e.g. in the case of the autotidy functionality.
message = 'Action completed'
else:
# Simulation completed as expected.
message = 'Simulation complete. Output can be found in the Simulation results.'
# Move output files to the original folder
lock.acquire()
if sim_type.lower() == 'simss':
if os.path.isfile(os.path.join(tmp_folder, varFile)):
shutil.move(os.path.join(tmp_folder, varFile), os.path.join(session_path, varFile))
if os.path.isfile(os.path.join(tmp_folder, logFile)):
shutil.move(os.path.join(tmp_folder, logFile), os.path.join(session_path, logFile))
if os.path.isfile(os.path.join(tmp_folder, JVFile)):
shutil.move(os.path.join(tmp_folder, JVFile), session_path)
if os.path.isfile(os.path.join(tmp_folder, scParsFile)):
shutil.move(os.path.join(tmp_folder, scParsFile), os.path.join(session_path, scParsFile))
elif sim_type.lower() == 'zimt':
if os.path.isfile(os.path.join(tmp_folder, varFile)):
shutil.move(os.path.join(tmp_folder, varFile), os.path.join(session_path, varFile))
if os.path.isfile(os.path.join(tmp_folder, logFile)):
shutil.move(os.path.join(tmp_folder, logFile), os.path.join(session_path, logFile))
if os.path.isfile(os.path.join(tmp_folder, tVGFile)):
shutil.move(os.path.join(tmp_folder, tVGFile), os.path.join(session_path, tVGFile))
if os.path.isfile(os.path.join(tmp_folder, tJFile)):
shutil.move(os.path.join(tmp_folder, tJFile), os.path.join(session_path, tJFile))
lock.release()
# Release semaphore
semaphore.release()
# Print output if verbose
if verbose:
print(result.stdout)
# Notify the queue that the task is done
q.task_done()
return result, message
def worker_linux(q,lock,semaphore,verbose=False):
"""Worker function that runs the simulation in a temporary folder and moves the output files to the original folder.
Parameters
----------
q : queue
Queue to communicate with the main thread
lock : threading.Lock
Lock to prevent multiple threads from writing to the same file at the same time
semaphore : threading.Semaphore
Semaphore to limit the number of parallel jobs
verbose : bool
If True, print the output of the simulation to the console
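Examples
--------
This worker is normally driven by run_simulation_multithreaded_linux; each queue item is a (sim_type, cmd_pars, session_path) tuple and a None sentinel stops the worker (paths and file names below are placeholders):
>>> q.put(('zimt', [{'par': 'dev_par_file', 'val': 'simulation_setup.txt'}],
...        '/path/to/ZimT'))
>>> q.put(None)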
"""
while True:
# Get task from the queue
task = q.get()
if task is None:
break
# Unpack task
sim_type, cmd_pars, session_path = task
# Acquire semaphore
semaphore.acquire()
result, message = run_simulation(sim_type, cmd_pars, session_path, run_mode = False, verbose=verbose)
# Release semaphore
semaphore.release()
# Print output if verbose
if verbose:
print(result.stdout)
# Notify the queue that the task is done
q.task_done()
return result, message
def run_simulation_multithreaded_linux(sim_type,cmd_pars_list,session_path,max_jobs=max(1,os.cpu_count()-1),verbose=False):
"""Runs simulations in parallel on max_jobs number of threads.
This procedure should work on Windows and Linux but it is not as efficient as run_parallel_simu on Linux.
Yet, it is the only way to run simulations in parallel on Windows in a thread safe way and making sure that two thread do not try to write to the same file at the same time.
This is achieved by running the simulation in a temporary folder with a copy of all the necessary input files and then moving the output files to the original folder.
Parameters
----------
sim_type : string
Which type of simulation to run: simss or zimt
cmd_pars_list : List
List of lists with parameters to add to the simss/zimt cmd line. Each parameter is a dict with par,val keys.
Note: when relevant the first entry must be the deviceparameters file with a key: dev_par_file
session_path : string
Path to the folder containing the simss or zimt executable
max_jobs : int
Maximum number of parallel jobs to run. Default is the number of CPU cores - 1
verbose : bool
If True, print the output of the simulation to the console
Returns
-------
List
List of (CompletedProcess, message) tuples with the returncode and console output of each simulation
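Examples
--------
Sketch of the threaded fallback on Linux (used when GNU Parallel is not installed); the file names and the 'Gfrac' parameter are illustrative placeholders:
>>> cmd_pars_list = [[{'par': 'dev_par_file', 'val': 'simulation_setup.txt'},
...                   {'par': 'Gfrac', 'val': str(g)}] for g in (0.0, 1.0)]
>>> results = run_simulation_multithreaded_linux('simss', cmd_pars_list, '/path/to/SimSS')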
"""
lock = threading.Lock() # Create a lock
semaphore = threading.Semaphore(max_jobs) # Create a semaphore
# Create a queue to communicate with the worker threads
q = queue.Queue()
# Start worker threads
threads = []
for i in range(len(cmd_pars_list)):
t = CustomThread(target=partial(worker_linux,q=q,lock=lock,semaphore=semaphore,verbose=verbose))
t.start()
threads.append(t)
# Add tasks to the queue
for code_name, cmd_pars in zip([sim_type] * len(cmd_pars_list), cmd_pars_list):
q.put((code_name, cmd_pars, session_path))
# Wait for all tasks to be finished
q.join()
# Stop workers
for i in range(len(cmd_pars_list)):
q.put(None)
result_list = []
for t in threads:
result_list.append(t.join())
return result_list
# Custom thread class to return the result of the thread
class CustomThread(Thread):
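"""Thread subclass whose join() returns the value returned by the target function.
Example (illustrative):
>>> t = CustomThread(target=lambda: 1 + 1)
>>> t.start()
>>> t.join()
2
"""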
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
def join(self, *args):
Thread.join(self, *args)
return self._return