Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Mathieu Reymond
deep-sea-treasure
Commits
4589bd74
Commit
4589bd74
authored
Jun 25, 2019
by
Mathieu Reymond
Browse files
approximate pareto-front with polynomial regressors
parent
258067be
Changes
1
Hide whitespace changes
Inline
Side-by-side
poly_pql.py
0 → 100644
View file @
4589bd74
import
numpy
as
np
from
pareto_q
import
Agent
,
get_non_dominated
,
compute_hypervolume
from
sklearn.preprocessing
import
PolynomialFeatures
from
sklearn.linear_model
import
LinearRegression
import
matplotlib.pyplot
as
plt
def dst_non_dominated(env):
    """Compute the exact Pareto front of the Deep Sea Treasure grid-world.

    For every (state, action) pair, enumerate the (treasure, fuel) returns
    reachable from the position the action leads to, then keep only the
    non-dominated points. Fuel cost is the (negated) manhattan distance to
    each treasure.

    Parameters
    ----------
    env : object
        Environment exposing ``nS``, ``nA``, ``shape`` and ``_treasures()``
        (a mapping from grid position to treasure value).

    Returns
    -------
    list
        ``result[s][a]`` is an ``np.ndarray`` of non-dominated
        (treasure, fuel) points for taking action ``a`` in state ``s``.
    """
    non_dominated = [[[np.zeros(2)] for _ in range(env.nA)]
                     for _ in range(env.nS)]
    # Clamp moves to the grid borders. Generalized from the hard-coded
    # [10, 9] bounds of the original 11x10 DST grid: identical behavior
    # for that grid, correct for any env.shape.
    upper = np.array(env.shape) - 1
    # compute all points for each position on the grid
    for s in range(env.nS):
        pos = np.unravel_index(s, env.shape)
        # actions as (row, col) deltas: up, right, down, left
        for a_i, a in enumerate(np.array([[-1, 0], [0, 1], [1, 0], [0, -1]])):
            pos_a = np.minimum(upper, np.maximum([0, 0], pos + a))
            for t_pos, t in env._treasures().items():
                # manhattan distance = fuel consumption (negative reward)
                fuel = -np.sum(np.absolute(np.array(t_pos) - pos_a))
                non_dominated[s][a_i].append(np.array([t, fuel]))
                # if position is treasure position, no other points are on
                # the pareto front (end of episode)
                if np.all(t_pos == pos_a):
                    non_dominated[s][a_i] = np.array([[0, 0], [0, 0]])
                    break
            # only keep non-dominated points, ignore the (0, 0) seed entry
            # (except for treasure positions, replaced wholesale above)
            non_dominated[s][a_i] = get_non_dominated(
                np.array(non_dominated[s][a_i][1:]))
    # treasure states are terminal: no further return from any action
    for t_pos in env._treasures().keys():
        t_i = np.ravel_multi_index(t_pos, env.shape)
        for a_i in range(4):
            non_dominated[t_i][a_i] = np.array([[0, 0]])
    return non_dominated
