"""Count occurrences of each event name in a CSV event log with PySpark.

Reads the input file, counts rows per event name (fourth CSV column),
sorts the counts in ascending order, and writes the result as CSV lines
to a date-stamped output directory.
"""
import argparse
import csv
from datetime import datetime
from io import StringIO
from operator import add, itemgetter

from pyspark import SparkConf, SparkContext


def split(line):
    """Parse a single CSV line into a list of fields."""
    reader = csv.reader(StringIO(line))
    return next(reader)  # next(reader), not reader.next(), so this works on Python 3


def toCSV(data):
    """Yield '<eventName>,<count>' lines from (eventName, count) pairs."""
    for eventName, count in data:
        yield eventName + ',' + str(count)


def main(sc):
    parser = argparse.ArgumentParser()
    # Mark both arguments required; the original's `if args.inputfile:` guard
    # left the variables undefined (NameError) when a flag was omitted.
    parser.add_argument("--inputfile", required=True,
                        help="Full path of input file to parse")
    parser.add_argument("--outputpath", required=True,
                        help="Path to generate output file. Should end with '/'")
    args = parser.parse_args()

    # The event name is assumed to be the fourth CSV column (row[3]).
    eventNames = sc.textFile(args.inputfile).map(split).map(lambda row: (row[3], 1))
    eventCounts = eventNames.reduceByKey(add).collect()
    eventCountSorted = sorted(eventCounts, key=itemgetter(1))  # ascending by count

    # Note: saveAsTextFile writes a directory of part files, not a single .csv file.
    now = datetime.now()  # read the clock once so month/day/year stay consistent
    sortedRdd = sc.parallelize(toCSV(eventCountSorted))
    sortedRdd.saveAsTextFile('{}{}{}{}.csv'.format(args.outputpath, now.month, now.day, now.year))


if __name__ == '__main__':
    conf = SparkConf().setAppName('EventLogParser')
    sc = SparkContext(conf=conf)
    main(sc)
    sc.stop()
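For reference, a typical way to launch this job is with spark-submit; the script filename event_log_parser.py and the paths below are placeholders, not from the original:

    spark-submit event_log_parser.py --inputfile /path/to/events.csv --outputpath /path/to/output/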