Error while encoding: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of double #2055

Open

sarath-mec opened this issue Jan 12, 2023 · 13 comments

@sarath-mec

sarath-mec commented Jan 12, 2023

What kind of issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.
  • Feature Request. Start by telling us what problem you’re trying to solve.
    Often a solution already exists! Don’t send pull requests to implement new features without
    first getting our support. Sometimes we leave features out on purpose to keep the project small.

Issue description

Trying a simple example of reading Elasticsearch GeoIP data from the sample index kibana_sample_data_ecommerce:
https://www.elastic.co/guide/en/kibana/8.6/get-started.html

If we try to read the geoip.location field, which is a geo_point field, df.show() fails with the following message:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of double

I tried various options without success:

  • .option("es.mapping.date.rich", False)
  • .option("es.field.read.empty.as.null", False)

Steps to reproduce

Code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("SparkTest").getOrCreate()

df = (spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "<host_ip>")
      .option("es.port", "9200")
      .option("es.resource", "kibana_sample_data_ecommerce")
      .option("es.read.metadata", True)
      .option("es.read.field.as.array.include", "products,manufacturer,category,sku")
      .option("es.mapping.date.rich", False)
      .option("es.field.read.empty.as.null", False)
      #.option("es.read.field.exclude", "geoip.location")
      .option("es.query", "?q=*:*")
      .load()
      )
df.printSchema()
df.count()

Run
df.show()

Stack trace:

23/01/12 19:21:54 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 13.0 (TID 22) (cluster-a104-w-0.c.api-staging-367614.internal executor 2): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of double
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType), true, false), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, category), ArrayType(StringType,true)), None) AS category#1236
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, currency), StringType), true, false) AS currency#1237
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, customer_birth_date), StringType), true, false) AS customer_birth_date#1238
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, customer_first_name), StringType), true, false) AS customer_first_name#1239
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, customer_full_name), StringType), true, false) AS customer_full_name#1240
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, customer_gender), StringType), true, false) AS customer_gender#1241
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, customer_id), StringType), true, false) AS customer_id#1242
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, customer_last_name), StringType), true, false) AS customer_last_name#1243
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, customer_phone), StringType), true, false) AS customer_phone#1244
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, day_of_week), StringType), true, false) AS day_of_week#1245
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, day_of_week_i), IntegerType) AS day_of_week_i#1246
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, email), StringType), true, false) AS email#1247
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else named_struct(dataset, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, event), StructField(dataset,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, event), StructField(dataset,StringType,true)), 0, dataset), StringType), true, false)) AS event#1248
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else named_struct(city_name, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 0, city_name), StringType), true, false), continent_name, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 1, continent_name), StringType), true, false), country_iso_code, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 2, country_iso_code), StringType), true, false), location, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)).isNullAt) null else named_struct(lat, if 
(validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 3, location), StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 3, location), StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)), 0, lat), DoubleType), lon, if (validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 3, location), StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 3, location), StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)), 1, lon), DoubleType)), region_name, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 4, region_name), StringType), true, false)) AS geoip#1249
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), StringType), true, false), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, manufacturer), ArrayType(StringType,true)), None) AS manufacturer#1250
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, order_date), StringType), true, false) AS order_date#1251
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, order_id), StringType), true, false) AS order_id#1252
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), if (isnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)))) null else named_struct(_id, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 0, _id), StringType), true, false), base_price, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), 
StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 1, base_price), FloatType), base_unit_price, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 2, base_unit_price), FloatType), category, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), 
StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 3, category), StringType), true, false), created_on, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 4, created_on), StringType), true, false), discount_amount, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), 
StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 5, discount_amount), FloatType), discount_percentage, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 6, discount_percentage), FloatType), manufacturer, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), 
StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 7, manufacturer), StringType), true, false), min_price, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 8, min_price), FloatType), price, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), 
StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 9, price), FloatType), product_id, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 10, product_id), LongType), product_name, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), 
StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 11, product_name), StringType), true, false), ... 12 more fields), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, products), ArrayType(StructType(StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)),true)), None) AS products#1253
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -4), StringType), true, false), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, sku), ArrayType(StringType,true)), None) AS sku#1254
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, taxful_total_price), FloatType) AS taxful_total_price#1255
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, taxless_total_price), FloatType) AS taxless_total_price#1256
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, total_quantity), IntegerType) AS total_quantity#1257
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, total_unique_products), IntegerType) AS total_unique_products#1258
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, type), StringType), true, false) AS type#1259
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 24, user), StringType), true, false) AS user#1260
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS _metadata#1261
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:213)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:195)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of double
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_2$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.createNamedStruct_0_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_2_7$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:209)
	... 19 more

23/01/12 19:21:55 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 13.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[13], line 20
     18 df.printSchema()
     19 df.count()
---> 20 df.show()

File /usr/lib/spark/python/pyspark/sql/dataframe.py:484, in DataFrame.show(self, n, truncate, vertical)
    441 """Prints the first ``n`` rows to the console.
    442 
    443 .. versionadded:: 1.3.0
   (...)
    481  name | Bob
    482 """
    483 if isinstance(truncate, bool) and truncate:
--> 484     print(self._jdf.showString(n, 20, vertical))
    485 else:
    486     print(self._jdf.showString(n, int(truncate), vertical))

File /usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
   1298 command = proto.CALL_COMMAND_NAME +\
   1299     self.command_header +\
   1300     args_command +\
   1301     proto.END_COMMAND_PART
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1307 for temp_arg in temp_args:
   1308     temp_arg._detach()

File /usr/lib/spark/python/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
    109 def deco(*a, **kw):
    110     try:
--> 111         return f(*a, **kw)
    112     except py4j.protocol.Py4JJavaError as e:
    113         converted = convert_exception(e.java_exception)

File /usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o275.showString.

Version Info

Jar : elasticsearch-spark-30_2.12-8.6.0.jar

OS: Google Dataproc 3-node default cluster
JVM: OpenJDK 1.8.0_352
Hadoop/Spark: Spark 3.1.3 (Scala 2.12.14)
ES-Hadoop: elasticsearch-spark-30_2.12-8.6.0.jar
ES:

PySpark Version Info:

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.3
      /_/
                        
Using Scala version 2.12.14, OpenJDK 64-Bit Server VM, 1.8.0_352
Branch HEAD
Compiled by user  on 2022-11-01T22:00:39Z
Revision b28f046c307a8374984c0231d76debeb3a3beb97
Url https://bigdataoss-internal.googlesource.com/third_party/apache/spark


@masseyke
Member

I just tried to reproduce this using the environment at https://github.com/masseyke/es-spark-docker and it worked fine for me. I used:

  • elasticsearch 8.1.0
  • es-hadoop elasticsearch-spark-30_2.12-8.1.0.jar
  • java 1.8.0_312
  • spark 3.2.1
  • scala 2.12.15
  • python 3.8.10

Here's the output:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Python version 3.8.10 (default, Nov 26 2021 20:14:08)
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = yarn, app id = application_1673556566212_0001).
SparkSession available as 'spark'.
>>> df = spark.read.format( "org.elasticsearch.spark.sql" ).option( "es.resource", "kibana_sample_data_ecommerce" ).option("es.read.metadata", True).option("es.read.field.as.array.include", "products,manufacturer,category,sku").option("es.mapping.date.rich", False).option("es.field.read.empty.as.null", False).option( "es.query", "?q=*:*" ).load()
>>> df.printSchema()
root
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currency: string (nullable = true)
 |-- customer_birth_date: string (nullable = true)
 |-- customer_first_name: string (nullable = true)
 |-- customer_full_name: string (nullable = true)
 |-- customer_gender: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_last_name: string (nullable = true)
 |-- customer_phone: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- day_of_week_i: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- event: struct (nullable = true)
 |    |-- dataset: string (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- city_name: string (nullable = true)
 |    |-- continent_name: string (nullable = true)
 |    |-- country_iso_code: string (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |    |-- region_name: string (nullable = true)
 |-- manufacturer: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- order_date: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- products: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _id: string (nullable = true)
 |    |    |-- base_price: float (nullable = true)
 |    |    |-- base_unit_price: float (nullable = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- created_on: string (nullable = true)
 |    |    |-- discount_amount: float (nullable = true)
 |    |    |-- discount_percentage: float (nullable = true)
 |    |    |-- manufacturer: string (nullable = true)
 |    |    |-- min_price: float (nullable = true)
 |    |    |-- price: float (nullable = true)
 |    |    |-- product_id: long (nullable = true)
 |    |    |-- product_name: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- sku: string (nullable = true)
 |    |    |-- tax_amount: float (nullable = true)
 |    |    |-- taxful_price: float (nullable = true)
 |    |    |-- taxless_price: float (nullable = true)
 |    |    |-- unit_discount_amount: float (nullable = true)
 |-- sku: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- taxful_total_price: float (nullable = true)
 |-- taxless_total_price: float (nullable = true)
 |-- total_quantity: integer (nullable = true)
 |-- total_unique_products: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- user: string (nullable = true)
 |-- _metadata: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

>>> df.count()
4675

I'll upgrade to a newer elasticsearch/es-hadoop to see if that makes a difference. I'm wondering though if upgrading to a newer spark might fix your problem.

@masseyke
Member

It also works with elasticsearch 8.6.0 and elasticsearch-spark-30_2.12-8.6.0.jar in my environment (otherwise the same as the one I described previously).

@masseyke
Member

It works for me with spark 3.1.3 as well. Here's what I've got:

  • elasticsearch 8.1.0
  • es-hadoop elasticsearch-spark-30_2.12-8.6.0.jar
  • java 1.8.0_312
  • spark 3.1.3
  • scala 2.12.10
  • python 3.8.10

Otherwise I'm just installing "Sample eCommerce orders" in kibana and running the code I pasted above.

@sarath-mec
Author

sarath-mec commented Jan 12, 2023

Hi,

Thanks for trying to reproduce the issue. I missed one step in the code: only df.show() fails. printSchema() and count() work fine for me as well. Could you please try that too?

Thanks,
Sarath

@masseyke
Member

Oh that makes sense. I can reproduce it now. The problem is a lat or lon that doesn't have a decimal. Here's the data I used to reproduce it:

PUT test
{
  "mappings": {
    "properties": {
      "location": {
        "type": "geo_point"
      }
    }
  }
}

POST test/_doc
{
  "location": {
    "lat": 41.1,
    "lon": -71.2
  }
}

POST test/_doc
{
  "location": {
    "lat": 41,
    "lon": -71
  }
}

Note that you have to have at least one document in there that does have decimals. And then in spark:

val df = spark.read.format("es").option("es.resource", "test").load()
df.show()

I'll see if this is easily fixable.

@sarath-mec
Author

Thanks for that. I am new to Spark and not well versed in it.
Coming from a DB background, I know that BigQuery and many other databases support schemaless JSON inside a table column.

Q1: Is there a way to read the whole Elasticsearch JSON record as a schemaless column in a DataFrame?

Q2: Is there a way to read the Elasticsearch fields as plain strings, rather than as interpreted rich datatypes? That way many of these Elasticsearch read errors could be avoided.

@sarath-mec
Author

Hi,

I noticed that Elasticsearch has a coerce feature, which means values can be stored in the source JSON with a different type than the mapping suggests. Is there a way to overcome this in Spark?

https://xeraa.net/blog/2020_elasticsearch-coerce-float-to-integer-or-long/

@masseyke
Member

This unfortunately does not look like an easy one to fix. The reason is that the only information we have in es-hadoop is that elasticsearch has handed us an integer, so that's what we pass to spark (and spark isn't being very flexible here). A little further away in the stack we have access to the schema, but that only tells us that we're in a geo_point field (and unfortunately the geo_point field type has several variants). I'll see if my colleague has any better ideas when he's back next week.
If you have control of the data being written into elasticsearch, you can work around this by always putting a decimal in your lats and lons (like 40.0 instead of 40).
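On the read side, you may also be able to sidestep the failure by excluding the offending field with es.read.field.exclude (the option commented out in your original snippet). A minimal sketch, assuming you can live without geoip.location in the DataFrame:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("SparkTest").getOrCreate()

# Excluding geoip.location keeps the geo_point struct out of the schema,
# so the integer-vs-double mismatch never reaches Spark's row encoder.
df = (spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "<host_ip>")
      .option("es.port", "9200")
      .option("es.resource", "kibana_sample_data_ecommerce")
      .option("es.read.field.as.array.include", "products,manufacturer,category,sku")
      .option("es.read.field.exclude", "geoip.location")
      .load())

df.show()  # should no longer hit the encoder error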
Mainly for my own reference, here's where we're parsing what is returned from elasticsearch as an int:

_parseNumericValue:354, JsonNumericParserBase (org.elasticsearch.hadoop.thirdparty.codehaus.jackson.impl)
getNumberType:225, JsonNumericParserBase (org.elasticsearch.hadoop.thirdparty.codehaus.jackson.impl)
numberType:196, JacksonJsonParser (org.elasticsearch.hadoop.serialization.json)
numberType:143, BlockAwareJsonParser (org.elasticsearch.hadoop.serialization.json)
mapping:1107, ScrollReader (org.elasticsearch.hadoop.serialization)
read:898, ScrollReader (org.elasticsearch.hadoop.serialization)
map:1058, ScrollReader (org.elasticsearch.hadoop.serialization)
read:895, ScrollReader (org.elasticsearch.hadoop.serialization)
map:1058, ScrollReader (org.elasticsearch.hadoop.serialization)
read:895, ScrollReader (org.elasticsearch.hadoop.serialization)
readHitAsMap:608, ScrollReader (org.elasticsearch.hadoop.serialization)
readHit:432, ScrollReader (org.elasticsearch.hadoop.serialization)
read:298, ScrollReader (org.elasticsearch.hadoop.serialization)
read:262, ScrollReader (org.elasticsearch.hadoop.serialization)
scroll:320, RestRepository (org.elasticsearch.hadoop.rest)
hasNext:94, ScrollQuery (org.elasticsearch.hadoop.rest)
hasNext:66, AbstractEsRDDIterator (org.elasticsearch.spark.rdd)
hasNext:460, Iterator$$anon$10 (scala.collection)
processNext:-1, GeneratedClass$GeneratedIteratorForCodegenStage1 (org.apache.spark.sql.catalyst.expressions)
hasNext:43, BufferedRowIterator (org.apache.spark.sql.execution)
hasNext:759, WholeStageCodegenExec$$anon$1 (org.apache.spark.sql.execution)
$anonfun$getByteArrayRdd$1:349, SparkPlan (org.apache.spark.sql.execution)
apply:-1, 1948572920 (org.apache.spark.sql.execution.SparkPlan$$Lambda$2524)
$anonfun$mapPartitionsInternal$2:898, RDD (org.apache.spark.rdd)
$anonfun$mapPartitionsInternal$2$adapted:898, RDD (org.apache.spark.rdd)
apply:-1, 1890488482 (org.apache.spark.rdd.RDD$$Lambda$2525)
compute:52, MapPartitionsRDD (org.apache.spark.rdd)
computeOrReadCheckpoint:373, RDD (org.apache.spark.rdd)
iterator:337, RDD (org.apache.spark.rdd)
runTask:90, ResultTask (org.apache.spark.scheduler)
run:131, Task (org.apache.spark.scheduler)
$anonfun$run$3:506, Executor$TaskRunner (org.apache.spark.executor)
apply:-1, 851314936 (org.apache.spark.executor.Executor$TaskRunner$$Lambda$1150)
tryWithSafeFinally:1462, Utils$ (org.apache.spark.util)
run:509, Executor$TaskRunner (org.apache.spark.executor)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)

@masseyke
Member

Q1: Is there a way to read the whole Elasticsearch JSON record as a schemaless column in a DataFrame?

The esJsonRDD method is what you want -- https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read-json.
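
For PySpark, where esJsonRDD isn't exposed, something along these lines might work through the Map/Reduce layer instead (a rough sketch using the es.output.json setting; I haven't verified this exact combination):

from pyspark import SparkContext

sc = SparkContext(appName="RawJsonRead")

# es.output.json=true asks es-hadoop to hand back each document as a raw
# JSON string instead of a parsed (and type-inferred) structure.
conf = {
    "es.nodes": "<host_ip>",
    "es.port": "9200",
    "es.resource": "kibana_sample_data_ecommerce",
    "es.output.json": "true",
}
rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.apache.hadoop.io.Text",
    conf=conf,
)

# Each element should be a (key, raw JSON string) pair
for key, source in rdd.take(5):
    print(source)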

@sarath-mec
Author

sarath-mec commented Jan 21, 2023

Thanks for the comments.
Posting the esJsonRDD approach here for reference:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types.{Metadata, StringType, StructField, StructType}

// Configuration
val conf = new SparkConf().setAppName("Test").setMaster("local[*]")
conf.set("spark.es.nodes", "XXXXXX")
conf.set("spark.es.port", "9200")
conf.set("spark.es.resource", "kibana_sample_data_ecommerce")
conf.set("spark.es.read.metadata", "true")
conf.set("spark.es.query", "?q=*:*")
conf.set("spark.es.nodes.wan.only", "true")

// Spark context from conf
val sc = new SparkContext(conf)

// Spark session
val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()

// Read each document as an (id, source JSON string) pair
val rdd = sc.esJsonRDD()

val df = spark.createDataFrame(rdd).toDF("id", "source")

// If you want a specific schema instead:
val rowRDD = rdd.map(t => Row(t._1, t._2))

val tableSchema = new StructType()
  .add(StructField("id", StringType, false))
  .add(StructField("source", StringType, true, Metadata.fromJson("""{"sqlType":"JSON"}""")))

val dfWithSchema = spark.createDataFrame(rowRDD, tableSchema)
//

Thanks,
Sarath

@nick-nsc

Hi there, I'm currently experiencing a rather similar issue on "official" sample data:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.collection.convert.Wrappers$JListWrapper is not a valid external type for schema of string

I'm using the "Kibana Sample Data Logs" and my PySpark code fails at df.show().
df.printSchema() and df.count() do work.

Is the problem in the data or in es-hadoop?

Code

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TestReader").getOrCreate()
df = spark.read.format("es").option("es.resource", "kibana_sample_data_logs").load()

df.printSchema()
print(df.count())

df.show()    # error

Version Info

Java version: 1.8.0_382
Scala version: 2.12.15
Spark version: 3.2.4
Python version: 3.10.12
Elasticsearch version: 8.8.2
ES-Hadoop version: elasticsearch-spark-30_2.12-8.8.2.jar

PySpark Info:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.4
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_382
Branch HEAD
Compiled by user centos on 2023-04-09T20:59:10Z
Revision 0ae10ac18298d1792828f1d59b652ef17462d76e
Url https://github.com/apache/spark
Type --help for more information.

@sarath-mec
Author

@jbaiera

Elasticsearch for Apache Hadoop has a Scala API, and you would have to leverage that for this workaround. The issue occurs because the library reads the schema of your index, but some of the data inside Elasticsearch does not match that inferred schema. Elasticsearch stores the original document as JSON and, at indexing time, parses it into the mapped data type in Lucene for searching. When the library parses the original JSON (not the indexed form), it can infer the wrong datatype and fail. This can happen even when a field is mapped as float and the source JSON contains values like 1.2, 1, 2.3: the "1" will fail because it parses as an integer, not a float.

To overcome this, either correct the problematic records or use the Scala API (not PySpark) and the esJsonRDD method shown above.
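
To locate the offending documents before correcting them, one rough approach (a sketch using the official elasticsearch Python client; host, index, and field names are placeholders to adjust) is to scan the raw _source and flag lat/lon values that parse as integers:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch("http://<host_ip>:9200")  # placeholder host

# Scroll through every document and inspect the raw _source values.
# The JSON decoder returns int for 41 and float for 41.1, which is
# exactly the distinction that trips up the Spark encoder.
for hit in scan(es, index="kibana_sample_data_ecommerce", query={"query": {"match_all": {}}}):
    location = hit["_source"].get("geoip", {}).get("location")
    if not isinstance(location, dict):  # geo_point also has string/array forms
        continue
    for key in ("lat", "lon"):
        if isinstance(location.get(key), int):
            print(hit["_id"], key, location[key])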

@nick-nsc

I see...
Is there a way to reliably determine/debug the problematic records?
