Script cd_hit_para_CCRT_py
|
|
1
2
3
4 import os, sys
5 import argparse
6 import subprocess
7
8 __doc__="""
9 Launch cd-hit program in a parallel mode compatible with the CCRT architecture.
10
11 @requires: U{python<https://github.com/lh3/seqtk>} (tested with 2.7.5)
12 @requires: U{cd-hit<https://code.google.com/p/cdhit/downloads/list>} (tested with v4.5.8-2012-03-24)
13
14 Input : a FASTA file
15 Output : a clustering FASTA
16 """
17
19
20 parser = argparse.ArgumentParser(description='Launch cd-hit program in a parallel \
21 mode compatible with the CCRT architecture.')
22
23 parser.add_argument('-i', action="store", dest='fasta',
24 type=str, help='input FASTA file')
25
26 parser.add_argument('-n', action="store", dest='nbSplit',
27 type=int, default=10, help='number of split files generated (default:10)')
28
29 parser.add_argument('-A', action="store", dest='projid',
30 type=str, default=None, help='CCRT project/account name (default:None)')
31
32 parser.add_argument('-q', action="store", dest='queue',
33 type=str, default='large', help='Job Priority (default:large)')
34
35 parser.add_argument('-t', action="store", dest='cores',
36 type=int, default=1, help='Numbers of threads requested to each task (default:1)')
37
38 parser.add_argument('-o', action="store", dest='prefix',
39 type=str, help='output prefix')
40
41 parser.add_argument('-c', action="store", dest='clutserThreshold',
42 type=str, default='0.95', help='sequence identity threshold for clustering (default=0.95)')
43
44 parser.add_argument('-j', action="store", dest='jobDependency',
45 type=str, default=None, help='The submitted job will only be started when the jobs \
46 corresponding to the provided ids are terminated. (default=None)')
47
48 parser.add_argument('-aS', action="store", dest='minAlig',
49 type=str, default='0.0', help='alignment coverage for the shorter sequence (default:0.0) if set to 0.9,the alignment must covers 90 pourcent of the sequence')
50
51 return parser
52
53
55 """
56 launch the ccc_msub commande with a bash script as arguments and optionnally a job ID for the job dependency.
57 Then this function recovers and return the job ID generated.
58 @param filename: bash script file name
59 @type filename: string
60 @param jobDependency: put an "after" dependency on the set of jobs defined as the argument.\
61 The submitted job will only be started when the jobs corresponding to the provided ids are terminated.
62 @type jobDependency: string
63 @return: job ID
64 @rtype: string
65 """
66
67
68 if(jobDependency==None):
69 commandeLine = "ccc_msub " + filename
70 else:
71 commandeLine = "ccc_msub -a " + str(jobDependency) + " " + filename
72
73 process = subprocess.Popen(commandeLine,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
74 result=process.communicate()
75 print commandeLine
76
77 jobID = result[0].split()[3]
78 if("\n" in jobID):
79 jobID = jobID[0:len(jobID)-1]
80
81 return jobID
82
83
85 """
86 Create the job parser script header
87 @param filename: bash script file name
88 @type filename: string
89 @param cores: cores number reserved
90 @type cores: integer
91 @param task: max task number
92 @type task: integer
93 @param queue: requested queue
94 @type queue: string
95 """
96
97 file = open(filename, "wb")
98 name = filename.split(".")[0]
99 file.write("#!/bin/bash \n")
100 file.write("#MSUB -q " + queue + " \n")
101
102 file.write("#MSUB -n " + str(task) + " \n")
103 file.write("#MSUB -c " + str(cores) + " \n")
104 file.write("#MSUB -e " + name + ".e \n")
105 file.write("#MSUB -o " + name + ".o \n")
106 if(projid):
107 file.write("#MSUB -A " + Arguments.projid + " \n")
108 file.write("set -x \n")
109
110 file.close()
111
112
113
115
116 parser=get_parser()
117
118 if len(sys.argv)==1:
119 parser.print_help()
120 sys.exit(1)
121
122 Arguments=parser.parse_args()
123
124
125 jobFileName = Arguments.prefix + "_split.sh"
126 writeJobScript(jobFileName, 1, 1, Arguments.queue, Arguments.projid)
127 commandeLine = "cd-hit-div.pl " + Arguments.fasta + " " + Arguments.prefix + "_tmp0-part " + str(Arguments.nbSplit)
128 file = open(jobFileName, "ab")
129
130
131 file.write("if [ ! -f " + Arguments.fasta + " ]\nthen\n")
132 file.write("\tif [ ! -f errorProcess.txt ]\n\tthen\n")
133 file.write("\t\techo 'Error cd-hit-div: " + Arguments.fasta + " file not found' > errorProcess.txt\n")
134 file.write("\tfi\n")
135 file.write("\texit 1\n")
136 file.write("fi\n")
137
138 file.write(commandeLine)
139 file.close()
140
141 splitJobID = launchJobCCRT(jobFileName,Arguments.jobDependency)
142
143
144
145 jobDependency = splitJobID
146 for i in range(0,Arguments.nbSplit-1):
147
148 cdhitInputName = Arguments.prefix + "_tmp" + str(i) + "-part-"
149
150
151
152 jobFileName = Arguments.prefix + "pass" + str(i) + "_clustering.sh"
153 writeJobScript(jobFileName, Arguments.cores, 1, Arguments.queue, Arguments.projid)
154 cdhitOutputName = Arguments.prefix + "_clusterFinal-part-" + str(i)
155 commandeLine = "cd-hit -i " + cdhitInputName + str(i) + " -o " + cdhitOutputName + " -c " +\
156 Arguments.clutserThreshold + " -T " + str(Arguments.cores) + " -aS " + Arguments.minAlig
157 file = open(jobFileName, "ab")
158
159 file.write("if [ ! -f " + cdhitInputName + str(i) + " ]\nthen\n")
160 file.write("\tif [ ! -f errorProcess.txt ]\n\tthen\n")
161 file.write("\t\techo 'Error cd-hit: " + cdhitInputName + str(i) + " file not found' > errorProcess.txt\n")
162 file.write("\tfi\n")
163 file.write("\texit 1\n")
164 file.write("fi\n")
165
166 file.write(commandeLine)
167 file.close()
168 jobDependency = launchJobCCRT(jobFileName, jobDependency)
169
170
171
172 jobFileName = Arguments.prefix + "pass" + str(i) + "_cdhit2d.sh"
173 writeJobScript(jobFileName, Arguments.cores, Arguments.nbSplit-i-1, Arguments.queue, Arguments.projid)
174 newCdhitInputName = Arguments.prefix + "_tmp" + str(i+1) + "-part-"
175 file = open(jobFileName, "ab")
176
177 file.write("if [ ! -f " + cdhitInputName + str(i) + " ]\nthen\n")
178 file.write("\tif [ ! -f errorProcess.txt ]\n\tthen\n")
179 file.write("\t\techo 'Error cd-hit-est-2d: " + cdhitInputName + str(i) + " file not found' > errorProcess.txt\n")
180 file.write("\tfi\n")
181 file.write("\texit 1\n")
182 file.write("fi\n")
183
184 for j in range(i+1,Arguments.nbSplit):
185
186 commandeLine = "cd-hit-est-2d -i " + cdhitOutputName + " -i2 " + \
187 cdhitInputName + str(j) + " -o " + newCdhitInputName + str(j) + \
188 " -c " + Arguments.clutserThreshold + " -T " + str(Arguments.cores) + " -aS " + Arguments.minAlig + " &\n"
189 file.write(commandeLine)
190
191 file.write("wait")
192 file.close()
193 jobDependency = launchJobCCRT(jobFileName, jobDependency)
194 cdhitInputName = newCdhitInputName
195
196
197
198 jobFileName = Arguments.prefix + "pass" + str(Arguments.nbSplit-1) + "_clustering.sh"
199 writeJobScript(jobFileName, Arguments.cores, 1, Arguments.queue, Arguments.projid)
200 cdhitOutputName = Arguments.prefix + "_clusterFinal-part-" + str(Arguments.nbSplit-1)
201 commandeLine = "cd-hit -i " + cdhitInputName + str(Arguments.nbSplit-1) + " -o " + cdhitOutputName + " -c " +\
202 Arguments.clutserThreshold + " -T " + str(Arguments.cores) + " -aS " + Arguments.minAlig
203 file = open(jobFileName, "ab")
204
205 file.write("if [ ! -f " + cdhitInputName + str(Arguments.nbSplit-1) + " ]\nthen\n")
206 file.write("\tif [ ! -f errorProcess.txt ]\n\tthen\n")
207 file.write("\t\techo 'Error cd-hit: " + cdhitInputName + str(Arguments.nbSplit-1) + " file not found' > errorProcess.txt\n")
208 file.write("\tfi\n")
209 file.write("\texit 1\n")
210 file.write("fi\n")
211
212 file.write(commandeLine)
213 file.close()
214 jobDependency = launchJobCCRT(jobFileName, jobDependency)
215
216
217
218
219 jobFileName = Arguments.prefix + "_cd-hit_clean.sh"
220 writeJobScript(jobFileName, 1, 1, Arguments.queue, Arguments.projid)
221 file = open(jobFileName, "ab")
222
223 cleanCommandeLine1 = "rm " + Arguments.prefix + "*.clstr " + Arguments.prefix + "_tmp*part* " + Arguments.prefix + "*.e " + \
224 Arguments.prefix + "*.o " + Arguments.prefix + "*.sh \n"
225 file.write(cleanCommandeLine1)
226 concatCommandeLine = "cat " + Arguments.prefix + "_clusterFinal-part-* > " + Arguments.prefix + "_RefCluster.fasta \n"
227 file.write(concatCommandeLine)
228 cleanCommandeLine2 = "rm " + Arguments.prefix + "_clusterFinal-part-*"
229 file.write(cleanCommandeLine2)
230
231 file.close()
232 jobDependency = launchJobCCRT(jobFileName, jobDependency)
233
234 sys.exit(jobDependency)
235
236
237 if __name__ == "__main__":
238 main()
239