feat: moving to numba package

This commit is contained in:
Gromiusz 2025-01-13 17:05:20 +01:00
parent 8b457a2f43
commit d1a3deab53
2 changed files with 163 additions and 72 deletions

View File

@ -1,103 +1,91 @@
import multiprocessing
import gc
import time
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from numba import njit, prange
from time_measurement import time_measurement_longest, longest_threads_time_accumulator, tests_time
import linear_algebra_utils as linAlg
# Funkcje równoległe z Numba
@njit(parallel=True)
def numba_matrix_vector_multiply(A, input_x, Ax):
n, m = A.shape
for i in prange(n):
Ax[i] = np.dot(A[i], input_x)
@njit(parallel=True)
def numba_vector_vector_subtraction(b, Ax, residual):
for i in prange(len(b)):
residual[i] = b[i] - Ax[i]
@njit(parallel=True)
def numba_scalar_vector_multiply(omega, vector, result):
for i in prange(len(vector)):
result[i] = omega * vector[i]
@njit(parallel=True)
def numba_vector_vector_addition(input_x, vector, output_x):
for i in prange(len(input_x)):
output_x[i] = input_x[i] + vector[i]
# Funkcje z dekoratorem
@time_measurement_longest(longest_threads_time_accumulator)
def matrix_vector_multiply(A, input_x, Ax):
numba_matrix_vector_multiply(A, input_x, Ax)
@time_measurement_longest(longest_threads_time_accumulator)
def matrix_vector_multiply(A, input_x, start, end, Ax):
Ax[start:end] = [sum(x*y for x, y in zip(row, input_x)) for row in A[start:end]]
def vector_vector_subtraction(b, Ax, residual):
numba_vector_vector_subtraction(b, Ax, residual)
@time_measurement_longest(longest_threads_time_accumulator)
def vector_vector_subtraction(b, Ax, start, end, residual):
residual[start:end] = [x-y for x, y in zip(b[start:end], Ax[start:end])]
def scalar_vector_multiply(omega, vector, result):
numba_scalar_vector_multiply(omega, vector, result)
@time_measurement_longest(longest_threads_time_accumulator)
def scalar_vector_multiply(omega, vector, start, end, result):
result[start:end] = [omega * x for x in vector[start:end]]
@time_measurement_longest(longest_threads_time_accumulator)
def vector_vector_addition(input_x, vector, start, end, output_x):
output_x[start:end] = [x+y for x, y in zip(input_x[start:end], vector[start:end])]
def vector_vector_addition(input_x, vector, output_x):
numba_vector_vector_addition(input_x, vector, output_x)
# Metoda Richardson z obsługą wątków
def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5):
longest_threads_time_accumulator.hard_reset()
gc.disable()
start_time = time.perf_counter()
n = len(b)
x0 = x0 if x0 is not None else [0.0] * len(b)
x = x0[:]
A = np.array(A)
b = np.array(b)
x0 = np.array(x0) if x0 is not None else np.zeros_like(b)
x = x0.copy()
omega = 2 / (lambda_min + lambda_max)
num_threads = multiprocessing.cpu_count()
chunk_size = n // num_threads
n = len(b)
with ThreadPoolExecutor(max_workers=num_threads) as executor: # wątki są tworzone raz i nie są niszczone
for iteration in range(max_iterations):
for iteration in range(max_iterations):
Ax = np.zeros_like(x)
matrix_vector_multiply(A, x, Ax)
longest_threads_time_accumulator.save_lap_and_reset()
Ax = [0] * len(x) # tutaj zostanie przypisany wynik z mnożenia macierzy A z wektorem x
futures = []
residual = np.zeros_like(b)
vector_vector_subtraction(b, Ax, residual)
longest_threads_time_accumulator.save_lap_and_reset()
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(matrix_vector_multiply, A, x, start, end, Ax))
for future in futures:
future.result()
change_vector = np.zeros_like(residual)
scalar_vector_multiply(omega, residual, change_vector)
longest_threads_time_accumulator.save_lap_and_reset()
longest_threads_time_accumulator.save_lap_and_reset()
residual = [0] * len(b) # tutaj zostanie przypisany wynik z vector_vector_subtraction
futures = []
_x = np.zeros_like(x)
vector_vector_addition(x, change_vector, _x)
longest_threads_time_accumulator.save_lap_and_reset()
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(vector_vector_subtraction, b, Ax, start, end, residual))
for future in futures:
future.result()
x = _x.copy()
longest_threads_time_accumulator.save_lap_and_reset()
change_vector = [0] * len(residual) # zostanie tu przypisany wynik scalar_vector_multiply po pracy wątków
futures = []
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(scalar_vector_multiply, omega, residual, start, end, change_vector))
for future in futures:
future.result()
longest_threads_time_accumulator.save_lap_and_reset()
_x = x[:] # do _x zostanie przez wątki przypisany wynik pracy w danej iteracji
futures = []
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(vector_vector_addition, x, change_vector, start, end, _x))
for future in futures:
future.result()
longest_threads_time_accumulator.save_lap_and_reset()
x = _x[:]
if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol):
break
if linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol:
break
end_time = time.perf_counter()
gc.enable()
total_time = end_time - start_time
sequential_time = total_time - longest_threads_time_accumulator.total_time
print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_threads_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s")
return x, 0
return x, 0

