Index the Q-table by discretized state and cap episode length.

Review notes on this revision of the patch:
- Layout was restored from a whitespace-collapsed copy: indentation and
  blank context lines are inferred from Python syntax and the hunk line
  counts, so confirm them against main.py before applying.
- The "index" line is kept from the submitted patch; its post-image hash
  no longer matches because added lines were revised.
- movement(): the update no longer bootstraps from a terminal next state.
- episode_step(): env.reset() is unpacked at the call site instead of
  relying on discretize_state(..., first_time=True); the parameter is
  kept so existing callers keep working.
- Commented-out debug prints and superseded code were dropped.

diff --git a/main.py b/main.py
index 6e0d98ed..3dde8d0a 100644
--- a/main.py
+++ b/main.py
@@ -5,6 +5,23 @@ from datetime import datetime
 import gymnasium as gym
 import numpy as np
 
+
+def discretize_state(state, env, first_time):
+    """
+    Convert a continuous observation into integer Q-table indices
+
+    state: observation array; when first_time is True, a sequence whose
+        first element is the observation (state[0] is used)
+    env: environment providing observation_space.low as the grid origin
+    first_time: True if state must be unwrapped with state[0] first
+
+    Returns an integer array, one index per observation component
+    """
+    observation = state[0] if first_time else state
+    # Scale factors must match the ones used in initialize_q_table
+    scaled = (observation - env.observation_space.low) * np.array([10, 100])
+    return np.round(scaled, 0).astype(int)
+
 
 def initialize_environment(hyperparameters):
     """
@@ -18,9 +35,10 @@ def initialize_environment(hyperparameters):
     env = gym.wrappers.RecordVideo(
         env,
         video_folder='vid',
-        episode_trigger=lambda x: x == hyperparameters["max_episodes"],
-        disable_logger=False,
-        name_prefix=time_string)
+        disable_logger=True,
+        name_prefix=time_string,
+        # Record even-numbered episodes after episode 600
+        episode_trigger=lambda x: x > 600 and x % 2 == 0)
 
     return env
 
@@ -29,11 +47,16 @@ def initialize_q_table(env):
     Initialize "empty" Q-table
     """
     # Initialize Q-table
-    n_actions = env.action_space.n  # Number of possible actions, should be 3
+    # Possible actions, should be 3:
     # 0 accelerate left
     # 1 dont accelerate
     # 2 accelerate to the right
-    q_table = np.zeros((n_actions,))
+    # One cell per (first observation bin, second observation bin, action);
+    # the scale factors must match the ones used in discretize_state
+    num_states = (env.observation_space.high -
+                  env.observation_space.low) * np.array([10, 100])
+    num_states = np.round(num_states, 0).astype(int) + 1
+    q_table = np.zeros((num_states[0], num_states[1], env.action_space.n))
 
     return q_table
 
@@ -45,12 +68,17 @@ def initialize_hyperparameters():
         "learning_rate": 0.1,
         "discount_factor": 0.99,
         "epsilon": 0.2,
-        "max_episodes": 1000
+        "max_episodes": 1000,
+        "max_steps": 500,
+        "min_max_car_position": [-1.2, 0.6],
+        "min_max_car_velocity": [-0.07, 0.07],
+        "goal_x": 0.5,
+        "truncation": 200
     }
     return hyperparameters
 
 
-def choose_action(hyperparameters, env, q_table):
+def choose_action(hyperparameters, env, q_table, discretized_state):
     """
     Choose one of 3 actions possible for the algorithm
     """
@@ -59,7 +87,7 @@ def choose_action(hyperparameters, env, q_table):
         action = env.action_space.sample()  # Choose a random action
     else:
         # Choose the action with the highest Q-value
-        action = np.argmax(q_table)
+        action = np.argmax(q_table[discretized_state[0], discretized_state[1]])
 
     return action
 
@@ -77,31 +105,52 @@ def update_q_table(q_table, action, hyperparameters, reward):
     return q_table
 
 
-def movement(hyperparameters, env, q_table, total_reward=0):
+def movement(hyperparameters, env, q_table, discretized_state, total_reward=0, episode_number=0):
     """
     Choose action and observe consequences
     """
-    action = choose_action(hyperparameters, env, q_table)
+    action = choose_action(hyperparameters, env, q_table, discretized_state)
 
     # Take the action and observe the next state
-    _, reward, terminated, truncated, _ = env.step(action)
-    done = terminated or truncated
-    q_table = update_q_table(q_table, action, hyperparameters, reward)
+    next_state, reward, terminated, truncated, _ = env.step(action)
+    discretized_next_state = discretize_state(next_state, env, False)
+
+    # Q-learning update; a terminal state has no future value, so do not
+    # bootstrap from it
+    if terminated:
+        best_next_value = 0.0
+    else:
+        best_next_value = np.max(
+            q_table[discretized_next_state[0], discretized_next_state[1]])
+    current = (discretized_state[0], discretized_state[1], action)
+    td_target = reward + hyperparameters["discount_factor"] * best_next_value
+    q_table[current] += hyperparameters["learning_rate"] * (
+        td_target - q_table[current])
     total_reward += reward
-    return hyperparameters, env, q_table, done, total_reward
+    discretized_state = discretized_next_state
+    done = terminated or truncated
+    if terminated:
+        print("Destination reached on episode: ", episode_number)
+    return hyperparameters, env, q_table, done, discretized_state, total_reward
 
 
-def episode_step(env, hyperparameters, q_table, episode_rewards):
+def episode_step(env, hyperparameters, q_table, episode_rewards, episode_number):
     """
     Actions done with every episode
     """
-    env.reset()  # Reset the environment to an initial state
+    # Reset the environment to an initial state; reset() returns the
+    # observation together with an info dict
+    state, _ = env.reset()
+    discretized_state = discretize_state(state, env, False)
     done = False  # Boolean to indicate episode completion
     total_reward = 0  # Accumulate rewards for the episode
 
-    while not done:
-        hyperparameters, env, q_table, done, total_reward = movement(
-            hyperparameters, env, q_table, total_reward)
+    # Cap the episode length at max_steps even if the environment never ends it
+    for _ in range(hyperparameters["max_steps"]):
+        hyperparameters, env, q_table, done, discretized_state, total_reward = movement(
+            hyperparameters, env, q_table, discretized_state, total_reward, episode_number)
+        if done:
+            break
 
     episode_rewards.append(total_reward)
     return env, hyperparameters, q_table, episode_rewards
@@ -113,9 +162,9 @@ def training_loop(hyperparameters, env, q_table):
     """
     episode_rewards = []  # List to store episode rewards
 
-    for _ in range(hyperparameters["max_episodes"]):
+    for episode_number in range(hyperparameters["max_episodes"]):
         env, hyperparameters, q_table, episode_rewards = episode_step(
-            env, hyperparameters, q_table, episode_rewards)
+            env, hyperparameters, q_table, episode_rewards, episode_number)
 
     return env, q_table
 