Program.cs 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. using Microsoft.Azure.Batch.Conventions.Files;
  2. using Microsoft.WindowsAzure.Storage.Blob;
  3. using System;
  4. using System.Diagnostics;
  5. using System.IO;
  6. using System.Text;
  7. using System.Threading.Tasks;
  8. namespace BatchWrapper
  9. {
  /// <summary>
  /// Console wrapper that runs <c>sqlpackage.exe</c> to export a database to a bacpac file or
  /// to import a bacpac file into a database. Exported files (and the SqlPackage diagnostics
  /// log) are saved to the job output container; imported files are downloaded from it.
  /// </summary>
  public static class Program
  {
      // Scratch directories: wiped and recreated before the run, deleted again afterwards.
      private static readonly string dataDirectory = "F:\\data";
      private static readonly string tempDirectory = "F:\\temp";
      private static readonly string[] directories = { dataDirectory, tempDirectory };
      private static readonly TimeSpan stdoutFlushDelay = TimeSpan.FromSeconds(3);

      // Number of positional arguments consumed by ParsePayload when any are supplied.
      private const int ExpectedArgumentCount = 6;

      private static void WriteLine(string message) => WriteLineInternal(Console.Out, message);

      private static void WriteErrorLine(string message) => WriteLineInternal(Console.Error, message);

      /// <summary>
      /// Writes each line of <paramref name="message"/> to <paramref name="writer"/>, prefixed
      /// with the current UTC timestamp. A null message writes nothing.
      /// </summary>
      private static void WriteLineInternal(TextWriter writer, string message)
      {
          if (message == null)
          {
              // The process output handlers pass e.Data straight through, and it may be null.
              return;
          }

          foreach (var line in message.Split('\n'))
          {
              // TrimEnd also strips the '\r' left behind when the text used CRLF line endings.
              writer.WriteLine($"[{DateTime.UtcNow:u}] {line.TrimEnd()}");
          }
      }

      /// <summary>
      /// Runs the export or import described by <paramref name="args"/>.
      /// </summary>
      /// <param name="args">
      /// Either empty, or: action, logical server name (without the domain suffix), database name,
      /// access token, application package name, application package version.
      /// </param>
      /// <returns>The exit code of <c>sqlpackage.exe</c>.</returns>
      /// <exception cref="ArgumentException">Too few arguments, or an unsupported action.</exception>
      /// <exception cref="InvalidOperationException">A required environment variable is not set.</exception>
      /// <exception cref="FileNotFoundException">The bacpac file is missing after download or export.</exception>
      public static async Task<int> Main(string[] args)
      {
          var assembly = typeof(Program).Assembly;
          WriteLine($"{assembly.ManifestModule.Name} v{assembly.GetName().Version.ToString(3)}");

          // Get the command payload
          var payload = ParsePayload(args);

          string sqlPackageBacpacFile = Path.Combine(dataDirectory, payload.DatabaseName + ".bacpac");
          string sqlPackageLogPath = payload.DatabaseName + ".log";

          // Build the import/export command (also rejects unsupported actions before touching the disk)
          string arguments = BuildSqlPackageArguments(payload, sqlPackageBacpacFile, sqlPackageLogPath);

          string targetDir = GetRequiredEnvironmentVariable(Constants.EnvironmentVariableNames.AppPackagePrefix + "_" + payload.ApplicatonPackageName + "#" + payload.ApplicatonPackageVersion);
          string workingDir = Environment.GetEnvironmentVariable(Constants.EnvironmentVariableNames.TaskWorkingDir);
          string jobContainerUrl = GetRequiredEnvironmentVariable(Constants.EnvironmentVariableNames.JobContainerUrl);

          ResetDirectories();

          int exitCode;
          try
          {
              if (payload.Action == ActionType.Import)
              {
                  await DownloadBacpacAsync(payload, jobContainerUrl, sqlPackageBacpacFile);
              }

              // Perform the import/export process
              exitCode = RunSqlPackage(targetDir, workingDir, arguments);

              if (payload.Action == ActionType.Export)
              {
                  await UploadExportOutputAsync(payload, jobContainerUrl, sqlPackageBacpacFile, sqlPackageLogPath);
              }

              // We are tracking the disk file to save our standard output, but the node agent may take
              // up to 3 seconds to flush the stdout stream to disk. So give the file a moment to catch up.
              await Task.Delay(stdoutFlushDelay);
          }
          finally
          {
              // Remove the scratch directories (they hold the bacpac) even when the run failed.
              DeleteDirectories();
          }

          return exitCode;
      }

      /// <summary>
      /// Builds the payload from the command line. With no arguments the payload is returned exactly
      /// as its constructor created it; otherwise all six positional arguments are required.
      /// </summary>
      private static Payload ParsePayload(string[] args)
      {
          var payload = new Payload();
          if (args.Length == 0)
          {
              return payload;
          }

          if (args.Length < ExpectedArgumentCount)
          {
              throw new ArgumentException(
                  $"Expected {ExpectedArgumentCount} arguments (<Action> <LogicalServerName> <DatabaseName> <AccessToken> <ApplicationPackageName> <ApplicationPackageVersion>) but received {args.Length}.",
                  nameof(args));
          }

          // ignoreCase: accept "export" as well as "Export".
          payload.Action = (ActionType)Enum.Parse(typeof(ActionType), args[0], true);
          payload.LogicalServerName = args[1] + ".database.windows.net";
          payload.DatabaseName = args[2];
          payload.AccessToken = args[3];
          payload.ApplicatonPackageName = args[4];
          payload.ApplicatonPackageVersion = args[5];
          return payload;
      }

      /// <summary>
      /// Reads an environment variable and fails with a message naming it when it is missing or empty.
      /// </summary>
      private static string GetRequiredEnvironmentVariable(string name)
      {
          string value = Environment.GetEnvironmentVariable(name);
          if (string.IsNullOrEmpty(value))
          {
              throw new InvalidOperationException($"Required environment variable '{name}' is not set.");
          }

          return value;
      }

      /// <summary>Wraps a command-line value in double quotes so embedded spaces do not split it.</summary>
      private static string Quote(string value) => "\"" + value + "\"";

      /// <summary>
      /// Composes the sqlpackage.exe argument string for the payload's action.
      /// </summary>
      /// <exception cref="ArgumentException">The action is neither Export nor Import.</exception>
      private static string BuildSqlPackageArguments(Payload payload, string bacpacFile, string logPath)
      {
          var cmdBuilder = new StringBuilder();
          cmdBuilder.Append($"/Action:{payload.Action}");
          cmdBuilder.Append(" /MaxParallelism:16");
          cmdBuilder.Append($" /DiagnosticsFile:{Quote(logPath)}");
          cmdBuilder.Append(" /p:CommandTimeout=604800");

          switch (payload.Action)
          {
              case ActionType.Export:
                  cmdBuilder.Append($" /SourceServerName:{payload.LogicalServerName}");
                  cmdBuilder.Append($" /SourceDatabaseName:{Quote(payload.DatabaseName)}");
                  cmdBuilder.Append($" /AccessToken:{payload.AccessToken}");
                  cmdBuilder.Append($" /TargetFile:{Quote(bacpacFile)}");
                  cmdBuilder.Append(" /SourceTimeout:30");
                  cmdBuilder.Append($" /p:TempDirectoryForTableData={Quote(tempDirectory)}");
                  cmdBuilder.Append(" /p:VerifyFullTextDocumentTypesSupported=false");
                  break;

              case ActionType.Import:
                  cmdBuilder.Append($" /TargetServerName:{payload.LogicalServerName}");
                  cmdBuilder.Append($" /TargetDatabaseName:{Quote(payload.DatabaseName)}");
                  cmdBuilder.Append($" /AccessToken:{payload.AccessToken}");
                  cmdBuilder.Append(" /TargetTimeout:30");
                  cmdBuilder.Append($" /SourceFile:{Quote(bacpacFile)}");
                  break;

              default:
                  throw new ArgumentException($"Invalid action type: {payload.Action}");
          }

          return cmdBuilder.ToString();
      }

      /// <summary>Deletes any existing scratch directories and recreates them empty.</summary>
      private static void ResetDirectories()
      {
          DeleteDirectories();
          foreach (string dir in directories)
          {
              Directory.CreateDirectory(dir);
          }
      }

      /// <summary>Recursively deletes the scratch directories that exist.</summary>
      private static void DeleteDirectories()
      {
          foreach (string dir in directories)
          {
              if (Directory.Exists(dir))
              {
                  Directory.Delete(dir, true);
              }
          }
      }

      /// <summary>
      /// Downloads <c>$JobOutput/{database}.bacpac</c> from the job container to <paramref name="bacpacFile"/>.
      /// </summary>
      private static async Task DownloadBacpacAsync(Payload payload, string jobContainerUrl, string bacpacFile)
      {
          WriteLine(string.Format("Downloading {0} bacpac file to {1}", payload.DatabaseName, bacpacFile));

          var container = new CloudBlobContainer(new Uri(jobContainerUrl));
          CloudBlockBlob blob = container.GetBlockBlobReference(string.Format("$JobOutput/{0}.bacpac", payload.DatabaseName));
          await blob.DownloadToFileAsync(bacpacFile, FileMode.CreateNew);

          if (!File.Exists(bacpacFile))
          {
              throw new FileNotFoundException(string.Format("{0} didn't download", bacpacFile), bacpacFile);
          }

          WriteLine(string.Format("Downloaded {0} bacpac file to {1}", payload.DatabaseName, bacpacFile));
      }

      /// <summary>
      /// Starts sqlpackage.exe from <paramref name="targetDir"/>, relays its stdout/stderr through the
      /// timestamped writers, waits for it to finish and returns its exit code.
      /// </summary>
      private static int RunSqlPackage(string targetDir, string workingDir, string arguments)
      {
          var startInfo = new ProcessStartInfo
          {
              WorkingDirectory = workingDir,
              FileName = Path.Combine(targetDir, "sqlpackage.exe"),
              Arguments = arguments,
              CreateNoWindow = true,
              UseShellExecute = false,
              RedirectStandardOutput = true,
              RedirectStandardError = true
          };

          using (var process = new Process { StartInfo = startInfo })
          {
              process.OutputDataReceived += (s, e) => WriteLine(e.Data);
              process.ErrorDataReceived += (s, e) => WriteErrorLine(e.Data);

              process.Start();
              process.BeginOutputReadLine();
              process.BeginErrorReadLine();
              process.WaitForExit();

              // Read the exit code before the using block disposes the process.
              int exitCode = process.ExitCode;
              WriteLine(string.Format("SqlPackage.exe exited with code: {0}", exitCode));
              return exitCode;
          }
      }

      /// <summary>
      /// Verifies the export produced a bacpac file, then saves the diagnostics log and the bacpac
      /// to the job output storage.
      /// </summary>
      private static async Task UploadExportOutputAsync(Payload payload, string jobContainerUrl, string bacpacFile, string logPath)
      {
          // NOTE(review): only the file's existence is checked here, not the SqlPackage exit code.
          if (!File.Exists(bacpacFile))
          {
              throw new FileNotFoundException(string.Format("{0} wasn't exported", bacpacFile), bacpacFile);
          }

          WriteLine(string.Format("Exported {0} bacpac file to {1}", payload.DatabaseName, bacpacFile));

          // Persist the Job Output
          var jobOutputStorage = new JobOutputStorage(new Uri(jobContainerUrl));

          await jobOutputStorage.SaveAsync(JobOutputKind.JobOutput, logPath);
          WriteLine(string.Format("Uploaded {0} to job account", logPath));

          await jobOutputStorage.SaveAsync(JobOutputKind.JobOutput, bacpacFile, payload.DatabaseName + ".bacpac");
          WriteLine(string.Format("Uploaded {0} to job account", bacpacFile));
      }
  }
  151. }