mirror of
https://github.com/kuhyx/WUT_Computer_Science.git
synced 2026-07-04 17:43:12 +02:00
git-subtree-dir: Programming/EARIN git-subtree-mainline:635e287095git-subtree-split:a09c96dd65
180 lines
6.0 KiB
Python
180 lines
6.0 KiB
Python
"""
|
|
Code used to solve MountainCar-v0 gymnasium problem using Q-Learning algorithm
|
|
"""
|
|
from datetime import datetime
|
|
import gymnasium as gym
|
|
import numpy as np
|
|
|
|
# Helper function to discretize the state
|
|
|
|
|
|
def discretize_state(state, env, first_time):
|
|
# print(
|
|
# f"state: {state}, state[0]: {state[0]}, env.observation_space.low: {env.observation_space.low}")
|
|
# print(f"state[0] - env {state[0] - env.observation_space.low}")
|
|
# print(f"state - env {state - env.observation_space.low}")
|
|
if first_time:
|
|
substract_from_state = state[0] - env.observation_space.low
|
|
else:
|
|
substract_from_state = state - env.observation_space.low
|
|
discretized_state = (
|
|
substract_from_state) * np.array([10, 100])
|
|
discretized_state = np.round(discretized_state, 0).astype(int)
|
|
return discretized_state
|
|
|
|
|
|
def initialize_environment(hyperparameters):
|
|
"""
|
|
Initialize environment and video recording
|
|
"""
|
|
# Initialize environment
|
|
env = gym.make('MountainCar-v0', render_mode='rgb_array')
|
|
# Save video
|
|
now = datetime.now()
|
|
time_string = now.strftime("%H:%M:%S")
|
|
env = gym.wrappers.RecordVideo(
|
|
env,
|
|
video_folder='vid',
|
|
disable_logger=True,
|
|
name_prefix=time_string, episode_trigger=lambda x: x > 600 and x % 2 == 0)
|
|
return env
|
|
|
|
|
|
def initialize_q_table(env):
|
|
"""
|
|
Initialize "empty" Q-table
|
|
"""
|
|
# Initialize Q-table
|
|
# n_actions = env.action_space.n # Number of possible actions, should be 3
|
|
# 0 accelerate left
|
|
# 1 dont accelerate
|
|
# 2 accelerate to the right
|
|
# q_table = np.zeros((n_actions,))
|
|
num_states = (env.observation_space.high -
|
|
env.observation_space.low) * np.array([10, 100])
|
|
num_states = np.round(num_states, 0).astype(int) + 1
|
|
q_table = np.zeros((num_states[0], num_states[1], env.action_space.n))
|
|
return q_table
|
|
|
|
|
|
def initialize_hyperparameters():
|
|
"""
|
|
Initialize hyperparameters used by algorithm
|
|
"""
|
|
hyperparameters = {
|
|
"learning_rate": 0.1,
|
|
"discount_factor": 0.99,
|
|
"epsilon": 0.2,
|
|
"max_episodes": 1000,
|
|
"max_steps": 500,
|
|
"min_max_car_position": [-1.2, 0.6],
|
|
"min_max_car_velocity": [-0.07, 0.07],
|
|
"goal_x": 0.5,
|
|
"truncation": 200
|
|
}
|
|
return hyperparameters
|
|
|
|
|
|
def choose_action(hyperparameters, env, q_table, discretized_state):
|
|
"""
|
|
Choose one of 3 actions possible for the algorithm
|
|
"""
|
|
# hyperparameters["epsilon"]-greedy exploration-exploitation tradeoff
|
|
if np.random.uniform(0, 1) < hyperparameters["epsilon"]:
|
|
action = env.action_space.sample() # Choose a random action
|
|
else:
|
|
# Choose the action with the highest Q-value
|
|
action = np.argmax(q_table[discretized_state[0], discretized_state[1]])
|
|
return action
|
|
|
|
|
|
def update_q_table(q_table, action, hyperparameters, reward):
|
|
"""
|
|
Update q_table with newest reward
|
|
"""
|
|
# Q-table update
|
|
q_value = q_table[action]
|
|
max_q_value = np.max(q_table)
|
|
new_q_value = (1 - hyperparameters["learning_rate"]) * q_value + \
|
|
hyperparameters["learning_rate"] * \
|
|
(reward + hyperparameters["discount_factor"] * max_q_value)
|
|
q_table[action] = new_q_value
|
|
return q_table
|
|
|
|
|
|
def movement(hyperparameters, env, q_table, discretized_state, total_reward=0, episode_number=0):
|
|
"""
|
|
Choose action and observe consequences
|
|
"""
|
|
action = choose_action(hyperparameters, env, q_table, discretized_state)
|
|
# Take the action and observe the next state
|
|
next_state, reward, terminated, truncated, _ = env.step(action)
|
|
discretized_next_state = discretize_state(next_state, env, False)
|
|
# print(discretized_next_state[0], discretized_next_state[1])
|
|
q_table[discretized_state[0], discretized_state[1], action] += hyperparameters["learning_rate"] * (reward + hyperparameters["discount_factor"] * np.max(
|
|
q_table[discretized_next_state[0], discretized_next_state[1]]) - q_table[discretized_state[0], discretized_state[1], action])
|
|
|
|
total_reward += reward
|
|
discretized_state = discretized_next_state
|
|
done = terminated or truncated
|
|
if terminated:
|
|
print("Destination reached on episode: ", episode_number)
|
|
return hyperparameters, env, q_table, done, discretized_state, total_reward
|
|
|
|
|
|
def episode_step(env, hyperparameters, q_table, episode_rewards, episode_number):
|
|
"""
|
|
Actions done with every episode
|
|
"""
|
|
state = env.reset() # Reset the environment to an initial state
|
|
discretized_state = discretize_state(state, env, True)
|
|
done = False # Boolean to indicate episode completion
|
|
total_reward = 0 # Accumulate rewards for the episode
|
|
|
|
for step in range(hyperparameters["max_steps"]):
|
|
hyperparameters, env, q_table, done, discretized_state, total_reward = movement(
|
|
hyperparameters, env, q_table, discretized_state, total_reward, episode_number)
|
|
if done:
|
|
break
|
|
|
|
episode_rewards.append(total_reward)
|
|
return env, hyperparameters, q_table, episode_rewards
|
|
|
|
|
|
def training_loop(hyperparameters, env, q_table):
|
|
"""
|
|
Actual training for MountainCar
|
|
"""
|
|
episode_rewards = [] # List to store episode rewards
|
|
|
|
for episode_number in range(hyperparameters["max_episodes"]):
|
|
env, hyperparameters, q_table, episode_rewards = episode_step(
|
|
env, hyperparameters, q_table, episode_rewards, episode_number)
|
|
|
|
return env, q_table
|
|
|
|
|
|
def inference(env, q_table):
|
|
"""
|
|
Inference using the updated Q-table
|
|
"""
|
|
env.reset()
|
|
done = False
|
|
|
|
while not done:
|
|
# Choose the action with the highest Q-value
|
|
action = np.argmax(q_table)
|
|
# Take the action and observe the next state
|
|
_, _, terminated, truncated, _ = env.step(action)
|
|
done = terminated or truncated
|
|
|
|
|
|
if __name__ == '__main__':
|
|
HYPERPARAMETERS = initialize_hyperparameters()
|
|
ENV = initialize_environment(HYPERPARAMETERS)
|
|
Q_TABLE = initialize_q_table(ENV)
|
|
ENV, Q_TABLE = training_loop(HYPERPARAMETERS, ENV, Q_TABLE)
|
|
inference(ENV, Q_TABLE)
|
|
|
|
ENV.close()
|