Mingze Gao, PhD – Macquarie University June 8, 2020
Python 3.8 introduced a new module, multiprocessing.shared_memory, that provides shared memory for direct access across processes. My test shows that it significantly reduces memory usage, which also speeds up the program by avoiding the cost of copying and moving the data between processes.[1]
[1] This test was performed on a 2017 12-inch MacBook with a 1.3 GHz dual-core Intel Core i5 and 8 GB of 1867 MHz LPDDR3 RAM.
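As a minimal sketch of the basic API (not part of the original test), one process creates a named block and any other process that knows the name can attach to it. For brevity both handles live in the same process here. The 16-byte size and the `payload` bytes are arbitrary choices.

```python
from multiprocessing.shared_memory import SharedMemory

# Create a new shared memory block (the size in bytes is arbitrary)
owner = SharedMemory(create=True, size=16)
try:
    payload = b"hello"
    owner.buf[:len(payload)] = payload  # write through the memoryview

    # Another process would attach the same way, using only the name
    reader = SharedMemory(name=owner.name)
    data = bytes(reader.buf[:len(payload)])  # copy the bytes out
    reader.close()  # detach this handle; the block itself still exists
finally:
    owner.close()
    owner.unlink()  # free the block; call this once, from the owner

print(data)  # b'hello'
```

The creator is responsible for `unlink()`; every other handle only calls `close()`.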
Test
In this test, I generated a 240MB numpy.recarray from a pandas.DataFrame with datetime, float and str columns. I used numpy.recarray because it preserves the dtype of each column, so that I can later reconstruct the same array from the shared memory buffer.
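The reconstruction step can be sketched as follows, assuming a tiny made-up frame in place of the 240MB one: `to_records()` yields a structured array whose `dtype` describes every field, and `np.recarray(shape=..., dtype=..., buf=...)` lays a new array over any buffer of the right size without copying. A plain `bytearray` stands in for `shm.buf` here.

```python
from datetime import datetime

import numpy as np
import pandas as pd

df = pd.DataFrame(
    [(datetime(2020, 6, 8), 1.0, "string"), (datetime(2020, 6, 8), np.nan, "abc")],
    columns=["date", "val", "character_col"],
)
# Fixed-width bytes for the string column so the array holds no object pointers
records = df.to_records(index=False, column_dtypes={"character_col": "S6"})
shape, dtype = records.shape, records.dtype

# Stand-in for shm.buf: any writable buffer of records.nbytes bytes
buffer = bytearray(records.nbytes)
view = np.recarray(shape=shape, dtype=dtype, buf=buffer)
np.copyto(view, records)  # the only copy of the data

# Rebuild the array from the raw buffer, given only shape and dtype
rebuilt = np.recarray(shape=shape, dtype=dtype, buf=buffer)
print(rebuilt.dtype.names)     # ('date', 'val', 'character_col')
print(np.nansum(rebuilt.val))  # 1.0
```

Only `shape` and `dtype` need to travel alongside the buffer; both are small and picklable.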
I performed a simple numpy.nansum on the numeric column of the data using two methods. In the first method, based on multiprocessing.shared_memory, the 4 spawned processes access the data directly in shared memory. The second method passes the data to the spawned processes as an argument, which means each process receives its own separate copy of the data.
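A rough way to see why the second method is costly: ProcessPoolExecutor pickles the arguments of every submitted task and sends them to a worker, so passing the array serialises all of its bytes once per task, whereas the shared-memory method only sends a name, a shape and a dtype. The sketch below compares the two payloads with `pickle.dumps`; the 8 MB array and the segment name `"psm_example"` are illustrative assumptions.

```python
import pickle

import numpy as np

# About 8 MB of float64 data (the size is chosen for illustration)
arr = np.zeros(1_000_000)

# Second method: the whole array is pickled for every task
full_payload = len(pickle.dumps(arr))

# First method: only the metadata needed to re-attach is pickled
# ("psm_example" is a hypothetical shared memory name)
meta_payload = len(pickle.dumps(("psm_example", arr.shape, arr.dtype)))

print(f"array argument: {full_payload / 1e6:.1f} MB per task")
print(f"shared-memory metadata: {meta_payload} bytes per task")
```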
Test Result
A quick run of the test code below shows that the first method, based on shared_memory, uses minimal memory (peak memory traced by tracemalloc in the parent process is 0.33MB) and is much faster (2.09s) than the second, where the entire array is copied and passed into each process (peak traced memory of 1.8GB, taking 216s). More importantly, memory usage under the second method stays consistently high.
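One caveat when reading these figures: tracemalloc only traces memory blocks allocated by Python in the process where it runs (here, the parent). The shared memory segment is mapped by the operating system, so it does not show up in the traced totals, and neither does memory used inside the worker processes. A minimal sketch of that effect, with an arbitrary 10 MB size:

```python
import tracemalloc
from multiprocessing.shared_memory import SharedMemory

SIZE = 10_000_000  # 10 MB, an arbitrary size for the demonstration

# Creating the shared block: its pages are not Python allocations
tracemalloc.start()
shm = SharedMemory(create=True, size=SIZE)
_, peak_shm = tracemalloc.get_traced_memory()
tracemalloc.stop()

# Copying the same bytes into an ordinary Python object is traced
tracemalloc.start()
dup = bytes(shm.buf[:SIZE])
_, peak_copy = tracemalloc.get_traced_memory()
tracemalloc.stop()

shm.close()
shm.unlink()

print(f"peak while creating the shared block: {peak_shm / 1e6:.2f} MB")
print(f"peak while copying it into bytes:     {peak_copy / 1e6:.2f} MB")
```

To account for the segment itself, an OS-level measure such as resident set size is more appropriate than tracemalloc.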
Test Code
from multiprocessing.shared_memory import SharedMemory
from multiprocessing.managers import SharedMemoryManager
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import current_process, cpu_count
from datetime import datetime
import numpy as np
import pandas as pd
import tracemalloc
import time


def work_with_shared_memory(shm_name, shape, dtype):
    print(f'With SharedMemory: {current_process()=}')
    # Locate the shared memory by its name
    shm = SharedMemory(shm_name)
    # Create the np.recarray from the buffer of the shared memory
    np_array = np.recarray(shape=shape, dtype=dtype, buf=shm.buf)
    result = np.nansum(np_array.val)
    # Drop the view before closing; close() fails while the buffer is still exported
    del np_array
    shm.close()
    return result


def work_no_shared_memory(np_array: np.recarray):
    print(f'No SharedMemory: {current_process()=}')
    # Without shared memory, the np_array is copied into the child process
    return np.nansum(np_array.val)


if __name__ == "__main__":
    # Make a large data frame with date, float and character columns
    a = [
        (datetime.today(), 1, 'string'),
        (datetime.today(), np.nan, 'abc'),
    ] * 5000000
    df = pd.DataFrame(a, columns=['date', 'val', 'character_col'])
    # Convert into numpy recarray to preserve the dtypes
    # (fixed-width 'S6' instead of object; see the Note on Segfault)
    np_array = df.to_records(index=False, column_dtypes={'character_col': 'S6'})
    del df
    shape, dtype = np_array.shape, np_array.dtype
    print(f"np_array's size={np_array.nbytes/1e6}MB")

    # With shared memory
    # Start tracking memory usage
    tracemalloc.start()
    start_time = time.time()
    with SharedMemoryManager() as smm:
        # Create a shared memory of size np_array.nbytes
        shm = smm.SharedMemory(np_array.nbytes)
        # Create a np.recarray using the buffer of shm
        shm_np_array = np.recarray(shape=shape, dtype=dtype, buf=shm.buf)
        # Copy the data into the shared memory
        np.copyto(shm_np_array, np_array)
        # Spawn some processes to do some work
        with ProcessPoolExecutor(cpu_count()) as exe:
            fs = [exe.submit(work_with_shared_memory, shm.name, shape, dtype)
                  for _ in range(cpu_count())]
            for f in as_completed(fs):
                f.result()  # re-raise any exception from a worker
        # Release the parent's view and handle before the manager unlinks the block
        del shm_np_array
        shm.close()
    # Check memory usage
    current, peak = tracemalloc.get_traced_memory()
    print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB")
    print(f'Time elapsed: {time.time()-start_time:.2f}s')
    tracemalloc.stop()

    # Without shared memory
    tracemalloc.start()
    start_time = time.time()
    with ProcessPoolExecutor(cpu_count()) as exe:
        fs = [exe.submit(work_no_shared_memory, np_array)
              for _ in range(cpu_count())]
        for f in as_completed(fs):
            f.result()  # re-raise any exception from a worker
    # Check memory usage
    current, peak = tracemalloc.get_traced_memory()
    print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB")
    print(f'Time elapsed: {time.time()-start_time:.2f}s')
    tracemalloc.stop()
Note on Segfault
Warning
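A very important note about using multiprocessing.shared_memory, as of June 2020, is that the numpy.ndarray placed in shared memory cannot have dtype('O'), i.e. dtype(object). An object array stores only pointers to Python objects that live in the creating process's private memory, so copying it into shared memory copies the pointers but not the objects. When a child process reads the array from shared memory and dereferences those pointers, it crashes with a segmentation fault. This happens when a column contains strings, because df.to_records() turns a pandas string column into an object field by default.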
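A cheap safeguard is to check `dtype.hasobject` before copying an array into shared memory; it is True both for `dtype('O')` and for structured dtypes with any object field. The helper `ensure_shareable` below is hypothetical (it is not part of NumPy or the original code), and the two-row frame is made up.

```python
import numpy as np
import pandas as pd


def ensure_shareable(arr: np.ndarray) -> None:
    """Hypothetical helper: raise if arr, or any field of it, holds Python object pointers."""
    if arr.dtype.hasobject:
        raise TypeError(
            f"dtype {arr.dtype} contains Python objects; use fixed-width 'S' or 'U' fields"
        )


df = pd.DataFrame({"val": [1.0, np.nan], "character_col": ["string", "abc"]})

default_records = df.to_records(index=False)  # the string column becomes an object field
rejected = False
try:
    ensure_shareable(default_records)
except TypeError as exc:
    rejected = True
    print("rejected:", exc)

fixed_records = df.to_records(index=False, column_dtypes={"character_col": "S6"})
ensure_shareable(fixed_records)  # passes: every field has a fixed size
print(fixed_records.dtype)
```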
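To solve this problem, specify a fixed-width dtype for the string column in df.to_records(), as the test code above does:
np_array = df.to_records(index=False, column_dtypes={'character_col': 'S6'})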
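Here, we specify that character_col holds byte strings of at most 6 characters ('S6'). If it contains non-ASCII text, use 'U6' (Unicode) instead. Strings longer than the specified width are truncated, so choose a width that fits the longest value. With fixed-width fields the array no longer contains object pointers, and the segfault no longer occurs.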
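Putting the fix together, the sketch below converts the string column to fixed-width 'S6' bytes, copies the records into a SharedMemory block, re-attaches by name and reads the strings back. It assumes a made-up two-row frame, and a second handle in the same process stands in for a child process. 'U6' would store the same column as Unicode at 4 bytes per character instead of 1.

```python
from multiprocessing.shared_memory import SharedMemory

import numpy as np
import pandas as pd

df = pd.DataFrame({"val": [1.0, np.nan], "character_col": ["string", "abc"]})
records = df.to_records(index=False, column_dtypes={"character_col": "S6"})
shape, dtype = records.shape, records.dtype

shm = SharedMemory(create=True, size=records.nbytes)
try:
    shared = np.recarray(shape=shape, dtype=dtype, buf=shm.buf)
    np.copyto(shared, records)
    del shared  # drop the view so the buffer can be released on close()

    # What a child process would do: attach by name and rebuild the view
    child = SharedMemory(name=shm.name)
    view = np.recarray(shape=shape, dtype=dtype, buf=child.buf)
    strings = [s.decode() for s in view.character_col]  # 'S' fields come back as bytes
    total = float(np.nansum(view.val))
    del view  # release the view before closing the handle
    child.close()
finally:
    shm.close()
    shm.unlink()

print(strings, total)  # ['string', 'abc'] 1.0
print(np.dtype("S6").itemsize, np.dtype("U6").itemsize)  # 6 24
```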