feat: implement positional and velocity arguments

This commit is contained in:
Krzysztof Rudnicki 2023-05-21 21:16:58 +02:00
parent 135083a5a5
commit 8075421c49

76
main.py
View File

@ -5,6 +5,23 @@ from datetime import datetime
import gymnasium as gym
import numpy as np
# Helper function to discretize the state
def discretize_state(state, env, first_time):
    """
    Map a continuous observation onto integer Q-table indices.

    The observation is shifted by ``env.observation_space.low`` so that the
    lowest value maps to index 0, scaled element-wise by ``[10, 100]`` (first
    component times 10, second component times 100) and rounded with
    ``np.round`` before being cast to ``int``.

    Args:
        state: Value to discretize. When ``first_time`` is true the
            observation is read from ``state[0]``; otherwise ``state`` is
            used as the observation itself.
        env: Environment providing ``observation_space.low``.
        first_time: True when ``state`` is the raw result of ``env.reset()``,
            False when it is the observation returned by ``env.step()``.

    Returns:
        Integer numpy array holding one index per observation component.
    """
    # The reset result wraps the observation in a container (observation is
    # its first item), while a step result is unpacked by the caller already.
    observation = state[0] if first_time else state
    offset_from_low = observation - env.observation_space.low
    scaled = offset_from_low * np.array([10, 100])
    return np.round(scaled, 0).astype(int)
def initialize_environment(hyperparameters):
"""
@ -18,9 +35,8 @@ def initialize_environment(hyperparameters):
env = gym.wrappers.RecordVideo(
env,
video_folder='vid',
episode_trigger=lambda x: x == hyperparameters["max_episodes"],
disable_logger=False,
name_prefix=time_string)
disable_logger=True,
name_prefix=time_string, episode_trigger=lambda x: x > 600 and x % 2 == 0)
return env
@ -29,11 +45,15 @@ def initialize_q_table(env):
Initialize "empty" Q-table
"""
# Initialize Q-table
n_actions = env.action_space.n # Number of possible actions, should be 3
# n_actions = env.action_space.n # Number of possible actions, should be 3
# 0 accelerate left
# 1 dont accelerate
# 2 accelerate to the right
q_table = np.zeros((n_actions,))
# q_table = np.zeros((n_actions,))
num_states = (env.observation_space.high -
env.observation_space.low) * np.array([10, 100])
num_states = np.round(num_states, 0).astype(int) + 1
q_table = np.zeros((num_states[0], num_states[1], env.action_space.n))
return q_table
@ -45,12 +65,17 @@ def initialize_hyperparameters():
"learning_rate": 0.1,
"discount_factor": 0.99,
"epsilon": 0.2,
"max_episodes": 1000
"max_episodes": 1000,
"max_steps": 500,
"min_max_car_position": [-1.2, 0.6],
"min_max_car_velocity": [-0.07, 0.07],
"goal_x": 0.5,
"truncation": 200
}
return hyperparameters
def choose_action(hyperparameters, env, q_table):
def choose_action(hyperparameters, env, q_table, discretized_state):
"""
Choose one of 3 actions possible for the algorithm
"""
@ -59,7 +84,7 @@ def choose_action(hyperparameters, env, q_table):
action = env.action_space.sample() # Choose a random action
else:
# Choose the action with the highest Q-value
action = np.argmax(q_table)
action = np.argmax(q_table[discretized_state[0], discretized_state[1]])
return action
@ -77,31 +102,40 @@ def update_q_table(q_table, action, hyperparameters, reward):
return q_table
def movement(hyperparameters, env, q_table, discretized_state, total_reward=0, episode_number=0):
    """
    Choose action and observe consequences

    Selects an action for the current discretized state, steps the
    environment once, applies a Q-learning update to ``q_table`` in place and
    adds the received reward to ``total_reward``.

    Args:
        hyperparameters: Dict read for ``"learning_rate"`` and
            ``"discount_factor"`` (and passed on to ``choose_action``).
        env: Environment that is stepped with the chosen action.
        q_table: Array indexed as ``[state_0, state_1, action]``; updated in
            place.
        discretized_state: Indices of the current state in ``q_table``.
        total_reward: Reward accumulated so far in this episode.
        episode_number: Only used in the message printed on termination.

    Returns:
        Tuple ``(hyperparameters, env, q_table, done, discretized_state,
        total_reward)`` where ``discretized_state`` is the state reached
        after the step and ``done`` is ``terminated or truncated``.
    """
    action = choose_action(hyperparameters, env, q_table, discretized_state)

    # Take the action and observe the next state
    next_state, reward, terminated, truncated, _ = env.step(action)
    discretized_next_state = discretize_state(next_state, env, False)

    current = (discretized_state[0], discretized_state[1], action)
    following = (discretized_next_state[0], discretized_next_state[1])

    # A terminal state has no future return, so only bootstrap from the next
    # state when the episode did not terminate on this step. Truncation is
    # just a time limit, so bootstrapping stays in place for it.
    future_value = 0.0 if terminated else np.max(q_table[following])
    td_target = reward + hyperparameters["discount_factor"] * future_value
    q_table[current] += hyperparameters["learning_rate"] * (
        td_target - q_table[current])

    total_reward += reward
    done = terminated or truncated
    if terminated:
        print("Destination reached on episode: ", episode_number)
    return hyperparameters, env, q_table, done, discretized_next_state, total_reward
def episode_step(env, hyperparameters, q_table, episode_rewards, episode_number):
    """
    Actions done with every episode

    Resets the environment, then calls ``movement`` up to
    ``hyperparameters["max_steps"]`` times, stopping early as soon as it
    reports the episode as done. The reward collected over the episode is
    appended to ``episode_rewards``.

    Args:
        env: Environment to reset and run.
        hyperparameters: Dict read for ``"max_steps"`` (and passed on to
            ``movement``).
        q_table: Q-table handed to ``movement`` and returned afterwards.
        episode_rewards: List that receives this episode's total reward.
        episode_number: Index of the episode, forwarded to ``movement``.

    Returns:
        Tuple ``(env, hyperparameters, q_table, episode_rewards)``.
    """
    state = env.reset()  # Reset the environment to an initial state
    # The raw reset result is passed on; first_time=True makes
    # discretize_state read the observation from its first element.
    discretized_state = discretize_state(state, env, True)
    total_reward = 0  # Accumulate rewards for the episode

    for _ in range(hyperparameters["max_steps"]):
        hyperparameters, env, q_table, done, discretized_state, total_reward = movement(
            hyperparameters, env, q_table, discretized_state, total_reward, episode_number)
        if done:
            break

    episode_rewards.append(total_reward)
    return env, hyperparameters, q_table, episode_rewards
@ -113,9 +147,9 @@ def training_loop(hyperparameters, env, q_table):
"""
episode_rewards = [] # List to store episode rewards
for _ in range(hyperparameters["max_episodes"]):
for episode_number in range(hyperparameters["max_episodes"]):
env, hyperparameters, q_table, episode_rewards = episode_step(
env, hyperparameters, q_table, episode_rewards)
env, hyperparameters, q_table, episode_rewards, episode_number)
return env, q_table