Data processing with Multithreading/Multiprocessing/MPI

Hello,

I’m currently testing ObsPy’s processing modules on approximately 1 million seismic traces from the MassDownloader. I’m running into speed issues, and I’m trying to diagnose whether it’s a problem behind the keyboard and, if not, find options for speeding things up.

Here is my current processing script. Each event directory is subdivided into raw, resp, and processed subdirectories:

#!/home/casey/miniconda3/envs/obspy/bin/python3

import obspy
import os
import string
import numpy as np
from obspy import UTCDateTime, read, read_inventory

# Initialize
n = 1
total = 0
ext = ".mseed"
record_length = 20 * 60
# Resample to how many Hz
resamp = 10.0
# Number of data points
ndps = int(record_length * resamp)

# Walk through current directory and build array of event directories. Current directory must only consist of 
# event directories
path='.'
event_dir = next(os.walk(path))[1]
#print(event_dir)

# Loop through every event directory and find traces
for x in range(0, len(event_dir)):		
	st = read("./" + event_dir[x] + "/raw/*.mseed")
	inv = read_inventory("./" + event_dir[x] + "/resp/*.xml")
	# Actual starttime/endtime varies between stations in their headers since they are accurate to microseconds. 
	# However, the filename outputs starttime/endtime from massdownloader are identical between events so these 
	# will be used to define unique and consistent shot/receiver IDs between events.
	for dirpath, dirnames, filenames in os.walk(event_dir[x]+"/raw"):
		for filename in [f for f in filenames if f.endswith(ext)][:n]:
			files = os.path.splitext(filename)[0]
			files_splt = list(filter(None, files.split('_')))
			starttime = files_splt[1]
			endtime = files_splt[2]
	count = 0
	for y in range(0, len(st)):
		tr = st[y].copy()
		#print(tr)
		network = tr.stats.network
		station = tr.stats.station
		location = tr.stats.location
		channel = tr.stats.channel

		print(network + "." + station + "." + location + "." + channel + "_" + starttime + "_" + endtime + "...begin")
		## Trace processing
		filt = (0.001, 0.005, 4.9, 5.0)
		# detrend/taper modify the trace in place
		tr.detrend("demean")
		tr.taper(max_percentage=0.05, type="hann")
		# Remove instrument response
		try:
			tr.remove_response(inventory=inv, output='DISP', pre_filt=filt, water_level=0, zero_mean=False, taper=False)
			# Downsample traces to 10 Hz
			tr.interpolate(sampling_rate=resamp, method="lanczos", a=25, npts=ndps)

			# Write to SAC file
			tr.write("./" + event_dir[x] + "/processed/" + network + "." + station + "." + location + "." + channel 
					+ "_" + starttime + "_" + endtime,format="SAC")
			print(network + "." + station + "." + location + "." + channel + "_" + starttime + "_" + endtime + "...done")
			count += 1
		except ValueError:
			print("!!! ValueError, inconsistent/incorrect StationXML header...skipping")
	
	print("\n\nEvent %d of %d done...%d traces processed\n\n" % (x+1, len(event_dir), count))
	total = total + count

print("%d total traces processed\n\n" % (total))	

Right now each trace takes between 0.5 and 1.0 seconds to process, which would take far too long for the whole dataset I plan to download. The current implementation is serial: the code waits for one trace to finish processing before moving on to the next, so I’ve been taking a look at the multiprocessing module. I’m also wondering whether I/O is part of the problem, since I’m working with a lot of small files on a single mechanical hard drive.

I’m not very familiar with Python and I’ve been having trouble parallelizing this, since the docs don’t seem to have much information on handling ObsPy Streams with multiprocessing. Before I dive deeper into it, I just want to make sure I’m not reimplementing something that already exists.

I’ve also included my MassDownloader script, which creates the directory structure I’m using:

#!/home/casey/miniconda3/envs/obspy/bin/python3

import obspy
import os
from obspy import UTCDateTime
from obspy.clients.fdsn import Client
from obspy.clients.fdsn.mass_downloader import RectangularDomain, Restrictions, MassDownloader

t_start = UTCDateTime("2018-01-01T00:00:00")
t_end = UTCDateTime("2020-05-31T00:00:00")
t_before = 0.
t_after = 20. * 60
min_lat = -50
max_lat = 70
min_lon = -170
max_lon = -55
min_mag = 5.0
max_mag = 10.0

client = Client("IRIS")
iris_events = client.get_events(minlatitude=min_lat,maxlatitude=max_lat,
								minlongitude=min_lon,maxlongitude=max_lon, minmagnitude=min_mag,
								maxmagnitude=max_mag, starttime=t_start, endtime=t_end)
						   
print("Obspy IRIS.get_events found %s event(s):" % len(iris_events))
#for event in events:
#	print(event)

domain = RectangularDomain(minlatitude=min_lat, maxlatitude=max_lat,
						   minlongitude=min_lon, maxlongitude=max_lon)

for x in range(0, len(iris_events)):
	restrictions = Restrictions(
		starttime=iris_events[x].origins[0].time - t_before,
		endtime=iris_events[x].origins[0].time + t_after,
		reject_channels_with_gaps=True,
		minimum_length=0.99,
		minimum_interstation_distance_in_m=1000,
		channel_priorities=["BH[Z]", "HH[Z]"],# "SH[Z]", "EH[Z]"],
		sanitize=True)

	mdl = MassDownloader(providers=["IRIS"])
	event_id = str(iris_events[x].resource_id).split('=')[-1]
	raw_wf_dir = os.path.join(event_id, 'raw')
	sta_resp_dir = os.path.join(event_id, 'resp')
	processed_wf_dir = os.path.join(event_id, 'processed')
	# makedirs creates the event directory and each subdirectory in one call
	for directory in (raw_wf_dir, sta_resp_dir, processed_wf_dir):
		os.makedirs(directory, exist_ok=True)
	mdl.download(domain, restrictions, mseed_storage=raw_wf_dir,
		stationxml_storage=sta_resp_dir, threads_per_client=3, print_report=True)
	print("\n%d event of %d events complete\n" % (x+1,len(iris_events)))

Thanks a lot for the help.

Maybe this PR has some insight, but I never needed this and didn’t look into it much: https://github.com/obspy/obspy/pull/1218

Here is an example of multithreading some code. Basically, I make a function to process each piece of data and then send a list to that function via pool.map (line 66 in my example). The function on line 10 is similar to what you are doing (read in some data, process it, and write something out).
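
In outline, the pattern looks something like this (a minimal sketch, not the code from my actual example; the glob pattern, pool size, and processing steps are just placeholders):

import glob
from multiprocessing import Pool

from obspy import read


def process_file(mseed_file):
	"""Read one miniSEED file, process it, and write the result."""
	st = read(mseed_file)
	st.detrend("demean")
	st.taper(max_percentage=0.05, type="hann")
	# ... any other processing steps ...
	st.write(mseed_file.replace("/raw/", "/processed/"), format="MSEED")
	return mseed_file


if __name__ == "__main__":
	files = glob.glob("./*/raw/*.mseed")  # placeholder pattern
	with Pool(processes=4) as pool:
		done = pool.map(process_file, files)
	print("%d files processed" % len(done))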

Thanks for the examples, I’ll study them and try to incorporate them into my code.

Like @Ringler_Adam’s code, most multiprocessing examples I’ve found pass lists of strings (filenames) or simple values to Pool. I could loop through every directory, compile a list of mseed and xml filenames, and then pass those to Pool.

Since I’m trying to be efficient, though: right now I’m reading entire directories into st and inv (the wildcard read() and read_inventory() calls in my script above), so is it faster, or even possible, to pass whole Streams to Pool for processing rather than reading files one by one?
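
Something like this untested sketch is what I have in mind, where each worker is given an event directory name and does its own read() and read_inventory() rather than receiving a Stream (my understanding is that Pool would otherwise have to pickle the whole Stream to ship it to a worker):

import os
from multiprocessing import Pool

from obspy import read, read_inventory


def process_event(event_dir):
	"""Read, process, and write one event directory inside a worker process."""
	st = read(os.path.join(event_dir, "raw", "*.mseed"))
	inv = read_inventory(os.path.join(event_dir, "resp", "*.xml"))
	count = 0
	for tr in st:
		tr.detrend("demean")
		tr.taper(max_percentage=0.05, type="hann")
		try:
			tr.remove_response(inventory=inv, output="DISP",
					pre_filt=(0.001, 0.005, 4.9, 5.0),
					water_level=0, zero_mean=False, taper=False)
		except ValueError:
			continue
		tr.interpolate(sampling_rate=10.0, method="lanczos", a=25)
		tr.write(os.path.join(event_dir, "processed", tr.id), format="SAC")
		count += 1
	return count


if __name__ == "__main__":
	event_dirs = next(os.walk("."))[1]
	with Pool(processes=4) as pool:
		counts = pool.map(process_event, event_dirs)
	print("%d total traces processed" % sum(counts))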

Another option might be to use MPI - @krischer pointed me to this example a while back which might be useful. I have found it helpful for processing large datasets. It avoids the memory overhead associated with multiprocessing, and allows for more parallelism than multithreading.
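
The basic idea is just to split the events across MPI ranks and let each rank work through its share independently, e.g. (a minimal sketch of the pattern, not the linked example; process_event stands in for whatever per-event processing you settle on):

# Run with e.g.: mpirun -n 8 python process_events_mpi.py
import os

from mpi4py import MPI


def process_event(event_dir):
	# Placeholder: read, process, and write one event directory,
	# as in the serial script above.
	pass


if __name__ == "__main__":
	comm = MPI.COMM_WORLD
	rank = comm.Get_rank()
	size = comm.Get_size()

	# Every rank builds the same sorted list, then takes every size-th entry.
	event_dirs = sorted(next(os.walk("."))[1])
	for event_dir in event_dirs[rank::size]:
		process_event(event_dir)

	comm.Barrier()
	if rank == 0:
		print("All ranks finished.")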

Thanks for the link @calum-chamberlain, MPI does seem like a pretty good option. Did you find that mpi4py plays more nicely with ObsPy than multiprocessing in terms of coding? The ObsPy documentation only briefly touches on parallel processing and hasn’t been updated for a while.

Now I think I might try writing versions with both mpi4py and multiprocessing, then run some benchmarks for comparison.

No worries. Multiprocessing plays fairly nicely with ObsPy; I use it for processing data in EQcorrscan (see the pre_processing functions here), but the memory overhead can be a real annoyance. I use multiprocessing to process each Trace in a Stream concurrently, but that means you end up with n_processes copies of the Stream in memory. The speed-up can be useful, but it is balanced against the time taken to copy objects.

Because you are processing many short chunks of data, I think the costs associated with copying in multiprocessing would mean you wouldn’t see that much speed-up. If you want to just take the code from EQcorrscan and add your steps to the process function, you could quickly see whether it is worth expending any more effort on multiprocessing in that way.
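
For reference, the per-Trace pattern I mean is roughly this (a simplified sketch, not the actual EQcorrscan code; the processing steps and path are placeholders):

from multiprocessing import Pool

from obspy import read


def process_trace(tr):
	"""Process one Trace; note it gets pickled to the worker and back."""
	tr.detrend("demean")
	tr.taper(max_percentage=0.05, type="hann")
	# ... add response removal, resampling, etc. here ...
	return tr


if __name__ == "__main__":
	st = read("./some_event/raw/*.mseed")  # placeholder path
	with Pool(processes=4) as pool:
		# Every Trace is copied to a worker, processed, and copied back,
		# which is where the memory and copying overhead comes from.
		st.traces = pool.map(process_trace, st.traces)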

I would probably go with MPI parallelism, where each event (and Stream) is handled by a separate worker (potentially, depending on the number of workers). Multiprocessing over the event loop would also be a possibility, but because the processes themselves do not need to communicate, the costs of multiprocessing don’t seem worth it.

Parallelism in Python isn’t that easy… the GIL often limits what can be done. I’m thinking of starting an ObsPy-Accelerated repository trying to use things like CuPy and numba to speed up some functions.

Although, if you are pursuing Python-native parallelism, I would recommend starting off with concurrent.futures, which builds on multiprocessing but provides a nicer, higher-level interface.
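
For example (a minimal sketch; process_event here is a placeholder for whatever per-event function you end up with):

import os
from concurrent.futures import ProcessPoolExecutor


def process_event(event_dir):
	# Placeholder: read, process, and write one event directory.
	return event_dir


if __name__ == "__main__":
	event_dirs = next(os.walk("."))[1]
	with ProcessPoolExecutor(max_workers=4) as executor:
		for finished in executor.map(process_event, event_dirs):
			print("Finished %s" % finished)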

Thanks a lot for the help and info, it seems I have some homework to do but I do have a direction to work towards now.


It may also be worth looking at Dask https://dask.org/
It manages a lot of the coordination for you and plays well with numpy arrays, i.e. Trace data.
It’s also easy to add another machine if you need more compute.
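
A minimal dask.delayed sketch of that idea (process_event is again a placeholder for the per-event work):

import os

import dask
from dask import delayed


def process_event(event_dir):
	# Placeholder: read, process, and write one event directory.
	return event_dir


if __name__ == "__main__":
	event_dirs = next(os.walk("."))[1]
	tasks = [delayed(process_event)(d) for d in event_dirs]
	# The "processes" scheduler runs tasks in separate processes;
	# a distributed Client could spread the same tasks across machines.
	dask.compute(*tasks, scheduler="processes")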