1 package com.acumenvelocity.ath.gcs;
2
3 import java.io.File;
4 import java.io.FileInputStream;
5 import java.io.InputStream;
6 import java.net.URI;
7 import java.nio.ByteBuffer;
8 import java.nio.channels.Channels;
9 import java.util.ArrayList;
10 import java.util.List;
11
12 import com.acumenvelocity.ath.common.Const;
13 import com.acumenvelocity.ath.common.Log;
14 import com.acumenvelocity.ath.common.exception.AthException;
15 import com.google.api.gax.paging.Page;
16 import com.google.auth.Credentials;
17 import com.google.auth.oauth2.GoogleCredentials;
18 import com.google.cloud.ReadChannel;
19 import com.google.cloud.WriteChannel;
20 import com.google.cloud.storage.Blob;
21 import com.google.cloud.storage.BlobId;
22 import com.google.cloud.storage.BlobInfo;
23 import com.google.cloud.storage.Storage;
24 import com.google.cloud.storage.Storage.BlobListOption;
25 import com.google.cloud.storage.StorageOptions;
26
27 public class AthStorage {
28
29 private static Storage gcs;
30
31 public static void init() throws Exception {
32 Credentials credentials = GoogleCredentials
33 .fromStream(new FileInputStream(Const.ATH_GCP_SECRET_FILE));
34
35 gcs = StorageOptions.newBuilder().setCredentials(credentials).build()
36 .getService();
37 }
38
39 public static boolean exists(URI gcsUrl) {
40 try {
41 BlobId blobId = BlobId.fromGsUtilUri(gcsUrl.toString());
42 Blob blob = gcs.get(blobId);
43
44 return blob != null;
45
46 } catch (Exception e) {
47 return false;
48 }
49 }
50
51
52
53
54
55
56
57
58
59
60
61
62
63 public static void storeFile(URI gcsUrl, String contentType, File file) throws Exception {
64 BlobId blobId = BlobId.fromGsUtilUri(gcsUrl.toString());
65
66
67 BlobInfo blobInfo = BlobInfo.newBuilder(blobId)
68 .setContentType(contentType)
69 .build();
70
71 long fileLength = file.length();
72
73
74 if (fileLength == 0) {
75
76 throw new Exception("Input file is 0 bytes. Cannot store to GCS.");
77 }
78
79
80
81 long totalBytesCopied = 0;
82
83 try (InputStream inputStream = new FileInputStream(file)) {
84
85
86 try (WriteChannel writer = gcs.writer(blobInfo)) {
87
88 byte[] buffer = new byte[8192];
89 int bytesRead;
90
91 while ((bytesRead = inputStream.read(buffer)) != -1) {
92
93 writer.write(ByteBuffer.wrap(buffer, 0, bytesRead));
94 totalBytesCopied += bytesRead;
95 }
96
97
98 }
99 }
100
101
102 if (totalBytesCopied != fileLength) {
103 System.err.println("GCS upload warning: Copied bytes (" + totalBytesCopied +
104 ") does not match input file size (" + fileLength + ")");
105
106 }
107 }
108
109 public static void loadFile(URI gcsUrl, File file) throws Exception {
110
111 BlobId blobId = BlobId.fromGsUtilUri(gcsUrl.toString());
112 Blob blob = gcs.get(blobId);
113
114 if (blob == null) {
115 AthException.logAndThrow(AthStorage.class, "Cannot acccess GCS blob '{}'", blobId);
116 }
117
118 blob.downloadTo(file.toPath());
119 }
120
121 public static InputStream getInputStream(URI gcsUrl) throws Exception {
122
123 BlobId blobId = BlobId.fromGsUtilUri(gcsUrl.toString());
124 Blob blob = gcs.get(blobId);
125
126 if (blob == null) {
127 AthException.logAndThrow(AthStorage.class, "Cannot acccess GCS blob '{}'", blobId);
128 }
129
130 ReadChannel channel = blob.reader();
131 return Channels.newInputStream(channel);
132 }
133
134 public static Storage getGcs() {
135 return gcs;
136 }
137
138 public static List<String> getNames(String bucketName, String prefix) throws Exception {
139 List<String> list = new ArrayList<>();
140
141 Page<Blob> blobs = gcs.list(bucketName, BlobListOption.prefix(prefix));
142
143 for (Blob blob : blobs.iterateAll()) {
144 list.add(blob.getName());
145 }
146
147 return list;
148 }
149
150 public static void deleteMany(String bucketName, String prefix) throws Exception {
151 Page<Blob> blobs = gcs.list(bucketName, BlobListOption.prefix(prefix));
152
153 for (Blob blob : blobs.iterateAll()) {
154 Log.info(AthStorage.class, "Deleting GCS blob '{}'", blob.getName());
155 blob.delete();
156 }
157 }
158 }