单臂摆(CartPole)是强化学习的一个经典模型,本文采用了4种不同的算法来解决这个问题,均使用PyTorch实现。
DQN:
参考:
算法思想:
https://mofanpy.com/tutorials/machine-learning/torch/DQN/
算法实现
https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
个人理解:DQN算法将Q学习和神经网络结合,解决了状态空间连续的问题。由于Q学习的目标值是用网络自身的输出自举(bootstrap)得到的,所以需要target网络,即一个滞后版本的神经网络,使目标值在一段时间内保持稳定,防止一些并非最优的动作被采样之后,该动作的Q值被高估,之后就一直选择该非最优动作,从而影响学习的效率。由于神经网络的训练样本要求独立同分布,所以采用ReplayBuffer和随机采样来打破样本之间的相关性。DQN的神经网络目标是让Q值预测得更准,所以loss是target和eval之间的均方误差(MSE)。
代码:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random
import numpy as np
from collections import namedtuple

# NOTE: gym is imported inside main() so that the agent classes below can be
# imported (and unit-tested) on machines where gym is not installed.

GAMMA = 0.99
lr = 0.1  # NOTE(review): unusually large for Adam; value kept from the original
EPSION = 0.1  # probability of taking a random action (epsilon-greedy)
buffer_size = 10000  # capacity of the replay buffer
batch_size = 32
num_episode = 100000
target_update = 10  # copy net -> target_net every this many episodes


class Net(nn.Module):
    """Three-layer MLP that maps a state vector to one Q-value per action."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear2 = nn.Linear(hidden_size, hidden_size)
        self.Linear3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = F.relu(self.Linear1(x))
        x = F.relu(self.Linear2(x))
        return self.Linear3(x)


# One stored environment step.
Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'done', 'next_state'))


class ReplayMemory(object):
    """Fixed-capacity ring buffer of Transition tuples."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Store a transition, overwriting the oldest one once full."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return `batch_size` transitions drawn without replacement."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


def _as_float_tensor(values):
    """Convert a sequence of numbers/arrays to a float32 tensor in one copy."""
    return torch.as_tensor(np.asarray(values), dtype=torch.float32)


class DQN(object):
    """DQN agent with a target network and a uniform replay buffer."""

    def __init__(self, input_size, hidden_size, output_size):
        self.net = Net(input_size, hidden_size, output_size)
        self.target_net = Net(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=lr)
        self.target_net.load_state_dict(self.net.state_dict())
        self.buffer = ReplayMemory(buffer_size)
        self.loss_func = nn.MSELoss()
        self.steps_done = 0

    def put(self, s0, a0, r, t, s1):
        """Store one step; `t` is 1 if `s1` is terminal, else 0."""
        self.buffer.push(s0, a0, r, t, s1)

    def select_action(self, state):
        """Return an epsilon-greedy action index as a plain int."""
        if random.random() > EPSION:
            # Acting needs no gradients; skip building the autograd graph.
            with torch.no_grad():
                q_values = self.net(_as_float_tensor(state))
            return int(torch.argmax(q_values).item())
        return int(np.random.randint(0, self.net.Linear3.out_features))

    def compute_td_target(self, reward_batch, done_batch, next_state_batch):
        """Return the one-step TD target with shape (batch, 1).

        reward_batch and done_batch may be shaped (batch,) or (batch, 1).
        Both are reshaped to (batch, 1) before use: multiplying a (batch,)
        mask with the (batch, 1) bootstrap value would broadcast to
        (batch, batch) and mix targets across samples.
        """
        with torch.no_grad():
            q_next = self.target_net(next_state_batch).max(dim=1, keepdim=True)[0]
        reward = reward_batch.reshape(-1, 1)
        not_done = 1.0 - done_batch.reshape(-1, 1)
        return reward + not_done * GAMMA * q_next

    def update_parameters(self):
        """Run one gradient step on a sampled minibatch (no-op until the
        buffer holds at least `batch_size` transitions)."""
        if len(self.buffer) < batch_size:
            return
        batch = Transition(*zip(*self.buffer.sample(batch_size)))

        state_batch = _as_float_tensor(batch.state)
        action_batch = torch.as_tensor(
            np.asarray(batch.action, dtype=np.int64).reshape(-1, 1))
        reward_batch = _as_float_tensor(batch.reward)
        done_batch = _as_float_tensor(batch.done)
        next_state_batch = _as_float_tensor(batch.next_state)

        q_eval = self.net(state_batch).gather(1, action_batch)
        q_tar = self.compute_td_target(reward_batch, done_batch, next_state_batch)
        loss = self.loss_func(q_eval, q_tar)

        self.optim.zero_grad()
        loss.backward()
        self.optim.step()


