Describe the bug
Both Spark and ClickHouse work fine after I replace type = 'upsert' with type = 'append-only', force_append_only = 'true' in the sink definition (a sketch of the changed options follows the sink below).
🚿source
CREATE SOURCE drivers (
    status STRING,
    driver_id STRING,
    location_id STRING,
    location_lon double precision,
    location_lat double precision,
    timestamp timestamptz
) WITH (
    connector = 'kafka',
    topic = 'drivers_pingshan',
    {kafka_info}
) FORMAT PLAIN ENCODE JSON;
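For reference, a message on drivers_pingshan looks roughly like this (hypothetical values; only the field names and JSON types are implied by the schema above):
{
    "status": "available",
    "driver_id": "driver_42",
    "location_id": "node_1001",
    "location_lon": 114.2578,
    "location_lat": 22.7196,
    "timestamp": "2023-10-26T07:40:00Z"
}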
🪣sink
CREATE SINK ridesharing_stat AS (
    WITH way_events AS (
        SELECT
            way_id,
            drivers.driver_id AS driver_id,
            timestamp
        FROM drivers
        JOIN node ON drivers.location_id = node.id
    )
    SELECT
        way_id,
        count(*) AS cnt,
        window_start::timestamp AS timestamp
    FROM TUMBLE(way_events, timestamp, INTERVAL '5 MINUTE')
    GROUP BY way_id, window_start
) WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'way_id',
    catalog.type = 'rest',
    catalog.uri = 'http://rest:8181',
    s3.endpoint = 'http://minio:9000',
    s3.access.key = 'admin',
    s3.secret.key = 'password',
    s3.region = 'us-east-1',
    table.name = 'ridesharing.stat',
    warehouse.path = 's3://warehouse/ridesharing/stat',
    database.name = 'default_catalog'
);
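The variant that works keeps the query and all catalog/S3/table options exactly as above and only swaps the sink type; a minimal sketch of the changed WITH clause:
) WITH (
    connector = 'iceberg',
    type = 'append-only',        -- was: type = 'upsert'
    force_append_only = 'true',  -- added
    -- remaining options (primary_key, catalog.*, s3.*, table.name, ...) unchanged
    ...
);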
🌟Spark
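For context, the Spark session is pointed at the same REST catalog and MinIO as the sink, roughly like this (a sketch; the catalog name demo and the exact flags are my assumptions based on a standard Iceberg REST-catalog setup, with iceberg-spark-runtime and the AWS bundle on the classpath):
spark-sql \
  --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.demo.type=rest \
  --conf spark.sql.catalog.demo.uri=http://rest:8181 \
  --conf spark.sql.catalog.demo.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
  --conf spark.sql.catalog.demo.s3.endpoint=http://minio:9000 \
  --conf spark.sql.catalog.demo.s3.access-key-id=admin \
  --conf spark.sql.catalog.demo.s3.secret-access-key=password \
  --conf spark.sql.catalog.demo.s3.path-style-access=true \
  --conf spark.sql.defaultCatalog=demo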
SparkSQL cannot run either select count(*) from ridesharing.stat; or SELECT timestamp, way_id, cnt FROM ridesharing.stat LIMIT 10. Both fail during plan optimization; per the stack trace, the NPE is thrown from BaseDistributedDataScan.mayHaveEqualityDeletes, i.e. while Spark inspects the table's delete files:
[INTERNAL_ERROR] The Spark SQL phase optimization failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase optimization failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
at org.apache.spark.SparkException$.internalError(SparkException.scala:88)
at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:516)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:528)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:139)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:135)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:69)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:415)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:533)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:527)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:527)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:307)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at org.apache.iceberg.BaseDistributedDataScan.mayHaveEqualityDeletes(BaseDistributedDataScan.java:395)
at org.apache.iceberg.BaseDistributedDataScan.doPlanFiles(BaseDistributedDataScan.java:149)
at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139)
at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.tasks(SparkPartitioningAwareScan.java:174)
at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.taskGroups(SparkPartitioningAwareScan.java:202)
at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.outputPartitioning(SparkPartitioningAwareScan.java:104)
at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$$anonfun$partitioning$1.applyOrElse(V2ScanPartitioningAndOrdering.scala:44)
at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$$anonfun$partitioning$1.applyOrElse(V2ScanPartitioningAndOrdering.scala:42)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
at org.apache.spark.sql.catalyst.plans.logical.Aggregate.mapChildren(basicLogicalOperators.scala:1122)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.partitioning(V2ScanPartitioningAndOrdering.scala:42)
at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.$anonfun$apply$1(V2ScanPartitioningAndOrdering.scala:35)
at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.$anonfun$apply$3(V2ScanPartitioningAndOrdering.scala:38)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.apply(V2ScanPartitioningAndOrdering.scala:37)
at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.apply(V2ScanPartitioningAndOrdering.scala:33)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:143)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
... 41 more
🏚️ clickhouse
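For context, ridesharing_stat is exposed in ClickHouse through the Iceberg table engine, pointed at the same MinIO warehouse; a sketch (the exact table URL is my assumption, derived from warehouse.path above):
CREATE TABLE ridesharing_stat
ENGINE = Iceberg('http://minio:9000/warehouse/ridesharing/stat', 'admin', 'password');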
Surprisingly, ClickHouse can run SELECT timestamp, way_id, cnt FROM ridesharing_stat LIMIT 10, but SELECT COUNT(*) FROM ridesharing_stat; fails:
Received exception from server (version 23.9.2):
Code: 8. DB::Exception: Received from clickhouse:9000. DB::Exception: Not found field (cnt) in the following Arrow schema:
file_path: string not null
pos: int64 not null
: While executing ParquetBlockInputFormat: While executing Iceberg. (THERE_IS_NO_COLUMN)
(query: SELECT COUNT(*) FROM ridesharing_stat)
The file_path/pos fields in that Arrow schema match the layout of an Iceberg positional delete file, so ClickHouse appears to be scanning delete files as if they were data files. Full log in /var/log/clickhouse-server:
2023.10.26 07:44:41.154303 [ 48 ] {8b8467ea-2bcb-4081-a95e-64d5879dd9ad} <Error> TCPHandler: Code: 8. DB::Exception: Not found field (cnt) in the following Arrow schema:
file_path: string not null
pos: int64 not null
: While executing ParquetBlockInputFormat: While executing Iceberg. (THERE_IS_NO_COLUMN), Stack trace (when copying this message, always include the lines below):
0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000c741d97 in /usr/bin/clickhouse
1. DB::Exception::Exception<String const&, String>(int, FormatStringHelperImpl<std::type_identity<String const&>::type, std::type_identity<String>::type>, String const&, String&&) @ 0x0000000007c8db87 in /usr/bin/clickhouse
2. DB::ParquetBlockInputFormat::initializeIfNeeded() @ 0x00000000134ddaf9 in /usr/bin/clickhouse
3. DB::ParquetBlockInputFormat::generate() @ 0x00000000134dff9f in /usr/bin/clickhouse
4. DB::ISource::tryGenerate() @ 0x0000000013369eb8 in /usr/bin/clickhouse
5. DB::ISource::work() @ 0x00000000133699ea in /usr/bin/clickhouse
6. DB::ExecutionThreadContext::executeTask() @ 0x00000000133818ba in /usr/bin/clickhouse
7. DB::PipelineExecutor::executeStepImpl(unsigned long, std::atomic<bool>*) @ 0x0000000013378370 in /usr/bin/clickhouse
8. DB::PipelineExecutor::executeStep(std::atomic<bool>*) @ 0x0000000013377b28 in /usr/bin/clickhouse
9. DB::PullingPipelineExecutor::pull(DB::Chunk&) @ 0x000000001338633a in /usr/bin/clickhouse
10. DB::StorageS3Source::generate() @ 0x0000000012a64d1f in /usr/bin/clickhouse
11. DB::ISource::tryGenerate() @ 0x0000000013369eb8 in /usr/bin/clickhouse
12. DB::ISource::work() @ 0x00000000133699ea in /usr/bin/clickhouse
13. DB::ExecutionThreadContext::executeTask() @ 0x00000000133818ba in /usr/bin/clickhouse
14. DB::PipelineExecutor::executeStepImpl(unsigned long, std::atomic<bool>*) @ 0x0000000013378370 in /usr/bin/clickhouse
15. void std::__function::__policy_invoker<void ()>::__call_impl<std::__function::__default_alloc_func<DB::PipelineExecutor::spawnThreads()::$_0, void ()>>(std::__function::__policy_storage const*) @ 0x000000001337948f in /usr/bin/clickhouse
16. ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>::worker(std::__list_iterator<ThreadFromGlobalPoolImpl<false>, void*>) @ 0x000000000c828e7f in /usr/bin/clickhouse
17. void std::__function::__policy_invoker<void ()>::__call_impl<std::__function::__default_alloc_func<ThreadFromGlobalPoolImpl<false>::ThreadFromGlobalPoolImpl<void ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>::scheduleImpl<void>(std::function<void ()>, Priority, std::optional<unsigned long>, bool)::'lambda0'()>(void&&)::'lambda'(), void ()>>(std::__function::__policy_storage const*) @ 0x000000000c82c99c in /usr/bin/clickhouse
18. void* std::__thread_proxy[abi:v15000]<std::tuple<std::unique_ptr<std::__thread_struct, std::default_delete<std::__thread_struct>>, void ThreadPoolImpl<std::thread>::scheduleImpl<void>(std::function<void ()>, Priority, std::optional<unsigned long>, bool)::'lambda0'()>>(void*) @ 0x000000000c82b1c7 in /usr/bin/clickhouse
19. ? @ 0x00007fab963c9609 in ?
20. ? @ 0x00007fab962ee133 in ?