Genomic Data Processing, Part 32: Avocado Run Log (Artificial Dataset)
导读:主要是需要数据正确,如果中间缺少记录,avocado一般不会成功 1.代码: Avocado修改: /** * Licensed to Big Data Genomics (BDG) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional
1. Code (modified Avocado):

/**
 * Licensed to Big Data Genomics (BDG) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The BDG licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.bdgenomics.avocado.cli
import java.nio.file.Files
import org.apache.commons.configuration.HierarchicalConfiguration
import org.apache.commons.configuration.plist.PropertyListConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, Logging}
import org.kohsuke.args4j.{Option => option, Argument}
import org.bdgenomics.adam.models.{VariantContext, ReferenceRegion}
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.avocado.Timers._
import org.bdgenomics.avocado.discovery.Explore
import org.bdgenomics.avocado.genotyping.CallGenotypes
import org.bdgenomics.avocado.input.Input
import org.bdgenomics.avocado.models.Observation
import org.bdgenomics.avocado.preprocessing.Preprocessor
import org.bdgenomics.avocado.postprocessing.Postprocessor
import org.bdgenomics.avocado.stats.AvocadoConfigAndStats
import org.bdgenomics.formats.avro.{
  Variant, AlignmentRecord, NucleotideContigFragment, Genotype
}
import org.bdgenomics.utils.cli.{
  BDGSparkCommand, BDGCommandCompanion, ParquetArgs, Args4j, Args4jBase
}
import org.bdgenomics.utils.instrumentation._
object Avocado extends BDGCommandCompanion {
  val commandName = "Avocado"
  val commandDescription = "Call variants using avocado and the ADAM preprocessing pipeline."

  def apply(args: Array[String]) = {
    new Avocado(Args4j[AvocadoArgs](args))
  }
}
class AvocadoArgs extends Args4jBase with ParquetArgs {
  @Argument(metaVar = "READS", required = true, usage = "ADAM read-oriented data", index = 0)
  var readInput: String = _

  @Argument(metaVar = "REFERENCE", usage = "ADAM or FASTA reference genome data", index = 1)
  var referenceInput: String = _

  @Argument(metaVar = "VARIANTS", usage = "ADAM variant output", index = 2)
  var variantOutput: String = _

  @Argument(metaVar = "CONFIG", usage = "avocado configuration file", index = 3)
  var configFile: String = _

  @option(name = "-debug", usage = "If set, prints a higher level of debug output.")
  var debug = false

  @option(required = false, name = "-fragment_length", usage = "Sets maximum fragment length. Default value is 10,000. Values greater than 1e9 should be avoided.")
  var fragmentLength: Long = 10000L
}
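// For reference, the four positional arguments above bind in order, so a launch
// against the artificial dataset looks roughly like (file names hypothetical):
//   avocado reads.adam reference.fa variants.adam basic.properties -debug
// Note that CONFIG is resolved from the classpath (see getResourceAsStream below).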
class Avocado(protected val args: AvocadoArgs) extends BDGSparkCommand[AvocadoArgs] with Logging {
  // companion object to this class - needed for BDGCommand framework
  val companion = Avocado

  /** *********add by xubo 20160528 *******************/
  println("companion:" + companion)
  println("companion.commandName:" + companion.commandName)
  println("companion.commandDescription:" + companion.commandDescription)
  println("AvocadoArgs:")
  println("args:" + args)
  println("args.configFile:" + args.configFile)
  println("args.debug:" + args.debug)
  println("args.fragmentLength:" + args.fragmentLength)
  println("args.readInput:" + args.readInput)
  println("args.referenceInput:" + args.referenceInput)
  println("args.variantOutput:" + args.variantOutput)
  println("test end")
  /** *********add by xubo 20160528 *******************/
  // get config off classpath and load into a temp file...
  val stream = Thread.currentThread.getContextClassLoader.getResourceAsStream(args.configFile)
  val tempPath = Files.createTempDirectory("config")
  val tempFilePath = tempPath.resolve("temp.properties")
  Files.copy(stream, tempFilePath)

  /** *********add by xubo 20160528 *******************/
  println("stream:" + stream)
  println("tempFilePath:" + tempFilePath)
  /** *********add by xubo 20160528 *******************/

  // load config
  val config: HierarchicalConfiguration = new PropertyListConfiguration(tempFilePath.toFile)
  val preprocessorNames = getStringArrayFromConfig("preprocessorNames")
  val preprocessorAlgorithms = getStringArrayFromConfig("preprocessorAlgorithms")
  assert(preprocessorNames.length == preprocessorAlgorithms.length,
    "Must have a 1-to-1 mapping between preprocessor names and algorithms.")
  val preprocessingStagesZippedWithNames = preprocessorNames.zip(preprocessorAlgorithms)

  val explorerName = config.getString("explorerName")
  val explorerAlgorithm = config.getString("explorerAlgorithm")

  val genotyperName = config.getString("genotyperName")
  val genotyperAlgorithm = config.getString("genotyperAlgorithm")

  val postprocessorNames = getStringArrayFromConfig("postprocessorNames")
  val postprocessorAlgorithms = getStringArrayFromConfig("postprocessorAlgorithms")
  assert(postprocessorNames.length == postprocessorAlgorithms.length,
    "Must have a 1-to-1 mapping between postprocessor names and algorithms.")
  val postprocessorsZipped = postprocessorNames.zip(postprocessorAlgorithms)

  val debug = args.debug
  /** *********add by xubo 20160528 *******************/
  println("config:" + config)
  println("preprocessorNames:" + preprocessorNames)
  preprocessorNames.foreach(println)
  println("preprocessorAlgorithms:" + preprocessorAlgorithms)
  preprocessorAlgorithms.foreach(println)
  println("preprocessingStagesZippedWithNames:" + preprocessingStagesZippedWithNames)
  preprocessingStagesZippedWithNames.foreach(println)
  println("explorerName:" + explorerName)
  println("explorerAlgorithm:" + explorerAlgorithm)
  println("genotyperName:" + genotyperName)
  println("genotyperAlgorithm:" + genotyperAlgorithm)
  println("postprocessorNames:" + postprocessorNames)
  postprocessorNames.foreach(println)
  println("postprocessorAlgorithms:" + postprocessorAlgorithms)
  postprocessorAlgorithms.foreach(println)
  println("postprocessorsZipped:" + postprocessorsZipped)
  postprocessorsZipped.foreach(println)
  println("stream:" + stream)
  /** *********add by xubo 20160528 *******************/
  private def getStringArrayFromConfig(name: String): Array[String] = {
    config.getStringArray(name).map(_.toString)
  }
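  // The CONFIG file is parsed with Apache Commons PropertyListConfiguration,
  // i.e. a NeXT-style plist. A minimal sketch of the keys read above (stage and
  // algorithm names are placeholders, not a tested configuration):
  //
  //   preprocessorNames = ( "stage1" );
  //   preprocessorAlgorithms = ( "SomePreprocessor" );
  //   explorerName = "explorer";
  //   explorerAlgorithm = "SomeExplorer";
  //   genotyperName = "genotyper";
  //   genotyperAlgorithm = "SomeGenotyper";
  //   postprocessorNames = ( );
  //   postprocessorAlgorithms = ( );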
  /**
   * Applies several pre-processing steps to the read pipeline. Currently, these are the default
   * steps in the ADAM processing pipeline.
   *
   * @param reads RDD of reads to process.
   * @return RDD containing reads that have been sorted and deduped.
   */
  def preProcessReads(reads: RDD[AlignmentRecord]): RDD[AlignmentRecord] = PreprocessReads.time {
    var processedReads = reads //.cache

    if (debug) {
      log.info("avocado: Preprocessing " + processedReads.count + " reads.")
    }

    // loop over preprocessing stages and apply
    preprocessingStagesZippedWithNames.foreach(p => {
      val (stageName, stageAlgorithm) = p
      log.info("avocado: Running " + stageName)

      // run this preprocessing stage
      processedReads = Preprocessor(processedReads, stageName, stageAlgorithm, config)
    })

    // return processed reads
    processedReads
  }
  /**
   * Applies variant calling algorithms to reads and pileups. Reduces down and returns called
   * variants.
   *
   * @param reads RDD of reads to call variants on.
   * @param stats Pipeline configuration and coverage statistics.
   * @return Joined output of variant calling algorithms.
   */
  def callVariants(reads: RDD[AlignmentRecord], stats: AvocadoConfigAndStats): RDD[VariantContext] = CallVariants.time {
    val discoveries: RDD[Observation] = Explore(explorerAlgorithm, explorerName, reads, stats, config)
    CallGenotypes(genotyperAlgorithm, genotyperName, discoveries, config)
  }
  /**
   * Applies variant post-processing methods to called variants. Post-processing can
   * include methods which modify the information in variant calls, or alternatively,
   * methods that filter out spurious variant calls.
   *
   * @param variants RDD of variants to process.
   * @return Post-processed variants.
   */
  def postProcessVariants(variants: RDD[VariantContext], stats: AvocadoConfigAndStats): RDD[VariantContext] = PostprocessVariants.time {
    var rdd = variants

    // loop over post processing steps
    postprocessorsZipped.foreach(p => {
      val (ppStageName, ppAlgorithm) = p
      rdd = Postprocessor(rdd, ppStageName, ppAlgorithm, config)
    })

    rdd
  }
  /**
   * Main method. Implements body of variant caller. The SparkContext is provided
   * by the BDGSparkCommand shell.
   *
   * @param sc SparkContext for RDDs.
   */
  def run(sc: SparkContext) {
    log.info("Starting avocado...")

    // load in reference from ADAM file
    val reference: RDD[NucleotideContigFragment] = LoadContigs.time {
      sc.loadSequence(args.referenceInput, fragmentLength = args.fragmentLength)
    }

    log.info("Loading reads in from " + args.readInput)

    // load in reads from ADAM file
    val reads: RDD[AlignmentRecord] = LoadReads.time {
      Input(sc, args.readInput, reference, config)
    }
    /** *********add by xubo 20160528 *******************/
    println("readInput:")
    reads.foreach(println)
    /** *********add by xubo 20160528 *******************/

    // create stats/config item
    val stats = new AvocadoConfigAndStats(sc, args.debug, reference)

    /** *********add by xubo 20160528 *******************/
    println("stats:" + stats)
    println("stats.contigLengths:" + stats.contigLengths)
    // println("stats.coverage:" + stats.coverage)
    println("stats.debug:" + stats.debug)
    println("stats.referenceObservations:" + stats.referenceObservations)
    // println("stats.referenceSeq:" + stats.referenceSeq)
    // stats.referenceSeq.foreach(println)
    println("stats.samplesInDataset:" + stats.samplesInDataset)
    stats.samplesInDataset.foreach(println)
    println("stats.sequenceDict:" + stats.sequenceDict)
    /** *********add by xubo 20160528 *******************/
    // apply read translation steps
    log.info("Processing reads.")
    var cleanedReads = preProcessReads(reads)

    /** *********add by xubo 20160528 *******************/
    println("cleanedReads:" + cleanedReads)
    println("cleanedReads.count:" + cleanedReads.count())
    cleanedReads.foreach(println)

    // tag each read with a placeholder sample name ("hello") so the record
    // group sample is non-null on the artificial dataset
    val cleanedReads2 = cleanedReads.map { each =>
      each.setRecordGroupSample("hello")
      each
    }
    // cleanedReads.adamGetReadGroupDictionary()
    // cleanedReads.foreach(each => println(each.getSequence + " " + each.referenceLength))
    println("cleanedReads2:")
    cleanedReads2.foreach(println)
    /** *********add by xubo 20160528 *******************/
    // call variants on filtered reads and pileups
    log.info("Calling variants.")
    // val calledVariants = callVariants(cleanedReads, stats)
    val calledVariants = callVariants(cleanedReads2, stats)

    /** *********add by xubo 20160528 *******************/
    println("calledVariants:" + calledVariants)
    println("calledVariants.count:" + calledVariants.count())
    // calledVariants.take(10).foreach(each => println(each.databases + " " + each.position + " " + each.variant + " " + each.genotypes))
    // calledVariants.take(10).foreach(println)
    /** *********add by xubo 20160528 *******************/

    // post process variants
    log.info("Post-processing variants.")
    val processedGenotypes: RDD[Genotype] = postProcessVariants(calledVariants, stats).flatMap(variantContext => variantContext.genotypes)

    /** *********add by xubo 20160528 *******************/
    println("processedGenotypes:" + processedGenotypes)
    println("processedGenotypes.count:" + processedGenotypes.count())
    processedGenotypes.foreach(println)
    /** *********add by xubo 20160528 *******************/
    // save variants to output file
    log.info("Writing calls to disk.")
    SaveVariants.time {
      processedGenotypes.adamParquetSave(args.variantOutput,
        args.blockSize,
        args.pageSize,
        args.compressionCodec,
        args.disableDictionaryEncoding)

      /** ***********add ***************/
      processedGenotypes.foreach { each => println("processedGenotypes:" + each) }
    }
  }
}
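For completeness, the command can also be driven programmatically instead of through the CLI. This is a hedged sketch, not part of the original post: the file paths are placeholders, and it assumes the bdg-utils BDGSparkCommand framework supplies the SparkContext through its no-argument run():

import org.bdgenomics.avocado.cli.Avocado

// Hypothetical driver (paths are placeholders). Remember that the CONFIG
// argument is looked up on the classpath by the Avocado constructor.
object RunAvocado {
  def main(args: Array[String]): Unit = {
    val cmd = Avocado(Array(
      "artificial.reads.adam",    // READS
      "artificial.ref.fa",        // REFERENCE
      "artificial.variants.adam", // VARIANTS output
      "basic.properties",         // CONFIG, resolved from the classpath
      "-debug"))
    cmd.run() // assumed: BDGSparkCommand.run() builds the SparkContext and calls run(sc)
  }
}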