def main():
    """Train the DQN agent on CartPole-v0, printing per-episode statistics.

    Written against the classic gym API (reset() returns the observation,
    step() returns a 4-tuple), as in the original script.
    """
    import gym

    env = gym.make('CartPole-v0')
    # State space: 4-dimensional. Action space: discrete, actions 0 and 1.
    agent = DQN(env.observation_space.shape[0], 256, env.action_space.n)
    average_reward = 0  # running mean of the episode returns so far
    for i_episode in range(num_episode):
        s0 = env.reset()
        tot_reward = 0  # return of the current episode
        tot_time = 0  # steps survived (equals the return with the default reward)
        while True:
            env.render()
            a0 = agent.select_action(s0)
            s1, r, done, _ = env.step(a0)
            tot_time += r
            tot_reward += r
            agent.put(s0, a0, r, 1 if done else 0, s1)
            s0 = s1
            agent.update_parameters()
            if done:
                average_reward = average_reward + 1 / (i_episode + 1) * (
                    tot_reward - average_reward)
                print('Episode ', i_episode, 'tot_time: ', tot_time,
                      ' tot_reward: ', tot_reward,
                      ' average_reward: ', average_reward)
                break
        if i_episode % target_update == 0:
            agent.target_net.load_state_dict(agent.net.state_dict())


if __name__ == '__main__':
    main()
有一个点需要注意,网上有些DQN的实现没有考虑终止状态,所以需要修改Reward才能达到好的效果。在考虑终止状态后,使用原始的reward就可以学习。
Reinforce:
参考:
思路及代码:
https://blog.csdn.net/qq_37266917/article/details/109855244
个人理解:
Reinforce是一种策略梯度算法,对参数化的策略进行梯度上升。需要注意网络不能太复杂,不然会过拟合导致很难学习。通过策略梯度定理,我们知道了怎么进行梯度上升。概率前面的回报可以看成梯度上升的幅度,即回报越大,对应动作概率的提升也越多。所以在Policy Gradient中引入了基线(baseline),以防止某些非最优的动作被选择之后概率变得过大(虽然在样本足够多的时候这个问题也能解决)。
神经网络的loss是 t时刻的回报 * t时刻所选动作的概率的对数,再对所有时刻求和并取负,即 $L = -\sum_t G_t \log \pi_\theta(a_t \mid s_t)$。之所以要取负,是因为神经网络做的是梯度下降,即最小化loss,取负之后最小化loss就等价于最大化回报。
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random
import numpy as np
from torch.distributions import Categorical
from collections import deque
from collections import namedtuple

# NOTE: gym is imported inside main() so that the agent classes below can be
# imported (and unit-tested) on machines where gym is not installed.

GAMMA = 1.0
# The constants below are not read by this script; they are kept so the
# module exposes the same names as the original.
lr = 0.1
EPSION = 0.9
buffer_size = 10000
batch_size = 32
num_episode = 100000
target_update = 10
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200


