Merge pull request #13 from kuhyx/simpler_threads

Simpler threads
This commit is contained in:
Gromiusz 2025-01-13 23:22:58 +01:00 committed by GitHub
commit 0a62d75a82
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 334 additions and 89 deletions

23
README.md Normal file
View File

@ -0,0 +1,23 @@
cd code
pyenv init
COPY AND PASTE PYENV CONFIGURATION TO YOUR SHELL CONFIG
source ~/.zshrc (or your shell)
pyenv install 3.11
pyenv global 3.11
Ensure that python DID change its version
python --version
python -m venv ./venv
source ./venv/bin/activate
pip install -r requirements.txt
python main.py

View File

@ -2,6 +2,7 @@ import cmath
import math import math
import itertools import itertools
import operator import operator
import multiprocessing
from multiprocessing import Pool from multiprocessing import Pool
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
@ -10,7 +11,7 @@ from time_measurement import time_measurement, time_accumulator
import numpy as np import numpy as np
import dask.array as da import dask.array as da
class LinearAlgebraUtils(ABC): class LinearAlgebraUtils:
@staticmethod @staticmethod
@abstractmethod @abstractmethod
def dot_product(v1, v2): def dot_product(v1, v2):
@ -62,7 +63,7 @@ class LinearAlgebraUtils(ABC):
pass pass
class SequentialLinearAlgebraUtils(ABC): class SequentialLinearAlgebraUtils:
@staticmethod @staticmethod
def dot_product(v1, v2): def dot_product(v1, v2):
return sum(x*y for x, y in zip(v1, v2)) return sum(x*y for x, y in zip(v1, v2))
@ -106,11 +107,10 @@ class SequentialLinearAlgebraUtils(ABC):
return [[A[i][j] - B[i][j] for j in range(len(A[0]))] for i in range(len(A))] return [[A[i][j] - B[i][j] for j in range(len(A[0]))] for i in range(len(A))]
class ThreadsLinearAlgebraUtils(ABC): class ThreadsLinearAlgebraUtils:
NUM_THREADS = 4 NUM_THREADS = multiprocessing.cpu_count()
@staticmethod @staticmethod
@time_measurement(time_accumulator)
def get_chunk_size(data): def get_chunk_size(data):
num_elements = len(data) num_elements = len(data)
num_threads = min(ThreadsLinearAlgebraUtils.NUM_THREADS, num_elements) num_threads = min(ThreadsLinearAlgebraUtils.NUM_THREADS, num_elements)
@ -118,9 +118,7 @@ class ThreadsLinearAlgebraUtils(ABC):
remainder = num_elements % num_threads remainder = num_elements % num_threads
return chunk_size, num_threads, remainder return chunk_size, num_threads, remainder
@staticmethod @staticmethod
@time_measurement(time_accumulator)
def divide_vectors_to_chunks(v1, v2): def divide_vectors_to_chunks(v1, v2):
chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v1) chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v1)
@ -134,7 +132,6 @@ class ThreadsLinearAlgebraUtils(ABC):
return chunks return chunks
@staticmethod @staticmethod
@time_measurement(time_accumulator)
def divide_vector_or_matrix_to_chunks(v): def divide_vector_or_matrix_to_chunks(v):
chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v) chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v)
@ -147,15 +144,6 @@ class ThreadsLinearAlgebraUtils(ABC):
return chunks return chunks
@staticmethod
@time_measurement(time_accumulator)
def dot_product(v1, v2):
chunks = ThreadsLinearAlgebraUtils.divide_vectors_to_chunks(v1, v2)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda pair: SequentialLinearAlgebraUtils.dot_product(*pair), chunks)
return sum(results)
@staticmethod @staticmethod
@time_measurement(time_accumulator) @time_measurement(time_accumulator)
def matrix_vector_multiply(A, x): def matrix_vector_multiply(A, x):
@ -178,23 +166,6 @@ class ThreadsLinearAlgebraUtils(ABC):
total_sum = sum(results) total_sum = sum(results)
return total_sum**0.5 return total_sum**0.5
@staticmethod
@time_measurement(time_accumulator)
def vector_scalar_divide(x, scalar):
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(x)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.vector_scalar_divide(chunk, scalar), chunks)
return [item for sublist in results for item in sublist]
@staticmethod
@time_measurement(time_accumulator)
def matrix_scalar_multiply(A, w):
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(lambda chunk: SequentialLinearAlgebraUtils.matrix_scalar_multiply(w, chunk), chunks)
return [item for sublist in results for item in sublist]
@staticmethod @staticmethod
@time_measurement(time_accumulator) @time_measurement(time_accumulator)
def vector_vector_subtraction(v1, v2): def vector_vector_subtraction(v1, v2):
@ -221,54 +192,6 @@ class ThreadsLinearAlgebraUtils(ABC):
return [item for sublist in results for item in sublist] return [item for sublist in results for item in sublist]
@staticmethod
@time_measurement(time_accumulator)
def matrix_norm(A):
chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
def partial_norm(chunk):
return sum(element ** 2 for row in chunk for element in row)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(partial_norm, chunks)
total_sum = sum(results)
return math.sqrt(total_sum)
@staticmethod
@time_measurement(time_accumulator)
def divide_matrixes_to_chunks(A, B):
num_rows = len(A)
num_threads = ThreadsLinearAlgebraUtils.NUM_THREADS
if num_threads > num_rows:
num_threads = num_rows
if num_rows == 0:
return []
chunk_size = num_rows // num_threads
remainder = num_rows % num_threads
chunks = []
start = 0
for _ in range(num_threads):
end = start + chunk_size + (1 if remainder > 0 else 0)
chunks.append((A[start:end], B[start:end]))
start = end
if remainder > 0:
remainder -= 1
return chunks
@staticmethod
@time_measurement(time_accumulator)
def matrix_matrix_subtraction(A, B):
def subtract_chunk(pair):
chunk_A, chunk_B = pair
return [[chunk_A[i][j] - chunk_B[i][j] for j in range(len(chunk_A[0]))] for i in range(len(chunk_A))]
chunks = ThreadsLinearAlgebraUtils.divide_matrixes_to_chunks(A, B)
with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
results = executor.map(subtract_chunk, chunks)
return [row for chunk in results for row in chunk]
@time_measurement(time_accumulator) @time_measurement(time_accumulator)
def process_row(params): def process_row(params):

