logo

PySpark SQL

Apache Spark este cel mai de succes software al Apache Software Foundation și este conceput pentru calcul rapid. Mai multe industrii folosesc Apache Spark pentru a-și găsi soluțiile. PySpark SQL este un modul din Spark care integrează procesarea relațională cu API-ul de programare funcțional al lui Spark. Putem extrage datele folosind un limbaj de interogare SQL. Putem folosi interogările la fel ca limbajul SQL.

Dacă aveți o înțelegere de bază a RDBMS, PySpark SQL va fi ușor de utilizat, unde puteți extinde limitarea procesării tradiționale a datelor relaționale. Spark acceptă și limbajul de interogare Hive, dar există limitări ale bazei de date Hive. Spark SQL a fost dezvoltat pentru a elimina dezavantajele bazei de date Hive. Să aruncăm o privire la următoarele dezavantaje ale Hive:

Dezavantajele Stupului

  • Nu poate relua procesarea, ceea ce înseamnă că dacă execuția eșuează în mijlocul unui flux de lucru, nu puteți relua de unde s-a blocat.
  • Nu putem arunca bazele de date criptate în cascadă când coșul de gunoi este activat. Aceasta duce la o eroare de execuție. Pentru a elimina un astfel de tip de bază de date, utilizatorii trebuie să folosească opțiunea Purge.
  • Interogările ad-hoc sunt executate folosind MapReduce, care este lansat de Hive, dar când analizăm baza de date de dimensiune medie, aceasta întârzie performanța.
  • Hive nu acceptă operația de actualizare sau ștergere.
  • Se limitează la suportul de subinterogare.

Aceste dezavantaje sunt motivele dezvoltării Apache SQL.

PySpark SQL Scurtă introducere

PySpark acceptă procesarea relațională integrată cu programarea funcțională a lui Spark. Oferă suport pentru diferitele surse de date pentru a face posibilă împletirea interogărilor SQL cu transformări de cod, rezultând astfel un instrument foarte puternic.

PySpark SQL stabilește conexiunea dintre RDD și tabelul relațional. Oferă o integrare mult mai strânsă între procesarea relațională și cea procedurală prin intermediul API-ului Dataframe declarativ, care este integrat cu codul Spark.

Folosind SQL, acesta poate fi ușor accesibil pentru mai mulți utilizatori și poate îmbunătăți optimizarea pentru cei actuali. De asemenea, acceptă o gamă largă de surse de date și algoritmi în Big-data.

algoritmi de sortare merge sort

Caracteristica PySpark SQL

Caracteristicile PySpark SQL sunt prezentate mai jos:

1) Accesul la date consistente

Oferă acces consecvent la date înseamnă că SQL acceptă o modalitate partajată de a accesa o varietate de surse de date, cum ar fi Hive, Avro, Parquet, JSON și JDBC. Joacă un rol semnificativ în adaptarea tuturor utilizatorilor existenți în Spark SQL.

2) Încorporarea cu Spark

Interogările PySpark SQL sunt integrate cu programele Spark. Putem folosi interogările din cadrul programelor Spark.

Unul dintre cele mai multe avantaje ale sale este că dezvoltatorii nu trebuie să gestioneze manual eșecul de stare sau să mențină aplicația în sincronizare cu joburile batch.

3) Conectivitate standard

Oferă o conexiune prin JDBC sau ODBC, iar acestea două sunt standardele din industrie pentru conectivitate pentru instrumentele de business intelligence.

4) Funcții definite de utilizator

PySpark SQL are un limbaj combinat cu funcții definite de utilizator (UDF). UDF este folosit pentru a defini o nouă funcție bazată pe coloane care extinde vocabularul DSL-ului Spark SQL pentru transformarea DataFrame.

5) Compatibilitatea stupului

PySpark SQL rulează interogări Hive nemodificate pe datele curente. Permite compatibilitatea deplină cu datele actuale Hive.

Modulul SQL PySpark

Unele clase importante de Spark SQL și DataFrames sunt următoarele:

    pyspark.sql.SparkSession:Reprezintă principalul punct de intrare pentru DataFrame și funcționalitatea SQL.pyspark.sql.DataFrame:Reprezintă o colecție distribuită de date grupate în coloane numite.pyspark.sql.Column:Reprezintă o expresie de coloană în a DataFrame. pyspark.sql.Row:Reprezintă un rând de date în a DataFrame. pyspark.sql.GroupedData:Metode de agregare, returnate de DataFrame.groupBy(). pyspark.sql.DataFrameNaFunctions:Reprezintă metode de tratare a datelor lipsă (valori nule).pyspark.sql.DataFrameStatFunctions:Reprezintă metode pentru funcționalitatea statistică.pysark.sql.functions:Reprezintă o listă de funcții încorporate disponibile pentru DataFrame. pyspark.sql.types:Acesta reprezintă o listă de tipuri de date disponibile.pyspark.sql.Window:Este folosit pentru a lucra cu funcții Window.

Luați în considerare următorul exemplu de PySpark SQL.

 import findspark findspark.init() import pyspark # only run after findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.sql('''select 'spark' as hello ''') df.show() 

Ieșire:

 +-----+ |hello| +-----+ |spark| +-----+ 

Explicația codului:

În codul de mai sus, am importat findspark modul și apelat findspark.init() constructor; apoi, am importat modulul SparkSession pentru a crea sesiunea spark.

din pyspark.sql import SparkSession