class Policy(nn.Module):
    """Two-layer MLP producing action probabilities for a batch of states."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, x):
        # softmax over dim=1, so x must be 2-D: (batch, input_size).
        x = F.relu(self.Linear1(x))
        return F.softmax(self.Linear3(x), dim=1)


def discounted_returns(rewards, gamma=GAMMA):
    """Return [G_0, ..., G_{T-1}] where G_t = r_t + gamma * G_{t+1}."""
    returns = []
    running = 0.0
    for reward in reversed(rewards):
        running = reward + gamma * running
        returns.append(running)
    returns.reverse()
    return returns


def running_mean(mean, value, count):
    """Return the mean of `count` samples given the mean of the first
    `count - 1` samples and the newest sample `value` (count is 1-based)."""
    return mean + (value - mean) / count


class Reinforce(object):
    """Monte-Carlo policy-gradient (REINFORCE) agent without a baseline."""

    def __init__(self, input_size, hidden_size, output_size):
        self.net = Policy(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=0.01)

    def select_action(self, s):
        """Sample an action for state `s`.

        Returns (action as int, log-probability tensor of shape (1,) that is
        still attached to the autograd graph for the later policy update).
        """
        s = torch.as_tensor(np.asarray(s), dtype=torch.float32).unsqueeze(0)
        dist = Categorical(self.net(s))
        a = dist.sample()
        return a.item(), dist.log_prob(a)

    def update_parameters(self, rewards, log_probs):
        """Take one gradient step on loss = -sum_t G_t * log pi(a_t | s_t).

        `rewards` and `log_probs` are per-step sequences from one episode.
        An empty episode is a no-op (there is nothing to differentiate).
        """
        if len(rewards) == 0:
            return
        returns = torch.as_tensor(discounted_returns(rewards, GAMMA),
                                  dtype=torch.float32)
        stacked = torch.stack(list(log_probs)).reshape(-1)
        loss = -(returns * stacked).sum()
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()


def main():
    """Train the REINFORCE agent on CartPole-v0, reporting every 100 episodes.

    Written against the classic gym API (reset() returns the observation,
    step() returns a 4-tuple), as in the original script.
    """
    import gym

    env = gym.make('CartPole-v0')
    average_reward = 0
    agent = Reinforce(env.observation_space.shape[0], 16, env.action_space.n)
    for i_episode in range(1, num_episode + 1):
        s = env.reset()
        log_probs = []
        rewards = []
        done = False
        while not done:
            env.render()
            a, log_prob = agent.select_action(s)
            s, r, done, _ = env.step(a)
            log_probs.append(log_prob)
            rewards.append(r)
        # i_episode is 1-based, so it is exactly the number of episodes seen.
        average_reward = running_mean(average_reward, np.sum(rewards), i_episode)
        if i_episode % 100 == 0:
            print('episode: ', i_episode, "tot_rewards: ", np.sum(rewards),
                  'average_rewards: ', average_reward)
        agent.update_parameters(rewards, log_probs)


if __name__ == '__main__':
    main()
DDPG:
参考:
思路:https://www.cnblogs.com/pinard/p/10345762.html
实现:https://zhuanlan.zhihu.com/p/99406809
个人理解:DDPG算法采用了Actor-Critic框架,像是DQN和Policy Gradient的结合。在DDPG中,Actor输出的是一个具体的动作,而不是动作的概率分布,Critic输出的是动作的Q值。Actor和Critic都需要一个Target网络,并且需要ReplayBuffer来打破样本之间的相关性。网上我没找到用DDPG和PyTorch解决单臂摆问题的代码,所以我的解决方法可能不是最好的。因为单臂摆的动作是离散的2个(0,1),最开始我给Actor设置了2个输出并用argmax决定是哪个。后面发现argmax没有梯度,于是我将输出改为了一个,并套了一层sigmoid,输出小于0.5当0算,大于0.5当1算。Critic的loss和DQN类似,都是target和eval之间的均方误差(MSE)。Actor的loss需要先由Actor自己输出动作a,再用Critic得到估值,求平均值,再取负。
代码:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random
import numpy as np
from collections import namedtuple
import math

# NOTE: gym is imported inside main() so that the agent classes below can be
# imported (and unit-tested) on machines where gym is not installed.

GAMMA = 0.9
tau = 0.02  # interpolation rate of the soft target-network update
buffer_size = 10000
batch_size = 32
num_episode = 100000
# The constants below are not read by this script; they are kept so the
# module exposes the same names as the original.
lr = 0.1
EPSION = 0.9
target_update = 10
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200


class Actor(nn.Module):
    """Deterministic policy: state -> action value(s) squashed into (0, 1)."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, x):
        x = F.relu(self.Linear1(x))
        return torch.sigmoid(self.Linear3(x))


