PySpark XML to JSON with time series data

Date: 2022-06-22 02:31:45

I have almost half a million XML files containing time series data, each about 2-3 MB and holding around 10k rows of time series data per file. The idea is to convert the XML files into JSON for each unique ID. However, the time series data for each ID needs to be broken down into batches of 10 rows, converted to JSON, and written to a NoSQL database. Initially, the code iterated over one monolithic dataframe for each ID, stepping forward 10 rows at a time and writing each document to the db.

import pandas as pd

def resample_idx(X, resample_rate):
    # yield consecutive slices of `resample_rate` rows
    for idx in range(0, len(X), resample_rate):
        yield X.iloc[idx:idx + resample_rate, :]

# Batch documents (df holds the time series rows for one ID, soup is the parsed XML)
for idx, df_batch in enumerate(resample_idx(df, 10)):
    dict_ = {}
    dict_['id'] = soup.find('id').contents[0]
    dict_['data'] = [v for k, v in pd.DataFrame.to_dict(df_batch.T).items()]
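Each dict_ is then written to the NoSQL database. A minimal sketch of that step, assuming a MongoDB target via pymongo (the actual store and connection details are not shown in the original code):

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017')  # hypothetical connection string
collection = client['timeseries']['batches']       # hypothetical database/collection names

# inside the batch loop above, after dict_ is built:
collection.insert_one(dict_)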

A sample of the JSON document looks as follows:

{'id': '123456A',
 'data': [{'A': 251.23,
           'B': 130.56,
           'dtim': Timestamp('2011-03-24 11:18:13.350000')
          },
          {
           'A': 253.23,
           'B': 140.56,
           'dtim': Timestamp('2011-03-24 11:19:21.310000')
          },
          .........
         ]
},
{'id': '123593X',
 'data': [{'A': 641.13,
           'B': 220.51,
           'C': 10.45,
           'dtim': Timestamp('2011-03-26 12:11:13.350000')
          },
          {
           'A': 153.25,
           'B': 810.16,
           'C': 12.5,
           'dtim': Timestamp('2011-03-26 12:11:13.310000')
          },
          .........
         ]
}

This works fine for a small sample, but I quickly realized it will not scale when creating the batches. Therefore, I'm looking to replicate this in Spark. I have limited experience with Spark, but here's what I've attempted so far:

First, get all of the time series data for all IDs:

df = sqlContext.read.format("com.databricks.spark.xml").options(rowTag='log').load("dbfs:/mnt/timedata/")

XML Schema

 |-- _id: string (nullable = true)   
 |-- collect_list(TimeData): array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- data: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- ColNames: string (nullable = true)
 |    |    |-- Units: string (nullable = true)

SQL query to get the Spark DataFrame:

d = df.select("_id","TimeData.data",'TimeData.ColNames')

Current Spark DataFrame

+--------------------+--------------------+--------------------+
|                id  |                data|            ColNames|
+--------------------+--------------------+--------------------+
|123456A             |[2011-03-24 11:18...|dTim,A,B            |
|123456A             |[2011-03-24 11:19...|dTim,A,B            |
|123593X             |[2011-03-26 12:11...|dTim,A,B,C          |
|123593X             |[2011-03-26 12:11...|dTim,A,B,C          |
+--------------------+--------------------+--------------------+

Expected Spark DataFrame

+--------------------+--------------------+----------+----------+
|                id  |               dTime|         A|         B|
+--------------------+--------------------+----------+----------+
|123456A             |2011-03-24 11:18... |    251.23|    130.56|
|123456A             |2011-03-24 11:19... |    253.23|    140.56|
+--------------------+--------------------+----------+----------+

+--------------------+--------------------+----------+----------+----------+
|                id  |               dTime|         A|         B|         C|
+--------------------+--------------------+----------+----------+----------+
|123593X             |2011-03-26 12:11... |    641.13|    220.51|     10.45|
|123593X             |2011-03-26 12:11... |    153.25|    810.16|      12.5|
+--------------------+--------------------+----------+----------+----------+

I only showed data for two timestamps here, but how could I take the DataFrame above and turn it into batched JSON documents for every nth row (for each id), similar to how it was done using Pandas above? My initial thought was to perform a groupBy and apply a UDF to each ID. The output would look like the JSON structure above.

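For that groupBy idea, a minimal sketch using a grouped pandas UDF (applyInPandas, Spark 3.x) could look like the following; wide_df (an id/dTim/A/B... frame like the expected DataFrame above) and the batch size of 10 are assumptions, not part of my current code:

import json
import pandas as pd

def to_batches(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf contains every row for a single id; emit one JSON document per 10-row batch
    id_ = pdf['id'].iloc[0]
    docs = []
    for start in range(0, len(pdf), 10):
        batch = pdf.iloc[start:start + 10].drop(columns=['id'])
        docs.append(json.dumps({'id': id_, 'data': batch.to_dict('records')}, default=str))
    return pd.DataFrame({'doc': docs})

batched = wide_df.groupBy('id').applyInPandas(to_batches, schema='doc string')
# each row of `batched` is one JSON document that could be written to the NoSQL store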

XML Structure:

<log>
   <id>"ABC"</id>
   <TimeData>
      <colNames>dTim,colA,colB,colC,</colNames>
      <data>2011-03-24T11:18:13.350Z,0.139,38.988,0,110.307</data>
      <data>2011-03-24T11:18:43.897Z,0.138,39.017,0,110.307</data>
  </TimeData>
</log>

Note that each ID does not have a fixed number of colNames; it can range between 5 and 30, depending on the data sources collected for that ID.

2 Solutions

#1


Well, based on the information given, this could be a solution. Unfortunately my Python is a bit rusty, but there should be Python equivalents for all the Scala functions used here.

// Assume nth is based on dTim ordering
val windowSpec = Window
  .partitionBy($"_id")
  .orderBy($"dTim".desc)

val nthRow  = 2  // define the nthItem to be fetched

df.select(
  $"_id",
  $"TimeData.data".getItem(0).getItem(0).cast(TimestampType).alias("dTim"),
  $"TimeData.data".getItem(0).getItem(1).cast(DoubleType).alias("A"),
  $"TimeData.data".getItem(0).getItem(2).cast(DoubleType).alias("B"),
  $"TimeData.data".getItem(0).getItem(3).cast(DoubleType).alias("C")
).withColumn("n", row_number().over(windowSpec))
  .filter(col("n") === nthRow)
  .drop("n")
.show()

Will output something like

+-------+--------------------+------+------+-----+
|    _id|                dTim|     A|     B|    C|
+-------+--------------------+------+------+-----+
|123456A|2011-03-24 11:18:...|251.23|130.56| null|
|123593X|2011-03-26 12:11:...|641.13|220.51|10.45|
+-------+--------------------+------+------+-----+
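A rough PySpark equivalent of the Scala snippet above might look like this (a sketch, untested, assuming the same column layout):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

window_spec = Window.partitionBy("_id").orderBy(col("dTim").desc())
nth_row = 2  # define the nth item to be fetched

(df.select(
     col("_id"),
     col("TimeData.data").getItem(0).getItem(0).cast("timestamp").alias("dTim"),
     col("TimeData.data").getItem(0).getItem(1).cast("double").alias("A"),
     col("TimeData.data").getItem(0).getItem(2).cast("double").alias("B"),
     col("TimeData.data").getItem(0).getItem(3).cast("double").alias("C"))
   .withColumn("n", row_number().over(window_spec))
   .filter(col("n") == nth_row)
   .drop("n")
   .show())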

I'll improve the answer if I know a bit more

Update

I liked the puzzle, so if I understand the problem correctly, this could be a solution:

I've created 3 XML files, each with 2 data records, using 2 different ids in total.

val df = spark
  .sqlContext
  .read
  .format("com.databricks.spark.xml")
  .option("rowTag", "log")
  .load("src/main/resources/xml")


// Could be computationally heavy; maybe cache df first if possible, otherwise run it on a sample, or hardcode the possible columns
val colNames = df
  .select(explode(split($"TimeData.colNames",",")).as("col"))
  .distinct()
  .filter($"col" =!= lit("dTim") && $"col" =!= "")
  .collect()
  .map(_.getString(0))
  .toList
  .sorted

// or list all possible columns
//val colNames = List("colA", "colB", "colC")


// In the XML, colNames and data are comma-separated strings that have to be split. This could be done with the SQL split function, but this UDF maps the columns to the correct fields
def mapColsToData = udf((cols:String, data:Seq[String]) =>
  if(cols == null || data == null) Seq.empty[Map[String, String]]
  else {
    data.map(str => (cols.split(",") zip str.split(",")).toMap)
  }
)

// The result of this action is 1 record for each datapoint across all XMLs. Each data record is a key->value map of colName->data
val denorm = df.select($"id", explode(mapColsToData($"TimeData.colNames", $"TimeData.data")).as("data"))

denorm.show(false)

Output:

+-------+-------------------------------------------------------------------------------+
|id     |data                                                                           |
+-------+-------------------------------------------------------------------------------+
|123456A|Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC -> 0)|
|123456A|Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0)|
|123593X|Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988)           |
|123593X|Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017)           |
|123456A|Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)|
|123456A|Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0)|
+-------+-------------------------------------------------------------------------------+
// now create column for each map value, based on predef / found columnNames
val columized = denorm.select(
  $"id",
  $"data.dTim".cast(TimestampType).alias("dTim"),
  $"data"
)

columized.show()

Output:

+-------+--------------------+--------------------+
|     id|                dTim|                data|
+-------+--------------------+--------------------+
|123456A|2011-03-24 12:18:...|Map(dTim -> 2011-...|
|123456A|2011-03-24 12:18:...|Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...|Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...|Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...|Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...|Map(dTim -> 2011-...|
+-------+--------------------+--------------------+
// create window over which to resample
val windowSpec = Window
  .partitionBy($"id")
  .orderBy($"dTim".desc)

val resampleRate = 2

// add batchId based on the resample rate, then group by id and batchId and collect the data
val batched = columized
  .withColumn("batchId", floor((row_number().over(windowSpec) - lit(1)) / lit(resampleRate)))
  .groupBy($"id", $"batchId")
  .agg(collect_list($"data").as("data"))
  .drop("batchId")

batched.show(false)

Output:

+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id     |data                                                                                                                                                              |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|123593X|[Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017), Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988)]                      |
|123456A|[Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0), Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)]|
|123456A|[Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0), Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC -> 0)]|
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// Store as 1 huge json file (drop the repartition if you can handle multiple json files; that is also easier on the driver)
batched.repartition(1).write.mode(SaveMode.Overwrite).json("/tmp/xml")

Output json:

{"id":"123593X","data":[{"dTim":"2011-03-26T12:20:43.897+01:00","colA":"1.138","colB":"29.017"},{"dTim":"2011-03-26T12:20:13.350+01:00","colA":"1.139","colB":"28.988"}]}
{"id":"123456A","data":[{"dTim":"2011-03-27T13:18:43.897+02:00","colA":"0.128","colB":"35.017","colC":"0"},{"dTim":"2011-03-27T13:18:13.350+02:00","colA":"0.129","colB":"35.988","colC":"0"}]}
{"id":"123456A","data":[{"dTim":"2011-03-24T12:18:43.897+01:00","colA":"0.138","colB":"39.017","colC":"0"},{"dTim":"2011-03-24T12:18:13.350+01:00","colA":"0.139","colB":"38.988","colC":"0"}]}
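Since the question is about PySpark, a rough Python translation of this updated pipeline could look like the following (a sketch under the same assumptions about the XML layout; the load path and resample rate are illustrative):

from pyspark.sql import Window
from pyspark.sql.functions import col, explode, udf, row_number, floor, lit, collect_list
from pyspark.sql.types import ArrayType, MapType, StringType

df = (spark.read.format("com.databricks.spark.xml")
      .option("rowTag", "log")
      .load("dbfs:/mnt/timedata/"))

# split the comma-separated colNames/data strings and zip them into colName -> value maps
@udf(ArrayType(MapType(StringType(), StringType())))
def map_cols_to_data(cols, data):
    if cols is None or data is None:
        return []
    names = cols.split(",")
    return [dict(zip(names, row.split(","))) for row in data]

# one record per datapoint, each a map of colName -> value
denorm = df.select(
    col("id"),
    explode(map_cols_to_data(col("TimeData.colNames"), col("TimeData.data"))).alias("data"))

columized = denorm.select(
    col("id"),
    col("data")["dTim"].cast("timestamp").alias("dTim"),
    col("data"))

# batch every `resample_rate` rows per id, newest first
window_spec = Window.partitionBy("id").orderBy(col("dTim").desc())
resample_rate = 10

batched = (columized
           .withColumn("batchId", floor((row_number().over(window_spec) - lit(1)) / lit(resample_rate)))
           .groupBy("id", "batchId")
           .agg(collect_list("data").alias("data"))
           .drop("batchId"))

batched.write.mode("overwrite").json("/tmp/xml")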

#2


Here is another way that does not depend on hard-coded column names. Basically, the idea is to explode the data and ColNames columns to get a 'melted' DF, which we can then pivot to get the form you want:

import pyspark.sql.functions as f

# define function that processes elements of the rdd
# underlying the DF to get a melted RDD
def process(row, cols):
    """cols is list of target columns to explode"""
    row=row.asDict()
    exploded=[[row['id']]+list(elt) for elt in zip(*[row[col] for col in cols])]    
    return(exploded)


#Now split ColNames:
df=df.withColumn('col_split', f.split('ColNames',","))

# define target cols to explode, each element of each col 
# can be of different length
cols=['data', 'col_split']

# apply function and flatmap the results to get melted RDD/DF
df=df.select(['id']+cols).rdd\
    .flatMap(lambda row: process(row, cols))\
    .toDF(schema=['id', 'value', 'name'])

# Pivot to get the required form
df.groupby('id').pivot('name').agg(f.max('value')).show()
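To get from the pivoted result back to the batched JSON documents the question asks for, one option is to reuse the windowing idea from answer #1 (a sketch; `wide` stands for a per-timestamp wide DataFrame with columns id, dTim, A, B, ..., e.g. obtained by including dTim in the groupby before pivoting):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number, floor, lit, collect_list, struct

w = Window.partitionBy('id').orderBy(col('dTim').desc())

batched = (wide
           .withColumn('batchId', floor((row_number().over(w) - lit(1)) / lit(10)))
           .groupBy('id', 'batchId')
           .agg(collect_list(struct(*[c for c in wide.columns if c != 'id'])).alias('data'))
           .drop('batchId'))

batched.write.mode('overwrite').json('/tmp/json_batches')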
