Reputation: 1699
I am trying to do a bulk upsert with the mgo library. I was reading the documentation regarding bulk upserts since this is the first time I've ever used MongoDB, and it looks like I have to provide pairs of documents to update.
In my function, I'm performing a find-all query, then using the results of that query as the existing part of the pair for the bulk.Upsert() operation. I'm not sure if this is the right way to go about it, but I have to do upserts on ~65k documents at a time.
Here are the type structs, and the worker pool function which reads from a channel to perform the aforementioned MongoDB operations.
// types from my project's `lib` package.
type Auctions struct {
	Auc int `json:"auc" bson:"_id"`
	Item int `json:"item" bson:"item"`
	Owner string `json:"owner" bson:"owner"`
	OwnerRealm string `json:"ownerRealm" bson:"ownerRealm"`
	Bid int `json:"bid" bson:"bid"`
	Buyout int `json:"buyout" bson:"buyout"`
	Quantity int `json:"quantity" bson:"quantity"`
	TimeLeft string `json:"timeLeft" bson:"timeLeft"`
	Rand int `json:"rand" bson:"rand"`
	Seed int `json:"seed" bson:"seed"`
	Context int `json:"context" bson:"context"`
	BonusLists []struct {
		BonusListID int `json:"bonusListId" bson:"bonusListId"`
	} `json:"bonusLists,omitempty" bson:"bonusLists,omitempty"`
	Modifiers []struct {
		Type int `json:"type" bson:"type"`
		Value int `json:"value" bson:"value"`
	} `json:"modifiers,omitempty" bson:"modifiers,omitempty"`
	PetSpeciesID int `json:"petSpeciesId,omitempty" bson:"petSpeciesId,omitempty"`
	PetBreedID int `json:"petBreedId,omitempty" bson:"petBreedId,omitempty"`
	PetLevel int `json:"petLevel,omitempty" bson:"petLevel,omitempty"`
	PetQualityID int `json:"petQualityId,omitempty" bson:"petQualityId,omitempty"`
}

type AuctionResponse struct {
	Realms []struct {
		Name string `json:"name"`
		Slug string `json:"slug"`
	} `json:"realms"`
	Auctions []Auctions `json:"auctions"`
}
func (b *Blizzard) RealmAuctionGrabber(realms chan string, db *mgo.Database, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := range realms {
		NewReq, err := http.NewRequest("GET", fmt.Sprintf("%s/auction/data/%s", b.Url, i), nil)
		if err != nil {
			fmt.Printf("Cannot create new request for realm %s: %s", i, err)
		}
		// Update the request with the default parameters and grab the files links.
		Request := PopulateDefaultParams(NewReq)
		log.Debugf("downloading %s auction locations.", i)
		Response, err := b.Client.Do(Request)
		if err != nil {
			fmt.Printf("Error request realm auction data: %s\n", err)
		}
		defer Response.Body.Close()
		Body, err := ioutil.ReadAll(Response.Body)
		if err != nil {
			fmt.Printf("Error parsing request body: %s\n", err)
		}
		var AuctionResp lib.AuctionAPI
		err = json.Unmarshal(Body, &AuctionResp)
		if err != nil {
			fmt.Printf("Error marshalling auction repsonse body: %s\n", err)
		}
		for _, x := range AuctionResp.Files {
			NewDataReq, err := http.NewRequest("GET", x.URL, nil)
			if err != nil {
				log.Error(err)
			}
			AuctionResponse, err := b.Client.Do(NewDataReq)
			if err != nil {
				fmt.Printf("Error request realm auction data: %s\n", err)
			}
			defer Response.Body.Close()
			AuctionBody, err := ioutil.ReadAll(AuctionResponse.Body)
			if err != nil {
				fmt.Printf("Error parsing request body: %s\n", err)
			}
			var AuctionData lib.AuctionResponse
			err = json.Unmarshal(AuctionBody, &AuctionData)
			// grab all the current records, then perform an Upsert!
			var existing []lib.Auctions
			col := db.C(i)
			err = col.Find(nil).All(&existing)
			if err != nil {
				log.Error(err)
			}
			log.Infof("performing bulk upsert for %s", i)
			auctionData, err := bson.Marshal(AuctionData.Auctions)
			if err != nil {
				log.Error("error marshalling bson: %s", err)
			}
			existingData, _ := bson.Marshal(existing)
			bulk := db.C(i).Bulk()
			bulk.Upsert(existingData, auctionData)
			_, err = bulk.Run()
			if err != nil {
				log.Error("error performing upsert! error: ", err)
			}
		}
	}
}
When I call bulk.Upsert(existingData, auctionData), things are fine. However, when I call bulk.Run(), I'm getting this logged error message:
{"level":"error","msg":"error performing upsert! error: wrong type for 'q' field, expected object, found q: BinData(0, 0500000000)","time":"2016-07-09T16:53:45-07:00"}
I'm assuming this is related to how I'm doing the BSON tagging in the Auctions struct, but I'm not sure because this is the first time I've worked with MongoDB. Right now there are no collections in the database.
Is the error message related to the BSON tagging, and how can I fix it?
Upvotes: 0
Views: 3282
Reputation: 4235
I don't know whether this is still relevant, but here is the code:
package main

import (
	"encoding/json"
	"io/ioutil"
	"log"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

type Auctions struct {
	Auc   int    `json:"auc" bson:"_id"`
	Owner string `json:"owner" bson:"owner"`
}

type AuctionResponse struct {
	Auctions []Auctions `json:"auctions"`
}

func main() {
	session, err := mgo.Dial("mongodb://127.0.0.1:27017/aucs")
	if err != nil {
		panic(err)
	}
	defer session.Close()
	session.SetMode(mgo.Monotonic, true)
	db := session.DB("aucs")

	// Load the auction data from a local file instead of the HTTP API.
	var AuctionData AuctionResponse
	AuctionBody, err := ioutil.ReadFile("auctions.json")
	if err != nil {
		log.Panic("error reading auctions.json: ", err)
	}
	if err = json.Unmarshal(AuctionBody, &AuctionData); err != nil {
		log.Panic("error unmarshalling auctions: ", err)
	}

	log.Printf("performing bulk upsert for %s", "realm")
	bulk := db.C("realm").Bulk()
	// Queue one selector/document pair per auction, matching on _id.
	for _, auc := range AuctionData.Auctions {
		bulk.Upsert(bson.M{"_id": auc.Auc}, auc)
	}
	// Run all queued upserts.
	_, err = bulk.Run()
	if err != nil {
		log.Panic("error performing upsert! error: ", err)
	}
}
I made some changes so that I could check it against real data, but I hope it's still easy to follow what's going on. Now let me explain the Mongo part a bit.
1. There is no need to fetch the existing documents. It would be strange to request all existing documents before updating or inserting them, especially on huge databases.
2. mgo's upsert function takes two parameters:
   - a selector document (I prefer to think of it as the WHERE condition in SQL terms)
   - the document that you actually have, which will be inserted into the DB if the selector matches nothing
3. With a bulk upsert, any number of selector-document pairs can be pushed in one call, like bulk.Upsert(sel1, doc1, sel2, doc2, sel3, doc3), but in your case there is no point in using this feature.
4. upsert affects only a single document, so even if your selector matches multiple documents, only the first will be updated. In other words, upsert is more similar to MySQL's INSERT ... ON DUPLICATE KEY UPDATE than to UPDATE ... WHERE. In your case (with unique auction IDs) a single upsert looks like this MySQL query:
INSERT INTO realm (auc, owner) VALUES ('1', 'Hogger') ON DUPLICATE KEY UPDATE owner = 'Hogger';
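To make the insert-or-replace behaviour described above concrete, here is a minimal sketch that does not talk to MongoDB at all. The collection type and its upsert method are hypothetical stand-ins (they are not part of mgo); they only imitate what an upsert keyed on a unique _id does to the stored data.

```go
package main

import "fmt"

// Auction mirrors the two fields of the trimmed-down struct used above.
type Auction struct {
	Auc   int
	Owner string
}

// collection is a hypothetical in-memory stand-in for a MongoDB
// collection, keyed by _id. It is not an mgo type.
type collection map[int]Auction

// upsert replaces the document stored under id, or inserts doc when
// nothing is stored there yet. It reports whether an insert happened.
func (c collection) upsert(id int, doc Auction) (inserted bool) {
	_, exists := c[id]
	c[id] = doc
	return !exists
}

func main() {
	realm := collection{}
	fmt.Println(realm.upsert(1, Auction{Auc: 1, Owner: "Hogger"})) // true: nothing matched, so it was inserted
	fmt.Println(realm.upsert(1, Auction{Auc: 1, Owner: "Thrall"})) // false: _id 1 existed, so it was replaced
	fmt.Println(len(realm), realm[1].Owner)                        // 1 Thrall
}
```

Running the same upsert twice leaves a single document behind, which is the same outcome as the ON DUPLICATE KEY UPDATE query.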
With bulk we can just make multiple upserts in a loop and then run them all at once.
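As for the error in the question: bson.Marshal returns a []byte, and a []byte passed as the selector is presumably encoded as binary data rather than as a document, which would match the BinData in the reported message. Passing plain Go values avoids that. The sketch below builds the alternating selector/document list for the variadic form of bulk.Upsert using only the standard library. The helper name upsertPairs is an assumption for illustration, and map[string]interface{} stands in for bson.M, which has that underlying type.

```go
package main

import "fmt"

// Auction mirrors the trimmed-down struct used above.
type Auction struct {
	Auc   int    `bson:"_id"`
	Owner string `bson:"owner"`
}

// upsertPairs is a hypothetical helper: it flattens the auctions into
// alternating selector, document, selector, document, ... values.
// Both halves stay ordinary Go values; nothing is marshalled by hand.
func upsertPairs(auctions []Auction) []interface{} {
	pairs := make([]interface{}, 0, 2*len(auctions))
	for _, a := range auctions {
		selector := map[string]interface{}{"_id": a.Auc} // stand-in for bson.M
		pairs = append(pairs, selector, a)
	}
	return pairs
}

func main() {
	pairs := upsertPairs([]Auction{
		{Auc: 1, Owner: "Hogger"},
		{Auc: 2, Owner: "Thrall"},
	})
	fmt.Println(len(pairs)) // 4
	// With mgo, this slice could be queued in a single call:
	//     bulk.Upsert(pairs...)
}
```

Whether you queue the pairs in one variadic call or call bulk.Upsert once per auction in a loop, the operations are only sent when bulk.Run() is called.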
Upvotes: 1