I am reading lines of text into a list 'rows' and trying to use multithreading to speed it up. However, there is no speedup at all. Watching the CPU usage on my Mac, I see it reach 145% with multithreading, yet the run takes just as long.
import csv
import time
from concurrent.futures import ThreadPoolExecutor

te = TimeExtractor()

def time_test(text):
    result = te.compute_time(text)
    # print(result)

if __name__ == "__main__":
    start = time.time()
    rows = []
    with open('data/data.csv', 'r', encoding='utf-8') as f:
        csvreader = csv.DictReader(f, delimiter='\t', quoting=csv.QUOTE_ALL)
        for row in csvreader:
            rows.append(row['text'])
    with ThreadPoolExecutor(4) as executor:
        results = executor.map(time_test, rows)
    end = time.time()
    print(end - start)
    print('Done!!!')
A simplified version of your code, using multithreading, is:
import time
import concurrent.futures
from multiprocessing import cpu_count

num_cpu = cpu_count()
print("CPU Count: ", num_cpu)  # the exact cpu_count doesn't really matter here

e = concurrent.futures.ThreadPoolExecutor(num_cpu)

def cpu_intensive_task(i):
    print(i, ' : start task')
    count = 10000000 * (i + 1)  # pure-Python busy loop; task i+1 is longer than task i
    while count > 0:
        count -= 1
    print(i, ' : end task')
    return i

start = time.time()
for i in e.map(cpu_intensive_task, range(10)):
    print(i, ' : in loop')
end = time.time()
print('LOOP DONE')
print('Total Time taken: ', (end - start))
Output:
CPU Count: 8
0 : start task
1 : start task
2 : start task
3 : start task
4 : start task
5 : start task
7 : start task
6 : start task
0 : end task
8 : start task
0 : in loop
1 : end task
9 : start task
1 : in loop
2 : end task
2 : in loop
3 : end task
3 : in loop
4 : end task
4 : in loop
5 : end task
5 : in loop
6 : end task
6 : in loop
7 : end task
7 : in loop
8 : end task
8 : in loop
9 : end task
9 : in loop
LOOP DONE
Total Time taken: 30.59025502204895
Note: the loop exits only after all the tasks are done; executor.map yields results in submission order, so each iteration blocks until that task's result is ready.
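As an aside, if you want to consume each result as soon as its task finishes rather than in submission order, concurrent.futures.as_completed does that. A minimal sketch of my own (not part of the original answer):

import concurrent.futures

def square(i):
    return i * i

with concurrent.futures.ThreadPoolExecutor(4) as ex:
    # submit returns one Future per task; as_completed yields the
    # futures in completion order, not submission order
    futures = [ex.submit(square, i) for i in range(10)]
    for fut in concurrent.futures.as_completed(futures):
        print(fut.result(), ' : in loop')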
Same code without multithreading:
import time

def cpu_intensive_task(i):
    print(i, ' : start task')
    count = 10000000 * (i + 1)
    while count > 0:
        count -= 1
    print(i, ' : end task')
    return i

start = time.time()
for i in range(10):
    cpu_intensive_task(i)
    print(i, ' : in loop')
end = time.time()
print('LOOP DONE')
print('Time taken: ', (end - start))
Output:
0 : start task
0 : end task
0 : in loop
1 : start task
1 : end task
1 : in loop
2 : start task
2 : end task
2 : in loop
3 : start task
3 : end task
3 : in loop
4 : start task
4 : end task
4 : in loop
5 : start task
5 : end task
5 : in loop
6 : start task
6 : end task
6 : in loop
7 : start task
7 : end task
7 : in loop
8 : start task
8 : end task
8 : in loop
9 : start task
9 : end task
9 : in loop
LOOP DONE
Time taken: 30.072215795516968
Note: the time taken is almost the same as the multithreading approach (slightly less, in fact). Multithreading doesn't help this type of workload, because of the GIL (explained below).
Same code using multiprocessing:
import time
import sys
from multiprocessing import Process, Lock, Value, cpu_count

def cpu_intensive_task(i):
    print(i, ' : start task')
    count = 10000000 * (i + 1)
    while count > 0:
        count -= 1
    print(i, ' : end task')
    return i

if __name__ == '__main__':
    print("CPU Count: ", cpu_count())
    start = time.time()
    processes = []
    for i in range(10):
        p = Process(target=cpu_intensive_task, args=(i,))
        processes.append(p)
        p.start()
        print(i, ' : in loop')
    print('LOOP END')
    for p in processes:
        p.join()
    end = time.time()
    print('Total Time Taken: ', (end - start))
Output:
CPU Count: 8
0 : in loop
1 : in loop
2 : in loop
3 : in loop
4 : in loop
5 : in loop
6 : in loop
7 : in loop
8 : in loop
9 : in loop
LOOP END
0 : start task
1 : start task
2 : start task
3 : start task
5 : start task
4 : start task
8 : start task
7 : start task
6 : start task
9 : start task
0 : end task
1 : end task
2 : end task
3 : end task
4 : end task
5 : end task
6 : end task
7 : end task
8 : end task
9 : end task
Total Time Taken: 10.335741996765137
Note: multiprocessing takes about a third of the time taken by the multithreading approach.
The global interpreter lock (GIL) is the mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. However, some extension modules, either standard or third-party, are designed so as to release the GIL when doing computationally intensive tasks such as compression or hashing. Also, the GIL is always released when doing I/O.
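To illustrate that last point, here is a sketch of my own (not from the quoted docs) showing that threads do help when the heavy work happens inside an extension that releases the GIL, such as hashlib:

import hashlib
import time
from concurrent.futures import ThreadPoolExecutor

# hashlib releases the GIL while hashing large buffers, so these
# hash computations can run on multiple cores even from threads
buffers = [b'x' * 50_000_000 for _ in range(8)]

def hash_one(buf):
    return hashlib.sha256(buf).hexdigest()

start = time.time()
for buf in buffers:
    hash_one(buf)
print('sequential: ', time.time() - start)

start = time.time()
with ThreadPoolExecutor(4) as ex:
    list(ex.map(hash_one, buffers))
print('threaded:   ', time.time() - start)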
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine.
So, for CPU-bound pure-Python work, only multiprocessing can utilise multiple processors and hence achieve true parallelism.
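Applied to your original code, that would mean swapping ThreadPoolExecutor for ProcessPoolExecutor. A sketch, assuming compute_time is CPU-bound pure Python and that TimeExtractor can be constructed inside each worker (I haven't run this against your TimeExtractor):

import csv
import time
from concurrent.futures import ProcessPoolExecutor

te = None

def init_worker():
    # processes don't share memory, so each worker builds its own extractor
    global te
    te = TimeExtractor()  # the class from your code; assumed importable here

def time_test(text):
    return te.compute_time(text)

if __name__ == "__main__":
    start = time.time()
    with open('data/data.csv', 'r', encoding='utf-8') as f:
        csvreader = csv.DictReader(f, delimiter='\t', quoting=csv.QUOTE_ALL)
        rows = [row['text'] for row in csvreader]
    with ProcessPoolExecutor(max_workers=4, initializer=init_worker) as executor:
        # chunksize batches rows per inter-process message to cut overhead
        results = list(executor.map(time_test, rows, chunksize=256))
    print(time.time() - start)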
Do you need to use "Lock" in your multiprocessing version? You imported it but didn't use it in your code. And once something is locked somewhere, might it not be faster anymore?
@marlon no, the lock isn't used here. But locks are needed if two concurrently running functions want to access memory that is being mutated. If you can divide up the work before sending it to the workers, then you don't need a lock, as long as each result is returned to a single collector.
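For example, a minimal sketch of my own (not from the answer) of the case where a lock is needed, with several processes mutating one shared counter:

from multiprocessing import Process, Lock, Value

def add_many(counter, lock):
    for _ in range(100000):
        with lock:  # guard the read-modify-write on shared memory
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    lock = Lock()
    workers = [Process(target=add_many, args=(counter, lock)) for _ in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print(counter.value)  # 400000 with the lock; unpredictable without it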
On a separate note, I want to point out a caveat: the multiprocessing approach used here is not a good example of a scalable solution. You shouldn't spawn an unbounded number of processes to handle your inputs (in this case it is capped at 10 for the demo). You want to either use a pool of fixed worker processes, or a semaphore that prevents launching a new process until the running count drops below the max again; see the sketch below.
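A bounded rewrite of the multiprocessing demo along those lines, using multiprocessing.Pool (a sketch of the fixed-worker-pool option):

import time
from multiprocessing import Pool, cpu_count

def cpu_intensive_task(i):
    count = 10000000 * (i + 1)
    while count > 0:
        count -= 1
    return i

if __name__ == '__main__':
    start = time.time()
    # the pool keeps at most cpu_count() worker processes alive and feeds
    # the 10 tasks to them, instead of spawning one process per task
    with Pool(processes=cpu_count()) as pool:
        results = pool.map(cpu_intensive_task, range(10))
    print('Total Time Taken: ', time.time() - start)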