[docs]classVariables:""" a container class for easy access to the different compressible variable by an integer key """def__init__(self,myd):self.nvar=len(myd.names)# conserved variables -- we set these when we initialize for# they match the CellCenterData2d objectself.idens=myd.names.index("density")self.ixmom=myd.names.index("x-momentum")self.iymom=myd.names.index("y-momentum")self.iener=myd.names.index("energy")# if there are any additional variable, we treat them as# passively advected scalarsself.naux=self.nvar-4ifself.naux>0:self.irhox=4else:self.irhox=-1# primitive variablesself.nq=4+self.nauxself.irho=0self.iu=1self.iv=2self.ip=3ifself.naux>0:self.ix=4# advected scalarelse:self.ix=-1
[docs]defcons_to_prim(U,gamma,ivars,myg):""" convert an input vector of conserved variables to primitive variables """q=myg.scratch_array(nvar=ivars.nq)q[:,:,ivars.irho]=U[:,:,ivars.idens]q[:,:,ivars.iu]=np.divide(U[:,:,ivars.ixmom],U[:,:,ivars.idens],out=np.zeros_like(U[:,:,ivars.ixmom]),where=(U[:,:,ivars.idens]!=0.0))q[:,:,ivars.iv]=np.divide(U[:,:,ivars.iymom],U[:,:,ivars.idens],out=np.zeros_like(U[:,:,ivars.iymom]),where=(U[:,:,ivars.idens]!=0.0))e=np.divide(U[:,:,ivars.iener]-0.5*q[:,:,ivars.irho]*(q[:,:,ivars.iu]**2+q[:,:,ivars.iv]**2),q[:,:,ivars.irho],out=np.zeros_like(U[:,:,ivars.iener]),where=(U[:,:,ivars.idens]!=0.0))e_min=e.v().min()rho_min=q.v(n=ivars.irho).min()asserte_min>0.0andrho_min>0.0,f"invalid state, min(rho) = {rho_min}, min(e) = {e_min}"q[:,:,ivars.ip]=eos.pres(gamma,q[:,:,ivars.irho],e)ifivars.naux>0:fornq,nuinzip(range(ivars.ix,ivars.ix+ivars.naux),range(ivars.irhox,ivars.irhox+ivars.naux)):q[:,:,nq]=U[:,:,nu]/q[:,:,ivars.irho]returnq
[docs]defprim_to_cons(q,gamma,ivars,myg):""" convert an input vector of primitive variables to conserved variables """U=myg.scratch_array(nvar=ivars.nvar)U[:,:,ivars.idens]=q[:,:,ivars.irho]U[:,:,ivars.ixmom]=q[:,:,ivars.iu]*U[:,:,ivars.idens]U[:,:,ivars.iymom]=q[:,:,ivars.iv]*U[:,:,ivars.idens]rhoe=eos.rhoe(gamma,q[:,:,ivars.ip])U[:,:,ivars.iener]=rhoe+0.5*q[:,:,ivars.irho]*(q[:,:,ivars.iu]**2+q[:,:,ivars.iv]**2)ifivars.naux>0:fornq,nuinzip(range(ivars.ix,ivars.ix+ivars.naux),range(ivars.irhox,ivars.irhox+ivars.naux)):U[:,:,nu]=q[:,:,nq]*q[:,:,ivars.irho]returnU
[docs]defget_external_sources(t,dt,U,ivars,rp,myg,*,U_old=None,problem_source=None):"""compute the external sources, including gravity"""_=t# maybe unusedS=myg.scratch_array(nvar=ivars.nvar)grav=rp.get_param("compressible.grav")ifU_oldisNone:# we are just computing the sources from the current state Uifmyg.coord_type==1:# gravity points in the radial direction for sphericalS[:,:,ivars.ixmom]=U[:,:,ivars.idens]*gravS[:,:,ivars.iener]=U[:,:,ivars.ixmom]*gravS[:,:,ivars.ixmom]+=U[:,:,ivars.iymom]**2/(U[:,:,ivars.idens]*myg.x2d)S[:,:,ivars.iymom]+=-U[:,:,ivars.ixmom]*U[:,:,ivars.iymom]/U[:,:,ivars.idens]else:# gravity points in the vertical (y) direction for CartesianS[:,:,ivars.iymom]=U[:,:,ivars.idens]*gravS[:,:,ivars.iener]=U[:,:,ivars.iymom]*gravelse:# we want to compute gravity using the time-updated momentum# we assume that U is an approximation to U^{n+1}, which includes# a full dt * S_oldifmyg.coord_type==1:S[:,:,ivars.ixmom]=U[:,:,ivars.idens]*gravS_old_xmom=U_old[:,:,ivars.idens]*grav# we want the corrected xmom that has a time-centered sourcexmom_new=U[:,:,ivars.ixmom]+0.5*dt*(S[:,:,ivars.ixmom]-S_old_xmom)S[:,:,ivars.iener]=xmom_new*gravS[:,:,ivars.ixmom]+=U[:,:,ivars.iymom]**2/(U[:,:,ivars.idens]*myg.x2d)S[:,:,ivars.iymom]+=-U[:,:,ivars.ixmom]*U[:,:,ivars.iymom]/U[:,:,ivars.idens]else:S[:,:,ivars.iymom]=U[:,:,ivars.idens]*gravS_old_ymom=U_old[:,:,ivars.idens]*grav# we want the corrected ymom that has a time-centered sourceymom_new=U[:,:,ivars.iymom]+0.5*dt*(S[:,:,ivars.iymom]-S_old_ymom)S[:,:,ivars.iener]=ymom_new*grav# now add the heatingifproblem_source:S_heating=problem_source(myg,U,ivars,rp)S[...]+=S_heatingreturnS
[docs]defget_sponge_factor(U,ivars,rp,myg):"""compute the sponge factor, f / tau, that goes into a sponge damping term of the form S = - (f / tau) (rho U)"""rho=U[:,:,ivars.idens]rho_begin=rp.get_param("sponge.sponge_rho_begin")rho_full=rp.get_param("sponge.sponge_rho_full")assertrho_begin>rho_fullf=myg.scratch_array()f[:,:]=np.where(rho>rho_begin,0.0,np.where(rho<rho_full,1.0,0.5*(1.0-np.cos(np.pi*(rho-rho_begin)/(rho_full-rho_begin)))))tau=rp.get_param("sponge.sponge_timescale")returnf/tau
[docs]classSimulation(NullSimulation):"""The main simulation class for the corner transport upwind compressible hydrodynamics solver """
[docs]definitialize(self,*,extra_vars=None,ng=4):""" Initialize the grid and variables for compressible flow and set the initial conditions for the chosen problem. """my_grid=grid_setup(self.rp,ng=ng)my_data=self.data_class(my_grid)# Make sure we use CGF for riemann solver when we do SphericalPolartry:riemann_method=self.rp.get_param("compressible.riemann")exceptKeyError:msg.warning("ERROR: Riemann Solver is not set.")ifmy_grid.coord_type==1andriemann_method=="HLLC":msg.fail("ERROR: HLLC Riemann Solver is not supported "+"with SphericalPolar Geometry")# define solver specific boundary condition routinesbnd.define_bc("hse",BC.user,is_solid=False)bnd.define_bc("ambient",BC.user,is_solid=False)bnd.define_bc("ramp",BC.user,is_solid=False)# for double mach reflection problembc,bc_xodd,bc_yodd=bc_setup(self.rp)# are we dealing with solid boundaries? we'll use these for# the Riemann solverself.solid=bnd.bc_is_solid(bc)# density and energymy_data.register_var("density",bc)my_data.register_var("energy",bc)my_data.register_var("x-momentum",bc_xodd)my_data.register_var("y-momentum",bc_yodd)# any extras?ifextra_varsisnotNone:forvinextra_vars:my_data.register_var(v,bc)# store the EOS gamma as an auxiliary quantity so we can have a# self-contained object stored in output files to make plots.# store grav because we'll need that in some BCsmy_data.set_aux("gamma",self.rp.get_param("eos.gamma"))my_data.set_aux("grav",self.rp.get_param("compressible.grav"))my_data.create()self.cc_data=my_dataifself.rp.get_param("particles.do_particles")==1:self.particles=particles.Particles(self.cc_data,bc,self.rp)# some auxiliary data that we'll need to fill GC in, but isn't# really part of the main solutionaux_data=self.data_class(my_grid)aux_data.register_var("dens_src",bc)aux_data.register_var("xmom_src",bc_xodd)aux_data.register_var("ymom_src",bc_yodd)aux_data.register_var("E_src",bc)aux_data.create()self.aux_data=aux_dataself.ivars=Variables(my_data)# derived variablesself.cc_data.add_derived(derives.derive_primitives)# initial conditions for the problemself.problem_func(self.cc_data,self.rp)ifself.verbose>0:print(my_data)
[docs]defmethod_compute_timestep(self):""" The timestep function computes the advective timestep (CFL) constraint. The CFL constraint says that information cannot propagate further than one zone per timestep. We use the driver.cfl parameter to control what fraction of the CFL step we actually take. """cfl=self.rp.get_param("driver.cfl")# get the variables we needu,v,cs=self.cc_data.get_var(["velocity","soundspeed"])grid=self.cc_data.grid# the timestep is min(dx/(|u| + cs), dy/(|v| + cs))xtmp=grid.Lx/(abs(u)+cs)ytmp=grid.Ly/(abs(v)+cs)self.dt=cfl*float(min(xtmp.min(),ytmp.min()))
[docs]defevolve(self):""" Evolve the equations of compressible hydrodynamics through a timestep dt. """self.clean_state(self.cc_data.data)tm_evolve=self.tc.timer("evolve")tm_evolve.begin()dens=self.cc_data.get_var("density")xmom=self.cc_data.get_var("x-momentum")ymom=self.cc_data.get_var("y-momentum")ener=self.cc_data.get_var("energy")gamma=self.rp.get_param("eos.gamma")myg=self.cc_data.grid# First get conserved states normal to the x and y interfaceU_xl,U_xr,U_yl,U_yr=flx.interface_states(self.cc_data,self.rp,self.ivars,self.tc,self.dt)# Apply source terms to them.# This includes external (gravity), geometric and pressure terms for SphericalPolar# Only gravitional source for Cartesian2dU_xl,U_xr,U_yl,U_yr=flx.apply_source_terms(U_xl,U_xr,U_yl,U_yr,self.cc_data,self.aux_data,self.rp,self.ivars,self.tc,self.dt,problem_source=self.problem_source)# Apply transverse corrections.U_xl,U_xr,U_yl,U_yr=flx.apply_transverse_flux(U_xl,U_xr,U_yl,U_yr,self.cc_data,self.rp,self.ivars,self.solid,self.tc,self.dt)# Get the actual interface conserved state after using Riemann Solver# Then construct the corresponding fluxes using the conserved statesifmyg.coord_type==1:# We need pressure from interface state for conservative update for# SphericalPolar geometry. So we need interface conserved states.F_x,U_x=riemann.riemann_flux(1,U_xl,U_xr,self.cc_data,self.rp,self.ivars,self.solid.xl,self.solid.xr,self.tc,return_cons=True)F_y,U_y=riemann.riemann_flux(2,U_yl,U_yr,self.cc_data,self.rp,self.ivars,self.solid.yl,self.solid.yr,self.tc,return_cons=True)# Find primitive variable since we need pressure in conservative update.qx=cons_to_prim(U_x,gamma,self.ivars,myg)qy=cons_to_prim(U_y,gamma,self.ivars,myg)else:# Directly calculate the interface flux using Riemann SolverF_x=riemann.riemann_flux(1,U_xl,U_xr,self.cc_data,self.rp,self.ivars,self.solid.xl,self.solid.xr,self.tc,return_cons=False)F_y=riemann.riemann_flux(2,U_yl,U_yr,self.cc_data,self.rp,self.ivars,self.solid.yl,self.solid.yr,self.tc,return_cons=False)# Apply artificial viscosity to fluxesq=cons_to_prim(self.cc_data.data,gamma,self.ivars,myg)F_x,F_y=flx.apply_artificial_viscosity(F_x,F_y,q,self.cc_data,self.rp,self.ivars)# save the old state (without ghost cells)U_old=myg.scratch_array(nvar=self.ivars.nvar)U_old[:,:,self.ivars.idens]=dens[:,:]U_old[:,:,self.ivars.ixmom]=xmom[:,:]U_old[:,:,self.ivars.iymom]=ymom[:,:]U_old[:,:,self.ivars.iener]=ener[:,:]# Conservative update# Apply contribution due to fluxesdtdV=self.dt/myg.V.v()forninrange(self.ivars.nvar):var=self.cc_data.get_var_by_index(n)var.v()[:,:]+=dtdV* \
(F_x.v(n=n)*myg.Ax.v()-F_x.ip(1,n=n)*myg.Ax.ip(1)+F_y.v(n=n)*myg.Ay.v()-F_y.jp(1,n=n)*myg.Ay.jp(1))# Now apply external sources# For SphericalPolar (coord_type == 1) there are pressure# gradients since we don't include pressure in xmom and ymom# fluxesifmyg.coord_type==1:xmom.v()[:,:]-=self.dt*(qx.ip(1,n=self.ivars.ip)-qx.v(n=self.ivars.ip))/myg.Lx.v()ymom.v()[:,:]-=self.dt*(qy.jp(1,n=self.ivars.ip)-qy.v(n=self.ivars.ip))/myg.Ly.v()# now the external sources (including gravity). We are going# to do a predictor-corrector here:## * compute old sources using old state: S^n = S(U^n)# * update state full dt using old sources: U^{n+1,*} += dt * S^n# * compute new sources using this updated state: S^{n+1) = S(U^{n+1,*})# * correct: U^{n+1} = U^{n+1,*} + dt/2 (S^{n+1} - S^n)S_old=get_external_sources(self.cc_data.t,self.dt,U_old,self.ivars,self.rp,myg,problem_source=self.problem_source)forninrange(self.ivars.nvar):var=self.cc_data.get_var_by_index(n)var.v()[:,:]+=self.dt*S_old.v(n=n)# now get the new time sourceS_new=get_external_sources(self.cc_data.t,self.dt,self.cc_data.data,self.ivars,self.rp,myg,U_old=U_old,problem_source=self.problem_source)# and correctforninrange(self.ivars.nvar):var=self.cc_data.get_var_by_index(n)var.v()[:,:]+=0.5*self.dt*(S_new.v(n=n)-S_old.v(n=n))# finally, do the sponge, if desired -- this is formulated as an# implicit update to the velocityifself.rp.get_param("sponge.do_sponge"):kappa_f=get_sponge_factor(self.cc_data.data,self.ivars,self.rp,myg)U_old=self.cc_data.data.copy()self.cc_data.data[:,:,self.ivars.ixmom]/=(1.0+self.dt*kappa_f)self.cc_data.data[:,:,self.ivars.iymom]/=(1.0+self.dt*kappa_f)# for energy, there is no change in density from the sponge, so we# can just apply the change in kinetic energy from the velocity updatedke=0.5*((self.cc_data.data[...,self.ivars.ixmom]**2+self.cc_data.data[...,self.ivars.iymom]**2)-(U_old[...,self.ivars.ixmom]**2+U_old[...,self.ivars.iymom]**2))/self.cc_data.data[...,self.ivars.idens]self.cc_data.data[...,self.ivars.iener]+=dkeifself.particlesisnotNone:self.particles.update_particles(self.dt)# increment the timeself.cc_data.t+=self.dtself.n+=1tm_evolve.end()
[docs]defclean_state(self,U):"""enforce minimum density and eint on the conserved state U"""U.v(n=self.ivars.idens)[:,:]=np.maximum(U.v(n=self.ivars.idens),self.rp.get_param("compressible.small_dens"))
[docs]defdovis(self):""" Do runtime visualization. """plt.clf()plt.rc("font",size=10)# we do this even though ivars is in self, so this works when# we are plotting from a fileivars=Variables(self.cc_data)# access gamma from the cc_data object so we can use dovis# outside of a running simulation.gamma=self.cc_data.get_aux("gamma")q=cons_to_prim(self.cc_data.data,gamma,ivars,self.cc_data.grid)rho=q[:,:,ivars.irho]u=q[:,:,ivars.iu]v=q[:,:,ivars.iv]p=q[:,:,ivars.ip]e=eos.rhoe(gamma,p)/rhomagvel=np.sqrt(u**2+v**2)myg=self.cc_data.gridfields=[rho,magvel,p,e]field_names=[r"$\rho$",r"U","p","e"]x=myg.scratch_array()y=myg.scratch_array()ifmyg.coord_type==1:x.v()[:,:]=myg.x2d.v()[:,:]*np.sin(myg.y2d.v()[:,:])y.v()[:,:]=myg.x2d.v()[:,:]*np.cos(myg.y2d.v()[:,:])else:x.v()[:,:]=myg.x2d.v()[:,:]y.v()[:,:]=myg.y2d.v()[:,:]_,axes,cbar_title=plot_tools.setup_axes(myg,len(fields))forn,axinenumerate(axes):v=fields[n]img=ax.pcolormesh(x.v(),y.v(),v.v(),shading="nearest",cmap=self.cm)ax.set_xlabel("x")ax.set_ylabel("y")# needed for PDF renderingcb=axes.cbar_axes[n].colorbar(img)cb.solids.set_rasterized(True)cb.solids.set_edgecolor("face")ifcbar_title:cb.ax.set_title(field_names[n])else:ax.set_title(field_names[n])ifself.particlesisnotNone:ax=axes[0]particle_positions=self.particles.get_positions()# dye particlescolors=self.particles.get_init_positions()[:,0]# plot particlesax.scatter(particle_positions[:,0],particle_positions[:,1],s=5,c=colors,alpha=0.8,cmap="Greys")ifmyg.coord_type==1:ax.set_xlim([np.min(x),np.max(x)])ax.set_ylim([np.min(y),np.max(y)])else:ax.set_xlim([myg.xmin,myg.xmax])ax.set_ylim([myg.ymin,myg.ymax])plt.figtext(0.05,0.0125,f"t = {self.cc_data.t:10.5g}")plt.pause(0.001)plt.draw()
[docs]defwrite_extras(self,f):""" Output simulation-specific data to the h5py file f """# make note of the custom BCgb=f.create_group("BC")# the value here is the value of "is_solid"gb.create_dataset("hse",data=False)gb.create_dataset("ambient",data=False)