Dr. Hartmut Schorrig, www.vishia.org, 2020-11-13

1. Approach

Processors in embedded systems often run algorithms with a very short sampling time, for example 50 µs or 10 µs.

It is possible because the processors are fast (40 MHz or 200 MHz clock) and the algorithms are short and close to the physical process (analog values, fast communication).

It is necessary because some control tasks need a very high sampling rate, a high timing resolution and a low dead time, especially for the control of electrical signals.

For such step times of a control algorithm a Real Time Operating System (RTOS) is too much effort, and it does not guarantee thread switches within such a short time. Even if the RTOS thread scheduler itself guarantees a thread switch in less than 1 µs, there is no guarantee that another user thread does not block the switch because of some locked data (mutex) etc.

The execution of essential parts of the application immediately in a hardware interrupt is a simple and universally known solution, used since the beginning of microprocessor usage, but rarely discussed as a principle.

2. Data exchange, mutex in interrupts

2.1. Comparison with mutual exclusion access in threads of an RTOS

In an RTOS a lock algorithm is usually used to assure mutual exclusion on the access:

RTOS- Multithread & Lock

The image above shows the typical situation using a Real Time Operating System (RTOS).

2.2. What is possible in interrupt routines?

Using interrupts this cannot be done in the same way: If the interrupt should write data while the lock is active, it is not possible to switch to the locking thread and then back to the interrupt. This means that this schema cannot be used.

What is possible?

  • A) Try to think about data exchange without a mutex. Is it possible to organize?

  • B) An interrupt-disable range in the backloop for mutual exclusion on the data.

  • C) Usage of compare-and-swap access.

2.3. A) Keep it simple and separated

Case A) may be possible; think about it and keep it simple. If you have a ring buffer which is only written in the interrupt and only read out in the backloop, you can increment the write index in the interrupt and the independent read index in the backloop:

//in interrupt:
int wr1 = this->wr +1;
if(wr1 >= this->size) { wr1 = 0; }  //wraps
if(wr1 != this->rd) {        //if equal, then the buffer is full.
  data[this->wr] = value;    //store data from interrupt
  this->wr = wr1;            //advance the write index only after the store
}
//in backloop
if(this->rd != this->wr) {   //if equal, then the buffer is empty.
  value = data[this->rd];
  int rd1 = this->rd +1;
  if(rd1 >= this->size) { rd1 = 0; }  //wraps
  this->rd = rd1;
}

It is very simple because the sensitive indices to the data are separated. The value of this->rd is never changed in the interrupt, so the backloop has full control over it; the interrupt only tests it. Conversely, this->wr is tested in the backloop, but its new value is set in the interrupt only after the data access. This order is important.
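
For clarity, here is a hypothetical declaration of the ring buffer data which the snippets above assume. The names wr, rd, size and data are taken from the snippets; the element type and the capacity are only examples, and volatile is added because the indices are shared between interrupt and backloop:

typedef struct RingBuffer_T {
  int volatile wr;   //write index, changed only in the interrupt
  int volatile rd;   //read index, changed only in the backloop
  int size;          //capacity, number of elements in data
  float data[16];    //the stored values, capacity 16 only as example
} RingBuffer;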

2.4. B) Using disable_Interrupt()

If the same algorithm, a ring buffer, can be written by more than one interrupt, or by the backloop too, the requirement is more complex. This can be necessary if the ring buffer stores events (for a state machine, for specific execution control) and the events can be written to the ring buffer both in an interrupt and in the backloop. Then the problem is:

//in interrupt:
int wr1 = this->wr +1;
if(wr1 >= this->size) { wr1 = 0; } //wraps
if(wr1 != this->rd) {        //if equal, then the buffer is full.
  //<<=== if another interrupt has done the same, the data position is used twice.
  data[this->wr] = value;    //store data from interrupt
  this->wr = wr1;
}

Using solution B) the whole write access should be wrapped with

disable_Interrupt();
  ....
enable_Interrupt();

in all interrupts and in the backloop, except in the interrupt with the highest priority. If the access (writing the data to the buffer, more than a simple float) needs more time, the highest-priority, possibly very fast interrupt may be delayed. This produces a 'jitter' in the interrupt processing. If the system has a very fast interrupt cycle (10 µs) with a tightly calculated remaining computing time, or needs exact real timing in the execution of the interrupt (it may be so for special requirements), and the time to write the data to the queue is greater than about 2..5 µs, it does not work! Generally the disable-interrupt time should be as short as possible.

disable_Interrupt() and enable_Interrupt() are special machine instructions; on some processors they can only be used in the so-called 'kernel mode'.
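
As a sketch, the ring buffer write access from chapter 2.3 wrapped in this way; disable_Interrupt() and enable_Interrupt() are assumed to be adapted to the processor-specific instructions:

//in a lower-priority interrupt or in the backloop:
disable_Interrupt();         //from here no other writer can interfere
int wr1 = this->wr +1;
if(wr1 >= this->size) { wr1 = 0; }  //wraps
if(wr1 != this->rd) {        //if equal, then the buffer is full.
  data[this->wr] = value;    //store the data
  this->wr = wr1;
}
enable_Interrupt();          //keep the disabled range as short as possible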

2.5. C) compare and swap

The 'compare and swap' methodology can be used both in interrupts and in multithreading. In interrupts it is the only possible solution to organize a mutual exclusion (except disabling interrupts); for multithreading it is faster. But 'compare and swap' can only be used if only one cell in RAM is the subject of the mutual exclusion. This is an important restriction, but some algorithms, especially for container handling, are possible with well-thought-out usage of dedicated variables for the mutex with 'compare and swap'.

'compare and swap' became widely known in software with Java 5, in about 2004. The basis is a machine instruction which was introduced already with the Intel 486 processor family:

lock cmpxchg addr, expect

For other processors using cache and/or multicore, adequate machine instructions exist. Because the 'compare and swap' instruction is processor-specific, it should be organized in a special OSAL or HAL adaption layer ('Operation System Adaption Layer' or 'Hardware Adaption Layer'). For simple processors often used in embedded control such an instruction does not exist. But the functionality is the same as in the following example for the Texas Instruments TMS320C28 series:

static inline int32 compareAndSwap_AtomicInt32 (
  int32 volatile* reference, int32 expect, int32 update) {
  __asm(" setc INTM");   //disable interrupts (set interrupt mask)
  int32 read = *reference;
  if(read == expect) {
    *reference = update; //swap only if the expected value was read
  }
  __asm(" clrc INTM");   //enable interrupts again
  return read;
}

It is implemented in the C language (or C++) but with special assembler instructions. The important thing is the interrupt disabling and enabling. This means approach B) is used (see the chapter above), but not in the user's algorithm; instead it is hidden in a 'system routine' of the OSAL or HAL layer. The application source uses only the commonly present `compareAndSwap_AtomicInt…` invocation, which is adapted for each platform, for example for Windows in the form:

int32 compareAndSwap_AtomicInt32(int32 volatile* reference, int32 expect, int32 update) {
  return InterlockedCompareExchange((uint32*)reference, (uint32)update, (uint32)expect);
}

and for GCC on Intel-based platforms:

int32 compareAndSwap_AtomicInt32(int32 volatile* reference, int32 expect, int32 update) {
  __typeof (*reference) ret;
  __asm __volatile ( "lock cmpxchgl %2, %1"
                   : "=a" (ret), "=m" (*reference)
                   : "r" (update), "m" (*reference), "0" (expect));
  return ret;
}

All three routines have the same signature; their usage in the application is identical. Only the implementations differ, due to the platform.

If a user's algorithm is written with 'compare and swap', for example and especially using common container library routines, it can be used without adaption both in interrupts with their backloop and in multithreading under an RTOS. That is the advantage.
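
A small usage sketch, not from the emC sources: a counter shared between an interrupt and the backloop, incremented safely using only the adaption-layer routine shown above:

//atomically increment a counter shared between interrupt and backloop:
void incrementShared ( int32 volatile* counter ) {
  int32 expect, read;
  do {
    expect = *counter;       //snapshot the current value
    read = compareAndSwap_AtomicInt32(counter, expect, expect +1);
  } while(read != expect);   //another context has interfered, try again
}

If another interrupt or thread changes the counter between the snapshot and the swap, the swap fails and the loop simply repeats with the new value.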

'compare and swap' is slightly slower (more calculation time) in comparison with a simple disable-interrupt section. The difference is given, but it is small. A fast interrupt with, as usual, the highest priority can spare the encapsulation and execute the access immediately: nothing can interrupt the access, hence it is atomic anyway. In lower-priority interrupts the slightly increased calculation time may be admissible.

3. Organization of more than one task in the backloop

Though the interrupts are programmed immediately with the user's code, an RTOS can run in the backloop. But often small processors are not powerful enough for a complex RTOS; each thread needs an extra stack, etc.

The important question is: If a user's algorithm should run for a longer time while other algorithms (threads) are executed concurrently, only an RTOS can help. But if the user's algorithms can be split into short actions, each of which can be executed one after another (non-preemptively) together with other short actions, a simple task system based on events is possible.

3.1. A simple task system without message queue

A task is a short routine to execute. An event is the occurrence of the necessity to execute something, in the meaning commonly known in information processing. An "event queue" is a false wording: events by themselves cannot be queued. What is queued is a "message" to signal an event. Hence the "event queue" is really a "message queue". This is often used in information processing.

But is a queue necessary? If the system knows only a few event types, each event can have its own EventData_emC instance. In this instance the occurrence of the event is marked, if it occurs.

A simple backloop can do without a queue for the messages which trigger execution. Only the EventData_emC instance to notify the event is present. This instance contains a priority which helps to execute important events faster than less important ones. This may be better than a message queue with its FIFO principle ("first in, first out").

typedef struct EventData_emC_T {
  union { ObjectJc obj; } base;
  struct EvQueue_emC_T* evQueue;
  float Tstep;
  char const* idString;
  uint8 ctPriority;
  uint8 stateInp;
  uint eventIdent;
  int ctEvents;
  HandleADDR_emC(ObjectJc) hdst;
  HandleADDR_emC(int32) hdata;
  int32 timestamp;
} EventData_emC_s;

These are the complete data of an EventData_emC instance. Not all of them are necessary for a small, simple implementation on a poor processor.

  • evQueue can be null; it helps to enqueue the event if necessary.

  • idString helps with debugging, respectively with checking whether the instance is correct on execution (check via ASSERT).

  • ctPriority is the important one; it associates an execution priority. On activating the event this counter may be set to 1 for fast execution or to a higher value for less urgent execution. All EventData_emC instances are checked, the priorities are compared and decremented, so the most urgent event is executed first. And if an event stays activated for a longer time, it gets a higher effective priority on each check.

  • stateInp is a helper variable for edge detection.

  • ctEvents is only for debugging; it counts the activations.

  • hdst is either a memory address or a handle to the instance of the consumer of the event; it may be null if the consumer is associated with the event instance by the user's algorithm.

  • hdata is a memory address or a handle to the data instance.

  • timestamp is for debugging. Sometimes event execution blocks, or events are rapidly repeated, in error situations. The timestamp of the event occurrence can help then.

The user's software can consist of several EventData_emC instances. Either they are intrinsically arranged in an array, or an array of references to these instances should be part of the user's software. With them a simple polling can be executed in the backloop. Because this algorithm needs a function pointer to the execution operation, and this function pointer should be protected against software errors, hence stored in const memory, a second struct is used:

typedef struct TaskConstData_emC_T {
  char const* const signatureString;
  TaskOperation_emC operation;
  ObjectJc* thizData;
  EventData_emC_s* const evEntry;
} TaskConstData_emC;

This structure references the EventData_emC instance. The following operation works with an array of TaskConstData_emC entries, which should also be stored in const memory (may be in flash memory):

void execEvPolling ( TaskConstData_emC const* const* evEntryArray, int nrofEntries) {
  TaskConstData_emC const* entryAct = null;
  uint minPrio = 255;
  for(int ix = 0; ix < nrofEntries; ++ix) {
    TaskConstData_emC const* entry = evEntryArray[ix];
    EventData_emC_s* entryData = entry->evEntry;
    if(entryData->ctPriority >0 && entryData->ctPriority < minPrio) {
      entryAct = entry;           //candidate: activated, most urgent found so far
      minPrio = entryData->ctPriority;
    }
    if(entryData->ctPriority >1 && entryData->ctPriority != 0xFF) {
      entryData->ctPriority -=1;  //count down the priority, till it is the highest one
    }
  }
  if(entryAct !=null) {
    EventData_emC_s* entryData = entryAct->evEntry;
    entryData->ctPriority = 0;    //mark as executed, hence inactive
    entryAct->operation(entryAct);
  }
}

This routine is part of emC/Base/EventQu_emC.h and can be called in the backloop cycle. It invokes the task for that event whose EventData_emC instance is marked (activated) and has the highest priority (lowest ctPriority value).
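
A hypothetical setup sketch: the names evTask1, task1 and execTask1 are only illustrative, and the signature of the task operation is assumed from the invocation entryAct->operation(entryAct) above:

//a user task, executed via the function pointer in TaskConstData_emC:
void execTask1 ( TaskConstData_emC const* task ) {
  //... the user's short action for this event
}

EventData_emC_s evTask1;  //event instance in RAM, zero-initialized: inactive

TaskConstData_emC const task1 = { "task1", execTask1, null, &evTask1 };
TaskConstData_emC const* const evEntries[1] = { &task1 };

//in an interrupt, activate the event; 1 means most urgent:
//  evTask1.ctPriority = 1;

void backloop ( void ) {
  for(;;) {                       //endless backloop cycle
    execEvPolling(evEntries, 1);  //executes the activated task with the highest priority
    //... further backloop work
  }
}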

3.2. Using a message queue

The message queue is the better solution if many EventData_emC instances exist, so that the polling would need too much time, and if the FIFO principle is important.

Message queue in back loop

The interesting point of this solution, in conjunction with the EventData_emC instances and the const TaskConstData_emC instances, is: the latter contain the function pointer to the operation to execute. Using virtual operations (C++) within the event data instances may cause security problems on any data error. The solution with the function pointer in const memory is secure. Another aspect: In Simulink (usable for graphical programming) so-called "Triggered Operations" can be used to switch Simulink state machines. The EventData_emC instances are FBlocks with the necessary trigger output.

Hence the EventData_emC instances are necessary anyway; the message queue only controls the sequence of execution.

… further TODO, especially explaining the Simulink solution.