[docs]classPyro:""" The main driver to run pyro. """def__init__(self,solver_name,*,from_commandline=False):""" Constructor Parameters ---------- solver_name : str Name of solver to use from_commandline : bool True if we are running from the commandline -- this enables runtime vis by default. """iffrom_commandline:msg.bold('pyro ...')ifsolver_namenotinvalid_solvers:msg.fail(f"ERROR: {solver_name} is not a valid solver")self.from_commandline=from_commandlineself.pyro_home=os.path.dirname(os.path.realpath(__file__))+'/'ifnotsolver_name.startswith("pyro."):solver_import="pyro."+solver_nameelse:solver_import=solver_name# import desired solver under "solver" namespaceself.solver=importlib.import_module(solver_import)self.solver_name=solver_nameself.problem_name=Noneself.problem_func=Noneself.problem_source=Noneself.problem_params=Noneself.problem_finalize=None# custom problemsself.custom_problems={}# runtime parameters# parameter defaultsself.rp=RuntimeParameters()self.rp.load_params(self.pyro_home+"_defaults")self.rp.load_params(self.pyro_home+self.solver_name+"/_defaults")self.tc=profile.TimerCollection()self.is_initialized=False
[docs]defadd_problem(self,name,problem_func,*,problem_params=None):"""Add a problem setup for this solver. Parameters ---------- name : str The descriptive name of the problem problem_func : function The function to initialize the state data problem_params : dict A dictionary of runtime parameters needed for the problem setup """ifproblem_paramsisNone:problem_params={}self.custom_problems[name]=(problem_func,problem_params)
[docs]definitialize_problem(self,problem_name,*,inputs_file=None,inputs_dict=None):""" Initialize the specific problem Parameters ---------- problem_name : str Name of the problem inputs_file : str Filename containing problem's runtime parameters inputs_dict : dict Dictionary containing extra runtime parameters """# pylint: disable=attribute-defined-outside-initifproblem_nameinself.custom_problems:# this is a problem we added via self.add_problemself.problem_name=problem_nameself.problem_func,self.problem_params=self.custom_problems[problem_name]self.problem_finalize=Noneself.problem_source=Noneelse:problem=importlib.import_module(f"pyro.{self.solver_name}.problems.{problem_name}")self.problem_name=problem_nameself.problem_func=problem.init_dataself.problem_params=problem.PROBLEM_PARAMSself.problem_finalize=problem.finalizetry:self.problem_source=problem.source_termsexceptAttributeError:self.problem_source=Noneifinputs_fileisNone:inputs_file=problem.DEFAULT_INPUTS# problem-specific runtime parametersfork,vinself.problem_params.items():self.rp.set_param(k,v,no_new=False)# now read in the inputs fileifinputs_fileisnotNone:ifnotos.path.isfile(inputs_file):# check if the param file lives in the solver's problems directoryinputs_file=self.pyro_home+self.solver_name+"/problems/"+inputs_fileifnotos.path.isfile(inputs_file):msg.fail("ERROR: inputs file does not exist")self.rp.load_params(inputs_file,no_new=1)# manually override the I/O, dovis, and verbose defaults# for Jupyter, we want runtime vis disabled by defaultifnotself.from_commandline:self.rp.set_param("vis.dovis",0)self.rp.set_param("driver.verbose",0)self.rp.set_param("io.do_io",0)ifinputs_dictisnotNone:fork,vininputs_dict.items():self.rp.set_param(k,v)# write out the inputs.autoself.rp.print_paramfile()self.verbose=self.rp.get_param("driver.verbose")self.dovis=self.rp.get_param("vis.dovis")# -------------------------------------------------------------------------# initialization# -------------------------------------------------------------------------# initialize the Simulation object -- this will hold the grid and# data and know about the runtime parameters and which problem we# are runningself.sim=self.solver.Simulation(self.solver_name,self.problem_name,self.problem_func,self.rp,problem_finalize_func=self.problem_finalize,problem_source_func=self.problem_source,timers=self.tc)self.sim.initialize()self.sim.preevolve()plt.ion()self.sim.cc_data.t=0.0self.is_initialized=True
[docs]defrun_sim(self):""" Evolve entire simulation """ifnotself.is_initialized:msg.fail("ERROR: problem has not been initialized")tm_main=self.tc.timer("main")tm_main.begin()# output the 0th databasename=self.rp.get_param("io.basename")do_io=self.rp.get_param("io.do_io")ifdo_io:self.sim.write(f"{basename}{self.sim.n:04d}")ifself.dovis:plt.figure(num=1,figsize=(8,6),dpi=100,facecolor='w')self.sim.dovis()whilenotself.sim.finished():self.single_step()# final outputforce_final_output=self.rp.get_param("io.force_final_output")ifdo_ioorforce_final_output:ifself.verbose>0:msg.warning("outputting...")basename=self.rp.get_param("io.basename")self.sim.write(f"{basename}{self.sim.n:04d}")tm_main.end()# -------------------------------------------------------------------------# final reports# -------------------------------------------------------------------------ifself.verbose>0:self.rp.print_unused_params()self.tc.report()self.sim.finalize()
[docs]defsingle_step(self):""" Do a single step """ifnotself.is_initialized:msg.fail("ERROR: problem has not been initialized")# fill boundary conditionsself.sim.cc_data.fill_BC_all()# get the timestepself.sim.compute_timestep()# evolve for a single timestepself.sim.evolve()ifself.verbose>0:print("%5d%10.5f%10.5f"%(self.sim.n,self.sim.cc_data.t,self.sim.dt))# outputifself.sim.do_output():ifself.verbose>0:msg.warning("outputting...")basename=self.rp.get_param("io.basename")self.sim.write(f"{basename}{self.sim.n:04d}")# visualizationifself.dovis:tm_vis=self.tc.timer("vis")tm_vis.begin()self.sim.dovis()store=self.rp.get_param("vis.store_images")ifstore==1:basename=self.rp.get_param("io.basename")plt.savefig(f"{basename}{self.sim.n:04d}.png")tm_vis.end()
def__repr__(self):s=f"Pyro('{self.solver_name}')"returnsdef__str__(self):s=f"Solver = {self.solver_name}\n"ifself.is_initialized:s+=f"Problem = {self.sim.problem_name}\n"s+=f"Simulation time = {self.sim.cc_data.t}\n"s+=f"Simulation step number = {self.sim.n}\n"s+="\nRuntime Parameters"s+="\n------------------\n"s+=str(self.rp)returns
[docs]defget_var(self,v):"""Alias for the data's get_var routine, returns the simulation data given the variable name v. """ifnotself.is_initialized:msg.fail("ERROR: problem has not been initialized")returnself.sim.cc_data.get_var(v)
[docs]defget_grid(self):"""Return the underlying grid object for the simulation """ifnotself.is_initialized:msg.fail("ERROR: problem has not been initialized")returnself.sim.cc_data.grid
[docs]defget_sim(self):"""Return the Simulation object"""returnself.sim
[docs]classPyroBenchmark(Pyro):"""A subclass of Pyro for benchmarking. Inherits everything from pyro, but adds benchmarking routines. """def__init__(self,solver_name,*,comp_bench=False,reset_bench_on_fail=False,make_bench=False):""" Constructor Parameters ---------- solver_name : str Name of solver to use comp_bench : bool Are we comparing to a benchmark? reset_bench_on_fail : bool Do we reset the benchmark on fail? make_bench : bool Are we storing a benchmark? """super().__init__(solver_name)self.comp_bench=comp_benchself.reset_bench_on_fail=reset_bench_on_failself.make_bench=make_bench
[docs]defrun_sim(self,rtol=1.e-12):""" Evolve entire simulation and compare to benchmark at the end. """super().run_sim()result=0ifself.comp_bench:result=self.compare_to_benchmark(rtol)ifself.make_benchor(result!=0andself.reset_bench_on_fail):self.store_as_benchmark()ifself.comp_bench:returnresultreturnself.sim
[docs]defcompare_to_benchmark(self,rtol):""" Are we comparing to a benchmark? """basename=self.rp.get_param("io.basename")compare_file="{}{}/tests/{}{:04d}".format(self.pyro_home,self.solver_name,basename,self.sim.n)msg.warning(f"comparing to: {compare_file} ")try:sim_bench=io.read(compare_file)exceptOSError:msg.warning("ERROR opening compare file")return"ERROR opening compare file"result=compare.compare(self.sim.cc_data,sim_bench.cc_data,rtol)ifresult==0:msg.success(f"results match benchmark to within relative tolerance of {rtol}\n")else:msg.warning("ERROR: "+compare.errors[result]+"\n")returnresult
[docs]defstore_as_benchmark(self):""" Are we storing a benchmark? """ifnotos.path.isdir(self.pyro_home+self.solver_name+"/tests/"):try:os.mkdir(self.pyro_home+self.solver_name+"/tests/")except(FileNotFoundError,PermissionError):msg.fail("ERROR: unable to create the solver's tests/ directory")basename=self.rp.get_param("io.basename")bench_file=self.pyro_home+self.solver_name+"/tests/"+ \
basename+"%4.4d"%(self.sim.n)msg.warning(f"storing new benchmark: {bench_file}\n")self.sim.write(bench_file)
[docs]defparse_args():"""Parse the runtime parameters"""p=argparse.ArgumentParser()p.add_argument("--make_benchmark",help="create a new benchmark file for regression testing",action="store_true")p.add_argument("--compare_benchmark",help="compare the end result to the stored benchmark",action="store_true")p.add_argument("solver",metavar="solver-name",type=str,nargs=1,help="name of the solver to use",choices=valid_solvers)p.add_argument("problem",metavar="problem-name",type=str,nargs=1,help="name of the problem to run")p.add_argument("param",metavar="inputs-file",type=str,nargs=1,help="name of the inputs file")p.add_argument("other",metavar="runtime-parameters",type=str,nargs="*",help="additional runtime parameters that override the inputs file ""in the format section.option=value")returnp.parse_args()