PySpark Pandas_Udf()

Pyspark Pandas Udf



Die Transformation des PySpark DataFrame ist mit der Funktion pandas_udf() möglich. Es handelt sich um eine benutzerdefinierte Funktion, die mit einem Pfeil auf den PySpark DataFrame angewendet wird. Wir können die vektorisierten Operationen mit pandas_udf() ausführen. Es kann implementiert werden, indem diese Funktion als Dekorator übergeben wird. Lassen Sie uns in dieses Handbuch eintauchen, um die Syntax, Parameter und verschiedene Beispiele kennenzulernen.

Inhaltsthema:

Wenn Sie mehr über den PySpark DataFrame und die Modulinstallation erfahren möchten, lesen Sie hier Artikel .







Pyspark.sql.functions.pandas_udf()

pandas_udf() ist im sql.functions-Modul in PySpark verfügbar und kann mit dem Schlüsselwort „from“ importiert werden. Es wird verwendet, um die vektorisierten Operationen auf unserem PySpark DataFrame auszuführen. Diese Funktion wird wie ein Dekorator durch die Übergabe von drei Parametern implementiert. Danach können wir eine benutzerdefinierte Funktion erstellen, die die Daten im Vektorformat (wie wir dafür series/NumPy verwenden) mithilfe eines Pfeils zurückgibt. Innerhalb dieser Funktion können wir das Ergebnis zurückgeben.



Struktur und Syntax:



Schauen wir uns zunächst die Struktur und Syntax dieser Funktion an:

@pandas_udf(Datentyp)
def Funktionsname(Operation) -> Convert_Format:
return-Anweisung

Hier ist der Funktionsname der Name unserer definierten Funktion. Der Datentyp gibt den Datentyp an, der von dieser Funktion zurückgegeben wird. Wir können das Ergebnis mit dem Schlüsselwort „return“ zurückgeben. Alle Operationen werden innerhalb der Funktion mit der Pfeilzuweisung ausgeführt.





Pandas_udf (Funktion und ReturnType)

  1. Der erste Parameter ist die benutzerdefinierte Funktion, die ihm übergeben wird.
  2. Der zweite Parameter wird verwendet, um den Rückgabedatentyp der Funktion anzugeben.

Daten:

In diesem gesamten Leitfaden verwenden wir zur Demonstration nur einen PySpark DataFrame. Alle von uns definierten benutzerdefinierten Funktionen werden auf diesen PySpark DataFrame angewendet. Stellen Sie sicher, dass Sie diesen DataFrame zuerst nach der Installation von PySpark in Ihrer Umgebung erstellen.



Pyspark importieren

aus pyspark.sql SparkSession importieren

linuxhint_spark_app = SparkSession.builder.appName( „Linux-Hinweis“ ).getOrCreate()

aus pyspark.sql.functions import pandas_udf

aus pyspark.sql.types import *

Pandas als Panda importieren

# Gemüsedetails

Gemüse =[{ 'Typ' : 'Gemüse' , 'Name' : 'Tomate' , 'locate_country' : 'USA' , 'Menge' : 800 },

{ 'Typ' : 'Frucht' , 'Name' : 'Banane' , 'locate_country' : 'CHINA' , 'Menge' : zwanzig },

{ 'Typ' : 'Gemüse' , 'Name' : 'Tomate' , 'locate_country' : 'USA' , 'Menge' : 800 },

{ 'Typ' : 'Gemüse' , 'Name' : 'Mango' , 'locate_country' : 'JAPAN' , 'Menge' : 0 },

{ 'Typ' : 'Frucht' , 'Name' : 'Zitrone' , 'locate_country' : 'INDIEN' , 'Menge' : 1700 },

{ 'Typ' : 'Gemüse' , 'Name' : 'Tomate' , 'locate_country' : 'USA' , 'Menge' : 1200 },

{ 'Typ' : 'Gemüse' , 'Name' : 'Mango' , 'locate_country' : 'JAPAN' , 'Menge' : 0 },

{ 'Typ' : 'Frucht' , 'Name' : 'Zitrone' , 'locate_country' : 'INDIEN' , 'Menge' : 0 }

]

