Hello,
I’m currently testing ObsPy’s processing modules on approximately 1 million seismic traces downloaded with the MassDownloader. I’m running into speed issues, and I’m trying to work out whether the problem is my own code and, if not, find options for speeding things up.
Here is my current script for processing where I have event directories subdivided into raw, response, and processed directories:
#!/home/casey/miniconda3/envs/obspy/bin/python3
import obspy
import os
import string
import numpy as np
from obspy import UTCDateTime, read, read_inventory
# Initialize
n = 1                    # how many .mseed filenames per event to sample for the time-window ID
total = 0                # running count of traces processed over all events
ext = ".mseed"
record_length = 20 * 60  # record length in seconds (20 minutes)
# Resample to how many Hz
resamp = 10.0
# Number of data points after resampling
ndps = int(record_length * resamp)

# Walk through current directory and build a list of event directories. Current
# directory must only consist of event directories.
path = '.'
event_dirs = next(os.walk(path))[1]

# Loop through every event directory and find traces
for i, event in enumerate(event_dirs):
    st = read(os.path.join(path, event, "raw", "*.mseed"))
    inv = read_inventory(os.path.join(path, event, "resp", "*.xml"))

    # Actual starttime/endtime varies between stations in their headers since
    # they are accurate to microseconds. However, the filename outputs from the
    # MassDownloader carry identical starttime/endtime strings within an event,
    # so these are used to define unique, consistent shot/receiver IDs.
    starttime = endtime = ""
    for dirpath, dirnames, filenames in os.walk(os.path.join(event, "raw")):
        for filename in [f for f in filenames if f.endswith(ext)][:n]:
            base = os.path.splitext(filename)[0]
            parts = list(filter(None, base.split('_')))
            starttime = parts[1]
            endtime = parts[2]

    # count is initialized BEFORE the trace loop so the per-event summary below
    # cannot raise NameError when an event's raw/ directory is empty.
    count = 0
    for src in st:
        tr = src.copy()
        # Build the trace ID once instead of re-concatenating it four times.
        trace_id = "%s.%s.%s.%s_%s_%s" % (tr.stats.network, tr.stats.station,
                                          tr.stats.location, tr.stats.channel,
                                          starttime, endtime)
        print(trace_id + "...begin")

        ## Trace processing — all ObsPy Trace methods below operate in place,
        ## so their return values are not captured.
        filt = (0.001, 0.005, 4.9, 5.0)
        tr.detrend("demean")
        tr.taper(max_percentage=0.05, type="hann")
        try:
            # Remove instrument response
            tr.remove_response(inventory=inv, output='DISP', pre_filt=filt,
                               water_level=0, zero_mean=False, taper=False)
            # Downsample traces to 10 Hz
            tr.interpolate(sampling_rate=resamp, method="lanczos", a=25,
                           npts=ndps)
            # Write to SAC file
            tr.write(os.path.join(path, event, "processed", trace_id),
                     format="SAC")
            print(trace_id + "...done")
            count += 1
        except ValueError:
            # Bad/missing response metadata for this trace: report and skip,
            # keep processing the rest of the event.
            print("!!! Value error, inconsistent/incorrect StationXML header...skipping")

    print("\n\nEvent %d of %d done...%d traces processed\n\n"
          % (i + 1, len(event_dirs), count))
    total = total + count

print("%d total traces processed\n\n" % (total))
Right now, each trace takes between 0.5 and 1.0 seconds to finish processing, but at that rate the whole dataset I plan to download would take far too long. The current implementation is strictly serial: the code waits for one trace to finish processing before moving on to the next, so I’ve been looking at the multiprocessing module. I’m also wondering whether I/O is part of the problem, since I’m working with a large number of small files on a single mechanical hard drive.
I’m not very familiar with Python, and I’ve been having trouble parallelizing the script since the docs don’t seem to say much about handling ObsPy streams with multiprocessing. Before I dive deeper into it, I just want to make sure I’m not re-implementing something that already exists. Thanks for the help.
I’ve also included my massdownloader script which creates the directory structure I’m using:
#!/home/casey/miniconda3/envs/obspy/bin/python3
import obspy
import os
from obspy import UTCDateTime
from obspy.clients.fdsn import Client
from obspy.clients.fdsn.mass_downloader import RectangularDomain, Restrictions, MassDownloader
# --- Event-search parameters -------------------------------------------------
t_start = UTCDateTime("2018-01-01T00:00:00")
t_end = UTCDateTime("2020-05-31T00:00:00")
t_before = 0.        # seconds before origin time to request
t_after = 20. * 60   # seconds after origin time to request (20 minutes)
min_lat = -50
max_lat = 70
min_lon = -170
max_lon = -55
min_mag = 5.0
max_mag = 10.0

# Use the Client name imported above instead of re-spelling the full path.
client = Client("IRIS")
iris_events = client.get_events(minlatitude=min_lat, maxlatitude=max_lat,
                                minlongitude=min_lon, maxlongitude=max_lon,
                                minmagnitude=min_mag, maxmagnitude=max_mag,
                                starttime=t_start, endtime=t_end)
print("Obspy IRIS.get_events found %s event(s):" % len(iris_events))
#for event in events:
#    print(event)

domain = RectangularDomain(minlatitude=min_lat, maxlatitude=max_lat,
                           minlongitude=min_lon, maxlongitude=max_lon)

# One MassDownloader can serve every event; constructing it inside the loop
# only repeats provider discovery on each iteration.
mdl = MassDownloader(providers=["IRIS"])

for i, event in enumerate(iris_events):
    origin_time = event.origins[0].time
    restrictions = Restrictions(
        starttime=origin_time - t_before,
        endtime=origin_time + t_after,
        reject_channels_with_gaps=True,
        minimum_length=0.99,
        minimum_interstation_distance_in_m=1000,
        channel_priorities=["BH[Z]", "HH[Z]"],  # "SH[Z]", "EH[Z]"
        sanitize=True)

    # Build the <event_id>/{raw,resp,processed}/ directory structure.
    # makedirs(..., exist_ok=True) creates the event_id parent as needed and
    # avoids the race-prone isdir-then-mkdir pattern.
    event_id = str(event.resource_id).split('=')[-1]
    raw_wf_dir = event_id + '/raw/'
    sta_resp_dir = event_id + '/resp/'
    processed_wf_dir = event_id + '/processed/'
    for d in (raw_wf_dir, sta_resp_dir, processed_wf_dir):
        os.makedirs(d, exist_ok=True)

    mdl.download(domain, restrictions, mseed_storage=raw_wf_dir,
                 stationxml_storage=sta_resp_dir, threads_per_client=3,
                 print_report=True)
    print("\n%d event of %d events complete\n" % (i + 1, len(iris_events)))
Thanks a lot for the help.