PySpark - RDD

Agora que instalamos e configuramos o PySpark em nosso sistema, podemos programar em Python no Apache Spark. Porém, antes de fazer isso, vamos entender um conceito fundamental no Spark - RDD.

RDD significa Resilient Distributed Dataset, esses são os elementos que são executados e operam em vários nós para fazer processamento paralelo em um cluster. Os RDDs são elementos imutáveis, o que significa que, depois de criar um RDD, você não pode alterá-lo. Os RDDs também são tolerantes a falhas, portanto, em caso de qualquer falha, eles se recuperam automaticamente. Você pode aplicar várias operações nesses RDDs para realizar uma determinada tarefa.

Para aplicar operações nestes RDDs, existem duas maneiras -

  • Transformação e
  • Action

Vamos entender essas duas maneiras em detalhes.

Transformation- Estas são as operações aplicadas em um RDD para criar um novo RDD. Filter, groupBy e map são exemplos de transformações.

Action - Essas são as operações aplicadas no RDD, que instruem o Spark a realizar o cálculo e enviar o resultado de volta ao driver.

Para aplicar qualquer operação no PySpark, precisamos criar um PySpark RDDprimeiro. O bloco de código a seguir contém os detalhes de uma classe PySpark RDD -

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

Vamos ver como executar algumas operações básicas usando o PySpark. O código a seguir em um arquivo Python cria palavras RDD, que armazenam um conjunto de palavras mencionadas.

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

Agora vamos executar algumas operações em palavras.

contagem()

Número de elementos no RDD é retornado.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command - O comando para count () é -

$SPARK_HOME/bin/spark-submit count.py

Output - A saída para o comando acima é -

Number of elements in RDD → 8

coletar ()

Todos os elementos do RDD são retornados.

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command - O comando para collect () é -

$SPARK_HOME/bin/spark-submit collect.py

Output - A saída para o comando acima é -

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach (f)

Retorna apenas os elementos que atendem à condição da função dentro de foreach. No exemplo a seguir, chamamos uma função de impressão em foreach, que imprime todos os elementos no RDD.

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command - O comando para foreach (f) é -

$SPARK_HOME/bin/spark-submit foreach.py

Output - A saída para o comando acima é -

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filtro (f)

Um novo RDD é retornado contendo os elementos, o que satisfaz a função dentro do filtro. No exemplo a seguir, filtramos as strings que contêm '' spark ".

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command - O comando para filtro (f) é -

$SPARK_HOME/bin/spark-submit filter.py

Output - A saída para o comando acima é -

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map (f, preservesPartitioning = False)

Um novo RDD é retornado aplicando uma função a cada elemento no RDD. No exemplo a seguir, formamos um par de valores-chave e mapeamos cada string com o valor 1.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command - O comando para map (f, preservesPartitioning = False) é -

$SPARK_HOME/bin/spark-submit map.py

Output - A saída do comando acima é -

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

reduzir (f)

Depois de realizar a operação binária comutativa e associativa especificada, o elemento no RDD é retornado. No exemplo a seguir, estamos importando pacote de adição do operador e aplicando-o em 'num' para realizar uma operação de adição simples.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command - O comando para reduzir (f) é -

$SPARK_HOME/bin/spark-submit reduce.py

Output - A saída do comando acima é -

Adding all the elements -> 15

juntar (outro, numPartitions = None)

Ele retorna RDD com um par de elementos com as chaves correspondentes e todos os valores dessa chave específica. No exemplo a seguir, há dois pares de elementos em dois RDDs diferentes. Depois de juntar esses dois RDDs, obtemos um RDD com elementos com chaves correspondentes e seus valores.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command - O comando para juntar (outro, numPartitions = None) é -

$SPARK_HOME/bin/spark-submit join.py

Output - A saída para o comando acima é -

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

cache ()

Persista este RDD com o nível de armazenamento padrão (MEMORY_ONLY). Você também pode verificar se o RDD está armazenado em cache ou não.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command - O comando para cache () é -

$SPARK_HOME/bin/spark-submit cache.py

Output - A saída para o programa acima é -

Words got cached -> True

Essas foram algumas das operações mais importantes realizadas no PySpark RDD.