From fd94f597a550be4cf0178317d3cb9511e823aae2 Mon Sep 17 00:00:00 2001 From: Gromiusz Date: Sat, 28 Dec 2024 13:24:58 +0100 Subject: [PATCH] feat: trying and testing different ways to implement threads --- code/linear_algebra_utils.py | 21 ++++--- code/tests.py | 23 ++++--- code/threads.py | 72 +++++++++++++++++++++ code/threads_nowy.py | 119 +++++++++++++++++++++++++++++++++++ code/time_measurement.py | 25 ++++++++ 5 files changed, 244 insertions(+), 16 deletions(-) create mode 100644 code/threads.py create mode 100644 code/threads_nowy.py diff --git a/code/linear_algebra_utils.py b/code/linear_algebra_utils.py index a8739a5d..4e6de0ab 100644 --- a/code/linear_algebra_utils.py +++ b/code/linear_algebra_utils.py @@ -2,6 +2,7 @@ import cmath import math import itertools import operator +import multiprocessing from multiprocessing import Pool from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor @@ -10,7 +11,7 @@ from time_measurement import time_measurement, time_accumulator import numpy as np import dask.array as da -class LinearAlgebraUtils(ABC): +class LinearAlgebraUtils: @staticmethod @abstractmethod def dot_product(v1, v2): @@ -62,7 +63,7 @@ class LinearAlgebraUtils(ABC): pass -class SequentialLinearAlgebraUtils(ABC): +class SequentialLinearAlgebraUtils: @staticmethod def dot_product(v1, v2): return sum(x*y for x, y in zip(v1, v2)) @@ -106,11 +107,10 @@ class SequentialLinearAlgebraUtils(ABC): return [[A[i][j] - B[i][j] for j in range(len(A[0]))] for i in range(len(A))] -class ThreadsLinearAlgebraUtils(ABC): - NUM_THREADS = 4 +class ThreadsLinearAlgebraUtils: + NUM_THREADS = multiprocessing.cpu_count() @staticmethod - @time_measurement(time_accumulator) def get_chunk_size(data): num_elements = len(data) num_threads = min(ThreadsLinearAlgebraUtils.NUM_THREADS, num_elements) @@ -120,7 +120,6 @@ class ThreadsLinearAlgebraUtils(ABC): @staticmethod - @time_measurement(time_accumulator) def divide_vectors_to_chunks(v1, v2): chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v1) @@ -134,7 +133,6 @@ class ThreadsLinearAlgebraUtils(ABC): return chunks @staticmethod - @time_measurement(time_accumulator) def divide_vector_or_matrix_to_chunks(v): chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v) @@ -156,6 +154,15 @@ class ThreadsLinearAlgebraUtils(ABC): results = executor.map(lambda pair: SequentialLinearAlgebraUtils.dot_product(*pair), chunks) return sum(results) + # @staticmethod + # @time_measurement(time_accumulator) + # def matrix_vector_multiply(A, x): + # chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A) + # with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + # func = partial(SequentialLinearAlgebraUtils.matrix_vector_multiply, x=x) + # results = executor.map(func, chunks) + # return [item for sublist in results for item in sublist] + @staticmethod @time_measurement(time_accumulator) def matrix_vector_multiply(A, x): diff --git a/code/tests.py b/code/tests.py index f01f1e66..93c7990c 100644 --- a/code/tests.py +++ b/code/tests.py @@ -2,6 +2,7 @@ import pytest import numpy as np from matrix_generator import MatrixGenerator from richardson_method import RichardsonMethod +from threads import RichardsonMethodThreads from processing_type import ProcessingType from time_measurement import time_measurement, tests_time @@ -40,10 +41,10 @@ def solution_lib(A, b): 10000 ]) @pytest.mark.parametrize("processing_type", [ - ProcessingType.SEQUENTIAL, - ProcessingType.THREADS, - ProcessingType.PROCESSES, - ProcessingType.DISTRIBUTED_ARRAYS + # ProcessingType.SEQUENTIAL, + ProcessingType.THREADS#, + # ProcessingType.PROCESSES, + # ProcessingType.DISTRIBUTED_ARRAYS ]) @pytest.mark.parametrize("matrix_type", [ "spd", @@ -67,12 +68,16 @@ def test_richardson_vs_cg(n: int, processing_type: ProcessingType, matrix_type: else: raise ValueError("Invalid matrix type specified. Choose 'spd', 'poli3', or 'nemeth12'.") - richardson_solver = RichardsonMethod(processing_type, A, b, lambda_min, lambda_max, max_iterations, size=A.shape[0], tol=1e-7) - solution_richardson, info_richardson = None, None - with capsys.disabled(): - solution_richardson, info_richardson = richardson_solver.solve() - + + if processing_type != ProcessingType.THREADS: + richardson_solver = RichardsonMethod(processing_type, A, b, lambda_min, lambda_max, max_iterations, size=A.shape[0], tol=1e-7) + with capsys.disabled(): + solution_richardson, info_richardson = richardson_solver.solve() + else: + with capsys.disabled(): + solution_richardson, info_richardson = RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, tol=1e-7) + # Przechwytywanie wyjścia po solve captured = capsys.readouterr() print("Captured output:", captured.out) diff --git a/code/threads.py b/code/threads.py new file mode 100644 index 00000000..4f43f0bb --- /dev/null +++ b/code/threads.py @@ -0,0 +1,72 @@ +import numpy as np +import threading +import multiprocessing +import gc +import time +import sys +from time_measurement import time_measurement_longest, longest_time_accumulator, tests_time +import linear_algebra_utils as linAlg + + +@time_measurement_longest(longest_time_accumulator) +def RichardsonThread(A, b, x, _x, omega, start, end): + for i in range(start, end): + sigma = np.dot(A[i, :], _x) - A[i, i] * _x[i] + x[i] = (1 - omega) * _x[i] + omega * (b[i] - sigma) / A[i, i] + +def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5): + longest_time_accumulator.total_time = 0 + longest_time_accumulator.start = sys.float_info.max + longest_time_accumulator.end = 0 + + gc.disable() + start_time = time.perf_counter() + + n = len(b) + x0 = x0 if x0 is not None else [0.0] * len(b) + x = x0[:] + omega = 0.05#2 / (lambda_min + lambda_max) + num_threads = multiprocessing.cpu_count() + threads = [] + chunk_size = n // num_threads + max_iterations = 1000 + + for _ in range(max_iterations): + _x = x[:] + for i in range(num_threads): + start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek + end = n if i == num_threads - 1 else (i + 1) * chunk_size + thread = threading.Thread(target=RichardsonThread, args=(A, b, x, _x, omega, start, end)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + # Ax = linAlg.SequentialLinearAlgebraUtils.matrix_vector_multiply(A, x) + # residual = linAlg.SequentialLinearAlgebraUtils.vector_vector_subtraction(b, Ax) + # if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol): + # break + + end_time = time.perf_counter() + gc.enable() + total_time = end_time - start_time + sequential_time = total_time - longest_time_accumulator.total_time + + print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s") + + return x, 0 + + + + +# # Przykładowe dane wejściowe +# np.random.seed(0) # Ustalanie ziarna dla powtarzalności wyników +# A = np.random.rand(20, 20) + 20 * np.eye(20) # Macierz przekątniowa z losowymi elementami +# b = np.random.rand(20) # Wektor wyrazów wolnych +# omega = 0.2 +# n_iterations = 1000 + +# # Rozwiązanie układu równań metodą Richardson'a +# x = RichardsonMethodThreads(A, b, 5, 5, n_iterations) +# print("Rozwiązanie: ", x) diff --git a/code/threads_nowy.py b/code/threads_nowy.py new file mode 100644 index 00000000..7acfc6c3 --- /dev/null +++ b/code/threads_nowy.py @@ -0,0 +1,119 @@ +import numpy as np +import threading +import multiprocessing +import gc +import time +import sys +from time_measurement import time_measurement_longest, longest_time_accumulator, tests_time +import linear_algebra_utils as linAlg + + +@time_measurement_longest(longest_time_accumulator) +def RichardsonThread(A, b, x, _x, omega, start, end): + for i in range(start, end): + sigma = np.dot(A[i, :], x) - A[i, i] * x[i] + x[i] = (1 - omega) * x[i] + omega * (b[i] - sigma) / A[i, i] + +def matrix_vector_multiply(A, x, start, end, Ax): + Ax[start:end] = [sum(xx*yy for xx, yy in zip(row, x)) for row in A[start:end]] + +def vector_vector_subtraction(b, Ax, start, end, residual): + residual[start:end] = [xx-yy for xx, yy in zip(b[start:end], Ax[start:end])] + +def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5): + longest_time_accumulator.total_time = 0 + longest_time_accumulator.start = sys.float_info.max + longest_time_accumulator.end = 0 + + gc.disable() + start_time = time.perf_counter() + + n = len(b) + x0 = x0 if x0 is not None else [0.0] * len(b) + x = x0[:] + omega = 2 / (lambda_min + lambda_max) + num_threads = multiprocessing.cpu_count() + threads = [] + chunk_size = n // num_threads + + for iteration in range(max_iterations): + # chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A) + + # with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + # func = partial(SequentialLinearAlgebraUtils.matrix_vector_multiply, x=x) + # results = executor.map(func, chunks) + Ax = [0] * len(x) + for i in range(num_threads): + start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek + end = n if i == num_threads - 1 else (i + 1) * chunk_size + thread = threading.Thread(target=matrix_vector_multiply, args=(A, x, start, end, Ax)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + + residual = [0] * len(b) + + for i in range(num_threads): + start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek + end = n if i == num_threads - 1 else (i + 1) * chunk_size + thread = threading.Thread(target=vector_vector_subtraction, args=(b, Ax, start, end, residual)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + + x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_vector_multiply(self.omega, residual)) + + for i in range(num_threads): + start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek + end = n if i == num_threads - 1 else (i + 1) * chunk_size + thread = threading.Thread(target=scalar_vector_multiply, args=(A, b, x, omega, start, end)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + + x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_vector_multiply(self.omega, residual)) + + for i in range(num_threads): + start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek + end = n if i == num_threads - 1 else (i + 1) * chunk_size + thread = threading.Thread(target=vector_vector_addition, args=(A, b, x, omega, start, end)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < self.tol): + break + + end_time = time.perf_counter() + gc.enable() + total_time = end_time - start_time + sequential_time = total_time - longest_time_accumulator.total_time + + print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s") + + return x, 0 + + + + +# # Przykładowe dane wejściowe +# np.random.seed(0) # Ustalanie ziarna dla powtarzalności wyników +# A = np.random.rand(20, 20) + 20 * np.eye(20) # Macierz przekątniowa z losowymi elementami +# b = np.random.rand(20) # Wektor wyrazów wolnych +# omega = 0.2 +# n_iterations = 1000 + +# # Rozwiązanie układu równań metodą Richardson'a +# x = RichardsonMethodThreads(A, b, 5, 5, n_iterations) +# print("Rozwiązanie: ", x) diff --git a/code/time_measurement.py b/code/time_measurement.py index 03ca4c0f..c6be2a49 100644 --- a/code/time_measurement.py +++ b/code/time_measurement.py @@ -1,13 +1,22 @@ import time +import sys from functools import wraps class TimeAccumulator: def __init__(self): self.total_time = 0 +class ComplexTimeAcumulator: + def __init__(self): + self.total_time = 0 + self.start = sys.float_info.max + self.end = 0 + time_accumulator = TimeAccumulator() tests_time = TimeAccumulator() +longest_time_accumulator = ComplexTimeAcumulator() + def time_measurement(accumulator): def decorator(func): @wraps(func) @@ -20,6 +29,22 @@ def time_measurement(accumulator): return inner return decorator +def time_measurement_longest(accumulator: ComplexTimeAcumulator): + def decorator(func): + @wraps(func) + def inner(*args, **kwargs): + start = time.perf_counter() + result = func(*args, **kwargs) + end = time.perf_counter() + if start < accumulator.start: + accumulator.start = start + if end > accumulator.end: + accumulator.end = end + accumulator.total_time = accumulator.end - accumulator.start # "=" instead of "+=" + return result + return inner + return decorator +