diff --git a/README.md b/README.md new file mode 100644 index 00000000..a8a0487e --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +cd code + +pyenv init + +COPY AND PASTE PYENV CONFIGURATION TO YOUR SHELL CONFIG + +source ~/.zshrc (or your shell) + +pyenv install 3.11 + +pyenv global 3.11 + +Ensure that python DID change its version + +python --version + +python -m venv ./venv + +source ./venv/bin/activate + +pip install -r requirements.txt + +python main.py diff --git a/code/linear_algebra_utils.py b/code/linear_algebra_utils.py index a8739a5d..5448545b 100644 --- a/code/linear_algebra_utils.py +++ b/code/linear_algebra_utils.py @@ -2,6 +2,7 @@ import cmath import math import itertools import operator +import multiprocessing from multiprocessing import Pool from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor @@ -10,7 +11,7 @@ from time_measurement import time_measurement, time_accumulator import numpy as np import dask.array as da -class LinearAlgebraUtils(ABC): +class LinearAlgebraUtils: @staticmethod @abstractmethod def dot_product(v1, v2): @@ -62,7 +63,7 @@ class LinearAlgebraUtils(ABC): pass -class SequentialLinearAlgebraUtils(ABC): +class SequentialLinearAlgebraUtils: @staticmethod def dot_product(v1, v2): return sum(x*y for x, y in zip(v1, v2)) @@ -106,11 +107,10 @@ class SequentialLinearAlgebraUtils(ABC): return [[A[i][j] - B[i][j] for j in range(len(A[0]))] for i in range(len(A))] -class ThreadsLinearAlgebraUtils(ABC): - NUM_THREADS = 4 +class ThreadsLinearAlgebraUtils: + NUM_THREADS = multiprocessing.cpu_count() @staticmethod - @time_measurement(time_accumulator) def get_chunk_size(data): num_elements = len(data) num_threads = min(ThreadsLinearAlgebraUtils.NUM_THREADS, num_elements) @@ -118,9 +118,7 @@ class ThreadsLinearAlgebraUtils(ABC): remainder = num_elements % num_threads return chunk_size, num_threads, remainder - @staticmethod - @time_measurement(time_accumulator) def divide_vectors_to_chunks(v1, v2): chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v1) @@ -134,7 +132,6 @@ class ThreadsLinearAlgebraUtils(ABC): return chunks @staticmethod - @time_measurement(time_accumulator) def divide_vector_or_matrix_to_chunks(v): chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v) @@ -147,15 +144,6 @@ class ThreadsLinearAlgebraUtils(ABC): return chunks - - @staticmethod - @time_measurement(time_accumulator) - def dot_product(v1, v2): - chunks = ThreadsLinearAlgebraUtils.divide_vectors_to_chunks(v1, v2) - with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: - results = executor.map(lambda pair: SequentialLinearAlgebraUtils.dot_product(*pair), chunks) - return sum(results) - @staticmethod @time_measurement(time_accumulator) def matrix_vector_multiply(A, x): @@ -178,23 +166,6 @@ class ThreadsLinearAlgebraUtils(ABC): total_sum = sum(results) return total_sum**0.5 - @staticmethod - @time_measurement(time_accumulator) - def vector_scalar_divide(x, scalar): - chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(x) - - with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: - results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.vector_scalar_divide(chunk, scalar), chunks) - return [item for sublist in results for item in sublist] - - @staticmethod - @time_measurement(time_accumulator) - def matrix_scalar_multiply(A, w): - chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A) - with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: - results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.matrix_scalar_multiply(w, chunk), chunks) - return [item for sublist in results for item in sublist] - @staticmethod @time_measurement(time_accumulator) def vector_vector_subtraction(v1, v2): @@ -221,54 +192,6 @@ class ThreadsLinearAlgebraUtils(ABC): return [item for sublist in results for item in sublist] - @staticmethod - @time_measurement(time_accumulator) - def matrix_norm(A): - chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A) - - def partial_norm(chunk): - return sum(element ** 2 for row in chunk for element in row) - - with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: - results = executor.map(partial_norm, chunks) - - total_sum = sum(results) - return math.sqrt(total_sum) - - @staticmethod - @time_measurement(time_accumulator) - def divide_matrixes_to_chunks(A, B): - num_rows = len(A) - num_threads = ThreadsLinearAlgebraUtils.NUM_THREADS - if num_threads > num_rows: - num_threads = num_rows - if num_rows == 0: - return [] - chunk_size = num_rows // num_threads - remainder = num_rows % num_threads - chunks = [] - start = 0 - for _ in range(num_threads): - end = start + chunk_size + (1 if remainder > 0 else 0) - chunks.append((A[start:end], B[start:end])) - start = end - if remainder > 0: - remainder -= 1 - return chunks - - @staticmethod - @time_measurement(time_accumulator) - def matrix_matrix_subtraction(A, B): - - def subtract_chunk(pair): - chunk_A, chunk_B = pair - return [[chunk_A[i][j] - chunk_B[i][j] for j in range(len(chunk_A[0]))] for i in range(len(chunk_A))] - - chunks = ThreadsLinearAlgebraUtils.divide_matrixes_to_chunks(A, B) - with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor: - results = executor.map(subtract_chunk, chunks) - return [row for chunk in results for row in chunk] - @time_measurement(time_accumulator) def process_row(params): diff --git a/code/requirements.txt b/code/requirements.txt index 95d5e87f..72bf1c03 100644 --- a/code/requirements.txt +++ b/code/requirements.txt @@ -1,3 +1,4 @@ scipy pytest -dask \ No newline at end of file +dask +numba \ No newline at end of file diff --git a/code/tests.py b/code/tests.py index f01f1e66..1bc1ba20 100644 --- a/code/tests.py +++ b/code/tests.py @@ -2,6 +2,7 @@ import pytest import numpy as np from matrix_generator import MatrixGenerator from richardson_method import RichardsonMethod +from threads_indep import RichardsonMethodThreads from processing_type import ProcessingType from time_measurement import time_measurement, tests_time @@ -67,12 +68,16 @@ def test_richardson_vs_cg(n: int, processing_type: ProcessingType, matrix_type: else: raise ValueError("Invalid matrix type specified. Choose 'spd', 'poli3', or 'nemeth12'.") - richardson_solver = RichardsonMethod(processing_type, A, b, lambda_min, lambda_max, max_iterations, size=A.shape[0], tol=1e-7) - solution_richardson, info_richardson = None, None - with capsys.disabled(): - solution_richardson, info_richardson = richardson_solver.solve() - + + if processing_type != ProcessingType.THREADS: + richardson_solver = RichardsonMethod(processing_type, A, b, lambda_min, lambda_max, max_iterations, size=A.shape[0], tol=1e-7) + with capsys.disabled(): + solution_richardson, info_richardson = richardson_solver.solve() + else: + with capsys.disabled(): + solution_richardson, info_richardson = RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, tol=1e-7) + # Przechwytywanie wyjścia po solve captured = capsys.readouterr() print("Captured output:", captured.out) diff --git a/code/threads_indep.py b/code/threads_indep.py new file mode 100644 index 00000000..08103025 --- /dev/null +++ b/code/threads_indep.py @@ -0,0 +1,93 @@ +import gc +import time +import numpy as np +from numba import njit, prange +from time_measurement import time_measurement_longest, longest_threads_time_accumulator, tests_time +import linear_algebra_utils as linAlg + + +@njit(parallel=True) +def numba_matrix_vector_multiply(A, input_x, Ax): + for i in prange(len(A)): + acc = 0.0 + for j in range(len(input_x)): + acc += A[i][j] * input_x[j] + Ax[i] = acc + +@njit(parallel=True) +def numba_vector_vector_subtraction(b, Ax, residual): + for i in prange(len(b)): + residual[i] = b[i] - Ax[i] + +@njit(nopython=True) +def numba_scalar_vector_multiply(omega, vector, result): + omega_real = omega.real + for i in range(len(vector)): + result[i] = omega_real * vector[i] + +@njit(parallel=True) +def numba_vector_vector_addition(input_x, vector, output_x): + for i in prange(len(input_x)): + output_x[i] = input_x[i] + vector[i] + + +# Funkcje z dekoratorem +@time_measurement_longest(longest_threads_time_accumulator) +def matrix_vector_multiply(A, input_x, Ax): + numba_matrix_vector_multiply(A, input_x, Ax) + +@time_measurement_longest(longest_threads_time_accumulator) +def vector_vector_subtraction(b, Ax, residual): + numba_vector_vector_subtraction(b, Ax, residual) + +@time_measurement_longest(longest_threads_time_accumulator) +def scalar_vector_multiply(omega, vector, result): + numba_scalar_vector_multiply(omega, vector, result) + +@time_measurement_longest(longest_threads_time_accumulator) +def vector_vector_addition(input_x, vector, output_x): + numba_vector_vector_addition(input_x, vector, output_x) + +# Metoda Richardson z obsługą wątków +def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5): + longest_threads_time_accumulator.hard_reset() + + gc.disable() + start_time = time.perf_counter() + + x0 = x0 if x0 is not None else [0.0] * len(b) + x = x0[:] + + omega = 2 / (lambda_min + lambda_max) + n = len(b) + + for iteration in range(max_iterations): + Ax = [0.0] * n + matrix_vector_multiply(A, x, Ax) + longest_threads_time_accumulator.save_lap_and_reset() + + residual = [0.0] * n + vector_vector_subtraction(b, Ax, residual) + longest_threads_time_accumulator.save_lap_and_reset() + + change_vector = [0.0] * n + scalar_vector_multiply(omega, residual, change_vector) + longest_threads_time_accumulator.save_lap_and_reset() + + _x = [0.0] * n + vector_vector_addition(x, change_vector, _x) + longest_threads_time_accumulator.save_lap_and_reset() + + x = _x[:] + + if linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol: + break + + end_time = time.perf_counter() + gc.enable() + total_time = end_time - start_time + sequential_time = total_time - longest_threads_time_accumulator.total_time + + print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_threads_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s") + + return x, 0 diff --git a/code/threads_indep_old.py b/code/threads_indep_old.py new file mode 100644 index 00000000..7c5ecc5e --- /dev/null +++ b/code/threads_indep_old.py @@ -0,0 +1,103 @@ +import multiprocessing +import gc +import time +from concurrent.futures import ThreadPoolExecutor +from time_measurement import time_measurement_longest, longest_threads_time_accumulator, tests_time +import linear_algebra_utils as linAlg + + +@time_measurement_longest(longest_threads_time_accumulator) +def matrix_vector_multiply(A, input_x, start, end, Ax): + Ax[start:end] = [sum(x*y for x, y in zip(row, input_x)) for row in A[start:end]] + +@time_measurement_longest(longest_threads_time_accumulator) +def vector_vector_subtraction(b, Ax, start, end, residual): + residual[start:end] = [x-y for x, y in zip(b[start:end], Ax[start:end])] + +@time_measurement_longest(longest_threads_time_accumulator) +def scalar_vector_multiply(omega, vector, start, end, result): + result[start:end] = [omega * x for x in vector[start:end]] + +@time_measurement_longest(longest_threads_time_accumulator) +def vector_vector_addition(input_x, vector, start, end, output_x): + output_x[start:end] = [x+y for x, y in zip(input_x[start:end], vector[start:end])] + +def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5): + longest_threads_time_accumulator.hard_reset() + + gc.disable() + start_time = time.perf_counter() + + n = len(b) + x0 = x0 if x0 is not None else [0.0] * len(b) + x = x0[:] + omega = 2 / (lambda_min + lambda_max) + num_threads = multiprocessing.cpu_count() + chunk_size = n // num_threads + + with ThreadPoolExecutor(max_workers=num_threads) as executor: # wątki są tworzone raz i nie są niszczone + for iteration in range(max_iterations): + + Ax = [0] * len(x) # tutaj zostanie przypisany wynik z mnożenia macierzy A z wektorem x + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(matrix_vector_multiply, A, x, start, end, Ax)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + residual = [0] * len(b) # tutaj zostanie przypisany wynik z vector_vector_subtraction + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(vector_vector_subtraction, b, Ax, start, end, residual)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + change_vector = [0] * len(residual) # zostanie tu przypisany wynik scalar_vector_multiply po pracy wątków + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(scalar_vector_multiply, omega, residual, start, end, change_vector)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + _x = x[:] # do _x zostanie przez wątki przypisany wynik pracy w danej iteracji + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(vector_vector_addition, x, change_vector, start, end, _x)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + x = _x[:] + + + if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol): + break + + + end_time = time.perf_counter() + gc.enable() + total_time = end_time - start_time + sequential_time = total_time - longest_threads_time_accumulator.total_time + + print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_threads_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s") + + return x, 0 \ No newline at end of file diff --git a/code/threads_too_simple.py b/code/threads_too_simple.py new file mode 100644 index 00000000..d5ac2713 --- /dev/null +++ b/code/threads_too_simple.py @@ -0,0 +1,60 @@ +import numpy as np +import threading +import multiprocessing +import gc +import time +import sys +from time_measurement import time_measurement_longest, longest_time_accumulator, tests_time +import linear_algebra_utils as linAlg + + +@time_measurement_longest(longest_time_accumulator) +def RichardsonThread(A, b, x, _x, omega, start, end): + for i in range(start, end): + sigma = np.dot(A[i, :], _x) - A[i, i] * _x[i] + x[i] = (1 - omega) * _x[i] + omega * (b[i] - sigma) / A[i, i] + +def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5): + longest_time_accumulator.total_time = 0 + longest_time_accumulator.start = sys.float_info.max + longest_time_accumulator.end = 0 + + gc.disable() + start_time = time.perf_counter() + + n = len(b) + x0 = x0 if x0 is not None else [0.0] * len(b) + x = x0[:] + omega = 0.05#2 / (lambda_min + lambda_max) + num_threads = multiprocessing.cpu_count() + threads = [] + chunk_size = n // num_threads + max_iterations = 1000 + + for _ in range(max_iterations): + _x = x[:] + for i in range(num_threads): + start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek + end = n if i == num_threads - 1 else (i + 1) * chunk_size + thread = threading.Thread(target=RichardsonThread, args=(A, b, x, _x, omega, start, end)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + # Ax = linAlg.SequentialLinearAlgebraUtils.matrix_vector_multiply(A, x) + # residual = linAlg.SequentialLinearAlgebraUtils.vector_vector_subtraction(b, Ax) + # if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol): + # break + + end_time = time.perf_counter() + gc.enable() + total_time = end_time - start_time + sequential_time = total_time - longest_time_accumulator.total_time + + print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s") + + return x, 0 + + diff --git a/code/time_measurement.py b/code/time_measurement.py index 03ca4c0f..d70992a2 100644 --- a/code/time_measurement.py +++ b/code/time_measurement.py @@ -1,14 +1,35 @@ import time +import sys from functools import wraps class TimeAccumulator: def __init__(self): self.total_time = 0 +class ComplexTimeAcumulator: + def __init__(self): + self.hard_reset() + + def hard_reset(self): + self.total_time = 0 + self.reset() + + def reset(self): + self.lap_time = 0 + self.start = sys.float_info.max + self.end = 0 + + def save_lap_and_reset(self): + self.total_time += self.lap_time + self.reset() + + time_accumulator = TimeAccumulator() tests_time = TimeAccumulator() -def time_measurement(accumulator): +longest_threads_time_accumulator = ComplexTimeAcumulator() + +def time_measurement(accumulator: TimeAccumulator): def decorator(func): @wraps(func) def inner(*args, **kwargs): @@ -20,6 +41,22 @@ def time_measurement(accumulator): return inner return decorator +def time_measurement_longest(accumulator: ComplexTimeAcumulator): + def decorator(func): + @wraps(func) + def inner(*args, **kwargs): + start = time.perf_counter() + result = func(*args, **kwargs) + end = time.perf_counter() + if start < accumulator.start: + accumulator.start = start + if end > accumulator.end: + accumulator.end = end + accumulator.lap_time = accumulator.end - accumulator.start # "=" instead of "+=" + return result + return inner + return decorator +