已獨立成項目在github上面 dataformat,在進行hadoop測試時,需要造大量數據,例如某個表存在56列,但實際程序邏輯只適用到某幾列,我們造的數據 也只需要某幾列#this script change data from your source to the dest data format#2011-08-05 created version0.1#2011-10-29 add row-row mapping ,default row value .rebuild all functions. version0.2#next:add data auto generate by re expression#2011-12-17 add new functions, add timestamp creator. version0.3#2012-03-08 rebuild functions. version0.4#2012-06-22 add function to support multi output separators#2012-07-11 fix bug line 44,add if#2012-09-03 rebuild functions,add help msg! version0.5#2012-11-08 last version edited by lingyue.wkl# this py: https://github.com/wklken/pytools/blob/master/data_process/dataformat.py#read file and get each line without nreturn [line[:-1] for line in lines ]def one_line_proc(parts, total, ft_map, outsp, empty_fill, fill_with_sno):for i in range(1, total + 1): #加入使用默認值列 若是以d開頭,後面是默認,否則取文件對應列 done if fill_index.startswith("d"): outline.append(fill_index[1:]) outline.append(handler_specal_part(parts[int(fill_index) - 1])) outline.append(empty_fill)default_outsp = outsp.get(0,"t") result.append(outline[i]) result.append(outsp.get(i + 1, default_outsp))def process(inpath, total, to, outpath, insp, outsp, empty_fill, fill_with_sno, error_line_out): if r":" not in to_row and len(to_row.split(":")) == 2: used_row.append(int(to_row.split(":")[1])) if r"=" not in str(to_row) and len(str(to_row).split("=")) == 2: if r"=" not in str(to_row) and len(str(to_row).split("=")) == 2: ft_map.update({int(to_row.split("=")[0]): "d"+to_row.split("=")[1]}) elif r":" not in to_row and len(to_row.split(":")) == 2: ft_map.update({int(to_row.split(":")[0]): to_row.split(":")[1]}) for i in range(1, total + 1): ft_map.update({int(to_row): str(to_index)}) used_row.append(to_index)#setp3 處理輸出分隔符 outsp 0=t,1= 0代表默認的,其他前面帶列號的代表指定的if len(outsp) > 1 and len(outsp.split(",")) > 1: outsps = re.findall(r"d=.+?", outsp) k,v = outsp_kv.split("=") outsp.update({int(k): v})lines = read_file(inpath) if len(insp.split("|")) > 0: parts = re.split(insp, line) if len(parts) >= in_count: outline = one_line_proc(parts, total, ft_map, outsp, empty_fill, fill_with_sno) result.append(outline + "n") result.append(line + "n")#特殊的處理入口,處理維度為每一行,目前只有時間處理def handler_specal_part(part_str):if part_str.startswith("TS") and "=" in part_str: ts_format = {8: "%Y%m%d", #step1 確認輸出的格式 TS8 TS10 TS14 TS19 to_l = int(part_str[2:part_str.index("=")]) part_str = part_str.split("=")[1].strip() inputdate = part_str.split("+")[0].strip() interval = int(part_str.split("+")[1].strip()) parts = part_str.split("-") if len(parts) == 2: #20101020 - XX inputdate = parts[0].strip() interval = -int(parts[1].strip()) elif len(parts) == 3: #2010-10-20 elif len(parts) == 4: #2010-10-20 - XX inputdate = "-".join(parts[:-1]) interval = -int(parts[-1]) inputdate = part_str.strip() part_str = get_timestamp(inputdate, ts_format, interval) #step4 如果定義了輸出格式,轉換成目標格式,返回 part_str = time.strftime(ts_format.get(to_l), time.localtime(int(part_str)))def get_timestamp(inputdate, ts_format, interval=0): inputdate = time.strftime("%Y%m%d%H%M%S")inputdate = inputdate.strip() ts = time.strptime(inputdate, ts_format.get(size)) print "the input date and time expression error,only allow 'YYYYmmdd[HHMMSS]' or 'YYYY-MM-DD HH:MM:SS' " print "the input date and time expression error,only allow 'YYYYmmdd[HHMMSS]' or 'YYYY-MM-DD HH:MM:SS' "return str(int(time.mktime(ts)) + interval)print("功能:原數據文件轉為目標數據格式")print("t -i inputfilepath [必輸,input, 原文件路徑]")print("t -t n [必輸,total, n為數字,目標數據總的域個數]")print("t -a '1,3,4' [必輸,array, 域編號字符串,逗號分隔。指定域用原數據欄位填充,未指定用'0'填充]")print("t -a '3,5=abc,6:2' 第5列默認值abc填充,第6列使用輸入的第1列填充,第3列使用輸入第1列填充")print("t -o outputfilepath [可選,output, 默認為 inputfilepath.dist ]")print("t -F 'FS' [可選,field Sep,原文件域分隔符,默認為\t,支持多分隔符,eg.'t|||' ]")print("t -P 'OFS' [可選,out FS,輸出文件的域分隔符,默認為\t,可指定多個,多個需指定序號=分隔符,逗號分隔,默認分隔符序號0 ]")print("t -f 'fill_str' [可選,fill,未選列的填充值,默認為空 ]")print("t -s [可選,serial number,當配置時,-f無效,使用列號填充未指派的列]")print("t -e [可選,error, 源文件列切分不一致行/空行/注釋等,會被直接輸出,正確行按原邏輯處理]")def must_be_defined(param, map, error_info): opts,args = getopt.getopt(sys.argv[1:],"F:P:t:a:i:o:f:hse") if op in ("-h", "-H", "--help"): insp = value.decode("string_escape") outsp = value.decode("string_escape") print(sys.argv[0]+" : the amount of params must great equal than 3") print("Command : ./dataformat.py -h")except getopt.GetoptError: print(sys.argv[0]+" : params are not defined well!") print("Command : ./dataformat.py -h")must_be_defined('inpath', params_map, sys.argv[0]+" : -i param is needed,input file path must define!")must_be_defined('total', params_map, sys.argv[0]+" : -t param is needed,the fields of result file must define!")must_be_defined('to', params_map, sys.argv[0]+" : -a param is needed,must assign the field to put !")if not os.path.exists(inpath): print(sys.argv[0]+" file : %s is not exists"%inpath)if 'outpath' not in dir():process(inpath, total, to, outpath, insp, outsp, empty_fill, fill_with_sno, error_line_out)if __name__ =="__main__":功能:可指定輸入分隔,輸出分隔,無配置欄位填充,某列默認值,可按順序填充,也可亂序映射填充./dataformat.py –i in_file –t 65 -a 「22,39,63」 –F 「^I」 –P 「^A」 –f 「0」in_file中欄位是以t分隔的[可不配-F,使用默認]。將in_file的第1,2,3列分別填充到in_file.dist[use default]的第22,39,63列in_file.dist共65列,以^A分隔,未配置列以0填充-a中順序與源文件列序有關,若-a 「39,22,63」 則是將第1列填充到第39列,第二列填充到22列,第3列填充到63列
【需要對某些列填充相同的值,但不想在源文件中維護】
./dataformat.py -i in_file –t 30 –a 「3=tag_1,9,7,12=0.0」 –o out_filein_file以t分隔,輸出out_file以t分隔將in_file的第1列,第2列填充到out_file的第9列,第7列out_file共30列,第3列均用字符串」tag_1」填充,第12列用0.0填充,其他未配置列為空注意:默認值 的取值,若是使用到等號和冒號,需轉義,加 = :./dataformat.py –i in_file –t 56 –a 「3:2,9,5:3,1=abc,11」目標文件第3列用輸入文件第2列填充,目標文件第5列用輸入文件第3列填充目標文件第9列用輸入文件第1列填充,第11列用輸入文件第4列填充【未配置映射,使用從頭開始還沒有被用過的列】腳本會對簡單的欄位數量等映射邏輯進行檢測,複雜最好全配上,使用默認太抽象本文連結:http://python.jobbole.com/83447/
點擊文章底部閱讀原文,查看CDA數據分析師認證考試考綱解析和報名流程。
回復關鍵字 看往期精彩~
1001 ☛ 一分鐘讀懂2015中國數據分析師行業峰會!
1002 ☛ 吳喜之:數據分析和數據挖掘是最大的求職法寶
1003 ☛ 33道Hadoop面試題,看看你能答對多少?(答案在後面)
1004 ☛ 成為首席數據官是一種什麼樣的體驗?
1005 ☛ 超能教程 十分鐘學會 Python!