Skip to main content
Announcements
Accelerate Your Success: Fuel your data and AI journey with the right services, delivered by our experts. Learn More
cancel
Showing results for 
Search instead for 
Did you mean: 
J_Ruiz
Contributor II
Contributor II

tFileInputParquet randomly changing the order of the schema on runtime

Greetings.

 

I'm having a problem with the component tFileInputParquet in an Apache Spark Batch on Talend Big Data R2020-09-7.3.1. I use a remote connection to TAC and a Job Server to run the job.

 

I'm currently using YARN Cluster spark mode with CDH6.3.2.

 

I have the following part of a batch that, after some processing, makes two parquet outputs. No problem here.

0695b00000N491XAAR.png 

I, however, wish to unite both parquet outputs later on, and I've already used a tReplicate.

Therefore, I use an OnSubjobOk to retrieve both.

 

0695b00000N4926AAB.png 

The schema of all 5 parquet components is as follows:

 

0695b00000N496DAAR.png 

However, when I execute the batch, I get the following exception:

 

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 49 in stage 2.0 failed 4 times, most recent failure: Lost task 49.3 in stage 2.0 (TID 67, slaaeizeba01.enelint.global, executor 9): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.sql.Timestamp is not a valid external type for schema of string

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, calidad_medida), StringType), true, false) AS calidad_medida# 116

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, magnitude), StringType), true, false) AS magnitude# 117

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, manufact), StringType), true, false) AS manufact# 118

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, mdm_strat), StringType), true, false) AS mdm_strat# 119

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, meas_end_ts), TimestampType), true, false) AS meas_end_ts# 120

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, meas_start_ts), TimestampType), true, false) AS meas_start_ts# 121

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, meas_ym), StringType), true, false) AS meas_ym# 122

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, model), StringType), true, false) AS model# 123

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, pod), StringType), true, false) AS pod# 124

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, serial_n), StringType), true, false) AS serial_n# 125

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, status), StringType), true, false) AS status# 126

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, territorio), StringType), true, false) AS territorio# 127

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, type), StringType), true, false) AS type# 128

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.types.Decimal$, DecimalType(30,0), fromDecimal, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, val), DecimalType(30,0)), true, false) AS val# 129

at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)

at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:602)

at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:602)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)

at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)

at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)

at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:232)

at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)

at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)

at org.apache.spark.scheduler.Task.run(Task.scala:121)

at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)

at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.RuntimeException: java.sql.Timestamp is not a valid external type for schema of string

at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_3$(Unknown Source)

at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_1$(Unknown Source)

at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)

at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)

 

From what I've seen online, this is an error that occurs when the schema types doesn't match the actual types of the data the component retrieves.

 

The strange thing is, the schema seems to have changed in runtime. The stack trace shows the schema on a completely different order, not to mention the StaticInvoke_3 seems to match 'mdm_strat' column on runtime (string) but 'meas_start_ts' on the talend component schema (date/Timestamp).

 

Any idea of what could be happening here? Thanks.

Labels (5)
1 Reply
Anonymous
Not applicable

This is probably best raised with Support. I feel that further analysis of your job may be required to see where the issue is here.