mirror of
https://github.com/kuhyx/WUT_Computer_Science.git
synced 2026-07-04 14:43:08 +02:00
feat: faster thread method
This commit is contained in:
parent
fd94f597a5
commit
fcfd95a1af
@ -154,15 +154,6 @@ class ThreadsLinearAlgebraUtils:
|
|||||||
results = executor.map(lambda pair: SequentialLinearAlgebraUtils.dot_product(*pair), chunks)
|
results = executor.map(lambda pair: SequentialLinearAlgebraUtils.dot_product(*pair), chunks)
|
||||||
return sum(results)
|
return sum(results)
|
||||||
|
|
||||||
# @staticmethod
|
|
||||||
# @time_measurement(time_accumulator)
|
|
||||||
# def matrix_vector_multiply(A, x):
|
|
||||||
# chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
|
|
||||||
# with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
|
|
||||||
# func = partial(SequentialLinearAlgebraUtils.matrix_vector_multiply, x=x)
|
|
||||||
# results = executor.map(func, chunks)
|
|
||||||
# return [item for sublist in results for item in sublist]
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@time_measurement(time_accumulator)
|
@time_measurement(time_accumulator)
|
||||||
def matrix_vector_multiply(A, x):
|
def matrix_vector_multiply(A, x):
|
||||||
|
|||||||
@ -2,7 +2,7 @@ import pytest
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
from matrix_generator import MatrixGenerator
|
from matrix_generator import MatrixGenerator
|
||||||
from richardson_method import RichardsonMethod
|
from richardson_method import RichardsonMethod
|
||||||
from threads import RichardsonMethodThreads
|
from threads_indep import RichardsonMethodThreads
|
||||||
from processing_type import ProcessingType
|
from processing_type import ProcessingType
|
||||||
from time_measurement import time_measurement, tests_time
|
from time_measurement import time_measurement, tests_time
|
||||||
|
|
||||||
@ -41,10 +41,10 @@ def solution_lib(A, b):
|
|||||||
10000
|
10000
|
||||||
])
|
])
|
||||||
@pytest.mark.parametrize("processing_type", [
|
@pytest.mark.parametrize("processing_type", [
|
||||||
# ProcessingType.SEQUENTIAL,
|
ProcessingType.SEQUENTIAL,
|
||||||
ProcessingType.THREADS#,
|
ProcessingType.THREADS,
|
||||||
# ProcessingType.PROCESSES,
|
ProcessingType.PROCESSES,
|
||||||
# ProcessingType.DISTRIBUTED_ARRAYS
|
ProcessingType.DISTRIBUTED_ARRAYS
|
||||||
])
|
])
|
||||||
@pytest.mark.parametrize("matrix_type", [
|
@pytest.mark.parametrize("matrix_type", [
|
||||||
"spd",
|
"spd",
|
||||||
|
|||||||
103
code/threads_indep.py
Normal file
103
code/threads_indep.py
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
import multiprocessing
|
||||||
|
import gc
|
||||||
|
import time
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from time_measurement import time_measurement_longest, longest_threads_time_accumulator, tests_time
|
||||||
|
import linear_algebra_utils as linAlg
|
||||||
|
|
||||||
|
|
||||||
|
@time_measurement_longest(longest_threads_time_accumulator)
def matrix_vector_multiply(A, input_x, start, end, Ax):
    """Store the products of rows ``A[start:end]`` with ``input_x`` in ``Ax``.

    For every row in the slice ``A[start:end]`` the dot product with
    ``input_x`` is computed, and the resulting values are assigned to
    ``Ax[start:end]``. Nothing is returned; ``Ax`` is modified in place.
    The call is wrapped by ``time_measurement_longest``, which is handed
    ``longest_threads_time_accumulator``.
    """

    def row_dot(row):
        # Dot product of a single matrix row with the input vector.
        return sum(a_ij * x_j for a_ij, x_j in zip(row, input_x))

    Ax[start:end] = list(map(row_dot, A[start:end]))
|
||||||
|
|
||||||
|
@time_measurement_longest(longest_threads_time_accumulator)
def vector_vector_subtraction(b, Ax, start, end, residual):
    """Write ``b - Ax`` for the index range ``[start:end]`` into ``residual``.

    Elements of ``b[start:end]`` and ``Ax[start:end]`` are paired up and
    subtracted; the differences are assigned to ``residual[start:end]``.
    Nothing is returned; ``residual`` is modified in place. The call is
    wrapped by ``time_measurement_longest``, which is handed
    ``longest_threads_time_accumulator``.
    """
    b_chunk = b[start:end]
    ax_chunk = Ax[start:end]
    differences = [b_i - ax_i for b_i, ax_i in zip(b_chunk, ax_chunk)]
    residual[start:end] = differences
|
||||||
|
|
||||||
|
@time_measurement_longest(longest_threads_time_accumulator)
def scalar_vector_multiply(omega, vector, start, end, result):
    """Write ``omega * vector`` for the index range ``[start:end]`` into ``result``.

    Each element of ``vector[start:end]`` is multiplied by ``omega`` (scalar
    on the left) and the products are assigned to ``result[start:end]``.
    Nothing is returned; ``result`` is modified in place. The call is wrapped
    by ``time_measurement_longest``, which is handed
    ``longest_threads_time_accumulator``.
    """
    scaled = []
    for component in vector[start:end]:
        scaled.append(omega * component)
    result[start:end] = scaled
|
||||||
|
|
||||||
|
@time_measurement_longest(longest_threads_time_accumulator)
def vector_vector_addition(input_x, vector, start, end, output_x):
    """Write ``input_x + vector`` for the index range ``[start:end]`` into ``output_x``.

    Elements of ``input_x[start:end]`` and ``vector[start:end]`` are paired
    up and added; the sums are assigned to ``output_x[start:end]``. Nothing
    is returned; ``output_x`` is modified in place. The call is wrapped by
    ``time_measurement_longest``, which is handed
    ``longest_threads_time_accumulator``.
    """
    x_chunk = input_x[start:end]
    delta_chunk = vector[start:end]
    sums = [x_i + d_i for x_i, d_i in zip(x_chunk, delta_chunk)]
    output_x[start:end] = sums
|
||||||
|
|
||||||
|
def _run_chunked_phase(executor, chunk_bounds, worker, first, second, out):
    """Run one data-parallel phase and block until every chunk has finished.

    Submits ``worker(first, second, start, end, out)`` to ``executor`` once
    per ``(start, end)`` pair in ``chunk_bounds``, then waits on every future
    in submission order (``Future.result()`` re-raises a worker exception).
    Once all chunks are done, the lap recorded by the timing decorator is
    folded into ``longest_threads_time_accumulator``.
    """
    futures = [
        executor.submit(worker, first, second, start, end, out)
        for start, end in chunk_bounds
    ]
    for future in futures:
        future.result()
    longest_threads_time_accumulator.save_lap_and_reset()


def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5):
    """Solve ``A x = b`` by Richardson iteration, with each step split across threads.

    Every iteration performs ``x <- x + omega * (b - A x)`` with
    ``omega = 2 / (lambda_min + lambda_max)``. The four vector operations of
    a step (matrix-vector product, subtraction, scaling, addition) are each
    divided into ``multiprocessing.cpu_count()`` contiguous index ranges and
    submitted to one shared ``ThreadPoolExecutor``. The loop stops after
    ``max_iterations`` steps, or earlier once the norm of the residual
    computed in the current step drops below ``tol``.

    Timing information is printed to stdout before returning.

    Args:
        A: Coefficient matrix, indexable by row slices.
        b: Right-hand-side vector; its length defines the problem size.
        lambda_min: Value used together with ``lambda_max`` to compute
            ``omega`` (presumably the extreme eigenvalues of ``A`` — confirm
            against callers).
        lambda_max: See ``lambda_min``.
        max_iterations: Upper bound on the number of iterations.
        x0: Optional starting vector; a zero vector of ``len(b)`` is used
            when it is ``None``.
        tol: Residual-norm threshold for early termination.

    Returns:
        A tuple ``(x, 0)``: the last iterate and a constant ``0``.
        NOTE(review): the meaning of the second element is not visible in
        this file — confirm against the other solver implementations.
    """
    longest_threads_time_accumulator.hard_reset()

    # Remember the collector state so it can be restored exactly, even when a
    # worker raises; otherwise GC would stay disabled for the whole process.
    gc_was_enabled = gc.isenabled()
    gc.disable()
    try:
        start_time = time.perf_counter()

        n = len(b)
        x0 = x0 if x0 is not None else [0.0] * len(b)
        x = x0[:]
        omega = 2 / (lambda_min + lambda_max)
        num_threads = multiprocessing.cpu_count()
        chunk_size = n // num_threads

        # The chunk boundaries depend only on n and num_threads, so compute
        # them once. The last chunk absorbs the remainder of the division.
        chunk_bounds = [
            (i * chunk_size, n if i == num_threads - 1 else (i + 1) * chunk_size)
            for i in range(num_threads)
        ]

        # The worker threads are created once and reused by every phase.
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            for _ in range(max_iterations):
                # Receives the product of matrix A and vector x.
                Ax = [0] * len(x)
                _run_chunked_phase(
                    executor, chunk_bounds, matrix_vector_multiply, A, x, Ax
                )

                # Receives the result of vector_vector_subtraction (b - Ax).
                residual = [0] * len(b)
                _run_chunked_phase(
                    executor, chunk_bounds, vector_vector_subtraction, b, Ax, residual
                )

                # Receives the result of scalar_vector_multiply (omega * residual).
                change_vector = [0] * len(residual)
                _run_chunked_phase(
                    executor,
                    chunk_bounds,
                    scalar_vector_multiply,
                    omega,
                    residual,
                    change_vector,
                )

                # The threads write this iteration's result into _x.
                _x = x[:]
                _run_chunked_phase(
                    executor, chunk_bounds, vector_vector_addition, x, change_vector, _x
                )
                x = _x

                if linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol:
                    break

        end_time = time.perf_counter()
    finally:
        if gc_was_enabled:
            gc.enable()

    total_time = end_time - start_time
    sequential_time = total_time - longest_threads_time_accumulator.total_time

    print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_threads_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s")

    return x, 0
|
||||||
@ -1,119 +0,0 @@
|
|||||||
import numpy as np
|
|
||||||
import threading
|
|
||||||
import multiprocessing
|
|
||||||
import gc
|
|
||||||
import time
|
|
||||||
import sys
|
|
||||||
from time_measurement import time_measurement_longest, longest_time_accumulator, tests_time
|
|
||||||
import linear_algebra_utils as linAlg
|
|
||||||
|
|
||||||
|
|
||||||
@time_measurement_longest(longest_time_accumulator)
|
|
||||||
def RichardsonThread(A, b, x, _x, omega, start, end):
|
|
||||||
for i in range(start, end):
|
|
||||||
sigma = np.dot(A[i, :], x) - A[i, i] * x[i]
|
|
||||||
x[i] = (1 - omega) * x[i] + omega * (b[i] - sigma) / A[i, i]
|
|
||||||
|
|
||||||
def matrix_vector_multiply(A, x, start, end, Ax):
|
|
||||||
Ax[start:end] = [sum(xx*yy for xx, yy in zip(row, x)) for row in A[start:end]]
|
|
||||||
|
|
||||||
def vector_vector_subtraction(b, Ax, start, end, residual):
|
|
||||||
residual[start:end] = [xx-yy for xx, yy in zip(b[start:end], Ax[start:end])]
|
|
||||||
|
|
||||||
def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5):
|
|
||||||
longest_time_accumulator.total_time = 0
|
|
||||||
longest_time_accumulator.start = sys.float_info.max
|
|
||||||
longest_time_accumulator.end = 0
|
|
||||||
|
|
||||||
gc.disable()
|
|
||||||
start_time = time.perf_counter()
|
|
||||||
|
|
||||||
n = len(b)
|
|
||||||
x0 = x0 if x0 is not None else [0.0] * len(b)
|
|
||||||
x = x0[:]
|
|
||||||
omega = 2 / (lambda_min + lambda_max)
|
|
||||||
num_threads = multiprocessing.cpu_count()
|
|
||||||
threads = []
|
|
||||||
chunk_size = n // num_threads
|
|
||||||
|
|
||||||
for iteration in range(max_iterations):
|
|
||||||
# chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
|
|
||||||
|
|
||||||
# with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
|
|
||||||
# func = partial(SequentialLinearAlgebraUtils.matrix_vector_multiply, x=x)
|
|
||||||
# results = executor.map(func, chunks)
|
|
||||||
Ax = [0] * len(x)
|
|
||||||
for i in range(num_threads):
|
|
||||||
start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek
|
|
||||||
end = n if i == num_threads - 1 else (i + 1) * chunk_size
|
|
||||||
thread = threading.Thread(target=matrix_vector_multiply, args=(A, x, start, end, Ax))
|
|
||||||
threads.append(thread)
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
for thread in threads:
|
|
||||||
thread.join()
|
|
||||||
|
|
||||||
|
|
||||||
residual = [0] * len(b)
|
|
||||||
|
|
||||||
for i in range(num_threads):
|
|
||||||
start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek
|
|
||||||
end = n if i == num_threads - 1 else (i + 1) * chunk_size
|
|
||||||
thread = threading.Thread(target=vector_vector_subtraction, args=(b, Ax, start, end, residual))
|
|
||||||
threads.append(thread)
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
for thread in threads:
|
|
||||||
thread.join()
|
|
||||||
|
|
||||||
|
|
||||||
x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_vector_multiply(self.omega, residual))
|
|
||||||
|
|
||||||
for i in range(num_threads):
|
|
||||||
start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek
|
|
||||||
end = n if i == num_threads - 1 else (i + 1) * chunk_size
|
|
||||||
thread = threading.Thread(target=scalar_vector_multiply, args=(A, b, x, omega, start, end))
|
|
||||||
threads.append(thread)
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
for thread in threads:
|
|
||||||
thread.join()
|
|
||||||
|
|
||||||
|
|
||||||
x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_vector_multiply(self.omega, residual))
|
|
||||||
|
|
||||||
for i in range(num_threads):
|
|
||||||
start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek
|
|
||||||
end = n if i == num_threads - 1 else (i + 1) * chunk_size
|
|
||||||
thread = threading.Thread(target=vector_vector_addition, args=(A, b, x, omega, start, end))
|
|
||||||
threads.append(thread)
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
for thread in threads:
|
|
||||||
thread.join()
|
|
||||||
|
|
||||||
if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < self.tol):
|
|
||||||
break
|
|
||||||
|
|
||||||
end_time = time.perf_counter()
|
|
||||||
gc.enable()
|
|
||||||
total_time = end_time - start_time
|
|
||||||
sequential_time = total_time - longest_time_accumulator.total_time
|
|
||||||
|
|
||||||
print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s")
|
|
||||||
|
|
||||||
return x, 0
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# # Przykładowe dane wejściowe
|
|
||||||
# np.random.seed(0) # Ustalanie ziarna dla powtarzalności wyników
|
|
||||||
# A = np.random.rand(20, 20) + 20 * np.eye(20) # Macierz przekątniowa z losowymi elementami
|
|
||||||
# b = np.random.rand(20) # Wektor wyrazów wolnych
|
|
||||||
# omega = 0.2
|
|
||||||
# n_iterations = 1000
|
|
||||||
|
|
||||||
# # Rozwiązanie układu równań metodą Richardson'a
|
|
||||||
# x = RichardsonMethodThreads(A, b, 5, 5, n_iterations)
|
|
||||||
# print("Rozwiązanie: ", x)
|
|
||||||
@ -58,15 +58,3 @@ def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=Non
|
|||||||
return x, 0
|
return x, 0
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# # Przykładowe dane wejściowe
|
|
||||||
# np.random.seed(0) # Ustalanie ziarna dla powtarzalności wyników
|
|
||||||
# A = np.random.rand(20, 20) + 20 * np.eye(20) # Macierz przekątniowa z losowymi elementami
|
|
||||||
# b = np.random.rand(20) # Wektor wyrazów wolnych
|
|
||||||
# omega = 0.2
|
|
||||||
# n_iterations = 1000
|
|
||||||
|
|
||||||
# # Rozwiązanie układu równań metodą Richardson'a
|
|
||||||
# x = RichardsonMethodThreads(A, b, 5, 5, n_iterations)
|
|
||||||
# print("Rozwiązanie: ", x)
|
|
||||||
@ -8,16 +8,28 @@ class TimeAccumulator:
|
|||||||
|
|
||||||
class ComplexTimeAcumulator:
|
class ComplexTimeAcumulator:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
self.hard_reset()
|
||||||
|
|
||||||
|
def hard_reset(self):
|
||||||
self.total_time = 0
|
self.total_time = 0
|
||||||
|
self.reset()
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
self.lap_time = 0
|
||||||
self.start = sys.float_info.max
|
self.start = sys.float_info.max
|
||||||
self.end = 0
|
self.end = 0
|
||||||
|
|
||||||
|
def save_lap_and_reset(self):
|
||||||
|
self.total_time += self.lap_time
|
||||||
|
self.reset()
|
||||||
|
|
||||||
|
|
||||||
time_accumulator = TimeAccumulator()
|
time_accumulator = TimeAccumulator()
|
||||||
tests_time = TimeAccumulator()
|
tests_time = TimeAccumulator()
|
||||||
|
|
||||||
longest_time_accumulator = ComplexTimeAcumulator()
|
longest_threads_time_accumulator = ComplexTimeAcumulator()
|
||||||
|
|
||||||
def time_measurement(accumulator):
|
def time_measurement(accumulator: TimeAccumulator):
|
||||||
def decorator(func):
|
def decorator(func):
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
def inner(*args, **kwargs):
|
def inner(*args, **kwargs):
|
||||||
@ -40,7 +52,7 @@ def time_measurement_longest(accumulator: ComplexTimeAcumulator):
|
|||||||
accumulator.start = start
|
accumulator.start = start
|
||||||
if end > accumulator.end:
|
if end > accumulator.end:
|
||||||
accumulator.end = end
|
accumulator.end = end
|
||||||
accumulator.total_time = accumulator.end - accumulator.start # "=" instead of "+="
|
accumulator.lap_time = accumulator.end - accumulator.start # "=" instead of "+="
|
||||||
return result
|
return result
|
||||||
return inner
|
return inner
|
||||||
return decorator
|
return decorator
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user