Reinforcement Learning notebook

Posted by Zongyi Liu on October 10, 2016
RL_poker

Multi-armed bandit problem with soft-max probabilistic action selection

  • Selecting a suitable $\tau$ value for soft-max action selection is tricky
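The soft-max (Boltzmann) rule implemented by the softmax() function below turns the current action-value estimates $Q$ into selection probabilities over the $n$ arms:

$$P(a) \;=\; \frac{e^{Q(a)/\tau}}{\sum_{b=1}^{n} e^{Q(b)/\tau}}$$

A large $\tau$ flattens the distribution toward uniform exploration, while $\tau \to 0$ approaches greedy selection; the experiment below uses $\tau = 1.52$.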
In [22]:
import numpy as np
from scipy import stats
import random
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
sns.set_style('whitegrid')

n = 10
arms = np.random.rand(n)
print('Generate arms with probability')
print(arms)

def reward(prob):
    # simulate 10 Bernoulli(prob) pulls and return the number of successes (0-10)
    reward = 0
    for i in range(10):
        if random.random() < prob:
            reward += 1
    return reward

def softmax(av):
    # Boltzmann (soft-max) distribution over the current action-value estimates;
    # tau is the temperature, set globally in the next cell
    return np.exp(av/tau) / np.sum(np.exp(av/tau))
Generate arms with probability
[ 0.43035659  0.83517718  0.22396533  0.67031074  0.57686724  0.42550421
  0.27396302  0.16508204  0.92536656  0.14501953]
In [26]:
'''Softmax Action Selection ''' 
f, ((ax1, ax2), (ax3,ax4)) = plt.subplots(2, 2, figsize=(14,8))
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Avg Reward')
ax1.set_ylim([0, 10])
ax1.set_xlim([-200, 1200])
ax2.set_xlabel('Arms')
ax2.set_ylabel('AV')
ax3.set_xlabel('Arms')
ax3.set_ylabel('Choice Counts')
ax4.set_xlabel('Arms')
ax4.set_ylabel('Prob')

av = np.ones(n)
counts = np.zeros(n)
av_softmax = np.zeros(n)
av_softmax[:] = 0.1

tau = 1.52

for i in range(1000):
    # sample an arm according to the current soft-max probabilities, then recover its index
    choice = np.where(arms == np.random.choice(arms, p=av_softmax))[0][0]
    counts[choice] += 1
    k = counts[choice]
    rwd = reward(arms[choice])
    # incremental running-mean update of the chosen arm's action value
    old_avg = av[choice]
    new_avg = old_avg + (1/k)*(rwd - old_avg)
    av[choice] = new_avg
    av_softmax = softmax(av)
    # count-weighted average of the action values approximates the average reward earned so far
    runningMean = np.average(av, weights=np.array([counts[j]/np.sum(counts)
                 for j in range(len(counts))]))
    ax1.scatter(i, runningMean, alpha=0.4, c=sns.xkcd_rgb["denim blue"])
    
ax1.plot([-200, 1200], [np.max(arms)*10,np.max(arms)*10], sns.xkcd_rgb["pale red"], lw=3, alpha=0.8,linestyle='--')
x = np.arange(1, 11, 1)
sns.barplot(x, av, palette=sns.color_palette("Blues",10), ax=ax2)
sns.barplot(x, counts, palette=sns.color_palette("Blues",10), ax=ax3)
sns.barplot(x, arms, palette=sns.color_palette("Blues",10), ax=ax4)
Out[26]:
<matplotlib.axes._subplots.AxesSubplot at 0x11d2e95d0>
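Choosing $\tau$ is the tricky part; the standalone sketch below (not part of the original run, with made-up action values) shows how the temperature reshapes the soft-max probabilities:

# Standalone sketch: how the temperature tau reshapes the soft-max
# distribution for a fixed set of action-value estimates.
import numpy as np

demo_av = np.array([1.0, 2.0, 3.0, 5.0])   # hypothetical action-value estimates
for demo_tau in [0.1, 1.52, 10.0]:
    exps = np.exp(demo_av / demo_tau)
    probs = exps / exps.sum()
    print('tau = %5.2f -> %s' % (demo_tau, np.round(probs, 3)))
# Small tau puts nearly all probability on the best arm (close to greedy);
# large tau spreads it almost uniformly (heavy exploration).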

Expected regret comparison across algorithms

  • Random
  • Naive
  • Epsilon-Greedy
  • (1 - 1/t) UCB
  • 95% UCB
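All five strategies are scored with the same measure, cumulative expected regret: at each round the success probability of the chosen arm $a_i$ is compared against the best arm's, and the gaps are summed (this is exactly what run_bandit_dynamic_alg accumulates):

$$R_T \;=\; \sum_{i=1}^{T}\Bigl(\max_k p_k \;-\; p_{a_i}\Bigr)$$

The two UCB variants differ only in the width of the exploration bonus added to the empirical mean $\hat{p}_k$; for example, the 95% version picks $\arg\max_k \bigl(\hat{p}_k + 1.96\sqrt{\hat{p}_k(1-\hat{p}_k)/n_k}\bigr)$, where $n_k$ is the number of pulls of arm $k$.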
In [27]:
import pyprind

def generate_bernoulli_bandit_data(num_samples,K):
    CTRs_that_generated_data = np.tile(np.random.rand(K),(num_samples,1))
    true_rewards = np.random.rand(num_samples,K) < CTRs_that_generated_data
    return true_rewards,CTRs_that_generated_data

# totally random selection (named random_choice so it does not shadow the stdlib random module)
def random_choice(estimated_beta_params):
    return np.random.randint(0,len(estimated_beta_params))

# the naive algorithm
def naive(estimated_beta_params,number_to_explore=100):
    totals = estimated_beta_params.sum(1) # totals
    if np.any(totals < number_to_explore): # if any arm has been explored fewer than number_to_explore times
        least_explored = np.argmin(totals) # return the one least explored
        return least_explored
    else: # return the best mean forever
        successes = estimated_beta_params[:,0] # successes
        estimated_means = successes/totals # the current means
        best_mean = np.argmax(estimated_means) # the best mean
        return best_mean

# the epsilon greedy algorithm
def epsilon_greedy(estimated_beta_params,epsilon=0.01):
    totals = estimated_beta_params.sum(1) # totals
    successes = estimated_beta_params[:,0] # successes
    estimated_means = successes/totals # the current means
    best_mean = np.argmax(estimated_means) # the best mean
    be_exploratory = np.random.rand() < epsilon # should we explore?
    if be_exploratory: # totally random, excluding the best_mean
        other_choice = np.random.randint(0,len(estimated_beta_params))
        while other_choice == best_mean:
            other_choice = np.random.randint(0,len(estimated_beta_params))
        return other_choice
    else: # take the best mean
        return best_mean

# the UCB algorithm using a (1 - 1/t) confidence interval (Chernoff-Hoeffding bound);
# for details of this particular confidence bound, see the UCB1-TUNED section, slide 18, of: 
# http://lane.compbio.cmu.edu/courses/slides_ucb.pdf
def UCB(estimated_beta_params):
    t = float(estimated_beta_params.sum()) # total number of rounds so far
    totals = estimated_beta_params.sum(1)
    successes = estimated_beta_params[:,0]
    estimated_means = successes/totals # sample mean
    estimated_variances = estimated_means - estimated_means**2    
    UCB = estimated_means + np.sqrt( np.minimum( estimated_variances + np.sqrt(2*np.log(t)/totals), 0.25 ) * np.log(t)/totals )
    return np.argmax(UCB)

# the UCB algorithm - using fixed 95% confidence intervals
# see slide 8 for details: 
# http://dept.stat.lsa.umich.edu/~kshedden/Courses/Stat485/Notes/binomial_confidence_intervals.pdf
def UCB_bernoulli(estimated_beta_params):
    totals = estimated_beta_params.sum(1) # totals
    successes = estimated_beta_params[:,0] # successes
    estimated_means = successes/totals # sample mean
    estimated_variances = estimated_means - estimated_means**2
    UCB = estimated_means + 1.96*np.sqrt(estimated_variances/totals)
    return np.argmax(UCB)
    
# the bandit algorithm
def run_bandit_dynamic_alg(true_rewards,CTRs_that_generated_data,choice_func):
    num_samples,K = true_rewards.shape
    # seed the estimated params with a Beta(1,1) prior (to avoid dividing by zero pull counts)
    prior_a = 1. # aka successes 
    prior_b = 1. # aka failures
    estimated_beta_params = np.zeros((K,2))
    estimated_beta_params[:,0] += prior_a # allocating the initial conditions
    estimated_beta_params[:,1] += prior_b
    regret = np.zeros(num_samples) # expected regret incurred at each round

    for i in range(0,num_samples):
        # pulling a lever & updating estimated_beta_params
        this_choice = choice_func(estimated_beta_params)

        # update parameters
        if true_rewards[i,this_choice] == 1:
            update_ind = 0
        else:
            update_ind = 1
            
        estimated_beta_params[this_choice,update_ind] += 1
        
        # updated expected regret
        regret[i] = np.max(CTRs_that_generated_data[i,:]) - CTRs_that_generated_data[i,this_choice]

    cum_regret = np.cumsum(regret)

    return cum_regret

num_samples = 10000 # number of rounds per experiment
K = 5 # number of arms
regret_accumulator = np.zeros((num_samples,5)) # one column of cumulative regret per algorithm
number_experiments = 10
pbar = pyprind.ProgBar(number_experiments)

for i in range(number_experiments):
    true_rewards,CTRs_that_generated_data = generate_bernoulli_bandit_data(num_samples,K)
    regret_accumulator[:,0] += run_bandit_dynamic_alg(true_rewards,CTRs_that_generated_data,random_choice)
    regret_accumulator[:,1] += run_bandit_dynamic_alg(true_rewards,CTRs_that_generated_data,naive)
    regret_accumulator[:,2] += run_bandit_dynamic_alg(true_rewards,CTRs_that_generated_data,epsilon_greedy)
    regret_accumulator[:,3] += run_bandit_dynamic_alg(true_rewards,CTRs_that_generated_data,UCB)
    regret_accumulator[:,4] += run_bandit_dynamic_alg(true_rewards,CTRs_that_generated_data,UCB_bernoulli)
    pbar.update()
plt.semilogy(regret_accumulator/number_experiments, lw=3)
plt.title('Simulated Bandit Performance for K = 5')
plt.ylabel('Cumulative Expected Regret')
plt.xlabel('Round Index')
plt.legend(('Random','Naive','Epsilon-Greedy','(1 - 1/t) UCB','95% UCB'),loc='lower right')
plt.show()
0%      100%
[##########] | ETA: 00:00:00
Total time elapsed: 00:00:21

RL Iteration visualization

  • Use the Beta-posterior Thompson Sampling algorithm (pymc's rbeta sampler) to select actions
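Concretely, the selection step inside sample_bandits below is Beta-Bernoulli Thompson sampling: draw one sample from each arm's Beta posterior and pull the arm with the largest draw. A minimal standalone sketch of that step, using numpy's Beta sampler in place of pymc's rbeta (both draw from the same distribution):

# Standalone sketch of one Thompson-sampling selection step.
import numpy as np

wins = np.array([3., 1., 0.])     # hypothetical successes per arm
trials = np.array([5., 4., 1.])   # hypothetical pulls per arm

# one draw from each arm's Beta(1 + wins, 1 + losses) posterior (uniform Beta(1,1) prior)
samples = np.random.beta(1 + wins, 1 + trials - wins)
choice = np.argmax(samples)       # pull the arm with the largest sampled success rate
print(choice)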
In [32]:
from pymc import rbeta
class Bandits(object):
    def __init__(self, p_array):
        self.p = p_array
        self.optimal = np.argmax(p_array)

    def pull(self, i):
        # i is which arm to pull
        return np.random.rand() < self.p[i]

    def __len__(self):
        return len(self.p)

class BayesianStrategy(object):
    def __init__(self, bandits):

        self.bandits = bandits
        n_bandits = len(self.bandits)
        self.wins = np.zeros(n_bandits)
        self.trials = np.zeros(n_bandits)
        self.N = 0
        self.choices = []
        self.bb_score = []

    def sample_bandits(self, n=1):

        bb_score = np.zeros(n)
        choices = np.zeros(n)

        for k in range(n):
            # sample from the bandits's priors, and select the largest sample
            choice = np.argmax(rbeta(1 + self.wins, 1 + self.trials - self.wins))
            result = self.bandits.pull(choice)
            # decay (factor 0.98) the win/trial counts so that older
            # observations on this arm are gradually forgotten
            self.wins[choice] = 0.98 * self.wins[choice] + result
            self.trials[choice] = 0.98 * self.trials[choice] + 1
            bb_score[k] = result
            self.N += 1
            choices[k] = choice

        self.bb_score = np.r_[self.bb_score, bb_score]
        self.choices = np.r_[self.choices, choices]
        return
In [33]:
import scipy.stats as stats
sns.set_style('darkgrid', {'grid.color': '.8', 'grid.linestyle': u'--'})
plt.figure(figsize=(12.0, 15))

beta = stats.beta
x = np.linspace(0.001, .999, 200)

def plot_priors(bayesian_strategy, prob, lw=3, alpha=0.2, plt_vlines=True):
    wins = bayesian_strategy.wins
    trials = bayesian_strategy.trials
    for i in range(prob.shape[0]):
        y = beta(1 + wins[i], 1 + trials[i] - wins[i])
        p = plt.plot(x, y.pdf(x), lw=lw)
        c = p[0].get_color()  # color of the line just plotted
        plt.fill_between(x, y.pdf(x), 0, color=c, alpha=alpha,
                         label="underlying probability: %.2f" % prob[i])
        if plt_vlines:
            plt.vlines(prob[i], 0, y.pdf(prob[i]),
                       colors=c, linestyles="--", lw=2)
        plt.title("Posteriors After %d pull" % bayesian_strategy.N +
                  "s" * (bayesian_strategy.N > 1))
        plt.autoscale(tight=True)
    return

hidden_prob = np.array([0.85, 0.7, 0.45, 0.35, 0.55])
bandits = Bandits(hidden_prob)
bayesian_strat = BayesianStrategy(bandits)

draw_samples = [1, 4, 5, 10, 30, 50, 100, 200, 600,2000]

for j, i in enumerate(draw_samples):
    plt.subplot(6, 2, j + 1)
    bayesian_strat.sample_bandits(i)
    plot_priors(bayesian_strat, hidden_prob)
#     plt.legend(loc=0)
#     plt.autoscale(tight=True)
plt.tight_layout()

Blackjack game with Monte Carlo & Tabular Methods

  • Generate a 3D state-value surface showing that the learned strategy matches human intuition
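The learner below is tabular Monte Carlo control with an $\epsilon$-greedy behaviour policy ($\epsilon = 0.1$): every state-action pair visited during an episode receives the episode's terminal reward $G$ (+1 win, 0 draw, -1 loss, via calcReward), and updateQtable nudges its table entry toward the running average of those returns:

$$Q(s,a) \;\leftarrow\; Q(s,a) + \frac{1}{N(s,a)}\bigl(G - Q(s,a)\bigr)$$

where $N(s,a)$ is the visit count kept in av_count.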
In [5]:
from __future__ import division
import math
import random

def randomCard():
    card = random.randint(1,13)
    if card > 10:
        card = 10
    return card

def useable_ace(hand):
    val, ace = hand
    return ((ace) and ((val + 10) <= 21))

def totalValue(hand):
    val, ace = hand
    if (useable_ace(hand)):
        return (val + 10)
    else:
        return val

def add_card(hand, card):
    val, ace = hand
    if (card == 1):
        ace = True
    return (val + card, ace)

def eval_dealer(dealer_hand):
    while (totalValue(dealer_hand) < 17):
        dealer_hand = add_card(dealer_hand, randomCard())
    return dealer_hand

def play(state, dec):
    #evaluate
    player_hand = state[0] #val, useable ace
    dealer_hand = state[1]
    if dec == 0: #action = stay
        #evaluate game; dealer plays
        dealer_hand = eval_dealer(dealer_hand)

        player_tot = totalValue(player_hand)
        dealer_tot = totalValue(dealer_hand)
        status = 1
        if (dealer_tot > 21):
            status = 2 #player wins
        elif (dealer_tot == player_tot):
            status = 3 #draw
        elif (dealer_tot < player_tot):
            status = 2 #player wins
        elif (dealer_tot > player_tot):
            status = 4 #player loses

    elif dec == 1: #action = hit
        #if hit, add new card to player's hand
        player_hand = add_card(player_hand, randomCard())
        d_hand = eval_dealer(dealer_hand)
        player_tot = totalValue(player_hand)
        status = 1
        if (player_tot == 21):
            if (totalValue(d_hand) == 21):
                status = 3 #draw
            else:
                status = 2 #player wins!
        elif (player_tot > 21):
            status = 4 #player loses
        elif (player_tot < 21):
            #game still in progress
            status = 1
    state = (player_hand, dealer_hand, status)

    return state

#random initial state
def initGame():
    status = 1 #1=in progress; 2=player won; 3=draw; 4 = dealer won/player loses
    player_hand = add_card((0, False), randomCard())
    player_hand = add_card(player_hand, randomCard())
    dealer_hand = add_card((0, False), randomCard())
    #evaluate if player wins from first hand
    if totalValue(player_hand) == 21:
        if totalValue(dealer_hand) != 21:
            status = 2 #player wins after first deal!
        else:
            status = 3 #draw

    state = (player_hand, dealer_hand, status)
    return state
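For reference, a state is the tuple (player_hand, dealer_hand, status); each hand is (raw card total, has_ace) and status 1/2/3/4 means in progress / player wins / draw / player loses. A hypothetical one-off check of the helpers above (assuming the cell has been run):

# Hypothetical one-off check of the game helpers defined above.
state = initGame()          # ((player_val, has_ace), (dealer_val, has_ace), status)
print(state)
if state[2] == 1:           # status 1 = game still in progress, so try one hit
    state = play(state, 1)  # dec=1 means "hit"; dec=0 would mean "stay"
print('player total: %d, status: %d' % (totalValue(state[0]), state[2]))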
In [6]:
def initStateSpace():
    states = []
    for card in range(1,11):
        for val in range(11,22):
            states.append((val, False, card))
            states.append((val, True, card))
    return states

def initStateActions(states):
    av = {}
    for state in states:
        av[(state, 0)] = 0.0
        av[(state, 1)] = 0.0
    return av

def initSAcount(stateActions):
    counts = {}
    for sa in stateActions:
        counts[sa] = 0
    return counts

def calcReward(outcome):
    # map the terminal game status to a reward: 2 (win) -> +1, 3 (draw) -> 0, 4 (loss) -> -1
    return 3-outcome

#recalculates the average rewards
def updateQtable(av_table, av_count, returns):
    for key in returns:
        av_table[key] = av_table[key] + (1 / av_count[key]) * (returns[key]- av_table[key])
    return av_table

#returns Q-value/avg rewards 
def qsv(state, av_table):
    stay = av_table[(state,0)]
    hit = av_table[(state,1)]
    return np.array([stay, hit])

def getRLstate(state):
    player_hand, dealer_hand, status = state
    player_val, player_ace = player_hand
    return (player_val, player_ace, dealer_hand[0])
In [144]:
epochs = 5000000
epsilon = 0.1

state_space = initStateSpace()
av_table = initStateActions(state_space)
av_count = initSAcount(av_table)

for i in range(epochs):
    #initialize new game
    state = initGame()
    player_hand, dealer_hand, status = state
    
    # keep hitting automatically while the raw card total is below 11;
    # those low totals are not part of the tabular state space
    while player_hand[0] < 11:
        player_hand = add_card(player_hand, randomCard())
        state = (player_hand, dealer_hand, status)
    rl_state = getRLstate(state)

    returns = {} #state, action, return
    while(state[2] == 1): #while in current episode
        #epsilon greedy action selection
        act_probs = qsv(rl_state, av_table)
        if (random.random() < epsilon):
            action = random.randint(0,1)
        else:
            action = np.argmax(act_probs)#select an action
        sa = ((rl_state, action))
        returns[sa] = 0 #add a-v pair to returns list, default value to 0
        av_count[sa] += 1 #increment counter for avg calc
        state = play(state, action) #make a play, observe new state
        rl_state = getRLstate(state)
    #after an episode is complete, assign rewards to all the state-actions
    for key in returns:
        returns[key] = calcReward(state[2])
    av_table = updateQtable(av_table, av_count, returns)
print("Done")
Done
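Once training finishes, the greedy policy can be read straight off av_table; for example, a hypothetical lookup of the action values for a hard 16 against a dealer showing a 10:

# Hypothetical lookup (assuming the training cell above has been run).
q_stay, q_hit = qsv((16, False, 10), av_table)
print('stay: %.3f  hit: %.3f  -> greedy action: %s'
      % (q_stay, q_hit, 'hit' if q_hit > q_stay else 'stay'))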
In [217]:
#plot of state-value space
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import seaborn as sns
sns.set_style('whitegrid')
%matplotlib inline
fig = plt.figure(figsize=(12, 9))
ax = fig.add_subplot(111, projection='3d', )

ax.set_xlabel('Dealer card')
ax.set_ylabel('Player sum')
ax.set_zlabel('State-Value')

x,y,z = [],[],[]
for key in state_space:
    if (not key[1] and key[0] > 11 and key[2] < 21):
        y.append(key[0])
        x.append(key[2])
        state_value = max([av_table[(key, 0)], av_table[(key, 1)]])
        z.append(state_value)
ax.azim = 230
ax.plot_trisurf(x,y,z, linewidth=.05, cmap=cm.bone)
# ax.plot_trisurf(x,y,z, linewidth=.02, cmap=sns.cubehelix_palette(8, start=.5, rot=-.75, as_cmap=True))
Out[217]:
<mpl_toolkits.mplot3d.art3d.Poly3DCollection at 0x122c5e410>