Project: brw-orch

Build now

Configuration

sparrowdo:
  no_sudo: true
  no_index_update: false
  bootstrap: false
  format: default
  repo: https://sparrowhub.io/repo
  tags: rakudo_version_release=2025.11
disabled: false
keep_builds: 100
allow_manual_run: true

vars:
  -
      name: mods_cnt
      values: [ 2, 10, 100, 200, 1000, 2000, 3000 ]
      type: select
      default: 2

  -
        name: shuffle
        default: false
        type: checkbox

  -
        name: ignore_skip
        default: false
        type: checkbox

  -
        name: rakudo_version_release
        values: [ sha, "2025.11", "2025.10", "2025.08", "2025.06.1", "2025.06", "2025.05", "2025.04" ]
        type: select
        default: sha
  -
        name: sha
        type: input

  -
        name: modules
        type: textarea

  -
        name: source_code
        default: false
        type: checkbox

Job

#!raku

use Sparky::JobApi;
use File::Directory::Tree;

#use Ecosystem;

use JSON::Fast;

use URI::Encode;

my %agent-stat;

my $round-id = Int(now);

class Pipeline

does Sparky::JobApi::Role

{

  method base-dir {
    "{%*ENV<HOME>}/.brownie"
  }

  method !distribute ($list,$agents, $sent-back,$rakudo-version) {

    my @sent;

    my @agents = $agents<>;

    return @sent unless $list.elems;

    return @sent unless @agents.elems;

    my $chunk = 0;
    
    #say "distribute, redistribution: $sent-back";

    if @agents.elems > 0 && $list.elems > 0 {

      if $list.elems < @agents.elems {
          $chunk = 1;
          @agents = @agents[0 .. $list.elems -1];
      } else {
        $chunk = Int($list.elems/@agents.elems);
        $chunk = 10 if $chunk > 10;
      }
    
      my $a = 0;

      say "agents used: {@agents.elems}";

      say "chunk: $chunk";

      for 1 .. @agents.elems -> $i {

        my @slice = $list[$a .. $a + $chunk - 1];

        for @slice -> $m {
          # invalidate agents modules cache after 60 minutes
          next unless %agent-stat{@agents[$i-1]}:exists;
          next unless %agent-stat{@agents[$i-1]}{$m}:exists;
          if Int(now - %agent-stat{@agents[$i-1]}{$m}) > 600*6 {
              %agent-stat{@agents[$i-1]}{$m}:delete;
          };
        }

        # grep makes it sure
        # modules are not sent to agents
        # to which they have already been sent

        #@slice = @slice.grep({not %agent-stat{@agents[$i-1]}{$_}:exists}).sort;

        if @slice.elems {

        say "push job to agent {@agents[$i-1]}";

        my $agents-queue = Sparky::JobApi.new(
          :project<browny.queue>,
          :job-id(@agents[$i-1]),
        );

        for @slice -> $m {
          %agent-stat{@agents[$i-1]}{$m} = now;
        }
        $agents-queue.put-stash(
          %( 
            version => $rakudo-version,
            source-code => tags()<source_code> || False,
            agent => @agents[$i-1],
            modules => @slice,
            run-id => time,
            round-id => $round-id,
          )
        );

        say "slice: {@slice.raku}";
        }
        $a = $a + $chunk;
        say "...";
        
        for @slice -> $s {
          push @sent, $s;
        }
      }
    }

    say "debug: distribute, redistribution: {$sent-back}, {$list.elems}, {@agents.elems} ==> {@sent.elems}";

    push @sent;

  }

  method stage-main {

    #my $eco = Ecosystem.new;

    my $default-version = "fe0e20c28859ea709eefad19af452c9d35dff20d";
    
    my %rel-to-sha = %(
      "2025.11" => $default-version,
      "2025.10" => "3f8de27bcb2ee987a2997de8fe314c460b7e53f9",
      "2025.08" => "0310b2a586e6a80d2cced05675abe76a1ba73e02",
      "2025.06.1" => "cf048a948a850ec7e5923021fe9fc9112056199d",
      "2025.06" => "bc4329b15b5085387189864452cd062222bdfbea",
      "2025.05" => "c8ffd6dbdd63f6cf90d4832204e0a0d26217bc3e",
      "2025.04" => "57778e432003df466d8c797070345b81cb1ffdbf",
    );

    my $mod_cnt = Int(tags()<mods_cnt> || 100);

    my $ignore-skip = tags()<ignore_skip> || False;

    my $rakudo-version = tags()<rakudo_version_release> eq "sha" ?? (tags()<sha> || $default-version)
    !! (%rel-to-sha{tags()<rakudo_version_release>} || $default-version);
        
    directory "{self.base-dir}/versions.known/{$rakudo-version}";

    my @modules = (tags()<modules> || "").split(",").grep(/\S/).map({.chomp});

    if tags()<modules> {
       say "Testing module(s) against Rakudo version: {tags()<rakudo_version_release>} ($rakudo-version)";
       say "modules: ", @modules;
    } else {
       say "Testing $mod_cnt modules against Rakudo version: {tags()<rakudo_version_release>} ($rakudo-version)";
    }
 
    say ">>>";
    say "install from source code: ", tags()<source_code> || False;
    say "use shuffle: ", tags()<shuffle> || False;
    say "ignore_skip: ", $ignore-skip;
    say "<<<";
    
    my $start_time = time;

    my @d;

    my %skip;
    unless $ignore-skip {
      for config()<skip-tests><> -> $s {
        %skip{$s} = True;
      }
    }

    if tags()<modules> {
      @d = @modules
    } else {
      bash q{curl https://raw.githubusercontent.com/Raku/REA/main/META.json -sf | jq -r '.[] | .name' | uniq > list.txt};
      # eliminate skip-test modules
      for "list.txt".IO.lines() -> $m {
        if %skip{$m}:exists {
           say "skip module $m due to skip test list";
        } else {
           push @d, $m;
        }
      }
      @d.=pick(@d) if tags()<shuffle>;
      if  @d.elems < $mod_cnt {
        $mod_cnt = @d.elems
      }
      @d = @d[0 .. $mod_cnt - 1];
    }
    
    #directory-delete("{self.base-dir}/versions/$rakudo-version/");

    # for $eco.river.map(*.key).sort.head(100) -> $d {
    #     push @d, $d;
    #     #say $d;
    # }

    "list.txt".IO.spurt(@d.join("\n"));

    my $me = Sparky::JobApi.new(:mine);

    $me.put-file("list.txt","list.txt");
    
    my $j = 0;

    my %sent;

    if tags()<modules> && $ignore-skip {
      say "ignore skip is on and module is set, clean up module data";
      for @modules -> $m {
        if "{self.base-dir}/versions/$rakudo-version/{$m}".IO ~~ :d {
          rmtree "{self.base-dir}/versions/$rakudo-version/{$m}";
        }
        if "{self.base-dir}/versions/$rakudo-version/{$m}.log".IO ~~ :f {
          "{self.base-dir}/versions/$rakudo-version/{$m}.log".IO.unlink;
        };
        if "{self.base-dir}/versions/$rakudo-version/{$m}.FAIL".IO ~~ :f {
          "{self.base-dir}/versions/$rakudo-version/{$m}.FAIL".IO.unlink;
        };
        if "{self.base-dir}/versions/$rakudo-version/{$m}.OK".IO ~~ :f {
          "{self.base-dir}/versions/$rakudo-version/{$m}.OK".IO.unlink;
        };
      }
    }

    while (True) {

      my @_list = []; # tests to distribute
      
      my $done = 0; # finished tests

      my @agents; # online agents

      say "look for available agents ...";

      if "{self.base-dir}/agents".IO ~~ :d { 
        
        for dir("{self.base-dir}/agents") -> $ag {

          my $age = Int(now - $ag.created.DateTime);

          if $age <= 360 { # only use agents with 6 minutes ago at least heartbeat
            my $jobs-run-cnt = 0;
            my $max-threads = 3;
            if "{self.base-dir}/agents-meta/{$ag.basename}.json".IO ~~ :f {
              my %meta = from-json("{self.base-dir}/agents-meta/{$ag.basename}.json".IO.slurp);
              $jobs-run-cnt = %meta<jobs-run-cnt> || 0;
              $max-threads = %meta<max-threads> || 3;
            }
            # next if $max-threads > 10; # temporary disable patric's agent
            say "agent {$ag.basename} heartbeat OK - [$age] seconds | job runs cnt - [$jobs-run-cnt] / max-threads - [$max-threads]";
            push @agents, %( name => $ag.basename, jobs-run-cnt => Int($jobs-run-cnt), max-threads => Int($max-threads) );
          } else {
            say "remove agent {$ag.basename} from rotation, BAD heartbit -  $age seconds";
            $ag.unlink; 
          }
        };
      }

      say "xxx";

      say "{@agents.elems} online agents found";
      
      # prepare tests for 
      # distribution

      my @done;

      for @d -> $l {
        if "{self.base-dir}/versions/$rakudo-version/{$l}.OK".IO ~~ :f  ||
           "{self.base-dir}/versions/$rakudo-version/{$l}.FAIL".IO ~~ :f  {
              # module is already completed
              $done++;
              push @done, $l;
              next;
        }
        if %sent{$l} {
          push @_list, %( module => $l, send-back => True );
        } else {
          push @_list, %( module => $l, send-back => False );
        }
      }

      # notify agents about finsihed modules
      if @done.elems {
        say "notify agents about {@done.elems} processed modules";
        my $agents-feed = Sparky::JobApi.new(
          :project<browny.feedback>,
          :job-id<done>,
        );
        $agents-feed.put-stash(
          %(
            done => @done,
            version => $rakudo-version,
          )
        );
      } else {
        say "notify agents about 0 processed modules";
        my $agents-feed = Sparky::JobApi.new(
          :project<browny.feedback>,
          :job-id<done>,
        );
        $agents-feed.put-stash(
          %(
            done => [],
            version => $rakudo-version,
          )
        );        
      }
      
      # shake list for more
      # efficient distribution

      @_list.=pick(@_list);

      #say "current list: ", @_list.raku;

      if $done == @d.elems {
        last;
      }
      
      # distibute tests on agents with enough capacity
      my @s = self!distribute(
        [@_list.grep({$_<send-back> == False}).map({$_<module>})],
        [@agents.grep({$_<max-threads> >= $_<jobs-run-cnt>}).map({$_<name>})],
        False, 
        $rakudo-version,
      );
      my @rs;
      if @s.elems == 0 {
        # if no new tests have been distributed
        # re-distibute tests on agents with enough capacity
        @rs = self!distribute(
          [@_list.grep({$_<send-back> == True}).map({$_<module>})],
          [@agents.grep({$_<max-threads> >= $_<jobs-run-cnt>}).map({$_<name>})],
          True,
          $rakudo-version,
        );
        # mark distributed tests
        for @rs -> $s {
          %sent{$s} = now;
        };
      }
      # mark distributed tests
      for @s -> $s {
        %sent{$s} = now;
      };

      my $els = Int(time - $start_time);

      my $elh;

      if $els < 60 {
         $elh = "<1m";
      } elsif $els < 60*60 {
         $elh = sprintf("%.2dm", Int($els/60));
      } elsif $els < 60*60*24 {
         $elh = sprintf("%.3dh", Int($els/(60*60)));
      }

      say "||| TESTS STAT: rakudo_version: {tags()<rakudo_version_release>} | time: {$elh} | tests total: {@d.elems} | finished tests: {$done} | sent to queue: dist={@s.elems} / redist={@rs.elems} | agents cnt: {@agents.elems}";

      sleep(20);

      $j++;
      
      last if $j > 10000;
      
    }

    say "done";

    say "summary";

    my @summary;

    for @d -> $m {

      my $meta = from-json("{self.base-dir}/versions/$rakudo-version/$m/meta.json".IO.slurp);
 
      say "$m ... \t {$meta<status> ?? 'OK' !! 'FAIL' } \t {$meta<time>} sec";
      $meta<name> = $m;
      push @summary, $meta;
     
    }

    "summary.txt".IO.spurt(
      @summary.sort({ $_<time> }).
      reverse.
      map({"$_<name> ... \t {$_<status> ?? 'OK' !! 'FAIL' } \t {$_<time>} sec\n"})
    );
    
    for @d -> $m {
      if "{self.base-dir}/versions/$rakudo-version/{$m}.log".IO ~~ :f {
        $me.put-file("{self.base-dir}/versions/$rakudo-version/{$m}.log","{uri_encode_component($m)}.log");
      }
    }

    $me.put-file("summary.txt","summary.txt");

    #my $brw-orch = Sparky::JobApi.new(
     # :project<brw-orch>,
    #);

    #$brw-orch.queue({
     # description => "go_go_go", 
    #});
  }
}  

Pipeline.new.run;