PySpark Read.Parquet()

Pyspark Read Parquet



In PySpark schreibt die Funktion write.parquet() den DataFrame in die Parquet-Datei und read.parquet() liest die Parquet-Datei in den PySpark DataFrame oder eine andere Datenquelle. Um die Spalten in Apache Spark schnell und effizient zu verarbeiten, müssen wir die Daten komprimieren. Die Datenkomprimierung schont unseren Speicher und alle Spalten werden in eine flache Ebene umgewandelt. Das bedeutet, dass die flache Lagerung auf Säulenebene vorhanden ist. Die Datei, in der diese gespeichert werden, wird als PARQUET-Datei bezeichnet.

In diesem Handbuch konzentrieren wir uns hauptsächlich auf das Lesen/Laden der Parquet-Datei in den PySpark DataFrame/SQL mithilfe der Funktion read.parquet(), die in der Klasse pyspark.sql.DataFrameReader verfügbar ist.

Inhaltsthema:







Holen Sie sich die Parkettdatei



Lesen Sie die Parquet-Datei in den PySpark DataFrame



Lesen Sie die Parquet-Datei in PySpark SQL ein





Pyspark.sql.DataFrameReader.parquet()

Mit dieser Funktion wird die Parquet-Datei gelesen und in den PySpark DataFrame geladen. Es übernimmt den Pfad/Dateinamen der Parquet-Datei. Wir können einfach die Funktion read.parquet() verwenden, da dies die generische Funktion ist.

Syntax:



Sehen wir uns die Syntax von read.parquet() an:

spark_app.read.parquet(Dateiname.parquet/Pfad)

Installieren Sie zunächst das PySpark-Modul mit dem Befehl pip:

Pip Pyspark installieren

Holen Sie sich die Parkettdatei

Um eine Parkettdatei zu lesen, benötigen Sie die Daten, in denen die Parkettdatei aus diesen Daten generiert wird. In diesem Teil erfahren Sie, wie Sie aus dem PySpark DataFrame eine Parquet-Datei generieren.

Lassen Sie uns einen PySpark DataFrame mit 5 Datensätzen erstellen und diesen in die Parkettdatei „industry_parquet“ schreiben.

Pyspark importieren

aus pyspark.sql SparkSession,Row importieren

linuxhint_spark_app = SparkSession.builder.appName( „Linux-Hinweis“ ).getOrCreate()

# Erstellen Sie den Datenrahmen, der Branchendetails speichert

Industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Landwirtschaft' ,Bereich= 'USA' ,
Bewertung= 'Heiß' ,Total_employees= 100 ),

Zeile(Typ= 'Landwirtschaft' ,Bereich= 'Indien' ,Bewertung= 'Heiß' ,Total_employees= 200 ),

Zeile(Typ= 'Entwicklung' ,Bereich= 'USA' ,Bewertung= 'Warm' ,Total_employees= 100 ),

Zeile(Typ= 'Ausbildung' ,Bereich= 'USA' ,Bewertung= 'Cool' ,Total_employees= 400 ),

Zeile(Typ= 'Ausbildung' ,Bereich= 'USA' ,Bewertung= 'Warm' ,Total_employees= zwanzig )

])

# Tatsächlicher Datenrahmen

industrial_df.show()

# Schreiben Sie den Industry_df in die Parquet-Datei

industrial_df.coalesce( 1 ).write.parquet( „industry_parquet“ )

Ausgang:

Dies ist der DataFrame, der 5 Datensätze enthält.

Für den vorherigen DataFrame wird eine Parquet-Datei erstellt. Hier lautet unser Dateiname mit Erweiterung „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet“. Wir verwenden diese Datei im gesamten Tutorial.

Lesen Sie die Parquet-Datei in den PySpark DataFrame

Wir haben die Parkettdatei. Lesen wir diese Datei mit der Funktion read.parquet() und laden sie in den PySpark DataFrame.

Pyspark importieren

aus pyspark.sql SparkSession,Row importieren

linuxhint_spark_app = SparkSession.builder.appName( „Linux-Hinweis“ ).getOrCreate()

# Lesen Sie die Parkettdatei in das dataframe_from_parquet-Objekt ein.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet“ )

# Zeigt den dataframe_from_parquet-DataFrame an

dataframe_from_parquet.show()

Ausgang:

Wir zeigen den DataFrame mit der Methode show() an, die aus der Parquet-Datei erstellt wurde.

SQL-Abfragen mit Parquet-Datei

Nach dem Laden in den DataFrame kann es möglich sein, die SQL-Tabellen zu erstellen und die im DataFrame vorhandenen Daten anzuzeigen. Wir müssen eine TEMPORÄRE ANSICHT erstellen und die SQL-Befehle verwenden, um die Datensätze aus dem DataFrame zurückzugeben, der aus der Parquet-Datei erstellt wird.