class PolyApproximator(object):
    """Polynomial-regression approximation of a 2-objective Pareto front.

    Fits a fixed-degree polynomial y = p(x) through a set of (x, y) points
    and samples the fitted curve between its ``bounds`` on the x-axis.
    """

    # degree of the fitted polynomial, shared by all approximators
    degree = 4

    def __init__(self, points, bounds=None):
        """Fit a polynomial through ``points``.

        Parameters
        ----------
        points : np.ndarray
            (n, 2) array of (x, y) points on the front.
        bounds : np.ndarray, optional
            (low, high) x-range covered by the points; defaults to the
            degenerate range [0, 0].
        """
        if bounds is None:
            bounds = np.array([0, 0])
        self.bounds = bounds
        pf, pm = self._fit_poly(points)
        self.poly_feat = pf
        self.poly_model = pm

    def __call__(self, x):
        """Predict y-values for the array of x-values ``x``."""
        if len(x.shape) == 1:
            # sklearn expects a 2-d (n_samples, n_features) input
            x = x[:, None]
        x_poly = self.poly_feat.transform(x)
        return self.poly_model.predict(x_poly)

    def samples(self, n=10):
        """Sample ``n`` evenly-spaced (x, y) points of the fitted front.

        Predictions are clipped to [-23, 0] — presumably the fuel-cost
        range of the DST environment (TODO confirm against env rewards).
        """
        x = np.linspace(self.bounds[0], self.bounds[1], n)
        y = self(x).squeeze()
        y = np.clip(y, -23, 0)
        # join each axis to make (n, 2) points
        points = np.stack((x, y), axis=1)
        return points

    def _fit_poly(self, points):
        """Fit the regressor; return (feature transformer, linear model)."""
        x = points[:, 0, None]
        y = points[:, 1, None]
        poly_feat = PolynomialFeatures(degree=PolyApproximator.degree)
        x_poly = poly_feat.fit_transform(x)
        poly_model = LinearRegression()
        poly_model.fit(x_poly, y)
        return poly_feat, poly_model

    def joint_points(self, poly, n=10):
        """Merge this front with ``poly``'s front into a joint Pareto front.

        Samples ``n`` x-values over the union of both x-ranges, extends
        each estimator's predictions to the joint range by repeating its
        border predictions, and keeps the pointwise maximum.

        Returns
        -------
        tuple
            ``(points, joint_bounds)``: the (n, 2) sampled joint front and
            its (low, high) x-bounds as an ``np.ndarray``.
        """
        joint_bounds = (np.minimum(self.bounds[0], poly.bounds[0]),
                        np.maximum(self.bounds[1], poly.bounds[1]))
        # sample over joint bounds
        x = np.linspace(joint_bounds[0], joint_bounds[1], n)
        # only keep individual bounds for each model prediction
        x1_arg = np.nonzero(np.logical_and(x >= self.bounds[0],
                                           x <= self.bounds[1]))[0]
        x2_arg = np.nonzero(np.logical_and(x >= poly.bounds[0],
                                           x <= poly.bounds[1]))[0]
        # predict points between each model's individual bounds
        y1 = self(x[x1_arg]).squeeze()
        y2 = poly(x[x2_arg]).squeeze()
        # extend individual borders to cover joint borders, by repeating
        # leftmost and rightmost prediction
        # first estimator
        y1_full = np.zeros(len(x))
        y1_full[x1_arg] = y1
        y1_full[:x1_arg[0]] = y1[0]
        # BUG FIX: fill to the right of the LAST in-bounds index. The
        # original used x1_arg[1] (the *second* index), overwriting valid
        # predictions from that point onward with the rightmost value.
        y1_full[x1_arg[-1] + 1:] = y1[-1]
        # second estimator
        y2_full = np.zeros(len(x))
        y2_full[x2_arg] = y2
        y2_full[:x2_arg[0]] = y2[0]
        # BUG FIX: same off-by-indexing fix as above (was x2_arg[1]).
        y2_full[x2_arg[-1] + 1:] = y2[-1]
        # the joint pareto front is the nondominated part: as both
        # predictions share x-coordinates, keep the max prediction per x
        y = np.maximum(y1_full, y2_full)
        # join each axis to make points
        points = np.stack((x, y), axis=1)
        return points, np.array(joint_bounds)
