def ca_set_binary_threshold_from_skew(input_dict):
    """Compute a binary decision threshold from class skew and misclassification costs."""
    cost_false_pos = input_dict['cost_false_pos']
    cost_false_neg = input_dict['cost_false_neg']
    ratio_pos_neg = input_dict['ratio_pos_neg']
    output_dict = {}
    output_dict['bin_thres'] = float(ratio_pos_neg) * (float(cost_false_pos) / float(cost_false_neg))
    return output_dict
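
# A minimal usage sketch for ca_set_binary_threshold_from_skew. The input
# values below are hypothetical, chosen only to illustrate the calling
# convention, not taken from any real deployment:
#
#     result = ca_set_binary_threshold_from_skew({
#         'cost_false_pos': 1.0,   # cost of a false positive
#         'cost_false_neg': 5.0,   # cost of a false negative
#         'ratio_pos_neg': 0.25,   # positives per negative in the data
#     })
#     result['bin_thres']  # 0.25 * (1.0 / 5.0) == 0.05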

def ca_estimate_pos_neg_from_prd_fct(input_dict):
    """Estimate the positive/negative class ratio by counting labelled facts in a deployment file."""
    import re
    output_dict = {}
    deploy_data = input_dict['deploy_data']
    target_att = input_dict['target_att']
    pos_col = input_dict['pos_col']
    neg_col = input_dict['neg_col']

    with open(deploy_data) as f:
        deploy_file = f.read()

    # Count facts of the form target_att(...,pos_col). and target_att(...,neg_col).
    pos_arr = re.findall(target_att + r"\(.*," + pos_col + r"\)\.", deploy_file)
    print(len(pos_arr))

    neg_arr = re.findall(target_att + r"\(.*," + neg_col + r"\)\.", deploy_file)
    print(len(neg_arr))

    output_dict['ratio_pos_neg'] = len(pos_arr) / float(len(neg_arr))
    return output_dict
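
# A sketch of the deployment-file format the regexes above assume: one
# Prolog-style fact per line, with the class label as the final argument.
# All names here are hypothetical, chosen only to illustrate the matching:
#
#     client(id1,pos).
#     client(id2,neg).
#     client(id3,neg).
#
# With target_att='client', pos_col='pos', neg_col='neg', the function would
# return {'ratio_pos_neg': 0.5}.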

def ca_apply_binary_threshold(input_dict):
    """Turn continuous prediction scores into hard 0/1 labels using a threshold."""
    performance = input_dict['score']
    thres = input_dict['bin_thres']

    n = len(performance['predicted'])
    for i in range(n):
        if performance['predicted'][i] >= thres:
            performance['predicted'][i] = 1
        else:
            performance['predicted'][i] = 0

    output_dict = {}
    output_dict['classes'] = performance
    return output_dict
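
# A minimal usage sketch for ca_apply_binary_threshold (hypothetical scores):
#
#     scored = {'actual': [1, 0, 1, 0], 'predicted': [0.9, 0.4, 0.6, 0.2]}
#     out = ca_apply_binary_threshold({'score': scored, 'bin_thres': 0.5})
#     out['classes']['predicted']  # [1, 0, 1, 0]
#
# Note that the 'predicted' list is overwritten in place.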

def ca_rate_driven_threshold_selection(input_dict):
    """Select a decision threshold with the rate-driven criterion.

    'rate' is not special-cased in get_value, so this maximises the
    negative-class F-measure (see get_value).
    """
    performance = input_dict['score']
    list_score = []
    n = len(performance['actual'])
    for i in range(n):
        list_score.append((performance['actual'][i], performance['predicted'][i]))
    output_dict = {}
    # Sort by predicted score, highest first, and sweep candidate thresholds.
    sorted_score = sorted(list_score, key=lambda scr: scr[1], reverse=True)
    counter_neg = len([score for score in list_score if score[0] == 0])
    counter_pos = len([score for score in list_score if score[0] == 1])
    output_dict['bin_thres'] = find_best_roc_weight('rate', sorted_score, counter_pos, counter_neg)
    return output_dict

def ca_score_driven_threshold_selection(input_dict):
    """Select the decision threshold that maximises the criterion named by
    input_dict['method'] ('accuracy', 'balanced', 'precision', or F-measure)."""
    performance = input_dict['score']
    method = input_dict['method']
    list_score = []
    n = len(performance['actual'])
    for i in range(n):
        list_score.append((performance['actual'][i], performance['predicted'][i]))
    output_dict = {}
    # Sort by predicted score, highest first, and sweep candidate thresholds.
    sorted_score = sorted(list_score, key=lambda scr: scr[1], reverse=True)
    counter_neg = len([score for score in list_score if score[0] == 0])
    counter_pos = len([score for score in list_score if score[0] == 1])
    output_dict['bin_thres'] = find_best_roc_weight(method, sorted_score, counter_pos, counter_neg)
    return output_dict
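
# A usage sketch for the threshold-selection functions above; the scores are
# hypothetical. ca_rate_driven_threshold_selection is the same sweep with the
# criterion fixed to 'rate'; here we ask for the accuracy-maximising cut:
#
#     scored = {'actual': [1, 1, 0, 1, 0, 0],
#               'predicted': [0.95, 0.8, 0.7, 0.6, 0.3, 0.1]}
#     out = ca_score_driven_threshold_selection({'score': scored,
#                                                'method': 'accuracy'})
#     out['bin_thres']  # 0.75, the midpoint of 0.8 and 0.7, where accuracy
#                       # peaks at 5/6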

def find_best_roc_weight(method, a_list, a_num_positives, a_num_negatives):
    """Sweep every candidate threshold over descending scores and return the
    midpoint threshold that maximises the criterion computed by get_value."""
    previous = float('inf')
    # Start above the highest score: everything is predicted negative,
    # so TP = 0 and TN = the number of negatives.
    xpos = 0
    xneg = a_num_negatives
    the_best_value = get_value(method, xpos, xneg, a_num_positives, a_num_negatives)
    best = previous
    for the_roc in a_list:
        current = the_roc[1]
        if current != previous:
            # Score changed: evaluate the cut between the previous and current score.
            possible_best_value = get_value(method, xpos, xneg, a_num_positives, a_num_negatives)
            if possible_best_value > the_best_value:
                the_best_value = possible_best_value
                best = (previous + current) / float(2)
        if the_roc[0] == 1:
            xpos += 1
        else:
            xneg -= 1
        previous = current

    # Also consider the cut below the lowest score (everything predicted positive).
    possible_best_value = get_value(method, xpos, xneg, a_num_positives, a_num_negatives)
    if possible_best_value > the_best_value:
        the_best_value = possible_best_value
        best = (previous + float('-inf')) / float(2)
    return best
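
# A direct-call sketch for find_best_roc_weight. The list below is
# hypothetical (actual label, predicted score) pairs, already sorted by
# descending score, with one positive and one negative:
#
#     find_best_roc_weight('accuracy', [(1, 0.9), (0, 0.4)], 1, 1)  # -> 0.65
#
# 0.65 is the midpoint of 0.9 and 0.4, the cut that classifies both
# examples correctly.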

def get_value(method, TP, TN, P, N):
    """Evaluate a selection criterion from the confusion counts at one threshold.

    Note: precision, recall and F-measure below are computed with respect to
    the negative class (TN plays the role of the true positives); any method
    other than 'accuracy', 'balanced' or 'precision' falls through to this
    negative-class F-measure.
    """
    if method == 'accuracy':
        accuracy = (TP + TN) / float(N + P)
        return accuracy
    elif method == 'balanced':
        # Balanced accuracy: mean of the per-class recalls.
        balanced = (TP / float(P) + TN / float(N)) / 2
        return balanced
    FN = P - TP
    FP = N - TN
    recall = TN / float(N)  # negative-class recall: TN / (TN + FP)
    if TN + FN > 0:
        precision = TN / float(TN + FN)  # negative-class precision
        if method == 'precision':
            return precision
        if precision + recall > 0:
            F_measure = 2 * precision * recall / (precision + recall)
        else:
            F_measure = 0
    else:
        F_measure = 0
    return F_measure
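
# A quick worked example for get_value with hypothetical counts TP=8, TN=15,
# P=10, N=20 (so FN=2, FP=5). Any unrecognised method name, such as
# 'fmeasure' below, falls through to the negative-class F-measure:
#
#     get_value('accuracy', 8, 15, 10, 20)   # (8+15)/30    = 0.766...
#     get_value('balanced', 8, 15, 10, 20)   # (0.8+0.75)/2 = 0.775
#     get_value('precision', 8, 15, 10, 20)  # 15/(15+2)    = 0.882...
#     get_value('fmeasure', 8, 15, 10, 20)   # 2*p*r/(p+r) with p=15/17, r=0.75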