"""
Stores and manages particles and updates their positions based
on the velocity on the grid.
"""
import numpy as np
from pyro.util import msg
[docs]
class Particle:
"""
Class to hold properties of a single (massless) particle.
This class could be extended (i.e. inherited from) to
model e.g. massive/charged particles.
"""
def __init__(self, x, y, u=0, v=0):
self.x = x
self.y = y
self.u = u
self.v = v
[docs]
def pos(self):
"""
Return position vector.
"""
return np.array([self.x, self.y])
[docs]
def velocity(self):
"""
Return velocity vector.
"""
return np.array([self.u, self.v])
[docs]
def update(self, u, v, dt):
"""
Advect the particle and update its velocity.
"""
self.u = u
self.v = v
self.x += u * dt
self.y += v * dt
[docs]
def interpolate_velocity(self, myg, u, v):
"""
Interpolate the x- and y-velocities defined on grid myg to the
particle's position.
Parameters
----------
myg : Grid2d
grid which the velocities are defined on
u : ArrayIndexer
x-velocity
v : ArrayIndexer
y_velocity
"""
# find what cell it lives in
x_idx = (self.x - myg.xmin) / myg.dx - 0.5
y_idx = (self.y - myg.ymin) / myg.dy - 0.5
x_frac = x_idx % 1
y_frac = y_idx % 1
# get the index of the particle's closest bottom left cell
# we'll add one as going to use buf'd grids -
# this will catch the cases where the particle is on the edges
# of the grid.
x_idx = int(x_idx) + 1
y_idx = int(y_idx) + 1
# interpolate velocity
u_vel = (1-x_frac)*(1-y_frac)*u.v(buf=1)[x_idx, y_idx] + \
x_frac*(1-y_frac)*u.v(buf=1)[x_idx+1, y_idx] + \
(1-x_frac)*y_frac*u.v(buf=1)[x_idx, y_idx+1] + \
x_frac*y_frac*u.v(buf=1)[x_idx+1, y_idx+1]
v_vel = (1-x_frac)*(1-y_frac)*v.v(buf=1)[x_idx, y_idx] + \
x_frac*(1-y_frac)*v.v(buf=1)[x_idx+1, y_idx] + \
(1-x_frac)*y_frac*v.v(buf=1)[x_idx, y_idx+1] + \
x_frac*y_frac*v.v(buf=1)[x_idx+1, y_idx+1]
return u_vel, v_vel
[docs]
class Particles:
"""
Class to hold multiple particles.
"""
def __init__(self, sim_data, bc, n_particles, particle_generator="grid",
pos_array=None, init_array=None):
"""
Initialize the Particles object.
Particles are stored as a dictionary, with their keys being tuples
of their initial position. This was done in order to have a simple way
to access the initial particle positions when plotting.
However, this assumes that no two particles are
initialised with the same initial position, which is fine for the
massless particle case, however could no longer be a sensible thing
to do if have particles have other properties (e.g. mass).
Parameters
----------
sim_data : CellCenterData2d object
The cell-centered simulation data
bc : BC object
Boundary conditions
n_particles : int
Number of particles
particle_generator : string or function
String with generator name of custom particle generator function
pos_array : float array
Array of particle positions to use with particle initialization
init_array : float array
Array of initial particle positions required for plotting from file.
"""
self.sim_data = sim_data
self.bc = bc
self.particles = {}
if n_particles <= 0:
msg.fail("ERROR: n_particles = %s <= 0" % (n_particles))
if callable(particle_generator): # custom particle generator function
self.particles = particle_generator(n_particles)
else:
if particle_generator == "random":
self.randomly_generate_particles(n_particles)
elif particle_generator == "grid":
self.grid_generate_particles(n_particles)
elif particle_generator == "array":
self.array_generate_particles(pos_array, init_array)
else:
msg.fail("ERROR: do not recognise particle generator %s"
% (particle_generator))
self.n_particles = len(self.particles)
[docs]
def randomly_generate_particles(self, n_particles):
"""
Randomly generate n_particles.
"""
myg = self.sim_data.grid
positions = np.random.rand(n_particles, 2)
positions[:, 0] = positions[:, 0] * (myg.xmax - myg.xmin) + \
myg.xmin
positions[:, 1] = positions[:, 1] * (myg.ymax - myg.ymin) + \
myg.ymin
for (x, y) in positions:
self.particles[(x, y)] = Particle(x, y)
[docs]
def grid_generate_particles(self, n_particles):
"""
Generate particles equally spaced across the grid.
Currently has the same number of particles in the x and y
directions (so dx != dy unless the domain is square) -
may be better to scale this.
If necessary, shall increase/decrease n_particles
in order to fill grid.
"""
sq_n_particles = int(round(np.sqrt(n_particles)))
if sq_n_particles**2 != n_particles:
msg.warning(f"WARNING: Changing number of particles from {n_particles} to {sq_n_particles**2}")
myg = self.sim_data.grid
xs, step = np.linspace(myg.xmin, myg.xmax, num=sq_n_particles, endpoint=False, retstep=True)
xs += 0.5 * step
ys, step = np.linspace(myg.ymin, myg.ymax, num=sq_n_particles, endpoint=False, retstep=True)
ys += 0.5 * step
for x in xs:
for y in ys:
self.particles[(x, y)] = Particle(x, y)
[docs]
def array_generate_particles(self, pos_array, init_array=None):
"""
Generate particles based on array of their positions. This is used
when reading particle data from file.
Parameters
----------
pos_array : float array
Array of particle positions.
init_array : float array
Array of initial particle positions.
"""
# check a pos_array has been passed to the constructor
if pos_array is None:
msg.fail("ERROR: Array of particle positions has not been passed into Particles constructor.\
Cannot generate particles.")
if init_array is None:
for (x, y) in pos_array:
self.particles[(x, y)] = Particle(x, y)
else:
for (ix, iy), (x, y) in zip(init_array, pos_array):
self.particles[(ix, iy)] = Particle(x, y)
[docs]
def update_particles(self, dt, u=None, v=None):
r"""
Update the particles on the grid. This is based off the
``AdvectWithUcc`` function in AMReX, which used the midpoint
method to advance particles using the cell-centered velocity.
We will explicitly pass in u and v if they cannot be accessed from the
``sim_data`` using ``get_var("velocity")``.
Parameters
----------
dt : float
timestep
u : ArrayIndexer object
x-velocity
v : ArrayIndexer object
y-velocity
"""
myg = self.sim_data.grid
if (u is None) and (v is None):
u, v = self.sim_data.get_var("velocity")
elif u is None:
u = self.sim_data.get_var("x-velocity")
elif v is None:
v = self.sim_data.get_var("y-velocity")
for p in self.particles.values():
# save old position and predict location at dt/2
initial_position = p.pos()
u_vel, v_vel = p.interpolate_velocity(myg, u, v)
p.update(u_vel, v_vel, 0.5*dt)
# find velocity at dt/2
u_vel, v_vel = p.interpolate_velocity(myg, u, v)
# update to final time using original position and vel at dt/2
p.x, p.y = initial_position
p.update(u_vel, v_vel, dt)
self.enforce_particle_boundaries()
[docs]
def enforce_particle_boundaries(self):
"""
Enforce the particle boundaries
TODO: copying the dict and adding everything back again is messy
- think of a better way to do this? Did it this way so don't have
to remove items from a dictionary while iterating it for outflow
boundaries.
"""
old_particles = self.particles.copy()
self.particles = {}
myg = self.sim_data.grid
xlb = self.bc.xlb
xrb = self.bc.xrb
ylb = self.bc.ylb
yrb = self.bc.yrb
while old_particles:
k, p = old_particles.popitem()
# -x boundary
if p.x < myg.xmin:
if xlb in ["outflow", "neumann"]:
continue
if xlb == "periodic":
p.x = myg.xmax + p.x - myg.xmin
elif xlb in ["reflect-even", "reflect-odd", "dirichlet"]:
p.x = 2 * myg.xmin - p.x
else:
msg.fail("ERROR: xlb = %s invalid BC for particles" % (xlb))
# +x boundary
if p.x > myg.xmax:
if xrb in ["outflow", "neumann"]:
continue
if xrb == "periodic":
p.x = myg.xmin + p.x - myg.xmax
elif xrb in ["reflect-even", "reflect-odd", "dirichlet"]:
p.x = 2 * myg.xmax - p.x
else:
msg.fail("ERROR: xrb = %s invalid BC for particles" % (xrb))
# -y boundary
if p.y < myg.ymin:
if ylb in ["outflow", "neumann"]:
continue
if ylb == "periodic":
p.y = myg.ymax + p.y - myg.ymin
elif ylb in ["reflect-even", "reflect-odd", "dirichlet"]:
p.y = 2 * myg.ymin - p.y
else:
msg.fail("ERROR: ylb = %s invalid BC for particles" % (ylb))
# +y boundary
if p.y > myg.ymax:
if yrb in ["outflow", "neumann"]:
continue
if yrb == "periodic":
p.y = myg.ymin + p.y - myg.ymax
elif yrb in ["reflect-even", "reflect-odd", "dirichlet"]:
p.y = 2 * myg.ymax - p.y
else:
msg.fail("ERROR: yrb = %s invalid BC for particles" % (yrb))
self.particles[k] = p
self.n_particles = len(self.particles)
[docs]
def get_positions(self):
"""
Return an array of current particle positions.
"""
return np.array([[p.x, p.y] for p in self.particles.values()])
[docs]
def get_init_positions(self):
"""
Return initial positions of the particles as an array.
We defined the particles as a dictionary with their initial positions
as the keys, so this just becomes a restructuring operation.
"""
return np.array([[x, y] for (x, y) in self.particles.keys()])
[docs]
def write_particles(self, f):
"""
Output the particles' positions (and initial positions) to an HDF5 file.
Parameters
----------
f : h5py object
HDF5 file to write to
"""
gparticles = f.create_group("particles")
gparticles.create_dataset("init_particle_positions",
data=self.get_init_positions())
gparticles.create_dataset("particle_positions",
data=self.get_positions())