日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【分层强化学习】HAC源码解读

發布時間:2024/3/12 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【分层强化学习】HAC源码解读 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

論文: Learning Multi-Level Hierarchies with Hindsight
代碼github地址:https://github.com/nikhilbarhate99/Hierarchical-Actor-Critic-HAC-PyTorch

HAC理論部分:https://blog.csdn.net/qq_47997583/article/details/126013369
算法偽碼:
簡而言之就是在原理DDPG算法的基礎上,加入分層結構,為了提高不同層之間非平穩性不能并行訓練的問題,在分層結構上加入hindsight的思路修改擴充經驗回放池中的數據進行訓練。


文件結構如上圖所示,其中asset文件夾中為環境相關代碼,gif文件夾下為效果圖,preTrained文件夾下保存訓練出的參數文件,DDPG.py文件中是單層加入goal的DDPG算法實現,HAC.py為核心的HAC算法的實現,train.py中是整個項目訓練的主文件,utils.py中是經驗回放池的實現代碼,test.py中是測試的代碼。

接下來通過debug代碼走一遍訓練的過程了解HAC的實現過程
首先是一些訓練的參數設定:

下面定義了action的bounds和offset、state的bounds和offset、action和goal的探索噪聲、目標狀態、閾值。在本項目中設定的goal為position是0.48±0.01,velocity是0.04±0.02。

關于bounds和offset的設置,作者在issue中進行了解釋:https://github.com/nikhilbarhate99/Hierarchical-Actor-Critic-HAC-PyTorch/issues/2
本項目中環境為MountainCarContinuous-h-v1,其中
狀態空間為連續的二維信息,
self.observation_space = spaces.Box(low=self.low_state, high=self.high_state, dtype=np.float32)
[position, velocity] between min value = [-1.2, -0.07] and max value = [0.6, 0.07],
動作空間為連續的一維信息,
self.action_space = spaces.Box(low=self.min_action, high=self.max_action,shape=(1,), dtype=np.float32)
the action space is between [-1, 1]

我的理解是在本項目應用的環境中action和state空間不都是[-1,1]之間,神經網絡最后一層是Tanh激活函數會將其歸一化到[-1,1]之間,因此需要進行一些修改。
作者使用以下公式實現
action = ( network output (Tanh) * bounds ) + offset
對于action space:
the action space is between (-1, 1), and as the mean value [ (1 + (-1)) / 2 ] is 0 we do not require an offset, and the value of bound = 1, since our network only outputs between (-1, 1), so,
action = ( network output (Tanh) * bounds ) + offset
i.e action = (network output * 1) + 0
對于state space:
here the position variable (-1.2, 0.6) is NOT normalised to (-1,1) and its mean value
[ (0.6 + (-1.2)) / 2 ] is 0.3
action = ( network output (Tanh) * bounds ) + offset
for position variable:
action = (network output * 0.9) + 0.3
this bounds the value of the action to (-1.2, 0.6)

similarly, the velocity variable (-0.07, 0.07) is NOT normalised to (-1,1) and its mean value [ (0.6 + (-1.2)) / 2 ] is 0, so,for velocity variable:
action = (network output * 0.07) + 0
this bounds the value of the action to (-0.07, 0.07)

接下來HAC算法、DDPG算法以及模型參數文件保存路徑的設定

接下來創建HAC agent并設置相關參數,前面定義了這是一個兩層的網絡,因此HAC類下的HAC屬性為一個包含兩個DDPG的列表。

class HAC:def __init__(self, k_level, H, state_dim, action_dim, render, threshold, action_bounds, action_offset, state_bounds, state_offset, lr):# adding lowest levelself.HAC = [DDPG(state_dim, action_dim, action_bounds, action_offset, lr, H)]self.replay_buffer = [ReplayBuffer()]# adding remaining levelsfor _ in range(k_level-1):self.HAC.append(DDPG(state_dim, state_dim, state_bounds, state_offset, lr, H))self.replay_buffer.append(ReplayBuffer())# set some parametersself.k_level = k_levelself.H = Hself.action_dim = action_dimself.state_dim = state_dimself.threshold = thresholdself.render = render# logging parametersself.goals = [None]*self.k_levelself.reward = 0self.timestep = 0def set_parameters(self, lamda, gamma, action_clip_low, action_clip_high, state_clip_low, state_clip_high, exploration_action_noise, exploration_state_noise):self.lamda = lamdaself.gamma = gammaself.action_clip_low = action_clip_lowself.action_clip_high = action_clip_highself.state_clip_low = state_clip_lowself.state_clip_high = state_clip_highself.exploration_action_noise = exploration_action_noiseself.exploration_state_noise = exploration_state_noise

接下來是整個訓練過程

# training procedure for i_episode in range(1, max_episodes+1):agent.reward = 0agent.timestep = 0state = env.reset()# collecting experience in environmentlast_state, done = agent.run_HAC(env, k_level-1, state, goal_state, False)if agent.check_goal(last_state, goal_state, threshold):print("################ Solved! ################ ")name = filename + '_solved'agent.save(directory, name)# update all levelsagent.update(n_iter, batch_size)# logging updates:log_f.write('{},{}\n'.format(i_episode, agent.reward))log_f.flush()if i_episode % save_episode == 0:agent.save(directory, filename)print("Episode: {}\t Reward: {}".format(i_episode, agent.reward))

