Apache Spark er den mest suksessrike programvaren til Apache Software Foundation og designet for rask databehandling. Flere bransjer bruker Apache Spark for å finne sine løsninger. PySpark SQL er en modul i Spark som integrerer relasjonsbehandling med Sparks funksjonelle programmerings-API. Vi kan trekke ut dataene ved å bruke et SQL-spørringsspråk. Vi kan bruke spørringene på samme måte som SQL-språket.
Hvis du har en grunnleggende forståelse av RDBMS, vil PySpark SQL være enkel å bruke, hvor du kan utvide begrensningene til tradisjonell relasjonsdatabehandling. Spark støtter også Hive Query Language, men det er begrensninger for Hive-databasen. Spark SQL ble utviklet for å fjerne ulempene med Hive-databasen. La oss ta en titt på følgende ulemper med Hive:
Ulempene med Hive
- Den kan ikke gjenoppta behandlingen, noe som betyr at hvis utførelsen mislykkes midt i en arbeidsflyt, kan du ikke fortsette fra der den ble sittende fast.
- Vi kan ikke slippe de krypterte databasene i kaskade når papirkurven er aktivert. Det fører til utførelsesfeilen. For å droppe en slik type database, må brukerne bruke Purge-alternativet.
- Ad-hoc-spørringene utføres ved hjelp av MapReduce, som lanseres av Hive, men når vi analyserer den mellomstore databasen, forsinker det ytelsen.
- Hive støtter ikke oppdateringen eller slettingen.
- Det er begrenset til støtte for underspørringer.
Disse ulempene er grunnene til å utvikle Apache SQL.
PySpark SQL Kort introduksjon
PySpark støtter integrert relasjonsbehandling med Sparks funksjonelle programmering. Den gir støtte for de ulike datakildene for å gjøre det mulig å veve SQL-spørringer med kodetransformasjoner, og dermed resultere i et veldig kraftig verktøy.
PySpark SQL etablerer forbindelsen mellom RDD og relasjonstabellen. Det gir mye tettere integrasjon mellom relasjons- og prosedyrebehandling gjennom deklarativ Dataframe API, som er integrert med Spark-kode.
Ved å bruke SQL kan det være lett tilgjengelig for flere brukere og forbedre optimaliseringen for de nåværende. Den støtter også det store spekteret av datakilder og algoritmer i Big-data.
Funksjon av PySpark SQL
Funksjonene til PySpark SQL er gitt nedenfor:
1) Tilgang til konsistensdata
Det gir konsistent datatilgang betyr at SQL støtter en delt måte å få tilgang til en rekke datakilder som Hive, Avro, Parkett, JSON og JDBC. Det spiller en betydelig rolle i å imøtekomme alle eksisterende brukere i Spark SQL.
2) Inkorporering med Spark
PySpark SQL-spørringer er integrert med Spark-programmer. Vi kan bruke spørringene i Spark-programmene.
100 km/t til mph
En av de største fordelene er at utviklere ikke trenger å administrere tilstandsfeil manuelt eller holde applikasjonen synkronisert med batchjobber.
3) Standard tilkobling
Det gir en tilkobling gjennom JDBC eller ODBC, og disse to er industristandardene for tilkobling for business intelligence-verktøy.
4) Brukerdefinerte funksjoner
PySpark SQL har en språkkombinert brukerdefinert funksjon (UDF). UDF brukes til å definere en ny kolonnebasert funksjon som utvider vokabularet til Spark SQLs DSL for transformering av DataFrame.
5) Hive-kompatibilitet
PySpark SQL kjører umodifiserte Hive-spørringer på gjeldende data. Den tillater full kompatibilitet med gjeldende Hive-data.
PySpark SQL-modul
Noen viktige klasser av Spark SQL og DataFrames er følgende:
Tenk på følgende eksempel på PySpark SQL.
import findspark findspark.init() import pyspark # only run after findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.sql('''select 'spark' as hello ''') df.show()
Produksjon:
+-----+ |hello| +-----+ |spark| +-----+
Kodeforklaring:
I koden ovenfor har vi importert findspark modul og ringte findspark.init() konstruktør; Deretter importerte vi SparkSession-modulen for å lage gnistøkt.
fra pyspark.sql importer SparkSession
En gnistøkt kan brukes til å lage Dataset og DataFrame API. En SparkSession kan også brukes til å lage DataFrame, registrere DataFrame som en tabell, utføre SQL over tabeller, cachetabell og lese parkettfil.
klassebygger
Det er en bygger av Spark Session.
getOrCreate()
java samlinger
Den brukes til å få en eksisterende SparkSession, eller hvis det ikke er noen eksisterende, lag en ny basert på alternativene satt i byggherren.
Noen få andre metoder
Noen få metoder for PySpark SQL er følgende:
1. appnavn (navn)
Den brukes til å angi navnet på applikasjonen, som vises i Spark-nettgrensesnittet. Parameteren Navn godtar navnet på parameteren.
2. config(key=Ingen, verdi = Ingen, conf = Ingen)
Den brukes til å angi et konfigurasjonsalternativ. Alternativer satt ved hjelp av denne metoden overføres automatisk til begge SparkConf og SparkSession sin konfigurasjon.
from pyspark.conf import SparkConfSparkSession.builder.config(conf=SparkConf())
Parametere:
3. mester (mester)
Den setter sparkmaster-nettadressen som skal kobles til, for eksempel 'local' for å kjøre lokalt, 'local[4]' til å kjøre lokalt med 4 kjerner.
Parametere:
4. SparkSession.catalog
Det er et grensesnitt som brukeren kan opprette, slippe, endre eller spørre etter den underliggende databasen, tabellene, funksjonene osv.
5. SparkSession.conf
Det er kjøretidskonfigurasjonsgrensesnitt for gnist. Dette er grensesnittet gjennom som brukeren kan få og sette alle Spark- og Hadoop-konfigurasjoner som er relevante for Spark SQL.
klasse pyspark.sql.DataFrame
Det er en distribuert samling av data gruppert i navngitte kolonner. En DataFrame ligner på relasjonstabellen i Spark SQL, kan opprettes ved hjelp av forskjellige funksjoner i SQLContext.
student = sqlContext.read.csv('...')
Etter opprettelse av dataramme kan vi manipulere den ved å bruke flere domenespesifikke språkene (DSL) som er forhåndsdefinerte funksjoner til DataFrame. Tenk på følgende eksempel.
# To create DataFrame using SQLContext student = sqlContext.read.parquet('...') department = sqlContext.read.parquet('...') student.filter(marks > 55).join(department, student.student_Id == department.id) .groupBy(student.name, 'gender').({'name': 'student_Id', 'mark': 'department'})
La oss vurdere følgende eksempel:
Spørre ved hjelp av Spark SQL
I den følgende koden lager vi først en DataFrame og utfører SQL-spørringene for å hente dataene. Tenk på følgende kode:
from pyspark.sql import * #Create DataFrame songdf = spark.read.csv(r'C:UsersDEVANSH SHARMA op50.csv', inferSchema = True, header = True) #Perform SQL queries songdf.select('Genre').show() songdf.filter(songdf['Genre']=='pop').show()
Produksjon:
+----------------+ | Genre| +----------------+ | canadian pop| | reggaeton flow| | dance pop| | pop| | dfw rap| | pop| | trap music| | pop| | country rap| | electropop| | reggaeton| | dance pop| | pop| | panamanian pop| |canadian hip hop| | dance pop| | latin| | dfw rap| |canadian hip hop| | escape room| +----------------+ only showing top 20 rows +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name| Artist.Name|Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 6|I Don't Care (wit...| Ed Sheeran| pop| 102| 68| 80| -5| 9| 84| 220| 9| 4| 84| | 8| How Do You Sleep?| Sam Smith| pop| 111| 68| 48| -5| 8| 35| 202| 15| 9| 90| | 13| Someone You Loved|Lewis Capaldi| pop| 110| 41| 50| -6| 11| 45| 182| 75| 3| 88| | 38|Antisocial (with ...| Ed Sheeran| pop| 152| 82| 72| -5| 36| 91| 162| 13| 5| 87| | 44| Talk| Khalid| pop| 136| 40| 90| -9| 6| 35| 198| 5| 13| 84| | 50|Cross Me (feat. C...| Ed Sheeran| pop| 95| 79| 75| -6| 7| 61| 206| 21| 12| 82| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
Bruke groupBy()-funksjonen
java dato til streng
groupBy()-funksjonen samler inn lignende kategoridata.
songdf.groupBy('Genre').count().show()
Produksjon:
+----------------+-----+ | Genre|count| +----------------+-----+ | boy band| 1| | electropop| 2| | pop| 7| | brostep| 2| | big room| 1| | pop house| 1| | australian pop| 1| | edm| 3| | r&b en espanol| 1| | dance pop| 8| | reggaeton| 2| | canadian pop| 2| | trap music| 1| | escape room| 1| | reggaeton flow| 2| | panamanian pop| 2| | atl hip hop| 1| | country rap| 2| |canadian hip hop| 3| | dfw rap| 2| +----------------+-----+
distribusjon (antallpartisjoner, *cols)
De fordeling() returnerer en ny DataFrame som er et partisjoneringsuttrykk. Denne funksjonen aksepterer to parametere antall partisjoner og *kol. De antall partisjoner parameteren angir måltallet for kolonner.
song_spotify.repartition(10).rdd.getNumPartitions() data = song_spotify.union(song_spotify).repartition('Energy') data.show(5)
Produksjon:
+---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name|Artist.Name| Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| | 17| LA CANCI?N| J Balvin| latin| 176| 65| 75| -6| 11| 43| 243| 15| 32| 90| | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ only showing top 5 rows