| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- #!/home/tflucke/bin/bin/python3
- from sklearn.model_selection import KFold
- import numpy as np
- import typing
- try:
- import sample
- except ImportError:
- import os, sys
- sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + \
- '/../feature-extractor')
- import sample
# Features used when no -f/--feature flags are supplied on the command line.
DEFAULT_FEATURES = ["average_iat", "high.avg_burst_size", "high.burst_count"]
def main(options: list):
    """Run the random-forest classifier over a pickled feature file.

    Parses CLI options, loads samples from the given pickle file, builds
    feature vectors, shuffles, cross-validates a random forest, and prints
    the overall accuracy. Optionally prints a one-tailed p-value against
    random guessing (-p) and saves a confusion-matrix heatmap to
    random-forest.png (-g).
    """
    args = parse_args(options)
    try:
        import cPickle as pickle  # Python 2 fast path, if ever run there
    except ImportError:  # narrowed from bare `except:` so real errors surface
        import pickle
    # NOTE(review): pickle.load executes arbitrary code from untrusted files;
    # only load feature files produced by the companion extractor.
    samples = pickle.load(args.features_file)
    features = args.feature if args.feature else DEFAULT_FEATURES
    from Vector import FeatureVector
    data, labels = map(np.array,
                       zip(*[(FeatureVector(p, features).get(), p.user)
                             for p in samples]))
    num_users = len(np.unique([s.user for s in samples]))
    # Shuffle samples and labels with one shared permutation.
    order = np.arange(data.shape[0])
    np.random.shuffle(order)
    res, matrix = random_forest(data[order], labels[order], fn=args.criterion,
                                n=args.folds, verbose=args.verbose,
                                estimators=args.estimators)
    print("Overall Accuracy: %f" % np.average(res))
    if args.p_value:
        # BUG FIX: the original printed an undefined name `p` (NameError).
        # Derive the one-tailed p-value the same way classify() does.
        print("P-Value: %f" % (t_test(res, num_users)[1] / 2))
    if args.graph:
        import seaborn as sns
        from pandas import DataFrame
        from matplotlib import pyplot as plt
        # Truncate labels to 6 chars so axis tick text stays readable.
        label_list = list(map(lambda l: l[0:6], sorted(np.unique(labels))))
        dataset = DataFrame(matrix, columns=label_list, index=label_list)
        sns.set(font_scale=0.8)
        graph = sns.heatmap(data=dataset, annot=True, cbar=False)
        graph.set_xticklabels(graph.get_xticklabels(), rotation=50,
                              horizontalalignment="right")
        plt.subplots_adjust(left=0.15, bottom=0.2)
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        # BUG FIX: title previously said "K-Nearest Neighbor" (copy-paste
        # from a sibling script); this script is the random-forest classifier.
        plt.title('Random Forest Confusion Matrix')
        graph.get_figure().savefig("random-forest.png")
def parse_args(args: list):
    """Build the command-line parser and parse *args* into a Namespace."""
    import argparse
    ap = argparse.ArgumentParser(
        description='Run a data set through a Random Forest classifier.')
    ap.add_argument('features_file', type=argparse.FileType('rb'),
                    help='File of extracted features.')
    ap.add_argument('-v', '--verbose', action="count", default=0,
                    help='Show more information')
    ap.add_argument('-n', '--folds', type=int, default=5,
                    help='Number of cross-validation folds (default: 5)')
    ap.add_argument('-e', '--estimators', type=int, default=100,
                    help='Number of random decision trees (default: 100)')
    ap.add_argument('-c', '--criterion', choices=["gini", "entropy"],
                    default="gini",
                    help='Function to evaluate tree split value (default: "gini")')
    ap.add_argument('-f', '--feature', action='append', type=str,
                    help='Add feature to list of features to test with.')
    # store_true is argparse's shorthand for store_const(const=True, default=False)
    ap.add_argument('-p', '--p-value', action='store_true',
                    help='Calculate a p-value from a t-test.')
    ap.add_argument('-g', '--graph', action="store_true",
                    help='Generates a confusion matrix.')
    return ap.parse_args(args)
def classify(data, labels, num_users: int, args):
    """Shuffle the data, cross-validate a random forest, and report results.

    Returns a tuple of (mean fold accuracy, one-tailed p-value of the
    accuracies against random guessing among num_users users).
    """
    # One shared permutation keeps samples aligned with their labels.
    order = np.arange(data.shape[0])
    np.random.shuffle(order)
    accuracies, _ = random_forest(data[order], labels[order],
                                  n=args.folds, verbose=args.verbose,
                                  fn=args.criterion,
                                  estimators=args.estimators)
    return (np.average(accuracies), t_test(accuracies, num_users)[1] / 2)
def random_forest(data: list, labels: list, n=5, verbose=0, estimators=100,
                  fn="gini"):
    """Cross-validate a random-forest classifier.

    Runs n-fold cross-validation, training a fresh RandomForestClassifier
    (with `estimators` trees and split criterion `fn`) on each training
    split and scoring it on the held-out split.

    Returns a tuple of (per-fold accuracy list, confusion matrix built from
    all out-of-fold predictions, rows/cols ordered by sorted unique labels).

    verbose >= 1 prints the round number and fold accuracy;
    verbose >= 2 additionally prints the training indices.
    """
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import confusion_matrix
    folds = KFold(n_splits=n)
    accuracies = []
    output = []  # predictions accumulated across all folds
    truth = []   # matching ground-truth labels across all folds
    label_list = sorted(np.unique(labels))
    # FIX: dropped the dead local `avg = 0` and the hand-maintained round
    # counter in favor of enumerate(..., 1).
    for round_num, (train_index, test_index) in enumerate(folds.split(data), 1):
        if verbose >= 1:
            print("Round %d:" % round_num)
        if verbose >= 2:
            print("Training on: ", train_index)
        rfc = RandomForestClassifier(n_estimators=estimators, criterion=fn)
        rfc.fit(data[train_index], labels[train_index])
        predictions = rfc.predict(data[test_index])
        output.extend(predictions)
        truth.extend(labels[test_index])
        # Fraction of correct predictions in this fold.
        accuracy = float(np.mean(labels[test_index] == predictions))
        if verbose >= 1:
            print(accuracy)
        accuracies.append(accuracy)
    return (accuracies, confusion_matrix(truth, output, labels=label_list))
- def t_test(accuracy: list, num_users: int):
- from scipy import stats
- random_avg = 1.0/num_users
- res = stats.ttest_1samp(accuracy, random_avg, nan_policy="omit")
- # If all numbers are identical, p-value = 1
- return res if not np.isnan(res[0]) else (0, 1)
# Script entry point: forward CLI arguments (minus the program name) to main().
if __name__ == '__main__':
    import sys
    main(sys.argv[1:])
|