Browse Source

Added the ability to have an intermediary in distributed execution.

Improves performance by keeping one Python session per connection and by
allowing data to be compressed as it is written.  Also improves accuracy by
running each test multiple times and keeping the median (middle) run.
Tom Flucke 6 years ago
Tom Flucke 6 years ago
parent
commit
5e1a13580f

+ 3 - 1
src/classifiers/nearestneighbors.py

@@ -148,7 +148,9 @@ def find_in_predictions(probabilities: list, tests: int, labels: list):
 def t_test(accuracy: list, num_users: int):
     from scipy import stats
     random_avg = 1.0/num_users
-    return stats.ttest_1samp(accuracy, random_avg)
+    res = stats.ttest_1samp(accuracy, random_avg, nan_policy="omit")
+    # If all numbers are identical, p-value = 1
+    return res if not np.isnan(res[0]) else (0, 1)
 
 
 if __name__ == '__main__':

+ 78 - 24
src/distributer/distribute.sh

@@ -3,15 +3,12 @@
 readonly DEFAULT_OUT_FMT="%s.out"
 readonly CMD_FEED="$(mktemp -u /tmp/distributer-XXX.fifo)"
 readonly PROC_BUFFER=10
-readonly MAX_PROCS=$(expr $(ulimit -u) / 3 - $PROC_BUFFER)
-readonly MAX_CMD_SIZE=350
+readonly PROCS_PER_SERVER=6
+readonly MAX_PROCS=$(expr $(ulimit -u) / $PROCS_PER_SERVER - $PROC_BUFFER)
+readonly MAX_CMD_SIZE=512
 readonly READER="$(dirname $(realpath $0))/fixed-read"
 readonly TIMEOUT=600 # 10 minutes
 
-readonly CONF_LIST="$1"
-readonly SERVER_LIST="$2"
-readonly OUT_FMT="${3:-$DEFAULT_OUT_FMT}"
-
 readonly MK_DIR_CMD="if [ ! -d \"$(dirname "$OUT_FMT")\" ]; then
                         mkdir -p \"$(dirname "$OUT_FMT")\" > /dev/null
                      fi"