# Erstellen Sie den Marktdatenrahmen aus den oben genannten Daten

market_df = linuxhint_spark_app.createDataFrame(vegetable)

market_df.show()

Ausgang:

Hier erstellen wir diesen DataFrame mit 4 Spalten und 8 Zeilen. Jetzt verwenden wir pandas_udf(), um die benutzerdefinierten Funktionen zu erstellen und sie auf diese Spalten anzuwenden.

Pandas_udf() mit verschiedenen Datentypen

In diesem Szenario erstellen wir einige benutzerdefinierte Funktionen mit pandas_udf(), wenden sie auf Spalten an und zeigen die Ergebnisse mit der Methode select() an. In jedem Fall verwenden wir pandas.Series, wenn wir die vektorisierten Operationen ausführen. Dabei werden die Spaltenwerte als eindimensionales Array betrachtet und die Operation auf die Spalte angewendet. Im Dekorator selbst geben wir den Rückgabetyp der Funktion an.

Beispiel 1: Pandas_udf() mit String-Typ

Hier erstellen wir zwei benutzerdefinierte Funktionen mit dem Rückgabetyp „String“, um die Spaltenwerte vom Typ „String“ in Groß- und Kleinbuchstaben umzuwandeln. Abschließend wenden wir diese Funktionen auf die Spalten „type“ und „locate_country“ an.

# Typspalte mit pandas_udf in Großbuchstaben umwandeln

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

return i.str.upper()

# Konvertieren Sie die Spalte „locate_country“ mit pandas_udf in Kleinbuchstaben

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

return i.str.lower()

# Spalten mit select() anzeigen

market_df.select( 'Typ' ,type_upper_case( 'Typ' ), „locate_country“ ,
country_lower_case( „locate_country“ )).zeigen()

Ausgang:

Erläuterung:

Die Funktion StringType() ist im Modul pyspark.sql.types verfügbar. Wir haben dieses Modul bereits beim Erstellen des PySpark DataFrame importiert.

  1. Zunächst gibt UDF (benutzerdefinierte Funktion) mithilfe der Funktion str.upper() die Zeichenfolgen in Großbuchstaben zurück. Die Funktion str.upper() ist in der Datenstruktur der Serie verfügbar (da wir mit einem Pfeil innerhalb der Funktion in eine Serie konvertieren), die die angegebene Zeichenfolge in Großbuchstaben umwandelt. Schließlich wird diese Funktion auf die Spalte „Typ“ angewendet, die in der Methode select() angegeben ist. Bisher waren alle Zeichenfolgen in der Typspalte in Kleinbuchstaben geschrieben. Jetzt werden sie in Großbuchstaben geändert.
  2. Zweitens gibt UDF mithilfe der Funktion str.lower() die Zeichenfolgen in Großbuchstaben zurück. Die Funktion str.lower() ist in der Datenstruktur der Serie verfügbar und konvertiert die angegebene Zeichenfolge in Kleinbuchstaben. Schließlich wird diese Funktion auf die Spalte „Typ“ angewendet, die in der Methode select() angegeben ist. Bisher waren alle Zeichenfolgen in der Typspalte in Großbuchstaben geschrieben. Jetzt werden sie in Kleinbuchstaben geändert.

Beispiel 2: Pandas_udf() mit Integer-Typ

Erstellen wir eine UDF, die die ganzzahlige Spalte von PySpark DataFrame in die Pandas-Reihe konvertiert und zu jedem Wert 100 hinzufügt. Übergeben Sie die Spalte „Menge“ an diese Funktion innerhalb der Methode select().

# Addiere 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

gib i+ zurück 100

# Übergeben Sie die Mengenspalte an die obige Funktion und zeigen Sie sie an.

market_df.select( 'Menge' ,add_100( 'Menge' )).zeigen()

Ausgang:

Erläuterung:

