Skip to content

Commit f3178a1

Browse files
authored
Merge pull request #1833 from Caltech-IPAC/FIREFLY-1829-job-cancel
FIREFLY-1829: Fully support Job ABORT
2 parents bdfe812 + 54fccde commit f3178a1

File tree

13 files changed

+119
-61
lines changed

13 files changed

+119
-61
lines changed

src/firefly/java/edu/caltech/ipac/firefly/api/Async.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ private static void updateJob(HttpServletResponse res, SrvParam params, String j
156156
private static void updateJobPhase(HttpServletRequest req, HttpServletResponse res, String jobId) throws Exception {
157157
String phase = req.getParameter("PHASE");
158158
if (String.valueOf(phase).equals("ABORT")) {
159-
JobInfo fi = JobManager.abort(jobId, "Abort by user");
159+
JobInfo fi = JobManager.abort(jobId, null);
160160
sendResponse(JobUtil.toJson(fi), res);
161161
}
162162
}

src/firefly/java/edu/caltech/ipac/firefly/core/background/Job.java

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.function.Consumer;
1111

1212
import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotNull;
13+
import static edu.caltech.ipac.firefly.core.background.JobManager.sendUpdate;
1314
import static edu.caltech.ipac.firefly.core.background.JobManager.updateJobInfo;
1415
import static edu.caltech.ipac.firefly.server.util.QueryUtil.combineErrorMsg;
1516
import static java.util.Optional.ofNullable;
@@ -30,7 +31,7 @@ enum Type {SEARCH, UWS, TAP, PACKAGE, SCRIPT}
3031

3132
Worker getWorker();
3233

33-
void setWorker(Worker worker);
34+
void onStart(Worker worker);
3435

3536
void setParams(SrvParam params);
3637

@@ -42,46 +43,44 @@ enum Type {SEARCH, UWS, TAP, PACKAGE, SCRIPT}
4243

4344
String run() throws Exception;
4445

45-
default boolean shouldUpdate() {
46-
boolean isSelfManaged = ofNullable(getWorker()).map(Worker::isSelfManaged).orElse(false);
47-
return !isSelfManaged;
48-
}
49-
50-
default void updateJobStatus(Consumer<JobInfo> apply) {
51-
if (shouldUpdate()) updateJobInfo(getJobId(), apply);
52-
}
53-
54-
default void sendJobStatus(Consumer<JobInfo> apply) {
55-
if (shouldUpdate()) JobManager.sendUpdate(getJobId(), apply);
46+
default void updateManagedStatus(Consumer<JobInfo> func) {
47+
if (getWorker() != null && !getWorker().isSelfManaged()) {
48+
updateJobInfo(getJobId(), func);
49+
}
5650
}
5751

5852
default String call() {
59-
60-
sendJobStatus(ji -> {
61-
ji.setPhase(JobInfo.Phase.EXECUTING);
62-
ji.getMeta().setProgress(10);
63-
ji.setStartTime(Instant.now());
64-
});
6553
try {
54+
updateJobInfo(getJobId(), ji -> {
55+
ji.setStartTime(Instant.now());
56+
ji.getMeta().setProgress(0);
57+
});
6658
String results = run();
67-
updateJobStatus(ji -> {
59+
// the worker is set in onStart().
60+
getWorker().onComplete();
61+
if (Thread.currentThread().isInterrupted()) throw new InterruptedException("Job was aborted");
62+
updateManagedStatus(ji -> {
6863
ji.setPhase(JobInfo.Phase.COMPLETED);
6964
ji.getMeta().setProgress(100, "");
7065
});
7166
return results;
7267
} catch (InterruptedException | DataAccessException.Aborted e) {
73-
updateJobStatus(ji -> ji.setPhase(JobInfo.Phase.ABORTED));
68+
updateJobInfo(getJobId(), ji -> {
69+
ji.setPhase(JobInfo.Phase.ABORTED);
70+
ji.getMeta().setProgress(100, "Job was aborted");
71+
});
7472
getWorker().onAbort();
7573
} catch (Exception e) {
76-
String msg = combineErrorMsg(e.getMessage(), e.getCause() == null ? null : e.getCause().getMessage());
77-
updateJobStatus(ji -> ji.setError(new JobInfo.Error(500, msg)));
74+
updateManagedStatus(ji -> {
75+
String msg = combineErrorMsg(e.getMessage(), e.getCause() == null ? null : e.getCause().getMessage());
76+
ji.setError(new JobInfo.Error(500, msg));
77+
});
7878
Logger.getLogger().error(e);
7979
} finally {
80-
sendJobStatus(ji -> {
80+
sendUpdate(getJobId(), ji -> {
8181
ji.setEndTime(Instant.now());
8282
ji.getMeta().setProgress(100);
8383
});
84-
getWorker().onComplete();
8584
}
8685
return null;
8786
}
@@ -118,7 +117,7 @@ default void onComplete() {}
118117
*/
119118
default void sendJobUpdate(Consumer<JobInfo> func) {
120119
ifNotNull(getJob()).apply(j -> {
121-
JobManager.sendUpdate(j.getJobId(), func);
120+
sendUpdate(j.getJobId(), func);
122121
});
123122
}
124123

src/firefly/java/edu/caltech/ipac/firefly/core/background/JobInfo.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121

2222

2323
/**
24-
* Value object containing information of a Job
25-
*
24+
* Represents a Job value object persisted in Redis as JSON string.
25+
* Modifying this data structure may require additional logic in the JobManager
26+
* to migrate previously persisted data.
2627
* Date: 9/29/21
2728
*
2829
* @author loi

src/firefly/java/edu/caltech/ipac/firefly/core/background/JobManager.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,22 @@
3737
import java.util.Arrays;
3838
import java.util.Comparator;
3939
import java.util.HashMap;
40+
import java.util.HashSet;
4041
import java.util.List;
4142
import java.util.Objects;
43+
import java.util.Set;
4244
import java.util.concurrent.*;
4345
import java.util.function.Consumer;
4446
import java.util.stream.Collectors;
4547
import java.util.stream.Stream;
4648

4749
import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotEmpty;
4850
import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotNull;
51+
import static edu.caltech.ipac.firefly.core.background.Job.Type.UWS;
4952
import static edu.caltech.ipac.firefly.core.background.JobInfo.*;
5053
import static edu.caltech.ipac.firefly.core.background.JobUtil.*;
5154
import static edu.caltech.ipac.firefly.data.ServerParams.EMAIL;
55+
import static edu.caltech.ipac.firefly.server.query.UwsJobProcessor.getUwsJobInfo;
5256
import static edu.caltech.ipac.util.StringUtils.isEmpty;
5357
import static edu.caltech.ipac.firefly.core.background.Job.Type.PACKAGE;
5458
import static edu.caltech.ipac.firefly.core.background.JobInfo.Phase.*;
@@ -120,9 +124,26 @@ public static void init() {
120124
*/
121125
public static List<JobInfo> list() {
122126
List<JobInfo> userJobs = JobManager.getUserJobs(); // Ensure getUserJobs() is called only once, since it may be expensive. List is updated and returned at the end.
127+
Set<String> importedJobIds = new HashSet<>();
123128
UWS_HISTORY_SVCS.forEach(svc -> {
124-
Try.it(() -> importJobHistories(svc, userJobs)).getOrElse(LOG::error);
129+
Set<String> ids = Try.it(() -> importJobHistories(svc, userJobs)).getOrElse(LOG::error);
130+
if (ids != null) importedJobIds.addAll(ids);
125131
});
132+
// update all userJobs with active status
133+
userJobs.forEach(ji -> {
134+
if (ji.getMeta().getType() == UWS &&
135+
isActive(ji) &&
136+
!importedJobIds.contains(ji.getMeta().getJobId())) {
137+
JobInfo uws = Try.it(() -> getUwsJobInfo(ji.getAux().getJobUrl())).get();
138+
if (uws == null) {
139+
LOG.debug("Job no longer exists:" + ji.getAux().getJobUrl());
140+
ji.setError(new JobInfo.Error(404, "Job no longer exists"));
141+
} else {
142+
mergeJobInfo(ji, uws, null, null);
143+
}
144+
}
145+
});
146+
126147
return userJobs;
127148
}
128149

@@ -164,7 +185,7 @@ public static JobInfo submit(Job job) {
164185

165186
public static JobInfo abort(String jobId, String reason) {
166187
JobInfo info = updateJobInfo(jobId, (ji) -> {
167-
ji.setError(new JobInfo.Error(410, reason));
188+
if (reason != null) ji.setError(new JobInfo.Error(500, reason));
168189
ji.setPhase(ABORTED);
169190
});
170191
if (info != null) {
@@ -476,9 +497,8 @@ public void onMessage(Message msg) {
476497
JobEvent.EventType type = Try.it(() -> JobEvent.EventType.valueOf(msg.getValue(null, JobEvent.TYPE))).get();
477498
if (type == JobEvent.EventType.ABORTED) {
478499
removeLocalJob(jobInfo);
479-
} else {
480-
updateClient(jobInfo); // update jobInfo to client
481500
}
501+
updateClient(jobInfo); // update jobInfo to client
482502
});
483503
}
484504
}

src/firefly/java/edu/caltech/ipac/firefly/core/background/JobUtil.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import java.util.ArrayList;
2020
import java.util.Arrays;
2121
import java.util.HashMap;
22+
import java.util.HashSet;
2223
import java.util.List;
2324
import java.util.Map;
25+
import java.util.Set;
2426
import java.util.stream.Collectors;
2527

2628
import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotNull;
@@ -93,16 +95,16 @@ public static String toJson(JobInfo info) {
9395
* Import job histories from the given service URL.
9496
* @param svcDef the service to import job histories from
9597
* @param userJobs
96-
* @return the number of job histories imported
98+
* @return the set of job IDs that were imported
9799
*/
98-
public static int importJobHistories(String svcDef, List<JobInfo> userJobs) {
100+
public static Set<String> importJobHistories(String svcDef, List<JobInfo> userJobs) {
99101
int count = 0;
100102

101103
String[] svcParts = ifNotNull(svcDef).getOrElse("").split("\\|", 3);
102104
String url = svcParts[0].trim();
103105
String svcId = svcParts.length > 1 ? svcParts[1].trim() : null;
104106
String svcType = svcParts.length > 2 ? svcParts[2].trim() : null;
105-
if (url.isEmpty()) return count;
107+
if (url.isEmpty()) return Set.of();
106108

107109
URL urlObs= Try.it(() -> new URI(url).toURL()).getOrElse((URL)null);
108110
String paramStr= urlObs == null ? "" : urlObs.getQuery();
@@ -121,12 +123,12 @@ public static int importJobHistories(String svcDef, List<JobInfo> userJobs) {
121123
});
122124
return HttpServices.Status.ok();
123125
});
124-
if (jobList.get() == null || jobList.get().isEmpty()) return count;
126+
if (jobList.get() == null || jobList.get().isEmpty()) return Set.of();
125127

126128
// remove jobs with no URL or runId in the ignore list
127129
boolean hasBadJobs = jobList.get().removeIf(j -> j.getAux().getJobUrl() == null || runIdIgnoreList.contains(String.valueOf(j.getRunId())));
128130
if (hasBadJobs) LOG.debug("Some jobs with no URL or ignored runId were removed from list");
129-
131+
HashSet<String> importedIds = new HashSet<>();
130132
for (JobInfo job : jobList.get()) {
131133
JobInfo jobInfo = findJobInfo(job.getJobId(), userJobs);
132134
if (jobInfo == null || isActive(jobInfo)) {
@@ -138,14 +140,15 @@ public static int importJobHistories(String svcDef, List<JobInfo> userJobs) {
138140
count++;
139141
mergeJobInfo(jobInfo, uws, svcId, svcType);
140142
if (jobInfo == null ) userJobs.add(uws); // update passed in userJobs; to avoid having to call getUserJobs, which can be expensive
143+
importedIds.add(uws.getJobId());
141144
LOG.trace("Job added jobUrl=%s jobId=%s".formatted(job.getAux().getJobUrl(),uws.getJobId()));
142145
}
143146
} else {
144147
LOG.debug("Job %s is already completed, skipping".formatted(job.getJobId()));
145148
}
146149
}
147150
LOG.debug("%d job histories imported".formatted(count));
148-
return count;
151+
return importedIds; // return imported job IDs to the caller to avoid having to call the uws service again
149152
}
150153

151154
public static JobInfo mergeJobInfo(JobInfo local, JobInfo uws, String svcId, String svcType) {
@@ -154,7 +157,7 @@ public static JobInfo mergeJobInfo(JobInfo local, JobInfo uws, String svcId, Str
154157
ji.copyFrom(uws);
155158
Job.Type type = Try.it(() -> Job.Type.valueOf(svcType)).getOrElse(Job.Type.UWS);
156159
ji.getMeta().setType(type);
157-
ji.getMeta().setSvcId(svcId);
160+
if (svcId != null ) ji.getMeta().setSvcId(svcId);
158161
});
159162
}
160163

src/firefly/java/edu/caltech/ipac/firefly/core/background/ServCmdJob.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import edu.caltech.ipac.firefly.server.ServerContext;
1010
import edu.caltech.ipac.firefly.server.SrvParam;
1111

12+
import java.time.Instant;
1213
import java.util.Map;
1314

1415
import static edu.caltech.ipac.firefly.core.background.JobManager.*;
@@ -65,11 +66,15 @@ public SrvParam getParams() {
6566
return params;
6667
}
6768

68-
public void setWorker(Worker worker) {
69-
if (jobId != null) {
69+
public void onStart(Worker worker) {
70+
if (jobId != null || worker == null) { // a job must have a jobId and worker
7071
this.worker = worker;
7172
worker.setJob(this);
72-
sendUpdate(getJobId(), ji -> { // needs to update clients, because these values may change after the job has submitted
73+
updateManagedStatus(ji -> { // set these only if it's not a self-managed job
74+
ji.setPhase(JobInfo.Phase.EXECUTING);
75+
ji.getMeta().setProgress(10);
76+
});
77+
sendUpdate(jobId, ji -> { // needs to update clients, because these values may change after the job has submitted
7378
ji.getMeta().setType(worker.getType());
7479
ji.getAux().setTitle(worker.getLabel());
7580
ji.getMeta().setSvcId(worker.getSvcId());

src/firefly/java/edu/caltech/ipac/firefly/server/AppServerCommands.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public static class InitApp extends ServCommand {
4141

4242
public String doCommand(SrvParam params) throws Exception {
4343
String spaName = params.getRequired(SPA_NAME);
44+
ServerContext.getRequestOwner().extendUserKeyExpiry();
4445

4546
// check for alerts
4647
AlertsMonitor.checkAlerts(true);

src/firefly/java/edu/caltech/ipac/firefly/server/RequestOwner.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020
import java.util.UUID;
2121

22+
import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotEmpty;
2223
import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotNull;
2324
import static edu.caltech.ipac.util.StringUtils.isEmpty;
2425
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -38,7 +39,7 @@ public class RequestOwner implements Cloneable {
3839

3940
private static final Logger.LoggerImpl LOG = Logger.getLogger();
4041
public static String USER_KEY = "usrkey";
41-
public static int USER_KEY_EXPIRY = AppProperties.getIntProperty("userkey.expiry", 3600 * 24 * 7 * 2); // 2 weeks
42+
public static int USER_KEY_EXPIRY = AppProperties.getIntProperty("userkey.expiry", 3600 * 24 * 365); // 1 year
4243
public static final String SET_USERINFO_ACTION = "app_data.setUserInfo";
4344
private static boolean ignoreAuth = AppProperties.getBooleanProperty("ignore.auth", false);
4445
private RequestAgent requestAgent;
@@ -100,6 +101,10 @@ private void id(RequestAgent ra) {
100101
}
101102
}
102103

104+
public void extendUserKeyExpiry() {
105+
ifNotEmpty(getUserKeyFromClient()).apply(this::updateClientUserKey);
106+
}
107+
103108

104109
public RequestAgent getRequestAgent() {
105110
return requestAgent;

src/firefly/java/edu/caltech/ipac/firefly/server/network/HttpServices.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public static Status postData(HttpServiceInput input, Handler handler) {
193193
try {
194194
String url = input.getRequestUrl();
195195
if (isEmpty(url)) throw new FileNotFoundException("Missing URL parameter");
196-
input.setFollowRedirect(false); // post are not allowed to follow redirect
196+
input.setFollowRedirect(false); // httpclient 3.x, post are not allowed to follow redirect
197197
return executeMethod(new PostMethod(url), input, handler);
198198
} catch (Exception e) {
199199
return new Status(400, e.getMessage());
@@ -232,11 +232,11 @@ public static Status executeMethod(HttpMethod method, HttpServiceInput input, Ha
232232
try {
233233
input = input == null ? new HttpServiceInput() : input;
234234

235-
method.setRequestHeader("Connection", "close"); // request server to NOT keep-alive.. we don't plan to reuse this connection.
235+
method.setRequestHeader("Connection", "close"); // request server to NOT keep-alive. we don't plan to reuse this connection.
236236
method.setRequestHeader("User-Agent", VersionUtil.getUserAgentString());
237237
method.setRequestHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
238238
if (method instanceof GetMethod) {
239-
method.setFollowRedirects(input.isFollowRedirect()); // post are not allowed to follow redirect
239+
method.setFollowRedirects(input.isFollowRedirect()); // httpclient 3.x, post are not allowed to follow redirect
240240
}
241241

242242
HttpClient httpClient = newHttpClient();

src/firefly/java/edu/caltech/ipac/firefly/server/query/SearchServerCommands.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public String doCommand(SrvParam params) throws Exception {
8383
String format = params.getOptional(FORMAT, "json");
8484

8585
SearchProcessor<?> processor = SearchManager.getProcessor(tsr.getRequestId());
86-
if (processor instanceof Job.Worker) setWorker((Job.Worker)processor);
86+
onStart(processor instanceof Job.Worker ? (Worker) processor : null);
8787

8888
if (format.toLowerCase().contains("votable")) {
8989
ByteArrayOutputStream rval = new ByteArrayOutputStream();
@@ -275,7 +275,7 @@ public static class PackageRequest extends ServCmdJob {
275275

276276
public String doCommand(SrvParam params) throws Exception {
277277
PackagingWorker worker = new PackagingWorker();
278-
setWorker(worker);
278+
onStart(worker);
279279
return worker.doCommand(params);
280280
}
281281
}
@@ -286,7 +286,7 @@ public static class DownloadScriptRequest extends ServCmdJob {
286286

287287
public String doCommand(SrvParam params) throws Exception {
288288
DownloadScriptWorker worker = new DownloadScriptWorker();
289-
setWorker(worker);
289+
onStart(worker);
290290
return worker.doCommand(params);
291291
}
292292
}
@@ -314,7 +314,7 @@ public static class Cancel extends ServCommand {
314314

315315
public String doCommand(SrvParam params) throws Exception {
316316
String jobId = params.getRequired(JOB_ID);
317-
JobInfo info = JobManager.abort(jobId, "Aborted by user");
317+
JobInfo info = JobManager.abort(jobId, null);
318318
return JobUtil.toJson(info);
319319
}
320320
}

0 commit comments

Comments
 (0)