PySpark SQL with count


I am trying to select, from the tables Warehouses and Boxes, the codes of all warehouses where Warehouses.capacity is smaller than the number of boxes stored in that warehouse.

The following SQL query works in PostgreSQL:

select w.code
from Warehouses w
join Boxes b
on w.code = b.warehouse
group by w.code
having count(b.code) > w.capacity

But the same query does not work in PySpark:

spark.sql("""
select w.code
from Warehouses w
join Boxes b
on w.code = b.warehouse
group by w.code
having count(b.code) > w.capacity

""").show()

How can I fix the code?

Setup

import numpy as np
import pandas as pd


# pyspark
import pyspark
from pyspark.sql import functions as F 
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext, SQLContext


spark = pyspark.sql.SparkSession.builder.appName('app').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
sc.setLogLevel("INFO")
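
Side note: SQLContext has been deprecated since Spark 2.0, and the SparkSession created above can build DataFrames directly, so each sqlContext.createDataFrame call below could be written as, for example:

# Non-deprecated equivalent of sqlContext.createDataFrame(dfw, schema),
# using the SparkSession directly:
sdfw = spark.createDataFrame(dfw, schema)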


# warehouse
dfw = pd.DataFrame({'code': [1, 2, 3, 4, 5],
          'location': ['Chicago', 'Chicago', 'New York', 'Los Angeles', 'San Francisco'],
          'capacity': [3, 4, 7, 2, 8]})

schema = StructType([
    StructField('code',IntegerType(),True),
    StructField('location',StringType(),True),
    StructField('capacity',IntegerType(),True),
    ])

sdfw = sqlContext.createDataFrame(dfw, schema)
sdfw.createOrReplaceTempView("Warehouses")


# box
dfb = pd.DataFrame({'code': ['0MN7', '4H8P', '4RT3', '7G3H', '8JN6', '8Y6U', '9J6F', 'LL08', 'P0H6', 'P2T6', 'TU55'],
          'contents': ['Rocks', 'Rocks', 'Scissors', 'Rocks', 'Papers', 'Papers', 'Papers', 'Rocks', 'Scissors', 'Scissors', 'Papers'],
          'value': [180.0, 250.0, 190.0, 200.0, 75.0, 50.0, 175.0, 140.0, 125.0, 150.0, 90.0],
          'warehouse': [3, 1, 4, 1, 1, 3, 2, 4, 1, 2, 5]})

schema = StructType([
    StructField('code',StringType(),True),
    StructField('contents',StringType(),True),
    StructField('value',FloatType(),True),
    StructField('warehouse',IntegerType(),True),
    ])

sdfb = sqlContext.createDataFrame(dfb, schema)
sdfb.createOrReplaceTempView("Boxes")

spark.sql("""
select w.code
from Warehouses w
join Boxes b
on w.code = b.warehouse
group by w.code
having count(b.code) > w.capacity

""").show()
Tags: python, pyspark, pyspark-sql
2 Answers

Answer 1 (2 votes):

Try this. The error is that Spark cannot resolve w.capacity in the HAVING clause, because it is neither in the GROUP BY nor wrapped in an aggregate function. first() should do that for you:

spark.sql("""
select w.code
from Warehouses w
join Boxes b
on w.code = b.warehouse
group by w.code
having count(b.code) > first(w.capacity)

""").show()

Answer 2 (0 votes):

"How can I fix the code?"

The problem may not be in your code at all.

Check which version of the Java JDK you are using. As far as I know, Spark versions before 3.0 are not compatible with Java JDK 11, and calls such as spark.sql(...).show() fail under it. If you are on that version, just downgrade to JDK 8 (and make sure the environment variables for JDK 8 are configured correctly).
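
To confirm which Java version the PySpark driver is actually running on, one quick (if unofficial) check goes through the py4j gateway; note that _jvm is a private attribute, so this is a debugging sketch rather than a stable API:

# Ask the JVM behind the Spark session for its version string;
# "1.8.x" means JDK 8, "11.x" means JDK 11.
print(spark.sparkContext._jvm.java.lang.System.getProperty("java.version"))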