Beispiel 1:

Erstellen Sie eine temporäre Ansicht mit dem Namen „Sektoren“ und verwenden Sie den SELECT-Befehl, um die Datensätze im DataFrame anzuzeigen. Darauf können Sie sich beziehen Lernprogramm Das erklärt, wie man eine VIEW in Spark – SQL erstellt.

Pyspark importieren

aus pyspark.sql SparkSession,Row importieren

linuxhint_spark_app = SparkSession.builder.appName( „Linux-Hinweis“ ).getOrCreate()

# Lesen Sie die Parkettdatei in das dataframe_from_parquet-Objekt ein.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet“ )

# Erstellen Sie eine Ansicht aus der oben genannten Parkettdatei mit dem Namen „Sektoren“.

dataframe_from_parquet.createOrReplaceTempView( „Sektoren“ )

# Abfrage zur Anzeige aller Datensätze aus den Sektoren

linuxhint_spark_app.sql( „wählen Sie * aus Sektoren“ ).zeigen()

Ausgang:

Beispiel 2:

Schreiben Sie unter Verwendung der vorherigen VIEW die SQL-Abfrage:

  1. Zur Anzeige aller Datensätze aus den Sektoren, die zu „Indien“ gehören.
  2. Um alle Datensätze aus den Sektoren mit einem Mitarbeiter von mehr als 100 anzuzeigen.
# Abfrage zur Anzeige aller Datensätze aus den Sektoren, die zu „Indien“ gehören.

linuxhint_spark_app.sql( „wählen Sie * aus Sektoren aus, in denen Area='Indien‘ ist“ ).zeigen()

# Abfrage zum Anzeigen aller Datensätze aus den Sektoren mit Mitarbeitern über 100

linuxhint_spark_app.sql( „wählen Sie * aus Sektoren aus, in denen Total_employees>100“ ).zeigen()

Ausgang:

Es gibt nur einen Datensatz mit dem Gebiet „Indien“ und zwei Datensätze mit Mitarbeitern, die größer als 100 sind.

Lesen Sie die Parquet-Datei in PySpark SQL ein

Zuerst müssen wir mit dem CREATE-Befehl eine VIEW erstellen. Mithilfe des Schlüsselworts „path“ innerhalb der SQL-Abfrage können wir die Parquet-Datei in Spark SQL einlesen. Nach dem Pfad müssen wir den Dateinamen/Speicherort der Datei angeben.

Syntax:

spark_app.sql( „TEMPORÄRE ANSICHT ERSTELLEN view_name MIT Parkett-OPTIONEN (Pfad „) Dateiname.parquet ')' )

Beispiel 1:

Erstellen Sie eine temporäre Ansicht mit dem Namen „Sector2“ und lesen Sie die Parkettdatei hinein. Schreiben Sie mit der Funktion sql() die Auswahlabfrage, um alle in der Ansicht vorhandenen Datensätze anzuzeigen.

Pyspark importieren

aus pyspark.sql SparkSession,Row importieren

linuxhint_spark_app = SparkSession.builder.appName( „Linux-Hinweis“ ).getOrCreate()

# Lesen Sie die Parquet-Datei in Spark-SQL ein

linuxhint_spark_app.sql( „ERSTELLEN SIE EINE VORÜBERGEHENDE ANSICHT Sektor2 MIT PARKET-OPTIONEN (Pfad „) Teil-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Abfrage zur Anzeige aller Datensätze aus Sektor2

linuxhint_spark_app.sql( „Wähle * aus Sektor2 aus“ ).zeigen()

Ausgang:

Beispiel 2:

Verwenden Sie die vorherige VIEW und schreiben Sie die Abfrage, um alle Datensätze mit der Bewertung „Hot“ oder „Cool“ anzuzeigen.

# Abfrage zur Anzeige aller Datensätze aus Sektor2 mit der Bewertung „Hot“ oder „Cool“.

linuxhint_spark_app.sql( „wählen Sie * aus Sektor2 aus, wobei Rating=‘Hot‘ ODER Rating=‘Cool‘ ist“ ).zeigen()

Ausgang:

Es gibt drei Datensätze mit der Bewertung „Hot“ oder „Cool“.

Abschluss

In PySpark schreibt die Funktion write.parquet() den DataFrame in die Parquet-Datei. Die Funktion read.parquet() liest die Parquet-Datei in den PySpark DataFrame oder eine andere DataSource. Wir haben gelernt, wie man die Parquet-Datei in den PySpark DataFrame und in die PySpark-Tabelle einliest. Im Rahmen dieses Tutorials haben wir auch besprochen, wie man die Tabellen aus dem PySpark DataFrame erstellt und die Daten mithilfe der WHERE-Klausel filtert.