Complete all threads functions in Linear Algebra Utils, add new parameters to tests (#5)

* complete threads function (tests FAIL)

Test fail due to indivisibility of the problem into threads. Test is turned to ProcessingType.THREADS!. On ProcessingType.SEQUENTIAL all test PASS.
TODO: make sure that len(problem) is divisible by num of threads. It can be less threads for small problem.

* fixing bugs in linear_algebra_utils, properly adress chunks to threads

* the refactor of the ThreadsLinearAlgebraUtils

* set test to execute both sequential and threads processing types

* Delete unnecessary lines tests.py

* rename scalar_matrix_multiply to scalar_vector_multiply
This commit is contained in:
Gromiusz 2024-10-27 20:13:43 +01:00 committed by GitHub
parent b02117c11b
commit c938e63b7f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 145 additions and 23 deletions

View File

@ -1,5 +1,7 @@
import math
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from functools import partial
class LinearAlgebraUtils(ABC):
@staticmethod
@ -39,7 +41,7 @@ class LinearAlgebraUtils(ABC):
@staticmethod
@abstractmethod
def scalar_matrix_multiply(omega, vector):
def scalar_vector_multiply(omega, vector):
pass
@staticmethod
@ -88,7 +90,7 @@ class SequentialLinearAlgebraUtils(ABC):
return [x+y for x, y in zip(v1, v2)]
@staticmethod
def scalar_matrix_multiply(omega, vector):
def scalar_vector_multiply(omega, vector): # na pewno scalar matrix? a nie scalar vector?
return [omega * x for x in vector]
@ -130,46 +132,169 @@ class SequentialLinearAlgebraUtils(ABC):
class ThreadsLinearAlgebraUtils(ABC):
NUM_THREADS = 4
@staticmethod
def get_chunk_size(data):
num_elements = len(data)
num_threads = min(ThreadsLinearAlgebraUtils.NUM_THREADS, num_elements)
chunk_size = num_elements // num_threads
remainder = num_elements % num_threads
return chunk_size, num_threads, remainder
@staticmethod
def divide_vectors_to_chunks(v1, v2):
chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v1)
chunks = []
start = 0
for i in range(num_threads):
end = start + chunk_size + (1 if i < remainder else 0)
chunks.append((v1[start:end], v2[start:end]))
start = end
return chunks
@staticmethod
def divide_vector_or_matrix_to_chunks(v):
chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v)
chunks = []
start = 0
for i in range(num_threads):
end = start + chunk_size + (1 if i < remainder else 0)
chunks.append(v[start:end])
start = end
return chunks
@staticmethod
def dot_product(v1, v2):
pass
chunks = ThreadsLinearAlgebraUtils.divide_vectors_to_chunks(v1, v2)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda pair: SequentialLinearAlgebraUtils.dot_product(*pair), chunks)
return sum(results)
@staticmethod
def matrix_vector_multiply(A, x):
pass
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
func = partial(SequentialLinearAlgebraUtils.matrix_vector_multiply, x=x)
results = executor.map(func, chunks)
return [item for sublist in results for item in sublist]
@staticmethod
def vector_norm(v):
pass
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(v)
def partial_norm(chunk):
return sum(x * x for x in chunk)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(partial_norm, chunks)
total_sum = sum(results)
return total_sum**0.5
@staticmethod
def vector_scalar_divide(x, scalar):
pass
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(x)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.vector_scalar_divide(chunk, scalar), chunks)
return [item for sublist in results for item in sublist]
@staticmethod
def matrix_scalar_multiply(A, w):
pass
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.matrix_scalar_multiply(w, chunk), chunks)
return [item for sublist in results for item in sublist]
@staticmethod
def vector_vector_subtraction(v1, v2):
pass
chunks = ThreadsLinearAlgebraUtils.divide_vectors_to_chunks(v1, v2)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda pair: SequentialLinearAlgebraUtils.vector_vector_subtraction(*pair), chunks)
return [item for sublist in results for item in sublist]
@staticmethod
def vector_vector_addition(v1, v2):
pass
chunks = ThreadsLinearAlgebraUtils.divide_vectors_to_chunks(v1, v2)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda pair: SequentialLinearAlgebraUtils.vector_vector_addition(*pair), chunks)
return [item for sublist in results for item in sublist]
@staticmethod
def scalar_matrix_multiply(omega, vector):
pass
def scalar_vector_multiply(omega, vector):
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(vector)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.scalar_vector_multiply(omega, chunk), chunks)
return [item for sublist in results for item in sublist]
@staticmethod
def matrix_norm(A):
pass
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
def partial_norm(chunk):
return sum(element ** 2 for row in chunk for element in row)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(partial_norm, chunks)
total_sum = sum(results)
return math.sqrt(total_sum)
@staticmethod
def divide_matrixes_to_chunks(A, B):
chunk_size = len(A) // ThreadsLinearAlgebraUtils.NUM_THREADS
return [(A[i:i + chunk_size], B[i:i + chunk_size]) for i in range(0, len(A), chunk_size)]
@staticmethod
def matrix_matrix_subtraction(A, B):
pass
def subtract_chunk(pair):
chunk_A, chunk_B = pair
return [[chunk_A[i][j] - chunk_B[i][j] for j in range(len(chunk_A[0]))] for i in range(len(chunk_A))]
chunks = ThreadsLinearAlgebraUtils.divide_matrixes_to_chunks(A, B)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(subtract_chunk, chunks)
return [row for chunk in results for row in chunk]
@staticmethod
def gaussian_elimination(A, b):
pass
n = len(A)
M = [row[:] for row in A]
for i in range(n):
M[i].append(b[i])
for k in range(n):
# Pivoting
if M[k][k] == 0:
for i in range(k + 1, n):
if M[i][k] != 0:
M[k], M[i] = M[i], M[k]
break
# Threads
def eliminate_row(i):
factor = M[i][k] / M[k][k]
for j in range(k, n + 1):
M[i][j] -= factor * M[k][j]
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
rows_to_eliminate = range(k + 1, n)
executor.map(eliminate_row, rows_to_eliminate)
x = [0] * n
for i in range(n - 1, -1, -1):
x[i] = M[i][-1] / M[i][i]
for k in range(i - 1, -1, -1):
M[k][-1] -= M[k][i] * x[i]
return x

