diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 05c75ecbb..6d2471af3 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -10,7 +10,7 @@ on: inputs: platform: description: 'Platform' - default: 'linux/amd64,linux/arm64/v8' + default: 'linux/amd64' required: false delete: @@ -67,7 +67,7 @@ jobs: - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - - + - name: Push Docker Image X86_64 # if: ${{ arch != 'arm' }} uses: docker/build-push-action@v3 @@ -78,7 +78,7 @@ jobs: tags: ${{ steps.docker_meta.outputs.tags }} labels: ${{ steps.docker_meta.outputs.labels }} - # - + # - # name: Push Docker Image # if: ${{ arch == 'arm' }} # uses: docker/build-push-action@v3 @@ -132,4 +132,4 @@ jobs: uses: WIPACrepo/build-singularity-cvmfs-action@v1.2 with: github_token: ${{ secrets.PERSONAL_ACCESS_TOKEN }} # so job can git push - remove_regex_path: 'realtime/${{ github.event.ref }}.*' \ No newline at end of file + remove_regex_path: 'realtime/${{ github.event.ref }}.*' diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0761b6f9d..18ee2ff25 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -19,13 +19,15 @@ env: PY_COLORS: "1" BOT_NAME: wipacdevbot BOT_EMAIL: developers@icecube.wisc.edu + # + CI_DOCKER_IMAGE_TAG: icecube/skymap_scanner:local + # CI_TEST_RUN_STDOUT_STDERR_DIR: /home/runner/work/skymap_scanner/testrun_outputs + N_WORKERS: 2 REALTIME_EVENTS_DIR: /home/runner/work/skymap_scanner/skymap_scanner/tests/data/realtime_events - SKYSCAN_CACHE_DIR: /home/runner/work/skymap_scanner/skymap_scanner/cache - SKYSCAN_OUTPUT_DIR: /home/runner/work/skymap_scanner/skymap_scanner/output - SKYSCAN_DEBUG_DIR: /home/runner/work/skymap_scanner/skymap_scanner/debug - EWMS_PILOT_DUMP_SUBPROC_OUTPUT: False # get logs in "reco-icetray logs" step instead - EWMS_PILOT_STOP_LISTENING_ON_TASK_ERROR: False # allow a reco to fail (if it keeps failing on redelivery/ies, expect 
it to affect the scanner server's timeout eventually) + CI_SKYSCAN_CACHE_DIR: /home/runner/work/skymap_scanner/skymap_scanner/cache + CI_SKYSCAN_OUTPUT_DIR: /home/runner/work/skymap_scanner/skymap_scanner/output + CI_SKYSCAN_DEBUG_DIR: /home/runner/work/skymap_scanner/skymap_scanner/debug # see source tests/env-vars.sh @@ -42,15 +44,15 @@ jobs: outputs: matrix: ${{ steps.versions.outputs.matrix }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - id: versions - uses: WIPACrepo/wipac-dev-py-versions-action@v2.2 + uses: WIPACrepo/wipac-dev-py-versions-action@v2.5 # flake8: # runs-on: ubuntu-latest # steps: - # - uses: actions/checkout@v3 - # - uses: actions/setup-python@v3 + # - uses: actions/checkout@v4 + # - uses: actions/setup-python@v5 # - uses: WIPACrepo/wipac-dev-flake8-action@v1.0 mypy: @@ -61,8 +63,8 @@ jobs: matrix: py3: ${{ fromJSON(needs.py-versions.outputs.matrix) }} steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v3 + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 with: python-version: ${{ matrix.py3 }} - uses: WIPACrepo/wipac-dev-mypy-action@v2.0 @@ -71,7 +73,7 @@ jobs: runs-on: ubuntu-latest steps: - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: token: ${{ secrets.PERSONAL_ACCESS_TOKEN }} - uses: WIPACrepo/wipac-dev-py-setup-action@v3.1 @@ -88,7 +90,7 @@ jobs: with: docker-images: false - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: token: ${{ secrets.PERSONAL_ACCESS_TOKEN }} - uses: WIPACrepo/wipac-dev-py-dependencies-action@v1.1 @@ -107,23 +109,35 @@ jobs: test-build-docker: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: docker/setup-buildx-action@v2 - - uses: docker/build-push-action@v3 + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + - uses: docker/build-push-action@v6 with: context: . 
- cache-from: type=gha - cache-to: type=gha,mode=min - file: Dockerfile - tags: icecube/skymap_scanner:local - + file: ./Dockerfile + tags: ${{ env.CI_DOCKER_IMAGE_TAG }} + load: true - test-run-singularity-dummy-reco: - needs: test-build-docker + test-run-dummy: runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + reco_algo: [ + "dummy", + "crash_dummy" + ] + container_platform: [ + "docker", + "apptainer", + ] env: - SKYSCAN_BROKER_ADDRESS: user1:password@localhost/test - # SKYSCAN_BROKER_AUTH: user1 # using this would override password in address + SKYSCAN_MQ_TOCLIENT_BROKER_ADDRESS: user1@localhost/test + SKYSCAN_MQ_TOCLIENT_AUTH_TOKEN: password # using this would override password in address + SKYSCAN_MQ_FROMCLIENT_BROKER_ADDRESS: user1@localhost/test + SKYSCAN_MQ_FROMCLIENT_AUTH_TOKEN: password # using this would override password in address + EWMS_PILOT_STOP_LISTENING_ON_TASK_ERROR: False # we want many crashes + EWMS_PILOT_TASK_TIMEOUT: 5 services: rabbitmq: # see image.tag -> https://github.com/Observation-Management-Service/path-kubernetes/blob/main/helm-values-rabbitmq-bitnami.yaml (see https://artifacthub.io/packages/helm/bitnami/rabbitmq/11.14.3) @@ -147,76 +161,95 @@ jobs: - uses: jlumbroso/free-disk-space@main # need space for mq broker and image with: docker-images: false - - uses: actions/checkout@v3 - - uses: docker/setup-buildx-action@v2 - - uses: docker/build-push-action@v3 + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + - uses: docker/build-push-action@v6 with: context: . 
- cache-from: type=gha - # cache-to: type=gha,mode=min - file: Dockerfile - tags: icecube/skymap_scanner:local + file: ./Dockerfile + tags: ${{ env.CI_DOCKER_IMAGE_TAG }} load: true - - uses: eWaterCycle/setup-apptainer@v2 + - if: ${{ matrix.container_platform == 'apptainer' }} + uses: eWaterCycle/setup-apptainer@v2 with: apptainer-version: 1.3.2 - - name: build singularity image + - if: ${{ matrix.container_platform == 'apptainer' }} + name: build apptainer (singularity) image run: | - sudo singularity build skymap_scanner.sif docker-daemon://icecube/skymap_scanner:local + sudo singularity build skymap_scanner.sif docker-daemon://$CI_DOCKER_IMAGE_TAG ls -lh skymap_scanner.sif - - name: run singularity container + + - name: run + timeout-minutes: 15 # on average ~9min # yes, `timeout` is used below but this is insurance run: | + set -x source tests/env-vars.sh + export _RECO_ALGO="${{ matrix.reco_algo }}" + export _EVENTS_FILE=$(realpath $REALTIME_EVENTS_DIR/hese_event_01.json) + export _NSIDES="1:0 2:12 4:12" + + if [ "${{ matrix.container_platform }}" == "apptainer" ]; then + export _RUN_THIS_SINGULARITY_IMAGE="$(realpath skymap_scanner.sif)" + export _EWMS_PILOT_APPTAINER_IMAGE_DIRECTORY_MUST_BE_PRESENT=False + fi + + if [ "${{ matrix.reco_algo }}" == "crash_dummy" ]; then + export _SKYSCAN_CI_CRASH_DUMMY_PROBABILITY=0.75 + fi + + cd ./resources/launch_scripts + # since _SKYSCAN_CI_CRASH_DUMMY_PROBABILITY<1, this step will go forever + # so, stop it after some time and check that it has stderrfiles + timeout 240 ./local-scan.sh $N_WORKERS $CI_TEST_RUN_STDOUT_STDERR_DIR || true + + - if: ${{ matrix.reco_algo == 'crash_dummy' }} + name: look at stderrfiles + run: | + set -x + # check for fails/errors - mkdir $SKYSCAN_CACHE_DIR - mkdir $SKYSCAN_OUTPUT_DIR - - # Launch Server - singularity run skymap_scanner.sif \ - python -m skymap_scanner.server \ - --reco-algo dummy \ - --event-file $REALTIME_EVENTS_DIR/hese_event_01.json \ - --cache-dir $SKYSCAN_CACHE_DIR \ - 
--output-dir $SKYSCAN_OUTPUT_DIR \ - --client-startup-json ./startup.json \ - --nsides 1:0 \ - --simulated-event \ - & - - ./resources/launch_scripts/wait_for_file.sh ./startup.json $CLIENT_STARTER_WAIT_FOR_STARTUP_JSON - - # Launch Clients - nclients=2 - echo "Launching $nclients clients" - mkdir $SKYSCAN_DEBUG_DIR - export EWMS_PILOT_TASK_TIMEOUT=1800 # 30 mins - for i in $( seq 1 $nclients ); do - singularity run skymap_scanner.sif \ - python -m skymap_scanner.client \ - --client-startup-json ./startup.json \ - --debug-directory $SKYSCAN_DEBUG_DIR \ - & - echo -e "\tclient #$i launched" - done - - wait -n # for server - for i in $( seq 1 $nclients ); do - wait -n # for client - done - - name: look at results file (.npz) + error_type_1='intentional crash-dummy error' + error_type_2='subprocess timed out after' + pattern="$error_type_1|$error_type_2" + + if find "$CI_TEST_RUN_STDOUT_STDERR_DIR/worker-"*/pilot.out -type f -exec grep -qE "$pattern" {} +; then + echo "Match(es) found: PilotSubprocessError and/or TimeoutError occurred." + else + echo "No matches found." + exit 1 + fi + - if: ${{ matrix.reco_algo == 'dummy' }} + name: look at results file (.npz) run: | ls . 
- ls $SKYSCAN_OUTPUT_DIR - outfile=$(ls -d $SKYSCAN_OUTPUT_DIR/*.npz) + ls $CI_SKYSCAN_OUTPUT_DIR + outfile=$(ls -d $CI_SKYSCAN_OUTPUT_DIR/*.npz) echo $outfile - - name: broker docker logs + + - name: central server stdout/stderr if: always() run: | - docker logs rabbitmq - + cat $CI_TEST_RUN_STDOUT_STDERR_DIR/server.out + - name: worker pilot \#1 stdout/stderr + if: always() + run: | + more $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot.out | cat + - name: worker clients / reco-icetray instances \#1 stdouts/stderrs + if: always() + run: | + find $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot-* -name "stderrfile" -o -name "stdoutfile" | xargs more | cat + echo "::::::::::::::" && tree $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot-* + - name: worker pilot \#2 stdout/stderr + if: always() && env.N_WORKERS == '2' + run: | + more $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot.out | cat + - name: worker clients / reco-icetray instances \#2 stdouts/stderrs + if: always() && env.N_WORKERS == '2' + run: | + find $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot-* -name "stderrfile" -o -name "stdoutfile" | xargs more | cat + echo "::::::::::::::" && tree $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot-* test-run-nsides-thresholds-dummy: - needs: test-build-docker runs-on: ubuntu-latest strategy: fail-fast: false @@ -231,8 +264,10 @@ jobs: 0.65, ] env: - SKYSCAN_BROKER_ADDRESS: user1@localhost/test - SKYSCAN_BROKER_AUTH: password # using this would override password in address + SKYSCAN_MQ_TOCLIENT_BROKER_ADDRESS: user1@localhost/test + SKYSCAN_MQ_TOCLIENT_AUTH_TOKEN: password # using this would override password in address + SKYSCAN_MQ_FROMCLIENT_BROKER_ADDRESS: user1@localhost/test + SKYSCAN_MQ_FROMCLIENT_AUTH_TOKEN: password # using this would override password in address services: rabbitmq: # see image.tag -> https://github.com/Observation-Management-Service/path-kubernetes/blob/main/helm-values-rabbitmq-bitnami.yaml (see 
https://artifacthub.io/packages/helm/bitnami/rabbitmq/11.14.3) @@ -256,18 +291,16 @@ jobs: - uses: jlumbroso/free-disk-space@main # need space for mq broker and image with: docker-images: false - - uses: actions/checkout@v3 - - uses: docker/setup-buildx-action@v2 - - uses: docker/build-push-action@v3 + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + - uses: docker/build-push-action@v6 with: context: . - cache-from: type=gha - # cache-to: type=gha,mode=min - file: Dockerfile - tags: icecube/skymap_scanner:local + file: ./Dockerfile + tags: ${{ env.CI_DOCKER_IMAGE_TAG }} load: true - name: run - timeout-minutes: 10 # on average max~=5min + timeout-minutes: 12 # on average max~=8.5min run: | set -x source tests/env-vars.sh @@ -277,13 +310,13 @@ jobs: export _PREDICTIVE_SCANNING_THRESHOLD=${{ matrix.predictive_scanning_threshold }} cd ./resources/launch_scripts - ./local-scan.sh 2 $CI_TEST_RUN_STDOUT_STDERR_DIR + ./local-scan.sh $N_WORKERS $CI_TEST_RUN_STDOUT_STDERR_DIR - name: check no nsides skipped run: | - ls $SKYSCAN_OUTPUT_DIR + ls $CI_SKYSCAN_OUTPUT_DIR # get newest run*.json - export outfile=$(find $SKYSCAN_OUTPUT_DIR -type f -name "run*.json" -exec stat -c '%y %n' {} + | sort | tail -1 | awk '{print $4}') + export outfile=$(find $CI_SKYSCAN_OUTPUT_DIR -type f -name "run*.json" -exec stat -c '%y %n' {} + | sort | tail -1 | awk '{print $4}') echo $outfile python3 -c ' import json @@ -298,34 +331,40 @@ jobs: if: always() run: | cat $CI_TEST_RUN_STDOUT_STDERR_DIR/server.out - - name: client worker \#1 stdout/stderr + - name: worker pilot \#1 stdout/stderr if: always() run: | - cat $CI_TEST_RUN_STDOUT_STDERR_DIR/client-1.out - - name: client worker \#2 stdout/stderr + more $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot.out | cat + - name: worker clients / reco-icetray instances \#1 stdouts/stderrs if: always() run: | - cat $CI_TEST_RUN_STDOUT_STDERR_DIR/client-2.out - - - name: reco-icetray logs - if: always() + find 
$CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot-* -name "stderrfile" -o -name "stdoutfile" | xargs more | cat + echo "::::::::::::::" && tree $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot-* + - name: worker pilot \#2 stdout/stderr + if: always() && env.N_WORKERS == '2' + run: | + more $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot.out | cat + - name: worker clients / reco-icetray instances \#2 stdouts/stderrs + if: always() && env.N_WORKERS == '2' run: | - sudo apt install tree - tree $SKYSCAN_DEBUG_DIR - find $SKYSCAN_DEBUG_DIR -type f -not -name "*.i3" -not -name "*.pkl" -exec "more" {} + | cat # recursively cats with filenames (delimited by :::::::) + find $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot-* -name "stderrfile" -o -name "stdoutfile" | xargs more | cat + echo "::::::::::::::" && tree $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot-* + - - name: broker docker logs + - name: rabbitmq logs if: always() run: | docker logs rabbitmq test-run-crash-dummy: - needs: test-build-docker runs-on: ubuntu-latest env: - SKYSCAN_BROKER_ADDRESS: user1@localhost/test - SKYSCAN_BROKER_AUTH: password # using this would override password in address + SKYSCAN_MQ_TOCLIENT_BROKER_ADDRESS: user1@localhost/test + SKYSCAN_MQ_TOCLIENT_AUTH_TOKEN: password # using this would override password in address + SKYSCAN_MQ_FROMCLIENT_BROKER_ADDRESS: user1@localhost/test + SKYSCAN_MQ_FROMCLIENT_AUTH_TOKEN: password # using this would override password in address + EWMS_PILOT_STOP_LISTENING_ON_TASK_ERROR: False # we want many crashes services: rabbitmq: # see image.tag -> https://github.com/Observation-Management-Service/path-kubernetes/blob/main/helm-values-rabbitmq-bitnami.yaml (see https://artifacthub.io/packages/helm/bitnami/rabbitmq/11.14.3) @@ -349,38 +388,42 @@ jobs: - uses: jlumbroso/free-disk-space@main # need space for mq broker and image with: docker-images: false - - uses: actions/checkout@v3 - - uses: docker/setup-buildx-action@v2 - - uses: docker/build-push-action@v3 + - uses: 
actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + - uses: docker/build-push-action@v6 with: context: . - cache-from: type=gha - # cache-to: type=gha,mode=min - file: Dockerfile - tags: icecube/skymap_scanner:local + file: ./Dockerfile + tags: ${{ env.CI_DOCKER_IMAGE_TAG }} load: true - name: run - timeout-minutes: 10 # on average ~6min # yes, `timeout` is used below but this is insurance + timeout-minutes: 12 # on average ~9min # yes, `timeout` is used below but this is insurance run: | set -x source tests/env-vars.sh export _RECO_ALGO=crash_dummy export _EVENTS_FILE=$(realpath $REALTIME_EVENTS_DIR/hese_event_01.json) export _NSIDES="1:0 2:12 4:12" - export SKYSCAN_CRASH_DUMMY_PROBABILITY=0.75 + export _SKYSCAN_CI_CRASH_DUMMY_PROBABILITY=0.75 export EWMS_PILOT_TASK_TIMEOUT=15 cd ./resources/launch_scripts - # since SKYSCAN_CRASH_DUMMY_PROBABILITY<1, this step will go forever + # since _SKYSCAN_CI_CRASH_DUMMY_PROBABILITY<1, this step will go forever - # so, stop it after some time and chek that it has stderrfiles + # so, stop it after some time and check that it has stderrfiles - timeout 240 ./local-scan.sh 2 $CI_TEST_RUN_STDOUT_STDERR_DIR || true + timeout 240 ./local-scan.sh $N_WORKERS $CI_TEST_RUN_STDOUT_STDERR_DIR || true - name: look at stderrfiles run: | set -x - # check for fails - stderrfiles=$(find $SKYSCAN_DEBUG_DIR/ -name "stderrfile" -type f -exec sh -c "tail -1 {} | sed -e 's/^.*DEBUG //p' " \; | sort | uniq -c) - echo $stderrfiles - if [ -z "${stderrfiles}" ]; then + # check for fails/errors + + error_type_1='intentional crash-dummy error' + error_type_2='subprocess timed out after' + pattern="$error_type_1|$error_type_2" + + if find "$CI_TEST_RUN_STDOUT_STDERR_DIR/worker-"*/pilot.out -type f -exec grep -qE "$pattern" {} +; then + echo "Match(es) found: PilotSubprocessError and/or TimeoutError occurred." + else + echo "No matches found." 
exit 1 fi @@ -388,18 +431,27 @@ jobs: if: always() run: | cat $CI_TEST_RUN_STDOUT_STDERR_DIR/server.out - - name: client worker \#1 stdout/stderr + - name: worker pilot \#1 stdout/stderr if: always() run: | - cat $CI_TEST_RUN_STDOUT_STDERR_DIR/client-1.out - - name: client worker \#2 stdout/stderr + more $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot.out | cat + - name: worker clients / reco-icetray instances \#1 stdouts/stderrs if: always() run: | - cat $CI_TEST_RUN_STDOUT_STDERR_DIR/client-2.out + find $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot-* -name "stderrfile" -o -name "stdoutfile" | xargs more | cat + echo "::::::::::::::" && tree $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot-* + - name: worker pilot \#2 stdout/stderr + if: always() && env.N_WORKERS == '2' + run: | + more $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot.out | cat + - name: worker clients / reco-icetray instances \#2 stdouts/stderrs + if: always() && env.N_WORKERS == '2' + run: | + find $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot-* -name "stderrfile" -o -name "stdoutfile" | xargs more | cat + echo "::::::::::::::" && tree $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot-* test-run-realistic: - needs: test-build-docker runs-on: ubuntu-latest strategy: fail-fast: false @@ -423,8 +475,10 @@ jobs: - reco_algo: splinempe_pointed eventfile: hese_event_01.json env: - SKYSCAN_BROKER_ADDRESS: user1@localhost/test - SKYSCAN_BROKER_AUTH: password # using this would override password in address + SKYSCAN_MQ_TOCLIENT_BROKER_ADDRESS: user1@localhost/test + SKYSCAN_MQ_TOCLIENT_AUTH_TOKEN: password # using this would override password in address + SKYSCAN_MQ_FROMCLIENT_BROKER_ADDRESS: user1@localhost/test + SKYSCAN_MQ_FROMCLIENT_AUTH_TOKEN: password # using this would override password in address services: rabbitmq: # see image.tag -> https://github.com/Observation-Management-Service/path-kubernetes/blob/main/helm-values-rabbitmq-bitnami.yaml (see https://artifacthub.io/packages/helm/bitnami/rabbitmq/11.14.3) 
@@ -451,18 +505,17 @@ jobs: - uses: actionhippie/swap-space@v1 with: size: 10G - - uses: actions/checkout@v3 - - uses: docker/setup-buildx-action@v2 - - uses: docker/build-push-action@v3 + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + - uses: docker/build-push-action@v6 with: context: . - cache-from: type=gha - # cache-to: type=gha,mode=min - file: Dockerfile - tags: icecube/skymap_scanner:local + file: ./Dockerfile + tags: ${{ env.CI_DOCKER_IMAGE_TAG }} load: true + - name: run - timeout-minutes: 35 # on average max~=26min + timeout-minutes: 45 # on average max~=35min run: | set -x lscpu @@ -473,13 +526,13 @@ jobs: export _NSIDES="1:0" cd ./resources/launch_scripts - ./local-scan.sh 2 $CI_TEST_RUN_STDOUT_STDERR_DIR + ./local-scan.sh $N_WORKERS $CI_TEST_RUN_STDOUT_STDERR_DIR - name: test output against known result (.json) run: | - ls $SKYSCAN_OUTPUT_DIR + ls $CI_SKYSCAN_OUTPUT_DIR # get newest run*.json - outfile=$(find $SKYSCAN_OUTPUT_DIR -type f -name "run*.json" -exec stat -c '%y %n' {} + | sort | tail -1 | awk '{print $4}') + outfile=$(find $CI_SKYSCAN_OUTPUT_DIR -type f -name "run*.json" -exec stat -c '%y %n' {} + | sort | tail -1 | awk '{print $4}') echo $outfile cat $outfile pip install . 
# don't need icecube, so no docker container needed @@ -493,51 +546,53 @@ jobs: if: always() run: | cat $CI_TEST_RUN_STDOUT_STDERR_DIR/server.out - - name: client worker \#1 stdout/stderr + - name: worker pilot \#1 stdout/stderr if: always() run: | - cat $CI_TEST_RUN_STDOUT_STDERR_DIR/client-1.out - - name: client worker \#2 stdout/stderr + more $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot.out | cat + - name: worker clients / reco-icetray instances \#1 stdouts/stderrs if: always() run: | - cat $CI_TEST_RUN_STDOUT_STDERR_DIR/client-2.out - - - name: reco-icetray logs - if: always() + find $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot-* -name "stderrfile" -o -name "stdoutfile" | xargs more | cat + echo "::::::::::::::" && tree $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-1/pilot-* + - name: worker pilot \#2 stdout/stderr + if: always() && env.N_WORKERS == '2' run: | - sudo apt install tree - tree $SKYSCAN_DEBUG_DIR - find $SKYSCAN_DEBUG_DIR -type f -not -name "*.i3" -not -name "*.pkl" -exec "more" {} + | cat # recursively cats with filenames (delimited by :::::::) + more $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot.out | cat + - name: worker clients / reco-icetray instances \#2 stdouts/stderrs + if: always() && env.N_WORKERS == '2' + run: | + find $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot-* -name "stderrfile" -o -name "stdoutfile" | xargs more | cat + echo "::::::::::::::" && tree $CI_TEST_RUN_STDOUT_STDERR_DIR/worker-2/pilot-* - - name: broker docker logs + - name: rabbitmq logs if: always() run: | docker logs rabbitmq test-file-staging: - needs: test-build-docker runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: docker/setup-buildx-action@v2 - - uses: docker/build-push-action@v3 + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + - uses: docker/build-push-action@v6 with: context: . 
- cache-from: type=gha - # cache-to: type=gha,mode=min - file: Dockerfile - tags: icecube/skymap_scanner:local + file: ./Dockerfile + tags: ${{ env.CI_DOCKER_IMAGE_TAG }} load: true - name: run run: | + source tests/env-vars.sh + docker run --rm -i \ - icecube/skymap_scanner:local \ + $(env | grep -E '^(SKYSCAN_|_SKYSCAN_)' | cut -d'=' -f1 | sed 's/^/--env /') \ + $CI_DOCKER_IMAGE_TAG \ python tests/file_staging.py test-run-single-pixel: - needs: test-build-docker runs-on: ubuntu-latest strategy: fail-fast: false @@ -561,42 +616,37 @@ jobs: - reco_algo: splinempe_pointed dir: "JSON" steps: - - uses: actions/checkout@v3 - - uses: docker/setup-buildx-action@v2 - - uses: docker/build-push-action@v3 + - uses: actions/checkout@v4 + - uses: docker/setup-buildx-action@v3 + - uses: docker/build-push-action@v6 with: context: . - cache-from: type=gha - # cache-to: type=gha,mode=min - file: Dockerfile - tags: icecube/skymap_scanner:local + file: ./Dockerfile + tags: ${{ env.CI_DOCKER_IMAGE_TAG }} load: true - name: run - timeout-minutes: 15 # on average max~=7min + timeout-minutes: 10 # on average max~=5min run: | set -e source tests/env-vars.sh - - # grab the GCDQp_packet key and throw into a file - jq '.GCDQp_packet' \ - tests/data/reco_pixel_single/${{ matrix.reco_algo }}/${{ matrix.dir }}/startup.json > \ - tests/data/reco_pixel_single/${{ matrix.reco_algo }}/${{ matrix.dir }}//GCDQp_packet.json - # run reco + # run reco directly docker run --network="host" --rm -i \ --shm-size=6gb \ --mount type=bind,source=$(readlink -f tests/data/reco_pixel_single/${{ matrix.reco_algo }}/${{ matrix.dir }}),target=/local/test-data \ --env PY_COLORS=1 \ - $(env | grep '^SKYSCAN_' | awk '$0="--env "$0') \ - icecube/skymap_scanner:local \ - python -m skymap_scanner.client.reco_icetray \ + $(env | grep -E '^(SKYSCAN_|_SKYSCAN_)' | cut -d'=' -f1 | sed 's/^/--env /') \ + $CI_DOCKER_IMAGE_TAG \ + python -m skymap_scanner.client \ --infile /local/test-data/in.json \ - --gcdqp-packet-json 
/local/test-data/GCDQp_packet.json \ - --baseline-gcd-file $(jq -r '.baseline_GCD_file' tests/data/reco_pixel_single/${{ matrix.reco_algo }}/${{ matrix.dir }}/startup.json) \ + --client-startup-json /local/test-data/startup.json \ --outfile /local/test-data/out-actual.json - name: test output against known result run: | + source tests/env-vars.sh + + ls tests/data/reco_pixel_pkls/${{ matrix.reco_algo }}/${{ matrix.dir }} ls tests/data/reco_pixel_single/${{ matrix.reco_algo }}/${{ matrix.dir }} # need icecube for depickling, so docker container needed @@ -604,8 +654,8 @@ jobs: --shm-size=6gb \ --mount type=bind,source=$(readlink -f tests/data/reco_pixel_single/${{ matrix.reco_algo }}/${{ matrix.dir }}),target=/local/test-data \ --env PY_COLORS=1 \ - $(env | grep '^SKYSCAN_' | awk '$0="--env "$0') \ - icecube/skymap_scanner:local \ + $(env | grep -E '^(SKYSCAN_|_SKYSCAN_)' | cut -d'=' -f1 | sed 's/^/--env /') \ + $CI_DOCKER_IMAGE_TAG \ python tests/compare_reco_pixel_single.py \ --actual /local/test-data/out-actual.json \ --expected /local/test-data/out.json \ @@ -628,17 +678,17 @@ jobs: py-setup, py-dependencies, test-build-docker, - test-run-singularity-dummy-reco, + test-run-dummy, test-file-staging, test-run-nsides-thresholds-dummy, test-run-crash-dummy, test-run-realistic, - test-run-single-pixel + test-run-single-pixel, ] runs-on: ubuntu-latest concurrency: release steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 token: ${{ secrets.PERSONAL_ACCESS_TOKEN }} diff --git a/.gitignore b/.gitignore index 649d5897c..38881a12f 100644 --- a/.gitignore +++ b/.gitignore @@ -173,3 +173,4 @@ plots-*/ !dependencies*.log device-refresh-token +resources/launch_scripts/ewms-v1/scan-dir-* diff --git a/Dockerfile b/Dockerfile index a659cca3c..37dd1f4b0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,7 @@ # ARG ICETRAY_VERSION=v1.9.1-ubuntu22.04-X64 -FROM icecube/icetray:icetray-prod-$ICETRAY_VERSION as prod +FROM 
icecube/icetray:icetray-prod-$ICETRAY_VERSION AS prod RUN mkdir -p /opt/i3-data/baseline_gcds && \ wget -nv -N -t 5 -P /opt/i3-data/baseline_gcds -r -l 1 -A *.i3* -nd http://prod-exe.icecube.wisc.edu/baseline_gcds/ && \ @@ -45,8 +45,7 @@ RUN mkdir -p /opt/i3-data/baseline_gcds && \ # WORKDIR /local COPY . . -# client-starter fails to install on architectures not supporting htcondor, so silently fail without the extra -RUN pip install .[client-starter,rabbitmq] || pip install .[rabbitmq] +RUN pip install .[rabbitmq] RUN pip freeze diff --git a/Dockerfile_no_cvmfs b/Dockerfile_no_cvmfs index d06b8864c..e7555d928 100644 --- a/Dockerfile_no_cvmfs +++ b/Dockerfile_no_cvmfs @@ -1,6 +1,6 @@ ARG SKYMAP_SCANNER_VERSION=latest -FROM icecube/skymap_scanner:$SKYMAP_SCANNER_VERSION as prod +FROM icecube/skymap_scanner:$SKYMAP_SCANNER_VERSION AS prod RUN mkdir -p /opt/i3-data/baseline_gcds && \ diff --git a/README.md b/README.md index 515119cd9..b6521b882 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ [![GitHub release (latest by date including pre-releases)](https://img.shields.io/github/v/release/icecube/skymap_scanner?include_prereleases)](https://github.com/icecube/skymap_scanner/) [![Lines of code](https://img.shields.io/tokei/lines/github/icecube/skymap_scanner)](https://github.com/icecube/skymap_scanner/) [![GitHub issues](https://img.shields.io/github/issues/icecube/skymap_scanner)](https://github.com/icecube/skymap_scanner/issues?q=is%3Aissue+sort%3Aupdated-desc+is%3Aopen) [![GitHub pull requests](https://img.shields.io/github/issues-pr/icecube/skymap_scanner)](https://github.com/icecube/skymap_scanner/pulls?q=is%3Apr+sort%3Aupdated-desc+is%3Aopen) + # Skymap Scanner v3 A distributed system that performs a likelihood scan of event directions for real-time alerts using inter-CPU queue-based message passing. 
@@ -9,269 +10,81 @@ Skymap Scanner is the computational core of the [SkyDriver orchestration service `skymap_scanner` is a python package containing two distinct applications meant to be deployed within containers (1 `skymap_scanner.server`, n `skymap_scanner.client`s), along with `skymap_scanner.utils` (utility functions) and `skymap_scanner.recos` (`icetray` reco-specific logic). Additional, package-independent, utility scripts are in `resources/utils/`. -## Queue Types - -The default queue type in the container is RabbitMQ, since v3.1.0. -Build `Dockerfile_pulsar` for a pulsar container. - -### RabbitMQ -Env variables - -``` -# export SKYSCAN_BROKER_CLIENT=rabbitmq # rabbitmq is the default so env var is not needed -export SKYSCAN_BROKER_ADDRESS=/ -export SKYSCAN_BROKER_AUTH= -export EWMS_PILOT_QUARANTINE_TIME=1200 # helps decrease condor blackhole nodes -export EWMS_PILOT_TASK_TIMEOUT=1200 -``` - -Currently, RabbitMQ uses URL parameters for the hostname, virtual host, and port (`[https://]HOST[:PORT][/VIRTUAL_HOST]`). The heartbeat is configured by `EWMS_PILOT_TASK_TIMEOUT`. This may change in future updates. - -Python install: -``` -pip install .[rabbitmq] -``` - -### Pulsar -Env variables - -``` -export SKYSCAN_BROKER_CLIENT=pulsar -export SKYSCAN_BROKER_ADDRESS= -export SKYSCAN_BROKER_AUTH= -export EWMS_PILOT_QUARANTINE_TIME=1200 # helps decrease condor blackhole nodes -export EWMS_PILOT_TASK_TIMEOUT=1200 -``` - -Python install: -``` -pip install .[pulsar] -``` - -## Example -This example is for the rabbitmq (default) broker. Steps for using a pulsar broker are similar and differences are noted throughout this example. The predominant difference is noted in [Queue Types](queue-types) (`Dockerfile_pulsar`). - -### Example Startup -You will need to get a rabbitmq broker address and authentication token to pass to both the server and client. Send a poke on slack #skymap-scanner to get those! - -#### 1. 
Launch the Server -The server can be launched from anywhere with a stable network connection. You can run it from the cobalts for example. - -##### Figure Your Args -###### Environment Variables -``` -export SKYSCAN_BROKER_ADDRESS=BROKER_ADDRESS -# export SKYSCAN_BROKER_CLIENT=rabbitmq # rabbitmq is the default so env var is not needed -export SKYSCAN_BROKER_AUTH=$(cat ~/skyscan-broker.token) # obfuscated for security -export EWMS_PILOT_TASK_TIMEOUT=1200 -``` -###### Command-Line Arguments -``` - --client-startup-json PATH_TO_CLIENT_STARTUP_JSON \ - --cache-dir `pwd`/server_cache \ - --output-dir `pwd` \ - --reco-algo millipede_original \ - --event-file `pwd`/run00136662-evt000035405932-BRONZE.pkl # could also be a .json file -``` -_NOTE: The `--*dir` arguments can all be the same if you'd like. Relative paths are also fine._ -_NOTE: There are more CL arguments not shown. They have defaults._ -###### `client-startup.json` -The server will create a `PATH_TO_CLIENT_STARTUP_JSON` file that has necessary info to launch a client. the parent directory of `--client-startup-json` needs to be somewhere accessible by your client launch script, whether that's via condor or manually. -##### Run It -###### with Singularity -``` -singularity run /cvmfs/icecube.opensciencegrid.org/containers/realtime/skymap_scanner:x.y.z" \ - python -m skymap_scanner.server \ - YOUR_ARGS -``` -###### or with Docker -``` -# side note: you may want to first set environment variables, see below -./resources/launch_scripts/docker/launch_server.sh \ - YOUR_ARGS -``` -_NOTE: By default the launch script will pull, build, and run the latest image from Docker Hub. You can optionally set environment variables to configure how to find a particular tag. For example:_ -``` -export SKYSCAN_DOCKER_IMAGE_TAG='x.y.z' # defaults to 'latest' -export SKYSCAN_DOCKER_PULL_ALWAYS=0 # defaults to 1 which maps to '--pull=always' -export EWMS_PILOT_TASK_TIMEOUT=1200 -``` - -#### 2. 
Launch Each Client -The client jobs can submitted via HTCondor from sub-2. Running the script below should create a condor submit file requesting the number of workers specified. You'll need to give it the same `SKYSCAN_BROKER_ADDRESS` and `BROKER_AUTH` as the server, and the path to the client-startup json file created by the server. - -##### Figure Your Args -###### Environment Variables -``` -export SKYSCAN_BROKER_ADDRESS=BROKER_ADDRESS -# export SKYSCAN_BROKER_CLIENT=rabbitmq # rabbitmq is the default so env var is not needed -export SKYSCAN_BROKER_AUTH=$(cat ~/skyscan-broker.token) # obfuscated for security -export EWMS_PILOT_QUARANTINE_TIME=1200 # helps decrease condor blackhole nodes -export EWMS_PILOT_TASK_TIMEOUT=1200 -``` -###### Command-Line Arguments -_See notes about `--client-startup-json` below. See `client.py` for additional optional args._ -##### Run It -###### with Condor (via Singularity) -You'll want to put your `skymap_scanner.client` args in a JSON file, then pass that to the helper script. -``` -echo my_client_args.json # just an example -./resources/client_starter.py \ - --jobs #### \ - --memory #GB \ - --singularity-image URL_OR_PATH_TO_SINGULARITY_IMAGE \ - --client-startup-json PATH_TO_CLIENT_STARTUP_JSON \ - --client-args-json my_client_args.json -``` -_NOTE: `client_starter.py` will wait until `--client-startup-json PATH_TO_CLIENT_STARTUP_JSON` exists, since it needs to file-transfer it to the worker node. 
Similarly, the client's `--client-startup-json` is auto-set by the script and thus, is disallowed from being in the `--client-args` arguments._ -###### or Manually (Docker) -``` -# side note: you may want to first set environment variables, see below -./resources/launch_scripts/wait_for_file.sh PATH_TO_CLIENT_STARTUP_JSON 600 -./resources/launch_scripts/docker/launch_client.sh \ - --client-startup-json PATH_TO_CLIENT_STARTUP_JSON \ - YOUR_ARGS -``` -_NOTE: By default the launch script will pull, build, and run the latest image from Docker Hub. You can optionally set environment variables to configure how to find a particular tag. For example:_ -``` -export SKYSCAN_DOCKER_IMAGE_TAG='x.y.z' # defaults to 'latest' -export SKYSCAN_DOCKER_PULL_ALWAYS=0 # defaults to 1 which maps to '--pull=always' -``` - -#### 3. Results -When the server is finished processing reconstructions, it will write a single `.npz` file to `--output-dir`. See `skymap_scanner.utils.scan_result` for more detail. - -#### 4. Cleanup & Error Handling -The server will exit on its own once it has received and processed all the reconstructions. The server will write a directory, like `run00127907.evt000020178442.HESE/`, to `--cache-dir`. The clients will exit according to their receiving-queue's timeout value (`SKYSCAN_MQ_TIMEOUT_TO_CLIENTS`), unless they are killed manually (`condor_rm`). - -All will exit on fatal errors (for clients, use HTCondor to manage re-launching). The in-progress pixel reconstruction is abandoned when a client fails, so there is no concern for duplicate reconstructions at the server. The pre-reconstructed pixel will be re-queued to be delivered to a different client. - -### In-Production Usage Note: Converting i3 to json and scaling up -You may want to run on events stored in i3 files. 
To convert those into a json format readable by the scanner, you can do -``` -cd resources/utils -python i3_to_json.py --basegcd /data/user/followup/baseline_gcds/baseline_gcd_136897.i3 EVENT_GCD.i3 EVENT_FILE.i3 -``` -This will pull all the events in the i3 file into `run*.evt*.json` which can be passed as an argument to the server. - -For now, it's easy to scale up using the command line. Multiple server instances can be run simultaneously and a separate submit file created for each one. To run `N` servers in parallel - -``` -export SKYSCAN_BROKER_ADDRESS=BROKER_ADDRESS -export SKYSCAN_BROKER_CLIENT=rabbitmq -export SKYSCAN_BROKER_AUTH=$(cat ~/skyscan-broker.token) # obfuscated for security -ls *.json | xargs -n1 -PN -I{} bash -c 'mkdir /path/to/json/{} && python -m skymap_scanner.server --client-startup-json /path/to/json/{}/client-startup.json --cache-dir /path/to/cache --output-dir /path/to/out --reco-algo RECO_ALGO --event-file /path/to/data/{}' -``` - -Then, from sub-2 run `ls *.json |xargs -I{} bash -c 'sed "s/UID/{}/g" ../condor > /scratch/$USER/{}.condor'` using the template condor submit file below. 
Then you should be able to just run: -``` -ls /scratch/$USER/run*.condor | head -nN | xargs -I{} condor_submit {} -``` -``` -executable = /bin/sh -arguments = /usr/local/icetray/env-shell.sh python -m skymap_scanner.client --client-startup-json ./client-startup.json -+SingularityImage = "/cvmfs/icecube.opensciencegrid.org/containers/realtime/skymap_scanner:x.y.z" -environment = "SKYSCAN_BROKER_AUTH=AUTHTOKEN SKYSCAN_BROKER_ADDRESS=BROKER_ADDRESS EWMS_PILOT_TASK_TIMEOUT=1200 EWMS_PILOT_QUARANTINE_TIME=1200" -Requirements = HAS_CVMFS_icecube_opensciencegrid_org && has_avx -output = /scratch/$USER/UID.out -error = /scratch/$USER/UID.err -log = /scratch/$USER/UID.log -+FileSystemDomain = "blah" -should_transfer_files = YES -transfer_input_files = /path/to/json/UID/client-startup.json -request_cpus = 1 -request_memory = 8GB -notification = Error -queue 300 -``` - -You may also need to add this line to the condor submit file if running `millipede_wilks` as some resources have been removed from the image. -``` -environment = "APPTAINERENV_I3_DATA=/cvmfs/icecube.opensciencegrid.org/data SINGULARITYENV_I3_DATA=/cvmfs/icecube.opensciencegrid.org/data I3_DATA=/cvmfs/icecube.opensciencegrid.org/data I3_TESTDATA=/cvmfs/icecube.opensciencegrid.org/data/i3-test-data-svn/trunk" -``` -The extra envs for `I3_DATA` are to ensure it gets passed through for use inside the container. Additionally, ftp-v1 ice was introduced in v3.4.0. - -### Additional Configuration -#### Environment Variables -When the server and client(s) are launched within Docker containers, all environment variables must start with `SKYSCAN_` in order to be auto-copied forward by the [launch scripts](#how-to-run). `EWMS_`-prefixed variables are also forwarded. See `skymap_scanner.config.ENV` for more detail. -##### Timeouts -The Skymap Scanner is designed to have realistic timeouts for HTCondor. 
That said, there are three main timeouts which can be altered: -``` - # seconds -- how long client waits between receiving pixels before thinking event scan is 100% done - # - set to `max(reco duration) + max(subsequent iteration startup time)` - # - think about starved clients - # - normal expiration scenario: the scan is done, no more pixels to scan (alternative: manually kill client process) - SKYSCAN_MQ_TIMEOUT_TO_CLIENTS: int = 60 * 30 # 30 mins - # - # seconds -- how long server waits before thinking all clients are dead - # - set to duration of first reco + client launch (condor) - # - important if clients launch *AFTER* server - # - normal expiration scenario: all clients died (bad condor submit file), otherwise never (server knows when all recos are done) - SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS: int = 3 * 24 * 60 * 60 # 3 days - # - # seconds -- how long client waits before first message (set to duration of server startup) - # - important if clients launch *BEFORE* server - # - normal expiration scenario: server died (ex: tried to read corrupted event file), otherwise never - SKYSCAN_MQ_CLIENT_TIMEOUT_WAIT_FOR_FIRST_MESSAGE: int = 60 * 60 # 60 mins -``` -Relatedly, the environment variable `EWMS_PILOT_TASK_TIMEOUT` & `EWMS_PILOT_QUARANTINE_TIME` can also be configured (see [1. Launch the Server](#1-launch-the-server) and [2. Launch Each Client](#2-launch-each-client)). - -#### Command-Line Arguments -There are more command-line arguments than those shown in [Example Startup](#example-startup). See `skymap_scanner.server.start_scan.main()` and `skymap_scanner.client.client.main()` for more detail. - -#### Runtime-Configurable Reconstructions -Recos are registered by being placed in a dedicated module within the `skymap_scanner.recos` sub-package. Each module must contain a class of the same name (eg: `skymap_scanner.recos.foo` has `skymap_scanner.recos.foo.Foo`) that fully inherits from `skymap_scanner.recos.RecoInterface`. 
This includes implementing the static methods: `traysegment()` (for IceTray) and `to_pixelreco()` (for MQ). The reco-specific logic in the upstream/pixel-generation phase is defined in the same class by the `prepare_frames()` (pulse cleaning, vertex generation) and `get_vertex_variations()` (variations of the vertex positions to be used as additional seeds for each pixel). On the command line, choosing your reco is provided via `--reco-algo` (on the server). +## Reconstructions Algorithms + +Reconstructions algorithms (reco algos) are registered by being placed in a dedicated module within the `skymap_scanner.recos` sub-package. Each module must contain a class of the same name (eg: `skymap_scanner.recos.foo` has `skymap_scanner.recos.foo.Foo`) that fully inherits from `skymap_scanner.recos.RecoInterface`. This includes implementing the static methods: `traysegment()` (for IceTray) and `to_pixelreco()` (for MQ). The reco-specific logic in the upstream/pixel-generation phase is defined in the same class by the `prepare_frames()` (pulse cleaning, vertex generation) and `get_vertex_variations()` (variations of the vertex positions to be used as additional seeds for each pixel). On the command line, choosing your reco is provided via `--reco-algo` (on the server). ## Making Branch-Based Images for Production-like Testing + If you need to test your updates in a production-like environment at a scale that isn't provided by CI, then create a branch-based image. This image will be available on Docker Hub and CVMFS. + ### Steps: + 1. Go to _Actions_ tab 1. Go to `docker & singularity/cvmfs releases` workflow tab (on left column) 1. Click _Run workflow_, select your branch, and click the _Run workflow_ button 1. Wait for the workflow steps to complete * You can check the workflow's progress by clicking the top-most entry (there will be a small delay after the previous step) 1. 
Check https://hub.docker.com/r/icecube/skymap_scanner/tags and/or CVMFS (the filepath will be the bottom-most line of https://github.com/WIPACrepo/cvmfs-actions/blob/main/docker_images.txt) + ### Note -The resulting image is specific to the branch's most recent commit. To test subsequent updates, you will need to repeat this process. +The resulting image is specific to the branch's most recent commit. To test subsequent updates, you will need to repeat this process. ## Data Types + These are the important data types within the scanner. Since memory-reduction is a consideration, some are persisted longer than others. ### Pixel-Like + There are 5 data types to represent a pixel-like thing in its various forms. In order of appearance: + #### 1. `(nside, pixel_id)`-tuple + - generated by `pixels.py` + #### 2. `I3Frame` + - generated by `PixelsToReco` - introduces position-variations (eg: `milipede_original`) - M `I3Frame` : 1 `(nside, pixel_id)`-tuple - sent to client(s), not persisted on the server - ~800 bytes + #### 3. `SentPixelVariation` + - used for tracking a single sent pixel variation - 1 `SentPixelVariation` : 1 `I3Frame` - persisted on the server in place of `I3Frame` - ~50 bytes + #### 4. `RecoPixelVariation` + - represents a pixel-variation reconstruction - sent from client to server, persisted on the server - 1 `RecoPixelVariation` : 1 `SentPixelVariation` - ~50 bytes + #### 5. `RecoPixelFinal` + - represents a final saved pixel post-reco (on the server only) - 1 `RecoPixelFinal` : M `RecoPixelVariation` - These types are saved in `nsides_dict` (`NSidesDict`) - ~50 bytes ### Sky Map-Like + Unlike pixel-like data types, these types are meant to exist as singular instances within the scanner. 
+ #### `nsides_dict` (`NSidesDict`) + - a dict of dicts containing `RecoPixelFinal` objects, keyed by nside & pixel id - exists on the server only - grows as the scan progresses - not persisted past the lifetime of a scan + #### `skyreader.SkyScanResult` + - a class/object for using the result of a scan outside of the scanner (see [icecube/skyreader](https://github.com/icecube/skyreader)) - created at the end of the scan (from `nsides_dict`) * intermediate/incomplete instances exist only to be sent to SkyDriver @@ -280,6 +93,7 @@ Unlike pixel-like data types, these types are meant to exist as singular instanc - SkyDriver persists a serialized (JSON) version for each scan ## Versioning + The `MAJOR.MINOR.PATCH` versioning scheme is updated according to the following 1. `MAJOR`: Breaking change or other fundamental change in the skymap-scanner diff --git a/dependencies-from-Dockerfile.log b/dependencies-from-Dockerfile.log index 0501b230b..b5e370a85 100644 --- a/dependencies-from-Dockerfile.log +++ b/dependencies-from-Dockerfile.log @@ -33,13 +33,10 @@ decorator==4.4.2 defusedxml==0.7.1 docutils==0.20.1 et-xmlfile==1.0.1 -ewms-pilot==0.19.0 fonttools==4.29.1 fs==2.4.12 gast==0.5.2 healpy==1.15.0 -htchirp==3.0 -htcondor==23.10.1 html5lib==1.1 hypothesis==6.36.0 icecube-skyreader==1.3.0 @@ -70,7 +67,7 @@ numexpr==2.8.1 numpy==1.21.5 odfpy==1.4.2 olefile==0.46 -oms-mqclient==2.4.10 +oms-mqclient==2.6.0 openpyxl==3.0.9 packaging==21.3 pandas==1.3.5 @@ -136,9 +133,10 @@ urwid==2.1.2 wcwidth==0.2.5 webencodings==0.5.1 wipac-dev-tools==1.13.0 -wipac-rest-tools==1.8.0 +wipac-rest-tools==1.8.2 xlwt==1.3.0 zipp==1.0.0 +zstd==1.5.5.1 ######################################################################## # pipdeptree ######################################################################## @@ -155,7 +153,7 @@ breathe==4.35.0 ├── docutils [required: >=0.18.1,<0.21, installed: 0.20.1] ├── imagesize [required: >=1.3, installed: 1.4.1] ├── Jinja2 [required: >=3.0, installed: 3.0.3] 
- ├── packaging [required: >=21.0, installed: 24.1] + ├── packaging [required: >=21.0, installed: 24.2] ├── Pygments [required: >=2.14, installed: 2.17.2] ├── requests [required: >=2.25.0, installed: 2.25.1] ├── snowballstemmer [required: >=2.0, installed: 2.2.0] @@ -212,8 +210,8 @@ pickleshare==0.7.5 pika==1.3.2 Pillow==9.0.1 pipdeptree==2.23.4 -├── packaging [required: >=24.1, installed: 24.1] -└── pip [required: >=24.2, installed: 24.2] +├── packaging [required: >=24.1, installed: 24.2] +└── pip [required: >=24.2, installed: 24.3.1] pluggy==0.13.0 ply==3.11 prompt-toolkit==3.0.28 @@ -243,13 +241,6 @@ pyzmq==22.3.0 setuptools==59.6.0 six==1.16.0 skymap-scanner -├── ewms-pilot [required: ==0.19.0, installed: 0.19.0] -│ ├── htchirp [required: Any, installed: 3.0] -│ ├── htcondor [required: Any, installed: 23.10.1] -│ └── oms-mqclient [required: Any, installed: 2.4.10] -│ └── wipac-dev-tools [required: Any, installed: 1.13.0] -│ ├── requests [required: Any, installed: 2.25.1] -│ └── typing_extensions [required: Any, installed: 4.12.2] ├── healpy [required: Any, installed: 1.15.0] ├── icecube-skyreader [required: Any, installed: 1.3.0] │ ├── astropy [required: Any, installed: 5.0.2] @@ -265,15 +256,16 @@ skymap-scanner ├── iminuit [required: <2.27.0, installed: 2.26.0] │ └── numpy [required: >=1.21, installed: 1.21.5] ├── numpy [required: <2.0.0, installed: 1.21.5] -├── oms-mqclient [required: ==2.4.10, installed: 2.4.10] -│ └── wipac-dev-tools [required: Any, installed: 1.13.0] -│ ├── requests [required: Any, installed: 2.25.1] -│ └── typing_extensions [required: Any, installed: 4.12.2] +├── oms-mqclient [required: Any, installed: 2.6.0] +│ ├── wipac-dev-tools [required: Any, installed: 1.13.0] +│ │ ├── requests [required: Any, installed: 2.25.1] +│ │ └── typing_extensions [required: Any, installed: 4.12.2] +│ └── zstd [required: Any, installed: 1.5.5.1] ├── scipy [required: Any, installed: 1.8.0] ├── wipac-dev-tools [required: Any, installed: 1.13.0] │ ├── requests 
[required: Any, installed: 2.25.1] │ └── typing_extensions [required: Any, installed: 4.12.2] -└── wipac-rest-tools [required: Any, installed: 1.8.0] +└── wipac-rest-tools [required: Any, installed: 1.8.2] ├── cachetools [required: Any, installed: 5.5.0] ├── PyJWT [required: !=2.6.0, installed: 2.9.0] ├── qrcode [required: Any, installed: 8.0] diff --git a/dependencies-from-Dockerfile_no_cvmfs.log b/dependencies-from-Dockerfile_no_cvmfs.log index f610426d7..26e64a383 100644 --- a/dependencies-from-Dockerfile_no_cvmfs.log +++ b/dependencies-from-Dockerfile_no_cvmfs.log @@ -33,13 +33,10 @@ decorator==4.4.2 defusedxml==0.7.1 docutils==0.20.1 et-xmlfile==1.0.1 -ewms-pilot==0.19.0 fonttools==4.29.1 fs==2.4.12 gast==0.5.2 healpy==1.15.0 -htchirp==3.0 -htcondor==23.10.1 html5lib==1.1 hypothesis==6.36.0 icecube-skyreader==1.3.0 @@ -70,7 +67,7 @@ numexpr==2.8.1 numpy==1.21.5 odfpy==1.4.2 olefile==0.46 -oms-mqclient==2.4.10 +oms-mqclient==2.6.0 openpyxl==3.0.9 packaging==21.3 pandas==1.3.5 @@ -136,9 +133,10 @@ urwid==2.1.2 wcwidth==0.2.5 webencodings==0.5.1 wipac-dev-tools==1.13.0 -wipac-rest-tools==1.8.0 +wipac-rest-tools==1.8.2 xlwt==1.3.0 zipp==1.0.0 +zstd==1.5.5.1 ######################################################################## # pipdeptree ######################################################################## @@ -155,7 +153,7 @@ breathe==4.35.0 ├── docutils [required: >=0.18.1,<0.21, installed: 0.20.1] ├── imagesize [required: >=1.3, installed: 1.4.1] ├── Jinja2 [required: >=3.0, installed: 3.0.3] - ├── packaging [required: >=21.0, installed: 24.1] + ├── packaging [required: >=21.0, installed: 24.2] ├── Pygments [required: >=2.14, installed: 2.17.2] ├── requests [required: >=2.25.0, installed: 2.25.1] ├── snowballstemmer [required: >=2.0, installed: 2.2.0] @@ -212,8 +210,8 @@ pickleshare==0.7.5 pika==1.3.2 Pillow==9.0.1 pipdeptree==2.23.4 -├── packaging [required: >=24.1, installed: 24.1] -└── pip [required: >=24.2, installed: 24.2] +├── packaging [required: 
>=24.1, installed: 24.2] +└── pip [required: >=24.2, installed: 24.3.1] pluggy==0.13.0 ply==3.11 prompt-toolkit==3.0.28 @@ -243,13 +241,6 @@ pyzmq==22.3.0 setuptools==59.6.0 six==1.16.0 skymap-scanner -├── ewms-pilot [required: ==0.19.0, installed: 0.19.0] -│ ├── htchirp [required: Any, installed: 3.0] -│ ├── htcondor [required: Any, installed: 23.10.1] -│ └── oms-mqclient [required: Any, installed: 2.4.10] -│ └── wipac-dev-tools [required: Any, installed: 1.13.0] -│ ├── requests [required: Any, installed: 2.25.1] -│ └── typing_extensions [required: Any, installed: 4.12.2] ├── healpy [required: Any, installed: 1.15.0] ├── icecube-skyreader [required: Any, installed: 1.3.0] │ ├── astropy [required: Any, installed: 5.0.2] @@ -265,15 +256,16 @@ skymap-scanner ├── iminuit [required: <2.27.0, installed: 2.26.0] │ └── numpy [required: >=1.21, installed: 1.21.5] ├── numpy [required: <2.0.0, installed: 1.21.5] -├── oms-mqclient [required: ==2.4.10, installed: 2.4.10] -│ └── wipac-dev-tools [required: Any, installed: 1.13.0] -│ ├── requests [required: Any, installed: 2.25.1] -│ └── typing_extensions [required: Any, installed: 4.12.2] +├── oms-mqclient [required: Any, installed: 2.6.0] +│ ├── wipac-dev-tools [required: Any, installed: 1.13.0] +│ │ ├── requests [required: Any, installed: 2.25.1] +│ │ └── typing_extensions [required: Any, installed: 4.12.2] +│ └── zstd [required: Any, installed: 1.5.5.1] ├── scipy [required: Any, installed: 1.8.0] ├── wipac-dev-tools [required: Any, installed: 1.13.0] │ ├── requests [required: Any, installed: 2.25.1] │ └── typing_extensions [required: Any, installed: 4.12.2] -└── wipac-rest-tools [required: Any, installed: 1.8.0] +└── wipac-rest-tools [required: Any, installed: 1.8.2] ├── cachetools [required: Any, installed: 5.5.0] ├── PyJWT [required: !=2.6.0, installed: 2.9.0] ├── qrcode [required: Any, installed: 8.0] diff --git a/resources/launch_scripts/docker/launch_client.sh b/resources/launch_scripts/docker/launch_client.sh deleted 
file mode 100755 index 0908e56b6..000000000 --- a/resources/launch_scripts/docker/launch_client.sh +++ /dev/null @@ -1,81 +0,0 @@ -#!/bin/bash -set -e - -######################################################################## -# -# Launch a Skymap Scanner client -# -# Pass in the arguments as if this were just the python sub-module -# -######################################################################## - - -# Get & transform arguments that are files/dirs for docker-mounting -# yes, this is simpler than a bash-native solution -export ARGS="$*" # all of the arguments stuck together into a single string -echo $ARGS -DOCKER_PY_ARGS=$(python3 -c ' -import os -py_args = os.getenv("ARGS") - -def extract_opt_path(py_args, opt): - if opt not in py_args: - return py_args, None - before, after = py_args.split(opt, 1) - before, after = before.strip(), after.strip() - if " " in after: - val, after = after.split(" ", 1) - else: # for arg at end of string - val, after = after, "" - if val: - return f"{before} {after}", os.path.abspath(val) - return f"{before} {after}", None - -py_args, debug_dir = extract_opt_path(py_args, "--debug-directory") -py_args, gcd = extract_opt_path(py_args, "--gcd-dir") -py_args, startup = extract_opt_path(py_args, "--client-startup-json") - -dockermount_args = "" -py_args += " " - -if debug_dir: - dockermount_args += f"--mount type=bind,source={debug_dir},target=/local/debug " - py_args += f"--debug-directory /local/debug " -if gcd: - dockermount_args += f"--mount type=bind,source={gcd},target=/local/gcd,readonly " - # - # NOTE: WE ARE NOT FORWARDING THIS ARG TO THE SCRIPT B/C ITS PASSED WITHIN THE STARTUP.JSON - # -if startup: - dockermount_args += f"--mount type=bind,source={os.path.dirname(startup)},target=/local/startup " - py_args += f"--client-startup-json /local/startup/{os.path.basename(startup)} " - -print(f"{dockermount_args}#{py_args}") -') -DOCKERMOUNT_ARGS="$(echo $DOCKER_PY_ARGS | awk -F "#" '{print $1}')" -PY_ARGS="$(echo 
$DOCKER_PY_ARGS | awk -F "#" '{print $2}')" - - -set -x - - -# Figure where to get image -# Figure pull policy -if [[ ${SKYSCAN_DOCKER_PULL_ALWAYS:-"1"} == "0" ]]; then - pull_policy="" -else - pull_policy="--pull=always" -fi - - -# Run -docker run --network="host" $pull_policy --rm -i \ - --shm-size=6gb \ - $DOCKERMOUNT_ARGS \ - --env PY_COLORS=1 \ - $(env | grep '^SKYSCAN_' | awk '$0="--env "$0') \ - $(env | grep '^EWMS_' | awk '$0="--env "$0') \ - --env "EWMS_PILOT_TASK_TIMEOUT=${EWMS_PILOT_TASK_TIMEOUT:-900}" \ - icecube/skymap_scanner:${SKYSCAN_DOCKER_IMAGE_TAG:-"latest"} \ - python -m skymap_scanner.client \ - $PY_ARGS \ No newline at end of file diff --git a/resources/launch_scripts/docker/launch_server.sh b/resources/launch_scripts/docker/launch_server.sh deleted file mode 100755 index 67efa3cd5..000000000 --- a/resources/launch_scripts/docker/launch_server.sh +++ /dev/null @@ -1,85 +0,0 @@ -#!/bin/bash -set -e - -######################################################################## -# -# Launch the Skymap Scanner server -# -# Pass in the arguments as if this were just the python sub-module -# -######################################################################## - - -# Get & transform arguments that are files/dirs for docker-mounting -# yes, this is simpler than a bash-native solution -export ARGS="$*" # all of the arguments stuck together into a single string -echo $ARGS -DOCKER_PY_ARGS=$(python3 -c ' -import os -py_args = os.getenv("ARGS") - -def extract_opt_path(py_args, opt): - if opt not in py_args: - return py_args, None - before, after = py_args.split(opt, 1) - before, after = before.strip(), after.strip() - if " " in after: - val, after = after.split(" ", 1) - else: # for arg at end of string - val, after = after, "" - if val: - return f"{before} {after}", os.path.abspath(val) - return f"{before} {after}", None - -py_args, event = extract_opt_path(py_args, "--event-file") -py_args, cache = extract_opt_path(py_args, "--cache-dir") -py_args, 
output = extract_opt_path(py_args, "--output-dir") -py_args, gcd = extract_opt_path(py_args, "--gcd-dir") -py_args, startup = extract_opt_path(py_args, "--client-startup-json") - -dockermount_args = "" -py_args += " " - -if event: - dockermount_args += f"--mount type=bind,source={os.path.dirname(event)},target=/local/event,readonly " - py_args += f"--event-file /local/event/{os.path.basename(event)} " -if cache: - dockermount_args += f"--mount type=bind,source={cache},target=/local/cache " - py_args += f"--cache-dir /local/cache " -if output: - dockermount_args += f"--mount type=bind,source={output},target=/local/output " - py_args += f"--output-dir /local/output " -if gcd: - dockermount_args += f"--mount type=bind,source={gcd},target=/local/gcd,readonly " - py_args += f"--gcd-dir /local/gcd " -if startup: - dockermount_args += f"--mount type=bind,source={os.path.dirname(startup)},target=/local/startup " - py_args += f"--client-startup-json /local/startup/{os.path.basename(startup)} " - -print(f"{dockermount_args}#{py_args}") -') -DOCKERMOUNT_ARGS="$(echo $DOCKER_PY_ARGS | awk -F "#" '{print $1}')" -PY_ARGS="$(echo $DOCKER_PY_ARGS | awk -F "#" '{print $2}')" - - -set -x - - -# Figure pull policy -if [[ ${SKYSCAN_DOCKER_PULL_ALWAYS:-"1"} == "0" ]]; then - pull_policy="" -else - pull_policy="--pull=always" -fi - - -# Run -docker run --network="host" $pull_policy --rm -i \ - $DOCKERMOUNT_ARGS \ - --env PY_COLORS=1 \ - $(env | grep '^SKYSCAN_' | awk '$0="--env "$0') \ - $(env | grep '^EWMS_' | awk '$0="--env "$0') \ - --env "EWMS_PILOT_TASK_TIMEOUT=${EWMS_PILOT_TASK_TIMEOUT:-900}" \ - icecube/skymap_scanner:${SKYSCAN_DOCKER_IMAGE_TAG:-"latest"} \ - python -m skymap_scanner.server \ - $PY_ARGS \ No newline at end of file diff --git a/resources/launch_scripts/ewms-v1/ewms-scan.sh b/resources/launch_scripts/ewms-v1/ewms-scan.sh new file mode 100755 index 000000000..fdd0d8b1c --- /dev/null +++ b/resources/launch_scripts/ewms-v1/ewms-scan.sh @@ -0,0 +1,501 @@ +#!/bin/bash 
+set -e + +######################################################################## +# +# Runs a scanner instance (server) and request to EWMS for workers +# +######################################################################## + +######################################################################## +# handle cl args + +if [ -z "$1" ] || [ -z "$2" ] || [ -z "$3" ] || [ -z "$4" ] || [ -z "$5" ] || [ -z "$6" ]; then + echo "Usage: ewms-scan.sh N_WORKERS EWMS_URL SKYSCAN_TAG RECO_ALGO N_SIDES PREDICTIVE_SCANNING_THRESHOLD" + exit 1 +else + N_WORKERS="$1" + EWMS_URL="$2" + SKYSCAN_TAG="$3" + RECO_ALGO="$4" + N_SIDES="$5" + PREDICTIVE_SCANNING_THRESHOLD="$6" +fi + +# now, validate... + +if [[ $N_WORKERS != +([[:digit:]]) ]]; then + echo "ERROR: N_WORKERS must be a number: $N_WORKERS" + exit 2 +fi + +if ! [[ "$PREDICTIVE_SCANNING_THRESHOLD" == "1" || "$PREDICTIVE_SCANNING_THRESHOLD" =~ ^(0?)\.[0-9]+$ ]]; then + echo "ERROR: PREDICTIVE_SCANNING_THRESHOLD must be '1' or a decimal." + exit 2 +fi + +if [ "$(curl --fail-with-body -s -o /dev/null -w "%{http_code}" "https://hub.docker.com/v2/repositories/icecube/skymap_scanner/tags/$SKYSCAN_TAG/")" -eq 200 ]; then + echo "Tag found on Docker Hub: $SKYSCAN_TAG" +else + echo "ERROR: Tag not found on Docker Hub: $SKYSCAN_TAG" + exit 2 +fi + +######################################################################## +# set up python virtual environment + +ENV_NAME="skyscan_ewms_pyvenv" +[ ! -d "$ENV_NAME" ] && virtualenv --python python3 "$ENV_NAME" +source "$ENV_NAME"/bin/activate +pip install --upgrade pip +echo "Virtual environment '$ENV_NAME' is now active." + +######################################################################## +# set up env vars + +export SKYSCAN_SKYDRIVER_SCAN_ID=$(uuidgen) + +check_and_export_env() { + if [ -z "${!1}" ]; then + echo "ERROR: Environment variable '$1' is not set." 
+ exit 2 + fi + export "$1" +} + +check_and_export_env S3_URL +check_and_export_env S3_ACCESS_KEY_ID +check_and_export_env S3_SECRET_KEY +check_and_export_env S3_BUCKET + +export S3_OBJECT_DEST_FILE="${SKYSCAN_SKYDRIVER_SCAN_ID}-s3-json" # no dots allowed + +######################################################################## +# trap tools -- what to do if things go wrong + +add_cleanup() { + local new_cmd="$1" + local trap_signal="$2" + local existing_trap + + echo "Adding cleanup task for $trap_signal: '$new_cmd'" + + # Get the current trap for the specified signal, if any + existing_trap=$(trap -p "$trap_signal" | sed -E "s/^trap -- '(.*)' $trap_signal/\1/") + + # If empty -> set the new command, else append + if [[ -z "$existing_trap" ]]; then + set -x + trap "$new_cmd" "$trap_signal" + set +x + else + set -x + trap "$existing_trap; $new_cmd" "$trap_signal" + set +x + fi +} + +# add first cleanup actions +add_cleanup 'echo "Handling cleanup for exit..."' EXIT +add_cleanup 'echo "Handling cleanup for error..."' ERR +add_cleanup 'echo "Handling cleanup for ctrl+C..."' INT +add_cleanup 'echo "Handling cleanup for SIGTERM..."' TERM + +######################################################################## +# S3: Generate the GET pre-signed URL -- server will post here later, ewms needs it now + +echo "################################################################################" +echo "Connecting to S3 to get pre-signed GET URL..." 
&& echo + +pip install boto3 +S3_OBJECT_URL=$(python3 -c ' +import os, boto3 + +s3_client = boto3.client( + "s3", + "us-east-1", + endpoint_url=os.environ["S3_URL"], + aws_access_key_id=os.environ["S3_ACCESS_KEY_ID"], + aws_secret_access_key=os.environ["S3_SECRET_KEY"], +) + +# get GET url +get_url = s3_client.generate_presigned_url( + "get_object", + Params={ + "Bucket": os.environ["S3_BUCKET"], + "Key": os.environ["S3_OBJECT_DEST_FILE"], + }, + ExpiresIn=24 * 60 * 60, # seconds +) +print(get_url) +') + +echo $S3_OBJECT_URL + +######################################################################## +# request workers on ewms + +echo "################################################################################" +echo "Requesting to EWMS..." && echo + +export POST_REQ=$( + cat </dev/null) + + # Step 2: Compare the current workflow response with the previous one + if [[ $wf_resp != "$prev_wf_response" ]]; then + echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + echo "~~ The EWMS workflow object has updated: ~~" + echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + date + # If the response has changed, print it and update the previous response + echo "$wf_resp" | jq . 
-M --indent 4 # Format JSON with 4 spaces + prev_wf_response="$wf_resp" + fi + + # Step 3: Get taskforces associated with the workflow_id + local tf_resp=$(python3 -c " +import os, rest_tools, json, pathlib +rc = rest_tools.client.SavedDeviceGrantAuth( + 'https://ewms-dev.icecube.aq', + token_url='https://keycloak.icecube.wisc.edu/auth/realms/IceCube', + filename=str(pathlib.Path('~/device-refresh-token').expanduser().resolve()), + client_id='ewms-dev-public', + retries=0, +) +# Perform POST request to get taskforces by workflow_id +res = rc.request_seq('POST', '/v0/query/taskforces', {'query':{'workflow_id': os.environ['WORKFLOW_ID']}}) +print(json.dumps(res)) +" WORKFLOW_ID=$workflow_id 2>/dev/null) + + # Step 4: Compare the current taskforces response with the previous one + if [[ $tf_resp != "$prev_tf_response" ]]; then + echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + echo "~~ The EWMS taskforce object(s) have updated: ~~" + echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + date + # If the response has changed, print it and update the previous response + echo "$tf_resp" | jq . -M --indent 4 # Format JSON with 4 spaces + prev_tf_response="$tf_resp" + fi + done +} + +check_ewms_changes "$WORKFLOW_ID" & +check_changes_pid=$! +add_cleanup 'kill $check_changes_pid 2>/dev/null' EXIT +add_cleanup 'kill $check_changes_pid 2>/dev/null' ERR +add_cleanup 'kill $check_changes_pid 2>/dev/null' INT +add_cleanup 'kill $check_changes_pid 2>/dev/null' TERM + +######################################################################## +# get queue connection info + +echo "################################################################################" +echo "Getting MQ info..." 
&& echo + +mqprofiles=$(python3 -c ' +import os, rest_tools, pathlib, time, json +rc = rest_tools.client.SavedDeviceGrantAuth( + "https://ewms-dev.icecube.aq", + token_url="https://keycloak.icecube.wisc.edu/auth/realms/IceCube", + filename=str(pathlib.Path("~/device-refresh-token").expanduser().resolve()), + client_id="ewms-dev-public", + retries=0, +) + +workflow_id = os.environ["WORKFLOW_ID"] +mqprofiles: list[dict] = [] +# loop until mqprofiles is not empty and all "is_activated" fields are true +while not (mqprofiles and all(m["is_activated"] for m in mqprofiles)): + time.sleep(10) + mqprofiles = ( + rc.request_seq( + "GET", + f"/v0/mqs/workflows/{workflow_id}/mq-profiles/public", + ) + )["mqprofiles"] + +print(json.dumps(mqprofiles)) +') +echo "MQ info:" +echo "$mqprofiles" | jq . -M --indent 4 # Format JSON with 4 spaces + +# map mqprofiles from the queue names +mqprofile_toclient=$(echo "$mqprofiles" | jq --arg mqid "$QUEUE_TOCLIENT" '.[] | select(.mqid == $mqid)') +mqprofile_fromclient=$(echo "$mqprofiles" | jq --arg mqid "$QUEUE_FROMCLIENT" '.[] | select(.mqid == $mqid)') + +# set env vars for vals from the mqprofiles +export SKYSCAN_MQ_TOCLIENT=$(echo "$mqprofile_toclient" | jq -r '.mqid') +export SKYSCAN_MQ_TOCLIENT_AUTH_TOKEN=$(echo "$mqprofile_toclient" | jq -r '.auth_token') +export SKYSCAN_MQ_TOCLIENT_BROKER_TYPE=$(echo "$mqprofile_toclient" | jq -r '.broker_type') +export SKYSCAN_MQ_TOCLIENT_BROKER_ADDRESS=$(echo "$mqprofile_toclient" | jq -r '.broker_address') +# +export SKYSCAN_MQ_FROMCLIENT=$(echo "$mqprofile_fromclient" | jq -r '.mqid') +export SKYSCAN_MQ_FROMCLIENT_AUTH_TOKEN=$(echo "$mqprofile_fromclient" | jq -r '.auth_token') +export SKYSCAN_MQ_FROMCLIENT_BROKER_TYPE=$(echo "$mqprofile_fromclient" | jq -r '.broker_type') +export SKYSCAN_MQ_FROMCLIENT_BROKER_ADDRESS=$(echo "$mqprofile_fromclient" | jq -r '.broker_address') + +######################################################################## +# start server + +echo 
"################################################################################" +echo "Starting local scanner server..." && echo + +SCANNER_SERVER_DIR="./scan-dir-$WORKFLOW_ID/" +mkdir $SCANNER_SERVER_DIR + +# look at env vars before running +env | grep -E '^(SKYSCAN_|_SKYSCAN_)' | sort || true +env | grep -E '^(EWMS_|_EWMS_)' | sort || true + +set -x # let's see this command +sudo -E docker run --network="host" --rm -i \ + $DOCKERMOUNT_ARGS \ + --mount type=bind,source=$(realpath $SCANNER_SERVER_DIR),target=/local/$(basename $SCANNER_SERVER_DIR) \ + $(env | grep -E '^(SKYSCAN_|_SKYSCAN_)' | sort | cut -d'=' -f1 | sed 's/^/--env /') \ + $(env | grep -E '^(EWMS_|_EWMS_)' | sort | cut -d'=' -f1 | sed 's/^/--env /') \ + icecube/skymap_scanner:${SKYSCAN_SERVER_TAG:-$SKYSCAN_TAG} \ + python -m skymap_scanner.server \ + --client-startup-json /local/$(basename $SCANNER_SERVER_DIR)/startup.json \ + --cache-dir /local/$(basename $SCANNER_SERVER_DIR)/cache-dir/ \ + --output-dir /local/$(basename $SCANNER_SERVER_DIR)/results/ \ + --reco-algo "$RECO_ALGO" \ + --predictive-scanning-threshold "$PREDICTIVE_SCANNING_THRESHOLD" \ + --event-file /local/tests/data/realtime_events/run00136766-evt000007637140-GOLD.pkl --real-event \ + --nsides $N_SIDES | + tee "$SCANNER_SERVER_DIR/server.out" 2>&1 \ + & +server_pid=$! +set +x + +sleep 3 # for stdout ordering + +export S3_FILE_TO_UPLOAD="$SCANNER_SERVER_DIR/startup.json" + +######################################################################## +# get startup.json -> put in S3 + +echo "################################################################################" +echo "Waiting for file $S3_FILE_TO_UPLOAD..." && echo + +# wait until the file exists (with a timeout) +found="false" +endtime=$(date -ud "120 seconds" +%s) # wait this long +while [[ $(date -u +%s) -le $endtime ]]; do + if [[ -e $S3_FILE_TO_UPLOAD ]]; then + echo "Success, file found!" + found="true" + break + fi + echo "waiting for file..." 
+ echo "ls: $SCANNER_SERVER_DIR" + ls "$SCANNER_SERVER_DIR" + sleep 5 +done +if [[ $found != "true" ]]; then + echo "ERROR: file not found: $S3_FILE_TO_UPLOAD" + exit 1 +fi + +echo && echo "Uploading file ($S3_FILE_TO_UPLOAD) to S3..." + +out=$(python3 -c ' +import os, boto3, requests, pathlib + +s3_client = boto3.client( + "s3", + "us-east-1", + endpoint_url=os.environ["S3_URL"], + aws_access_key_id=os.environ["S3_ACCESS_KEY_ID"], + aws_secret_access_key=os.environ["S3_SECRET_KEY"], +) + +# POST +upload_details = s3_client.generate_presigned_post( + os.environ["S3_BUCKET"], + os.environ["S3_OBJECT_DEST_FILE"] +) +filepath = pathlib.Path(os.environ["S3_FILE_TO_UPLOAD"]) +with open(filepath, "rb") as f: + response = requests.post( + upload_details["url"], + data=upload_details["fields"], + files={"file": (filepath.name, f)}, # maps filename to obj + ) +print(f"Upload response: {response.status_code}") +print(str(response.content)) +') +echo $out + +######################################################################## +# wait for scan to finish + +echo "################################################################################" +echo "Waiting for scan to finish..." && echo + +# dump all content, then dump new content in realtime +tail -n +1 -f "$SCANNER_SERVER_DIR/server.out" & + +wait $server_pid +echo "The scan finished!" + +######################################################################## +# deactivate ewms workflow + +echo "################################################################################" +echo "Deactivating the workflow..." 
&& echo + +POST_RESP=$(python3 -c " +import os, rest_tools, json, pathlib +rc = rest_tools.client.SavedDeviceGrantAuth( + 'https://ewms-dev.icecube.aq', + token_url='https://keycloak.icecube.wisc.edu/auth/realms/IceCube', + filename=str(pathlib.Path('~/device-refresh-token').expanduser().resolve()), + client_id='ewms-dev-public', + retries=0, +) +workflow_id = os.environ['WORKFLOW_ID'] +res = rc.request_seq('POST', f'/v0/workflows/{workflow_id}/actions/finished') +print(json.dumps(res)) +" WORKFLOW_ID=$workflow_id) +echo "$POST_RESP" | jq . -M --indent 4 # Format JSON with 4 spaces +sleep 120 # TODO - use smarter logic + +######################################################################## +# look at result + +echo "################################################################################" +echo "The scan was a success!" && echo + +echo "The results:" +ls "$SCANNER_SERVER_DIR/results/" + +echo "Script finished." diff --git a/resources/launch_scripts/launch_worker.sh b/resources/launch_scripts/launch_worker.sh new file mode 100755 index 000000000..e53ff77bf --- /dev/null +++ b/resources/launch_scripts/launch_worker.sh @@ -0,0 +1,62 @@ +#!/bin/bash +set -ex + +######################################################################## +# +# Launch a Skymap Scanner worker +# +# Run worker on ewms pilot +# +######################################################################## + +# establish pilot's root path +tmp_rootdir="$(pwd)/pilot-$(uuidgen)" +mkdir $tmp_rootdir +cd $tmp_rootdir +export EWMS_PILOT_DATA_DIR_PARENT_PATH_ON_HOST="$tmp_rootdir" + +# mark startup.json's dir to be bind-mounted into the task container (by the pilot) +# -> check that the dir only has one file, otherwise we may end up binding extra dirs +python -c 'import os; assert os.listdir(os.path.dirname(os.environ["CI_SKYSCAN_STARTUP_JSON"])) == ["startup.json"]' +export EWMS_PILOT_EXTERNAL_DIRECTORIES="$(dirname "$CI_SKYSCAN_STARTUP_JSON")" + +# task image, args, env +if [ -n 
"$_RUN_THIS_SINGULARITY_IMAGE" ]; then + # place a duplicate of the file b/c the pilot transforms this into another format, so there are issues w/ parallelizing + export EWMS_PILOT_TASK_IMAGE="$tmp_rootdir/$(basename "$_RUN_THIS_SINGULARITY_IMAGE")" + cp "$_RUN_THIS_SINGULARITY_IMAGE" "$EWMS_PILOT_TASK_IMAGE" + export _EWMS_PILOT_CONTAINER_PLATFORM="apptainer" +else + export EWMS_PILOT_TASK_IMAGE="$CI_DOCKER_IMAGE_TAG" + export _EWMS_PILOT_CONTAINER_PLATFORM="docker" # NOTE: technically not needed b/c this is the default value + export _EWMS_PILOT_DOCKER_SHM_SIZE="6gb" # this only needed in ci--the infra would set this in prod +fi +export EWMS_PILOT_TASK_ARGS="python -m skymap_scanner.client --infile {{INFILE}} --outfile {{OUTFILE}} --client-startup-json $CI_SKYSCAN_STARTUP_JSON" +json_var=$(env | grep -E '^(SKYSCAN_|_SKYSCAN_)' | awk -F= '{printf "\"%s\":\"%s\",", $1, $2}' | sed 's/,$//') # must remove last comma +json_var="{$json_var}" +export EWMS_PILOT_TASK_ENV_JSON="$json_var" + +# file types -- controls intermittent serialization +export EWMS_PILOT_INFILE_EXT="JSON" +export EWMS_PILOT_OUTFILE_EXT="JSON" + +# to-client queue +export EWMS_PILOT_QUEUE_INCOMING="$SKYSCAN_MQ_TOCLIENT" +export EWMS_PILOT_QUEUE_INCOMING_AUTH_TOKEN="$SKYSCAN_MQ_TOCLIENT_AUTH_TOKEN" +export EWMS_PILOT_QUEUE_INCOMING_BROKER_TYPE="$SKYSCAN_MQ_TOCLIENT_BROKER_TYPE" +export EWMS_PILOT_QUEUE_INCOMING_BROKER_ADDRESS="$SKYSCAN_MQ_TOCLIENT_BROKER_ADDRESS" +# +# from-client queue +export EWMS_PILOT_QUEUE_OUTGOING="$SKYSCAN_MQ_FROMCLIENT" +export EWMS_PILOT_QUEUE_OUTGOING_AUTH_TOKEN="$SKYSCAN_MQ_FROMCLIENT_AUTH_TOKEN" +export EWMS_PILOT_QUEUE_OUTGOING_BROKER_TYPE="$SKYSCAN_MQ_FROMCLIENT_BROKER_TYPE" +export EWMS_PILOT_QUEUE_OUTGOING_BROKER_ADDRESS="$SKYSCAN_MQ_FROMCLIENT_BROKER_ADDRESS" + +# run! +ENV="$(dirname $tmp_rootdir)/pyenv-$(basename $tmp_rootdir)" +pip install virtualenv +virtualenv --python python3 "$ENV" +. 
"$ENV"/bin/activate +pip install --upgrade pip +pip install ewms-pilot[rabbitmq] +python -m ewms_pilot diff --git a/resources/launch_scripts/local-scan.sh b/resources/launch_scripts/local-scan.sh index 177dd584f..3b427c239 100755 --- a/resources/launch_scripts/local-scan.sh +++ b/resources/launch_scripts/local-scan.sh @@ -3,42 +3,47 @@ set -ex ######################################################################## # -# Runs a scanner instance (server & clients) all on the same machine +# Runs a scanner instance (server & workers) all on the same machine # ######################################################################## - -if [[ $(basename `pwd`) != "launch_scripts" ]]; then +if [[ $(basename $(pwd)) != "launch_scripts" ]]; then echo "script must be executed within 'resources/launch_scripts' directory" exit 1 fi +######################################################################## +# Validate args if [ -z "$1" ] || [ -z "$2" ]; then - echo "Usage: local-scan.sh N_CLIENTS OUTPUT_DIR" + echo "Usage: local-scan.sh N_WORKERS OUTPUT_DIR" exit 1 fi -if [[ "$1" != +([[:digit:]]) ]]; then - echo "N_CLIENTS must be a number: $1" +if [[ $1 != +([[:digit:]]) ]]; then + echo "N_WORKERS must be a number: $1" exit 2 fi -nclients="$1" +nworkers="$1" if [ ! -d $(dirname $2) ]; then echo "Directory Not Found: $(dirname $1)" exit 2 fi -outdir="$2" +outdir="$(realpath $2)" mkdir -p "$outdir" +######################################################################## +# Check required env vars -if [ -z "$SKYSCAN_CACHE_DIR" ] || [ -z "$SKYSCAN_OUTPUT_DIR" ] || [ -z "$SKYSCAN_DEBUG_DIR" ]; then - echo "required env vars: SKYSCAN_CACHE_DIR, SKYSCAN_OUTPUT_DIR, SKYSCAN_DEBUG_DIR" - # will fail in mkdirs below... 
+if [ -z "$CI_SKYSCAN_CACHE_DIR" ] || [ -z "$CI_SKYSCAN_OUTPUT_DIR" ] || [ -z "$CI_SKYSCAN_DEBUG_DIR" ]; then + echo "required env vars: CI_SKYSCAN_CACHE_DIR, CI_SKYSCAN_OUTPUT_DIR, CI_SKYSCAN_DEBUG_DIR" + exit 2 fi -mkdir $SKYSCAN_CACHE_DIR -mkdir $SKYSCAN_OUTPUT_DIR -mkdir $SKYSCAN_DEBUG_DIR +mkdir $CI_SKYSCAN_CACHE_DIR +mkdir $CI_SKYSCAN_OUTPUT_DIR +mkdir $CI_SKYSCAN_DEBUG_DIR +######################################################################## +# Misc setup if [ -z "$_PREDICTIVE_SCANNING_THRESHOLD" ]; then arg_predictive_scanning_threshold="" @@ -46,40 +51,131 @@ else arg_predictive_scanning_threshold="--predictive-scanning-threshold $_PREDICTIVE_SCANNING_THRESHOLD" fi +declare -A pidmap # map of background pids to wait on +export CI_SKYSCAN_STARTUP_JSON="$(pwd)/dir-for-startup-json/startup.json" # export for launch_worker.sh +mkdir "$(dirname "$CI_SKYSCAN_STARTUP_JSON")" + +######################################################################## # Launch Server -./docker/launch_server.sh \ - --reco-algo $_RECO_ALGO \ - --event-file $_EVENTS_FILE \ - --cache-dir $SKYSCAN_CACHE_DIR \ - --output-dir $SKYSCAN_OUTPUT_DIR \ - --client-startup-json ./startup.json \ - --nsides $_NSIDES \ - $arg_predictive_scanning_threshold \ - --real-event \ - 2>&1 | tee "$outdir"/server.out \ - & -server_pid=$! 
+if [ -n "$_RUN_THIS_SINGULARITY_IMAGE" ]; then + # SINGULARITY + singularity run "$_RUN_THIS_SINGULARITY_IMAGE" \ + python -m skymap_scanner.server \ + --reco-algo $_RECO_ALGO \ + --event-file $_EVENTS_FILE \ + --cache-dir $CI_SKYSCAN_CACHE_DIR \ + --output-dir $CI_SKYSCAN_OUTPUT_DIR \ + --client-startup-json $CI_SKYSCAN_STARTUP_JSON \ + --nsides $_NSIDES \ + --simulated-event \ + 2>&1 | tee "$outdir"/server.out \ + & +else + # DOCKER + docker run --network="host" --rm \ + --mount type=bind,source="$(dirname "$_EVENTS_FILE")",target=/local/event,readonly \ + --mount type=bind,source="$CI_SKYSCAN_CACHE_DIR",target=/local/cache \ + --mount type=bind,source="$CI_SKYSCAN_OUTPUT_DIR",target=/local/output \ + --mount type=bind,source="$(dirname "$CI_SKYSCAN_STARTUP_JSON")",target=/local/startup \ + --env PY_COLORS=1 \ + $(env | grep -E '^(SKYSCAN_|_SKYSCAN_)' | cut -d'=' -f1 | sed 's/^/--env /') \ + $(env | grep -E '^(EWMS_|_EWMS_)' | cut -d'=' -f1 | sed 's/^/--env /') \ + "$CI_DOCKER_IMAGE_TAG" \ + python -m skymap_scanner.server \ + --reco-algo $_RECO_ALGO \ + --event-file "/local/event/$(basename "$_EVENTS_FILE")" \ + --cache-dir /local/cache \ + --output-dir /local/output \ + --client-startup-json "/local/startup/$(basename $CI_SKYSCAN_STARTUP_JSON)" \ + --nsides $_NSIDES \ + $arg_predictive_scanning_threshold \ + --real-event \ + 2>&1 | tee "$outdir"/server.out \ + & +fi +pidmap["$!"]="central server" +######################################################################## # Wait for startup.json -./wait_for_file.sh ./startup.json $CLIENT_STARTER_WAIT_FOR_STARTUP_JSON +./wait_for_file.sh $CI_SKYSCAN_STARTUP_JSON 60 -# Launch Clients -echo "Launching $nclients clients" -export EWMS_PILOT_TASK_TIMEOUT=${EWMS_PILOT_TASK_TIMEOUT:-"1800"} # 30 mins -for i in $( seq 1 $nclients ); do - ./docker/launch_client.sh \ - --client-startup-json ./startup.json \ - --debug-directory $SKYSCAN_DEBUG_DIR \ - 2>&1 | tee "$outdir"/client-$i.out \ - & - echo -e "\tclient #$i launched" 
+######################################################################## +# Launch Workers that each run a Pilot which each run Skyscan Clients + +launch_scripts_dir=$(pwd) +echo "Launching $nworkers workers" +export EWMS_PILOT_TASK_TIMEOUT=${EWMS_PILOT_TASK_TIMEOUT:-"1800"} # 30 mins +for i in $(seq 1 $nworkers); do + dir="$outdir/worker-$i/" + mkdir -p $dir + cd $dir + $launch_scripts_dir/launch_worker.sh >>$dir/pilot.out 2>&1 & + pidmap["$!"]="worker #$i" + echo -e "\tworker #$i launched" +done + +######################################################################## +# Wait for scan components to finish + +set +x +echo "pidmap:" +for pid in "${!pidmap[@]}"; do + echo "PID: $pid, Identifier: ${pidmap[$pid]}" done +######################################################################## +# Wait for all background processes to complete, looping in reverse order + +# helper function to find the finished process +find_finished_pid() { + local running_pids=("$@") + local is_running + for pid in "${pids[@]}"; do + is_running=false + local running_pid + for running_pid in "${running_pids[@]}"; do + if [[ $pid == "$running_pid" ]]; then + is_running=true + break + fi + done + if ! $is_running; then + echo "$pid" + return + fi + done +} + +# Loop over the number of background tasks -- each time, we'll wait on the FIRST to finish +echo "Waiting on components..." +pids=("${!pidmap[@]}") # get keys +for ((i = 0; i < ${#pids[@]}; i++)); do + sleep 10 + set -x + wait -n # wait for the FIRST to finish + exit_status=$? + #set +x + sleep 5 # for our logs + + # find the finished process PID by checking jobs + running_pids=($(jobs -pr)) + finished_pid=$(find_finished_pid "${running_pids[@]}") + echo "Process $finished_pid (${pidmap[$finished_pid]}) finished with $exit_status." + + # check if that proc failed + if [ $exit_status -ne 0 ]; then + echo "ERROR: A process exited with status $exit_status. Exiting and killing remaining processes." 
+ # Kill all remaining background processes + for pid in "${pids[@]}"; do + sleep 1 + kill "$pid" 2>/dev/null + done + sleep 10 # for proc logs + exit 1 + fi +done -# Wait for scan -# -- we don't actually care about the clients, if they fail or not -# -- if all the clients fail, then the sever times out and we can look at client logs -wait $server_pid \ No newline at end of file +echo "All components finished successfully" diff --git a/resources/launch_scripts/wait_for_file.sh b/resources/launch_scripts/wait_for_file.sh index 7db95d467..ffeccb158 100755 --- a/resources/launch_scripts/wait_for_file.sh +++ b/resources/launch_scripts/wait_for_file.sh @@ -4,7 +4,7 @@ set -ex ######################################################################## # # Wait for $1 after launching a Skymap Scanner server -# and before launching any clients +# and before launching any workers # # Pass in two arguments, the filepath to the file & wait duration # @@ -18,7 +18,7 @@ if [ ! -d $(dirname $1) ]; then echo "Directory Not Found: $(dirname $1)" exit 2 fi -if [[ "$2" != +([[:digit:]]) ]]; then +if [[ $2 != +([[:digit:]]) ]]; then echo "Wait duration must be a number (seconds): $2" exit 2 fi @@ -28,12 +28,12 @@ timeout="$2" echo "Will wait for '$1' for $timeout seconds in $waitsec second intervals" # wait until the file exists (with a timeout) -endtime=$(date -ud "$timeout seconds" +%s) # wait this long +endtime=$(date -ud "$timeout seconds" +%s) # wait this long while [[ $(date -u +%s) -le $endtime ]]; do - if [[ -e "$1" ]]; then + if [[ -e $1 ]]; then echo "Success! '$1' file found:" ls $1 - exit 0 # Done! + exit 0 # Done! fi echo "waiting for '$1' ($waitsec second intervals)..." sleep $waitsec @@ -41,4 +41,4 @@ done echo "Failed. 
'$1' not found within time limit ($timeout seconds):" ls $1 -exit 62 # Timer expired \ No newline at end of file +exit 62 # Timer expired diff --git a/setup.cfg b/setup.cfg index 5f6c8d0d8..bae8102d5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -21,12 +21,11 @@ branch = main [options] # generated by wipac:cicd_setup_builder: python_requires, packages install_requires = - ewms-pilot==0.19.0 # the extras here are the same as 'oms-mqclient', so just define one place healpy icecube-skyreader iminuit<2.27.0 numpy<2.0.0 - oms-mqclient==2.4.10 + oms-mqclient scipy wipac-dev-tools wipac-rest-tools @@ -34,8 +33,6 @@ python_requires = >=3.9, <3.12 packages = find: [options.extras_require] -client-starter = - htcondor pulsar = oms-mqclient[pulsar] rabbitmq = @@ -45,7 +42,6 @@ nats = all = oms-mqclient[all] mypy = - %(client-starter)s %(all)s [options.package_data] # generated by wipac:cicd_setup_builder: '*' diff --git a/skymap_scanner/client/__init__.py b/skymap_scanner/client/__init__.py index e69de29bb..8408a5da0 100644 --- a/skymap_scanner/client/__init__.py +++ b/skymap_scanner/client/__init__.py @@ -0,0 +1,31 @@ +"""The Skymap Scanner Client.""" + +import dataclasses as dc + +from wipac_dev_tools import from_environment_as_dataclass + +from .. 
import config as cfg + + +# +# Env var constants: set as constants & typecast +# + + +@dc.dataclass(frozen=True) +class EnvConfig: + """For storing environment variables, typed.""" + + # LOGGING VARS + SKYSCAN_LOG: str = cfg.LOG_LEVEL_DEFAULT + SKYSCAN_LOG_THIRD_PARTY: str = cfg.LOG_THIRD_PARTY_LEVEL_DEFAULT + SKYSCAN_EWMS_PILOT_LOG: str = cfg.LOG_LEVEL_DEFAULT + SKYSCAN_MQ_CLIENT_LOG: str = cfg.LOG_LEVEL_DEFAULT + + # TESTING/DEBUG VARS + # NOTE - these are accessed via `os.getenv` -- ctrl+F for usages + # _SKYSCAN_CI_MINI_TEST: bool = False # run minimal variations for testing (mini-scale) + # _SKYSCAN_CI_CRASH_DUMMY_PROBABILITY: float = 0.5 # for reco algo: crash-dummy + + +ENV = from_environment_as_dataclass(EnvConfig) diff --git a/skymap_scanner/client/__main__.py b/skymap_scanner/client/__main__.py index 5ab089393..17848c5f2 100644 --- a/skymap_scanner/client/__main__.py +++ b/skymap_scanner/client/__main__.py @@ -1,6 +1,7 @@ """Entry-point to start up client service.""" -from . import client +from . import reco_icetray + if __name__ == "__main__": - client.main() + reco_icetray.main() diff --git a/skymap_scanner/client/client.py b/skymap_scanner/client/client.py deleted file mode 100644 index dcd3d271a..000000000 --- a/skymap_scanner/client/client.py +++ /dev/null @@ -1,102 +0,0 @@ -"""The Client service.""" - -import argparse -import asyncio -import json -import logging -from pathlib import Path - -import ewms_pilot -from wipac_dev_tools import argparse_tools, logging_tools - -from .. import config as cfg -from ..utils.data_handling import get_gcd_datastager - -LOGGER = logging.getLogger(__name__) - - -def main() -> None: - """Start up Client service.""" - parser = argparse.ArgumentParser( - description=( - "Start up client daemon to perform reco scans on pixels " - "received from the server for a given event." 
- ), - epilog="", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - - # startup.json - parser.add_argument( - "--client-startup-json", - help=( - "The filepath to the JSON file to startup the client " - "(has keys 'mq_basename', 'baseline_GCD_file', and 'GCDQp_packet')" - ), - type=lambda x: argparse_tools.validate_arg( - Path(x), - Path(x).parent.is_dir(), - NotADirectoryError(Path(x).parent), - ), - ) - - # testing/debugging args - parser.add_argument( - "--debug-directory", - default="", - type=argparse_tools.create_dir, - help="a directory to write all the incoming/outgoing files " - "(useful for debugging)", - ) - - args = parser.parse_args() - cfg.configure_loggers() - logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING") - - # read startup.json - with open(args.client_startup_json, "rb") as f: - startup_json_dict = json.load(f) - with open("GCDQp_packet.json", "w") as f: - json.dump(startup_json_dict[cfg.STATEDICT_GCDQP_PACKET], f) - - datastager = get_gcd_datastager() - - baseline_gcd_file = Path(startup_json_dict["baseline_GCD_file"]) - - datastager.stage_files([baseline_gcd_file.name]) - - baseline_gcd_file = Path(datastager.get_filepath(baseline_gcd_file.name)) - - # check if baseline GCD file is reachable - if not baseline_gcd_file.exists(): - raise FileNotFoundError(baseline_gcd_file) - - cmd = ( - "python -m skymap_scanner.client.reco_icetray " - " --infile {{INFILE}}" # no f-string b/c want to preserve '{{..}}' - " --outfile {{OUTFILE}}" # ^^^ - " --gcdqp-packet-json GCDQp_packet.json" - f" --baseline-gcd-file {baseline_gcd_file}" - ) - - # go! 
- LOGGER.info( - f"Starting up a Skymap Scanner client for event: {startup_json_dict['mq_basename']=}" - ) - asyncio.run( - ewms_pilot.consume_and_reply( - cmd=cmd, - broker_client=cfg.ENV.SKYSCAN_BROKER_CLIENT, - broker_address=cfg.ENV.SKYSCAN_BROKER_ADDRESS, - auth_token=cfg.ENV.SKYSCAN_BROKER_AUTH, - queue_incoming=f"to-clients-{startup_json_dict['mq_basename']}", - queue_outgoing=f"from-clients-{startup_json_dict['mq_basename']}", - ftype_to_subproc=".json", - ftype_from_subproc=".json", - timeout_incoming=cfg.ENV.SKYSCAN_MQ_TIMEOUT_TO_CLIENTS, - timeout_wait_for_first_message=cfg.ENV.SKYSCAN_MQ_CLIENT_TIMEOUT_WAIT_FOR_FIRST_MESSAGE, - debug_dir=args.debug_directory, - task_timeout=cfg.ENV.EWMS_PILOT_TASK_TIMEOUT, - ) - ) - LOGGER.info("Done.") diff --git a/skymap_scanner/client/reco_icetray.py b/skymap_scanner/client/reco_icetray.py index 7db502f6d..813c0b7fb 100644 --- a/skymap_scanner/client/reco_icetray.py +++ b/skymap_scanner/client/reco_icetray.py @@ -1,7 +1,5 @@ """Reco a single pixel.""" -# pylint: skip-file - import argparse import datetime import json @@ -25,7 +23,9 @@ from wipac_dev_tools import argparse_tools, logging_tools from .. import config as cfg, recos +from . 
import ENV from ..utils import messages +from ..utils.data_handling import get_gcd_datastager from ..utils.load_scan_state import get_baseline_gcd_frames from ..utils.pixel_classes import RecoPixelVariation, pframe_tuple from ..utils.utils import save_GCD_frame_packet_to_file @@ -245,34 +245,42 @@ def main() -> None: ), ) - # extra "physics" args + # startup.json parser.add_argument( - "--gcdqp-packet-json", - dest="GCDQp_packet_json", - required=True, - help="a JSON file containing the GCDQp_packet (list of I3Frames)", - type=lambda x: argparse_tools.validate_arg( - Path(x), - Path(x).is_file(), - FileNotFoundError(x), + "--client-startup-json", + help=( + "The filepath to the JSON file with data needed to reco " + "(contains baseline_GCD_file and GCDQp_packet)" ), - ) - parser.add_argument( - "--baseline-gcd-file", - dest="baseline_GCD_file", - required=True, - help="the baseline GCD file", type=lambda x: argparse_tools.validate_arg( Path(x), - Path(x).is_file(), - FileNotFoundError(x), + Path(x).parent.is_dir(), + NotADirectoryError(Path(x).parent), ), ) args = parser.parse_args() - cfg.configure_loggers() + logging_tools.set_level( + ENV.SKYSCAN_LOG, # type: ignore[arg-type] + first_party_loggers=__name__.split(".", maxsplit=1)[0], + third_party_level=ENV.SKYSCAN_LOG_THIRD_PARTY, # type: ignore[arg-type] + formatter=logging_tools.WIPACDevToolsFormatter(), + ) logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING") + # read startup.json + with open(args.client_startup_json, "rb") as f: + startup_json_dict = json.load(f) + + # stage file(s) + datastager = get_gcd_datastager() + baseline_gcd_file = Path(startup_json_dict["baseline_GCD_file"]) + datastager.stage_files([baseline_gcd_file.name]) + baseline_gcd_file = Path(datastager.get_filepath(baseline_gcd_file.name)) + # check if baseline GCD file is reachable + if not baseline_gcd_file.exists(): + raise FileNotFoundError(baseline_gcd_file) + # get PFrame LOGGER.info(f"Reading {args.infile}...") with 
open(args.infile, "r") as f: @@ -281,11 +289,10 @@ def main() -> None: pframe = messages.Serialization.decode_pkl_b64(msg[cfg.MSG_KEY_PFRAME_PKL_B64]) # get GCDQp_packet - LOGGER.info(f"Reading {args.GCDQp_packet_json}...") - with open(args.GCDQp_packet_json, "r") as f: - GCDQp_packet = full_event_followup.i3live_json_to_frame_packet( - f.read(), pnf_framing=False - ) + GCDQp_packet = full_event_followup.i3live_json_to_frame_packet( + json.dumps(startup_json_dict[cfg.STATEDICT_GCDQP_PACKET]), + pnf_framing=False, + ) # go! LOGGER.info(f"Starting {reco_algo}...") @@ -293,12 +300,7 @@ def main() -> None: reco_algo, pframe, GCDQp_packet, - str(args.baseline_GCD_file), + str(baseline_gcd_file), args.outfile, ) LOGGER.info("Done reco'ing pixel.") - - -# This entrypoint is only used in CI testing -if __name__ == "__main__": - main() diff --git a/skymap_scanner/config.py b/skymap_scanner/config.py index ed161ef55..d84775ca4 100644 --- a/skymap_scanner/config.py +++ b/skymap_scanner/config.py @@ -1,16 +1,8 @@ """Configuration constants.""" -import dataclasses as dc -import logging from pathlib import Path from typing import Final, List -import ewms_pilot -import mqclient -from wipac_dev_tools import from_environment_as_dataclass, logging_tools - -# pylint:disable=invalid-name - # # True constants # @@ -57,9 +49,9 @@ "2024a": "SplitInIcePulses", } DEFAULT_INPUT_PULSES_NAME: Final = "SplitUncleanedInIcePulses" - +# INPUT_PULSES_NAME = "SplitUncleanedInIcePulses" - +# INPUT_TIME_NAME: Final = "SeedVertexTime" INPUT_POS_NAME: Final = "SeedVertexPos" OUTPUT_PARTICLE_NAME: Final = "MillipedeSeedParticle" @@ -80,14 +72,18 @@ MSG_KEY_RECO_PIXEL_VARIATION_PKL_B64: Final = "reco_pixel_variation_pkl_b64" MSG_KEY_RUNTIME: Final = "runtime" +# default filepaths BASELINE_GCD_FILENAME = "base_GCD_for_diff.i3" SOURCE_BASELINE_GCD_METADATA = "original_base_GCD_for_diff_filename.txt" GCDQp_FILENAME = "GCDQp.i3" +# predictive scanning config PREDICTIVE_SCANNING_THRESHOLD_MIN = 0.1 
PREDICTIVE_SCANNING_THRESHOLD_MAX = 1.0 PREDICTIVE_SCANNING_THRESHOLD_DEFAULT = 1.0 +COLLECTOR_BASE_THRESHOLDS = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] +# reporter config REPORTER_TIMELINE_PERCENTAGES = [ 0.1, 0.2, @@ -104,96 +100,7 @@ 0.99999, 1.0, ] -COLLECTOR_BASE_THRESHOLDS = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] - - -# -# Env var constants: set as constants & typecast -# -@dc.dataclass(frozen=True) -class EnvConfig: - """For storing environment variables, typed.""" - - SKYSCAN_PROGRESS_INTERVAL_SEC: int = 1 * 60 - SKYSCAN_RESULT_INTERVAL_SEC: int = 2 * 60 - - SKYSCAN_KILL_SWITCH_CHECK_INTERVAL: int = 5 * 60 - - # BROKER/MQ VARS - SKYSCAN_BROKER_CLIENT: str = "rabbitmq" - SKYSCAN_BROKER_ADDRESS: str = "" # broker / mq address - SKYSCAN_BROKER_AUTH: str = "" # broker / mq auth token - - # TIMEOUTS - # - # seconds -- how long client waits between receiving pixels before thinking event scan is 100% done - # - set to `max(reco duration) + max(subsequent iteration startup time)` - # - think about starved clients - # - normal expiration scenario: the scan is done, no more pixels to scan (alternative: manually kill client process) - SKYSCAN_MQ_TIMEOUT_TO_CLIENTS: int = 60 * 30 # 30 mins - # - # seconds -- how long server waits before thinking all clients are dead - # - set to duration of first reco + client launch (condor) - # - important if clients launch *AFTER* server - # - normal expiration scenario: all clients died (bad condor submit file), otherwise never (server knows when all recos are done) - SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS: int = 3 * 24 * 60 * 60 # 3 days - # - # seconds -- how long client waits before first message (set to duration of server startup) - # - important if clients launch *BEFORE* server - # - normal expiration scenario: server died (ex: tried to read corrupted event file), otherwise never - SKYSCAN_MQ_CLIENT_TIMEOUT_WAIT_FOR_FIRST_MESSAGE: int = 60 * 60 # 60 mins - - EWMS_PILOT_TASK_TIMEOUT: int = 60 * 30 - - # 
SKYDRIVER VARS - SKYSCAN_SKYDRIVER_ADDRESS: str = "" # SkyDriver REST interface address - SKYSCAN_SKYDRIVER_AUTH: str = "" # SkyDriver REST interface auth token - SKYSCAN_SKYDRIVER_SCAN_ID: str = "" # globally unique suffix for queue names - - # LOGGING VARS - SKYSCAN_LOG: str = "INFO" - SKYSCAN_LOG_THIRD_PARTY: str = "WARNING" - SKYSCAN_EWMS_PILOT_LOG: str = "INFO" - SKYSCAN_MQ_CLIENT_LOG: str = "INFO" - - # TESTING/DEBUG VARS - SKYSCAN_MINI_TEST: bool = False # run minimal variations for testing (mini-scale) - SKYSCAN_CRASH_DUMMY_PROBABILITY: float = 0.5 # for reco algo: crash-dummy - - def __post_init__(self) -> None: - """Check values.""" - if self.SKYSCAN_PROGRESS_INTERVAL_SEC <= 0: - raise ValueError( - f"Env Var: SKYSCAN_PROGRESS_INTERVAL_SEC is not positive: {self.SKYSCAN_PROGRESS_INTERVAL_SEC}" - ) - if self.SKYSCAN_RESULT_INTERVAL_SEC <= 0: - raise ValueError( - f"Env Var: SKYSCAN_RESULT_INTERVAL_SEC is not positive: {self.SKYSCAN_RESULT_INTERVAL_SEC}" - ) - - -ENV = from_environment_as_dataclass(EnvConfig) - - -def configure_loggers() -> None: - """Set up loggers with common configurations.""" - hand = logging.StreamHandler() - hand.setFormatter( - logging.Formatter( - "%(asctime)s.%(msecs)03d [%(levelname)8s] %(name)s[%(process)d] %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - ) - logging.getLogger().addHandler(hand) - logging_tools.set_level( - ENV.SKYSCAN_LOG, # type: ignore[arg-type] - first_party_loggers=__name__.split(".", maxsplit=1)[0], - third_party_level=ENV.SKYSCAN_LOG_THIRD_PARTY, # type: ignore[arg-type] - future_third_parties=["google", "pika"], - specialty_loggers={ - ewms_pilot.pilot.LOGGER: ENV.SKYSCAN_EWMS_PILOT_LOG, # type: ignore[attr-defined, dict-item] - mqclient.queue.LOGGER: ENV.SKYSCAN_MQ_CLIENT_LOG, # type: ignore[dict-item] - }, - ) +LOG_LEVEL_DEFAULT = "INFO" +LOG_THIRD_PARTY_LEVEL_DEFAULT = "WARNING" diff --git a/skymap_scanner/recos/__init__.py b/skymap_scanner/recos/__init__.py index 9a9353d80..b07773198 100644 --- 
a/skymap_scanner/recos/__init__.py +++ b/skymap_scanner/recos/__init__.py @@ -1,5 +1,8 @@ """Tools for conducting & representing a pixel reconstruction.""" +# mypy: ignore-errors +# fmt: off + from abc import ABC, abstractmethod import importlib import pkgutil @@ -14,8 +17,8 @@ try: # these are only used for typehints, so mock imports are fine from icecube.dataclasses import I3Position # type: ignore[import] from icecube.icetray import I3Frame # type: ignore[import] - from icecube import astro # type: ignore[import] -except ImportError: # type: ignore[import] + from icecube import astro # type: ignore[import] +except ImportError: # type: ignore[import] I3Position = Any I3Frame = Any @@ -81,11 +84,11 @@ def setup_reco(self): def traysegment(self, tray, name, logger, **kwargs: Any) -> None: """Performs the reconstruction.""" pass - + @staticmethod @abstractmethod def to_recopixelvariation( - frame: I3Frame, geometry: I3Frame + frame: I3Frame, geometry: I3Frame ) -> "RecoPixelVariation": raise NotImplementedError() @@ -120,9 +123,10 @@ def get_reco_spline_requirements(name: str) -> List[str]: raise UnsupportedRecoAlgoException(name) from e raise # something when wrong AFTER accessing sub-module + def set_pointing_ra_dec( - particle_name_possibilities: Union[List[str], None], - p_frame: I3Frame + particle_name_possibilities: Union[List[str], None], + p_frame: I3Frame ) -> Union[Tuple[float, float], None]: """Retrieves the direction for a pointed scan""" pointing_ra_dec = None @@ -135,4 +139,4 @@ def set_pointing_ra_dec( pointing_dir.azimuth, p_frame["I3EventHeader"].start_time.mod_julian_day_double ) - return pointing_ra_dec \ No newline at end of file + return pointing_ra_dec diff --git a/skymap_scanner/recos/crash_dummy.py b/skymap_scanner/recos/crash_dummy.py index d2327b64a..dce870e89 100644 --- a/skymap_scanner/recos/crash_dummy.py +++ b/skymap_scanner/recos/crash_dummy.py @@ -1,12 +1,12 @@ """IceTray segment for a dummy reco (will crash w/in a given 
probability).""" - +import os import random +import time from typing import Final from icecube import icetray # type: ignore[import] # noqa: F401 -from ..config import ENV from . import RecoInterface, dummy @@ -21,9 +21,10 @@ def traysegment(tray, name, logger, **kwargs): def crash(frame): rand = random.random() - logger.debug(f"crash probability: {ENV.SKYSCAN_CRASH_DUMMY_PROBABILITY=}") + prob = float(os.getenv("_SKYSCAN_CI_CRASH_DUMMY_PROBABILITY", 0.5)) + logger.debug(f"crash probability: {prob}") - if rand < ENV.SKYSCAN_CRASH_DUMMY_PROBABILITY: + if rand < prob: logger.debug(f"crash! {rand=}") # now, pick what to fail with @@ -31,6 +32,7 @@ def crash(frame): logger.debug(f"crashing with '{fail}'") if fail == "infinite-loop": while True: # to infinity! + time.sleep(1) continue elif fail == "error": raise KeyError("intentional crash-dummy error") diff --git a/skymap_scanner/recos/millipede_original.py b/skymap_scanner/recos/millipede_original.py index dd22abadc..a3fe63b6e 100644 --- a/skymap_scanner/recos/millipede_original.py +++ b/skymap_scanner/recos/millipede_original.py @@ -4,8 +4,8 @@ # pylint: skip-file # mypy: ignore-errors -import copy import datetime +import os from typing import Final, List, Tuple import numpy @@ -43,7 +43,7 @@ def get_vertex_variations() -> List[dataclasses.I3Position]: """ variation_distance = 20.*I3Units.m - if cfg.ENV.SKYSCAN_MINI_TEST: + if os.getenv('_SKYSCAN_CI_MINI_TEST'): return VertexGenerator.mini_test(variation_distance=variation_distance) else: return VertexGenerator.octahedron(radius=variation_distance) diff --git a/skymap_scanner/recos/splinempe.py b/skymap_scanner/recos/splinempe.py index a77b6aa7c..d797c403f 100644 --- a/skymap_scanner/recos/splinempe.py +++ b/skymap_scanner/recos/splinempe.py @@ -1,5 +1,7 @@ """IceTray segment for a dummy reco.""" +# mypy: ignore-errors +# fmt: off import datetime import numpy as np diff --git a/skymap_scanner/recos/splinempe_pointed.py b/skymap_scanner/recos/splinempe_pointed.py 
index 1121a8e14..0dbd1100e 100644 --- a/skymap_scanner/recos/splinempe_pointed.py +++ b/skymap_scanner/recos/splinempe_pointed.py @@ -1,11 +1,12 @@ """IceTray segment for a pointed splinempe reco.""" # mypy: ignore-errors - +# fmt: off + from typing import Final -from . import splinempe -from . import RecoInterface +from . import splinempe +from . import RecoInterface class SplineMPEPointed(splinempe.SplineMPE): @@ -13,4 +14,4 @@ def __init__(self): super().__init__() self.pointing_dir_names = ["OnlineL2_SplineMPE", "l2_online_SplineMPE"] -RECO_CLASS: Final[type[RecoInterface]] = SplineMPEPointed \ No newline at end of file +RECO_CLASS: Final[type[RecoInterface]] = SplineMPEPointed diff --git a/skymap_scanner/server/__init__.py b/skymap_scanner/server/__init__.py index e69de29bb..675645a75 100644 --- a/skymap_scanner/server/__init__.py +++ b/skymap_scanner/server/__init__.py @@ -0,0 +1,93 @@ +"""The Skymap Scanner Central Server.""" + +import dataclasses as dc + +from wipac_dev_tools import from_environment_as_dataclass + +from .. import config as cfg + + +# +# Env var constants: set as constants & typecast +# + + +@dc.dataclass(frozen=True) +class EnvConfig: + """For storing environment variables, typed.""" + + # + # REQUIRED + # + + SKYSCAN_SKYDRIVER_SCAN_ID: str # globally unique ID + + # to-client queue + SKYSCAN_MQ_TOCLIENT: str + SKYSCAN_MQ_TOCLIENT_AUTH_TOKEN: str + SKYSCAN_MQ_TOCLIENT_BROKER_TYPE: str + SKYSCAN_MQ_TOCLIENT_BROKER_ADDRESS: str + # + # from-client queue + SKYSCAN_MQ_FROMCLIENT: str + SKYSCAN_MQ_FROMCLIENT_AUTH_TOKEN: str + SKYSCAN_MQ_FROMCLIENT_BROKER_TYPE: str + SKYSCAN_MQ_FROMCLIENT_BROKER_ADDRESS: str + + # + # OPTIONAL + # + + SKYSCAN_PROGRESS_INTERVAL_SEC: int = 1 * 60 + SKYSCAN_PROGRESS_RUNTIME_PREDICTION_WINDOW_RATIO: float = ( + # The size of the sample window (a percentage of the collected/finished recos) + # used to calculate the most recent runtime rate (sec/reco), then used to make + # predictions for overall runtimes: i.e. 
amount of time left. + # Also, see SKYSCAN_PROGRESS_RUNTIME_PREDICTION_WINDOW_MIN. + 0.25 + ) + SKYSCAN_PROGRESS_RUNTIME_PREDICTION_WINDOW_MIN: int = ( + # WARNING! + # THIS IS A MINIMUM!!! -- it's only useful for the first + # `val/SKYSCAN_PROGRESS_RUNTIME_PREDICTION_WINDOW_RATIO` num of recos; + # then, SKYSCAN_PROGRESS_RUNTIME_PREDICTION_WINDOW_RATIO is used. + # NOTE: val should not be (too) below the num of workers (which is unknown, so make a good guess). + # In other words, if val is too low, then the rate is not representative of the + # worker-pool's concurrency; if val is TOO HIGH, then the window is TOO LARGE. + 400 + ) + SKYSCAN_RESULT_INTERVAL_SEC: int = 2 * 60 + + SKYSCAN_KILL_SWITCH_CHECK_INTERVAL: int = 5 * 60 + + # TIMEOUTS + # + # seconds -- how long server waits before thinking all clients are dead + # - set to duration of first reco + client launch (condor) + # - important if clients launch *AFTER* server + # - normal expiration scenario: all clients died (bad condor submit file), otherwise never (server knows when all recos are done) + SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS: int = 3 * 24 * 60 * 60 # 3 days + + # SKYDRIVER VARS + SKYSCAN_SKYDRIVER_ADDRESS: str = "" # SkyDriver REST interface address + SKYSCAN_SKYDRIVER_AUTH: str = "" # SkyDriver REST interface auth token + + # LOGGING VARS + SKYSCAN_LOG: str = cfg.LOG_LEVEL_DEFAULT + SKYSCAN_LOG_THIRD_PARTY: str = cfg.LOG_THIRD_PARTY_LEVEL_DEFAULT + SKYSCAN_EWMS_PILOT_LOG: str = cfg.LOG_LEVEL_DEFAULT + SKYSCAN_MQ_CLIENT_LOG: str = cfg.LOG_LEVEL_DEFAULT + + def __post_init__(self) -> None: + """Check values.""" + if self.SKYSCAN_PROGRESS_INTERVAL_SEC <= 0: + raise ValueError( + f"Env Var: SKYSCAN_PROGRESS_INTERVAL_SEC is not positive: {self.SKYSCAN_PROGRESS_INTERVAL_SEC}" + ) + if self.SKYSCAN_RESULT_INTERVAL_SEC <= 0: + raise ValueError( + f"Env Var: SKYSCAN_RESULT_INTERVAL_SEC is not positive: {self.SKYSCAN_RESULT_INTERVAL_SEC}" + ) + + +ENV = from_environment_as_dataclass(EnvConfig) diff --git 
a/skymap_scanner/server/collector.py b/skymap_scanner/server/collector.py index c0960da22..93efff8ea 100644 --- a/skymap_scanner/server/collector.py +++ b/skymap_scanner/server/collector.py @@ -1,6 +1,5 @@ """The Skymap Scanner Server.""" - import itertools import logging import time @@ -9,6 +8,7 @@ import numpy +from .reporter import Reporter from .. import config as cfg from ..utils.pixel_classes import ( NSidesDict, @@ -17,7 +17,6 @@ RecoPixelVariation, SentPixelVariation, ) -from .reporter import Reporter LOGGER = logging.getLogger(__name__) @@ -218,7 +217,7 @@ async def register_sent_pixvars( async def collect( self, reco_pixel_variation: RecoPixelVariation, - reco_runtime: float, + on_worker_runtime: float, ) -> None: """Cache RecoPixelVariation until we can save the pixel's best received reco (RecoPixelFinal).""" @@ -276,9 +275,9 @@ async def collect( # report after potential save await self.reporter.record_reco( reco_pixel_variation.nside, - reco_runtime, - roundtrip_start=sent_pixvar.sent_time, - roundtrip_end=time.time(), + on_worker_runtime, + on_server_roundtrip_start=sent_pixvar.sent_time, + on_server_roundtrip_end=time.time(), ) def has_collected_all_sent(self) -> bool: diff --git a/skymap_scanner/server/reporter.py b/skymap_scanner/server/reporter.py index f50239775..9c7a1726b 100644 --- a/skymap_scanner/server/reporter.py +++ b/skymap_scanner/server/reporter.py @@ -1,6 +1,5 @@ """Contains a class for reporting progress and result for a scan.""" - import bisect import dataclasses as dc import datetime as dt @@ -16,11 +15,12 @@ from rest_tools.client import RestClient from skyreader import EventMetadata, SkyScanResult +from . import ENV +from .utils import NSideProgression, connect_to_skydriver, nonurgent_request from .. 
import config as cfg from ..utils import to_skyscan_result from ..utils.pixel_classes import NSidesDict from ..utils.utils import pyobj_to_string_repr -from .utils import NSideProgression, connect_to_skydriver, nonurgent_request LOGGER = logging.getLogger(__name__) @@ -38,37 +38,42 @@ class WorkerStats: def __init__( self, - worker_runtimes: Optional[List[float]] = None, - roundtrips: Optional[List[float]] = None, - roundtrip_start: float = float("inf"), - roundtrip_end: float = float("-inf"), - ends: Optional[List[float]] = None, + on_worker_runtimes: list[float], + on_server_roundtrips: list[float], + on_server_roundtrip_starts: list[float], + on_server_roundtrip_ends: list[float], ) -> None: - self.roundtrip_start = roundtrip_start - self.roundtrip_end = roundtrip_end - - self.worker_runtimes: List[float] = worker_runtimes if worker_runtimes else [] - self.worker_runtimes.sort() # speed up stats + self.on_worker_runtimes = on_worker_runtimes + self.on_worker_runtimes.sort() # speed up stats # - self.roundtrips: List[float] = roundtrips if roundtrips else [] - self.roundtrips.sort() # speed up stats + self.on_server_roundtrips = on_server_roundtrips + self.on_server_roundtrips.sort() # speed up stats # - self.ends: List[float] = ends if ends else [] - self.ends.sort() # speed up stats + self.on_server_roundtrip_starts: List[float] = on_server_roundtrip_starts + self.on_server_roundtrip_starts.sort() # speed up stats + self.on_server_first_roundtrip_start = lambda: min( + self.on_server_roundtrip_starts + ) + # + self.on_server_roundtrip_ends = on_server_roundtrip_ends + self.on_server_roundtrip_ends.sort() # speed up stats + self.on_server_last_roundtrip_end = lambda: max(self.on_server_roundtrip_ends) - self.fastest_worker = lambda: min(self.worker_runtimes) - self.fastest_roundtrip = lambda: min(self.roundtrips) + self.fastest_worker = lambda: min(self.on_worker_runtimes) + self.fastest_roundtrip = lambda: min(self.on_server_roundtrips) - self.slowest_worker = 
lambda: max(self.worker_runtimes) - self.slowest_roundtrip = lambda: max(self.roundtrips) + self.slowest_worker = lambda: max(self.on_worker_runtimes) + self.slowest_roundtrip = lambda: max(self.on_server_roundtrips) # Fast, floating point arithmetic mean. - self.mean_worker = lambda: statistics.fmean(self.worker_runtimes) - self.mean_roundtrip = lambda: statistics.fmean(self.roundtrips) + self.mean_worker = lambda: statistics.fmean(self.on_worker_runtimes) + self.mean_roundtrip = lambda: statistics.fmean(self.on_server_roundtrips) # Median (middle value) of data. - self.median_worker = lambda: float(statistics.median(self.worker_runtimes)) - self.median_roundtrip = lambda: float(statistics.median(self.roundtrips)) + self.median_worker = lambda: float(statistics.median(self.on_worker_runtimes)) + self.median_roundtrip = lambda: float( + statistics.median(self.on_server_roundtrips) + ) # other statistics functions... # geometric_mean Geometric mean of data. @@ -84,16 +89,27 @@ def __init__( def update( self, - worker_runtime: float, - roundtrip_start: float, - roundtrip_end: float, + on_worker_runtime: float, + on_server_roundtrip_start: float, + on_server_roundtrip_end: float, ) -> "WorkerStats": """Insert the runtime and recalculate round-trip start/end times.""" - bisect.insort(self.worker_runtimes, worker_runtime) - bisect.insort(self.roundtrips, roundtrip_end - roundtrip_start) - bisect.insort(self.ends, roundtrip_end) - self.roundtrip_start = min(self.roundtrip_start, roundtrip_start) - self.roundtrip_end = max(self.roundtrip_end, roundtrip_end) + bisect.insort( + self.on_worker_runtimes, + on_worker_runtime, + ) + bisect.insort( + self.on_server_roundtrips, + on_server_roundtrip_end - on_server_roundtrip_start, + ) + bisect.insort( + self.on_server_roundtrip_starts, + on_server_roundtrip_start, + ) + bisect.insort( + self.on_server_roundtrip_ends, + on_server_roundtrip_end, + ) return self def get_summary(self) -> Dict[str, Dict[str, str]]: @@ -129,16 +145,30 
@@ def get_summary(self) -> Dict[str, Dict[str, str]]: ), }, "wall time": { - "start": str(dt.datetime.fromtimestamp(int(self.roundtrip_start))), - "end": str(dt.datetime.fromtimestamp(int(self.roundtrip_end))), + "start": str( + dt.datetime.fromtimestamp( + int(self.on_server_first_roundtrip_start()) + ) + ), + "end": str( + dt.datetime.fromtimestamp(int(self.on_server_last_roundtrip_end())) + ), "runtime": str( - dt.timedelta(seconds=int(self.roundtrip_end - self.roundtrip_start)) + dt.timedelta( + seconds=int( + self.on_server_last_roundtrip_end() + - self.on_server_first_roundtrip_start() + ) + ) ), "mean reco": str( dt.timedelta( seconds=int( - (self.roundtrip_end - self.roundtrip_start) - / len(self.worker_runtimes) + ( + self.on_server_last_roundtrip_end() + - self.on_server_first_roundtrip_start() + ) + / len(self.on_worker_runtimes) ) ) ), @@ -153,54 +183,120 @@ def __init__(self) -> None: self._worker_stats_by_nside: Dict[int, WorkerStats] = {} self._aggregate: Optional[WorkerStats] = None + def get_runtime_prediction_technique(self) -> str: + """Get a human-readable string of what technique is used for predicting runtimes.""" + if ( + self.runtime_sample_window_size + == ENV.SKYSCAN_PROGRESS_RUNTIME_PREDICTION_WINDOW_MIN + ): + return ( + f"simple average over entire scan runtime " + f"(a moving average with a window of " + f"{ENV.SKYSCAN_PROGRESS_RUNTIME_PREDICTION_WINDOW_RATIO} " + f"will be used after " + f"{int(ENV.SKYSCAN_PROGRESS_RUNTIME_PREDICTION_WINDOW_MIN/ENV.SKYSCAN_PROGRESS_RUNTIME_PREDICTION_WINDOW_RATIO)} " + f"recos have finished)" + ) + else: + return f"simple moving average (window={self.runtime_sample_window_size})" + + @property + def _runtime_sample_window_size_candidate(self) -> int: + """The window size that would be used if not for a minimum.""" + return int(self.total_ct * ENV.SKYSCAN_PROGRESS_RUNTIME_PREDICTION_WINDOW_RATIO) + + @property + def runtime_sample_window_size(self) -> int: + """The size of the window used for 
predicting runtimes.""" + return max( + self._runtime_sample_window_size_candidate, + ENV.SKYSCAN_PROGRESS_RUNTIME_PREDICTION_WINDOW_MIN, + ) + + def on_server_recent_sec_per_reco_rate(self) -> float: + """The sec/reco rate from server pov within a moving window.""" + try: + # look at a window, so don't use the first start time + # psst, we know that this list is sorted, ascending + nth_most_recent_start = self.aggregate.on_server_roundtrip_starts[ + -self.runtime_sample_window_size + ] + n_recos = self.runtime_sample_window_size + except IndexError: + # not enough recos to sample, so take all of them + nth_most_recent_start = self.aggregate.on_server_first_roundtrip_start() + n_recos = self.total_ct + + return ( + self.aggregate.on_server_last_roundtrip_end() - nth_most_recent_start + ) / n_recos + def ct_by_nside(self, nside: int) -> int: """Get length per given nside.""" try: - return len(self._worker_stats_by_nside[nside].worker_runtimes) + return len(self._worker_stats_by_nside[nside].on_worker_runtimes) except KeyError: return 0 @property def total_ct(self) -> int: """O(n) b/c len is O(1), n < 10.""" - return sum(len(w.worker_runtimes) for w in self._worker_stats_by_nside.values()) + return sum( + len(w.on_worker_runtimes) for w in self._worker_stats_by_nside.values() + ) @property def first_roundtrip_start(self) -> float: - """O(n), n < 10.""" - return min(w.roundtrip_start for w in self._worker_stats_by_nside.values()) + """Get the first roundtrip start time from server pov.""" + return self.aggregate.on_server_first_roundtrip_start() def update( self, nside: int, - runtime: float, - roundtrip_start: float, - roundtrip_end: float, + on_worker_runtime: float, + on_server_roundtrip_start: float, + on_server_roundtrip_end: float, ) -> int: """Return reco-count of nside's list after updating.""" self._aggregate = None # clear try: worker_stats = self._worker_stats_by_nside[nside] + worker_stats.update( + on_worker_runtime, + on_server_roundtrip_start, + 
on_server_roundtrip_end, + ) except KeyError: - worker_stats = self._worker_stats_by_nside[nside] = WorkerStats() - worker_stats.update(runtime, roundtrip_start, roundtrip_end) - return len(worker_stats.worker_runtimes) + worker_stats = self._worker_stats_by_nside[nside] = WorkerStats( + on_worker_runtimes=[on_worker_runtime], + on_server_roundtrips=[ + on_server_roundtrip_end - on_server_roundtrip_start + ], + on_server_roundtrip_starts=[on_server_roundtrip_start], + on_server_roundtrip_ends=[on_server_roundtrip_end], + ) + return len(worker_stats.on_worker_runtimes) @property def aggregate(self) -> WorkerStats: - """Cached `WorkerStats` aggregate.""" + """An aggregate (`WorkerStats` obj) of all recos (all nsides).""" if not self._aggregate: instances = self._worker_stats_by_nside.values() if not instances: - return WorkerStats() + return WorkerStats([], [], [], []) self._aggregate = WorkerStats( - worker_runtimes=list( - itertools.chain(*[i.worker_runtimes for i in instances]) + on_worker_runtimes=list( + itertools.chain(*[i.on_worker_runtimes for i in instances]) + ), + on_server_roundtrips=list( + itertools.chain(*[i.on_server_roundtrips for i in instances]) + ), + on_server_roundtrip_starts=list( + itertools.chain(*[i.on_server_roundtrip_starts for i in instances]) + ), + on_server_roundtrip_ends=list( + itertools.chain(*[i.on_server_roundtrip_ends for i in instances]) ), - roundtrips=list(itertools.chain(*[i.roundtrips for i in instances])), - roundtrip_start=min(i.roundtrip_start for i in instances), - roundtrip_end=max(i.roundtrip_end for i in instances), - ends=list(itertools.chain(*[i.ends for i in instances])), ) return self._aggregate @@ -217,7 +313,6 @@ class Reporter: def __init__( self, - scan_id: str, global_start_time: float, nsides_dict: NSidesDict, n_posvar: int, @@ -228,8 +323,6 @@ def __init__( ) -> None: """ Arguments: - `scan_id` - - the unique id of this scan `global_start_time` - the start time (epoch) of the entire scan `nsides_dict` @@ 
-254,7 +347,6 @@ def __init__( self.is_event_scan_done = False self.predictive_scanning_threshold = predictive_scanning_threshold - self.scan_id = scan_id self.global_start = global_start_time self.nsides_dict = nsides_dict @@ -265,7 +357,7 @@ def __init__( self._n_sent_by_nside: Dict[int, int] = {} - if not cfg.ENV.SKYSCAN_SKYDRIVER_ADDRESS: + if not ENV.SKYSCAN_SKYDRIVER_ADDRESS: self.skydriver_rc_nonurgent: Optional[RestClient] = None self.skydriver_rc_urgent: Optional[RestClient] = None else: @@ -314,26 +406,28 @@ async def precomputing_report(self) -> None: async def record_reco( self, nside: int, - runtime: float, - roundtrip_start: float, - roundtrip_end: float, + on_worker_runtime: float, + on_server_roundtrip_start: float, + on_server_roundtrip_end: float, ) -> None: """Send reports/logs/plots if needed.""" self._check_call_order(self.record_reco) if not self.time_of_first_reco_start_on_client: - # timeline: roundtrip_start -> pre-reco queue time -> (runtime) -> post-reco queue time -> roundtrip_end + # timeline: on_server_roundtrip_start -> pre-reco queue time -> (runtime) -> post-reco queue time -> on_server_roundtrip_end # since worker nodes need to startup & a pixel may fail several times before being reco'd, # we know "pre-reco queue time" >>> "post-reco queue time" # if we assume "post-reco queue time" ~= 0.0, then the reco started here: - self.time_of_first_reco_start_on_client = roundtrip_end - (runtime + 0.0) + self.time_of_first_reco_start_on_client = on_server_roundtrip_end - ( + on_worker_runtime + 0.0 + ) # update stats nside_ct = self.worker_stats_collection.update( nside, - runtime, - roundtrip_start, - roundtrip_end, + on_worker_runtime, + on_server_roundtrip_start, + on_server_roundtrip_end, ) # make report(s) @@ -354,8 +448,7 @@ async def make_reports_if_needed( # check if we need to send a report to the logger current_time = time.time() if bypass_timers or ( - current_time - self.last_time_reported - > 
cfg.ENV.SKYSCAN_PROGRESS_INTERVAL_SEC + current_time - self.last_time_reported > ENV.SKYSCAN_PROGRESS_INTERVAL_SEC ): self.last_time_reported = current_time if self.worker_stats_collection.total_ct == 0: @@ -363,7 +456,7 @@ async def make_reports_if_needed( else: epilogue_msg = ( f"I will report back again in " - f"{cfg.ENV.SKYSCAN_PROGRESS_INTERVAL_SEC} seconds if I have an update." + f"{ENV.SKYSCAN_PROGRESS_INTERVAL_SEC} seconds if I have an update." ) await self._send_progress(summary_msg, epilogue_msg) @@ -371,7 +464,7 @@ async def make_reports_if_needed( current_time = time.time() if bypass_timers or ( current_time - self.last_time_reported_skymap - > cfg.ENV.SKYSCAN_RESULT_INTERVAL_SEC + > ENV.SKYSCAN_RESULT_INTERVAL_SEC ): self.last_time_reported_skymap = current_time await self._send_result() @@ -465,28 +558,25 @@ def _get_processing_progress(self) -> StrDict: proc_stats["finished"] = True else: # MAKE PREDICTIONS - # NOTE: this is a simple mean, may want to visit more sophisticated methods - secs_predicted = elapsed_reco_server_walltime / ( - self.worker_stats_collection.total_ct / self.predicted_total_recos() + n_recos_left = ( + self.predicted_total_recos() - self.worker_stats_collection.total_ct + ) + time_left = ( # this uses a moving window average + self.worker_stats_collection.on_server_recent_sec_per_reco_rate() + * n_recos_left # (sec/recos) * (recos/1) -> sec ) proc_stats["predictions"] = { - "time left": str( - dt.timedelta( - seconds=int(secs_predicted - elapsed_reco_server_walltime) - ) - ), + "time left": str(dt.timedelta(seconds=int(time_left))), "total runtime at finish": str( - dt.timedelta(seconds=int(secs_predicted + startup_runtime)) - ), - "total # of reconstructions": self.predicted_total_recos(), - "end": str( - dt.datetime.fromtimestamp( - int( - time.time() - + (secs_predicted - elapsed_reco_server_walltime) + dt.timedelta( + seconds=int( + startup_runtime + elapsed_reco_server_walltime + time_left ) ) ), + "total # of 
reconstructions": self.predicted_total_recos(), + "end": str(dt.datetime.fromtimestamp(int(time.time() + time_left))), + "technique": self.worker_stats_collection.get_runtime_prediction_technique(), } return proc_stats @@ -545,7 +635,9 @@ def pixels_percent_string(nside: int) -> str: index = math.ceil(predicted_total * i) - 1 name = str(i) try: - when = self.worker_stats_collection.aggregate.ends[index] + when = self.worker_stats_collection.aggregate.on_server_roundtrip_ends[ + index + ] timeline[name] = str( dt.timedelta(seconds=int(when - self.global_start)) ) @@ -590,7 +682,7 @@ async def _send_progress( "last_updated": str(dt.datetime.fromtimestamp(int(time.time()))), } scan_metadata = { - "scan_id": self.scan_id, + "scan_id": ENV.SKYSCAN_SKYDRIVER_SCAN_ID, "nside_progression": self.nside_progression, "position_variations": self.n_posvar, } @@ -604,7 +696,11 @@ async def _send_progress( } # skydriver - sd_args = dict(method="PATCH", path=f"/scan/{self.scan_id}/manifest", args=body) + sd_args = dict( + method="PATCH", + path=f"/scan/{ENV.SKYSCAN_SKYDRIVER_SCAN_ID}/manifest", + args=body, + ) if not self.is_event_scan_done and self.skydriver_rc_nonurgent: await nonurgent_request(self.skydriver_rc_nonurgent, sd_args) elif self.is_event_scan_done and self.skydriver_rc_urgent: @@ -628,7 +724,11 @@ async def _send_result(self) -> SkyScanResult: # skydriver body = {"skyscan_result": serialized, "is_final": self.is_event_scan_done} - sd_args = dict(method="PUT", path=f"/scan/{self.scan_id}/result", args=body) + sd_args = dict( + method="PUT", + path=f"/scan/{ENV.SKYSCAN_SKYDRIVER_SCAN_ID}/result", + args=body, + ) if not self.is_event_scan_done and self.skydriver_rc_nonurgent: await nonurgent_request(self.skydriver_rc_nonurgent, sd_args) elif self.is_event_scan_done and self.skydriver_rc_urgent: diff --git a/skymap_scanner/server/start_scan.py b/skymap_scanner/server/start_scan.py index 46e7ffd86..ad177182d 100644 --- a/skymap_scanner/server/start_scan.py +++ 
b/skymap_scanner/server/start_scan.py @@ -1,7 +1,6 @@ """The Skymap Scanner Server.""" # pylint: disable=invalid-name,import-error -# fmt:quotes-ok import argparse import asyncio @@ -25,6 +24,7 @@ from skyreader import EventMetadata from wipac_dev_tools import argparse_tools, logging_tools +from . import ENV from .collector import Collector, ExtraRecoPixelVariationException from .pixels import choose_pixels_to_reconstruct from .reporter import Reporter @@ -32,6 +32,7 @@ NSideProgression, fetch_event_contents_from_file, fetch_event_contents_from_skydriver, + get_mqclient_connections, kill_switch_check_from_skydriver, ) from .. import config as cfg, recos @@ -317,7 +318,6 @@ def _gen_pframes( # fmt: on async def scan( - scan_id: str, reco_algo: str, event_metadata: EventMetadata, nsides_dict: Optional[NSidesDict], @@ -350,7 +350,6 @@ async def scan( ) reporter = Reporter( - scan_id, global_start_time, nsides_dict, len(pixeler.pos_variations), @@ -518,27 +517,11 @@ async def _serve_and_collect( def write_startup_json( client_startup_json: Path, - event_metadata: EventMetadata, - nside_progression: NSideProgression, baseline_GCD_file: str, GCDQp_packet: List[icetray.I3Frame], -) -> str: - """Write startup JSON file for client-spawning. - - Return the scan_id string. 
- """ - if cfg.ENV.SKYSCAN_SKYDRIVER_SCAN_ID: - scan_id = cfg.ENV.SKYSCAN_SKYDRIVER_SCAN_ID - else: - scan_id = ( - f"{event_metadata.event_id}-" - f"{'-'.join(f'{n}:{x}' for n,x in nside_progression.items())}-" - f"{int(time.time())}" - ) - +) -> None: + """Write startup JSON file for client-spawning.""" json_dict = { - "scan_id": scan_id, - "mq_basename": scan_id, "baseline_GCD_file": baseline_GCD_file, "GCDQp_packet": json.loads( full_event_followup.frame_packet_to_i3live_json( @@ -553,8 +536,6 @@ def write_startup_json( f"Startup JSON: {client_startup_json} ({client_startup_json.stat().st_size} bytes)" ) - return json_dict["scan_id"] # type: ignore[no-any-return] - def main() -> None: """Get command-line arguments and perform event scan via clients.""" @@ -563,6 +544,14 @@ def _nside_and_pixelextension(val: str) -> Tuple[int, int]: nside, ext = val.split(":") return int(nside), int(ext) + def _mkdir_if_not_exists(val: str, is_file: bool = False) -> Path: + fpath = Path(val) + if is_file: + fpath.parent.mkdir(parents=True, exist_ok=True) + else: + fpath.mkdir(parents=True, exist_ok=True) + return fpath + parser = argparse.ArgumentParser( description=( "Start up server to serve pixels to clients and save pixel-reconstructions " @@ -577,33 +566,21 @@ def _nside_and_pixelextension(val: str) -> Tuple[int, int]: "--client-startup-json", required=True, help="The filepath to save the JSON needed to spawn clients (the parent directory must already exist)", - type=lambda x: argparse_tools.validate_arg( - Path(x), - Path(x).parent.is_dir(), - NotADirectoryError(Path(x).parent), - ), + type=lambda x: _mkdir_if_not_exists(x, is_file=True), ) parser.add_argument( "-c", "--cache-dir", required=True, help="The cache directory to use", - type=lambda x: argparse_tools.validate_arg( - Path(x), - Path(x).is_dir(), - NotADirectoryError(x), - ), + type=lambda x: _mkdir_if_not_exists(x), ) parser.add_argument( "-o", "--output-dir", default=None, help="The directory to write out the 
.npz file", - type=lambda x: argparse_tools.validate_arg( - Path(x), - Path(x).is_dir(), - NotADirectoryError(x), - ), + type=lambda x: _mkdir_if_not_exists(x), ) # "physics" args @@ -680,7 +657,16 @@ def _nside_and_pixelextension(val: str) -> Tuple[int, int]: ) args = parser.parse_args() - cfg.configure_loggers() + logging_tools.set_level( + ENV.SKYSCAN_LOG, # type: ignore[arg-type] + first_party_loggers=__name__.split(".", maxsplit=1)[0], + third_party_level=ENV.SKYSCAN_LOG_THIRD_PARTY, # type: ignore[arg-type] + future_third_parties=["google", "pika"], + specialty_loggers={ + mq.queue.LOGGER: ENV.SKYSCAN_MQ_CLIENT_LOG, # type: ignore[dict-item] + }, + formatter=logging_tools.WIPACDevToolsFormatter(), + ) logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING") # nsides -- the class needs the whole list to validate, so this logic can't be outsourced to argparse's `type=` @@ -691,13 +677,13 @@ def _nside_and_pixelextension(val: str) -> Tuple[int, int]: raise NotADirectoryError(args.gcd_dir) # check output status - if not cfg.ENV.SKYSCAN_SKYDRIVER_ADDRESS and not args.output_dir: + if not ENV.SKYSCAN_SKYDRIVER_ADDRESS and not args.output_dir: raise RuntimeError( "Must include either --output-dir or SKYSCAN_SKYDRIVER_ADDRESS (env var), " "otherwise you won't see your results!" 
) # read event file - if cfg.ENV.SKYSCAN_SKYDRIVER_ADDRESS: + if ENV.SKYSCAN_SKYDRIVER_ADDRESS: event_contents = asyncio.run(fetch_event_contents_from_skydriver()) else: event_contents = fetch_event_contents_from_file(args.event_file) @@ -714,30 +700,15 @@ def _nside_and_pixelextension(val: str) -> Tuple[int, int]: # write startup files for client-spawning LOGGER.info("Writing startup JSON...") - scan_id = write_startup_json( + write_startup_json( args.client_startup_json, - event_metadata, - args.nside_progression, state_dict[cfg.STATEDICT_BASELINE_GCD_FILE], state_dict[cfg.STATEDICT_GCDQP_PACKET], ) # make mq connections LOGGER.info("Making MQClient queue connections...") - to_clients_queue = mq.Queue( - cfg.ENV.SKYSCAN_BROKER_CLIENT, - address=cfg.ENV.SKYSCAN_BROKER_ADDRESS, - name=f"to-clients-{scan_id}", - auth_token=cfg.ENV.SKYSCAN_BROKER_AUTH, - timeout=cfg.ENV.SKYSCAN_MQ_TIMEOUT_TO_CLIENTS, - ) - from_clients_queue = mq.Queue( - cfg.ENV.SKYSCAN_BROKER_CLIENT, - address=cfg.ENV.SKYSCAN_BROKER_ADDRESS, - name=f"from-clients-{scan_id}", - auth_token=cfg.ENV.SKYSCAN_BROKER_AUTH, - timeout=cfg.ENV.SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS, - ) + to_clients_queue, from_clients_queue = get_mqclient_connections() # create background thread for checking whether to abort -- fire & forget threading.Thread( @@ -749,7 +720,6 @@ def _nside_and_pixelextension(val: str) -> Tuple[int, int]: # go! 
asyncio.run( scan( - scan_id=scan_id, reco_algo=args.reco_algo, event_metadata=event_metadata, nsides_dict=state_dict.get(cfg.STATEDICT_NSIDES), diff --git a/skymap_scanner/server/utils.py b/skymap_scanner/server/utils.py index e93fdb185..e16543cf6 100644 --- a/skymap_scanner/server/utils.py +++ b/skymap_scanner/server/utils.py @@ -1,6 +1,5 @@ """Server-specific utils.""" - import asyncio import json import logging @@ -12,27 +11,54 @@ from typing import Any, Dict, List, Optional, Tuple import cachetools.func +import mqclient as mq from rest_tools.client import CalcRetryFromWaittimeMax, RestClient -from .. import config as cfg +from . import ENV LOGGER = logging.getLogger(__name__) +######################################################################################## + + +def get_mqclient_connections() -> tuple[mq.Queue, mq.Queue]: + """Establish connections to message queues.""" + to_clients_queue = mq.Queue( + ENV.SKYSCAN_MQ_TOCLIENT_BROKER_TYPE, + address=ENV.SKYSCAN_MQ_TOCLIENT_BROKER_ADDRESS, + name=ENV.SKYSCAN_MQ_TOCLIENT, + auth_token=ENV.SKYSCAN_MQ_TOCLIENT_AUTH_TOKEN, + # timeout=-1, # NOTE: this mq only sends messages so no timeout needed + ) + from_clients_queue = mq.Queue( + ENV.SKYSCAN_MQ_FROMCLIENT_BROKER_TYPE, + address=ENV.SKYSCAN_MQ_FROMCLIENT_BROKER_ADDRESS, + name=ENV.SKYSCAN_MQ_FROMCLIENT, + auth_token=ENV.SKYSCAN_MQ_FROMCLIENT_AUTH_TOKEN, + timeout=ENV.SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS, + ) + + return to_clients_queue, from_clients_queue + + +######################################################################################## + + def connect_to_skydriver(urgent: bool) -> RestClient: """Get REST client for SkyDriver depending on the urgency.""" if urgent: return RestClient( - cfg.ENV.SKYSCAN_SKYDRIVER_ADDRESS, - token=cfg.ENV.SKYSCAN_SKYDRIVER_AUTH, + ENV.SKYSCAN_SKYDRIVER_ADDRESS, + token=ENV.SKYSCAN_SKYDRIVER_AUTH, timeout=60.0, retries=CalcRetryFromWaittimeMax(waittime_max=1 * 60 * 60), # backoff_factor=0.3, ) else: return RestClient( - 
cfg.ENV.SKYSCAN_SKYDRIVER_ADDRESS, - token=cfg.ENV.SKYSCAN_SKYDRIVER_AUTH, + ENV.SKYSCAN_SKYDRIVER_ADDRESS, + token=ENV.SKYSCAN_SKYDRIVER_AUTH, timeout=10.0, retries=1, # backoff_factor=0.3, @@ -50,7 +76,7 @@ async def nonurgent_request(rc: RestClient, args: dict[str, Any]) -> Any: async def kill_switch_check_from_skydriver() -> None: """Routinely check SkyDriver whether to continue the scan.""" - if not cfg.ENV.SKYSCAN_SKYDRIVER_ADDRESS: + if not ENV.SKYSCAN_SKYDRIVER_ADDRESS: return logger = logging.getLogger("skyscan.kill_switch") @@ -58,10 +84,10 @@ async def kill_switch_check_from_skydriver() -> None: skydriver_rc = connect_to_skydriver(urgent=False) while True: - await asyncio.sleep(cfg.ENV.SKYSCAN_KILL_SWITCH_CHECK_INTERVAL) + await asyncio.sleep(ENV.SKYSCAN_KILL_SWITCH_CHECK_INTERVAL) status = await skydriver_rc.request( - "GET", f"/scan/{cfg.ENV.SKYSCAN_SKYDRIVER_SCAN_ID}/status" + "GET", f"/scan/{ENV.SKYSCAN_SKYDRIVER_SCAN_ID}/status" ) if status["scan_state"].startswith("STOPPED__"): @@ -71,12 +97,15 @@ async def kill_switch_check_from_skydriver() -> None: os.kill(os.getpid(), signal.SIGINT) # NOTE - sys.exit only exits thread +######################################################################################## + + async def fetch_event_contents_from_skydriver() -> Any: """Fetch event contents from SkyDriver.""" skydriver_rc = connect_to_skydriver(urgent=True) manifest = await skydriver_rc.request( - "GET", f"/scan/{cfg.ENV.SKYSCAN_SKYDRIVER_SCAN_ID}/manifest" + "GET", f"/scan/{ENV.SKYSCAN_SKYDRIVER_SCAN_ID}/manifest" ) LOGGER.info("Fetched event contents from SkyDriver") return manifest["event_i3live_json_dict"] @@ -102,6 +131,9 @@ def fetch_event_contents_from_file(event_file: Optional[Path]) -> dict: return data +######################################################################################## + + def _is_pow_of_two(intval: int) -> bool: # I know, I know, no one likes bit shifting... buuuut... 
return isinstance(intval, int) and (intval > 0) and (intval & (intval - 1) == 0) diff --git a/skymap_scanner/utils/pixel_classes.py b/skymap_scanner/utils/pixel_classes.py index 36d48c707..bb3fe9e08 100644 --- a/skymap_scanner/utils/pixel_classes.py +++ b/skymap_scanner/utils/pixel_classes.py @@ -1,6 +1,5 @@ """Classes for representing a pixel-like things in various forms.""" - import dataclasses as dc import time from typing import Any, Dict, Tuple diff --git a/tests/compare_scan_results.py b/tests/compare_scan_results.py index 775c35f56..cfb5ff988 100644 --- a/tests/compare_scan_results.py +++ b/tests/compare_scan_results.py @@ -101,8 +101,10 @@ def compare_then_exit( logger.info("Actual vs Expected...") if equal or close: + logger.info("Good.") sys.exit(0) else: + logger.error("Failed!") if do_assert: assert False sys.exit(1) diff --git a/tests/data/reco_pixel_single/conversion_scripts/convert_out_pkl.py b/tests/data/reco_pixel_single/conversion_scripts/convert_out_pkl.py new file mode 100644 index 000000000..3ae809443 --- /dev/null +++ b/tests/data/reco_pixel_single/conversion_scripts/convert_out_pkl.py @@ -0,0 +1,31 @@ +"""A script to help when a pickle format changes. + +The should be used as a one-off script & updated as needed. 
+""" + +import base64 +import dataclasses as dc # noqa: F401 +import json +import pickle +import sys +from pathlib import Path + +from icecube.dataclasses import I3Position # type: ignore[import] # noqa: F401 +from icecube.icetray import I3Frame # type: ignore[import] # noqa: F401 + +OUT_PKL_FPATH = sys.argv[1] # "/local/test-data/1660761104.474899.out.pkl" +with open(OUT_PKL_FPATH, "rb") as f: + msg = pickle.load(f) + print(msg) +with open(Path(OUT_PKL_FPATH).parent / "out.json", "w") as f: + print(msg.items()) + json.dump( + { + "reco_pixel_variation_pkl_b64": base64.b64encode( + pickle.dumps(msg["reco_pixel_variation"]) + ).decode(), + "runtime": msg["runtime"], + }, + f, + indent=4, + ) diff --git a/tests/env-vars.sh b/tests/env-vars.sh index 7db2b4f76..66c3d374d 100755 --- a/tests/env-vars.sh +++ b/tests/env-vars.sh @@ -1,25 +1,51 @@ #!/bin/bash -set -ex # file is sourced so turn off at end +set -ex # file is sourced so turn off at end -# export SKYSCAN_CACHE_DIR=$PWD/cache-dir -- rely on user value -# export SKYSCAN_OUTPUT_DIR=$PWD/output-dir -- rely on user value -export SKYSCAN_BROKER_CLIENT=${SKYSCAN_BROKER_CLIENT:-"rabbitmq"} -# note=auth env vars are in job(s) +######################################################################## +# +# Export many environment variables needed to run a local scan +# +# NOTE: source this file +# +######################################################################## -export EWMS_PILOT_TASK_TIMEOUT=${EWMS_PILOT_TASK_TIMEOUT:-600} -export EWMS_PILOT_DUMP_SUBPROC_OUTPUT=${EWMS_PILOT_DUMP_SUBPROC_OUTPUT:-"True"} +export SKYSCAN_SKYDRIVER_SCAN_ID=$(uuidgen) -# export SKYSCAN_DEBUG_DIR=debug-pkl-dir -- rely on user value -export SKYSCAN_MQ_TIMEOUT_TO_CLIENTS=${SKYSCAN_MQ_TIMEOUT_TO_CLIENTS:-5} -export SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS=${SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS:-600} -# export SKYSCAN_MQ_CLIENT_TIMEOUT_WAIT_FOR_FIRST_MESSAGE=0 +# to-client queue +# -> server +export 
SKYSCAN_MQ_TOCLIENT="to-clients-$SKYSCAN_SKYDRIVER_SCAN_ID" +export SKYSCAN_MQ_TOCLIENT_AUTH_TOKEN=${SKYSCAN_MQ_TOCLIENT_AUTH_TOKEN:-""} # note: set in ci job +export SKYSCAN_MQ_TOCLIENT_BROKER_TYPE=${SKYSCAN_MQ_TOCLIENT_BROKER_TYPE:-"rabbitmq"} +export SKYSCAN_MQ_TOCLIENT_BROKER_ADDRESS=${SKYSCAN_MQ_TOCLIENT_BROKER_ADDRESS:-""} # note: set in ci job +# -> worker/client/pilot +# note: set in launch_worker.sh +# +# from-client queue +# -> server +export SKYSCAN_MQ_FROMCLIENT="from-clients-$SKYSCAN_SKYDRIVER_SCAN_ID" +export SKYSCAN_MQ_FROMCLIENT_AUTH_TOKEN=${SKYSCAN_MQ_FROMCLIENT_AUTH_TOKEN:-""} # note: set in ci job +export SKYSCAN_MQ_FROMCLIENT_BROKER_TYPE=${SKYSCAN_MQ_FROMCLIENT_BROKER_TYPE:-"rabbitmq"} +export SKYSCAN_MQ_FROMCLIENT_BROKER_ADDRESS=${SKYSCAN_MQ_FROMCLIENT_BROKER_ADDRESS:-""} # note: set in ci job +# -> worker/client/pilot +# note: set in launch_worker.sh -export SKYSCAN_DOCKER_PULL_ALWAYS=${SKYSCAN_DOCKER_PULL_ALWAYS:-0} -export SKYSCAN_DOCKER_IMAGE_TAG=${SKYSCAN_DOCKER_IMAGE_TAG:-"local"} -export SKYSCAN_MINI_TEST=${SKYSCAN_MINI_TEST:-'yes'} +# timeouts -- these are listed in order of occurrence +# -> worker/client/pilot +export EWMS_PILOT_TIMEOUT_QUEUE_WAIT_FOR_FIRST_MESSAGE=${EWMS_PILOT_TIMEOUT_QUEUE_WAIT_FOR_FIRST_MESSAGE:-60} +export EWMS_PILOT_TIMEOUT_QUEUE_INCOMING=${EWMS_PILOT_TIMEOUT_QUEUE_INCOMING:-5} +# export EWMS_PILOT_TASK_TIMEOUT -> this is very specific to the task, if it's wanted set it in a place where we now inputs +export EWMS_PILOT_STOP_LISTENING_ON_TASK_ERROR=${EWMS_PILOT_STOP_LISTENING_ON_TASK_ERROR:-"True"} +export EWMS_PILOT_OKAY_ERRORS=${EWMS_PILOT_OKAY_ERRORS:-"TimeoutError"} # this is a space-delimited list +# ^^^ in production, we run O(1k) cpus so a slow reco will be delivered to a new cpu, here we have to be more conservative. 
So, let the local workers retry the reco +# ^^^ if EWMS_PILOT_STOP_LISTENING_ON_TASK_ERROR=false (or similar: no, 0, etc.), then this var is ignored +# -> server +export SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS=${SKYSCAN_MQ_TIMEOUT_FROM_CLIENTS:-$((60 * 10))} # just need a big value -- only used to detect MIA workers (it isn't important in a successful scan) +# other/misc +# -> worker/client/pilot +export _SKYSCAN_CI_MINI_TEST=${_SKYSCAN_CI_MINI_TEST:-'yes'} export SKYSCAN_LOG=${SKYSCAN_LOG:-"DEBUG"} export SKYSCAN_LOG_THIRD_PARTY=${SKYSCAN_LOG_THIRD_PARTY:-"INFO"} +# -> worker/client/pilot +export EWMS_PILOT_KEEP_ALL_TASK_FILES="True" # don't delete stderr/stdout files -export CLIENT_STARTER_WAIT_FOR_STARTUP_JSON=${CLIENT_STARTER_WAIT_FOR_STARTUP_JSON:-60} - -set +ex # file is sourced so turn off +set +ex # file is sourced so turn off