Konvertieren von PySpark DataFrame in CSV

Konvertieren Von Pyspark Dataframe In Csv



Schauen wir uns die vier verschiedenen Szenarien der Konvertierung des PySpark DataFrame in CSV an. Direkt verwenden wir die Methode write.csv(), um den PySpark DataFrame in CSV zu konvertieren. Mit der Funktion to_csv() konvertieren wir den PySpark Pandas DataFrame in CSV. Dies kann auch durch Konvertieren in das NumPy-Array möglich sein.

Inhaltsthema:

Wenn Sie mehr über PySpark DataFrame und die Modulinstallation erfahren möchten, lesen Sie hier Artikel .







PySpark DataFrame in CSV durch Konvertierung in Pandas DataFrame

to_csv() ist eine im Pandas-Modul verfügbare Methode, die den Pandas-DataFrame in CSV konvertiert. Zuerst müssen wir unseren PySpark DataFrame in Pandas DataFrame konvertieren. Dazu wird die Methode toPandas() verwendet. Sehen wir uns die Syntax von to_csv() zusammen mit seinen Parametern an.



Syntax:



pandas_dataframe_obj.to_csv(path/ 'Dateiname.csv' , Header ,Index,Spalten,Modus...)
  1. Wir müssen den Dateinamen der CSV-Datei angeben. Wenn Sie die heruntergeladene CSV-Datei an einem bestimmten Ort auf Ihrem PC speichern möchten, können Sie neben dem Dateinamen auch den Pfad angeben.
  2. Spalten werden einbezogen, wenn der Header auf „True“ gesetzt ist. Wenn Sie keine Spalten benötigen, setzen Sie die Überschrift auf „False“.
  3. Indizes werden angegeben, wenn der Index auf „True“ gesetzt ist. Wenn Sie keine Indizes benötigen, setzen Sie den Index auf „False“.
  4. Der Parameter „Columns“ verwendet eine Liste von Spaltennamen, in der wir angeben können, welche bestimmten Spalten in die CSV-Datei extrahiert werden.
  5. Mithilfe des Modusparameters können wir die Datensätze zur CSV-Datei hinzufügen. Anhängen – „a“ wird dazu verwendet.

Beispiel 1: Mit den Header- und Indexparametern

Erstellen Sie den PySpark DataFrame „skills_df“ mit 3 Zeilen und 4 Spalten. Konvertieren Sie diesen DataFrame in CSV, indem Sie ihn zunächst in den Pandas DataFrame konvertieren.





Pyspark importieren

aus pyspark.sql SparkSession importieren

linuxhint_spark_app = SparkSession.builder.appName( „Linux-Hinweis“ ).getOrCreate()

# Kompetenzdaten mit 3 Zeilen und 4 Spalten

Fähigkeiten =[{ 'Ausweis' : 123 , 'Person' : 'Honig' , 'Fähigkeit' : 'malen' , 'Preis' : 25000 },

{ 'Ausweis' : 112 , 'Person' : 'Mouni' , 'Fähigkeit' : 'tanzen' , 'Preis' : 2000 },

{ 'Ausweis' : 153 , 'Person' : 'Tulasi' , 'Fähigkeit' : 'Lektüre' , 'Preis' : 1200 }

]

# Erstellen Sie den Skills-Datenrahmen aus den oben genannten Daten

skills_df = linuxhint_spark_app.createDataFrame(skills)

skills_df.show()

# Skills_df in Pandas DataFrame konvertieren

pandas_skills_df= skills_df.toPandas()

print(pandas_skills_df)

# Konvertieren Sie diesen DataFrame in CSV mit Header und Index

pandas_skills_df.to_csv( 'pandas_skills1.csv' , Header =Wahr, Index=Wahr)

Ausgang:



Wir können sehen, dass der PySpark DataFrame in Pandas DataFrame konvertiert wird. Mal sehen, ob es mit Spaltennamen und Indizes in CSV konvertiert wird:

Beispiel 2: Anhängen der Daten an CSV

Erstellen Sie einen weiteren PySpark-DataFrame mit 1 Datensatz und hängen Sie diesen an die CSV-Datei an, die als Teil unseres ersten Beispiels erstellt wurde. Stellen Sie sicher, dass wir den Header zusammen mit dem Modusparameter auf „False“ setzen müssen. Ansonsten werden die Spaltennamen auch als Zeile angehängt.

Pyspark importieren

aus pyspark.sql SparkSession importieren

linuxhint_spark_app = SparkSession.builder.appName( „Linux-Hinweis“ ).getOrCreate()

Fähigkeiten =[{ 'Ausweis' : 90 , 'Person' : 'Bhargav' , 'Fähigkeit' : 'Lektüre' , 'Preis' : 12000 }

]

# Erstellen Sie den Skills-Datenrahmen aus den oben genannten Daten

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Skills_df in Pandas DataFrame konvertieren

pandas_skills_df= skills_df.toPandas()

# Fügen Sie diesen DataFrame zur Datei pandas_skills1.csv hinzu

pandas_skills_df.to_csv( 'pandas_skills1.csv' , Modus= 'A' , Header =Falsch)