View File

@ -1,3 +1,4 @@
scipy scipy
pytest pytest
dask dask
numba

View File

@ -2,6 +2,7 @@ import pytest
import numpy as np import numpy as np
from matrix_generator import MatrixGenerator from matrix_generator import MatrixGenerator
from richardson_method import RichardsonMethod from richardson_method import RichardsonMethod
from threads_indep import RichardsonMethodThreads
from processing_type import ProcessingType from processing_type import ProcessingType
from time_measurement import time_measurement, tests_time from time_measurement import time_measurement, tests_time
@ -67,11 +68,15 @@ def test_richardson_vs_cg(n: int, processing_type: ProcessingType, matrix_type:
else: else:
raise ValueError("Invalid matrix type specified. Choose 'spd', 'poli3', or 'nemeth12'.") raise ValueError("Invalid matrix type specified. Choose 'spd', 'poli3', or 'nemeth12'.")
richardson_solver = RichardsonMethod(processing_type, A, b, lambda_min, lambda_max, max_iterations, size=A.shape[0], tol=1e-7)
solution_richardson, info_richardson = None, None solution_richardson, info_richardson = None, None
with capsys.disabled():
solution_richardson, info_richardson = richardson_solver.solve() if processing_type != ProcessingType.THREADS:
richardson_solver = RichardsonMethod(processing_type, A, b, lambda_min, lambda_max, max_iterations, size=A.shape[0], tol=1e-7)
with capsys.disabled():
solution_richardson, info_richardson = richardson_solver.solve()
else:
with capsys.disabled():
solution_richardson, info_richardson = RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, tol=1e-7)
# Przechwytywanie wyjścia po solve # Przechwytywanie wyjścia po solve
captured = capsys.readouterr() captured = capsys.readouterr()

93
code/threads_indep.py Normal file
View File

