#!raku
use Sparky::JobApi;
use File::Directory::Tree;
#use Ecosystem;
use JSON::Fast;
use URI::Encode;
my %agent-stat;
my $round-id = Int(now);
class Pipeline
does Sparky::JobApi::Role
{
method base-dir {
"{%*ENV<HOME>}/.brownie"
}
method !distribute ($list,$agents, $sent-back,$rakudo-version) {
my @sent;
my @agents = $agents<>;
return @sent unless $list.elems;
return @sent unless @agents.elems;
my $chunk = 0;
#say "distribute, redistribution: $sent-back";
if @agents.elems > 0 && $list.elems > 0 {
if $list.elems < @agents.elems {
$chunk = 1;
@agents = @agents[0 .. $list.elems -1];
} else {
$chunk = Int($list.elems/@agents.elems);
$chunk = 10 if $chunk > 10;
}
my $a = 0;
say "agents used: {@agents.elems}";
say "chunk: $chunk";
for 1 .. @agents.elems -> $i {
my @slice = $list[$a .. $a + $chunk - 1];
for @slice -> $m {
# invalidate agents modules cache after 60 minutes
next unless %agent-stat{@agents[$i-1]}:exists;
next unless %agent-stat{@agents[$i-1]}{$m}:exists;
if Int(now - %agent-stat{@agents[$i-1]}{$m}) > 600*6 {
%agent-stat{@agents[$i-1]}{$m}:delete;
};
}
# grep makes it sure
# modules are not sent to agents
# to which they have already been sent
#@slice = @slice.grep({not %agent-stat{@agents[$i-1]}{$_}:exists}).sort;
if @slice.elems {
say "push job to agent {@agents[$i-1]}";
my $agents-queue = Sparky::JobApi.new(
:project<browny.queue>,
:job-id(@agents[$i-1]),
);
for @slice -> $m {
%agent-stat{@agents[$i-1]}{$m} = now;
}
$agents-queue.put-stash(
%(
version => $rakudo-version,
source-code => tags()<source_code> || False,
agent => @agents[$i-1],
modules => @slice,
run-id => time,
round-id => $round-id,
)
);
say "slice: {@slice.raku}";
}
$a = $a + $chunk;
say "...";
for @slice -> $s {
push @sent, $s;
}
}
}
say "debug: distribute, redistribution: {$sent-back}, {$list.elems}, {@agents.elems} ==> {@sent.elems}";
push @sent;
}
method stage-main {
#my $eco = Ecosystem.new;
my $default-version = "fe0e20c28859ea709eefad19af452c9d35dff20d";
my %rel-to-sha = %(
"2025.11" => $default-version,
"2025.10" => "3f8de27bcb2ee987a2997de8fe314c460b7e53f9",
"2025.08" => "0310b2a586e6a80d2cced05675abe76a1ba73e02",
"2025.06.1" => "cf048a948a850ec7e5923021fe9fc9112056199d",
"2025.06" => "bc4329b15b5085387189864452cd062222bdfbea",
"2025.05" => "c8ffd6dbdd63f6cf90d4832204e0a0d26217bc3e",
"2025.04" => "57778e432003df466d8c797070345b81cb1ffdbf",
);
my $mod_cnt = Int(tags()<mods_cnt> || 100);
my $ignore-skip = tags()<ignore_skip> || False;
my $rakudo-version = tags()<rakudo_version_release> eq "sha" ?? (tags()<sha> || $default-version)
!! (%rel-to-sha{tags()<rakudo_version_release>} || $default-version);
directory "{self.base-dir}/versions.known/{$rakudo-version}";
my @modules = (tags()<modules> || "").split(",").grep(/\S/).map({.chomp});
if tags()<modules> {
say "Testing module(s) against Rakudo version: {tags()<rakudo_version_release>} ($rakudo-version)";
say "modules: ", @modules;
} else {
say "Testing $mod_cnt modules against Rakudo version: {tags()<rakudo_version_release>} ($rakudo-version)";
}
say ">>>";
say "install from source code: ", tags()<source_code> || False;
say "use shuffle: ", tags()<shuffle> || False;
say "ignore_skip: ", $ignore-skip;
say "<<<";
my $start_time = time;
my @d;
my %skip;
unless $ignore-skip {
for config()<skip-tests><> -> $s {
%skip{$s} = True;
}
}
if tags()<modules> {
@d = @modules
} else {
bash q{curl https://raw.githubusercontent.com/Raku/REA/main/META.json -sf | jq -r '.[] | .name' | uniq > list.txt};
# eliminate skip-test modules
for "list.txt".IO.lines() -> $m {
if %skip{$m}:exists {
say "skip module $m due to skip test list";
} else {
push @d, $m;
}
}
@d.=pick(@d) if tags()<shuffle>;
if @d.elems < $mod_cnt {
$mod_cnt = @d.elems
}
@d = @d[0 .. $mod_cnt - 1];
}
#directory-delete("{self.base-dir}/versions/$rakudo-version/");
# for $eco.river.map(*.key).sort.head(100) -> $d {
# push @d, $d;
# #say $d;
# }
"list.txt".IO.spurt(@d.join("\n"));
my $me = Sparky::JobApi.new(:mine);
$me.put-file("list.txt","list.txt");
my $j = 0;
my %sent;
if tags()<modules> && $ignore-skip {
say "ignore skip is on and module is set, clean up module data";
for @modules -> $m {
if "{self.base-dir}/versions/$rakudo-version/{$m}".IO ~~ :d {
rmtree "{self.base-dir}/versions/$rakudo-version/{$m}";
}
if "{self.base-dir}/versions/$rakudo-version/{$m}.log".IO ~~ :f {
"{self.base-dir}/versions/$rakudo-version/{$m}.log".IO.unlink;
};
if "{self.base-dir}/versions/$rakudo-version/{$m}.FAIL".IO ~~ :f {
"{self.base-dir}/versions/$rakudo-version/{$m}.FAIL".IO.unlink;
};
if "{self.base-dir}/versions/$rakudo-version/{$m}.OK".IO ~~ :f {
"{self.base-dir}/versions/$rakudo-version/{$m}.OK".IO.unlink;
};
}
}
while (True) {
my @_list = []; # tests to distribute
my $done = 0; # finished tests
my @agents; # online agents
say "look for available agents ...";
if "{self.base-dir}/agents".IO ~~ :d {
for dir("{self.base-dir}/agents") -> $ag {
my $age = Int(now - $ag.created.DateTime);
if $age <= 360 { # only use agents with 6 minutes ago at least heartbeat
my $jobs-run-cnt = 0;
my $max-threads = 3;
if "{self.base-dir}/agents-meta/{$ag.basename}.json".IO ~~ :f {
my %meta = from-json("{self.base-dir}/agents-meta/{$ag.basename}.json".IO.slurp);
$jobs-run-cnt = %meta<jobs-run-cnt> || 0;
$max-threads = %meta<max-threads> || 3;
}
# next if $max-threads > 10; # temporary disable patric's agent
say "agent {$ag.basename} heartbeat OK - [$age] seconds | job runs cnt - [$jobs-run-cnt] / max-threads - [$max-threads]";
push @agents, %( name => $ag.basename, jobs-run-cnt => Int($jobs-run-cnt), max-threads => Int($max-threads) );
} else {
say "remove agent {$ag.basename} from rotation, BAD heartbit - $age seconds";
$ag.unlink;
}
};
}
say "xxx";
say "{@agents.elems} online agents found";
# prepare tests for
# distribution
my @done;
for @d -> $l {
if "{self.base-dir}/versions/$rakudo-version/{$l}.OK".IO ~~ :f ||
"{self.base-dir}/versions/$rakudo-version/{$l}.FAIL".IO ~~ :f {
# module is already completed
$done++;
push @done, $l;
next;
}
if %sent{$l} {
push @_list, %( module => $l, send-back => True );
} else {
push @_list, %( module => $l, send-back => False );
}
}
# notify agents about finsihed modules
if @done.elems {
say "notify agents about {@done.elems} processed modules";
my $agents-feed = Sparky::JobApi.new(
:project<browny.feedback>,
:job-id<done>,
);
$agents-feed.put-stash(
%(
done => @done,
version => $rakudo-version,
)
);
} else {
say "notify agents about 0 processed modules";
my $agents-feed = Sparky::JobApi.new(
:project<browny.feedback>,
:job-id<done>,
);
$agents-feed.put-stash(
%(
done => [],
version => $rakudo-version,
)
);
}
# shake list for more
# efficient distribution
@_list.=pick(@_list);
#say "current list: ", @_list.raku;
if $done == @d.elems {
last;
}
# distibute tests on agents with enough capacity
my @s = self!distribute(
[@_list.grep({$_<send-back> == False}).map({$_<module>})],
[@agents.grep({$_<max-threads> >= $_<jobs-run-cnt>}).map({$_<name>})],
False,
$rakudo-version,
);
my @rs;
if @s.elems == 0 {
# if no new tests have been distributed
# re-distibute tests on agents with enough capacity
@rs = self!distribute(
[@_list.grep({$_<send-back> == True}).map({$_<module>})],
[@agents.grep({$_<max-threads> >= $_<jobs-run-cnt>}).map({$_<name>})],
True,
$rakudo-version,
);
# mark distributed tests
for @rs -> $s {
%sent{$s} = now;
};
}
# mark distributed tests
for @s -> $s {
%sent{$s} = now;
};
my $els = Int(time - $start_time);
my $elh;
if $els < 60 {
$elh = "<1m";
} elsif $els < 60*60 {
$elh = sprintf("%.2dm", Int($els/60));
} elsif $els < 60*60*24 {
$elh = sprintf("%.3dh", Int($els/(60*60)));
}
say "||| TESTS STAT: rakudo_version: {tags()<rakudo_version_release>} | time: {$elh} | tests total: {@d.elems} | finished tests: {$done} | sent to queue: dist={@s.elems} / redist={@rs.elems} | agents cnt: {@agents.elems}";
sleep(20);
$j++;
last if $j > 10000;
}
say "done";
say "summary";
my @summary;
for @d -> $m {
my $meta = from-json("{self.base-dir}/versions/$rakudo-version/$m/meta.json".IO.slurp);
say "$m ... \t {$meta<status> ?? 'OK' !! 'FAIL' } \t {$meta<time>} sec";
$meta<name> = $m;
push @summary, $meta;
}
"summary.txt".IO.spurt(
@summary.sort({ $_<time> }).
reverse.
map({"$_<name> ... \t {$_<status> ?? 'OK' !! 'FAIL' } \t {$_<time>} sec\n"})
);
for @d -> $m {
if "{self.base-dir}/versions/$rakudo-version/{$m}.log".IO ~~ :f {
$me.put-file("{self.base-dir}/versions/$rakudo-version/{$m}.log","{uri_encode_component($m)}.log");
}
}
$me.put-file("summary.txt","summary.txt");
#my $brw-orch = Sparky::JobApi.new(
# :project<brw-orch>,
#);
#$brw-orch.queue({
# description => "go_go_go",
#});
}
}
Pipeline.new.run;