CSV-Ausgabe:

Wir können sehen, dass der CSV-Datei eine neue Zeile hinzugefügt wird.

Beispiel 3: Mit dem Columns-Parameter

Lassen Sie uns denselben DataFrame haben und ihn in CSV mit zwei Spalten konvertieren: „Person“ und „Preis“.

Pyspark importieren

aus pyspark.sql SparkSession importieren

linuxhint_spark_app = SparkSession.builder.appName( „Linux-Hinweis“ ).getOrCreate()

# Kompetenzdaten mit 3 Zeilen und 4 Spalten

Fähigkeiten =[{ 'Ausweis' : 123 , 'Person' : 'Honig' , 'Fähigkeit' : 'malen' , 'Preis' : 25000 },

{ 'Ausweis' : 112 , 'Person' : 'Mouni' , 'Fähigkeit' : 'tanzen' , 'Preis' : 2000 },

{ 'Ausweis' : 153 , 'Person' : 'Tulasi' , 'Fähigkeit' : 'Lektüre' , 'Preis' : 1200 }

]

# Erstellen Sie den Skills-Datenrahmen aus den oben genannten Daten

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Skills_df in Pandas DataFrame konvertieren

pandas_skills_df= skills_df.toPandas()

# Konvertieren Sie diesen DataFrame mit bestimmten Spalten in CSV

pandas_skills_df.to_csv( 'pandas_skills2.csv' , Spalten=[ 'Person' , 'Preis' ])

CSV-Ausgabe:

Wir können sehen, dass in der CSV-Datei nur die Spalten „Person“ und „Preis“ vorhanden sind.

PySpark Pandas DataFrame zu CSV mit der To_Csv()-Methode

to_csv() ist eine im Pandas-Modul verfügbare Methode, die den Pandas-DataFrame in CSV konvertiert. Zuerst müssen wir unseren PySpark DataFrame in Pandas DataFrame konvertieren. Dazu wird die Methode toPandas() verwendet. Sehen wir uns die Syntax von to_csv() zusammen mit seinen Parametern an:

Syntax:

pyspark_pandas_dataframe_obj.to_csv(path/ 'Dateiname.csv' , Header ,Index,Spalten,...)
  1. Wir müssen den Dateinamen der CSV-Datei angeben. Wenn Sie die heruntergeladene CSV-Datei an einem bestimmten Ort auf Ihrem PC speichern möchten, können Sie neben dem Dateinamen auch den Pfad angeben.
  2. Spalten werden einbezogen, wenn der Header auf „True“ gesetzt ist. Wenn Sie keine Spalten benötigen, setzen Sie die Überschrift auf „False“.
  3. Indizes werden angegeben, wenn der Index auf „True“ gesetzt ist. Wenn Sie keine Indizes benötigen, setzen Sie den Index auf „False“.
  4. Der Parameter columns nimmt eine Liste von Spaltennamen entgegen, in der wir angeben können, welche bestimmten Spalten in die CSV-Datei extrahiert werden.

Beispiel 1: Mit dem Columns-Parameter

Erstellen Sie einen PySpark Pandas DataFrame mit 3 Spalten und konvertieren Sie ihn mit to_csv() mit den Spalten „Person“ und „Preis“ in CSV.

Von Pyspark Import Pandas

pyspark_pandas_dataframe=pandas.DataFrame({ 'Ausweis' :[ 90 , 78 , 90 , 57 ], 'Person' :[ 'Honig' , 'Mouni' , 'sich selbst' , 'radha' ], 'Preis' :[ 1 , 2 , 3 , 4 ]})

print(pyspark_pandas_dataframe)

# Konvertieren Sie diesen DataFrame mit bestimmten Spalten in CSV

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , Spalten=[ 'Person' , 'Preis' ])

Ausgang:

Wir können sehen, dass der PySpark Pandas DataFrame mit zwei Partitionen in CSV konvertiert wird. Jede Partition enthält 2 Datensätze. Außerdem lauten die Spalten in der CSV nur „Person“ und „Preis“.

Partitionsdatei 1:

Partitionsdatei 2:

Beispiel 2: Mit dem Header-Parameter

Verwenden Sie den vorherigen DataFrame und geben Sie den Header-Parameter an, indem Sie ihn auf „True“ setzen.

Von Pyspark Import Pandas

pyspark_pandas_dataframe=pandas.DataFrame({ 'Ausweis' :[ 90 , 78 , 90 , 57 ], 'Person' :[ 'Honig' , 'Mouni' , 'sich selbst' , 'radha' ], 'Preis' :[ 1 , 2 , 3 , 4 ]})

# Konvertieren Sie diesen DataFrame in eine CSV-Datei mit Header.

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , Header =Wahr)

CSV-Ausgabe:

Wir können sehen, dass der PySpark Pandas DataFrame mit zwei Partitionen in CSV konvertiert wird. Jede Partition enthält 2 Datensätze mit Spaltennamen.

Partitionsdatei 1:

Partitionsdatei 2:

PySpark Pandas DataFrame in CSV durch Konvertierung in ein NumPy-Array