@ -0,0 +1,93 @@
import gc
import time
import numpy as np
from numba import njit, prange
from time_measurement import time_measurement_longest, longest_threads_time_accumulator, tests_time
import linear_algebra_utils as linAlg
@njit(parallel=True)
def numba_matrix_vector_multiply(A, input_x, Ax):
for i in prange(len(A)):
acc = 0.0
for j in range(len(input_x)):
acc += A[i][j] * input_x[j]
Ax[i] = acc
@njit(parallel=True)
def numba_vector_vector_subtraction(b, Ax, residual):
for i in prange(len(b)):
residual[i] = b[i] - Ax[i]
@njit(nopython=True)
def numba_scalar_vector_multiply(omega, vector, result):
omega_real = omega.real
for i in range(len(vector)):
result[i] = omega_real * vector[i]
@njit(parallel=True)
def numba_vector_vector_addition(input_x, vector, output_x):
for i in prange(len(input_x)):
output_x[i] = input_x[i] + vector[i]
# Funkcje z dekoratorem
@time_measurement_longest(longest_threads_time_accumulator)
def matrix_vector_multiply(A, input_x, Ax):
numba_matrix_vector_multiply(A, input_x, Ax)
@time_measurement_longest(longest_threads_time_accumulator)
def vector_vector_subtraction(b, Ax, residual):
numba_vector_vector_subtraction(b, Ax, residual)
@time_measurement_longest(longest_threads_time_accumulator)
def scalar_vector_multiply(omega, vector, result):
numba_scalar_vector_multiply(omega, vector, result)
@time_measurement_longest(longest_threads_time_accumulator)
def vector_vector_addition(input_x, vector, output_x):
numba_vector_vector_addition(input_x, vector, output_x)
# Metoda Richardson z obsługą wątków
def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5):
longest_threads_time_accumulator.hard_reset()
gc.disable()
start_time = time.perf_counter()
x0 = x0 if x0 is not None else [0.0] * len(b)
x = x0[:]
omega = 2 / (lambda_min + lambda_max)
n = len(b)
for iteration in range(max_iterations):
Ax = [0.0] * n
matrix_vector_multiply(A, x, Ax)
longest_threads_time_accumulator.save_lap_and_reset()
residual = [0.0] * n
vector_vector_subtraction(b, Ax, residual)
longest_threads_time_accumulator.save_lap_and_reset()
change_vector = [0.0] * n
scalar_vector_multiply(omega, residual, change_vector)
longest_threads_time_accumulator.save_lap_and_reset()
_x = [0.0] * n
vector_vector_addition(x, change_vector, _x)
longest_threads_time_accumulator.save_lap_and_reset()
x = _x[:]
if linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol:
break
end_time = time.perf_counter()
gc.enable()
total_time = end_time - start_time
sequential_time = total_time - longest_threads_time_accumulator.total_time
print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_threads_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s")
return x, 0

103
code/threads_indep_old.py Normal file
View File

@ -0,0 +1,103 @@
import multiprocessing
import gc
import time
from concurrent.futures import ThreadPoolExecutor
from time_measurement import time_measurement_longest, longest_threads_time_accumulator, tests_time
import linear_algebra_utils as linAlg
@time_measurement_longest(longest_threads_time_accumulator)
def matrix_vector_multiply(A, input_x, start, end, Ax):
Ax[start:end] = [sum(x*y for x, y in zip(row, input_x)) for row in A[start:end]]
@time_measurement_longest(longest_threads_time_accumulator)
def vector_vector_subtraction(b, Ax, start, end, residual):
residual[start:end] = [x-y for x, y in zip(b[start:end], Ax[start:end])]
@time_measurement_longest(longest_threads_time_accumulator)
def scalar_vector_multiply(omega, vector, start, end, result):
result[start:end] = [omega * x for x in vector[start:end]]
@time_measurement_longest(longest_threads_time_accumulator)
def vector_vector_addition(input_x, vector, start, end, output_x):
output_x[start:end] = [x+y for x, y in zip(input_x[start:end], vector[start:end])]
def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5):
longest_threads_time_accumulator.hard_reset()
gc.disable()
start_time = time.perf_counter()
n = len(b)
x0 = x0 if x0 is not None else [0.0] * len(b)
x = x0[:]
omega = 2 / (lambda_min + lambda_max)
num_threads = multiprocessing.cpu_count()
chunk_size = n // num_threads
with ThreadPoolExecutor(max_workers=num_threads) as executor: # wątki są tworzone raz i nie są niszczone
for iteration in range(max_iterations):
Ax = [0] * len(x) # tutaj zostanie przypisany wynik z mnożenia macierzy A z wektorem x
futures = []
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(matrix_vector_multiply, A, x, start, end, Ax))
for future in futures:
future.result()
longest_threads_time_accumulator.save_lap_and_reset()
residual = [0] * len(b) # tutaj zostanie przypisany wynik z vector_vector_subtraction
futures = []
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(vector_vector_subtraction, b, Ax, start, end, residual))
for future in futures:
future.result()
longest_threads_time_accumulator.save_lap_and_reset()
change_vector = [0] * len(residual) # zostanie tu przypisany wynik scalar_vector_multiply po pracy wątków
futures = []
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(scalar_vector_multiply, omega, residual, start, end, change_vector))
for future in futures:
future.result()
longest_threads_time_accumulator.save_lap_and_reset()
_x = x[:] # do _x zostanie przez wątki przypisany wynik pracy w danej iteracji
futures = []
for i in range(num_threads):
start = i * chunk_size
end = n if i == num_threads - 1 else (i + 1) * chunk_size
futures.append(executor.submit(vector_vector_addition, x, change_vector, start, end, _x))
for future in futures:
future.result()
longest_threads_time_accumulator.save_lap_and_reset()
x = _x[:]
if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol):
break
end_time = time.perf_counter()
gc.enable()
total_time = end_time - start_time
sequential_time = total_time - longest_threads_time_accumulator.total_time
print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_threads_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s")
return x, 0

