WUT_Computer_Science/Programming/EARIN/lab6/main.py

180 lines
6.0 KiB
Python
Raw Permalink Normal View History

2023-06-16 16:21:48 +02:00
"""
Code used to solve MountainCar-v0 gymnasium problem using Q-Learning algorithm
"""
from datetime import datetime
import gymnasium as gym
import numpy as np
# Helper function to discretize the state
def discretize_state(state, env, first_time):
# print(
# f"state: {state}, state[0]: {state[0]}, env.observation_space.low: {env.observation_space.low}")
# print(f"state[0] - env {state[0] - env.observation_space.low}")
# print(f"state - env {state - env.observation_space.low}")
if first_time:
substract_from_state = state[0] - env.observation_space.low
else:
substract_from_state = state - env.observation_space.low
discretized_state = (
substract_from_state) * np.array([10, 100])
discretized_state = np.round(discretized_state, 0).astype(int)
return discretized_state
def initialize_environment(hyperparameters):
"""
Initialize environment and video recording
"""
# Initialize environment
env = gym.make('MountainCar-v0', render_mode='rgb_array')
# Save video
now = datetime.now()
time_string = now.strftime("%H:%M:%S")
env = gym.wrappers.RecordVideo(
env,
video_folder='vid',
disable_logger=True,
name_prefix=time_string, episode_trigger=lambda x: x > 600 and x % 2 == 0)
return env
def initialize_q_table(env):
"""
Initialize "empty" Q-table
"""
# Initialize Q-table
# n_actions = env.action_space.n # Number of possible actions, should be 3
# 0 accelerate left
# 1 dont accelerate
# 2 accelerate to the right
# q_table = np.zeros((n_actions,))
num_states = (env.observation_space.high -
env.observation_space.low) * np.array([10, 100])
num_states = np.round(num_states, 0).astype(int) + 1
q_table = np.zeros((num_states[0], num_states[1], env.action_space.n))
return q_table
def initialize_hyperparameters():
"""
Initialize hyperparameters used by algorithm
"""
hyperparameters = {
"learning_rate": 0.1,
"discount_factor": 0.99,
"epsilon": 0.2,
"max_episodes": 1000,
"max_steps": 500,
"min_max_car_position": [-1.2, 0.6],
"min_max_car_velocity": [-0.07, 0.07],
"goal_x": 0.5,
"truncation": 200
}
return hyperparameters
def choose_action(hyperparameters, env, q_table, discretized_state):
"""
Choose one of 3 actions possible for the algorithm
"""
# hyperparameters["epsilon"]-greedy exploration-exploitation tradeoff
if np.random.uniform(0, 1) < hyperparameters["epsilon"]:
action = env.action_space.sample() # Choose a random action
else:
# Choose the action with the highest Q-value
action = np.argmax(q_table[discretized_state[0], discretized_state[1]])
return action
def update_q_table(q_table, action, hyperparameters, reward):
"""
Update q_table with newest reward
"""
# Q-table update
q_value = q_table[action]
max_q_value = np.max(q_table)
new_q_value = (1 - hyperparameters["learning_rate"]) * q_value + \
hyperparameters["learning_rate"] * \
(reward + hyperparameters["discount_factor"] * max_q_value)
q_table[action] = new_q_value
return q_table
def movement(hyperparameters, env, q_table, discretized_state, total_reward=0, episode_number=0):
"""
Choose action and observe consequences
"""
action = choose_action(hyperparameters, env, q_table, discretized_state)
# Take the action and observe the next state
next_state, reward, terminated, truncated, _ = env.step(action)
discretized_next_state = discretize_state(next_state, env, False)
# print(discretized_next_state[0], discretized_next_state[1])
q_table[discretized_state[0], discretized_state[1], action] += hyperparameters["learning_rate"] * (reward + hyperparameters["discount_factor"] * np.max(
q_table[discretized_next_state[0], discretized_next_state[1]]) - q_table[discretized_state[0], discretized_state[1], action])
total_reward += reward
discretized_state = discretized_next_state
done = terminated or truncated
if terminated:
print("Destination reached on episode: ", episode_number)
return hyperparameters, env, q_table, done, discretized_state, total_reward
def episode_step(env, hyperparameters, q_table, episode_rewards, episode_number):
"""
Actions done with every episode
"""
state = env.reset() # Reset the environment to an initial state
discretized_state = discretize_state(state, env, True)
done = False # Boolean to indicate episode completion
total_reward = 0 # Accumulate rewards for the episode
for step in range(hyperparameters["max_steps"]):
hyperparameters, env, q_table, done, discretized_state, total_reward = movement(
hyperparameters, env, q_table, discretized_state, total_reward, episode_number)
if done:
break
episode_rewards.append(total_reward)
return env, hyperparameters, q_table, episode_rewards
def training_loop(hyperparameters, env, q_table):
"""
Actual training for MountainCar
"""
episode_rewards = [] # List to store episode rewards
for episode_number in range(hyperparameters["max_episodes"]):
env, hyperparameters, q_table, episode_rewards = episode_step(
env, hyperparameters, q_table, episode_rewards, episode_number)
return env, q_table
def inference(env, q_table):
"""
Inference using the updated Q-table
"""
env.reset()
done = False
while not done:
# Choose the action with the highest Q-value
action = np.argmax(q_table)
# Take the action and observe the next state
_, _, terminated, truncated, _ = env.step(action)
done = terminated or truncated
if __name__ == '__main__':
HYPERPARAMETERS = initialize_hyperparameters()
ENV = initialize_environment(HYPERPARAMETERS)
Q_TABLE = initialize_q_table(ENV)
ENV, Q_TABLE = training_loop(HYPERPARAMETERS, ENV, Q_TABLE)
inference(ENV, Q_TABLE)
ENV.close()