@@ -21,16 +18,21 @@ readonly R_DASH="s/-//g"
 readonly S_QUOTE="s/'/\\\\'/g"
 
 help() {
-    printf "Usage: $(basename $0) cmd_list server_list [out_file_fmt]\n" >&2
+    printf "Usage: $(basename $0) [options] cmd_list server_list [out_file_fmt]\n" >&2
     printf "    cmd_list: A text file containing a list of commands to run.\n" >&2
     printf "    server_list: A text file containing a list of servers to\n" >&2
     printf "    connect to and run commands on.\n" >&2
     printf "    out_file_fmt: File name format to write the output of each\n" >&2
     printf "    command to (default: %s).\n\n" "$DEFAULT_OUT_FMT" >&2
+    printf "Options:.\n" >&2
+    printf "    -h -?:        Print this help mesage and exit. \n" >&2
+    printf "    -c processor: If present, instead of directly executing cmd_list,\n" >&2
+    printf "    will launch a processor on each server and distribute cmds to the\n" >&2
+    printf "    processors.\n\n" >&2
     printf "All commands will be allocated to the first available server.\n" >&2
     printf "Each command must be valid on every server.\n" >&2
     printf "The output will be saved to a text file on the remote systems.\n" >&2
-    exit 1
+    exit $1
 }
 
 clean_server() {
@@ -39,30 +41,61 @@ clean_server() {
     exit 2
 }
 
-run_server() {
# Feed the long-running processor on one server.
# $1: server host name; $2: local reply fifo; $3: remote output file.
# The brace group emits shell commands on stdout, piped into a remote "sh"
# over ssh; the remote's stdout is redirected back into the fifo, so each
# `read` below blocks until the remote side has produced a line.
run_processor() {
    server="$1"
    loop="$2"
    out_file="$3"
    {
        # Test that ssh is ready to receive.
        printf "echo\n"
        read -t $TIMEOUT line < "$loop"
        # $PROCESSOR is treated as a printf format; a %s in it (if present)
        # receives the output file name.
        printf "Running processor: '$PROCESSOR'\n" "$out_file" >&2
        printf "$PROCESSOR\n" "$out_file"
        # Block until command completes
        while read -t $TIMEOUT line < "$loop" > /dev/null && \
            cmd=$("$READER" $MAX_CMD_SIZE)
        do
            # Trim the padding added by the fixed-width command feed.
            cmd=$(printf "%s" "$cmd" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//')
            printf "%s\n" "$cmd" >&2
            printf "%s\n" "$cmd"
        done
    } | ssh -oBatchMode=yes -oStrictHostKeyChecking=no "$server" "sh" > "$loop"
}
+
# Execute each incoming command directly on one server via a single ssh
# session.  $1: server host name; $2: local reply fifo; $3: remote output file.
run_cmd_list() {
    server="$1"
    loop="$2"
    out_file="$3"
    {
        # Test that ssh is ready to receive.
        printf "echo\n"
        # Block until command completes
        while read -t $TIMEOUT line < "$loop" > /dev/null && \
            cmd=$("$READER" $MAX_CMD_SIZE)
        do
            # Trim the padding added by the fixed-width command feed.
            cmd=$(printf "%s" "$cmd" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//')
            printf "%s\n" "$cmd" >&2
            # Remotely: tag the output line with the command, append the
            # command's output, then echo a blank line so the local `read`
            # above unblocks for the next command.
            printf "printf \"%s: \" >> %s\n" "$cmd"  "$out_file"
            printf "%s >> %s\n" "$cmd"  "$out_file"
            printf "echo\n"
        done
    } | ssh -oBatchMode=yes -oStrictHostKeyChecking=no "$server" "sh" > "$loop"
}
+
# Drive one server: create its private reply fifo, then dispatch commands to
# it either through a persistent processor (-c) or one at a time (default).
# $1: server host name.  Commands arrive on stdin (the shared CMD_FEED).
run_server() {
    server="$1"
    loop="$(mktemp -u /tmp/$(basename $CMD_FEED .fifo)-$server-XXX.fifo)"
    out_file=$(printf "$OUT_FMT" "$(basename "$loop" .fifo)")
    mkfifo "$loop"
    trap "clean_server $loop" 2 15
    if [ -n "$PROCESSOR" ]; then
        run_processor "$server" "$loop" "$out_file"
    else
        run_cmd_list "$server" "$loop" "$out_file"
    fi
    clean_server "$loop"
    # The caller pipes our output through prepend, which adds the server name.
    echo "Finished!" >&2
}
 
 clean_up() {
@@ -73,13 +106,19 @@ clean_up() {
     exit 2
 }
 
# Prefix every line of stdin with "$1: " (used to tag per-server output).
prepend() {
    tag="$1"
    while read -r entry; do
        printf '%s: %s\n' "$tag" "$entry"
    done
}
+
 main() {
     mkfifo "$CMD_FEED"
     trap clean_up 2 15
     cat "$CONF_LIST" | sed '/^[[:space:]]*$/d' | \
         xargs -d'\n' printf "%-$MAX_CMD_SIZE.${MAX_CMD_SIZE}s" > "$CMD_FEED" &
     for server in $(head -n$MAX_PROCS "$SERVER_LIST"); do
-        run_server "$server" < "$CMD_FEED" > /dev/null &
+        run_server "$server" < "$CMD_FEED" 2>&1 | prepend "$server" &
     done
     for pid in $(pgrep -P $$); do
         wait $pid
@@ -88,8 +127,23 @@ main() {
     echo "All jobs finished!"
 }
 
# ---- Argument parsing and entry point --------------------------------------
OPTIND=1
while getopts "h?c:" opt; do
    case "$opt" in
        h|\?)
            help 0
            ;;
        c)  # Run a persistent per-server processor instead of raw commands.
            readonly PROCESSOR="$OPTARG"
            ;;
        *)  help 1
            ;;
    esac
done
# POSIX shell arithmetic instead of forking the external expr(1).
shift $((OPTIND - 1))
# Exactly two or three positional arguments are accepted.  "[ a -o b ]" is
# obsolescent in POSIX test(1); use two separate tests instead.
if [ "$#" -lt 2 ] || [ "$#" -gt 3 ]; then
    help 1
fi
readonly CONF_LIST="$1"
readonly SERVER_LIST="$2"
readonly OUT_FMT="${3:-$DEFAULT_OUT_FMT}"
main

+ 72 - 0
src/distributer/distribute_compressor.py

@@ -0,0 +1,72 @@
+#!/home/tflucke/bin/bin/python3
+
+import os, sys, struct, typing, numpy as np
+sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + \
+                    '/../classifiers/')
+from Vector import FeatureVector
+try:
+    import compress_pickle
+except ImportError:
+    try:
+        import cPickle as compress_pickle
+    except ImportError:
+        import pickle as compress_pickle
+
+USHRT_MAX=(1 << 16 - 1)
+
def main(options: list):
    """Worker loop: parse this compressor's own options, then read classifier
    invocations from stdin (one per line), run each several times, and write
    the compressed, averaged result to the output file.

    options: argv-style list of strings (see parse_args).
    """
    args = parse_args(options)
    if args.classifier == "nearest-neighbor":
        import nearestneighbors as classifier
    # Statistic used to collapse the repeated runs into one result.
    avg_fn = np.median if args.final_statistic == "median" else np.average
    # Signals the dispatcher (distribute.sh) that this process is ready.
    print("Ready for input!", flush=True)
    for line in sys.stdin:
        # Per-job options; renamed from `options` to stop shadowing our param.
        job = classifier.parse_args(line.split())
        samples = compress_pickle.load(job.features_file, compression=None)
        num_users = len(np.unique([s.user for s in samples]))
        if num_users < args.min_users:
            # BUG FIX: this read `args.features_file`, but the file option
            # lives on the per-job namespace, not on our own args --
            # the old code raised AttributeError here.
            print("%s: Too few users.  Skipping..." % job.features_file.name,
                  file=sys.stderr, flush=True)
        else:
            features = job.feature if job.feature else classifier.DEFAULT_FEATURES
            data, labels = map(np.array,
                               zip(*[(FeatureVector(p, features).get(), p.user)
                                     for p in samples]))
            runs = [classifier.classify(data, labels, num_users, job)
                    for _ in range(args.reruns)]
            write_to_file(args.out_file, args.compression, job, avg_fn(runs, 0))
            print("Finished: '%s'" % line.strip(), flush=True)
+
def write_to_file(out: typing.BinaryIO, compression: str, options,
                  results: typing.Tuple[float, float]):
    """Serialize one finished job to `out`.

    The record is a pickled (filename, options, packed_results) triple, where
    packed_results quantizes each result float to an unsigned 16-bit integer.

    out: binary file object to write the pickled record to.
    compression: compression name understood by compress_pickle (or None).
    options: the per-job argparse namespace; its open features_file handle is
        removed (file objects can't be pickled) and only its name is kept.
        NOTE: this mutates the caller's namespace.
    results: pair of floats, expected in [0, 1].  NOTE(review): values outside
        [0, 1] (e.g. a raw t-statistic) overflow "H" and raise struct.error --
        confirm callers normalize first.
    """
    filename = options.features_file.name
    del options.features_file
    # "@HH": two native-order unsigned shorts.  NOTE(review): "@" is
    # architecture-dependent; "=HH" would make the files portable.
    res_packed = struct.pack("@HH", *[int(USHRT_MAX*v) for v in results])
    compress_pickle.dump((filename, options, res_packed), out, compression=compression)
+
def read_from_file(in_file: typing.BinaryIO, compression: str = None):
    """Inverse of write_to_file: load one record and decode its results.

    Returns (filename, options, (v0, v1)) with the two packed unsigned shorts
    scaled back into floats in [0, 1].
    """
    res = compress_pickle.load(in_file, compression=compression)
    # BUG FIX: the packed result bytes are the THIRD element (res[2]);
    # res[1] is the options namespace and cannot be struct.unpack'ed.
    return (res[0], res[1],
            tuple(float(v)/USHRT_MAX for v in struct.unpack("@HH", res[2])))
+
def parse_args(args: list):
    """Parse this compressor's own command-line options.

    args: argv-style list of strings (excluding the program name).
    Returns the populated argparse namespace.
    """
    import argparse
    cli = argparse.ArgumentParser(
        description='Run a series of tests and compress the output.')
    cli.add_argument('classifier', choices=["nearest-neighbor"],
                     help='Classifier to use.')
    cli.add_argument('out_file', type=argparse.FileType('wb'),
                     help='Output file name.')
    cli.add_argument('-v', '--verbose', action="count", default=0,
                     help='Show more information')
    cli.add_argument('-m', '--min-users', type=int, default=10,
                     help='Minimum number of unique users to consider a sample\
                        file valid. (default: 10)')
    cli.add_argument('-r', '--reruns', type=int, default=3,
                     help='Number of times to rerun a sample set. (default: 3)')
    cli.add_argument('-f', '--final-statistic', choices=["mean", "median"],
                     default="median",
                     help='Final statistic to show. (default: median)')
    cli.add_argument('-c', '--compression',
                     choices=["bz2", "gzip", "lzma", "zipfile", None],
                     default="bz2",
                     help='Compression algorithm to use. (default: bzip2)')
    return cli.parse_args(args)
+
# Script entry point: forward CLI arguments (sans program name) to main().
if __name__ == '__main__':
    main(sys.argv[1:])