O sesiune spark poate fi utilizată pentru a crea setul de date și API-ul DataFrame. Un SparkSession poate fi, de asemenea, utilizat pentru a crea DataFrame, înregistra DataFrame ca tabel, executa SQL peste tabele, cache tabel și citi fișierul parchet.

constructor de clasă

Este un constructor al Spark Session.

getOrCreate()

Este folosit pentru a obține un existent SparkSession, sau dacă nu există unul existent, creați unul nou pe baza opțiunilor setate în builder.

Alte câteva metode

Câteva metode de PySpark SQL sunt următoarele:

1. appName(nume)

Este folosit pentru a seta numele aplicației, care va fi afișat în interfața web Spark. Parametrul Nume acceptă numele parametrului.

2. config(key=None, value = None, conf = None)

Este folosit pentru a seta o opțiune de configurare. Opțiunile setate folosind această metodă sunt propagate automat la ambele SparkConf și SparkSession configurația lui.

 from pyspark.conf import SparkConfSparkSession.builder.config(conf=SparkConf()) 

Parametri:

    cheie-Un șir de nume de cheie al unei proprietăți de configurare.valoare-Reprezintă valoarea unei proprietăți de configurare.conf -O instanță a SparkConf.

3. maestru (master)

Setează URL-ul principal de spark la care să se conecteze, cum ar fi „local” să ruleze local, „local[4]” să ruleze local cu 4 nuclee.

Parametri:

    maestru:o adresă URL pentru spark master.

4. SparkSession.catalog

Este o interfață pe care utilizatorul o poate crea, elimina, modifica sau interoga baza de date, tabelele, funcțiile etc.

5. SparkSession.conf

Este o interfață de configurare runtime pentru spark. Aceasta este interfața prin care utilizatorul poate obține și seta toate configurațiile Spark și Hadoop care sunt relevante pentru Spark SQL.

clasa pyspark.sql.DataFrame

Este o colecție distribuită de date grupate în coloane cu nume. Un DataFrame este similar cu tabelul relațional din Spark SQL, poate fi creat folosind diferite funcții din SQLContext.

 student = sqlContext.read.csv('...') 

După crearea cadrului de date, îl putem manipula folosind mai multe limbi specifice domeniului (DSL), care sunt funcții predefinite ale DataFrame. Luați în considerare următorul exemplu.

 # To create DataFrame using SQLContext student = sqlContext.read.parquet('...') department = sqlContext.read.parquet('...') student.filter(marks > 55).join(department, student.student_Id == department.id)  .groupBy(student.name, 'gender').({'name': 'student_Id', 'mark': 'department'}) 

Să luăm în considerare următorul exemplu:

șir la char

Interogare folosind Spark SQL

În următorul cod, mai întâi, creăm un DataFrame și executăm interogările SQL pentru a prelua datele. Luați în considerare următorul cod:

 from pyspark.sql import * #Create DataFrame songdf = spark.read.csv(r'C:UsersDEVANSH SHARMA	op50.csv', inferSchema = True, header = True) #Perform SQL queries songdf.select('Genre').show() songdf.filter(songdf['Genre']=='pop').show() 

Ieșire:

 +----------------+ | Genre| +----------------+ | canadian pop| | reggaeton flow| | dance pop| | pop| | dfw rap| | pop| | trap music| | pop| | country rap| | electropop| | reggaeton| | dance pop| | pop| | panamanian pop| |canadian hip hop| | dance pop| | latin| | dfw rap| |canadian hip hop| | escape room| +----------------+ only showing top 20 rows +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name| Artist.Name|Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 6|I Don't Care (wit...| Ed Sheeran| pop| 102| 68| 80| -5| 9| 84| 220| 9| 4| 84| | 8| How Do You Sleep?| Sam Smith| pop| 111| 68| 48| -5| 8| 35| 202| 15| 9| 90| | 13| Someone You Loved|Lewis Capaldi| pop| 110| 41| 50| -6| 11| 45| 182| 75| 3| 88| | 38|Antisocial (with ...| Ed Sheeran| pop| 152| 82| 72| -5| 36| 91| 162| 13| 5| 87| | 44| Talk| Khalid| pop| 136| 40| 90| -9| 6| 35| 198| 5| 13| 84| | 50|Cross Me (feat. C...| Ed Sheeran| pop| 95| 79| 75| -6| 7| 61| 206| 21| 12| 82| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ 

Folosind funcția groupBy().

Funcția groupBy() colectează date similare din categoria.

 songdf.groupBy('Genre').count().show() 

Ieșire:

 +----------------+-----+ | Genre|count| +----------------+-----+ | boy band| 1| | electropop| 2| | pop| 7| | brostep| 2| | big room| 1| | pop house| 1| | australian pop| 1| | edm| 3| | r&b en espanol| 1| | dance pop| 8| | reggaeton| 2| | canadian pop| 2| | trap music| 1| | escape room| 1| | reggaeton flow| 2| | panamanian pop| 2| | atl hip hop| 1| | country rap| 2| |canadian hip hop| 3| | dfw rap| 2| +----------------+-----+ 

distribuție(numpartiții, *cols)

The distributie() returnează un nou DataFrame care este o expresie de partiționare. Această funcție acceptă doi parametri numpartiții și *col. The numpartiții parametrul specifică numărul țintă de coloane.

 song_spotify.repartition(10).rdd.getNumPartitions() data = song_spotify.union(song_spotify).repartition('Energy') data.show(5) 

Ieșire:

 +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name|Artist.Name| Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| | 17| LA CANCI?N| J Balvin| latin| 176| 65| 75| -6| 11| 43| 243| 15| 32| 90| | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ only showing top 5 rows