class PolyPQL(Agent):
    """Pareto Q-learning agent that approximates each state-action Pareto
    front with a :class:`PolyApproximator` instead of storing point sets.
    """

    def __init__(self, env, choose_action, ref_point, nO=2, gamma=1.):
        """Set up the agent.

        Parameters
        ----------
        env : object
            Environment exposing ``nS``, ``nA``, ``reset()`` and ``step()``.
        choose_action : callable
            ``choose_action(state, q_set, epsilon) -> action``.
        ref_point : np.ndarray
            Reference point for hypervolume computation.
        nO : int, optional
            Number of objectives (default 2).
        gamma : float, optional
            Discount factor (default 1.).
        """
        self.env = env
        self.choose_action = choose_action
        self.gamma = gamma
        self.ref_point = ref_point
        # start with dummy approximators all fitted on a single point at the
        # origin, with degenerate bounds [0, 0].
        # (Removed dead assignment: the original first computed
        # base = ref_point + np.ones(2) * 1e-8 and immediately overwrote it.)
        base = np.zeros(2)
        self.non_dominated = [
            [PolyApproximator(base[None, :],
                              bounds=np.array([base[0], base[0]]))
             for _ in range(env.nA)]
            for _ in range(env.nS)]
        # running average of the immediate reward per (state, action)
        self.avg_r = np.zeros((env.nS, env.nA, nO))
        # visit counts per (state, action), for the incremental mean
        self.n_visits = np.zeros((env.nS, env.nA))

    def start(self, log=None):
        """Reset exploration and the environment for a new episode."""
        self.epsilon = 1.
        state = self.env.reset()
        return {'observation': state, 'terminal': False}

    def compute_q_set(self, s, n=10):
        """Build the Q-set of state ``s``.

        For each action, samples ``n`` points of the approximated Pareto
        front and shifts them by the average immediate reward:
        Q(s, a) = avg_r[s, a] + gamma * front-point.
        """
        q_set = []
        for a in range(self.env.nA):
            sa_appr = self.non_dominated[s][a]
            rew = self.avg_r[s, a]
            nd_sa = sa_appr.samples(n=n)
            q_set.append([rew + self.gamma * nd for nd in nd_sa])
        return np.array(q_set)

    def update_non_dominated(self, s, a, r, s_n, t):
        """Refit the Pareto-front approximator of (s, a) after a transition.

        Parameters
        ----------
        s, a : int
            State and action taken.
        r : np.ndarray
            Immediate reward vector received.
        s_n : int
            Next state.
        t : bool
            Whether the transition was terminal.
        """
        # if terminal, the state-action leads to a single immediate return
        if t:
            self.non_dominated[s][a] = PolyApproximator(
                r[None, :], np.array([r[0], r[0]]))
        # else, compute pareto front of next state, update current front
        else:
            q_set_n = self.compute_q_set(s_n)
            # update for all actions, flatten
            solutions = np.concatenate(q_set_n, axis=0)
            # compute pareto front
            nd = get_non_dominated(solutions)
            # joint bounds over the next state's approximators
            # (renamed from l/r: the original's `r` shadowed the reward
            # parameter)
            low = np.min([e.bounds for e in self.non_dominated[s_n]])
            high = np.max([e.bounds for e in self.non_dominated[s_n]])
            # make a pareto front estimator
            self.non_dominated[s][a] = PolyApproximator(
                nd, np.array([low, high]))

    def step(self, previous, log=None):
        """Take one environment step and update the approximators."""
        state = previous['observation']
        q_set = self.compute_q_set(state)
        try:
            action = self.choose_action(state, q_set, self.epsilon)
        # narrowed from a bare `except:`; the exception is re-raised after
        # plotting the estimated fronts of the offending state (debug aid)
        except Exception:
            print(f'state {state}')
            for a in range(self.env.nA):
                m = self.non_dominated[state][a]
                p = m.samples(100)
                plt.plot(p[:, 0], p[:, 1], label=f'action {a}')
            plt.legend()
            plt.show()
            raise
        next_state, reward, terminal, _ = self.env.step(action)
        # update non-dominated set
        self.update_non_dominated(state, action, reward, next_state, terminal)
        # update avg immediate reward (incremental mean)
        self.n_visits[state, action] += 1
        self.avg_r[state, action] += \
            (reward - self.avg_r[state, action]) / self.n_visits[state, action]
        self.epsilon *= 0.997
        return {'observation': next_state,
                'terminal': terminal,
                'reward': reward}

    def end(self, log=None, writer=None):
        """Log the hypervolume and a plot of the estimated Pareto front."""
        if writer is not None:
            h_v = compute_hypervolume(self.compute_q_set(0),
                                      self.env.nA,
                                      self.ref_point)
            writer.add_scalar('hypervolume', np.amax(h_v), log.episode)
            fig = plt.figure()
            if self.avg_r.shape[2] == 3:
                ax = plt.axes(projection='3d')
            pareto = np.concatenate([e.samples()
                                     for e in self.non_dominated[0]])
            if len(pareto):
                # number of objectives
                if pareto.shape[1] == 2:
                    for a in range(self.env.nA):
                        e = self.non_dominated[0][a]
                        p = e.samples(100)
                        plt.plot(p[:, 0], p[:, 1],
                                 label=f'estimated pf for action {a}')
                elif pareto.shape[1] == 3:
                    ax.plot3D(pareto[:, 0], pareto[:, 1], pareto[:, 2], 'o')
            true_nd = dst_non_dominated(self.env)[0]
            true_nd = get_non_dominated(np.concatenate(true_nd))
            plt.plot(true_nd[:, 0], true_nd[:, 1], '+', label='ground truth')
            plt.legend()
            fig.canvas.draw()
            # Now we can save it to a numpy array.
            # NOTE(review): tostring_rgb is deprecated in recent matplotlib
            # versions — confirm the installed version still provides it.
            data = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
            data = data.reshape(fig.canvas.get_width_height()[::-1] + (3,))
            data = np.rollaxis(data, -1, 0)
            writer.add_image('pareto_front', data, log.episode)
            plt.close(fig)

    def evaluate(self, w=None):
        """Greedily roll out one episode, scalarizing Q-sets with weights.

        Parameters
        ----------
        w : np.ndarray, optional
            Objective weights; defaults to equal weights [.5, .5].
            (Changed from a mutable default argument to a None sentinel;
            the effective default value is unchanged.)
        """
        if w is None:
            w = np.array([.5, .5])
        p = self.start()
        state, done = p['observation'], p['terminal']
        # per-objective normalization before scalarizing
        norm = np.array([124, 19])
        # debug: inspect the Q-set at grid position (3, 5)
        i = np.ravel_multi_index((3, 5), self.env.shape)
        qi = self.compute_q_set(i)
        print([np.array(qi[a]) for a in range(4)])
        while not done:
            q = self.compute_q_set(state)
            # best scalarized value per action
            q_s = [np.amax(np.sum(q[a] * w / norm, axis=1))
                   for a in range(self.env.nA)]
            # break ties between maximal actions at random
            action = np.random.choice(
                np.argwhere(q_s == np.amax(q_s)).flatten())
            print(q_s, action)
            state, reward, done, _ = self.env.step(action)
        print(w, reward)
if __name__ == '__main__':
    import gym
    # importing deep_sea_treasure registers the gym environment as a side
    # effect; the module name itself is not referenced below
    import deep_sea_treasure  # noqa: F401
    from pareto_q import action_selection
    # (removed unused import: `from gym import wrappers`)

    env = gym.make('deep-sea-treasure-v0')
    # reference point for hypervolume: 0 treasure, at most 25 fuel spent
    ref_point = np.array([0., -25.])
    agent = PolyPQL(env,
                    lambda s, q, e: action_selection(s, q, e, ref_point),
                    ref_point, nO=2, gamma=1.)
    # env = gym.make('resource-gathering-v0')
    # ref_point = np.array([-1, -1, -2])
    # agent = ParetoQ(env, lambda s, q, e: action_selection(s, q, e, ref_point), ref_point, nO=3, gamma=0.96)
    logdir = 'runs/poly-pql/'
    agent.train(4000, logdir=logdir)
    print(agent.non_dominated[0])
\ No newline at end of file
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment