diff --git a/code/linear_algebra_utils.py b/code/linear_algebra_utils.py index 4e6de0ab..1f56d806 100644 --- a/code/linear_algebra_utils.py +++ b/code/linear_algebra_utils.py @@ -154,15 +154,6 @@ class ThreadsLinearAlgebraUtils: results = executor.map(lambda pair: SequentialLinearAlgebraUtils.dot_product(*pair), chunks) return sum(results) - # @staticmethod - # @time_measurement(time_accumulator) - # def matrix_vector_multiply(A, x): - # chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A) - # with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: - # func = partial(SequentialLinearAlgebraUtils.matrix_vector_multiply, x=x) - # results = executor.map(func, chunks) - # return [item for sublist in results for item in sublist] - @staticmethod @time_measurement(time_accumulator) def matrix_vector_multiply(A, x): diff --git a/code/tests.py b/code/tests.py index 93c7990c..1bc1ba20 100644 --- a/code/tests.py +++ b/code/tests.py @@ -2,7 +2,7 @@ import pytest import numpy as np from matrix_generator import MatrixGenerator from richardson_method import RichardsonMethod -from threads import RichardsonMethodThreads +from threads_indep import RichardsonMethodThreads from processing_type import ProcessingType from time_measurement import time_measurement, tests_time @@ -41,10 +41,10 @@ def solution_lib(A, b): 10000 ]) @pytest.mark.parametrize("processing_type", [ - # ProcessingType.SEQUENTIAL, - ProcessingType.THREADS#, - # ProcessingType.PROCESSES, - # ProcessingType.DISTRIBUTED_ARRAYS + ProcessingType.SEQUENTIAL, + ProcessingType.THREADS, + ProcessingType.PROCESSES, + ProcessingType.DISTRIBUTED_ARRAYS ]) @pytest.mark.parametrize("matrix_type", [ "spd", diff --git a/code/threads_indep.py b/code/threads_indep.py new file mode 100644 index 00000000..7c5ecc5e --- /dev/null +++ b/code/threads_indep.py @@ -0,0 +1,103 @@ +import multiprocessing +import gc +import time +from concurrent.futures import ThreadPoolExecutor +from time_measurement import time_measurement_longest, longest_threads_time_accumulator, tests_time +import linear_algebra_utils as linAlg + + +@time_measurement_longest(longest_threads_time_accumulator) +def matrix_vector_multiply(A, input_x, start, end, Ax): + Ax[start:end] = [sum(x*y for x, y in zip(row, input_x)) for row in A[start:end]] + +@time_measurement_longest(longest_threads_time_accumulator) +def vector_vector_subtraction(b, Ax, start, end, residual): + residual[start:end] = [x-y for x, y in zip(b[start:end], Ax[start:end])] + +@time_measurement_longest(longest_threads_time_accumulator) +def scalar_vector_multiply(omega, vector, start, end, result): + result[start:end] = [omega * x for x in vector[start:end]] + +@time_measurement_longest(longest_threads_time_accumulator) +def vector_vector_addition(input_x, vector, start, end, output_x): + output_x[start:end] = [x+y for x, y in zip(input_x[start:end], vector[start:end])] + +def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5): + longest_threads_time_accumulator.hard_reset() + + gc.disable() + start_time = time.perf_counter() + + n = len(b) + x0 = x0 if x0 is not None else [0.0] * len(b) + x = x0[:] + omega = 2 / (lambda_min + lambda_max) + num_threads = multiprocessing.cpu_count() + chunk_size = n // num_threads + + with ThreadPoolExecutor(max_workers=num_threads) as executor: # wątki są tworzone raz i nie są niszczone + for iteration in range(max_iterations): + + Ax = [0] * len(x) # tutaj zostanie przypisany wynik z mnożenia macierzy A z wektorem x + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(matrix_vector_multiply, A, x, start, end, Ax)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + residual = [0] * len(b) # tutaj zostanie przypisany wynik z vector_vector_subtraction + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(vector_vector_subtraction, b, Ax, start, end, residual)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + change_vector = [0] * len(residual) # zostanie tu przypisany wynik scalar_vector_multiply po pracy wątków + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(scalar_vector_multiply, omega, residual, start, end, change_vector)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + _x = x[:] # do _x zostanie przez wątki przypisany wynik pracy w danej iteracji + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(vector_vector_addition, x, change_vector, start, end, _x)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + x = _x[:] + + + if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol): + break + + + end_time = time.perf_counter() + gc.enable() + total_time = end_time - start_time + sequential_time = total_time - longest_threads_time_accumulator.total_time + + print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_threads_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s") + + return x, 0 \ No newline at end of file diff --git a/code/threads_nowy.py b/code/threads_nowy.py deleted file mode 100644 index 7acfc6c3..00000000 --- a/code/threads_nowy.py +++ /dev/null @@ -1,119 +0,0 @@ -import numpy as np -import threading -import multiprocessing -import gc -import time -import sys -from time_measurement import time_measurement_longest, longest_time_accumulator, tests_time -import linear_algebra_utils as linAlg - - -@time_measurement_longest(longest_time_accumulator) -def RichardsonThread(A, b, x, _x, omega, start, end): - for i in range(start, end): - sigma = np.dot(A[i, :], x) - A[i, i] * x[i] - x[i] = (1 - omega) * x[i] + omega * (b[i] - sigma) / A[i, i] - -def matrix_vector_multiply(A, x, start, end, Ax): - Ax[start:end] = [sum(xx*yy for xx, yy in zip(row, x)) for row in A[start:end]] - -def vector_vector_subtraction(b, Ax, start, end, residual): - residual[start:end] = [xx-yy for xx, yy in zip(b[start:end], Ax[start:end])] - -def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5): - longest_time_accumulator.total_time = 0 - longest_time_accumulator.start = sys.float_info.max - longest_time_accumulator.end = 0 - - gc.disable() - start_time = time.perf_counter() - - n = len(b) - x0 = x0 if x0 is not None else [0.0] * len(b) - x = x0[:] - omega = 2 / (lambda_min + lambda_max) - num_threads = multiprocessing.cpu_count() - threads = [] - chunk_size = n // num_threads - - for iteration in range(max_iterations): - # chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A) - - # with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: - # func = partial(SequentialLinearAlgebraUtils.matrix_vector_multiply, x=x) - # results = executor.map(func, chunks) - Ax = [0] * len(x) - for i in range(num_threads): - start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek - end = n if i == num_threads - 1 else (i + 1) * chunk_size - thread = threading.Thread(target=matrix_vector_multiply, args=(A, x, start, end, Ax)) - threads.append(thread) - thread.start() - - for thread in threads: - thread.join() - - - residual = [0] * len(b) - - for i in range(num_threads): - start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek - end = n if i == num_threads - 1 else (i + 1) * chunk_size - thread = threading.Thread(target=vector_vector_subtraction, args=(b, Ax, start, end, residual)) - threads.append(thread) - thread.start() - - for thread in threads: - thread.join() - - - x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_vector_multiply(self.omega, residual)) - - for i in range(num_threads): - start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek - end = n if i == num_threads - 1 else (i + 1) * chunk_size - thread = threading.Thread(target=scalar_vector_multiply, args=(A, b, x, omega, start, end)) - threads.append(thread) - thread.start() - - for thread in threads: - thread.join() - - - x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_vector_multiply(self.omega, residual)) - - for i in range(num_threads): - start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek - end = n if i == num_threads - 1 else (i + 1) * chunk_size - thread = threading.Thread(target=vector_vector_addition, args=(A, b, x, omega, start, end)) - threads.append(thread) - thread.start() - - for thread in threads: - thread.join() - - if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < self.tol): - break - - end_time = time.perf_counter() - gc.enable() - total_time = end_time - start_time - sequential_time = total_time - longest_time_accumulator.total_time - - print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s") - - return x, 0 - - - - -# # Przykładowe dane wejściowe -# np.random.seed(0) # Ustalanie ziarna dla powtarzalności wyników -# A = np.random.rand(20, 20) + 20 * np.eye(20) # Macierz przekątniowa z losowymi elementami -# b = np.random.rand(20) # Wektor wyrazów wolnych -# omega = 0.2 -# n_iterations = 1000 - -# # Rozwiązanie układu równań metodą Richardson'a -# x = RichardsonMethodThreads(A, b, 5, 5, n_iterations) -# print("Rozwiązanie: ", x) diff --git a/code/threads.py b/code/threads_too_simple.py similarity index 83% rename from code/threads.py rename to code/threads_too_simple.py index 4f43f0bb..d5ac2713 100644 --- a/code/threads.py +++ b/code/threads_too_simple.py @@ -58,15 +58,3 @@ def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=Non return x, 0 - - -# # Przykładowe dane wejściowe -# np.random.seed(0) # Ustalanie ziarna dla powtarzalności wyników -# A = np.random.rand(20, 20) + 20 * np.eye(20) # Macierz przekątniowa z losowymi elementami -# b = np.random.rand(20) # Wektor wyrazów wolnych -# omega = 0.2 -# n_iterations = 1000 - -# # Rozwiązanie układu równań metodą Richardson'a -# x = RichardsonMethodThreads(A, b, 5, 5, n_iterations) -# print("Rozwiązanie: ", x) diff --git a/code/time_measurement.py b/code/time_measurement.py index c6be2a49..d70992a2 100644 --- a/code/time_measurement.py +++ b/code/time_measurement.py @@ -8,16 +8,28 @@ class TimeAccumulator: class ComplexTimeAcumulator: def __init__(self): + self.hard_reset() + + def hard_reset(self): self.total_time = 0 + self.reset() + + def reset(self): + self.lap_time = 0 self.start = sys.float_info.max self.end = 0 + + def save_lap_and_reset(self): + self.total_time += self.lap_time + self.reset() + time_accumulator = TimeAccumulator() tests_time = TimeAccumulator() -longest_time_accumulator = ComplexTimeAcumulator() +longest_threads_time_accumulator = ComplexTimeAcumulator() -def time_measurement(accumulator): +def time_measurement(accumulator: TimeAccumulator): def decorator(func): @wraps(func) def inner(*args, **kwargs): @@ -40,7 +52,7 @@ def time_measurement_longest(accumulator: ComplexTimeAcumulator): accumulator.start = start if end > accumulator.end: accumulator.end = end - accumulator.total_time = accumulator.end - accumulator.start # "=" instead of "+=" + accumulator.lap_time = accumulator.end - accumulator.start # "=" instead of "+=" return result return inner return decorator