View File

@ -54,6 +54,6 @@ class RichardsonMethod:
for iteration in range(self.max_iterations):
Ax = self.LinAlg.matrix_vector_multiply(self.A, x)
residual = self.LinAlg.vector_vector_subtraction(self.b, Ax)
x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_matrix_multiply(self.omega, residual))
x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_vector_multiply(self.omega, residual))
return x, 0

View File

@ -30,15 +30,14 @@ def calcualte_norm_from_matrix_numpy(A, n, max_iterations):
I = np.eye(n)
return calculate_norm_numpy(I, omega, A)
@pytest.mark.parametrize("n", [2, 3, 4, 5, 10, 20, 50, 100])
def test_richardson_vs_cg(n: int):
@pytest.mark.parametrize("processing_type", [ProcessingType.SEQUENTIAL, ProcessingType.THREADS])
def test_richardson_vs_cg(n: int, processing_type: ProcessingType):
print("matrix size: ", n)
tolerance = 1e-5
max_iterations=1000
A, b = MatrixGenerator.generate_random_matrix_and_vector(n)
richardson_solver = RichardsonMethod(ProcessingType.SEQUENTIAL , A, b, max_iterations, size=n, tol=1e-7)
richardson_solver = RichardsonMethod(processing_type, A, b, max_iterations, size=n, tol=1e-7)
solution_richardson, info_richardson = richardson_solver.solve()
solution_cg, info = cg(A, b)
@ -76,6 +75,4 @@ def assert_scipy_not_converged(solution_richardson, info_richardson, A, b):
assert False, "Richardson converged while SciPy did not"
if __name__ == "__main__":
# Run pytest and exit with the appropriate status code
for n in [2, 3, 4, 5, 10, 20, 50, 100]:
test_richardson_vs_cg(n)
pytest.main()