mirror of
https://github.com/kuhyx/WUT_Computer_Science.git
synced 2026-07-04 21:03:07 +02:00
feat: trying and testing different ways to implement threads
This commit is contained in:
parent
9ca9b35f8c
commit
fd94f597a5
@ -2,6 +2,7 @@ import cmath
|
||||
import math
|
||||
import itertools
|
||||
import operator
|
||||
import multiprocessing
|
||||
from multiprocessing import Pool
|
||||
from abc import ABC, abstractmethod
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
@ -10,7 +11,7 @@ from time_measurement import time_measurement, time_accumulator
|
||||
import numpy as np
|
||||
import dask.array as da
|
||||
|
||||
class LinearAlgebraUtils(ABC):
|
||||
class LinearAlgebraUtils:
|
||||
@staticmethod
|
||||
@abstractmethod
|
||||
def dot_product(v1, v2):
|
||||
@ -62,7 +63,7 @@ class LinearAlgebraUtils(ABC):
|
||||
pass
|
||||
|
||||
|
||||
class SequentialLinearAlgebraUtils(ABC):
|
||||
class SequentialLinearAlgebraUtils:
|
||||
@staticmethod
|
||||
def dot_product(v1, v2):
|
||||
return sum(x*y for x, y in zip(v1, v2))
|
||||
@ -106,11 +107,10 @@ class SequentialLinearAlgebraUtils(ABC):
|
||||
return [[A[i][j] - B[i][j] for j in range(len(A[0]))] for i in range(len(A))]
|
||||
|
||||
|
||||
class ThreadsLinearAlgebraUtils(ABC):
|
||||
NUM_THREADS = 4
|
||||
class ThreadsLinearAlgebraUtils:
|
||||
NUM_THREADS = multiprocessing.cpu_count()
|
||||
|
||||
@staticmethod
|
||||
@time_measurement(time_accumulator)
|
||||
def get_chunk_size(data):
|
||||
num_elements = len(data)
|
||||
num_threads = min(ThreadsLinearAlgebraUtils.NUM_THREADS, num_elements)
|
||||
@ -120,7 +120,6 @@ class ThreadsLinearAlgebraUtils(ABC):
|
||||
|
||||
|
||||
@staticmethod
|
||||
@time_measurement(time_accumulator)
|
||||
def divide_vectors_to_chunks(v1, v2):
|
||||
chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v1)
|
||||
|
||||
@ -134,7 +133,6 @@ class ThreadsLinearAlgebraUtils(ABC):
|
||||
return chunks
|
||||
|
||||
@staticmethod
|
||||
@time_measurement(time_accumulator)
|
||||
def divide_vector_or_matrix_to_chunks(v):
|
||||
chunk_size, num_threads, remainder = ThreadsLinearAlgebraUtils.get_chunk_size(v)
|
||||
|
||||
@ -156,6 +154,15 @@ class ThreadsLinearAlgebraUtils(ABC):
|
||||
results = executor.map(lambda pair: SequentialLinearAlgebraUtils.dot_product(*pair), chunks)
|
||||
return sum(results)
|
||||
|
||||
# @staticmethod
|
||||
# @time_measurement(time_accumulator)
|
||||
# def matrix_vector_multiply(A, x):
|
||||
# chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
|
||||
# with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
|
||||
# func = partial(SequentialLinearAlgebraUtils.matrix_vector_multiply, x=x)
|
||||
# results = executor.map(func, chunks)
|
||||
# return [item for sublist in results for item in sublist]
|
||||
|
||||
@staticmethod
|
||||
@time_measurement(time_accumulator)
|
||||
def matrix_vector_multiply(A, x):
|
||||
|
||||
@ -2,6 +2,7 @@ import pytest
|
||||
import numpy as np
|
||||
from matrix_generator import MatrixGenerator
|
||||
from richardson_method import RichardsonMethod
|
||||
from threads import RichardsonMethodThreads
|
||||
from processing_type import ProcessingType
|
||||
from time_measurement import time_measurement, tests_time
|
||||
|
||||
@ -40,10 +41,10 @@ def solution_lib(A, b):
|
||||
10000
|
||||
])
|
||||
@pytest.mark.parametrize("processing_type", [
|
||||
ProcessingType.SEQUENTIAL,
|
||||
ProcessingType.THREADS,
|
||||
ProcessingType.PROCESSES,
|
||||
ProcessingType.DISTRIBUTED_ARRAYS
|
||||
# ProcessingType.SEQUENTIAL,
|
||||
ProcessingType.THREADS#,
|
||||
# ProcessingType.PROCESSES,
|
||||
# ProcessingType.DISTRIBUTED_ARRAYS
|
||||
])
|
||||
@pytest.mark.parametrize("matrix_type", [
|
||||
"spd",
|
||||
@ -67,12 +68,16 @@ def test_richardson_vs_cg(n: int, processing_type: ProcessingType, matrix_type:
|
||||
else:
|
||||
raise ValueError("Invalid matrix type specified. Choose 'spd', 'poli3', or 'nemeth12'.")
|
||||
|
||||
richardson_solver = RichardsonMethod(processing_type, A, b, lambda_min, lambda_max, max_iterations, size=A.shape[0], tol=1e-7)
|
||||
|
||||
solution_richardson, info_richardson = None, None
|
||||
with capsys.disabled():
|
||||
solution_richardson, info_richardson = richardson_solver.solve()
|
||||
|
||||
|
||||
if processing_type != ProcessingType.THREADS:
|
||||
richardson_solver = RichardsonMethod(processing_type, A, b, lambda_min, lambda_max, max_iterations, size=A.shape[0], tol=1e-7)
|
||||
with capsys.disabled():
|
||||
solution_richardson, info_richardson = richardson_solver.solve()
|
||||
else:
|
||||
with capsys.disabled():
|
||||
solution_richardson, info_richardson = RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, tol=1e-7)
|
||||
|
||||
# Przechwytywanie wyjścia po solve
|
||||
captured = capsys.readouterr()
|
||||
print("Captured output:", captured.out)
|
||||
|
||||
72
code/threads.py
Normal file
72
code/threads.py
Normal file
@ -0,0 +1,72 @@
|
||||
import numpy as np
|
||||
import threading
|
||||
import multiprocessing
|
||||
import gc
|
||||
import time
|
||||
import sys
|
||||
from time_measurement import time_measurement_longest, longest_time_accumulator, tests_time
|
||||
import linear_algebra_utils as linAlg
|
||||
|
||||
|
||||
@time_measurement_longest(longest_time_accumulator)
|
||||
def RichardsonThread(A, b, x, _x, omega, start, end):
|
||||
for i in range(start, end):
|
||||
sigma = np.dot(A[i, :], _x) - A[i, i] * _x[i]
|
||||
x[i] = (1 - omega) * _x[i] + omega * (b[i] - sigma) / A[i, i]
|
||||
|
||||
def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5):
|
||||
longest_time_accumulator.total_time = 0
|
||||
longest_time_accumulator.start = sys.float_info.max
|
||||
longest_time_accumulator.end = 0
|
||||
|
||||
gc.disable()
|
||||
start_time = time.perf_counter()
|
||||
|
||||
n = len(b)
|
||||
x0 = x0 if x0 is not None else [0.0] * len(b)
|
||||
x = x0[:]
|
||||
omega = 0.05#2 / (lambda_min + lambda_max)
|
||||
num_threads = multiprocessing.cpu_count()
|
||||
threads = []
|
||||
chunk_size = n // num_threads
|
||||
max_iterations = 1000
|
||||
|
||||
for _ in range(max_iterations):
|
||||
_x = x[:]
|
||||
for i in range(num_threads):
|
||||
start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek
|
||||
end = n if i == num_threads - 1 else (i + 1) * chunk_size
|
||||
thread = threading.Thread(target=RichardsonThread, args=(A, b, x, _x, omega, start, end))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
# Ax = linAlg.SequentialLinearAlgebraUtils.matrix_vector_multiply(A, x)
|
||||
# residual = linAlg.SequentialLinearAlgebraUtils.vector_vector_subtraction(b, Ax)
|
||||
# if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < tol):
|
||||
# break
|
||||
|
||||
end_time = time.perf_counter()
|
||||
gc.enable()
|
||||
total_time = end_time - start_time
|
||||
sequential_time = total_time - longest_time_accumulator.total_time
|
||||
|
||||
print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s")
|
||||
|
||||
return x, 0
|
||||
|
||||
|
||||
|
||||
|
||||
# # Przykładowe dane wejściowe
|
||||
# np.random.seed(0) # Ustalanie ziarna dla powtarzalności wyników
|
||||
# A = np.random.rand(20, 20) + 20 * np.eye(20) # Macierz przekątniowa z losowymi elementami
|
||||
# b = np.random.rand(20) # Wektor wyrazów wolnych
|
||||
# omega = 0.2
|
||||
# n_iterations = 1000
|
||||
|
||||
# # Rozwiązanie układu równań metodą Richardson'a
|
||||
# x = RichardsonMethodThreads(A, b, 5, 5, n_iterations)
|
||||
# print("Rozwiązanie: ", x)
|
||||
119
code/threads_nowy.py
Normal file
119
code/threads_nowy.py
Normal file
@ -0,0 +1,119 @@
|
||||
import numpy as np
|
||||
import threading
|
||||
import multiprocessing
|
||||
import gc
|
||||
import time
|
||||
import sys
|
||||
from time_measurement import time_measurement_longest, longest_time_accumulator, tests_time
|
||||
import linear_algebra_utils as linAlg
|
||||
|
||||
|
||||
@time_measurement_longest(longest_time_accumulator)
|
||||
def RichardsonThread(A, b, x, _x, omega, start, end):
|
||||
for i in range(start, end):
|
||||
sigma = np.dot(A[i, :], x) - A[i, i] * x[i]
|
||||
x[i] = (1 - omega) * x[i] + omega * (b[i] - sigma) / A[i, i]
|
||||
|
||||
def matrix_vector_multiply(A, x, start, end, Ax):
|
||||
Ax[start:end] = [sum(xx*yy for xx, yy in zip(row, x)) for row in A[start:end]]
|
||||
|
||||
def vector_vector_subtraction(b, Ax, start, end, residual):
|
||||
residual[start:end] = [xx-yy for xx, yy in zip(b[start:end], Ax[start:end])]
|
||||
|
||||
def RichardsonMethodThreads(A, b, lambda_min, lambda_max, max_iterations, x0=None, tol=1e-5):
|
||||
longest_time_accumulator.total_time = 0
|
||||
longest_time_accumulator.start = sys.float_info.max
|
||||
longest_time_accumulator.end = 0
|
||||
|
||||
gc.disable()
|
||||
start_time = time.perf_counter()
|
||||
|
||||
n = len(b)
|
||||
x0 = x0 if x0 is not None else [0.0] * len(b)
|
||||
x = x0[:]
|
||||
omega = 2 / (lambda_min + lambda_max)
|
||||
num_threads = multiprocessing.cpu_count()
|
||||
threads = []
|
||||
chunk_size = n // num_threads
|
||||
|
||||
for iteration in range(max_iterations):
|
||||
# chunks = ThreadsLinearAlgebraUtils.divide_vector_or_matrix_to_chunks(A)
|
||||
|
||||
# with ThreadPoolExecutor(max_workers=ThreadsLinearAlgebraUtils.NUM_THREADS) as executor:
|
||||
# func = partial(SequentialLinearAlgebraUtils.matrix_vector_multiply, x=x)
|
||||
# results = executor.map(func, chunks)
|
||||
Ax = [0] * len(x)
|
||||
for i in range(num_threads):
|
||||
start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek
|
||||
end = n if i == num_threads - 1 else (i + 1) * chunk_size
|
||||
thread = threading.Thread(target=matrix_vector_multiply, args=(A, x, start, end, Ax))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
|
||||
residual = [0] * len(b)
|
||||
|
||||
for i in range(num_threads):
|
||||
start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek
|
||||
end = n if i == num_threads - 1 else (i + 1) * chunk_size
|
||||
thread = threading.Thread(target=vector_vector_subtraction, args=(b, Ax, start, end, residual))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
|
||||
x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_vector_multiply(self.omega, residual))
|
||||
|
||||
for i in range(num_threads):
|
||||
start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek
|
||||
end = n if i == num_threads - 1 else (i + 1) * chunk_size
|
||||
thread = threading.Thread(target=scalar_vector_multiply, args=(A, b, x, omega, start, end))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
|
||||
x = self.LinAlg.vector_vector_addition(x, self.LinAlg.scalar_vector_multiply(self.omega, residual))
|
||||
|
||||
for i in range(num_threads):
|
||||
start = i * chunk_size # start jest indeksem w A. Wątki otrzymują kolejny punkt startowy będący wielokrotnością rozmiaru porcji na wątek
|
||||
end = n if i == num_threads - 1 else (i + 1) * chunk_size
|
||||
thread = threading.Thread(target=vector_vector_addition, args=(A, b, x, omega, start, end))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
if (linAlg.SequentialLinearAlgebraUtils.vector_norm(residual) < self.tol):
|
||||
break
|
||||
|
||||
end_time = time.perf_counter()
|
||||
gc.enable()
|
||||
total_time = end_time - start_time
|
||||
sequential_time = total_time - longest_time_accumulator.total_time
|
||||
|
||||
print(f"Total: {total_time:.3e}s, Seq: {sequential_time:.3e}s, Parallel (threads): {longest_time_accumulator.total_time:.3e}s, Tests time: {tests_time.total_time:.3e}s")
|
||||
|
||||
return x, 0
|
||||
|
||||
|
||||
|
||||
|
||||
# # Przykładowe dane wejściowe
|
||||
# np.random.seed(0) # Ustalanie ziarna dla powtarzalności wyników
|
||||
# A = np.random.rand(20, 20) + 20 * np.eye(20) # Macierz przekątniowa z losowymi elementami
|
||||
# b = np.random.rand(20) # Wektor wyrazów wolnych
|
||||
# omega = 0.2
|
||||
# n_iterations = 1000
|
||||
|
||||
# # Rozwiązanie układu równań metodą Richardson'a
|
||||
# x = RichardsonMethodThreads(A, b, 5, 5, n_iterations)
|
||||
# print("Rozwiązanie: ", x)
|
||||
@ -1,13 +1,22 @@
|
||||
import time
|
||||
import sys
|
||||
from functools import wraps
|
||||
|
||||
class TimeAccumulator:
|
||||
def __init__(self):
|
||||
self.total_time = 0
|
||||
|
||||
class ComplexTimeAcumulator:
|
||||
def __init__(self):
|
||||
self.total_time = 0
|
||||
self.start = sys.float_info.max
|
||||
self.end = 0
|
||||
|
||||
time_accumulator = TimeAccumulator()
|
||||
tests_time = TimeAccumulator()
|
||||
|
||||
longest_time_accumulator = ComplexTimeAcumulator()
|
||||
|
||||
def time_measurement(accumulator):
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
@ -20,6 +29,22 @@ def time_measurement(accumulator):
|
||||
return inner
|
||||
return decorator
|
||||
|
||||
def time_measurement_longest(accumulator: ComplexTimeAcumulator):
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
def inner(*args, **kwargs):
|
||||
start = time.perf_counter()
|
||||
result = func(*args, **kwargs)
|
||||
end = time.perf_counter()
|
||||
if start < accumulator.start:
|
||||
accumulator.start = start
|
||||
if end > accumulator.end:
|
||||
accumulator.end = end
|
||||
accumulator.total_time = accumulator.end - accumulator.start # "=" instead of "+="
|
||||
return result
|
||||
return inner
|
||||
return decorator
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user