How to fix TSparseArray<T>?


This is not a bug in System.CopyArray. By design it only supports managed types. The bug is in fact in TArray.Copy<T>, which mistakenly calls System.CopyArray without checking whether or not T is a managed type.

However, the latest version of TArray.Copy<T>, from XE7 Update 1, does not appear to suffer from the problem you describe. The code looks like this:

    class procedure TArray.Copy<T>(const Source, Destination: array of T;
      SourceIndex, DestIndex, Count: NativeInt);
    begin
      CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex,
        Length(Source), DestIndex, Length(Destination), Count);
      if IsManagedType(T) then
        System.CopyArray(Pointer(@Destination[SourceIndex]),
          Pointer(@Source[SourceIndex]), TypeInfo(T), Count)
      else
        System.Move(Pointer(@Destination[SourceIndex])^, Pointer(@Source[SourceIndex])^,
          Count * SizeOf(T));
    end;

Unless I am mistaken in my analysis, you simply need to apply update 1 to resolve the problems with System.CopyArray.


But as Uwe points out in comments below, this code is still bogus. It uses SourceIndex erroneously where DestIndex should be used. And the source and destination parameters are passed in the wrong order. One also wonders why the author wrote Pointer(@Destination[SourceIndex])^ rather than Destination[SourceIndex]. I find this whole situation terribly depressing. How can Embarcadero release code of such appalling quality?
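The argument-order mistake is easy to make because the two RTL routines involved take their operands in opposite orders: System.Move expects the source first, while System.CopyArray expects the destination first. The following is only a minimal sketch to illustrate that difference; the program name and the sample values are made up for the illustration and are not taken from the RTL.

```delphi
program MoveVsCopyArrayOrder;

{$APPTYPE CONSOLE}

var
  SrcInts, DstInts: array[0..1] of Integer;
  SrcStrs, DstStrs: array[0..1] of string;
begin
  // Illustrative sample data (assumption, not from the original post)
  SrcInts[0] := 12;
  SrcInts[1] := 13;
  SrcStrs[0] := 'twelve';
  SrcStrs[1] := 'thirteen';

  // Move: source first, destination second, size given in BYTES.
  // Only suitable for unmanaged element types such as Integer.
  System.Move(SrcInts[0], DstInts[0], Length(SrcInts) * SizeOf(Integer));

  // CopyArray: destination first, source second, size given in ELEMENTS.
  // Only suitable for managed element types such as string.
  System.CopyArray(@DstStrs[0], @SrcStrs[0], TypeInfo(string), Length(SrcStrs));

  Writeln(DstInts[0], ' ', DstInts[1]);
  Writeln(DstStrs[0], ' ', DstStrs[1]);
end.
```

Swapping the first two arguments of either call compiles without complaint, which is presumably how the error survived in the RTL.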


More serious than the above are the problems with TSparseArray<T>, which looks like this:

    function TSparseArray<T>.Add(const Item: T): Integer;
    var
      I: Integer;
      LArray, NewArray: TArray<T>;
    begin
      while True do
      begin
        LArray := FArray;
        TMonitor.Enter(FLock);
        try
          for I := 0 to Length(LArray) - 1 do
          begin
            if LArray[I] = nil then
            begin
              FArray[I] := Item;
              Exit(I);
            end else if I = Length(LArray) - 1 then
            begin
              if LArray <> FArray then
                Continue;
              SetLength(NewArray, Length(LArray) * 2);
              TArray.Copy<T>(LArray, NewArray, I + 1);
              NewArray[I + 1] := Item;
              Exit(I + 1);
            end;
          end;
        finally
          TMonitor.Exit(FLock);
        end;
      end;
    end;

The only time FArray is initialized is in the TSparseArray<T> constructor. This means that if the array becomes full, then items are added and lost. Presumably the I = Length(LArray) - 1 is meant to extend the length of FArray and capture the new item. However, note also that TSparseArray<T> exposes FArray through the Current property. And this exposure is not protected by the lock. So, I cannot see how this class can behave in any useful way once FArray becomes full.

I suggest that you construct an example where FArray becomes full and demonstrate that items which are added are lost. Submit a bug report demonstrating that, and linking to this question.
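To make the failure mode concrete without depending on RTL internals, here is a minimal single-threaded sketch. TMiniSparseArray is a hypothetical stand-in written for this illustration; it is not the RTL class, has no locking, and only mimics the growth logic of Add. The PublishGrownArray flag is likewise an assumption, added so that both behaviours can be compared side by side.

```delphi
program SparseArrayGrowthSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

type
  // Hypothetical stand-in for TSparseArray<T>; NOT the RTL class.
  TMiniSparseArray = class
  private
    FArray: TArray<TObject>;
    FPublishGrownArray: Boolean;
  public
    constructor Create(InitialSize: Integer; PublishGrownArray: Boolean);
    function Add(Item: TObject): Integer;
    property Current: TArray<TObject> read FArray;
  end;

constructor TMiniSparseArray.Create(InitialSize: Integer; PublishGrownArray: Boolean);
begin
  inherited Create;
  SetLength(FArray, InitialSize);
  FPublishGrownArray := PublishGrownArray;
end;

function TMiniSparseArray.Add(Item: TObject): Integer;
var
  I: Integer;
  NewArray: TArray<TObject>;
begin
  // Reuse the first free slot if there is one.
  for I := 0 to High(FArray) do
    if FArray[I] = nil then
    begin
      FArray[I] := Item;
      Exit(I);
    end;

  // The array is full: build a larger copy and store the item there.
  Result := Length(FArray);
  SetLength(NewArray, Length(FArray) * 2);
  for I := 0 to High(FArray) do
    NewArray[I] := FArray[I];
  NewArray[Result] := Item;

  // Without this assignment the grown array is discarded when Add returns,
  // so the caller gets an index for an item that was never stored.
  if FPublishGrownArray then
    FArray := NewArray;
end;

procedure Demo(PublishGrownArray: Boolean);
var
  Sparse: TMiniSparseArray;
  Items: array[0..2] of TObject;
  I, Index: Integer;
begin
  for I := Low(Items) to High(Items) do
    Items[I] := TObject.Create;
  Sparse := TMiniSparseArray.Create(2, PublishGrownArray);
  try
    for I := Low(Items) to High(Items) do
    begin
      Index := Sparse.Add(Items[I]);
      Writeln(Format('publish=%s: item %d reported at index %d, Length(Current)=%d',
        [BoolToStr(PublishGrownArray, True), I, Index, Length(Sparse.Current)]));
    end;
  finally
    Sparse.Free;
    for I := Low(Items) to High(Items) do
      Items[I].Free;
  end;
end;

begin
  Demo(False); // mimics the missing assignment
  Demo(True);  // mimics publishing the grown array
  Readln;
end.
```

With the flag off, the third Add reports index 2 while Length(Current) stays at 2, which is the "added and lost" behaviour described above. A bug report would of course need to reproduce this against the real TSparseArray<T>.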


For normal operation it does not matter whether the items are written to TSparseArray<T>, because the array is only needed when a worker thread has finished all of the tasks delegated to it and another worker thread has not finished yet. At this point the idle thread looks at the queues of the other threads inside the pool and tries to steal some work.

If a queue did not get into this array, it is not visible to the idle threads, and therefore the workload cannot be shared.

To fix that I chose option 2:

    function TSparseArray<T>.Add(const Item: T): Integer;
    ...
    SetLength(NewArray, Length(LArray) * 2);
    TArray.Copy<T>(LArray, NewArray, I + 1); // <- No Exception here with XE7U1
    NewArray[I + 1] := Item;
    {$IFDEF USE_BUGFIX}
    FArray := NewArray;
    {$ENDIF}
    Exit(I + 1);

But the stealing part is implemented riskily, without any locking:

    procedure TThreadPool.TQueueWorkerThread.Execute;
    ...
    if Signaled then
    begin
      I := 0;
      while I < Length(ThreadPool.FQueues.Current) do
      begin
        if (ThreadPool.FQueues.Current[I] <> nil)
          and (ThreadPool.FQueues.Current[I] <> WorkQueue)
          and ThreadPool.FQueues.Current[I].TrySteal(Item)
        then
          Break;
        Inc(I);
      end;
      if I <> Length(ThreadPool.FQueues.Current) then
        Break;
      LookedForSteals := True;
    end

The array length is only growing so

while I < Length(ThreadPool.FQueues.Current) do

and

if I <> Length(ThreadPool.FQueues.Current) then

should be safe enough.

    if Signaled then
    begin
      I := 0;
      while I < Length(ThreadPool.FQueues.Current) do
      begin
        {$IFDEF USE_BUGFIX}
        TMonitor.Enter(ThreadPool.FQueues);
        try
        {$ENDIF}
          if (ThreadPool.FQueues.Current[I] <> nil) and (ThreadPool.FQueues.Current[I] <> WorkQueue) and ThreadPool.FQueues.Current[I].TrySteal(Item) then
            Break;
        {$IFDEF USE_BUGFIX}
        finally
          TMonitor.Exit(ThreadPool.FQueues);
        end;
        {$ENDIF}
        Inc(I);
      end;
      if I <> Length(ThreadPool.FQueues.Current) then
        Break;
      LookedForSteals := True;
    end

Now we need a test environment to watch the stealing:

    program WatchStealingTasks;

    {$APPTYPE CONSOLE}

    {$R *.res}

    uses
      Winapi.Windows,
      System.SysUtils,
      System.Threading,
      System.Classes,
      System.Math;

    procedure OutputDebugStr( const AStr: string ); overload;
    begin
      OutputDebugString( PChar( AStr ) );
    end;

    procedure OutputDebugStr( const AFormat: string; const AParams: array of const ); overload;
    begin
      OutputDebugStr( Format( AFormat, AParams ) );
    end;

    function CreateInnerTask( AThreadId: Cardinal; AValue: Integer; APool: TThreadPool ): ITask;
    begin
      Result := TTask.Run(
        procedure
        begin
          Sleep( AValue );
          if AThreadId <> TThread.CurrentThread.ThreadID
          then
            OutputDebugStr( '[%d] executed stolen task from [%d]', [TThread.CurrentThread.ThreadID, AThreadId] )
          else
            OutputDebugStr( '[%d] executed task', [TThread.CurrentThread.ThreadID] );
        end, APool );
    end;

    function CreateTask( AValue: Integer; APool: TThreadPool ): ITask;
    begin
      Result := TTask.Run(
        procedure
        var
          LIdx: Integer;
          LTasks: TArray<ITask>;
        begin
          // Create three inner tasks per task
          SetLength( LTasks, 3 );
          for LIdx := Low( LTasks ) to High( LTasks ) do
            begin
              LTasks[LIdx] := CreateInnerTask( TThread.CurrentThread.ThreadID, AValue, APool );
            end;
          OutputDebugStr( '[%d] waiting for tasks completion', [TThread.CurrentThread.ThreadID] );
          TTask.WaitForAll( LTasks );
          OutputDebugStr( '[%d] task finished', [TThread.CurrentThread.ThreadID] );
        end, APool );
    end;

    procedure Test;
    var
      LPool: TThreadPool;
      LIdx: Integer;
      LTasks: TArray<ITask>;
    begin
      OutputDebugStr( 'Test started' );
      try
        LPool := TThreadPool.Create;
        try
          // Create three tasks
          SetLength( LTasks, 3 );
          for LIdx := Low( LTasks ) to High( LTasks ) do
            begin
              // Let's put some heavy work (200ms) on the first task's shoulders
              // and give the other tasks just some light work (20ms) to do
              LTasks[LIdx] := CreateTask( IfThen( LIdx = 0, 200, 20 ), LPool );
            end;
          TTask.WaitForAll( LTasks );
        finally
          LPool.Free;
        end;
      finally
        OutputDebugStr( 'Test completed' );
      end;
    end;

    begin
      try
        Test;
      except
        on E: Exception do
          Writeln( E.ClassName, ': ', E.Message );
      end;
      ReadLn;
    end.

And the debug log is:

    Debug Output: Test started Process WatchStealingTasks.exe (4532)
    Thread Start: Thread ID: 2104. Process WatchStealingTasks.exe (4532)
    Thread Start: Thread ID: 2188. Process WatchStealingTasks.exe (4532)
    Thread Start: Thread ID: 4948. Process WatchStealingTasks.exe (4532)
    Debug Output: [2188] waiting for tasks completion Process WatchStealingTasks.exe (4532)
    Debug Output: [2104] waiting for tasks completion Process WatchStealingTasks.exe (4532)
    Thread Start: Thread ID: 2212. Process WatchStealingTasks.exe (4532)
    Debug Output: [4948] waiting for tasks completion Process WatchStealingTasks.exe (4532)
    Debug Output: [2188] executed task Process WatchStealingTasks.exe (4532)
    Debug Output: [4948] executed task Process WatchStealingTasks.exe (4532)
    Debug Output: [2188] executed task Process WatchStealingTasks.exe (4532)
    Debug Output: [4948] executed task Process WatchStealingTasks.exe (4532)
    Debug Output: [2188] executed task Process WatchStealingTasks.exe (4532)
    Debug Output: [2188] task finished Process WatchStealingTasks.exe (4532)
    Debug Output: [4948] executed task Process WatchStealingTasks.exe (4532)
    Debug Output: [4948] task finished Process WatchStealingTasks.exe (4532)
    Debug Output: [2104] executed task Process WatchStealingTasks.exe (4532)
    Debug Output: [2188] executed stolen task from [2104] Process WatchStealingTasks.exe (4532)
    Debug Output: [4948] executed stolen task from [2104] Process WatchStealingTasks.exe (4532)
    Debug Output: [2104] task finished Process WatchStealingTasks.exe (4532)
    Debug Output: Thread Exiting: 2188 Process WatchStealingTasks.exe (4532)
    Debug Output: Thread Exiting: 4948 Process WatchStealingTasks.exe (4532)
    Thread Exit: Thread ID: 4948. Process WatchStealingTasks.exe (4532)
    Thread Exit: Thread ID: 2188. Process WatchStealingTasks.exe (4532)
    Thread Exit: Thread ID: 2212. Process WatchStealingTasks.exe (4532)

Ok, stealing should be working now with any number of worker threads, so everything is alright?

No

This small test application will not come to an end, because now it freezes inside the destructor of the thread pool. The last worker thread will not terminate, because of this code:

    procedure TThreadPool.TQueueWorkerThread.Execute;
    ...
    if ThreadPool.FWorkerThreadCount = 1 then
    begin
      // it is the last thread after all tasks executed, but
      // FQueuedRequestCount is still on 7 - WTF
      if ThreadPool.FQueuedRequestCount = 0 then
      begin

One more bug to fix here: when you wait for tasks with TTask.WaitForAll, the tasks you are waiting for are executed internally, but this does not decrease FQueuedRequestCount.

Fixing that:

    function TThreadPool.TryRemoveWorkItem(const WorkerData: IThreadPoolWorkItem): Boolean;
    begin
      Result := (QueueThread <> nil) and (QueueThread.WorkQueue <> nil);
      if Result then
        Result := QueueThread.WorkQueue.LocalFindAndRemove(WorkerData);
      {$IFDEF USE_BUGFIX}
      if Result then
        DecWorkRequestCount;
      {$ENDIF}
    end;

and now it runs as it should have from the start.


Update

As Uwe noted in a comment, we also need to fix the "fixed" System.Generics.Collections.TArray.Copy<T>:

    class procedure TArray.Copy<T>(const Source, Destination: array of T; SourceIndex, DestIndex, Count: NativeInt);
    {$IFDEF USE_BUGFIX}
    begin
      CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex, Length(Source), DestIndex, Length(Destination), Count);
      if IsManagedType(T) then
        System.CopyArray(Pointer(@Destination[DestIndex]), Pointer(@Source[SourceIndex]), TypeInfo(T), Count)
      else
        System.Move(Pointer(@Source[SourceIndex])^, Pointer(@Destination[DestIndex])^, Count * SizeOf(T));
    end;
    {$ELSE}
    begin
      CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex, Length(Source), DestIndex, Length(Destination), Count);
      if IsManagedType(T) then
        System.CopyArray(Pointer(@Destination[SourceIndex]), Pointer(@Source[SourceIndex]), TypeInfo(T), Count)
      else
        System.Move(Pointer(@Destination[SourceIndex])^, Pointer(@Source[SourceIndex])^, Count * SizeOf(T));
    end;
    {$ENDIF}

A simple check to test:

    procedure TestArrayCopy;
    var
      LArr1, LArr2: TArray<Integer>;
    begin
      LArr1 := TArray<Integer>.Create( 10, 11, 12, 13 );
      LArr2 := TArray<Integer>.Create( 20, 21 );
      // copy the last 2 elements from LArr1 to LArr2
      TArray.Copy<Integer>( LArr1, LArr2, 2, 0, 2 );
    end;

  • with XE7 you will get an exception
  • with XE7 Update 1 you will get
    LArr1 = ( 10, 11, 0, 0 )
    LArr2 = ( 20, 21 )
  • with the fix above you will get
    LArr1 = ( 10, 11, 12, 13 )
    LArr2 = ( 12, 13 )
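To see which of the three outcomes a given installation produces, the check can be wrapped in a small console program that also computes the intended result with a plain element-wise loop. This is only a sketch: ReferenceCopy and ArrayToStr are hypothetical helpers written for this illustration, not RTL routines, and the program name is made up.

```delphi
program ArrayCopyCheck;

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  System.Generics.Collections;

// Hypothetical helper: slow but obviously correct element-wise copy,
// used only as a reference for the intended semantics.
procedure ReferenceCopy(const Source: TArray<Integer>; var Destination: TArray<Integer>;
  SourceIndex, DestIndex, Count: Integer);
var
  I: Integer;
begin
  for I := 0 to Count - 1 do
    Destination[DestIndex + I] := Source[SourceIndex + I];
end;

// Hypothetical helper: formats an array in the notation used above.
function ArrayToStr(const Values: TArray<Integer>): string;
var
  I: Integer;
begin
  Result := '';
  for I := 0 to High(Values) do
  begin
    if I > 0 then
      Result := Result + ', ';
    Result := Result + IntToStr(Values[I]);
  end;
  Result := '( ' + Result + ' )';
end;

var
  LArr1, LArr2, LExpected: TArray<Integer>;
begin
  LArr1 := TArray<Integer>.Create(10, 11, 12, 13);
  LArr2 := TArray<Integer>.Create(20, 21);
  LExpected := TArray<Integer>.Create(20, 21);

  // Intended result: the last two elements of LArr1 end up in the destination.
  ReferenceCopy(LArr1, LExpected, 2, 0, 2);

  try
    // Behaviour of this call depends on the RTL version, as listed above.
    TArray.Copy<Integer>(LArr1, LArr2, 2, 0, 2);
  except
    on E: Exception do
      Writeln('TArray.Copy raised ', E.ClassName, ': ', E.Message);
  end;

  Writeln('LArr1     = ', ArrayToStr(LArr1));
  Writeln('LArr2     = ', ArrayToStr(LArr2));
  Writeln('LExpected = ', ArrayToStr(LExpected));
  Readln;
end.
```

If LArr2 and LExpected differ, or LArr1 no longer holds its original values, the installed TArray.Copy<T> still has the index or argument-order problem.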