From c938e63b7feecfc2a5a15d966477edbb3bd75dcf Mon Sep 17 00:00:00 2001 From: Gromiusz <104679774+Gromiusz@users.noreply.github.com> Date: Sun, 27 Oct 2024 20:13:43 +0100 Subject: [PATCH] Complete all threads functions in Linear Algebra Utils, add new parameters to tests (#5) * complete threads function (tests FAIL) Test fail due to indivisibility of the problem into threads. Test is turned to ProcessingType.THREADS!. On ProcessingType.SEQUENTIAL all test PASS. TODO: make sure that len(problem) is divisible by num of threads. It can be less threads for small problem. * fixing bugs in linear_algebra_utils, properly adress chunks to threads * the refactor of the ThreadsLinearAlgebraUtils * set test to execute both sequential and threads processing types * Delete unnecessary lines tests.py * rename scalar_matrix_multiply to scalar_vector_multiply --- code/linear_algebra_utils.py | 155 +++++++++++++++++++++++++++++++---- code/richardson_method.py | 2 +- code/tests.py | 11 +-- 3 files changed, 145 insertions(+), 23 deletions(-) diff --git a/code/linear_algebra_utils.py b/code/linear_algebra_utils.py index a159f42f..876533c3 100644 --- a/code/linear_algebra_utils.py +++ b/code/linear_algebra_utils.py @@ -1,5 +1,7 @@ import math from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor +from functools import partial class LinearAlgebraUtils(ABC): @staticmethod @@ -39,7 +41,7 @@ class LinearAlgebraUtils(ABC): @staticmethod @abstractmethod - def scalar_matrix_multiply(omega, vector): + def scalar_vector_multiply(omega, vector): pass @staticmethod @@ -88,7 +90,7 @@ class SequentialLinearAlgebraUtils(ABC): return [x+y for x, y in zip(v1, v2)] @staticmethod - def scalar_matrix_multiply(omega, vector): + def scalar_vector_multiply(omega, vector): # na pewno scalar matrix? a nie scalar vector? return [omega * x for x in vector] @@ -130,46 +132,169 @@ class SequentialLinearAlgebraUtils(ABC): class ThreadsLinearAlgebraUtils(ABC): + NUM_THREADS = 4 + + @staticmethod + def get_chunk_size(data): + num_elements = len(data) + num_threads = min(ThreadsLinearAlgebraUtils.NUM_THREADS, num_elements) + chunk_size = num_elements // num_threads + remainder = num_elements % num_threads + return chunk_size, num_threads, remainder + + + @staticmethod + def divide_vectors_to_chunks(v1, v2): + chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v1) + + chunks = [] + start = 0 + for i in range(num_threads): + end = start + chunk_size + (1 if i < remainder else 0) + chunks.append((v1[start:end], v2[start:end])) + start = end + + return chunks + + @staticmethod + def divide_vector_or_matrix_to_chunks(v): + chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v) + + chunks = [] + start = 0 + for i in range(num_threads): + end = start + chunk_size + (1 if i < remainder else 0) + chunks.append(v[start:end]) + start = end + + return chunks + + @staticmethod def dot_product(v1, v2): - pass + chunks = ThreadsLinearAlgebraUtils.divide_vectors_to_chunks(v1, v2) + with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + results = executor.map(lambda pair: SequentialLinearAlgebraUtils.dot_product(*pair), chunks) + return sum(results) @staticmethod def matrix_vector_multiply(A, x): - pass + chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A) + with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + func = partial(SequentialLinearAlgebraUtils.matrix_vector_multiply, x=x) + results = executor.map(func, chunks) + return [item for sublist in results for item in sublist] @staticmethod def vector_norm(v): - pass + chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(v) + + def partial_norm(chunk): + return sum(x * x for x in chunk) + + with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + results = executor.map(partial_norm, chunks) + total_sum = sum(results) + return total_sum**0.5 @staticmethod def vector_scalar_divide(x, scalar): - pass + chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(x) + + with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.vector_scalar_divide(chunk, scalar), chunks) + return [item for sublist in results for item in sublist] @staticmethod def matrix_scalar_multiply(A, w): - pass + chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A) + with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.matrix_scalar_multiply(w, chunk), chunks) + return [item for sublist in results for item in sublist] @staticmethod def vector_vector_subtraction(v1, v2): - pass + chunks = ThreadsLinearAlgebraUtils.divide_vectors_to_chunks(v1, v2) + with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + results = executor.map(lambda pair: SequentialLinearAlgebraUtils.vector_vector_subtraction(*pair), chunks) + return [item for sublist in results for item in sublist] + @staticmethod def vector_vector_addition(v1, v2): - pass - + chunks = ThreadsLinearAlgebraUtils.divide_vectors_to_chunks(v1, v2) + with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + results = executor.map(lambda pair: SequentialLinearAlgebraUtils.vector_vector_addition(*pair), chunks) + return [item for sublist in results for item in sublist] + @staticmethod - def scalar_matrix_multiply(omega, vector): - pass + def scalar_vector_multiply(omega, vector): + chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(vector) + with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.scalar_vector_multiply(omega, chunk), chunks) + + return [item for sublist in results for item in sublist] @staticmethod def matrix_norm(A): - pass + chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A) + + def partial_norm(chunk): + return sum(element ** 2 for row in chunk for element in row) + + with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + results = executor.map(partial_norm, chunks) + + total_sum = sum(results) + return math.sqrt(total_sum) + + @staticmethod + def divide_matrixes_to_chunks(A, B): + chunk_size = len(A) // ThreadsLinearAlgebraUtils.NUM_THREADS + return [(A[i:i + chunk_size], B[i:i + chunk_size]) for i in range(0, len(A), chunk_size)] @staticmethod def matrix_matrix_subtraction(A, B): - pass + + def subtract_chunk(pair): + chunk_A, chunk_B = pair + return [[chunk_A[i][j] - chunk_B[i][j] for j in range(len(chunk_A[0]))] for i in range(len(chunk_A))] + + chunks = ThreadsLinearAlgebraUtils.divide_matrixes_to_chunks(A, B) + with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + results = executor.map(subtract_chunk, chunks) + return [row for chunk in results for row in chunk] @staticmethod def gaussian_elimination(A, b): - pass + n = len(A) + M = [row[:] for row in A] + + for i in range(n): + M[i].append(b[i]) + + for k in range(n): + # Pivoting + if M[k][k] == 0: + for i in range(k + 1, n): + if M[i][k] != 0: + M[k], M[i] = M[i], M[k] + break + + # Threads + def eliminate_row(i): + factor = M[i][k] / M[k][k] + for j in range(k, n + 1): + M[i][j] -= factor * M[k][j] + + with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: + rows_to_eliminate = range(k + 1, n) + executor.map(eliminate_row, rows_to_eliminate) + + x = [0] * n + for i in range(n - 1, -1, -1): + x[i] = M[i][-1] / M[i][i] + for k in range(i - 1, -1, -1): + M[k][-1] -= M[k][i] * x[i] + + return x \ No newline at end of file diff --git a/code/richardson_method.py b/code/richardson_method.py index 8fb02275..da9ab4d1 100644 --- a/code/richardson_method.py +++ b/code/richardson_method.py @@ -54,6 +54,6 @@ class RichardsonMethod: for iteration in range(self.max_iterations): Ax = self.LinAlg.matrix_vector_multiply(self.A, x) residual = self.LinAlg.vector_vector_subtraction(self.b, Ax) - x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_matrix_multiply(self.omega, residual)) + x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_vector_multiply(self.omega, residual)) return x, 0 diff --git a/code/tests.py b/code/tests.py index 675916c0..87e30c88 100644 --- a/code/tests.py +++ b/code/tests.py @@ -30,15 +30,14 @@ def calcualte_norm_from_matrix_numpy(A, n, max_iterations): I = np.eye(n) return calculate_norm_numpy(I, omega, A) - - @pytest.mark.parametrize("n", [2, 3, 4, 5, 10, 20, 50, 100]) -def test_richardson_vs_cg(n: int): +@pytest.mark.parametrize("processing_type", [ProcessingType.SEQUENTIAL, ProcessingType.THREADS]) +def test_richardson_vs_cg(n: int, processing_type: ProcessingType): print("matrix size: ", n) tolerance = 1e-5 max_iterations=1000 A, b = MatrixGenerator.generate_random_matrix_and_vector(n) - richardson_solver = RichardsonMethod(ProcessingType.SEQUENTIAL , A, b, max_iterations, size=n, tol=1e-7) + richardson_solver = RichardsonMethod(processing_type, A, b, max_iterations, size=n, tol=1e-7) solution_richardson, info_richardson = richardson_solver.solve() solution_cg, info = cg(A, b) @@ -76,6 +75,4 @@ def assert_scipy_not_converged(solution_richardson, info_richardson, A, b): assert False, "Richardson converged while SciPy did not" if __name__ == "__main__": - # Run pytest and exit with the appropriate status code - for n in [2, 3, 4, 5, 10, 20, 50, 100]: - test_richardson_vs_cg(n) \ No newline at end of file + pytest.main()