在上面訓練代碼中有三行比較核心的代碼,接下來逐一分析

1.跑一層HAC

last_state, done = agent.run_HAC(env, k_level-1, state, goal_state, False) def run_HAC(self, env, i_level, state, goal, is_subgoal_test):next_state = Nonedone = Nonegoal_transitions = []# logging updatesself.goals[i_level] = goal# H attemptsfor _ in range(self.H):# if this is a subgoal test, then next/lower level goal has to be a subgoal testis_next_subgoal_test = is_subgoal_testaction = self.HAC[i_level].select_action(state, goal)# <================ high level policy ================>if i_level > 0:# add noise or take random action if not subgoal testingif not is_subgoal_test:if np.random.random_sample() > 0.2: # p=0.8 exploratopmaction = action + np.random.normal(0, self.exploration_state_noise)action = action.clip(self.state_clip_low, self.state_clip_high)else: # p=0.2 random sampleaction = np.random.uniform(self.state_clip_low, self.state_clip_high)# Determine whether to test subgoal (action)if np.random.random_sample() < self.lamda:is_next_subgoal_test = True# Pass subgoal to lower level next_state, done = self.run_HAC(env, i_level-1, state, action, is_next_subgoal_test)# if subgoal was tested but not achieved, add subgoal testing transitionif is_next_subgoal_test and not self.check_goal(action, next_state, self.threshold):self.replay_buffer[i_level].add((state, action, -self.H, next_state, goal, 0.0, float(done)))# for hindsight action transitionaction = next_state# <================ low level policy ================>else:# add noise or take random action if not subgoal testingif not is_subgoal_test:if np.random.random_sample() > 0.2:action = action + np.random.normal(0, self.exploration_action_noise)action = action.clip(self.action_clip_low, self.action_clip_high)else:action = np.random.uniform(self.action_clip_low, self.action_clip_high)# take primitive actionnext_state, rew, done, _ = env.step(action)if self.render:# env.render() ##########if self.k_level == 2:env.unwrapped.render_goal(self.goals[0], self.goals[1])elif self.k_level == 3:env.unwrapped.render_goal_2(self.goals[0], self.goals[1], self.goals[2])# this is for loggingself.reward += rewself.timestep +=1# <================ finish one step/transition ================># check if goal is achievedgoal_achieved = self.check_goal(next_state, goal, self.threshold)# hindsight action transitionif goal_achieved:self.replay_buffer[i_level].add((state, action, 0.0, next_state, goal, 0.0, float(done)))else:self.replay_buffer[i_level].add((state, action, -1.0, next_state, goal, self.gamma, float(done)))# copy for goal transitiongoal_transitions.append([state, action, -1.0, next_state, None, self.gamma, float(done)])state = next_stateif done or goal_achieved:break# <================ finish H attempts ================># hindsight goal transition# last transition reward and discount is 0goal_transitions[-1][2] = 0.0goal_transitions[-1][5] = 0.0for transition in goal_transitions:# last state is goal for all transitionstransition[4] = next_stateself.replay_buffer[i_level].add(tuple(transition))return next_state, done

2.檢查goal是否實現

agent.check_goal(last_state, goal_state, threshold) def check_goal(self, state, goal, threshold):for i in range(self.state_dim):if abs(state[i]-goal[i]) > threshold[i]:return Falsereturn True

3.更新agent

agent.update(n_iter, batch_size)

HAC.py

def update(self, n_iter, batch_size):for i in range(self.k_level):self.HAC[i].update(self.replay_buffer[i], n_iter, batch_size)

DDPG.py

def update(self, buffer, n_iter, batch_size):for i in range(n_iter):# Sample a batch of transitions from replay buffer:state, action, reward, next_state, goal, gamma, done = buffer.sample(batch_size)# convert np arrays into tensorsstate = torch.FloatTensor(state).to(device)action = torch.FloatTensor(action).to(device)reward = torch.FloatTensor(reward).reshape((batch_size,1)).to(device)next_state = torch.FloatTensor(next_state).to(device)goal = torch.FloatTensor(goal).to(device)gamma = torch.FloatTensor(gamma).reshape((batch_size,1)).to(device)done = torch.FloatTensor(done).reshape((batch_size,1)).to(device)# select next actionnext_action = self.actor(next_state, goal).detach()# Compute target Q-value:target_Q = self.critic(next_state, next_action, goal).detach()target_Q = reward + ((1-done) * gamma * target_Q)# Optimize Critic:critic_loss = self.mseLoss(self.critic(state, action, goal), target_Q)self.critic_optimizer.zero_grad()critic_loss.backward()self.critic_optimizer.step()# Compute actor loss:actor_loss = -self.critic(state, self.actor(state, goal), goal).mean()# Optimize the actorself.actor_optimizer.zero_grad()actor_loss.backward()self.actor_optimizer.step()

未完待續

總結

以上是生活随笔為你收集整理的【分层强化学习】HAC源码解读的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。