diff --git a/code/threads_indep.py b/code/threads_indep.py index 7c5ecc5e..deae5381 100644 --- a/code/threads_indep.py +++ b/code/threads_indep.py @@ -1,103 +1,91 @@ -import multiprocessing import gc import time -from concurrent.futures import ThreadPoolExecutor +import numpy as np +from numba import njit, prange from time_measurement import time_measurement_longest, longest_threads_time_accumulator, tests_time import linear_algebra_utils as linAlg +# Funkcje równoległe z Numba +@njit(parallel=True) +def numba_matrix_vector_multiply(A, input_x, Ax): + n, m = A.shape + for i in prange(n): + Ax[i] = np.dot(A[i], input_x) + +@njit(parallel=True) +def numba_vector_vector_subtraction(b, Ax, residual): + for i in prange(len(b)): + residual[i] = b[i] - Ax[i] + +@njit(parallel=True) +def numba_scalar_vector_multiply(omega, vector, result): + for i in prange(len(vector)): + result[i] = omega * vector[i] + +@njit(parallel=True) +def numba_vector_vector_addition(input_x, vector, output_x): + for i in prange(len(input_x)): + output_x[i] = input_x[i] + vector[i] + +# Funkcje z dekoratorem +@time_measurement_longest(longest_threads_time_accumulator) +def matrix_vector_multiply(A, input_x, Ax): + numba_matrix_vector_multiply(A, input_x, Ax) @time_measurement_longest(longest_threads_time_accumulator) -def matrix_vector_multiply(A, input_x, start, end, Ax): - Ax[start:end] = [sum(x*y for x, y in zip(row, input_x)) for row in A[start:end]] +def vector_vector_subtraction(b, Ax, residual): + numba_vector_vector_subtraction(b, Ax, residual) @time_measurement_longest(longest_threads_time_accumulator) -def vector_vector_subtraction(b, Ax, start, end, residual): - residual[start:end] = [x-y for x, y in zip(b[start:end], Ax[start:end])] +def scalar_vector_multiply(omega, vector, result): + numba_scalar_vector_multiply(omega, vector, result) @time_measurement_longest(longest_threads_time_accumulator) -def scalar_vector_multiply(omega, vector, start, end, result): - result[start:end] = [omega * x for x in vector[start:end]] - -@time_measurement_longest(longest_threads_time_accumulator) -def vector_vector_addition(input_x, vector, start, end, output_x): - output_x[start:end] = [x+y for x, y in zip(input_x[start:end], vector[start:end])] +def vector_vector_addition(input_x, vector, output_x): + numba_vector_vector_addition(input_x, vector, output_x) +# Metoda Richardson z obsługą wątków def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5): longest_threads_time_accumulator.hard_reset() gc.disable() start_time = time.perf_counter() - n = len(b) - x0 = x0 if x0 is not None else [0.0] * len(b) - x = x0[:] + A = np.array(A) + b = np.array(b) + x0 = np.array(x0) if x0 is not None else np.zeros_like(b) + x = x0.copy() + omega = 2 / (lambda_min + lambda_max) - num_threads = multiprocessing.cpu_count() - chunk_size = n // num_threads + n = len(b) - with ThreadPoolExecutor(max_workers=num_threads) as executor: # wątki są tworzone raz i nie są niszczone - for iteration in range(max_iterations): + for iteration in range(max_iterations): + Ax = np.zeros_like(x) + matrix_vector_multiply(A, x, Ax) + longest_threads_time_accumulator.save_lap_and_reset() - Ax = [0] * len(x) # tutaj zostanie przypisany wynik z mnożenia macierzy A z wektorem x - futures = [] + residual = np.zeros_like(b) + vector_vector_subtraction(b, Ax, residual) + longest_threads_time_accumulator.save_lap_and_reset() - for i in range(num_threads): - start = i * chunk_size - end = n if i == num_threads - 1 else (i + 1) * chunk_size - futures.append(executor.submit(matrix_vector_multiply, A, x, start, end, Ax)) - - for future in futures: - future.result() + change_vector = np.zeros_like(residual) + scalar_vector_multiply(omega, residual, change_vector) + longest_threads_time_accumulator.save_lap_and_reset() - longest_threads_time_accumulator.save_lap_and_reset() - residual = [0] * len(b) # tutaj zostanie przypisany wynik z vector_vector_subtraction - futures = [] + _x = np.zeros_like(x) + vector_vector_addition(x, change_vector, _x) + longest_threads_time_accumulator.save_lap_and_reset() - for i in range(num_threads): - start = i * chunk_size - end = n if i == num_threads - 1 else (i + 1) * chunk_size - futures.append(executor.submit(vector_vector_subtraction, b, Ax, start, end, residual)) - - for future in futures: - future.result() + x = _x.copy() - longest_threads_time_accumulator.save_lap_and_reset() - change_vector = [0] * len(residual) # zostanie tu przypisany wynik scalar_vector_multiply po pracy wątków - futures = [] - - for i in range(num_threads): - start = i * chunk_size - end = n if i == num_threads - 1 else (i + 1) * chunk_size - futures.append(executor.submit(scalar_vector_multiply, omega, residual, start, end, change_vector)) - - for future in futures: - future.result() - - longest_threads_time_accumulator.save_lap_and_reset() - _x = x[:] # do _x zostanie przez wątki przypisany wynik pracy w danej iteracji - futures = [] - - for i in range(num_threads): - start = i * chunk_size - end = n if i == num_threads - 1 else (i + 1) * chunk_size - futures.append(executor.submit(vector_vector_addition, x, change_vector, start, end, _x)) - - for future in futures: - future.result() - - longest_threads_time_accumulator.save_lap_and_reset() - x = _x[:] - - - if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol): - break - + if linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol: + break end_time = time.perf_counter() gc.enable() total_time = end_time - start_time sequential_time = total_time - longest_threads_time_accumulator.total_time - + print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_threads_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s") - - return x, 0 \ No newline at end of file + + return x, 0 diff --git a/code/threads_indep_old.py b/code/threads_indep_old.py new file mode 100644 index 00000000..7c5ecc5e --- /dev/null +++ b/code/threads_indep_old.py @@ -0,0 +1,103 @@ +import multiprocessing +import gc +import time +from concurrent.futures import ThreadPoolExecutor +from time_measurement import time_measurement_longest, longest_threads_time_accumulator, tests_time +import linear_algebra_utils as linAlg + + +@time_measurement_longest(longest_threads_time_accumulator) +def matrix_vector_multiply(A, input_x, start, end, Ax): + Ax[start:end] = [sum(x*y for x, y in zip(row, input_x)) for row in A[start:end]] + +@time_measurement_longest(longest_threads_time_accumulator) +def vector_vector_subtraction(b, Ax, start, end, residual): + residual[start:end] = [x-y for x, y in zip(b[start:end], Ax[start:end])] + +@time_measurement_longest(longest_threads_time_accumulator) +def scalar_vector_multiply(omega, vector, start, end, result): + result[start:end] = [omega * x for x in vector[start:end]] + +@time_measurement_longest(longest_threads_time_accumulator) +def vector_vector_addition(input_x, vector, start, end, output_x): + output_x[start:end] = [x+y for x, y in zip(input_x[start:end], vector[start:end])] + +def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5): + longest_threads_time_accumulator.hard_reset() + + gc.disable() + start_time = time.perf_counter() + + n = len(b) + x0 = x0 if x0 is not None else [0.0] * len(b) + x = x0[:] + omega = 2 / (lambda_min + lambda_max) + num_threads = multiprocessing.cpu_count() + chunk_size = n // num_threads + + with ThreadPoolExecutor(max_workers=num_threads) as executor: # wątki są tworzone raz i nie są niszczone + for iteration in range(max_iterations): + + Ax = [0] * len(x) # tutaj zostanie przypisany wynik z mnożenia macierzy A z wektorem x + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(matrix_vector_multiply, A, x, start, end, Ax)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + residual = [0] * len(b) # tutaj zostanie przypisany wynik z vector_vector_subtraction + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(vector_vector_subtraction, b, Ax, start, end, residual)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + change_vector = [0] * len(residual) # zostanie tu przypisany wynik scalar_vector_multiply po pracy wątków + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(scalar_vector_multiply, omega, residual, start, end, change_vector)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + _x = x[:] # do _x zostanie przez wątki przypisany wynik pracy w danej iteracji + futures = [] + + for i in range(num_threads): + start = i * chunk_size + end = n if i == num_threads - 1 else (i + 1) * chunk_size + futures.append(executor.submit(vector_vector_addition, x, change_vector, start, end, _x)) + + for future in futures: + future.result() + + longest_threads_time_accumulator.save_lap_and_reset() + x = _x[:] + + + if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol): + break + + + end_time = time.perf_counter() + gc.enable() + total_time = end_time - start_time + sequential_time = total_time - longest_threads_time_accumulator.total_time + + print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_threads_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s") + + return x, 0 \ No newline at end of file