Skip to content

Commit

Permalink
Wrote tool to reorganise existing image beam locations in mongo and s…
Browse files Browse the repository at this point in the history
…tore without version in image name key. Also modified API/importer code to store beam locations without the version in image name going forward
  • Loading branch information
Peter Nemere committed Jan 8, 2025
1 parent dc80c3c commit a00cefa
Show file tree
Hide file tree
Showing 8 changed files with 637 additions and 15 deletions.
12 changes: 8 additions & 4 deletions api/coreg/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,13 @@ func importNewImage(jobId string, imageUrl string, baseRTT string, marsViewerExp

// Also insert a blank entry for beam locations for this image, as we're expecting to import scans for it
coll = hctx.Svcs.MongoDB.Collection(dbCollections.ImageBeamLocationsName)
beamImageName := wsHelpers.GetImageNameSansVersion(scanImage.ImagePath)
beamLocs := &protos.ImageLocations{
ImageName: scanImage.ImagePath,
ImageName: beamImageName,
LocationPerScan: []*protos.ImageLocationsForScan{},
}

beamResult, err := coll.UpdateByID(ctx, scanImage.ImagePath, bson.D{{Key: "$set", Value: beamLocs}}, opt)
beamResult, err := coll.UpdateByID(ctx, beamImageName, bson.D{{Key: "$set", Value: beamLocs}}, opt)
if err != nil {
return "", nil, nil, err
}
Expand All @@ -355,7 +356,7 @@ func readExistingLocationsForImage(jobId string, image string, hctx wsHelpers.Ha

// We're adding to the beam locations for the base image! First, read the base image beam locations structure as there should
// already be one!
filter := bson.M{"_id": image}
filter := bson.M{"_id": wsHelpers.GetImageNameSansVersion(image)}
baseImageBeamsResult := coll.FindOne(ctx, filter)

if baseImageBeamsResult.Err() != nil {
Expand All @@ -372,6 +373,9 @@ func readExistingLocationsForImage(jobId string, image string, hctx wsHelpers.Ha
return baseImageBeams, nil
}

// WARNING: This may have broken since image beam locations are now stored without version info. Because JPL has put the
// brakes on their side of the coreg feature, there's no real way to test this out now.
func importWarpedToBase(jobId string, baseImage string, ourBaseImageItem *protos.ScanImage, baseImageBeams *protos.ImageLocations, baseRtt string, coregResult *CoregJobResult, marsViewerExport *protos.MarsViewerExport, hctx wsHelpers.HandlerContext) error {
ctx := context.TODO()
coll := hctx.Svcs.MongoDB.Collection(dbCollections.ImageBeamLocationsName)
Expand Down Expand Up @@ -465,7 +469,7 @@ func importWarpedToBase(jobId string, baseImage string, ourBaseImageItem *protos
}

// TODO: Transaction for these 2?
filter := bson.M{"_id": baseImage}
filter := bson.M{"_id": wsHelpers.GetImageNameSansVersion(baseImage)} // See WARNING at top of function
result, err := coll.ReplaceOne(ctx, filter, &baseImageBeams, options.Replace())
if err != nil {
return fmt.Errorf("Coreg import job %v failed to save new beam locations: %v", jobId, err)
Expand Down
17 changes: 10 additions & 7 deletions api/dataimport/internal/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,16 @@ func (s *PIXLISEDataSaver) Save(
return fmt.Errorf("Failed to delete images pre scan import for: %v. Error: %v", data.DatasetID, err)
}

coll = db.Collection(dbCollections.ImageBeamLocationsName)
resBeam, err := coll.DeleteMany(context.TODO(), bson.M{"_id": bson.M{"$in": imageIds}})
if err != nil {
return fmt.Errorf("Failed to delete images pre scan import for: %v. Error: %v", data.DatasetID, err)
}

jobLog.Infof("Deleted %d images, %d image beam locations pre importing scan: %v...", res.DeletedCount, resBeam.DeletedCount, data.DatasetID)
// NOTE: We used to delete beam locations but this no longer makes sense because they're not stored by straight image names, but image names stripped
// of version information. A given entry may hold beams for multiple images, so we just leave it until this causes headaches one day?
/*
coll = db.Collection(dbCollections.ImageBeamLocationsName)
resBeam, err := coll.DeleteMany(context.TODO(), bson.M{"_id": bson.M{"$in": imageIds}})
if err != nil {
return fmt.Errorf("Failed to delete image beam locations pre scan import for: %v. Error: %v", data.DatasetID, err)
}
*/
jobLog.Infof("Deleted %d images pre importing scan: %v...", res.DeletedCount, data.DatasetID)

// Delete from ownership, scan default images and scan itself
coll = db.Collection(dbCollections.ScanDefaultImagesName)
Expand Down
6 changes: 3 additions & 3 deletions api/endpoints/Image.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func GetImage(params apiRouter.ApiHandlerStreamParams) (*s3.GetObjectOutput, str

// Original file exists, generate this modified copy and cache it back in S3 for the rest of this
// function to find!
err = generateImageVersion(genS3Path, minWidthPx, showLocations, s3Path, params.Svcs)
err = generateImageVersion(requestedFileName, genS3Path, minWidthPx, showLocations, s3Path, params.Svcs)
}

if err != nil {
Expand Down Expand Up @@ -251,7 +251,7 @@ func GetImage(params apiRouter.ApiHandlerStreamParams) (*s3.GetObjectOutput, str

const imageSizeStepPx = 200

func generateImageVersion(s3Path string, minWidthPx int, showLocations bool, finalFilePath string, svcs *services.APIServices) error {
func generateImageVersion(imageName string, s3Path string, minWidthPx int, showLocations bool, finalFilePath string, svcs *services.APIServices) error {
if minWidthPx <= 0 {
return fmt.Errorf("generateImageVersion minWidthPx too small: %v", minWidthPx)
}
Expand All @@ -271,7 +271,7 @@ func generateImageVersion(s3Path string, minWidthPx int, showLocations bool, fin
ctx := context.TODO()
coll := svcs.MongoDB.Collection(dbCollections.ImageBeamLocationsName)

filter := bson.M{"_id": path.Base(s3Path)}
filter := bson.M{"_id": wsHelpers.GetImageNameSansVersion(imageName)}
result := coll.FindOne(ctx, filter)

if result.Err() != nil {
Expand Down
2 changes: 2 additions & 0 deletions api/ws/handlers/image-beam-location.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func HandleImageBeamLocationsReq(req *protos.ImageBeamLocationsReq, hctx wsHelpe
if img.MatchInfo != nil && len(img.MatchInfo.BeamImageFileName) > 0 {
imageForBeamRead = img.MatchInfo.BeamImageFileName
}
imageForBeamRead = wsHelpers.GetImageNameSansVersion(imageForBeamRead)

coll = hctx.Svcs.MongoDB.Collection(dbCollections.ImageBeamLocationsName)

Expand Down Expand Up @@ -197,6 +198,7 @@ func HandleImageBeamLocationVersionsReq(req *protos.ImageBeamLocationVersionsReq
if img.MatchInfo != nil && len(img.MatchInfo.BeamImageFileName) > 0 {
imageForBeamRead = img.MatchInfo.BeamImageFileName
}
imageForBeamRead = wsHelpers.GetImageNameSansVersion(imageForBeamRead)

coll = hctx.Svcs.MongoDB.Collection(dbCollections.ImageBeamLocationsName)
vers := map[string]*protos.ImageBeamLocationVersionsResp_AvailableVersions{}
Expand Down
15 changes: 14 additions & 1 deletion api/ws/wsHelpers/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/pixlise/core/v4/api/dbCollections"
"github.com/pixlise/core/v4/api/services"
"github.com/pixlise/core/v4/api/ws/wsHelpers"
"github.com/pixlise/core/v4/core/gdsfilename"
protos "github.com/pixlise/core/v4/generated-protos"
"go.mongodb.org/mongo-driver/bson"
Expand Down Expand Up @@ -40,7 +41,7 @@ func GenerateIJs(imageName string, scanId string, instrument protos.ScanInstrume
}

locs := protos.ImageLocations{
ImageName: imageName,
ImageName: wsHelpers.GetImageNameSansVersion(imageName),
LocationPerScan: []*protos.ImageLocationsForScan{{
ScanId: scanId,
//BeamVersion: 1,
Expand Down Expand Up @@ -153,3 +154,15 @@ func GetDBImageFilter(imageName string) bson.D {

return filter
}

// GetImageNameSansVersion returns imageName with its version field blanked out
// (set to "__"), which is the form used as the key for stored beam locations.
// Names that gdsfilename.ParseFileName cannot parse are returned unchanged.
//
// NOTE(review): this helper appears to live in package wsHelpers itself, so
// callers inside that package should call it unqualified rather than importing
// wsHelpers - confirm there is no self-import.
func GetImageNameSansVersion(imageName string) string {
	if parsed, parseErr := gdsfilename.ParseFileName(imageName); parseErr == nil {
		// Parsed successfully: overwrite the version with the placeholder and re-serialise
		parsed.SetVersionStr("__")
		return parsed.ToString(true, true)
	}

	// Could not be parsed - likely not a PDS style file name, so hand it back untouched
	return imageName
}
5 changes: 5 additions & 0 deletions core/beamLocation/experimentFileToDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/pixlise/core/v4/api/dbCollections"
"github.com/pixlise/core/v4/api/ws/wsHelpers"
"github.com/pixlise/core/v4/core/logger"
protos "github.com/pixlise/core/v4/generated-protos"
"go.mongodb.org/mongo-driver/bson"
Expand All @@ -13,6 +14,10 @@ import (
)

func ImportBeamLocationToDB(imgName string, instrument protos.ScanInstrument, forScanId string, beamVersion uint32, ijIndex int, fromExprPB *protos.Experiment, db *mongo.Database, logger logger.ILogger) error {
// Make sure anything calling us using a PDS style name has the version removed, because beam locations aren't stored with
// the version name
imgName = wsHelpers.GetImageNameSansVersion(imgName)

imagesColl := db.Collection(dbCollections.ImageBeamLocationsName)
ctx := context.TODO()
filter := bson.M{"_id": imgName}
Expand Down
208 changes: 208 additions & 0 deletions internal/cmd-line-tools/beam-geom-image-name-fix/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"sort"
"time"

"github.com/pixlise/core/v4/api/dbCollections"
"github.com/pixlise/core/v4/core/awsutil"
"github.com/pixlise/core/v4/core/gdsfilename"
"github.com/pixlise/core/v4/core/logger"
"github.com/pixlise/core/v4/core/mongoDBConnection"
protos "github.com/pixlise/core/v4/generated-protos"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// t0 is the time this process started, in Unix milliseconds. printFinishStats
// subtracts it from the current time to report the total runtime.
var t0 = time.Now().UnixMilli()

// main rebuilds the image beam locations collection into a new collection named
// dbCollections.ImageBeamLocationsName + "VersionFree", in which entries whose
// name parses as a gdsfilename have the version replaced by "__".
//
// Command line flags:
//
//	-mongoSecret  Mongo DB secret passed to mongoDBConnection.Connect (may be empty)
//	-dbName       name of the database to operate on (required)
//
// Source entries are processed in alphabetical order of their name. When several
// source entries collapse to the same version-less name their per-scan locations
// are merged: a (scan id, beam version) pair that is not yet stored is appended,
// and one that is stored but differs is replaced by the later entry's data.
// Any database error terminates the process.
func main() {
	fmt.Printf("Started: %v\n", time.Now().String())

	var mongoSecret string
	var dbName string

	flag.StringVar(&mongoSecret, "mongoSecret", "", "Mongo DB secret")
	flag.StringVar(&dbName, "dbName", "", "DB name we're importing to")

	flag.Parse()

	// Check they're not empty
	checkNotEmpty := []string{
		dbName,
	}
	checkNotEmptyName := []string{
		"dbName",
	}
	for c, s := range checkNotEmpty {
		if len(s) <= 0 {
			log.Fatalf("Parameter: %v was empty", checkNotEmptyName[c])
		}
	}

	// Get a session for the bucket region
	sess, err := awsutil.GetSession()
	if err != nil {
		log.Fatalf("Failed to create AWS session. Error: %v", err)
	}

	// Init logger - this used to be local=stdout, cloud env=cloudwatch, but we now write all logs to stdout
	iLog := &logger.StdOutLogger{}
	iLog.SetLogLevel(logger.LogInfo)

	// Connect to mongo
	mongoClient, _, err := mongoDBConnection.Connect(sess, mongoSecret, iLog)
	if err != nil {
		fatalError(err)
	}

	// Destination DB is the new pixlise one
	destDB := mongoClient.Database(dbName) //mongoDBConnection.GetDatabaseName("pixlise", destEnvName))

	// List the ids of every stored beam location entry. Only the id is projected here
	// because each full document is read individually further down.
	// NOTE(review): this relies on ImageLocations.ImageName being decoded from "_id" - confirm bson tags.
	ctx := context.TODO()
	coll := destDB.Collection(dbCollections.ImageBeamLocationsName)
	opt := options.Find().SetProjection(bson.D{
		{Key: "_id", Value: true},
	})

	result, err := coll.Find(ctx, bson.M{}, opt)
	if err != nil {
		fatalError(err)
	}

	imageBeams := []*protos.ImageLocations{}
	if err := result.All(ctx, &imageBeams); err != nil {
		// Report the decode/iteration error itself; result.Err() may well be nil here
		fatalError(err)
	}

	// We want to read the images in alphabetical order to ensure we read image versions in order
	imageNames := make([]string, 0, len(imageBeams))
	for _, img := range imageBeams {
		imageNames = append(imageNames, img.ImageName)
	}

	sort.Strings(imageNames)

	// Start from a clean output collection so re-running the tool doesn't merge with stale results
	newColl := destDB.Collection(dbCollections.ImageBeamLocationsName + "VersionFree")
	err = newColl.Drop(ctx)
	if err != nil {
		fatalError(err)
	}

	// For each ID, if it's NOT a PDS encoded file name, store it
	// If it is PDS encoded file name we check if we have a version-less one stored already
	for _, origImageName := range imageNames {
		imgReadResult := coll.FindOne(ctx, bson.D{{Key: "_id", Value: origImageName}}, options.FindOne())
		if imgReadResult.Err() != nil {
			fatalError(imgReadResult.Err())
		}

		img := &protos.ImageLocations{}
		if err := imgReadResult.Decode(img); err != nil {
			fatalError(err)
		}

		meta, err := gdsfilename.ParseFileName(img.ImageName)
		if err == nil {
			// Snip off the version and see if this is stored already
			meta.SetVersionStr("__")

			if len(meta.FilePath) <= 0 {
				log.Fatalf("Expected path in image name: %v\n", img.ImageName)
			}

			img.ImageName = meta.ToString(true, true)

			// If there is an existing one, we have to update it, otherwise just write this as the record
			existingImgRes := newColl.FindOne(ctx, bson.D{{Key: "_id", Value: img.ImageName}})
			if existingImgRes.Err() != nil {
				if existingImgRes.Err() != mongo.ErrNoDocuments {
					log.Fatalf("%v: %v", origImageName, existingImgRes.Err())
				}

				// Fall through: Let it be written as is
			} else {
				// There's an existing one, update that one with any new beams we may have stored
				existingImg := &protos.ImageLocations{}
				if err = existingImgRes.Decode(existingImg); err != nil {
					log.Fatalf("%v: %v", origImageName, err)
				}

				for _, loc := range img.LocationPerScan {
					found := false
					for locIdx, existingLoc := range existingImg.LocationPerScan {
						if loc.BeamVersion == existingLoc.BeamVersion && loc.ScanId == existingLoc.ScanId {
							found = true

							// Already exists, check if they're equal
							for c, l := range loc.Locations {
								replaceExisting := false

								if c >= len(existingLoc.Locations) {
									// The stored entry has fewer locations than the incoming one. Treat
									// this as a mismatch instead of indexing past the end of the slice.
									log.Printf("WARNING: Image %v beam version %v has %v locations but image %v, beam version %v only has %v\n", origImageName, loc.BeamVersion, len(loc.Locations), existingImg.ImageName, existingLoc.BeamVersion, len(existingLoc.Locations))
									replaceExisting = true
								} else {
									el := existingLoc.Locations[c]

									if el == nil && l == nil {
										continue
									} else if el != nil && l != nil {
										if el.I != l.I || el.J != l.J {
											log.Printf("WARNING: Image %v beam version %v doesn't match image %v, beam version %v at idx: %v\n", origImageName, loc.BeamVersion, existingImg.ImageName, existingLoc.BeamVersion, c)
											replaceExisting = true
										}
									} else {
										log.Printf("WARNING: Image %v beam version %v doesn't match image %v, beam version %v at idx: %v, one is nil, one is not\n", origImageName, loc.BeamVersion, existingImg.ImageName, existingLoc.BeamVersion, c)
										replaceExisting = true
									}
								}

								if replaceExisting {
									// At this point, delete what's already there, we will replace it with the newer image versions beam locations
									existingImg.LocationPerScan = append(existingImg.LocationPerScan[0:locIdx], existingImg.LocationPerScan[locIdx+1:]...)

									found = false // We removed it, so say it's not found (allowing it to be added now)
									break
								}
							}

							// The slice being ranged over may have just been modified, and there is
							// nothing further to match for this loc anyway, so stop scanning
							break
						}
					}

					if !found {
						// Add it
						existingImg.LocationPerScan = append(existingImg.LocationPerScan, loc)
					}
				}

				img = existingImg
			}
		}
		// else: Must be a non-PDS file name, just store it in the new collection

		insResult, err := newColl.UpdateByID(ctx, img.ImageName, bson.D{{Key: "$set", Value: img}}, options.Update().SetUpsert(true))
		if err != nil {
			fatalError(err)
		} else if insResult.UpsertedCount <= 0 && insResult.MatchedCount <= 0 && insResult.ModifiedCount <= 0 {
			log.Printf("Unexpected result for beam location upsert: %v. %+v\n", img.ImageName, insResult)
		}
	}

	printFinishStats()
}

// getWithoutVersion returns fileName with the two bytes that sit immediately
// before its final four bytes replaced by "__". For a name ending in a 2 character
// version followed by a 4 character extension (e.g. "...01.png") this blanks out
// the version, giving "...__.png".
//
// Names shorter than 6 bytes cannot contain both parts and are returned
// unchanged instead of panicking on an out-of-range slice.
func getWithoutVersion(fileName string) string {
	const (
		versionLen = 2 // bytes replaced by the "__" placeholder
		tailLen    = 4 // trailing bytes preserved after the version
	)

	if len(fileName) < versionLen+tailLen {
		return fileName
	}

	tailStart := len(fileName) - tailLen
	return fileName[:tailStart-versionLen] + "__" + fileName[tailStart:]
}

// fatalError prints the runtime stats and then logs err via log.Fatal, which
// terminates the process. The stats are printed first because nothing runs
// after log.Fatal.
func fatalError(err error) {
	printFinishStats()
	log.Fatal(err)
}

// printFinishStats writes the number of whole seconds elapsed since t0 to stdout.
func printFinishStats() {
	// t0 is in Unix milliseconds; integer division truncates to whole seconds
	elapsedMs := time.Now().UnixMilli() - t0
	fmt.Printf("Runtime %v seconds\n", elapsedMs/1000)
}
Loading

0 comments on commit a00cefa

Please sign in to comment.