class Critic(nn.Module):
    """Q-function: concatenated (state, action) -> scalar value estimate."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear1.weight.data.normal_(0, 0.1)
        self.Linear3 = nn.Linear(hidden_size, output_size)
        self.Linear3.weight.data.normal_(0, 0.1)

    def forward(self, s, a):
        # s and a are concatenated along dim=1, so both must be 2-D batches.
        x = torch.cat([s, a], dim=1)
        x = F.relu(self.Linear1(x))
        return self.Linear3(x)


# One stored environment step. NOTE: the `done` field holds the continuation
# mask (1 while the episode goes on, 0 at the terminal step), because that is
# what the training loop stores and what the TD target multiplies by.
Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'next_state', 'done'))


class ReplayMemory(object):
    """Fixed-capacity ring buffer of Transition tuples."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Store a transition, overwriting the oldest one once full."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return `batch_size` transitions drawn without replacement."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


def _as_float_tensor(values):
    """Convert a sequence of numbers/arrays to a float32 tensor in one copy."""
    return torch.as_tensor(np.asarray(values), dtype=torch.float32)


def soft_update(net_target, net, rate=tau):
    """Move each target parameter a fraction `rate` towards the online one:
    target <- (1 - rate) * target + rate * online."""
    with torch.no_grad():
        for target_param, param in zip(net_target.parameters(), net.parameters()):
            target_param.copy_(target_param * (1.0 - rate) + param * rate)


class DDPG(object):
    """DDPG agent for a single scalar action in (0, 1).

    `output_size` is accepted for signature compatibility but is not used:
    both the actor and the critic output `action_shape` values.
    """

    def __init__(self, input_size, action_shape, hidden_size, output_size):
        self.actor = Actor(input_size, hidden_size, action_shape)
        self.actor_target = Actor(input_size, hidden_size, action_shape)
        self.critic = Critic(input_size + action_shape, hidden_size, action_shape)
        self.critic_target = Critic(input_size + action_shape, hidden_size, action_shape)
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=0.01)
        self.critic_optim = optim.Adam(self.critic.parameters(), lr=0.01)

        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.buffer = ReplayMemory(buffer_size)
        self.loss_func = nn.MSELoss()
        self.steps_done = 0

    def put(self, s0, a0, r, s1, done):
        """Store one step; `done` is the continuation mask (1 - terminal)."""
        self.buffer.push(s0, a0, r, s1, done)

    def select_action(self, state):
        """Return the actor's output for `state` as a detached tensor.

        Computed under no_grad so the value stored in the replay buffer does
        not keep an autograd graph alive.
        """
        state = _as_float_tensor(state)
        with torch.no_grad():
            return self.actor(state)

    def update_parameters(self):
        """Run one critic step, one actor step and a soft target update
        (no-op until the buffer holds at least `batch_size` transitions)."""
        if len(self.buffer) < batch_size:
            return
        batch = Transition(*zip(*self.buffer.sample(batch_size)))

        # Every per-sample scalar is shaped (batch, 1) so that all arithmetic
        # below is element-wise with no implicit broadcasting.
        state_batch = _as_float_tensor(batch.state)
        action_batch = torch.tensor([float(a) for a in batch.action],
                                    dtype=torch.float32).view(-1, 1)
        reward_batch = _as_float_tensor(batch.reward).view(-1, 1)
        next_state_batch = _as_float_tensor(batch.next_state)
        mask_batch = _as_float_tensor(batch.done).view(-1, 1)

        # Critic update: regress Q(s, a) onto r + gamma * Q'(s', mu'(s')).
        with torch.no_grad():
            next_action_batch = self.actor_target(next_state_batch).view(-1, 1)
            q_next = self.critic_target(next_state_batch, next_action_batch)
            r_target = reward_batch + GAMMA * q_next * mask_batch
        r_eval = self.critic(state_batch, action_batch)
        critic_loss = self.loss_func(r_eval, r_target)
        self.critic_optim.zero_grad()
        critic_loss.backward()
        self.critic_optim.step()

        # Actor update: maximise the critic's value of the actor's own action.
        a = self.actor(state_batch).view(-1, 1)
        actor_loss = -torch.mean(self.critic(state_batch, a))
        self.actor_optim.zero_grad()
        actor_loss.backward()
        self.actor_optim.step()

        soft_update(self.actor_target, self.actor)
        soft_update(self.critic_target, self.critic)


def main():
    """Train the DDPG agent on CartPole-v0, printing per-episode statistics.

    The actor's scalar output is rounded to pick discrete action 0 or 1.
    Written against the classic gym API (reset() returns the observation,
    step() returns a 4-tuple), as in the original script.
    """
    import gym

    env = gym.make('CartPole-v0')
    agent = DDPG(env.observation_space.shape[0], 1, 16, env.action_space.n)
    average_reward = 0
    for i_episode in range(num_episode):
        s0 = env.reset()
        tot_reward = 0
        tot_time = 0
        while True:
            env.render()
            a0 = agent.select_action(s0)
            s1, r, done, _ = env.step(int(round(a0.item())))
            tot_time += r
            tot_reward += r
            # Storing the terminal flag matters: without it learning is hard.
            agent.put(s0, a0, r, s1, 0 if done else 1)
            s0 = s1
            agent.update_parameters()
            if done:
                average_reward = average_reward + 1 / (i_episode + 1) * (
                    tot_time - average_reward)
                print('Episode ', i_episode, 'tot_time: ', tot_time,
                      ' tot_reward: ', tot_reward,
                      ' average_reward: ', average_reward)
                break


if __name__ == '__main__':
    main()
PPO:
参考:
PPO算法流程及思想:
https://blog.csdn.net/qq_30615903/article/details/86308045
https://www.jianshu.com/p/9f113adc0c50
PPO算法的实现:
https://blog.csdn.net/weixin_42165585/article/details/112362125
个人理解:
PPO算法也是Actor-Critic架构,但是与DDPG不同,PPO为on-policy算法,所以不需要设计target网络,也不需要ReplayBuffer,并且Actor和Critic的网络参数可以共享以便加快学习。PPO引入了重要性采样(importance sampling),使得每个episode的数据可以被多训练几次(实际的情况中,采样可能非常耗时),从而节省时间;clip保证了每次更新的幅度不会太大。
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from collections import namedtuple
import random
import math

# NOTE: gym is imported inside main() so that the agent classes below can be
# imported (and unit-tested) on machines where gym is not installed.

lr = 0.0005
Capacity = 10000
num_epidose = 10000  # number of training episodes (original spelling kept)
Gamma = 0.98  # discount factor
lmbda = 0.95  # GAE lambda
eps_clip = 0.1  # PPO clipping range for the probability ratio


class Net(nn.Module):
    """Actor and critic heads on top of one shared hidden layer."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.Linear1 = nn.Linear(input_size, hidden_size)
        self.Linear_actor = nn.Linear(hidden_size, output_size)
        self.Linear_critic = nn.Linear(hidden_size, 1)

    def actor_forward(self, s, dim):
        """Return action probabilities; `dim` is the softmax axis
        (0 for a single state vector, 1 for a batch)."""
        s = F.relu(self.Linear1(s))
        return F.softmax(self.Linear_actor(s), dim=dim)

    def critic_forward(self, s):
        """Return the state-value estimate(s) for `s`."""
        s = F.relu(self.Linear1(s))
        return self.Linear_critic(s)


# One stored environment step. `rate` is the probability the behaviour policy
# assigned to the chosen action. NOTE: the `done` field holds the continuation
# mask (1 while the episode goes on, 0 at the terminal step).
Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'next_state', 'rate', 'done'))


class ReplayBuffer(object):
    """Fixed-capacity ring buffer; used here as an ordered episode store."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Store a transition, overwriting the oldest one once full."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return `batch_size` transitions drawn without replacement."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

    def clean(self):
        """Drop every stored transition."""
        self.position = 0
        self.memory = []


def compute_gae(deltas, gamma=Gamma, lam=lmbda):
    """Return generalized advantage estimates for one trajectory.

    `deltas` are the per-step TD errors in time order; the result satisfies
    A_t = delta_t + gamma * lam * A_{t+1} with A_T = 0.
    """
    advantages = []
    running = 0.0
    for delta in reversed(list(deltas)):
        running = gamma * lam * running + delta
        advantages.append(running)
    advantages.reverse()
    return advantages