View File

@ -0,0 +1,60 @@
import numpy as np
import threading
import multiprocessing
import gc
import time
import sys
from time_measurement import time_measurement_longest, longest_time_accumulator, tests_time
import linear_algebra_utils as linAlg
@time_measurement_longest(longest_time_accumulator)
def RichardsonThread(A, b, x, _x, omega, start, end):
for i in range(start, end):
sigma = np.dot(A[i, :], _x) - A[i, i] * _x[i]
x[i] = (1 - omega) * _x[i] + omega * (b[i] - sigma) / A[i, i]
def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5):
longest_time_accumulator.total_time = 0
longest_time_accumulator.start = sys.float_info.max
longest_time_accumulator.end = 0
gc.disable()
start_time = time.perf_counter()
n = len(b)
x0 = x0 if x0 is not None else [0.0] * len(b)
x = x0[:]
omega = 0.05#2 / (lambda_min + lambda_max)
num_threads = multiprocessing.cpu_count()
threads = []
chunk_size = n // num_threads
max_iterations = 1000
for _ in range(max_iterations):
_x = x[:]
for i in range(num_threads):
start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek
end = n if i == num_threads - 1 else (i + 1) * chunk_size
thread = threading.Thread(target=RichardsonThread, args=(A, b, x, _x, omega, start, end))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
# Ax = linAlg.SequentialLinearAlgebraUtils.matrix_vector_multiply(A, x)
# residual = linAlg.SequentialLinearAlgebraUtils.vector_vector_subtraction(b, Ax)
# if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol):
# break
end_time = time.perf_counter()
gc.enable()
total_time = end_time - start_time
sequential_time = total_time - longest_time_accumulator.total_time
print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s")
return x, 0

View File

@ -1,14 +1,35 @@
import time import time
import sys
from functools import wraps from functools import wraps
class TimeAccumulator: class TimeAccumulator:
def __init__(self): def __init__(self):
self.total_time = 0 self.total_time = 0
class ComplexTimeAcumulator:
def __init__(self):
self.hard_reset()
def hard_reset(self):
self.total_time = 0
self.reset()
def reset(self):
self.lap_time = 0
self.start = sys.float_info.max
self.end = 0
def save_lap_and_reset(self):
self.total_time += self.lap_time
self.reset()
time_accumulator = TimeAccumulator() time_accumulator = TimeAccumulator()
tests_time = TimeAccumulator() tests_time = TimeAccumulator()
def time_measurement(accumulator): longest_threads_time_accumulator = ComplexTimeAcumulator()
def time_measurement(accumulator: TimeAccumulator):
def decorator(func): def decorator(func):
@wraps(func) @wraps(func)
def inner(*args, **kwargs): def inner(*args, **kwargs):
@ -20,6 +41,22 @@ def time_measurement(accumulator):
return inner return inner
return decorator return decorator
def time_measurement_longest(accumulator: ComplexTimeAcumulator):
def decorator(func):
@wraps(func)
def inner(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
end = time.perf_counter()
if start < accumulator.start:
accumulator.start = start
if end > accumulator.end:
accumulator.end = end
accumulator.lap_time = accumulator.end - accumulator.start # "=" instead of "+="
return result
return inner
return decorator