-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathACDir.py
122 lines (87 loc) · 3.5 KB
/
ACDir.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Debug trace left in to confirm the module begins loading (imports below can be slow).
print("Start file")
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical
#from pytorch_lightning.core.lightning import LightningModule
# Debug trace: torch and friends imported successfully.
print("Imported all")
class ActorCriticDiscrete(nn.Module):
    """Single-hidden-layer actor-critic network over a discrete action space.

    ``forward`` samples an action for a state and buffers the sampled
    log-probability and the critic's state value; the caller is expected to
    append the observed reward to ``self.rewards`` after each step.  One of
    the ``calculateLoss*`` methods then folds the episode buffers into a
    loss, and ``clear_memory`` resets the buffers for the next episode.
    """

    def __init__(self, num_inputs=1, num_actions=10, hidden_size=128, learning_rate=3e-4):
        # NOTE: learning_rate is accepted for interface compatibility but is
        # not used inside the module (the optimizer lives with the caller).
        super(ActorCriticDiscrete, self).__init__()
        self.affine = nn.Linear(num_inputs, hidden_size)
        self.action_layer = nn.Linear(hidden_size, num_actions)
        self.value_layer = nn.Linear(hidden_size, 1)
        # Initialise the policy head to small constant weights so the initial
        # action distribution is near-uniform rather than randomly skewed.
        self.action_layer.weight.data.fill_(1 / hidden_size)
        self.action_layer.bias.data.fill_(1 / hidden_size)
        # Per-episode buffers, filled by forward() / the training loop.
        self.logprobs = []
        self.state_values = []
        self.rewards = []

    def forward(self, state):
        """Sample an action for ``state``; buffer its logprob and state value.

        ``state`` must expose a scalar tensor attribute ``Deltas`` (the single
        network input) -- assumed from the original code; confirm with caller.
        Returns the sampled action index as a plain int.
        """
        delta = state.Deltas.reshape((1,)).float()
        hidden = F.relu(self.affine(delta))
        state_value = self.value_layer(hidden)
        # dim=-1 makes the softmax axis explicit; calling F.softmax without
        # `dim` is deprecated and warns/errors on newer torch versions.
        action_probs = F.softmax(self.action_layer(hidden), dim=-1)
        # Format string sized to the actual number of actions.  The original
        # hard-coded "%.2f " * 2, which raised TypeError for num_actions != 2.
        print("action_probs", "%.2f " * len(action_probs) % tuple(action_probs.tolist()))
        action_distribution = Categorical(action_probs)
        action = action_distribution.sample()
        self.logprobs.append(action_distribution.log_prob(action))
        self.state_values.append(state_value)
        return action.item()

    def calculateLoss_old(self, gamma=0.99):
        """Loop-based loss: discounted, normalized returns vs. buffered values.

        Returns a shape-(1,) tensor: sum over steps of policy-gradient loss
        plus smooth-L1 value loss.
        """
        # Discounted returns, computed back-to-front.
        rewards = []
        dis_reward = 0
        for reward in self.rewards[::-1]:
            dis_reward = reward + gamma * dis_reward
            rewards.insert(0, dis_reward)
        # Normalize returns (NB: std() is NaN for a single-step episode --
        # callers are assumed to supply episodes of length >= 2).
        rewards = torch.tensor(rewards).reshape((-1, 1)).float()
        rewards = (rewards - rewards.mean()) / (rewards.std())
        loss = torch.tensor(0.).reshape((1,))
        for logprob, value, reward in zip(self.logprobs, self.state_values, rewards):
            # .item() detaches the value so the advantage does not propagate
            # gradient into the critic through the policy term.
            advantage = reward - value.item()
            action_loss = -logprob * advantage
            value_loss = F.smooth_l1_loss(value, reward)
            loss += (action_loss + value_loss)
        return loss

    def calculateLoss(self, gamma=0.995):
        """Loop-based loss over raw (undiscounted) rewards, scaled by std.

        NOTE: unlike calculateLoss_old, no discounting is applied here and
        gamma is unused; rewards are only divided by their std (not centred).
        Returns a shape-(1,) tensor.
        """
        rewards = torch.tensor(self.rewards).reshape((-1, 1)).float()
        rewards /= rewards.std()
        loss = 0
        for logprob, value, reward in zip(self.logprobs, self.state_values, rewards):
            advantage = reward - value.item()
            action_loss = -logprob * advantage
            value_loss = F.smooth_l1_loss(value, reward)
            loss += (action_loss + value_loss)
        return loss

    def calculateLoss_new(self, gamma=0.99):
        """Vectorized loss: discounting via a lower-triangular gamma matrix.

        Returns a 0-dim tensor connected to the model's autograd graph.
        """
        n = len(self.rewards)
        # Plain data tensor -- rewards need no gradient of their own.
        rewards = torch.tensor(self.rewards).float().flip(0)
        # exponents[i, j] = i - j; tril(gamma**exponents) is the discounted-
        # return operator, so mv(...) computes the reversed returns in one go.
        exponents = torch.arange(n).repeat(n, 1).T - torch.arange(n)
        discount = torch.tril(gamma ** exponents, 0)
        returns = torch.mv(discount, rewards).flip(0)
        returns = (returns - returns.mean()) / returns.std()
        # torch.stack keeps the autograd graph intact.  The original built
        # torch.tensor(self.logprobs, requires_grad=True), which DETACHES the
        # network outputs -- no gradient ever reached the model parameters.
        logprobs = torch.stack(self.logprobs)
        state_values = torch.stack(self.state_values).reshape(-1)
        # Detach values in the advantage, matching the .item() convention of
        # the loop-based variants above.
        advantage = returns - state_values.detach()
        action_loss = -logprobs * advantage
        value_loss = F.smooth_l1_loss(state_values, returns)
        return (action_loss + value_loss).sum()

    def clear_memory(self):
        """Reset the per-episode buffers in place (keeps list identity)."""
        del self.state_values[:]
        del self.logprobs[:]
        del self.rewards[:]