Multi-armed bandit problem with soft-max action selection
- Selecting a suitable temperature $\tau$ for soft-max action selection is tricky
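For reference, the `softmax` function defined in the next cell selects arm $a$ with the Boltzmann (soft-max) probability over the current action-value estimates $Q$:

$$P(a) = \frac{e^{Q(a)/\tau}}{\sum_{b=1}^{n} e^{Q(b)/\tau}}$$

A large temperature $\tau$ pushes the distribution toward uniform random exploration, while a small $\tau$ makes selection nearly greedy, which is why tuning $\tau$ matters.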
In [22]:
import numpy as np
from scipy import stats
import random
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
sns.set_style('whitegrid')
n = 10  # number of arms
arms = np.random.rand(n)  # each arm's true payout probability
print('Generated arms with payout probabilities:')
print(arms)

def reward(prob):
    # number of successes in 10 Bernoulli trials with success probability `prob`
    total = 0
    for i in range(10):
        if random.random() < prob:
            total += 1
    return total

def softmax(av):
    # Boltzmann distribution over the action values; tau is the global temperature
    probs = np.zeros(n)
    for i in range(n):
        probs[i] = np.exp(av[i] / tau) / np.sum(np.exp(av[:] / tau))
    return probs
In [26]:
'''Softmax Action Selection '''
f, ((ax1, ax2), (ax3,ax4)) = plt.subplots(2, 2, figsize=(14,8))
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Avg Reward')
ax1.set_ylim([0, 10])
ax1.set_xlim([-200, 1200])
ax2.set_xlabel('Arms')
ax2.set_ylabel('AV')
ax3.set_xlabel('Arms')
ax3.set_ylabel('Choice Counts')
ax4.set_xlabel('Arms')
ax4.set_ylabel('Prob')
av = np.ones(n)
counts = np.zeros(n)
av_softmax = np.zeros(n)
av_softmax[:] = 0.1
tau = 1.52
for i in range(1000):
    # sample an arm according to the current soft-max probabilities
    choice = np.where(arms == np.random.choice(arms, p=av_softmax))[0][0]
    counts[choice] += 1
    k = counts[choice]
    rwd = reward(arms[choice])
    # incremental update of the running average reward for the chosen arm
    old_avg = av[choice]
    new_avg = old_avg + (1/k) * (rwd - old_avg)
    av[choice] = new_avg
    av_softmax = softmax(av)
    runningMean = np.average(av, weights=np.array([counts[j]/np.sum(counts) for j in range(len(counts))]))
    ax1.scatter(i, runningMean, alpha=0.4, c=sns.xkcd_rgb["denim blue"])
# dashed reference line at the best achievable expected reward
ax1.plot([-200, 1200], [np.max(arms)*10, np.max(arms)*10], sns.xkcd_rgb["pale red"], lw=3, alpha=0.8, linestyle='--')
x = np.arange(1, 11, 1)
sns.barplot(x, av, palette=sns.color_palette("Blues", 10), ax=ax2)
sns.barplot(x, counts, palette=sns.color_palette("Blues", 10), ax=ax3)
sns.barplot(x, arms, palette=sns.color_palette("Blues", 10), ax=ax4)
Out[26]:
In [27]:
import pyprind
def generate_bernoulli_bandit_data(num_samples, K):
    # each arm has a fixed hidden CTR; rewards are Bernoulli draws from it
    CTRs_that_generated_data = np.tile(np.random.rand(K), (num_samples, 1))
    true_rewards = np.random.rand(num_samples, K) < CTRs_that_generated_data
    return true_rewards, CTRs_that_generated_data

# totally random (named random_choice so it does not shadow the random module)
def random_choice(estimated_beta_params):
    return np.random.randint(0, len(estimated_beta_params))

# the naive algorithm
def naive(estimated_beta_params, number_to_explore=100):
    totals = estimated_beta_params.sum(1)  # totals
    if np.any(totals < number_to_explore):  # if any arm has been explored less than specified
        least_explored = np.argmin(totals)  # return the one least explored
        return least_explored
    else:  # return the best mean forever
        successes = estimated_beta_params[:, 0]  # successes
        estimated_means = successes / totals  # the current means
        best_mean = np.argmax(estimated_means)  # the best mean
        return best_mean

# the epsilon-greedy algorithm
def epsilon_greedy(estimated_beta_params, epsilon=0.01):
    totals = estimated_beta_params.sum(1)  # totals
    successes = estimated_beta_params[:, 0]  # successes
    estimated_means = successes / totals  # the current means
    best_mean = np.argmax(estimated_means)  # the best mean
    be_exploratory = np.random.rand() < epsilon  # should we explore?
    if be_exploratory:  # totally random, excluding the best_mean
        other_choice = np.random.randint(0, len(estimated_beta_params))
        while other_choice == best_mean:
            other_choice = np.random.randint(0, len(estimated_beta_params))
        return other_choice
    else:  # take the best mean
        return best_mean

# the UCB algorithm using a
# (1 - 1/t) confidence interval (Chernoff-Hoeffding bound);
# for details of this particular confidence bound, see the UCB1-TUNED section, slide 18, of:
# http://lane.compbio.cmu.edu/courses/slides_ucb.pdf
def UCB(estimated_beta_params):
    t = float(estimated_beta_params.sum())  # total number of rounds so far
    totals = estimated_beta_params.sum(1)
    successes = estimated_beta_params[:, 0]
    estimated_means = successes / totals  # sample mean
    estimated_variances = estimated_means - estimated_means**2  # p*(1-p) for a Bernoulli arm
    UCB = estimated_means + np.sqrt(np.minimum(estimated_variances + np.sqrt(2*np.log(t)/totals), 0.25) * np.log(t)/totals)
    return np.argmax(UCB)
# the UCB algorithm - using fixed 95% confidence intervals
# see slide 8 for details:
# http://dept.stat.lsa.umich.edu/~kshedden/Courses/Stat485/Notes/binomial_confidence_intervals.pdf
def UCB_bernoulli(estimated_beta_params):
    totals = estimated_beta_params.sum(1)  # totals
    successes = estimated_beta_params[:, 0]  # successes
    estimated_means = successes / totals  # sample mean
    estimated_variances = estimated_means - estimated_means**2  # p*(1-p) for a Bernoulli arm
    UCB = estimated_means + 1.96 * np.sqrt(estimated_variances / totals)
    return np.argmax(UCB)

# the bandit algorithm
def run_bandit_dynamic_alg(true_rewards, CTRs_that_generated_data, choice_func):
    num_samples, K = true_rewards.shape
    # seed the estimated params (to avoid dividing by zero before an arm has been pulled)
    prior_a = 1.  # aka successes
    prior_b = 1.  # aka failures
    estimated_beta_params = np.zeros((K, 2))
    estimated_beta_params[:, 0] += prior_a  # allocating the initial conditions
    estimated_beta_params[:, 1] += prior_b
    regret = np.zeros(num_samples)  # expected regret at each round
    for i in range(0, num_samples):
        # pulling a lever & updating estimated_beta_params
        this_choice = choice_func(estimated_beta_params)
        # update parameters
        if true_rewards[i, this_choice] == 1:
            update_ind = 0
        else:
            update_ind = 1
        estimated_beta_params[this_choice, update_ind] += 1
        # update expected regret: gap between the best arm's CTR and the chosen arm's CTR
        regret[i] = np.max(CTRs_that_generated_data[i, :]) - CTRs_that_generated_data[i, this_choice]
    cum_regret = np.cumsum(regret)
    return cum_regret
num_samples = 10000  # number of rounds per experiment
K = 5  # number of arms
regret_accumulator = np.zeros((num_samples, 5))  # one column per algorithm
number_experiments = 10
pbar = pyprind.ProgBar(number_experiments)
for i in range(number_experiments):
    true_rewards, CTRs_that_generated_data = generate_bernoulli_bandit_data(num_samples, K)
    regret_accumulator[:, 0] += run_bandit_dynamic_alg(true_rewards, CTRs_that_generated_data, random_choice)
    regret_accumulator[:, 1] += run_bandit_dynamic_alg(true_rewards, CTRs_that_generated_data, naive)
    regret_accumulator[:, 2] += run_bandit_dynamic_alg(true_rewards, CTRs_that_generated_data, epsilon_greedy)
    regret_accumulator[:, 3] += run_bandit_dynamic_alg(true_rewards, CTRs_that_generated_data, UCB)
    regret_accumulator[:, 4] += run_bandit_dynamic_alg(true_rewards, CTRs_that_generated_data, UCB_bernoulli)
    pbar.update()
plt.semilogy(regret_accumulator/number_experiments, lw=3)
plt.title('Simulated Bandit Performance for K = 5')
plt.ylabel('Cumulative Expected Regret')
plt.xlabel('Round Index')
plt.legend(('Random','Naive','Epsilon-Greedy','(1 - 1/t) UCB','95% UCB'),loc='lower right')
plt.show()
In [32]:
from pymc import rbeta
class Bandits(object):
    def __init__(self, p_array):
        self.p = p_array
        self.optimal = np.argmax(p_array)

    def pull(self, i):
        # i is which arm to pull; returns True with probability p[i]
        return np.random.rand() < self.p[i]

    def __len__(self):
        return len(self.p)

class BayesianStrategy(object):
    def __init__(self, bandits):
        self.bandits = bandits
        n_bandits = len(self.bandits)
        self.wins = np.zeros(n_bandits)
        self.trials = np.zeros(n_bandits)
        self.N = 0
        self.choices = []
        self.bb_score = []

    def sample_bandits(self, n=1):
        bb_score = np.zeros(n)
        choices = np.zeros(n)
        for k in range(n):
            # sample from each bandit's prior, and select the largest sample
            choice = np.argmax(rbeta(1 + self.wins, 1 + self.trials - self.wins))
            result = self.bandits.pull(choice)
            # update counts with a 0.98 discount so older results decay
            self.wins[choice] = 0.98 * self.wins[choice] + result
            self.trials[choice] = 0.98 * self.trials[choice] + 1
            bb_score[k] = result
            self.N += 1
            choices[k] = choice
        self.bb_score = np.r_[self.bb_score, bb_score]
        self.choices = np.r_[self.choices, choices]
        return
In [33]:
import scipy.stats as stats
# assumption: figsize() comes from IPython's pylab helpers; import it explicitly so the cell is self-contained
from IPython.core.pylabtools import figsize

sns.set_style('darkgrid', {'grid.color': '.8', 'grid.linestyle': u'--'})
figsize(12.0, 15)
beta = stats.beta
x = np.linspace(0.001, .999, 200)

def plot_priors(bayesian_strategy, prob, lw=3, alpha=0.2, plt_vlines=True):
    wins = bayesian_strategy.wins
    trials = bayesian_strategy.trials
    for i in range(prob.shape[0]):
        y = beta(1 + wins[i], 1 + trials[i] - wins[i])
        p = plt.plot(x, y.pdf(x), lw=lw)
        c = p[0].get_markeredgecolor()
        plt.fill_between(x, y.pdf(x), 0, color=c, alpha=alpha,
                         label="underlying probability: %.2f" % prob[i])
        if plt_vlines:
            plt.vlines(prob[i], 0, y.pdf(prob[i]),
                       colors=c, linestyles="--", lw=2)
        plt.autoscale(tight=True)
    plt.title("Posteriors After %d pull" % bayesian_strategy.N +
              "s" * (bayesian_strategy.N > 1))
    plt.autoscale(tight=True)
    return
hidden_prob = np.array([0.85, 0.7, 0.45, 0.35, 0.55])
bandits = Bandits(hidden_prob)
bayesian_strat = BayesianStrategy(bandits)
draw_samples = [1, 4, 5, 10, 30, 50, 100, 200, 600,2000]
for j, i in enumerate(draw_samples):
    plt.subplot(6, 2, j + 1)
    bayesian_strat.sample_bandits(i)
    plot_priors(bayesian_strat, hidden_prob)
    # plt.legend(loc=0)
    # plt.autoscale(tight=True)
plt.tight_layout()
Blackjack game with Monte Carlo & tabular methods
- Generate a 3D state-value plot showing that the learned strategy matches human intuition
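For reference, the tabular Monte Carlo update applied by `updateQtable` in the cells below is the standard incremental average of episode returns for each visited state-action pair:

$$Q(s,a) \leftarrow Q(s,a) + \frac{1}{N(s,a)}\bigl(G - Q(s,a)\bigr)$$

where $N(s,a)$ is the visit count kept in `av_count` and $G$ is the end-of-episode return produced by `calcReward` ($+1$ win, $0$ draw, $-1$ loss).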
In [5]:
from __future__ import division
import math
import random

def randomCard():
    # deal a card: face cards count as 10
    card = random.randint(1, 13)
    if card > 10:
        card = 10
    return card

def useable_ace(hand):
    val, ace = hand
    return ((ace) and ((val + 10) <= 21))

def totalValue(hand):
    val, ace = hand
    if (useable_ace(hand)):
        return (val + 10)
    else:
        return val

def add_card(hand, card):
    val, ace = hand
    if (card == 1):
        ace = True
    return (val + card, ace)

def eval_dealer(dealer_hand):
    # dealer hits until reaching at least 17
    while (totalValue(dealer_hand) < 17):
        dealer_hand = add_card(dealer_hand, randomCard())
    return dealer_hand
def play(state, dec):
    # evaluate
    player_hand = state[0]  # val, useable ace
    dealer_hand = state[1]
    if dec == 0:  # action = stay
        # evaluate game; dealer plays
        dealer_hand = eval_dealer(dealer_hand)
        player_tot = totalValue(player_hand)
        dealer_tot = totalValue(dealer_hand)
        status = 1
        if (dealer_tot > 21):
            status = 2  # player wins
        elif (dealer_tot == player_tot):
            status = 3  # draw
        elif (dealer_tot < player_tot):
            status = 2  # player wins
        elif (dealer_tot > player_tot):
            status = 4  # player loses
    elif dec == 1:  # action = hit
        # if hit, add new card to player's hand
        player_hand = add_card(player_hand, randomCard())
        d_hand = eval_dealer(dealer_hand)
        player_tot = totalValue(player_hand)
        status = 1
        if (player_tot == 21):
            if (totalValue(d_hand) == 21):
                status = 3  # draw
            else:
                status = 2  # player wins!
        elif (player_tot > 21):
            status = 4  # player loses
        elif (player_tot < 21):
            # game still in progress
            status = 1
    state = (player_hand, dealer_hand, status)
    return state
# random initial state
def initGame():
    status = 1  # 1 = in progress; 2 = player won; 3 = draw; 4 = dealer won / player loses
    player_hand = add_card((0, False), randomCard())
    player_hand = add_card(player_hand, randomCard())
    dealer_hand = add_card((0, False), randomCard())
    # evaluate if player wins from the first hand
    if totalValue(player_hand) == 21:
        if totalValue(dealer_hand) != 21:
            status = 2  # player wins after first deal!
        else:
            status = 3  # draw
    state = (player_hand, dealer_hand, status)
    return state
In [6]:
def initStateSpace():
    # state = (player total, useable ace?, dealer's showing card)
    states = []
    for card in range(1, 11):
        for val in range(11, 22):
            states.append((val, False, card))
            states.append((val, True, card))
    return states

def initStateActions(states):
    av = {}
    for state in states:
        av[(state, 0)] = 0.0
        av[(state, 1)] = 0.0
    return av

def initSAcount(stateActions):
    counts = {}
    for sa in stateActions:
        counts[sa] = 0
    return counts

def calcReward(outcome):
    # map game status to reward: 2 (win) -> +1, 3 (draw) -> 0, 4 (loss) -> -1
    return 3 - outcome

# recalculates the average rewards
def updateQtable(av_table, av_count, returns):
    for key in returns:
        av_table[key] = av_table[key] + (1 / av_count[key]) * (returns[key] - av_table[key])
    return av_table

# returns Q-values / avg rewards for both actions in a state
def qsv(state, av_table):
    stay = av_table[(state, 0)]
    hit = av_table[(state, 1)]
    return np.array([stay, hit])

def getRLstate(state):
    # reduce the full game state to (player total, useable ace, dealer's showing card)
    player_hand, dealer_hand, status = state
    player_val, player_ace = player_hand
    return (player_val, player_ace, dealer_hand[0])
In [144]:
epochs = 5000000
epsilon = 0.1
state_space = initStateSpace()
av_table = initStateActions(state_space)
av_count = initSAcount(av_table)
for i in range(epochs):
    # initialize a new game
    state = initGame()
    player_hand, dealer_hand, status = state
    # always hit below a total of 11, since hitting there can never bust
    while player_hand[0] < 11:
        player_hand = add_card(player_hand, randomCard())
        state = (player_hand, dealer_hand, status)
    rl_state = getRLstate(state)
    returns = {}  # state-action pairs visited this episode, later filled with the final return
    while (state[2] == 1):  # while the current episode is in progress
        # epsilon-greedy action selection
        act_probs = qsv(rl_state, av_table)
        if (random.random() < epsilon):
            action = random.randint(0, 1)
        else:
            action = np.argmax(act_probs)  # select the greedy action
        sa = (rl_state, action)
        returns[sa] = 0  # add the state-action pair to the returns list, default value 0
        av_count[sa] += 1  # increment counter for the running-average calculation
        state = play(state, action)  # make a play, observe the new state
        rl_state = getRLstate(state)
    # after an episode is complete, assign the final reward to all visited state-actions
    for key in returns:
        returns[key] = calcReward(state[2])
    av_table = updateQtable(av_table, av_count, returns)
print("Done")
In [217]:
#plot of state-value space
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import seaborn as sns
sns.set_style('whitegrid')
%matplotlib inline
fig = plt.figure(figsize=(12, 9))
ax = fig.add_subplot(111, projection='3d')
ax.set_xlabel('Dealer card')
ax.set_ylabel('Player sum')
ax.set_zlabel('State-Value')
x, y, z = [], [], []
for key in state_space:
    # plot only states without a useable ace
    if (not key[1] and key[0] > 11 and key[2] < 21):
        y.append(key[0])
        x.append(key[2])
        # state value = value of the better of the two actions (stay/hit)
        state_value = max([av_table[(key, 0)], av_table[(key, 1)]])
        z.append(state_value)
ax.azim = 230
ax.plot_trisurf(x, y, z, linewidth=.05, cmap=cm.bone)
# ax.plot_trisurf(x, y, z, linewidth=.02, cmap=sns.cubehelix_palette(8, start=.5, rot=-.75, as_cmap=True))
Out[217]: