How to use the job engine for bulk tasks
The job engine is designed to run many jobs in parallel as fast as possible. In this tutorial we calculate the MD5 hash value for each file in a directory that contains many files.
The example is somewhat longer because it covers several tasks:
- The definition of dependencies
- The service starter
- The operation of a nanoservice
- The delegation of the request to a working target so that parallel requests can be processed
- The initialization of the job engine
- The job itself
- The orderly shutdown of the engine on success and on cancellation
The service target
To make the exercise easier to follow, we register a nanoservice. The hashes of the files in a directory can then be created simply by sending a message to this nanoservice (a requester-side sketch follows after the callout list below).
The nanoservice is provided by a target. Depending on the size and number of the files, generating the hashes may take some time. The target therefore delegates the work to a separate working target to avoid being blocked.
This is our service target.
class CFileHashCalculator extends CTarget implements IService {

    private final IDependencies mDependencies;

    CFileHashCalculator(@NotNull final IDependencies aDependencies) {
        mDependencies = aDependencies;
        addMessageHandler(CRecordStartTarget.ID, this::asyncStartTarget);
        addMessageHandler(CRecordHashMyDir.ID, this::asyncHashMyDir);
    }

    @Override
    public void activate(@NotNull final IServiceRegistry aServiceRegistry) throws Exception {
        // This namespace will always exist (1)
        final INamespace namespace = mDependencies.getNamespaceFactory()
                                                  .getNamespace(CWellKnownNID.SYSTEM);
        assert namespace != null;
        namespace.getTargetRegistry()
                 .registerTarget(this);
    }

    @Override
    public void deactivate(@NotNull final IServiceRegistry aServiceRegistry) throws Exception {
        // Remove nano service CRecordHashMyDir from system namespace.
        // This namespace will always exist.
        // Will be done by system automatically, but is shown here for completeness. (5)
        final INamespace system = mDependencies.getNamespaceFactory()
                                               .getNamespace(CWellKnownNID.SYSTEM);
        assert system != null;
        system.getNanoServiceRegistry()
              .removeObserver(CRecordHashMyDir.class, getAddress());
        deregisterTarget();
    }

    private boolean asyncStartTarget(@NotNull final CEnvelope aEnvelope,
                                     @NotNull final CRecord aRecord) throws CException {
        if (aEnvelope.isAnswer()) {
            return false;
        } else {
            // add nano service CRecordHashMyDir to system namespace (2)
            final INamespace system = mDependencies.getNamespaceFactory()
                                                   .getNamespace(CWellKnownNID.SYSTEM);
            assert system != null;
            system.getNanoServiceRegistry()
                  .addNanoServiceAndObserver(CRecordHashMyDir.class, getAddress(), false);
            aEnvelope.setResultSuccess();
            return true;
        }
    }

    private boolean asyncHashMyDir(@NotNull final CEnvelope aEnvelope,
                                   @NotNull final CRecord aRecord) throws CException {
        if (aEnvelope.isAnswer()) {
            return false;
        } else {
            // get path name of directory from message (3)
            final String pathname = CRecordHashMyDir.getPath(aRecord, null);
            if (pathname == null) {
                // error: send back
                aEnvelope.setResult(CResultCode.INVALID_ARGUMENT, "path is null");
            } else {
                final Path path = Paths.get(pathname);
                final File dir = path.toFile();
                if (!dir.exists() || !dir.isDirectory()) {
                    // error: send back
                    aEnvelope.setResult(CResultCode.NO_DIRECTORY, "no directory");
                } else {
                    // Each request is processed by an additional working target (4)
                    final CWorkingTarget tgt = new CWorkingTarget(mDependencies, aEnvelope, aRecord);
                    getTargetRegistry().registerTarget(tgt);
                }
            }
            return true;
        }
    }
}
- 1 Registration of the target in the activate() method
- 2 Registration of the nano service with observer in the namespace SYSTEM
- 3 Parsing of the CRecordHashMyDir message
- 4 Creation and registration of the working target
- 5 Deregistration of the observer (not strictly required; the system does this automatically)
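For orientation, the requester side could look roughly like the following. This is only a sketch: the generated accessors CRecordHashMyDir.create() and setPath(), the answer getters getPathArray() and getMd5Array(), and the way a request reaches a nanoservice are assumptions and do not appear in the code of this tutorial; only the request and answer slots themselves are defined in the message description further below.

// Hypothetical requester-side code. How the request envelope is addressed to the
// HASH_MY_DIR nanoservice depends on the framework's request API and is not shown here.
private void requestHashes(@NotNull final String aDirectory) throws CException {
    final CRecord record = CRecordHashMyDir.create();     // assumed accessor
    CRecordHashMyDir.setPath(record, aDirectory);          // assumed accessor
    // ... send the record to the HASH_MY_DIR nanoservice ...
}

private boolean asyncHashMyDir(@NotNull final CEnvelope aEnvelope,
                               @NotNull final CRecord aRecord) {
    if (!aEnvelope.isAnswer()) {
        return false;
    }
    // Assumed getters mirroring the PATH_ARRAY and MD5_ARRAY answer slots.
    final String[] paths = CRecordHashMyDir.getPathArray(aRecord, null);
    final String[] hashes = CRecordHashMyDir.getMd5Array(aRecord, null);
    if (paths != null && hashes != null) {
        for (int i = 0; i < paths.length; i++) {
            System.out.println(paths[i] + " -> " + hashes[i]);
        }
    }
    return true;
}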
The working target
The working target is responsible for processing the request. It creates the job engine and the engine's threads. It then walks the directory tree and creates a job for each file, which it passes to the engine.
class CWorkingTarget extends CTarget {

    private static final String NAME = "HashCalculator";

    private final IDependencies mDependencies;
    private final CEnvelope mEnvelope;
    private final CRecord mRecord;
    private final Map<String, String> mResult = new HashMap<>();

    private IJobEngine mEngine;

    CWorkingTarget(@NotNull final IDependencies aDependencies,
                   @NotNull final CEnvelope aEnvelope,
                   @NotNull final CRecord aRecord) {
        mDependencies = aDependencies;
        // (1)
        mEnvelope = aEnvelope;
        mRecord = aRecord;
        mEnvelope.setBlocked(true);
        addMessageHandler(CRecordStartTarget.ID, this::asyncStartTarget);
        addMessageHandler(CRecordSetHash.ID, this::asyncSetHash);
        addMessageHandler(CRecordNoMoreJobs.ID, this::asyncNoMoreJobs);
        addMessageHandler(CRecordDismiss.ID, this::asyncDismiss);
    }

    private boolean asyncStartTarget(@NotNull final CEnvelope aEnvelope,
                                     @NotNull final CRecord aRecord) throws CException {
        if (aEnvelope.isAnswer()) {
            return false;
        } else {
            // create job engine (2)
            final String path = CRecordHashMyDir.getPath(mRecord, null);
            mEngine = mDependencies.getJobEngineFactory()
                                   .createEngine(NAME);
            mEngine.setOwner(getAddress());
            for (int i = 0; i < 10; i++) {
                mEngine.appendThread("_" + i, EThreadPriority.LOW, 1);
            }
            mEngine.start();
            addJobs(Paths.get(path));
            aEnvelope.setResultSuccess();
            return true;
        }
    }

    private boolean asyncSetHash(@NotNull final CEnvelope aEnvelope,
                                 @NotNull final CRecord aRecord) {
        if (aEnvelope.isAnswer()) {
            return false;
        } else {
            // from a job target: path and hash for a file (4)
            final String path = CRecordSetHash.getPath(aRecord, null);
            final String hash = CRecordSetHash.getHash(aRecord, null);
            if (path != null && hash != null) {
                mResult.put(path, hash);
            }
            return true;
        }
    }

    private boolean asyncNoMoreJobs(@NotNull final CEnvelope aEnvelope,
                                    @NotNull final CRecord aRecord) throws CException {
        if (aEnvelope.isAnswer()) {
            return false;
        } else {
            // finish with success (5)
            finish(CResultCode.SUCCESS, "");
            return true;
        }
    }

    private boolean asyncDismiss(@NotNull final CEnvelope aEnvelope,
                                 @NotNull final CRecord aRecord) throws CException {
        if (aEnvelope.isAnswer()) {
            return false;
        } else {
            // Finish with cancel (6)
            finish(CResultCode.CANCEL, "Dismissed");
            return true;
        }
    }

    private void addJobs(final Path incomingPath) {
        final CTargetAddress address = getAddress();
        try {
            Files.walkFileTree(incomingPath, new FileVisitor<Path>() {
                @Override
                public FileVisitResult preVisitDirectory(final Path aDir, final BasicFileAttributes attrs) {
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult visitFile(final Path aPath, final BasicFileAttributes attrs) {
                    // Create and add job (3)
                    final IJob job = new CJobHashFile(aPath, address);
                    mEngine.appendJob(job);
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult visitFileFailed(final Path file, final IOException exc) {
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) {
                    return FileVisitResult.CONTINUE;
                }
            });
        } catch (final IOException aE) {
            aE.printStackTrace();
        }
    }

    private void finish(final int aCode, final String aText) throws CException {
        // dismiss engine (7)
        if (mEngine != null) {
            mEngine.dismiss();
        }
        // Enter the data in the request (8)
        if (aCode == CResultCode.SUCCESS && mResult != null) {
            // create two string arrays, the first with path entries, the second with hashes
            final int size = mResult.size();
            final String[] paths = new String[size];
            final String[] hashes = new String[size];
            int index = 0;
            for (final Map.Entry<String, String> entry : mResult.entrySet()) {
                paths[index] = entry.getKey();
                hashes[index] = entry.getValue();
                index++;
            }
            CRecordHashMyDir.setPathArray(mRecord, paths);
            CRecordHashMyDir.setMd5Array(mRecord, hashes);
        }
        // send results back to requester (9)
        mEnvelope.setResult(aCode, aText);
        mEnvelope.setBlocked(false);
        getMessageSender().sendBack(mEnvelope, mRecord);
        // The work is finished (10)
        deregisterTarget();
    }
}
- 1 The envelope and the record of the original request are saved. The message is blocked so that the system does not automatically return it to the client; we return it manually once all jobs have finished.
- 2 The job engine is created. Ten threads are added, each with priority LOW. The engine is started and the jobs are then added. Attention: the priority of the jobs must match that of the threads (see the condensed sketch after this list).
- 3 The directory is searched recursively for files. For each regular file found, a job is created and added to the engine. In addition to the path of the file, the address of the working target is also passed to the job so that the job can transmit the hash after it has been calculated.
- 4 A job has sent us a hash for a file via message.
- 5 There are no more jobs in progress. We shut down the job engine and send the result back.
- 6 A CRecordDismiss message is taken as the signal to abort processing.
- 7 The engine is dismissed.
- 8 The hash data is packed into the message.
- 9 The message is returned manually.
- 10 The working target is terminated.
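Condensed to its essential calls, the engine handling of the working target looks like this. This is only a sketch of the code above; it assumes it runs inside a target with the same dependencies, and the appended file path is a placeholder.

// Create the engine via the factory and make this target its owner; the working
// target above receives CRecordNoMoreJobs and CRecordDismiss from the engine.
final IJobEngine engine = mDependencies.getJobEngineFactory()
                                       .createEngine("HashCalculator");
engine.setOwner(getAddress());
for (int i = 0; i < 10; i++) {
    // The thread priority must match the priority reported by the jobs.
    engine.appendThread("_" + i, EThreadPriority.LOW, 1);
}
engine.start();
// Jobs may be appended after the engine has been started (placeholder path).
engine.appendJob(new CJobHashFile(Paths.get("/tmp/example.txt"), getAddress()));
// ... when CRecordNoMoreJobs or CRecordDismiss arrives, shut the engine down:
engine.dismiss();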
The job target
The actual job class is derived from the abstract class CAbstractJob. Its structure is simple: the work is done in the handler for the start message, which internally runs on the thread in which the engine registered the job target. A reduced skeleton of such a job is shown after the callout list below.
class CJobHashFile extends CAbstractJob {

    private static final ILogger LOG = CLoggerFactory.getLogger(CJobHashFile.class);

    private final Path mPath;
    private final CTargetAddress mParent;

    CJobHashFile(@NotNull final Path aPath,
                 @NotNull final CTargetAddress aParent) {
        mPath = aPath;
        mParent = aParent;
        addMessageHandler(CRecordStartTarget.ID, this::asyncStartTarget);
    }

    @Override
    @NotNull
    public String getJobName() {
        return mPath.toString();
    }

    @Override
    public @NotNull EThreadPriority getPriority() {
        // There must be threads in the engine with the same priority!
        return EThreadPriority.LOW;
    }

    private boolean asyncStartTarget(@NotNull final CEnvelope aEnvelope,
                                     @NotNull final CRecord aRecord) throws CException {
        if (aEnvelope.isAnswer()) {
            return false;
        } else {
            LOG.info("Create hash for {}", mPath);
            // hash the file (1)
            final File file = mPath.toFile();
            final CMd5 md5 = CUtilMd5.calculate(file, null);
            if (md5 != null) {
                // send file hash to working target
                final String hash = md5.toBase64(); // (2)
                final CEnvelope env = CEnvelope.forSingleTarget(mParent);
                final CRecord record = CRecordSetHash.create();
                CRecordSetHash.setPath(record, mPath.toString());
                CRecordSetHash.setHash(record, hash);
                sendNotification(env, record);
            }
            super.notifyJobStopped(); // (3)
            aEnvelope.setResultSuccess();
            return true;
        }
    }
}
- 1 An internal helper class of the kernel computes the hash value, which is encoded as a Base64 string.
- 2 The calculated value is then sent to the working target.
- 3 The end of the job is signaled. The target is not deregistered here, because the engine takes care of that.
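The same structure reduced to a skeleton. Apart from the placeholder job name and the omitted work, everything is taken from CJobHashFile above.

// Minimal job skeleton; "my-job" and the omitted work are placeholders.
class CJobSkeleton extends CAbstractJob {

    CJobSkeleton() {
        // The engine starts the job by sending CRecordStartTarget.
        addMessageHandler(CRecordStartTarget.ID, this::asyncStartTarget);
    }

    @Override
    @NotNull
    public String getJobName() {
        return "my-job";
    }

    @Override
    public @NotNull EThreadPriority getPriority() {
        // Must match the priority of at least one engine thread.
        return EThreadPriority.LOW;
    }

    private boolean asyncStartTarget(@NotNull final CEnvelope aEnvelope,
                                     @NotNull final CRecord aRecord) throws CException {
        if (aEnvelope.isAnswer()) {
            return false;
        }
        // ... do the actual work here ...
        super.notifyJobStopped();       // signal the end of the job to the engine
        aEnvelope.setResultSuccess();
        return true;
    }
}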
The dependencies interface
As always, the required services are collected in a small dependencies interface:
interface IDependencies {

    // Used for registration of the target
    INamespaceFactory getNamespaceFactory();

    // The job engine service factory
    IJobEngineFactory getJobEngineFactory();
}
The service starter
The service starter starts the package. It is created in the module handler of the plugin.
public class CPackageHashCalculator implements IServiceStarter, IDependencies {

    private IService mService;
    private INamespaceFactory mNamespaceFactory;
    private IJobEngineFactory mJobEngineFactory;

    @Override
    public void getDependencies(@NotNull final IServiceDependencyList aDependencyList) {
        // (1)
        aDependencyList.add(INamespaceFactory.class);
        aDependencyList.add(IJobEngineFactory.class);
    }

    @Override
    public void start(@NotNull final IServiceRegistry aServiceRegistry) throws Exception {
        if (mService == null) {
            // (2)
            mNamespaceFactory = aServiceRegistry.getServiceOrThrow(INamespaceFactory.class);
            mJobEngineFactory = aServiceRegistry.getServiceOrThrow(IJobEngineFactory.class);
            // (3)
            mService = new CFileHashCalculator(this);
            mService.activate(aServiceRegistry);
        }
    }

    @Override
    public void stop(@NotNull final IServiceRegistry aServiceRegistry) throws Exception {
        if (mService != null) {
            // (4)
            mService.deactivate(aServiceRegistry);
            mService = null;
        }
    }

    @Override
    public INamespaceFactory getNamespaceFactory() {
        return mNamespaceFactory;
    }

    @Override
    public IJobEngineFactory getJobEngineFactory() {
        return mJobEngineFactory;
    }
}
- 1 Registration of dependencies.
- 2 Fetch dependencies.
- 3 Start of the service.
- 4 Terminate the service.
The messages
CRecordHashMyDir
This is the nanoservice message that a requester sends to the service.
{
  "id": "907df7c7-c435-45ed-a0c4-09ff0f7f6102",
  "name": "HASH_MY_DIR",
  "isService": "true",
  "namespaces": "SYSTEM",
  "description": "Create MD5 hashes for all files in a directory",
  "slots": [
    {
      "key": "path",
      "name": "PATH",
      "direction": "REQUEST",
      "mandatory": "true",
      "type": "STRING",
      "description": "The path of the directory to hash."
    },
    {
      "key": "path-arr",
      "name": "PATH_ARRAY",
      "direction": "ANSWER",
      "mandatory": "false",
      "type": "STRING_ARRAY",
      "description": "The paths of all files."
    },
    {
      "key": "md5-arr",
      "name": "MD5_ARRAY",
      "direction": "ANSWER",
      "mandatory": "false",
      "type": "STRING_ARRAY",
      "description": "The hashes of all files base64 encoded."
    }
  ]
}
CRecordSetHash
This message is sent by each job to the working target after finishing its work.
{
  "id": "67f918bb-673a-409c-a09a-519fcd47c33b",
  "name": "SET_HASH",
  "isService": "false",
  "namespaces": "",
  "description": "Set the base64 encoded hash value for a file.",
  "slots": [
    {
      "key": "1",
      "name": "PATH",
      "direction": "REQUEST",
      "mandatory": "true",
      "type": "STRING",
      "description": "The path of the file."
    },
    {
      "key": "2",
      "name": "HASH",
      "direction": "REQUEST",
      "mandatory": "true",
      "type": "STRING",
      "description": "The base64 encoded hash value of the file."
    }
  ]
}
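For reference, this is how the job above fills the two slots and sends the record to the working target. Apart from the placeholder values, the calls are the same as in CJobHashFile.

// Build a SET_HASH record and send it as a notification to the working target.
final CEnvelope env = CEnvelope.forSingleTarget(mParent);   // address of the working target
final CRecord record = CRecordSetHash.create();
CRecordSetHash.setPath(record, "/tmp/example.txt");         // placeholder path
CRecordSetHash.setHash(record, "placeholderBase64Hash");    // placeholder Base64 hash
sendNotification(env, record);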