如何从函数值向Spark DataFrame添加新列(使用PySpark)

时间:2022-11-01 22:59:39

I have a dataframe from sql:

我有一个来自sql的数据框:

log = hc.sql("""select 
                     , ip
                     , url
                     , ymd
                  from log """)

and function which apply "ip" value from dataframe and return three value:

和从数据帧应用“ip”值并返回三个值的函数:

def get_loc(ip):
geodata = GeoLocator('SxGeoCity.dat', MODE_BATCH | MODE_MEMORY)
result = []

location = geodata.get_location(ip, detailed=True)
city_name_en    = str(processValue(location['info']['city']['name_en']))
region_name_en  = str(processValue(location['info']['region']['name_en']))
country_name_en = str(processValue(location['info']['country']['name_en']))

result = [city_name_en, region_name_en, country_name_en]

return result

I don't know how to pass value to function get_loc() and add returned value as a map column "property" to existing dataframe. Use python 2.7 and PySpark.

我不知道如何将值传递给函数get_loc()并将返回值作为映射列“property”添加到现有数据帧。使用python 2.7和PySpark。

1 个解决方案

#1


0  

I do not know what does get_loc do.

我不知道get_loc做了什么。

But you can use UDF as below:

但您可以使用UDF如下:

from pyspark.sql import functions as f

def get_loc(ip):
    return str(ip).split('.')

rdd = spark.sparkContext.parallelize([(1, '192.168.0.1'), (2, '192.168.0.1')])
df = spark.createDataFrame(rdd, schema=['idx', 'ip'])
My_UDF = f.UserDefinedFunction(get_loc, returnType=ArrayType(StringType()))
df = df.withColumn('loc', My_UDF(df['ip']))
df.show()

# output:
+---+-----------+----------------+
|idx|         ip|             loc|
+---+-----------+----------------+
|  1|192.168.0.1|[192, 168, 0, 1]|
|  2|192.168.0.1|[192, 168, 0, 1]|
+---+-----------+----------------+

#1


0  

I do not know what does get_loc do.

我不知道get_loc做了什么。

But you can use UDF as below:

但您可以使用UDF如下:

from pyspark.sql import functions as f

def get_loc(ip):
    return str(ip).split('.')

rdd = spark.sparkContext.parallelize([(1, '192.168.0.1'), (2, '192.168.0.1')])
df = spark.createDataFrame(rdd, schema=['idx', 'ip'])
My_UDF = f.UserDefinedFunction(get_loc, returnType=ArrayType(StringType()))
df = df.withColumn('loc', My_UDF(df['ip']))
df.show()

# output:
+---+-----------+----------------+
|idx|         ip|             loc|
+---+-----------+----------------+
|  1|192.168.0.1|[192, 168, 0, 1]|
|  2|192.168.0.1|[192, 168, 0, 1]|
+---+-----------+----------------+