运行效果:
数据处理前:
+---+------+---+---+---+-----+---+| a|family| i| m| n|wsdtf|wtf|+---+------+---+---+---+-----+---+| ad| CPU| h| s| t| h| h||ad1| GPU| h| d| y| c| b||ad2|Memory| h| s| u| b| b||ad2| SSD| 6| s| i| h| h|| ad| HHD| 6| d| i| c| b|+---+------+---+---+---+-----+---+
数据处理后:
+---+---+------+---+---+---+-----+-------+-------+---+---+-------+---+---+-----+| a|n_y|family|i_h|n_t|n_u|wtf_b|wsdtf_b|wsdtf_c|m_d|n_i|wsdtf_h|m_s|i_6|wtf_h|+---+---+------+---+---+---+-----+-------+-------+---+---+-------+---+---+-----+| ad| 0| CPU| 1| 1| 0| 0| 0| 0| 0| 0| 1| 1| 0| 1||ad1| 1| GPU| 1| 0| 0| 1| 0| 1| 1| 0| 0| 0| 0| 0||ad2| 0|Memory| 1| 0| 1| 1| 1| 0| 0| 0| 0| 1| 0| 0||ad2| 0| SSD| 0| 0| 0| 0| 0| 0| 0| 1| 1| 1| 1| 1|| ad| 0| HHD| 0| 0| 0| 1| 0| 1| 1| 1| 0| 0| 1| 0|+---+---+------+---+---+---+-----+-------+-------+---+---+-------+---+---+-----+
用于把每列中每种标签生成1和0形式的feature,供机器学习训练使用:
# NOTE(review): this chunk references SparkConf, SparkSession, Row (pyspark)
# and pd (pandas) — assumed imported above this chunk; confirm at file top.


def row_to_dict(row):
    """Convert a Spark Row to a dict mapping each column name to a
    one-element set holding that row's value.

    This is the map step before reducing all rows into a single
    {column: set_of_distinct_values} dictionary.
    """
    return {k: {v} for k, v in row.asDict().items()}


def reduce_dict(dict_r_1, dict_r_2):
    """Merge two {column: set_of_values} dicts.

    Columns present in both inputs get the union of their value sets;
    columns present in only one input are kept as-is.
    """
    merged = {}
    # Iterate the union of key sets so no column from either side is lost.
    for k in set(dict_r_1) | set(dict_r_2):
        merged[k] = dict_r_1.get(k, set()) | dict_r_2.get(k, set())
    return merged


def map_to_extend_row(row, dict_diff_row, except_col_list=None):
    """One-hot encode a Row against the per-column distinct-value dict.

    For every column in ``dict_diff_row`` (except those listed in
    ``except_col_list``, which are passed through unchanged), emit one
    output column per distinct value, named ``<col>_<value>``, set to 1
    when the row holds that value and 0 otherwise.  A column missing
    from the row yields all zeros.
    """
    dict_row = row.asDict()
    # Accept any iterable of column names instead of asserting list-ness.
    except_cols = set(except_col_list) if except_col_list is not None else set()
    encoded = {}
    for col, distinct_values in dict_diff_row.items():
        if col in except_cols:
            encoded[col] = dict_row[col]
            continue
        # None never equals a string value, so a missing column
        # naturally produces all-zero indicator features.
        target_value = dict_row.get(col)
        for value in distinct_values:
            encoded[col + "_" + value] = 1 if value == target_value else 0
    # keys() and values() iterate in matching order for an unmodified dict.
    return Row(*encoded.keys())(*encoded.values())


def main():
    """Build a toy DataFrame and one-hot encode its label columns."""
    conf = SparkConf()
    conf = conf.setMaster("local[*]")
    conf = conf.set('spark.sql.warehouse.dir', 'file:///d:/tmp')
    spark = (SparkSession.builder
             .appName("Test PredictionTool")
             .config(conf=conf)
             .getOrCreate())
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    df_p = pd.DataFrame({"a": ["ad", "ad1", "ad2", "ad2", "ad"],
                         "n": ["t", "y", "u", "i", "i"],
                         "m": ["s", "d", "s", "s", "d"],
                         "i": ["h", "h", "h", "6", "6"],
                         "wtf": ["h", "b", "b", "h", "b"],
                         "wsdtf": ["h", "c", "b", "h", "c"],
                         "family": ["CPU", "GPU", "Memory", "SSD", "HHD"]})
    df = spark.createDataFrame(df_p)

    # Collect every distinct value per column into one dict of sets.
    dict_diff_row = (df.rdd
                     .map(row_to_dict)
                     .reduce(reduce_dict))
    print(dict_diff_row)

    df.show()
    # "family" and "a" are identifier columns: pass them through unencoded.
    (df.rdd
     .map(lambda r: map_to_extend_row(r, dict_diff_row, ["family", "a"]))
     .toDF()
     .show())


if __name__ == '__main__':
    main()