103
code/threads_indep_old.py Normal file
View File

@ -0,0 +1,103 @@
import multiprocessing
import gc
import time
from concurrent.futures import ThreadPoolExecutor
from time_measurement import time_measurement_longest, longest_threads_time_accumulator, tests_time
import linear_algebra_utils as linAlg
@time_measurement_longest(longest_threads_time_accumulator)
def matrix_vector_multiply(A, input_x, start, end, Ax):
Ax[start:end] = [sum(x*y for x, y in zip(row, input_x)) for row in A[start:end]]
@time_measurement_longest(longest_threads_time_accumulator)
def vector_vector_subtraction(b, Ax, start, end, residual):
residual[start:end] = [x-y for x, y in zip(b[start:end], Ax[start:end])]
@time_measurement_longest(longest_threads_time_accumulator)
def scalar_vector_multiply(omega, vector, start, end, result):
result[start:end] = [omega * x for x in vector[start:end]]
@time_measurement_longest(longest_threads_time_accumulator)
def vector_vector_addition(input_x, vector, start, end, output_x):
output_x[start:end] = [x+y for x, y in zip(input_x[start:end], vector[start:end])]
def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5):
longest_threads_time_accumulator.hard_reset()
gc.disable()
start_time = time.perf_counter()
n = len(b)
x0 = x0 if x0 is not None else [0.0] * len(b)
x = x0[:]
omega = 2 / (lambda_min + lambda_max)
num_threads = multiprocessing.cpu_count()
chunk_size = n // num_threads
with ThreadPoolExecutor(max_workers=num_threads) as executor: # wątki są tworzone raz i nie są niszczone
for iteration in range(max_iterations):
Ax = [0] * len(x) # tutaj zostanie przypisany wynik z mnożenia macierzy A z wektorem x
futures = []
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(matrix_vector_multiply, A, x, start, end, Ax))
for future in futures:
future.result()
longest_threads_time_accumulator.save_lap_and_reset()
residual = [0] * len(b) # tutaj zostanie przypisany wynik z vector_vector_subtraction
futures = []
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(vector_vector_subtraction, b, Ax, start, end, residual))
for future in futures:
future.result()
longest_threads_time_accumulator.save_lap_and_reset()
change_vector = [0] * len(residual) # zostanie tu przypisany wynik scalar_vector_multiply po pracy wątków
futures = []
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(scalar_vector_multiply, omega, residual, start, end, change_vector))
for future in futures:
future.result()
longest_threads_time_accumulator.save_lap_and_reset()
_x = x[:] # do _x zostanie przez wątki przypisany wynik pracy w danej iteracji
futures = []
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(vector_vector_addition, x, change_vector, start, end, _x))
for future in futures:
future.result()
longest_threads_time_accumulator.save_lap_and_reset()
x = _x[:]
if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol):
break
end_time = time.perf_counter()
gc.enable()
total_time = end_time - start_time
sequential_time = total_time - longest_threads_time_accumulator.total_time
print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_threads_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s")
return x, 0