1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
| func (db *DB) ExecMulti(c redis.Connection) redis.Reply { return db.ExecMultiCommand(c.GetEnqueuedCmdLine(), c.GetWatching()) }
func (db *DB) ExecMultiCommand(cmdLines [][][]byte, watching map[string]uint32) redis.Reply {
writeKeys := make([]string, len(cmdLines)) readKeys := make([]string, len(cmdLines)+len(watching)) for _, cmdLine := range cmdLines { cmdName := strings.ToLower(string(cmdLine[0])) cmd, _ := cmdTable[cmdName]
prepare := cmd.prepare write, read := prepare(cmdLine[1:]) writeKeys = append(writeKeys, write...) readKeys = append(readKeys, read...) }
watchingKeys := make([]string, 0, len(watching)) for key := range watching { watchingKeys = append(watchingKeys, key) } readKeys = append(readKeys, watchingKeys...)
db.RWLocks(writeKeys, readKeys) defer db.RWUnLocks(writeKeys, readKeys)
versionChanged := db.checkVersionChanged(watching) if versionChanged { return reply.MakeNullBulkStringReply() }
var results [][]byte var undoLogs [][]CmdLine aborted := false for _, cmdLine := range cmdLines { cmdName := strings.ToLower(string(cmdLine[0])) cmd, _ := cmdTable[cmdName]
if config.Properties.OpenAtomicTx { key := string(cmdLine[1]) undoLogs = append(undoLogs, db.GetUndoLog(key)) }
fun := cmd.executor r, aofExpireCtx := fun(db, cmdLine[1:])
if config.Properties.OpenAtomicTx && reply.IsErrorReply(r) { undoLogs = undoLogs[:len(undoLogs)-1] aborted = true break }
results = append(results, []byte(r.DataString())) db.afterExec(r, aofExpireCtx, cmdLine) }
if len(results) == 0 { return reply.MakeEmptyMultiBulkStringReply() }
if config.Properties.OpenAtomicTx && aborted { size := len(undoLogs) for i := size - 1; i >= 0; i-- { undoLog := undoLogs[i] if len(undoLog) == 0 { continue } for _, cmdLine := range undoLog { db.ExecWithLock(cmdLine) } } return reply.MakeErrReply("EXECABORT Transaction rollback because of errors during executing. (atomic tx is open)") }
db.AddVersion(writeKeys...)
return reply.MakeMultiBulkStringReply(results) }
|