Getting Started

In nyssr.net, user code is always provided in the form of a plugin. For this reason, we start our introduction with a project that builds a JAR file. We use Gradle as our build tool and IDEA as our IDE.

Our plugin sends messages to arbitrary targets (message recipients) and nodes, similar to a ping. Each message is sent to a target as a request, and the target automatically sends a response back to us asynchronously. As soon as we have received this response, we send the next message. The plugin thus generates load on the TCP channels between the nodes, and we would like to know how many messages are generated and sent per second.

Please note that only one message at a time is on its way to a destination, either the request or the response.

Our plugin is designed so that you can easily operate it via the node's command line. You can add and delete pings and call up the current statistics. The command line service is implemented as a nanoservice in the kernel.

The project tree

Here we can see our project tree.

Our project consists of two packages:

  • de.sillysky.nyssr.module: initialization of the JAR
  • de.sillysky.nyssr.impl.example001: the actual code of the plugin

The gradle files

The build file

plugins {
    id 'java'
}

group = 'de.sillysky'

// defined in gradle.properties
version = ssJarVersion

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(ssLanguageLevel)
    }
}

repositories {
    mavenCentral()

    flatDir {
        // Path for the library directory (with the kernel JAR)
        dirs 'c:/work/run/lib/'
    }
}

jar {
    archiveVersion = ssJarVersion
    manifest {
        attributes(
                "Manifest-Version": "1.0",
                "Implementation-Vendor": "sillysky software labs, Hohenwestedt, Germany",
                "Implementation-URL": "sillysky.net",
                "Implementation-Version": ssJarVersion,
                "Implementation-Title": "nyssr Example 001",
    
                // The entry point while loading the plugin
                "PlugIn-Class": "de.sillysky.nyssr.module.CModuleExample001", "nyssr.net"
        )
    }
}

dependencies {
    implementation "org.jetbrains:annotations:" + version_jetbrains_annotations
    implementation "com.googlecode.json-simple:json-simple:" + version_json_simple
    
    // the kernel JAR in the local file system
    implementation 'de.sillysky:NY_Kernel:2.4.0'
}

The property file

Some constants are defined in the gradle.properties file, including the language level, the version number of the JAR file and the version numbers of the libraries used.

ssLanguageLevel=23
ssJarVersion=2.4.0

# versions
version_jetbrains_annotations=26.0.1
version_json_simple=1.1.1

Initializing the plugin

When the plugin is loaded, the nyssr.net section of the JAR manifest is searched for the plugin's initialization class. The class name is stored under the key PlugIn-Class. This class implements the IPlugIn interface.

package de.sillysky.nyssr.module;

import de.sillysky.nyssr.impl.example001.CPackageImplExample001;
import de.sillysky.nyssr.plugin.IPlugIn;
import de.sillysky.nyssr.service.IServiceRegistry;
import de.sillysky.nyssr.util.version.CVersionHelper;
import org.jetbrains.annotations.NotNull;

import java.net.URL;

public class CModuleExample001 implements IPlugIn
{
    @Override
    public void startPlugin(@NotNull final URL aUrl,
                            @NotNull final ClassLoader aClassLoader,
                            @NotNull final IServiceRegistry aServiceRegistry)
    {
        // Print JAR version into the LOG
        CVersionHelper.printJarVersion(aUrl);
    
        // Initialize the package "de.sillysky.nyssr.impl.example001" by adding the ServiceStarter Interface
        // to the service registry.
        // The service registry will start the package if the dependencies of the package are fulfilled.
        aServiceRegistry.addStarter(new CPackageImplExample001());
    }
}
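The manifest lookup described above can be illustrated with the standard java.util.jar API. This is only a minimal sketch: the class ManifestSectionSketch and its methods are hypothetical and not part of nyssr.net, and the real plugin loader may work differently. It assumes that the build file writes the attributes into a manifest section named nyssr.net.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

// Hypothetical helper, not part of the nyssr.net kernel.
final class ManifestSectionSketch
{
    // Section and key names as described in the text
    static final String SECTION = "nyssr.net";
    static final String KEY = "PlugIn-Class";

    // Parses manifest text; a real loader would read META-INF/MANIFEST.MF from the JAR
    static Manifest parse(final String aText)
    {
        try
        {
            return new Manifest(new ByteArrayInputStream(aText.getBytes(StandardCharsets.UTF_8)));
        }
        catch (final IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the plugin class name, or null if the section or the key is missing
    static String findPluginClass(final Manifest aManifest)
    {
        final Attributes section = aManifest.getAttributes(SECTION);
        return section == null ? null : section.getValue(KEY);
    }

    public static void main(final String[] aArgs)
    {
        final String text = "Manifest-Version: 1.0\n"
                + "\n"
                + "Name: nyssr.net\n"
                + "PlugIn-Class: de.sillysky.nyssr.module.CModuleExample001\n"
                + "\n";
        System.out.println(findPluginClass(parse(text)));
    }
}
```

A named section starts with a Name: line after a blank line, which is why the sketch looks up the attributes by section name and not in the main attributes.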

As we are in an asynchronous environment, we can only start our target once all dependencies have been fulfilled, i.e. all required services are available. We do not use classic dependency injection, as many dependencies are only fulfilled later by loading additional plugins. nyssr.net offers a service registry and the service starter principle, which is somewhat inspired by OSGi systems.

You can find more information in the tutorial package initialization.
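To make the service starter principle more tangible, here is a minimal, self-contained sketch of a registry that starts a starter once all of its dependencies are available and stops it again when one is removed. All names (StarterRegistrySketch, Starter, addService, removeService) are hypothetical stand-ins, not the nyssr.net API, and the real service registry is certainly more elaborate.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration, not the nyssr.net service registry.
final class StarterRegistrySketch
{
    // Stand-in for a service starter: declares dependencies, gets started and stopped
    interface Starter
    {
        Set<Class<?>> dependencies();

        void start();

        void stop();
    }

    private final Set<Class<?>> mAvailable = new HashSet<>();
    private final Map<Starter, Boolean> mStarted = new LinkedHashMap<>();

    void addStarter(final Starter aStarter)
    {
        mStarted.put(aStarter, Boolean.FALSE);
        update();
    }

    void addService(final Class<?> aType)
    {
        mAvailable.add(aType);
        update();
    }

    void removeService(final Class<?> aType)
    {
        mAvailable.remove(aType);
        update();
    }

    // Starts starters whose dependencies are all available, stops those that lost one
    private void update()
    {
        for (final Map.Entry<Starter, Boolean> entry : mStarted.entrySet())
        {
            final boolean fulfilled = mAvailable.containsAll(entry.getKey().dependencies());
            if (fulfilled && !entry.getValue())
            {
                entry.setValue(Boolean.TRUE);
                entry.getKey().start();
            }
            else if (!fulfilled && entry.getValue())
            {
                entry.setValue(Boolean.FALSE);
                entry.getKey().stop();
            }
        }
    }

    // Runs a small scenario and returns the observed lifecycle events
    static List<String> demo()
    {
        final List<String> events = new ArrayList<>();
        final StarterRegistrySketch registry = new StarterRegistrySketch();
        registry.addStarter(new Starter()
        {
            @Override
            public Set<Class<?>> dependencies()
            {
                final Set<Class<?>> deps = new HashSet<>();
                deps.add(Runnable.class);
                deps.add(Comparable.class);
                return deps;
            }

            @Override
            public void start()
            {
                events.add("start");
            }

            @Override
            public void stop()
            {
                events.add("stop");
            }
        });
        registry.addService(Runnable.class);    // one dependency is still missing
        registry.addService(Comparable.class);  // all fulfilled: the starter is started
        registry.removeService(Runnable.class); // a dependency is removed: the starter is stopped
        return events;
    }

    public static void main(final String[] aArgs)
    {
        System.out.println(demo());
    }
}
```

The point of the design is that the order in which plugins are loaded does not matter: a package simply stays dormant until the last missing service appears.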

The service starter

This class can be found in the package of our service.

  • It defines our dependencies
  • It ensures that the service is started when these are fulfilled
  • It ensures that the service is stopped when dependencies are removed

package de.sillysky.nyssr.impl.example001;

import de.sillysky.nyssr.namespace.INamespaceRegistry;
import de.sillysky.nyssr.service.IService;
import de.sillysky.nyssr.service.IServiceDependencyList;
import de.sillysky.nyssr.service.IServiceRegistry;
import de.sillysky.nyssr.service.IServiceStarter;
import org.jetbrains.annotations.NotNull;

public class CPackageImplExample001 implements IServiceStarter, IDependencies
{
    private IService mService;
    private INamespaceRegistry mNamespaceRegistry;

    @Override
    public void getDependencies(@NotNull final IServiceDependencyList aServiceDependencyList)
    {
        // Here we define the dependencies of our package
        // We need the namespace registry to register our target.
        aServiceDependencyList.add(INamespaceRegistry.class);
        // The command line interface is used to allow the user to add pings.
        aServiceDependencyList.add(ICommandLineInterface.class);
    }

    @Override
    public void start(@NotNull final IServiceRegistry aServiceRegistry) throws Exception
    {
        // The dependencies are fulfilled, so we start the package by creating our service
        if (mService == null)
        {
            mNamespaceRegistry = aServiceRegistry.getService(INamespaceRegistry.class);

            mService = new CServiceExample001(this);
            mService.activate(aServiceRegistry);
        }
    }

    @Override
    public void stop(@NotNull final IServiceRegistry aServiceRegistry) throws Exception
    {
        // The dependencies are no longer all fulfilled, so we stop the service
        if (mService != null)
        {
            mService.deactivate(aServiceRegistry);
            mService = null;
        }
    }

    @Override
    public @NotNull INamespaceRegistry getNamespaceRegistry()
    {
        return mNamespaceRegistry;
    }
}

The implementation of the service starter also provides a local interface via which the classes of the package can access the dependencies:

package de.sillysky.nyssr.impl.example001;


import de.sillysky.nyssr.namespace.INamespaceRegistry;
import org.jetbrains.annotations.NotNull;

interface IDependencies
{
    @NotNull INamespaceRegistry getNamespaceRegistry();
}

We only need to provide the namespace registry in the interface. We want to send and receive messages. To receive messages, we need a target, a class that processes messages. Targets are registered in namespaces, which have threads and message queues. Namespaces are registered in the namespace registry. So we get the namespace registry to be able to register our target.

The command line interface is merely a marker interface whose availability only signals that the service is available. We use the command line interface via messages.

The service

The constructor

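class CServiceExample001 extends CTarget implements IService
{
    // Access to the dependencies provided by the service starter
    private final IDependencies mDependencies;

    CServiceExample001(@NotNull final IDependencies aDependencies)
    {
        mDependencies = aDependencies;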

        addMessageHandler(CRecordStartTarget.ID,
                          this::asyncStartTarget);
        addMessageHandler(CRecordCliHandle.ID,
                          this::asyncCliHandle);
        addMessageHandler(CRecordPing.ID,
                          this::asyncPing);
    }

    ...
}

Our service is a target, so we derive the class from CTarget. CTarget offers many conveniences, so we use it as the base class.

At the same time, we implement the service interface. This is only used to activate and deactivate the class. The methods are called by the service starter when the service is started or stopped.

The class is given the IDependencies interface by the service starter. We save it so that we can access our dependencies.

In the constructor, we also register handlers for three messages predefined in the kernel. The CRecordStartTarget message is the first message that a target receives after registration. It is used for initialization and is already delivered in the thread that was selected during registration.

The second message is CRecordCliHandle, which is sent by the command line interface when our registered command is entered on the command line by the user.

The third message is CRecordPing, which we send to the addresses the user has given us.
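The idea behind registering handlers by message ID can be sketched in plain Java as follows. The names HandlerTableSketch, Message and Handler are hypothetical and only illustrate the dispatch principle. The real CTarget works with CEnvelope and CRecord and does more than this.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a handler table, not the CTarget implementation.
final class HandlerTableSketch
{
    // Stand-in for a message: an ID plus a flag that tells a request from an answer
    static final class Message
    {
        final String mId;
        final boolean mAnswer;

        Message(final String aId, final boolean aAnswer)
        {
            mId = aId;
            mAnswer = aAnswer;
        }
    }

    // A handler returns true if it has processed the message
    interface Handler
    {
        boolean handle(Message aMessage);
    }

    private final Map<String, Handler> mHandlers = new HashMap<>();

    void addMessageHandler(final String aId, final Handler aHandler)
    {
        mHandlers.put(aId, aHandler);
    }

    // Looks up the handler by message ID; unknown IDs are reported as unhandled
    boolean dispatch(final Message aMessage)
    {
        final Handler handler = mHandlers.get(aMessage.mId);
        return handler != null && handler.handle(aMessage);
    }

    static HandlerTableSketch createExample()
    {
        final HandlerTableSketch table = new HandlerTableSketch();
        // This handler processes requests and ignores answers
        table.addMessageHandler("START", message -> !message.mAnswer);
        // This handler processes answers and ignores requests
        table.addMessageHandler("PING", message -> message.mAnswer);
        return table;
    }

    public static void main(final String[] aArgs)
    {
        final HandlerTableSketch table = createExample();
        System.out.println(table.dispatch(new Message("START", false)));
        System.out.println(table.dispatch(new Message("UNKNOWN", false)));
    }
}
```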

The initialization

@Override
public void activate(@NotNull final IServiceRegistry aServiceRegistry) throws Exception
{
    // We fetch the namespace SYSTEM, which exists in every node.
    final INamespace namespace = mDependencies.getNamespaceRegistry()
                                              .getNamespace(CWellKnownNID.SYSTEM);
    assert namespace != null;
    
    // We register the target
    namespace.getTargetRegistry()
             .registerTarget(this);
}

@Override
public void deactivate(@NotNull final IServiceRegistry aServiceRegistry)
{
    // We deregister the target
    deregisterTarget();
}

These are the two methods for initializing and removing the target. They are called by the service starter.

The SYSTEM namespace is retrieved during initialization. Alternatively, we could get another namespace or create a new namespace. Our target is then registered in the target registry that each namespace has. Since we do not specify a queue ID, the main thread of the namespace is used. And since we do not specify a target ID, we are assigned a random target ID.

When the target is registered, the target receives an initial asynchronous message (CRecordStartTarget). With this message, the target receives an address that is unique throughout nyssr.net.
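The registration behaviour described above (a default queue, an assigned ID, and a first message delivered asynchronously in the queue's thread) can be imitated with a single-threaded executor. This is a rough sketch under stated assumptions: NamespaceSketch and Target are hypothetical names, a UUID stands in for the random target ID, and the real namespace and its queues are implemented differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not the nyssr.net namespace.
final class NamespaceSketch
{
    interface Target
    {
        // Stand-in for the first message a target receives after registration
        void onStart(String aAssignedId);
    }

    // One thread with one queue, playing the role of the namespace's main thread
    private final ExecutorService mQueue = Executors.newSingleThreadExecutor();

    // Registers a target; without an explicit ID a random one is assigned
    String registerTarget(final Target aTarget, final String aTargetId)
    {
        final String id = aTargetId != null ? aTargetId : UUID.randomUUID().toString();
        // The start message is queued and delivered later in the queue's thread
        mQueue.execute(() -> aTarget.onStart(id));
        return id;
    }

    boolean shutdownAndWait()
    {
        mQueue.shutdown();
        try
        {
            return mQueue.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (final InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns true if the start message arrived with the assigned ID on another thread
    static boolean demo()
    {
        final NamespaceSketch namespace = new NamespaceSketch();
        final List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        final String caller = Thread.currentThread().getName();
        final String id = namespace.registerTarget(
                aId -> seen.add(aId + "@" + Thread.currentThread().getName()),
                null);
        final boolean finished = namespace.shutdownAndWait();
        return finished
                && seen.size() == 1
                && seen.get(0).startsWith(id + "@")
                && !seen.get(0).endsWith("@" + caller);
    }

    public static void main(final String[] aArgs)
    {
        System.out.println(demo());
    }
}
```

Because the target only ever runs in its queue's thread, its handlers do not need additional synchronization in this model.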

The first message

private boolean asyncStartTarget(@NotNull final CEnvelope aEnvelope,
                                 @NotNull final CRecord aRecord) throws CException
{
    if (aEnvelope.isAnswer())
    {
        return false;
    }
    else
    {
        // Register a command line handler
        registerCli();
        
        aEnvelope.setResultSuccess();
        return true;
    }
}

A message always consists of an envelope and a record. The envelope contains data and settings for sending the message. The payload and the message ID are in the record.

We have registered the message handler in the constructor. We can therefore process this message in a separate method.

Since a message could also be a response (not in the case of this message, but in general), we first distinguish between request and response. We do not process the response, so we return false. If it is a request, we process it and return true. We also always set a result, here with the convenience method setResultSuccess.

The actual initialization takes place in the registerCli method, which registers our target as a command line handler.

private static final String PING = "ping";
private static final String HELP_PING = //
        "--target [targetAddress]               : ping a target\n" + //
                "    --nodes [node IDs separated by commas] : ping a target PING.SYSTEM.[nodeId].[local segment id]\n" + //
                "    -l                                     : list all currently active pings\n" + //
                "    --delete pingId                        : delete a ping\n" + //
                "    --delete *                             : delete all pings";
    
private void registerCli() throws CException
{
    final CEnvelope env = CEnvelope.forLocalNanoService(CWellKnownNID.SYSTEM);
    final CRecord record = CRecordCliAddHandler.create();
    CRecordCliAddHandler.setCommand(record,
                                    new String[]{
                                            PING
                                    });
    CRecordCliAddHandler.setHelp(record,
                                 new String[]{
                                         HELP_PING
                                 });
    sendNotification(env,
                     record);
}

Here we create a message to register a command line handler. This is a nanoservice that is registered in the SYSTEM namespace. We register the command “ping”. We also include a help message that is displayed when the user enters “help” on the command line.

The help message
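Conceptually, the command line service only has to remember which commands exist and which help text belongs to each of them. The following sketch shows that idea in isolation. CliRegistrySketch and its methods are hypothetical, and the actual nanoservice is addressed via messages, as shown in registerCli.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not the nyssr.net command line service.
final class CliRegistrySketch
{
    // Command name mapped to its help text, sorted by command name
    private final Map<String, String> mHelp = new TreeMap<>();

    void addHandler(final String aCommand, final String aHelp)
    {
        mHelp.put(aCommand, aHelp);
    }

    boolean isRegistered(final String aCommand)
    {
        return mHelp.containsKey(aCommand);
    }

    // Builds the text that a "help" command could print: one line per command
    String help()
    {
        final StringBuilder sb = new StringBuilder();
        for (final Map.Entry<String, String> entry : mHelp.entrySet())
        {
            sb.append(entry.getKey())
              .append(' ')
              .append(entry.getValue())
              .append('\n');
        }
        return sb.toString();
    }

    public static void main(final String[] aArgs)
    {
        final CliRegistrySketch registry = new CliRegistrySketch();
        registry.addHandler("ping", "-l : list all currently active pings");
        System.out.print(registry.help());
    }
}
```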

The storage

We store the target addresses to which we send pings in a map in objects of the CPingEntry class.

package de.sillysky.nyssr.impl.example001;


import de.sillysky.nyssr.address.CTargetAddress;
import org.jetbrains.annotations.NotNull;

import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CPingEntry
{
    private static final AtomicInteger IDS = new AtomicInteger(0);
    private final long mId;
    private final CTargetAddress mTargetAddress;
    private long mCounter = 0L;
    private final LocalTime mStartTime = LocalTime.now();

    CPingEntry(@NotNull final CTargetAddress aTargetAddress)
    {
        mId = IDS.incrementAndGet();
        mTargetAddress = aTargetAddress;
    }

    long getId()
    {
        return mId;
    }

    @NotNull CTargetAddress getTargetAddress()
    {
        return mTargetAddress;
    }

    long getCounter()
    {
        return mCounter;
    }

    @Override
    public boolean equals(final Object aO)
    {
        if (aO == null || getClass() != aO.getClass())
        {
            return false;
        }

        final CPingEntry that = (CPingEntry) aO;
        return mId == that.mId;
    }

    @Override
    public int hashCode()
    {
        return Long.hashCode(mId);
    }

    public void incCounter()
    {
        mCounter++;
    }

    public double getPingsPerHour()
    {
        LocalTime now = LocalTime.now();

        long seconds = ChronoUnit.SECONDS.between(mStartTime,
                                                  now);

        // LocalTime wraps at midnight, so the difference can also be negative
        if (seconds <= 0)
        {
            return 0.0;
        }

        double entriesPerSecond = (double) mCounter / seconds;

        return entriesPerSecond * 3600;
    }
}

// in the service
private final Map<Long, CPingEntry> mPingEntries = new HashMap<>();

As the individual ping requests are referenced again when they are deleted, we simply introduce a numerical ID for each data set, which is easier to enter than the target address.

We also save the start time of the ping, a counter and, of course, the target address of the ping.
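The start time and the counter are all that is needed to derive a rate. CPingEntry measures the elapsed time with LocalTime, which wraps around at midnight. As an alternative, the following sketch uses the monotonic System.nanoTime(). PingRateSketch and its methods are hypothetical names. The conversion to messages per second assumes two messages per ping, one request and one response.

```java
// Hypothetical helper, not part of the example project.
final class PingRateSketch
{
    private static final double NANOS_PER_SECOND = 1_000_000_000.0;

    // Completed pings per hour; returns 0.0 if no time has elapsed yet
    static double pingsPerHour(final long aPings, final long aElapsedNanos)
    {
        if (aElapsedNanos <= 0L)
        {
            return 0.0;
        }
        final double seconds = aElapsedNanos / NANOS_PER_SECOND;
        return aPings / seconds * 3600.0;
    }

    // Each ping consists of a request and a response, i.e. two messages
    static double messagesPerSecond(final long aPings, final long aElapsedNanos)
    {
        return pingsPerHour(aPings, aElapsedNanos) / 3600.0 * 2.0;
    }

    public static void main(final String[] aArgs)
    {
        // System.nanoTime() is monotonic and therefore unaffected by midnight or clock changes
        final long start = System.nanoTime();
        long pings = 0;
        for (int i = 0; i < 1000; i++)
        {
            pings++;
        }
        final long elapsed = System.nanoTime() - start;
        System.out.println(messagesPerSecond(pings, elapsed));
    }
}
```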

The command line handler

If the user enters a “ping” command, we receive a message from which we can take the parameters as a string array. We process this array with the kernel-internal class CSimpleArgParser, which can process flags (-l) as well as options (--delete xxxx) with parameters.

private boolean asyncCliHandle(@NotNull final CEnvelope aEnvelope,
                               @NotNull final CRecord aRecord) throws CException
{
    if (aEnvelope.isAnswer())
    {
        return false;
    }
    else
    {
        final String command = CRecordCliHandle.getCommand(aRecord,
                                                           "");
        if (PING.equals(command))
        {
            final String[] arguments = CRecordCliHandle.getArguments(aRecord,
                                                                     null);
            if (arguments != null && arguments.length > 0)
            {
                final CSimpleArgParser p = new CSimpleArgParser(arguments);

                final String at = p.getOption("target");
                if (at != null)
                {
                    addTargetAddress(at);
                }
                final String nodes = p.getOption("nodes");
                if (nodes != null)
                {
                    final String[] nodeArr = nodes.split(",");
                    for (final String node : nodeArr)
                    {
                        addNodeId(node);
                    }
                }
                final boolean doList = p.hasFlag("l");
                if (doList)
                {
                    printList();
                }
                final String del = p.getOption("delete");
                if (del != null)
                {
                    if ("*".equals(del))
                    {
                        printList();
                        mPingEntries.clear();
                        System.out.println("All ping entries removed");
                    }
                    else
                    {
                        final long id = Long.parseLong(del);
                        final CPingEntry removed = mPingEntries.remove(id);
                        if (removed == null)
                        {
                            System.out.println("ID not found: " + id);
                        }
                        else
                        {
                            System.out.println("Ping entries removed:");
                            System.out.println(createDumpLine(removed));
                        }
                    }
                }
            }
        }
        aEnvelope.setResultSuccess();
        return true;
    }
}
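The handler above relies on the kernel class CSimpleArgParser. To illustrate what such a parser has to do, here is a small stand-alone sketch that distinguishes flags (-l) from options with a parameter (--delete 3). ArgParserSketch is a hypothetical class and is not the kernel implementation, whose exact behaviour may differ. The sketch also includes a parseId helper, because Long.parseLong throws a NumberFormatException for input such as "--delete abc".

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration, not the kernel class CSimpleArgParser.
final class ArgParserSketch
{
    private final Map<String, String> mOptions = new HashMap<>();
    private final Set<String> mFlags = new HashSet<>();

    ArgParserSketch(final String[] aArgs)
    {
        for (int i = 0; i < aArgs.length; i++)
        {
            final String arg = aArgs[i];
            if (arg.startsWith("--") && arg.length() > 2)
            {
                // An option takes the following argument as its value, if there is one
                final String value = i + 1 < aArgs.length ? aArgs[++i] : "";
                mOptions.put(arg.substring(2), value);
            }
            else if (arg.startsWith("-") && arg.length() > 1)
            {
                // A flag has no value
                mFlags.add(arg.substring(1));
            }
        }
    }

    // Returns the option value, or null if the option was not given
    String getOption(final String aName)
    {
        return mOptions.get(aName);
    }

    boolean hasFlag(final String aName)
    {
        return mFlags.contains(aName);
    }

    // Parses a numeric ID; returns null instead of throwing for invalid input
    static Long parseId(final String aText)
    {
        try
        {
            return Long.valueOf(aText.trim());
        }
        catch (final NumberFormatException e)
        {
            return null;
        }
    }

    public static void main(final String[] aArgs)
    {
        final ArgParserSketch p = new ArgParserSketch(new String[]{"--nodes", "sun,ceres", "-l"});
        System.out.println(p.getOption("nodes") + " " + p.hasFlag("l"));
    }
}
```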

Add a target address

Target addresses in string form look like this: 4.SYSTEM.sun.ss, i.e. TargetId.NamespaceId.NodeId.SegmentId. If a target address is known, it can be entered with the option “--target targetAddress”.

private void addTargetAddress(@NotNull final String aAt) throws CException
{
    final CTargetAddress targetAddress = CTargetAddress.fromString(aAt);
    if (targetAddress != null)
    {
        final CPingEntry pe = new CPingEntry(targetAddress);
        mPingEntries.put(pe.getId(),
                         pe);
        sendPing(pe);
    }
}

The target address is saved and a ping is immediately sent to the target.
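The four-part string form of a target address can be taken apart with a few lines of plain Java. The following sketch only illustrates the format TargetId.NamespaceId.NodeId.SegmentId. TargetAddressSketch is a hypothetical class, and the real CTargetAddress.fromString may accept further forms or apply additional validation.

```java
// Hypothetical illustration of the address format, not the kernel class CTargetAddress.
final class TargetAddressSketch
{
    final String mTargetId;
    final String mNamespaceId;
    final String mNodeId;
    final String mSegmentId;

    private TargetAddressSketch(final String aTargetId,
                                final String aNamespaceId,
                                final String aNodeId,
                                final String aSegmentId)
    {
        mTargetId = aTargetId;
        mNamespaceId = aNamespaceId;
        mNodeId = aNodeId;
        mSegmentId = aSegmentId;
    }

    // Returns the parsed address, or null if the text does not have four non-empty parts
    static TargetAddressSketch fromString(final String aText)
    {
        if (aText == null)
        {
            return null;
        }
        // The limit -1 keeps trailing empty parts, so "a.b.c." is rejected below
        final String[] parts = aText.trim().split("\\.", -1);
        if (parts.length != 4)
        {
            return null;
        }
        for (final String part : parts)
        {
            if (part.isEmpty())
            {
                return null;
            }
        }
        return new TargetAddressSketch(parts[0], parts[1], parts[2], parts[3]);
    }

    @Override
    public String toString()
    {
        return mTargetId + "." + mNamespaceId + "." + mNodeId + "." + mSegmentId;
    }

    public static void main(final String[] aArgs)
    {
        final TargetAddressSketch address = fromString("4.SYSTEM.sun.ss");
        System.out.println(address.mNodeId + " / " + address);
    }
}
```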

Add a node ID

A node ID is easier to enter: ping --nodes sun,ceres,venus. Each node has a “ping target” with the address PING.SYSTEM.nodeId.segmentId. As several node IDs can be entered in a list, we split this list into an array. We generate the address for each node ID, save it and start the ping.

private void addNodeId(final String aNode) throws CException
{
    final CNodeId nodeId = new CNodeId(aNode);
    final CTargetAddress targetAddress = new CTargetAddress(CWellKnownTID.PING,
                                                            CWellKnownNID.SYSTEM,
                                                            nodeId,
                                                            CSegmentId.getLocal());
    final CPingEntry pe = new CPingEntry(targetAddress);
    mPingEntries.put(pe.getId(),
                     pe);
    System.out.println("Adding " + aNode + ", ID=" + pe.getId());
    sendPing(pe);
}

Print the pings

Current pings can be output with ping -l.

private void printList()
{
    System.out.printf("%8s %30s %8s %14s %14s%n",
                      "ID",
                      "address",
                      "pings",
                      "pingsPerHour",
                      "msgPerSecond");
    System.out.println("------------------------------------------------------------------------------");

    final StringBuilder sb = new StringBuilder();
    for (final CPingEntry pe : mPingEntries.values())
    {
        sb.append(createDumpLine(pe));
    }
    System.out.println(sb);
}

@NotNull
private String createDumpLine(@NotNull final CPingEntry aEntry)
{
    return String.format("%8s %30s %8s %,14.0f %,14.0f%n",
                         aEntry.getId(),
                         aEntry.getTargetAddress(),
                         aEntry.getCounter(),
                         aEntry.getPingsPerHour(),
                         // Two messages (request and response) per ping: pingsPerHour / 3600 * 2
                         aEntry.getPingsPerHour() / 1800);
}

ping commands in the console

Deleting the pings

Pings can be deleted individually (--delete 1) or all at once (--delete *). The code for this can be found in the command line handler.

Sending and receiving pings

The ping message must not be confused with the low-level pings for monitoring the TCP links between the nodes. Our ping message is a high-level message and is actually sent to individual targets. To simulate the message IO, a payload is also sent. A slot (= a data container) contains a long integer, which is taken from the message by the recipient and transferred to another slot. The response therefore contains two long integers.

Incidentally, each target processes the ping message, which is handled by the base class CTarget.

We add the ID of the ping entry to the message.

private void sendPing(@NotNull final CPingEntry aPingEntry) throws CException
{
    final CEnvelope env = CEnvelope.forSingleTarget(aPingEntry.getTargetAddress());
    final CRecord record = CRecordPing.create();
    CRecordPing.setLoop(record,
                        aPingEntry.getId());
    sendRequest(env,
                record);
}

The response to the ping is again caught in a message handler. We always check the response code first. If an error is entered there, we discard the ping data record. If everything is OK, we take the ID of the ping data record, search for the data record and increment its counter. The ping is then sent again.

private boolean asyncPing(@NotNull final CEnvelope aEnvelope,
                          @NotNull final CRecord aRecord) throws CException
{
    if (aEnvelope.isAnswer())
    {
        final long id = CRecordPing.getLoopMirror(aRecord,
                                                  0L);
        if (aEnvelope.getResultCode() == CResultCode.SUCCESS)
        {
            final CPingEntry pe = mPingEntries.get(id);
            if (pe != null)
            {
                pe.incCounter();
                sendPing(pe);
            }
        }
        else
        {
            mPingEntries.remove(id);
        }

        return true;
    }
    else
    {
        return false;
    }
}
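The request/response cycle can be simulated without any network to show why only one message per ping entry is ever in flight. In this sketch an in-memory queue plays the role of the wire. PingLoopSketch and its Message class are hypothetical, and the fields mLoop and mLoopMirror merely imitate the two slots mentioned above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simulation of the ping loop, not nyssr.net code.
final class PingLoopSketch
{
    static final class Message
    {
        final boolean mAnswer;
        final long mLoop;       // value set by the sender
        final long mLoopMirror; // value copied back by the recipient

        Message(final boolean aAnswer, final long aLoop, final long aLoopMirror)
        {
            mAnswer = aAnswer;
            mLoop = aLoop;
            mLoopMirror = aLoopMirror;
        }
    }

    // Runs aRoundTrips request/response cycles and returns {pings, messages, maxInFlight}
    static long[] run(final long aEntryId, final int aRoundTrips)
    {
        if (aRoundTrips <= 0)
        {
            return new long[]{0L, 0L, 0L};
        }
        final Deque<Message> wire = new ArrayDeque<>();
        long pings = 0;
        long messages = 0;
        long maxInFlight = 0;

        // The first request carries the ID of the ping entry
        wire.add(new Message(false, aEntryId, 0L));
        while (!wire.isEmpty())
        {
            maxInFlight = Math.max(maxInFlight, wire.size());
            final Message message = wire.poll();
            messages++;
            if (!message.mAnswer)
            {
                // Recipient: mirror the payload into the response
                wire.add(new Message(true, message.mLoop, message.mLoop));
            }
            else if (message.mLoopMirror == aEntryId)
            {
                // Sender: count the completed ping, then send the next request
                pings++;
                if (pings < aRoundTrips)
                {
                    wire.add(new Message(false, aEntryId, 0L));
                }
            }
        }
        return new long[]{pings, messages, maxInFlight};
    }

    public static void main(final String[] aArgs)
    {
        final long[] result = run(7L, 3);
        System.out.println(result[0] + " pings, " + result[1] + " messages, max in flight " + result[2]);
    }
}
```

Since the next request is only sent after the response has arrived, the message count is always twice the ping count, and the measured rate is bounded by the round-trip time rather than by raw bandwidth.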

Download

ZIP archive of the project (MD5: 92790e324ee9b598ee5f7e0bd7d5ad32)

The download contains the Example001 project. The build directory contains the finished JAR files for testing:

  • Example001-2.4.0-8.jar for language level 8
  • Example001-2.4.0-23.jar for language level 23

Test

You can copy a jar into the plugins directory of the nyssr.net download.

If you start our evaluation environment (see download), you will start the following nodes:

The evaluation environment

To have the node mercury load the plugin, modify the file mercury/config/plugins.cfg and add two lines:

# nyssr.net
preference.node.name = plugin.manager/plugins
preference.node.clear.first = true
preference.key.overwrite.existing = true

# payload
0010 = NY_NetworkPlugIn
0020 = NY_TcpPlugIn
0030 = NY_MicroServicePlugIn
0040 = NY_FileStorePlugIn
0050 = NY_LocalSoftwareUpdaterPlugIn
0060 = NY_RemoteSkinServerPlugIn
0100 = NY_AppUserManager
0110 = NY_AppWidgetShowCase
0120 = NY_AppMonitor

0201 = NY_CliPlugIn
0202 = Example001

The NY_CliPlugIn plugin should only appear once in the list of plugins. It takes care of the input and output of the command line (already included in the node moon).

Restart the node. You can check in the LOG whether the plugin has been loaded correctly. You can enter “help” on the command line. The output should contain the following lines:

The help message

You can start pings by entering the following lines:

ping --nodes sun,ceres,venus,moon
ping -l

Note

A message that is sent from one node to another node travels through the network layer of nyssr.net. The message is processed in several namespaces/threads before it reaches the other node via TCP. Here too, the message travels through the network stack.

If the nodes are started on the same machine, we can achieve a throughput of 25,000 messages per second (depending on the machine). Only one message is sent at a time during a ping, otherwise the throughput would be even higher. That's pretty efficient. The system scales very well on multi-core machines.

The following console output shows pings sent from moon to sun, ceres, venus and mercury. You can clearly see that more hops also require more time. Nevertheless, the throughput for 3 hops from moon to venus is over 4300 messages per second (with simultaneous pings to sun, mercury and venus), i.e. less than a quarter of a millisecond per message.

ping commands in the console
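The time per message quoted above is simply the reciprocal of the throughput. As a small worked example, the following sketch converts the two figures from the text (4300 and 25,000 messages per second) into milliseconds per message. ThroughputSketch is a hypothetical helper name.

```java
import java.util.Locale;

// Hypothetical helper for the arithmetic, not part of the example project.
final class ThroughputSketch
{
    // Average time per message in milliseconds for a given throughput
    static double millisPerMessage(final double aMessagesPerSecond)
    {
        return 1000.0 / aMessagesPerSecond;
    }

    public static void main(final String[] aArgs)
    {
        // 1000 ms / 4300 is about 0.233 ms, i.e. just under a quarter of a millisecond
        System.out.println(String.format(Locale.ROOT, "%.3f ms", millisPerMessage(4300.0)));
        // 1000 ms / 25000 is 0.040 ms
        System.out.println(String.format(Locale.ROOT, "%.3f ms", millisPerMessage(25000.0)));
    }
}
```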

Here is a snapshot of the Windows Task Manager (German Windows 11, AMD Ryzen 9 5950X 16-Core Processor):

Task Manager

Conclusion

  • We have built a plugin (JAR with IPlugIn implementation).
  • We have created and initialized a package with a service starter.
  • We have sent, received and processed messages.
  • We have used a nanoservice (command line handler).
  • And we have created something useful.

Further instructions can be found in the tutorial section of the documentation.

If you have any questions or comments, please contact us via the forum. We look forward to your feedback!