Interface BatchWrite
- All Known Subinterfaces:
DeltaBatchWrite
The writing procedure is:
- Create a writer factory by createBatchWriterFactory(PhysicalWriteInfo), serialize it, and send it to all the partitions of the input data (RDD).
- For each partition, create the data writer, and write the data of the partition with this writer. If all the data are written successfully, call DataWriter.commit(). If an exception happens during the writing, call DataWriter.abort().
- If all writers are successfully committed, call commit(WriterCommitMessage[]). If some writers are aborted, or the job failed for an unknown reason, call abort(WriterCommitMessage[]).
While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should do it manually in their Spark applications if they want to retry.
Please refer to the documentation of commit/abort methods for detailed specifications.
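The three steps of the writing procedure can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Spark code: the class BatchWriteLifecycleSketch and its nested Message and Writer types are hypothetical stand-ins for the commit message and data writer, the partitions are in-memory lists, and task retries are left out.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins that mirror the procedure described above.
// None of these types are the Spark classes.
class BatchWriteLifecycleSketch {

    /** Stand-in for a commit message: which partition wrote how many rows. */
    static final class Message {
        final int partitionId;
        final int rowCount;

        Message(int partitionId, int rowCount) {
            this.partitionId = partitionId;
            this.rowCount = rowCount;
        }
    }

    /** Stand-in for a per-partition data writer. */
    static final class Writer {
        private final int partitionId;
        private int rows = 0;

        Writer(int partitionId) {
            this.partitionId = partitionId;
        }

        void write(String row) {
            if (row == null) {
                throw new IllegalArgumentException("null row");
            }
            rows++;
        }

        Message commit() {
            return new Message(partitionId, rows);
        }

        void abort() {
            rows = 0; // discard whatever this writer had accepted
        }
    }

    /**
     * Walks the procedure over in-memory partitions and returns which
     * job-level call would follow: "committed" or "aborted".
     */
    static String run(List<List<String>> partitions) {
        Message[] messages = new Message[partitions.size()];
        boolean allCommitted = true;
        for (int p = 0; p < partitions.size(); p++) {
            // Steps 1 and 2: a writer factory would create this writer for the partition.
            Writer writer = new Writer(p);
            try {
                for (String row : partitions.get(p)) {
                    writer.write(row);
                }
                messages[p] = writer.commit();
            } catch (RuntimeException e) {
                writer.abort();
                allCommitted = false; // slot p stays null
            }
        }
        // Step 3: a real job would now receive commit(messages) or abort(messages).
        return allCommitted ? "committed" : "aborted";
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(List.of("a", "b"), List.of("c"))));
        System.out.println(run(List.of(List.of("a"), Arrays.asList("b", null))));
    }
}
```

In the second call the null row makes one writer throw, so that writer is aborted and the job-level outcome is an abort rather than a commit.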
- Since: 3.0.0
-
Method Summary
Modifier and Type, Method, Description:
- void abort(WriterCommitMessage[] messages): Aborts this writing job because some data writers failed and kept failing when retried, or the Spark job fails for some unknown reason, or onDataWriterCommit(WriterCommitMessage) fails, or commit(WriterCommitMessage[]) fails.
- void commit(WriterCommitMessage[] messages): Commits this writing job with a list of commit messages.
- default void commit(WriterCommitMessage[] messages, Map<String, Long> metrics): Commits this writing job with a list of commit messages and operation metrics.
- DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info): Creates a writer factory which will be serialized and sent to executors.
- default void onDataWriterCommit(WriterCommitMessage message): Handles a commit message received from a successful data writer.
- default boolean useCommitCoordinator(): Returns whether Spark should use the commit coordinator to ensure that at most one task for each partition commits.
-
Method Details
-
createBatchWriterFactory
Creates a writer factory which will be serialized and sent to executors.
If this method fails (by throwing an exception), the action will fail and no Spark job will be submitted.
- Parameters:
info - Physical information about the input data that will be written to this table.
-
useCommitCoordinator
default boolean useCommitCoordinator()
Returns whether Spark should use the commit coordinator to ensure that at most one task for each partition commits.
- Returns:
true if commit coordinator should be used, false otherwise.
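The "at most one task for each partition commits" guarantee can be illustrated with a small sketch. It is an assumption-laden stand-in, not Spark's own coordinator: the class CommitCoordinatorSketch and its canCommit method are hypothetical names, and the logic shown is simply "the first attempt to ask for a partition wins".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is not Spark's commit coordinator.
class CommitCoordinatorSketch {

    // partition id -> attempt id that was granted permission to commit
    private final Map<Integer, Integer> authorized = new HashMap<>();

    /**
     * Grants permission to the first attempt that asks for a partition.
     * The same attempt asking again is still allowed; any other attempt
     * for that partition is refused.
     */
    synchronized boolean canCommit(int partitionId, int attemptId) {
        Integer winner = authorized.putIfAbsent(partitionId, attemptId);
        return winner == null || winner == attemptId;
    }

    public static void main(String[] args) {
        CommitCoordinatorSketch coordinator = new CommitCoordinatorSketch();
        System.out.println(coordinator.canCommit(0, 0)); // first attempt for partition 0
        System.out.println(coordinator.canCommit(0, 1)); // speculative attempt for partition 0
    }
}
```

With speculative execution, two attempts may finish writing the same partition; a check of this kind is what keeps the second one from committing. An implementation that returns false from useCommitCoordinator() takes on the job of handling several successful commits per partition itself.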
-
onDataWriterCommit
Handles a commit message received from a successful data writer. If this method fails (by throwing an exception), this writing job is considered to have failed, and abort(WriterCommitMessage[]) will be called.
commit
Commits this writing job with a list of commit messages. The commit messages are collected from successful data writers and are produced by DataWriter.commit().
If this method fails (by throwing an exception), this writing job is considered to have failed, and abort(WriterCommitMessage[]) will be called. The state of the destination is undefined and abort(WriterCommitMessage[]) may not be able to deal with it.
Note that speculative execution may cause multiple tasks to run for a partition. By default, Spark uses the commit coordinator to allow at most one task to commit. Implementations can disable this behavior by overriding useCommitCoordinator(). If disabled, multiple tasks may have committed successfully and one successful commit message per task will be passed to this commit method. The remaining commit messages are ignored by Spark.
commit
Commits this writing job with a list of commit messages and operation metrics.
If this method fails (by throwing an exception), this writing job is considered to have failed, and abort(WriterCommitMessage[]) will be called. The state of the destination is undefined and abort(WriterCommitMessage[]) may not be able to deal with it.
Note that speculative execution may cause multiple tasks to run for a partition. By default, Spark uses the commit coordinator to allow at most one task to commit. Implementations can disable this behavior by overriding useCommitCoordinator(). If disabled, multiple tasks may have committed successfully and one successful commit message per task will be passed to this commit method. The remaining commit messages are ignored by Spark.
- Parameters:
messages - a list of commit messages from successful data writers, produced by DataWriter.commit().
metrics - a map of operation metrics collected from the query producing the write. The keys will be prefixed by operation type, e.g. `merge`.
Currently supported metrics are:
- Operation Type = `merge`
- `numTargetRowsCopied`: number of target rows copied unmodified because they did not match any action
- `numTargetRowsDeleted`: number of target rows deleted
- `numTargetRowsUpdated`: number of target rows updated
- `numTargetRowsInserted`: number of target rows inserted
- `numTargetRowsMatchedUpdated`: number of target rows updated by a matched clause
- `numTargetRowsMatchedDeleted`: number of target rows deleted by a matched clause
- `numTargetRowsNotMatchedBySourceUpdated`: number of target rows updated by a not matched by source clause
- `numTargetRowsNotMatchedBySourceDeleted`: number of target rows deleted by a not matched by source clause
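Reading one of these metrics out of the map could look like the sketch below. The exact key format is not spelled out here beyond "prefixed by operation type", so the helper matches on prefix and suffix instead of building the key; the class MergeMetricsSketch, its metric method, and the dot-separated sample keys in main are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the key separator used in the sample data is an assumption.
class MergeMetricsSketch {

    /**
     * Returns the value of the first key that starts with the operation type
     * and ends with the metric name, or 0 if no such key is present.
     */
    static long metric(Map<String, Long> metrics, String operationType, String metricName) {
        for (Map.Entry<String, Long> entry : metrics.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(operationType) && key.endsWith(metricName)) {
                return entry.getValue();
            }
        }
        return 0L;
    }

    public static void main(String[] args) {
        Map<String, Long> metrics = new HashMap<>();
        // Assumed key shape: "<operation type>.<metric name>".
        metrics.put("merge.numTargetRowsInserted", 5L);
        metrics.put("merge.numTargetRowsMatchedUpdated", 2L);
        System.out.println(metric(metrics, "merge", "numTargetRowsInserted"));
    }
}
```

An implementation of commit(WriterCommitMessage[], Map) could use a lookup of this kind to record row counts alongside the committed data, for example in a table's commit log.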
-
abort
Aborts this writing job because some data writers failed and kept failing when retried, or the Spark job fails for some unknown reason, or onDataWriterCommit(WriterCommitMessage) fails, or commit(WriterCommitMessage[]) fails.
If this method fails (by throwing an exception), the underlying data source may require manual cleanup.
Unless the abort is triggered by the failure of commit, the given messages should have some null slots, as there may be only a few data writers that were committed before the abort happened, or some data writers were committed but their commit messages had not reached the driver when the abort was triggered. So this is just a "best effort" for data sources to clean up the data left by data writers.
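Because the messages array may contain null slots, an abort implementation has to skip them while cleaning up. The sketch below shows that pattern under stated assumptions: AbortCleanupSketch and StagedFile are hypothetical, with StagedFile standing in for a commit message that carries the path of a staged file, and the paths are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of best-effort cleanup; not a Spark class.
class AbortCleanupSketch {

    /** Stand-in for a commit message that records where a writer staged its output. */
    static final class StagedFile {
        final String path;

        StagedFile(String path) {
            this.path = path;
        }
    }

    /** Collects the paths to delete, skipping slots whose message never arrived. */
    static List<String> pathsToCleanUp(StagedFile[] messages) {
        List<String> paths = new ArrayList<>();
        for (StagedFile message : messages) {
            if (message != null) {
                paths.add(message.path);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        StagedFile[] messages = {
            new StagedFile("/tmp/staging/part-0"),
            null, // this writer's message did not reach the driver
            new StagedFile("/tmp/staging/part-2")
        };
        System.out.println(pathsToCleanUp(messages));
    }
}
```

Output left behind by writers whose slots are null is not covered by this loop, which is why the cleanup can only be best effort; a data source may need a separate sweep of its staging location.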
-