WUT_Computer_Science/code/linear_algebra_utils.py

570 lines
18 KiB
Python
Raw Normal View History

import math
2024-10-27 13:59:23 +01:00
import itertools
import operator
from multiprocessing import Pool
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time_measurement import time_measurement, time_accumulator
2024-12-05 20:09:44 +01:00
import dask.array as da
2024-10-27 13:59:23 +01:00
class LinearAlgebraUtils(ABC):
@staticmethod
@abstractmethod
def dot_product(v1, v2):
pass
@staticmethod
@abstractmethod
def matrix_vector_multiply(A, x):
pass
@staticmethod
@abstractmethod
def vector_norm(v):
pass
@staticmethod
@abstractmethod
def vector_scalar_divide(x, scalar):
pass
@staticmethod
@abstractmethod
def matrix_scalar_multiply(A, w):
pass
@staticmethod
@abstractmethod
def vector_vector_subtraction(v1, v2):
pass
@staticmethod
@abstractmethod
def vector_vector_addition(v1, v2):
pass
@staticmethod
@abstractmethod
def scalar_vector_multiply(omega, vector):
pass
@staticmethod
@abstractmethod
def matrix_norm(A):
pass
@staticmethod
@abstractmethod
def matrix_matrix_subtraction(A, B):
pass
@staticmethod
@abstractmethod
def gaussian_elimination(A, b):
pass
class SequentialLinearAlgebraUtils(ABC):
@staticmethod
def dot_product(v1, v2):
return sum(x*y for x, y in zip(v1, v2))
@staticmethod
def matrix_vector_multiply(A, x):
return [SequentialLinearAlgebraUtils.dot_product(row, x) for row in A]
@staticmethod
def vector_norm(v):
2024-10-27 13:59:23 +01:00
return math.sqrt(sum(x*x for x in v))
@staticmethod
def vector_scalar_divide(x, scalar):
return [xi / scalar for xi in x]
@staticmethod
def matrix_scalar_multiply(A, w):
return [[w * A[i][j] for j in range(len(A[0]))] for i in range(len(A))]
@staticmethod
def vector_vector_subtraction(v1, v2):
return [x-y for x, y in zip(v1, v2)]
@staticmethod
def vector_vector_addition(v1, v2):
return [x+y for x, y in zip(v1, v2)]
@staticmethod
def scalar_vector_multiply(omega, vector):
return [omega * x for x in vector]
@staticmethod
def matrix_norm(A):
return math.sqrt(sum(sum(element ** 2 for element in row) for row in A))
@staticmethod
def matrix_matrix_subtraction(A, B):
return [[A[i][j] - B[i][j] for j in range(len(A[0]))] for i in range(len(A))]
2024-10-20 19:19:31 +02:00
@staticmethod
def gaussian_elimination(A, b):
n = len(A)
M = [row[:] for row in A]
for i in range(n):
M[i].append(b[i])
for k in range(n):
if M[k][k] == 0:
for i in range(k + 1, n):
if M[i][k] != 0:
M[k], M[i] = M[i], M[k]
break
for i in range(k + 1, n):
factor = M[i][k] / M[k][k]
for j in range(k, n + 1):
M[i][j] -= factor * M[k][j]
x = [0] * n
for i in range(n - 1, -1, -1):
x[i] = M[i][-1] / M[i][i]
for k in range(i - 1, -1, -1):
M[k][-1] -= M[k][i] * x[i]
return x
class ThreadsLinearAlgebraUtils(ABC):
NUM_THREADS = 4
@staticmethod
def get_chunk_size(data):
num_elements = len(data)
num_threads = min(ThreadsLinearAlgebraUtils.NUM_THREADS, num_elements)
chunk_size = num_elements // num_threads
remainder = num_elements % num_threads
return chunk_size, num_threads, remainder
@staticmethod
def divide_vectors_to_chunks(v1, v2):
chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v1)
chunks = []
start = 0
for i in range(num_threads):
end = start + chunk_size + (1 if i < remainder else 0)
chunks.append((v1[start:end], v2[start:end]))
start = end
return chunks
@staticmethod
def divide_vector_or_matrix_to_chunks(v):
chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v)
chunks = []
start = 0
for i in range(num_threads):
end = start + chunk_size + (1 if i < remainder else 0)
chunks.append(v[start:end])
start = end
return chunks
@staticmethod
@time_measurement(time_accumulator)
def dot_product(v1, v2):
chunks = ThreadsLinearAlgebraUtils.divide_vectors_to_chunks(v1, v2)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda pair: SequentialLinearAlgebraUtils.dot_product(*pair), chunks)
return sum(results)
@staticmethod
@time_measurement(time_accumulator)
def matrix_vector_multiply(A, x):
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
func = partial(SequentialLinearAlgebraUtils.matrix_vector_multiply, x=x)
results = executor.map(func, chunks)
return [item for sublist in results for item in sublist]
@staticmethod
@time_measurement(time_accumulator)
def vector_norm(v):
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(v)
def partial_norm(chunk):
return sum(x * x for x in chunk)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(partial_norm, chunks)
total_sum = sum(results)
return total_sum**0.5
@staticmethod
@time_measurement(time_accumulator)
def vector_scalar_divide(x, scalar):
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(x)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.vector_scalar_divide(chunk, scalar), chunks)
return [item for sublist in results for item in sublist]
@staticmethod
@time_measurement(time_accumulator)
def matrix_scalar_multiply(A, w):
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.matrix_scalar_multiply(w, chunk), chunks)
return [item for sublist in results for item in sublist]
@staticmethod
@time_measurement(time_accumulator)
def vector_vector_subtraction(v1, v2):
chunks = ThreadsLinearAlgebraUtils.divide_vectors_to_chunks(v1, v2)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda pair: SequentialLinearAlgebraUtils.vector_vector_subtraction(*pair), chunks)
return [item for sublist in results for item in sublist]
@staticmethod
@time_measurement(time_accumulator)
def vector_vector_addition(v1, v2):
chunks = ThreadsLinearAlgebraUtils.divide_vectors_to_chunks(v1, v2)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda pair: SequentialLinearAlgebraUtils.vector_vector_addition(*pair), chunks)
return [item for sublist in results for item in sublist]
@staticmethod
@time_measurement(time_accumulator)
def scalar_vector_multiply(omega, vector):
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(vector)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.scalar_vector_multiply(omega, chunk), chunks)
return [item for sublist in results for item in sublist]
@staticmethod
@time_measurement(time_accumulator)
def matrix_norm(A):
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
def partial_norm(chunk):
return sum(element ** 2 for row in chunk for element in row)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(partial_norm, chunks)
total_sum = sum(results)
return math.sqrt(total_sum)
@staticmethod
@time_measurement(time_accumulator)
def divide_matrixes_to_chunks(A, B):
chunk_size = len(A) // ThreadsLinearAlgebraUtils.NUM_THREADS
return [(A[i:i + chunk_size], B[i:i + chunk_size]) for i in range(0, len(A), chunk_size)]
@staticmethod
@time_measurement(time_accumulator)
def matrix_matrix_subtraction(A, B):
def subtract_chunk(pair):
chunk_A, chunk_B = pair
return [[chunk_A[i][j] - chunk_B[i][j] for j in range(len(chunk_A[0]))] for i in range(len(chunk_A))]
chunks = ThreadsLinearAlgebraUtils.divide_matrixes_to_chunks(A, B)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(subtract_chunk, chunks)
return [row for chunk in results for row in chunk]
@staticmethod
@time_measurement(time_accumulator)
def gaussian_elimination(A, b):
n = len(A)
M = [row[:] for row in A]
for i in range(n):
M[i].append(b[i])
for k in range(n):
# Pivoting
if M[k][k] == 0:
for i in range(k + 1, n):
if M[i][k] != 0:
M[k], M[i] = M[i], M[k]
break
# Threads
def eliminate_row(i):
factor = M[i][k] / M[k][k]
for j in range(k, n + 1):
M[i][j] -= factor * M[k][j]
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
rows_to_eliminate = range(k + 1, n)
executor.map(eliminate_row, rows_to_eliminate)
x = [0] * n
for i in range(n - 1, -1, -1):
x[i] = M[i][-1] / M[i][i]
for k in range(i - 1, -1, -1):
M[k][-1] -= M[k][i] * x[i]
return x
2024-10-27 13:59:23 +01:00
@time_measurement(time_accumulator)
def process_row(params):
A, k, i = params
factor = A[i][k] / A[k][k]
return [A[i][j] - factor * A[k][j] for j in range(len(A[0]))]
@time_measurement(time_accumulator)
def divide_by_scalar(pair):
xi, scalar = pair
return xi / scalar
@time_measurement(time_accumulator)
def multiply_by_scalar(pair):
element, scalar = pair
return element * scalar
2024-10-27 13:59:23 +01:00
class ProcessLinearAlgebraUtils:
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def dot_product(v1, v2):
with Pool() as pool:
result = pool.starmap(ProcessLinearAlgebraUtils.multiply_elements, zip(v1, v2))
return sum(result)
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def multiply_elements(x, y):
return x * y
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def matrix_vector_multiply_row(params):
row, vector = params
return SequentialLinearAlgebraUtils.dot_product(row, vector)
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def matrix_vector_multiply(A, x):
with Pool() as pool:
result = pool.map(ProcessLinearAlgebraUtils.matrix_vector_multiply_row, [(row, x) for row in A])
return list(result)
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def vector_norm(v):
with Pool() as pool:
squared = pool.map(ProcessLinearAlgebraUtils.square, v)
return math.sqrt(sum(squared))
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def square(x):
return x * x
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def vector_scalar_divide(x, scalar):
with Pool() as pool:
result = pool.map(divide_by_scalar, [(xi, scalar) for xi in x])
return list(result)
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def divide_vector_by_scalar(x, scalar):
with Pool() as pool:
result = pool.map(ProcessLinearAlgebraUtils.vector_scalar_divide, [(xi, scalar) for xi in x])
return list(result)
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def matrix_scalar_multiply_row(params):
row, w = params
return [w * element for element in row]
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def matrix_scalar_multiply(A, w):
with Pool() as pool:
result = pool.map(ProcessLinearAlgebraUtils.matrix_scalar_multiply_row, [(row, w) for row in A])
return result
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def vector_vector_operation(params):
v1, v2, op = params
return op(v1, v2)
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def vector_vector_subtraction(v1, v2):
with Pool() as pool:
result = pool.map(ProcessLinearAlgebraUtils.vector_vector_operation, zip(v1, v2, itertools.repeat(operator.sub)))
return list(result)
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def vector_vector_addition(v1, v2):
with Pool() as pool:
result = pool.map(ProcessLinearAlgebraUtils.vector_vector_operation, zip(v1, v2, itertools.repeat(operator.add)))
return list(result)
@staticmethod
@time_measurement(time_accumulator)
def scalar_vector_multiply(omega, vector):
2024-10-27 13:59:23 +01:00
with Pool() as pool:
result = pool.map(multiply_by_scalar, [(element, omega) for element in vector])
return list(result)
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def matrix_norm(A):
with Pool() as pool:
row_sums = pool.map(lambda row: sum(x ** 2 for x in row), A)
2024-10-27 13:59:23 +01:00
return math.sqrt(sum(row_sums))
@staticmethod
@time_measurement(time_accumulator)
2024-10-27 13:59:23 +01:00
def matrix_matrix_subtraction(A, B):
def subtract_rows(row_pair):
return [a - b for a, b in zip(*row_pair)]
2024-10-27 13:59:23 +01:00
with Pool() as pool:
result = pool.starmap(subtract_rows, zip(A, B))
2024-10-27 13:59:23 +01:00
return result
@staticmethod
@time_measurement(time_accumulator)
def gaussian_elimination(A, b):
try:
n = len(A)
A = [list(row) + [b_i] for row, b_i in zip(A, b)]
for k in range(n):
# Pivoting
max_index = max(range(k, n), key=lambda x: abs(A[x][k]))
if A[max_index][k] == 0:
raise ValueError("Matrix is singular and cannot be solved.")
A[k], A[max_index] = A[max_index], A[k]
# Parallel row processing
with Pool() as pool:
results = pool.map(process_row, [(A, k, i) for i in range(k + 1, n)])
# Update remaining rows in matrix
for i in range(k + 1, n):
A[i] = results[i - (k + 1)]
# Back substitution
x = [0] * n
for i in range(n - 1, -1, -1):
sum_ax = sum(A[i][j] * x[j] for j in range(i + 1, n))
x[i] = (A[i][-1] - sum_ax) / A[i][i]
return x
except Exception as e:
print(f"Error during Gaussian elimination: {e}")
return None
2024-12-05 20:09:44 +01:00
class DistributedArraysLinearAlgebraUtils(ABC):
@staticmethod
@time_measurement(time_accumulator)
def dot_product(v1, v2):
dv1 = da.from_array(v1, chunks='auto')
dv2 = da.from_array(v2, chunks='auto')
return da.dot(dv1, dv2).compute()
@staticmethod
@time_measurement(time_accumulator)
def matrix_vector_multiply(A, x):
dA = da.from_array(A, chunks='auto')
dx = da.from_array(x, chunks='auto')
return da.dot(dA, dx).compute().tolist()
@staticmethod
@time_measurement(time_accumulator)
def vector_norm(v):
dv = da.from_array(v, chunks='auto')
return da.linalg.norm(dv).compute()
@staticmethod
@time_measurement(time_accumulator)
def vector_scalar_divide(x, scalar):
dx = da.from_array(x, chunks='auto')
return (dx / scalar).compute().tolist()
@staticmethod
@time_measurement(time_accumulator)
def matrix_scalar_multiply(A, w):
dA = da.from_array(A, chunks='auto')
return (dA * w).compute().tolist()
@staticmethod
@time_measurement(time_accumulator)
def vector_vector_subtraction(v1, v2):
dv1 = da.from_array(v1, chunks='auto')
dv2 = da.from_array(v2, chunks='auto')
return (dv1 - dv2).compute().tolist()
@staticmethod
@time_measurement(time_accumulator)
def vector_vector_addition(v1, v2):
dv1 = da.from_array(v1, chunks='auto')
dv2 = da.from_array(v2, chunks='auto')
return (dv1 + dv2).compute().tolist()
@staticmethod
@time_measurement(time_accumulator)
def scalar_vector_multiply(omega, vector):
dvector = da.from_array(vector, chunks='auto')
return (omega * dvector).compute().tolist()
@staticmethod
@time_measurement(time_accumulator)
def matrix_norm(A):
dA = da.from_array(A, chunks='auto')
return da.linalg.norm(dA).compute()
@staticmethod
@time_measurement(time_accumulator)
def matrix_matrix_subtraction(A, B):
dA = da.from_array(A, chunks='auto')
dB = da.from_array(B, chunks='auto')
return (dA - dB).compute().tolist()
@staticmethod
@time_measurement(time_accumulator)
def gaussian_elimination(A, b):
try:
dA = da.from_array(A, chunks='auto')
db = da.from_array(b, chunks='auto')
Ab = da.hstack([dA, db[:, None]])
Ab = Ab.persist()
def elimination_step(Ab, k):
n = Ab.shape[0]
max_index = da.argmax(da.abs(Ab[k:, k])) + k
Ab[[k, max_index]] = Ab[[max_index, k]]
Ab = Ab.persist()
factor = Ab[k + 1:, k] / Ab[k, k]
Ab[k + 1:] -= factor[:, None] * Ab[k]
return Ab
for k in range(A.shape[0]):
Ab = elimination_step(Ab, k)
x = da.zeros(A.shape[0])
for i in range(A.shape[0] - 1, -1, -1):
x[i] = (Ab[i, -1] - da.dot(Ab[i, i + 1:-1], x[i + 1:])) / Ab[i, i]
return x.compute().tolist()
except Exception as e:
print(f"Error during Gaussian elimination: {e}")
return None