Wir haben die Möglichkeit, den PySpark Pandas DataFrame durch Konvertierung in das Numpy-Array in CSV zu konvertieren. to_numpy() ist eine im PySpark Pandas-Modul verfügbare Methode, die den PySpark Pandas DataFrame in das NumPy-Array konvertiert.

Syntax:

pyspark_pandas_dataframe_obj.to_numpy()

Es werden keine Parameter benötigt.

Verwendung der Tofile()-Methode

Nach der Konvertierung in das NumPy-Array können wir NumPy mit der Methode tofile() in CSV konvertieren. Hier wird jeder Datensatz spaltenweise in einer neuen Zelle in einer CSV-Datei gespeichert.

Syntax:

array_obj.to_numpy(filename/path,sep=’ ’)

Es benötigt den Dateinamen oder Pfad einer CSV-Datei und ein Trennzeichen.

Beispiel:

Erstellen Sie einen PySpark Pandas DataFrame mit 3 Spalten und 4 Datensätzen und konvertieren Sie ihn in CSV, indem Sie ihn zunächst in ein NumPy-Array konvertieren.

Von Pyspark Import Pandas

pyspark_pandas_dataframe=pandas.DataFrame({ 'Ausweis' :[ 90 , 78 , 90 , 57 ], 'Person' :[ 'Honig' , 'Mouni' , 'sich selbst' , 'radha' ], 'Preis' :[ 1 , 2 , 3 , 4 ]})

# Konvertieren Sie den obigen DataFrame in ein Numpy-Array

konvertiert = pyspark_pandas_dataframe.to_numpy()

drucken (konvertiert)

# Verwendung von tofile()

konvertiert.tofile( 'converted1.csv' , sep = ',' )

Ausgang:

[[ 90 'Honig' 1 ]

[ 78 'Mouni' 2 ]

[ 90 'sich selbst' 3 ]

[ 57 'radha' 4 ]]

Wir können sehen, dass der PySpark Pandas DataFrame in ein NumPy-Array (12 Werte) konvertiert wird. Wenn Sie die CSV-Daten sehen können, wird jeder Zellenwert in einer neuen Spalte gespeichert.

PySpark DataFrame zu CSV mit der Methode Write.Csv()

Die Methode write.csv() verwendet den Dateinamen/Pfad, in dem wir die CSV-Datei speichern müssen, als Parameter.

Syntax:

dataframe_object.coalesce( 1 ).write.csv( 'Dateinamen' )

Tatsächlich wird die CSV-Datei in Partitionen (mehr als eine) gespeichert. Um dies zu beseitigen, führen wir alle partitionierten CSV-Dateien zu einer zusammen. In diesem Szenario verwenden wir die Funktion „coalesce()“. Jetzt können wir nur eine CSV-Datei mit allen Zeilen aus dem PySpark DataFrame sehen.

Beispiel:

Betrachten Sie den PySpark DataFrame mit 4 Datensätzen und 4 Spalten. Schreiben Sie diesen DataFrame mit der Datei „market_details“ in eine CSV-Datei.

Pyspark importieren

aus pyspark.sql SparkSession importieren

linuxhint_spark_app = SparkSession.builder.appName( „Linux-Hinweis“ ).getOrCreate()

# Marktdaten mit 4 Zeilen und 4 Spalten

Markt =[{ 'm_id' : 'mz-001' , 'm_name' : 'ABC' , 'm_city' : 'Delhi' , 'm_state' : 'Delhi' },

{ 'm_id' : 'mz-002' , 'm_name' : 'XYZ' , 'm_city' : 'patna' , 'm_state' : 'lucknow' },

{ 'm_id' : 'mz-003' , 'm_name' : 'PQR' , 'm_city' : 'Florida' , 'm_state' : 'eins' },

{ 'm_id' : 'mz-004' , 'm_name' : 'ABC' , 'm_city' : 'Delhi' , 'm_state' : 'lucknow' }

]



# Erstellen Sie den Marktdatenrahmen aus den oben genannten Daten

market_df = linuxhint_spark_app.createDataFrame(market)

# Tatsächliche Marktdaten

market_df.show()

# write.csv()

market_df.coalesce( 1 ).write.csv( „market_details“ )

Ausgang:

Suchen wir nach der Datei:

Öffnen Sie die letzte Datei, um die Datensätze anzuzeigen.

Abschluss

Wir haben die vier verschiedenen Szenarien, die den PySpark DataFrame in CSV konvertieren, anhand von Beispielen unter Berücksichtigung verschiedener Parameter kennengelernt. Wenn Sie mit dem PySpark DataFrame arbeiten, haben Sie zwei Möglichkeiten, diesen DataFrame in CSV zu konvertieren: Eine Möglichkeit ist die Verwendung der write()-Methode und eine andere Möglichkeit die Verwendung der to_csv()-Methode durch Konvertierung in Pandas DataFrame. Wenn Sie mit PySpark Pandas DataFrame arbeiten, können Sie auch to_csv() und tofile() verwenden, indem Sie in ein NumPy-Array konvertieren.