Innerhalb der UDF iterieren wir alle Werte und konvertieren sie in Serien. Danach addieren wir 100 zu jedem Wert in der Reihe. Schließlich übergeben wir die Spalte „Menge“ an diese Funktion und sehen, dass zu allen Werten 100 addiert wird.

Pandas_udf() mit verschiedenen Datentypen unter Verwendung von Groupby() und Agg()

Schauen wir uns die Beispiele an, um die UDF an die aggregierten Spalten zu übergeben. Hier werden die Spaltenwerte zunächst mit der Funktion „groupby()“ gruppiert und die Aggregation erfolgt mit der Funktion „agg()“. Wir übergeben unsere UDF innerhalb dieser Aggregatfunktion.

Syntax:

pyspark_dataframe_object.groupby( „grouping_column“ ).agg(UDF
(pyspark_dataframe_object[ 'Spalte' ]))

Hier werden zuerst die Werte in der Gruppierungsspalte gruppiert. Anschließend erfolgt die Aggregation aller gruppierten Daten in Bezug auf unsere UDF.

Beispiel 1: Pandas_udf() mit Aggregate Mean()

Hier erstellen wir eine benutzerdefinierte Funktion mit einem Rückgabetyp float. Innerhalb der Funktion berechnen wir den Durchschnitt mithilfe der Funktion „mean()“. Diese UDF wird an die Spalte „Menge“ übergeben, um die durchschnittliche Menge für jeden Typ zu erhalten.

# gibt den Mittelwert/Durchschnitt zurück

@pandas_udf( 'schweben' )

def Average_function(i: panda.Series) -> float:

return i.mean()

# Übergeben Sie die Mengenspalte an die Funktion, indem Sie die Typspalte gruppieren.

market_df.groupby( 'Typ' ).agg(average_function(market_df[ 'Menge' ])).zeigen()

Ausgang:

Wir gruppieren basierend auf Elementen in der Spalte „Typ“. Es werden zwei Gruppen gebildet – „Obst“ und „Gemüse“. Für jede Gruppe wird der Mittelwert berechnet und zurückgegeben.

Beispiel 2: Pandas_udf() mit Aggregate Max() und Min()

Hier erstellen wir zwei benutzerdefinierte Funktionen mit dem Rückgabetyp Integer (int). Die erste UDF gibt den Minimalwert zurück und die zweite UDF gibt den Maximalwert zurück.

# pandas_udf, die den Mindestwert zurückgeben

@pandas_udf( „int“ )

def min_(i: panda.Series) -> int:

return i.min()

# pandas_udf, die den Maximalwert zurückgeben

@pandas_udf( „int“ )

def max_(i: panda.Series) -> int:

Rückkehr i.max()

# Übergeben Sie die Mengenspalte an min_pandas_udf, indem Sie „locate_country“ gruppieren.

market_df.groupby( „locate_country“ ).agg(min_(market_df[ 'Menge' ])).zeigen()

# Übergeben Sie die Mengenspalte an max_ pandas_udf, indem Sie „locate_country“ gruppieren.

market_df.groupby( „locate_country“ ).agg(max_(market_df[ 'Menge' ])).zeigen()

Ausgang:

Um Minimal- und Maximalwerte zurückzugeben, verwenden wir die Funktionen min() und max() im Rückgabetyp von UDFs. Jetzt gruppieren wir die Daten in der Spalte „locate_country“. Es werden vier Gruppen gebildet („CHINA“, „INDIEN“, „JAPAN“, „USA“). Für jede Gruppe geben wir die maximale Menge zurück. Ebenso geben wir die Mindestmenge zurück.

Abschluss

Grundsätzlich wird pandas_udf() verwendet, um die vektorisierten Operationen auf unserem PySpark DataFrame auszuführen. Wir haben gesehen, wie man pandas_udf() erstellt und auf den PySpark DataFrame anwendet. Zum besseren Verständnis haben wir die verschiedenen Beispiele unter Berücksichtigung aller Datentypen (String, Float und Integer) besprochen. Es kann möglich sein, pandas_udf() mit groupby() über die Funktion agg() zu verwenden.