Inferindo o esquema usando reflexão

Este método usa reflexão para gerar o esquema de um RDD que contém tipos específicos de objetos. A interface Scala para Spark SQL oferece suporte à conversão automática de um RDD contendo classes de caso em um DataFrame. ocase classdefine o esquema da tabela. Os nomes dos argumentos para a classe de caso são lidos usando reflexão e se tornam os nomes das colunas.

As classes de caso também podem ser aninhadas ou conter tipos complexos, como Sequências ou Arrays. Este RDD pode ser convertido implicitamente em um DataFrame e então registrado como uma tabela. As tabelas podem ser usadas em instruções SQL subsequentes.

Exemplo

Vamos considerar um exemplo de registros de funcionários em um arquivo de texto chamado employee.txt. Crie um RDD lendo os dados do arquivo de texto e converta-o em DataFrame usando funções SQL padrão.

Given Data - Dê uma olhada nos seguintes dados de um arquivo chamado employee.txt colocou-o no respectivo diretório atual onde o ponto de shell de faísca está sendo executado.

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

Os exemplos a seguir explicam como gerar um esquema usando Reflections.

Inicie o Spark Shell

Inicie o Spark Shell usando o seguinte comando.

$ spark-shell

Criar SQLContext

Gere SQLContext usando o seguinte comando. Aqui,sc significa objeto SparkContext.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Importar funções SQL

Use o seguinte comando para importar todas as funções SQL usadas para converter implicitamente um RDD em um DataFrame.

scala> import sqlContext.implicts._

Criar classe de caso

Em seguida, temos que definir um esquema para dados de registro de funcionários usando uma classe de caso. O comando a seguir é usado para declarar a classe de caso com base nos dados fornecidos (id, nome, idade).

scala> case class Employee(id: Int, name: String, age: Int)
defined class Employee

Criar RDD e aplicar transformações

Use o seguinte comando para gerar um RDD chamado empl lendo os dados de employee.txt e convertê-lo em DataFrame, usando as funções de mapa.

Aqui, duas funções de mapa são definidas. Um é para dividir o registro de texto em campos (.map(_.split(“,”))) e a segunda função de mapa para converter campos individuais (id, nome, idade) em um objeto de classe de caso (.map(e(0).trim.toInt, e(1), e(2).trim.toInt))

Finalmente, toDF() método é usado para converter o objeto de classe de caso com esquema em um DataFrame.

scala> val empl=sc.textFile("employee.txt")
.map(_.split(","))
.map(e⇒ employee(e(0).trim.toInt,e(1), e(2).trim.toInt))
.toDF()

Resultado

empl: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]

Armazene os dados do DataFrame em uma tabela

Use o seguinte comando para armazenar os dados DataFrame em uma tabela chamada employee. Após este comando, podemos aplicar todos os tipos de instruções SQL nele.

scala> empl.registerTempTable("employee")

A mesa dos funcionários está pronta. Vamos agora passar algumas consultas sql na tabela usandoSQLContext.sql() método.

Selecione Consulta no DataFrame

Use o seguinte comando para selecionar todos os registros do employeemesa. Aqui, usamos a variávelallrecordspara capturar todos os dados de registros. Para exibir esses registros, ligueshow() método sobre ele.

scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")

Para ver os dados do resultado de allrecords DataFrame, use o seguinte comando.

scala> allrecords.show()

Resultado

+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1203 | amith   | 39 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

Where Clause SQL Query no DataFrame

Use o seguinte comando para aplicar wheredeclaração em uma tabela. Aqui, a variávelagefilter armazena os registros de funcionários com idade entre 20 e 35 anos.

scala> val agefilter = sqlContext.sql("SELeCT * FROM employee WHERE ageC>=20 AND age <= 35")

Para ver os dados do resultado de agefilter DataFrame, use o seguinte comando.

scala> agefilter.show()

Resultado

<console>:25, took 0.112757 s
+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

As duas consultas anteriores foram passadas em toda a tabela DataFrame. Agora, vamos tentar buscar dados do DataFrame de resultado aplicandoTransformations nele.

Buscar valores de ID do filtro de idade DataFrame usando o índice da coluna

A seguinte instrução é usada para buscar os valores de ID de agefilter Resultado RDD, usando índice de campo.

scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)

Resultado

<console>:25, took 0.093844 s
ID: 1201
ID: 1202
ID: 1204
ID: 1205

Essa abordagem baseada em reflexão leva a um código mais conciso e funciona bem quando você já conhece o esquema ao escrever seu aplicativo Spark.