from math import floor
from numpy import size


def ca_set_binary_threshold_from_skew(input_dict):
    # Decision threshold derived from the class skew and the misclassification
    # cost ratio: log10(ratio_pos_neg * cost_false_neg / cost_false_pos).
    import math
    cost_false_pos = input_dict['cost_false_pos']
    cost_false_neg = input_dict['cost_false_neg']
    ratio_pos_neg = input_dict['ratio_pos_neg']
    output_dict = {}
    output_dict['bin_thres'] = math.log10(float(ratio_pos_neg) * (float(cost_false_neg) / float(cost_false_pos)))
    return output_dict


def ca_estimate_pos_neg_from_prd_fct(input_dict):
    # Estimate the positive/negative class ratio by counting ground facts of
    # the target attribute (ending in the positive or the negative class
    # constant) in the deployment data file.
    import re
    output_dict = {}
    deploy_data = input_dict['deploy_data']
    target_att = input_dict['target_att']
    pos_col = input_dict['pos_col']
    neg_col = input_dict['neg_col']
    with open(deploy_data) as f:
        deploy_file = f.read()
    pos_arr = re.findall(target_att + r"\(.*," + pos_col + r"\)\.", deploy_file)
    print len(pos_arr)
    neg_arr = re.findall(target_att + r"\(.*," + neg_col + r"\)\.", deploy_file)
    print len(neg_arr)
    output_dict['ratio_pos_neg'] = len(pos_arr) / float(len(neg_arr))
    return output_dict


def ca_apply_binary_threshold(input_dict):
    # Binarize the predicted scores in place: 1 if score >= threshold, else 0.
    performance = input_dict['score']
    thres = input_dict['bin_thres']
    n = len(performance['predicted'])
    for i in range(n):
        if performance['predicted'][i] >= thres:
            performance['predicted'][i] = 1
        else:
            performance['predicted'][i] = 0
    output_dict = {}
    output_dict['classes'] = performance
    return output_dict


def ca_rank_driven_binary_threshold_selection(input_dict):
    # Choose the threshold so that the top `rate` percent of examples (by
    # predicted score) fall above it; the threshold is the midpoint between
    # the last score kept and the first score rejected.
    performance = input_dict['score']
    rate = input_dict['rate']
    list_score = []
    n = len(performance['actual'])
    for i in range(n):
        list_score.append((performance['actual'][i], performance['predicted'][i]))
    output_dict = {}
    sorted_score = sorted(list_score, key=lambda scr: scr[1], reverse=True)
    rank = floor(n * (float(rate) / float(100)))
    current_rank = 0
    previous = float('inf')
    current = previous
    for i in range(n):
        current = sorted_score[i][1]
        current_rank = current_rank + 1
        if current_rank > rank:
            output_dict['bin_thres'] = (previous + current) / float(2)
            break
        previous = sorted_score[i][1]
    return output_dict


def ca_optimal_binary_threshold_selection(input_dict):
    # Choose the threshold that maximizes the selected evaluation measure
    # (accuracy, balanced accuracy, recall, precision or F-measure) by
    # sweeping all candidate thresholds in a ROC-like fashion.
    performance = input_dict['score']
    method = input_dict['method']
    list_score = []
    n = len(performance['actual'])
    for i in range(n):
        list_score.append((performance['actual'][i], performance['predicted'][i]))
    output_dict = {}
    sorted_score = sorted(list_score, key=lambda scr: scr[1], reverse=True)
    counter_neg = len([score for score in list_score if score[0] == 0])
    counter_pos = len([score for score in list_score if score[0] == 1])
    output_dict['bin_thres'] = find_best_roc_weight(method, sorted_score, counter_pos, counter_neg)
    return output_dict


def find_best_roc_weight(method, a_list, a_num_positives, a_num_negatives):
    # Sweep the score-sorted (actual, predicted) pairs from the highest score
    # down, tracking true positives (xpos) and true negatives (xneg), and
    # return the midpoint between the two scores where the chosen measure is
    # maximal.
    previous = float('inf')
    xpos = 0
    xneg = a_num_negatives
    the_best_value = get_value(method, xpos, xneg, a_num_positives, a_num_negatives)
    best = previous
    for the_elt in a_list:
        the_roc = the_elt
        current = the_roc[1]
        if current != previous:
            possible_best_value = get_value(method, xpos, xneg, a_num_positives, a_num_negatives)
            print '%f > %f' % (possible_best_value, the_best_value)
            if possible_best_value > the_best_value:
                the_best_value = possible_best_value
                print '%f -> %f' % (best, (previous + current) / float(2))
                best = (previous + current) / float(2)
        if the_roc[0] == 1:
            xpos += 1
        else:
            xneg -= 1
        previous = current
    # Also consider the threshold below the lowest observed score.
    possible_best_value = get_value(method, xpos, xneg, a_num_positives, a_num_negatives)
    if possible_best_value > the_best_value:
        the_best_value = possible_best_value
        best = (previous + float('-inf')) / float(2)
    return best


def get_value(method, TP, TN, P, N):
    # Evaluation measure for the confusion matrix given by TP, TN and the
    # class totals P and N.
    if method == 'accuracy':
        accuracy = (TP + TN) / float(P + N)
        return accuracy
    elif method == 'balanced':
        balanced = (TP / float(P) + TN / float(N)) / 2
        return balanced
    FN = P - TP
    FP = N - TN
    recall = TP / float(P)
    if method == 'recall':
        return recall
    if TP + FP > 0:
        precision = TP / float(TP + FP)
        if method == 'precision':
            return precision
        if precision + recall > 0:
            F_measure = 2 * precision * recall / (precision + recall)
        else:
            F_measure = 0
    else:
        # No positive predictions: precision is undefined, fall back to 0.
        F_measure = 0
    return F_measure


def Context_Bcalibration(input_dict):
    # Build calibrated scores from the (actual, predicted) pairs, using the
    # helper routines sort_list2, search_for_position and non_decreasing
    # defined elsewhere in this module.
    non_calibrated_scores = input_dict['non_calibrated_scores']
    learner = input_dict['learner']
    old_actual = non_calibrated_scores['actual']
    old_predicted = non_calibrated_scores['predicted']
    g2 = sort_list2(old_predicted, old_actual)
    Z = g2['actual']
    Z2 = g2['predicted']
    output_dict = {}
    list3 = []
    for i in Z:
        list3.append(i)
    L = search_for_position(list3)
    while non_decreasing(L) != True:
        L = search_for_position(L)
    g2 = sort_list2(old_predicted, old_actual)
    output_dict['builded_scores'] = {'noncalibrated_scr': Z2, 'class': Z, 'calibrated_scr': L}
    print output_dict
    return output_dict


def Context_Acalibration(input_dict):
    # Map the test scores onto the calibrated scores built by
    # Context_Bcalibration: for every test score, find the interval of
    # non-calibrated scores it falls into and assign the corresponding
    # calibrated value.
    from numpy import inf
    import math
    dict = input_dict['test_scores']
    sc = input_dict['builded_scores']
    X1 = sc['calibrated_scr']
    k = 0
    X2 = sc['noncalibrated_scr']
    probs = []
    list_scr = []
    list_max = []
    test_cls = dict['actual']
    test_data = dict['predicted']
    i = 1
    begin_score = []
    end_score = []
    begin_score.append(X2[0])
    i = 0
    for item in X1[1:]:
        if (item != X1[i]) and (i <= size(X1) - 1):
            begin_score.append(X2[i + 1])
            end_score.append(X2[i])
        i += 1
    end_score.append(X2[size(X2) - 1])
    max = 0
    i = 0
    aux = True
    if 0 in X1:
        list_max.append(0)
    for j in X1:
        if (j > max):
            list_max.append(j)
            max = j
    for scr in dict['predicted']:
        k += 1
        scr2 = 0
        for i in range(len(begin_score)):
            if (scr >= begin_score[i] and scr <= end_score[i]) or (scr == begin_score[i]) or (scr == end_score[i]):
                scr2 = list_max[i]
            elif scr > end_score[i] and scr
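

# Usage sketch (illustrative only, not part of the original module): how the
# threshold widgets above can be chained.  The shape of the 'score' dictionary
# ({'actual': [...], 'predicted': [...]}) is inferred from the functions above;
# the concrete cost values and scores below are hypothetical.
#
#   skew = ca_set_binary_threshold_from_skew({'cost_false_pos': 1.0,
#                                             'cost_false_neg': 3.0,
#                                             'ratio_pos_neg': 1.0})
#   # skew['bin_thres'] == log10(1.0 * 3.0 / 1.0) ~= 0.477
#   scores = {'actual': [1, 0, 1, 0], 'predicted': [0.9, 0.4, 0.6, 0.1]}
#   classes = ca_apply_binary_threshold({'score': scores,
#                                        'bin_thres': skew['bin_thres']})
#   # classes['classes']['predicted'] now holds the 0/1 labels [1, 0, 1, 0].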