class PPO(object):
    """Clipped-objective PPO agent with a shared actor-critic network."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.net = Net(input_size, hidden_size, output_size)
        self.optim = optim.Adam(self.net.parameters(), lr=lr)
        self.buffer = ReplayBuffer(capacity=Capacity)

    def act(self, s, dim):
        """Return action probabilities for `s`, softmaxed along `dim`."""
        s = torch.as_tensor(s, dtype=torch.float32)
        return self.net.actor_forward(s, dim)

    def critic(self, s):
        """Return the critic's value estimate(s) for `s`."""
        return self.net.critic_forward(s)

    def put(self, s0, a0, r, s1, rate, done):
        """Store one step; `done` is the continuation mask (1 - terminal)."""
        self.buffer.push(s0, a0, r, s1, rate, done)

    def make_batch(self):
        """Stack the whole (non-empty) buffer, in insertion order, into tensors.

        Returns (state, action, reward, next_state, done, rate). States are
        (T, input_size); every other tensor is (T, 1).
        """
        batch = Transition(*zip(*self.buffer.memory))

        def states(items):
            return torch.stack(
                [torch.as_tensor(s, dtype=torch.float32) for s in items])

        def column(items, dtype):
            return torch.tensor(items, dtype=dtype).view(-1, 1)

        state_batch = states(batch.state)
        action_batch = column(batch.action, torch.int64)
        reward_batch = column(batch.reward, torch.float32)
        next_state_batch = states(batch.next_state)
        rate_batch = column(batch.rate, torch.float32)
        done_batch = column(batch.done, torch.float32)
        return (state_batch, action_batch, reward_batch,
                next_state_batch, done_batch, rate_batch)

    def update_parameters(self, epochs=3):
        """Run `epochs` PPO gradient steps on the trajectory in the buffer.

        The buffer is expected to hold one trajectory in time order (the
        advantage recurrence runs over it back to front). An empty buffer is
        a no-op.
        """
        if len(self.buffer) == 0:
            return
        (state_batch, action_batch, reward_batch,
         next_state_batch, mask_batch, rate_batch) = self.make_batch()

        for _ in range(epochs):
            td_target = reward_batch + Gamma * self.critic(next_state_batch) * mask_batch
            delta = (td_target - self.critic(state_batch)).detach()
            advantage = torch.tensor(compute_gae(delta.squeeze(1).tolist()),
                                     dtype=torch.float32).unsqueeze(1)

            prob = self.act(state_batch, 1)
            prob_a = prob.gather(1, action_batch)
            # Importance ratio between the current and the behaviour policy.
            ratio = torch.exp(torch.log(prob_a) - torch.log(rate_batch))
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - eps_clip, 1 + eps_clip) * advantage
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(
                self.critic(state_batch), td_target.detach())

            self.optim.zero_grad()
            loss.mean().backward()
            self.optim.step()


def main():
    """Train the PPO agent on CartPole-v0, reporting every 20 episodes.

    Written against the classic gym API (reset() returns the observation,
    step() returns a 4-tuple), as in the original script.
    """
    import gym

    env = gym.make('CartPole-v0')
    agent = PPO(env.observation_space.shape[0], 256, env.action_space.n)
    average_reward = 0
    for i_episode in range(num_epidose):
        s0 = env.reset()
        tot_reward = 0
        while True:
            env.render()
            prob = agent.act(torch.from_numpy(s0).float(), 0)
            a0 = int(prob.multinomial(1))
            s1, r, done, _ = env.step(a0)
            rate = prob[a0].item()
            agent.put(s0, a0, r, s1, rate, 0 if done else 1)
            s0 = s1
            tot_reward += r
            if done:
                average_reward = average_reward + 1 / (i_episode + 1) * (
                    tot_reward - average_reward)
                if i_episode % 20 == 0:
                    print('Episode ', i_episode, ' tot_reward: ', tot_reward,
                          ' average_reward: ', average_reward)
                break
        agent.update_parameters()
        agent.buffer.clean()


if __name__ == '__main__':
    main()
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:强化学习 单臂摆(CartPole) (DQN, Reinforce, DDPG, PPO)Pytorch - Python技术站